// Copyright (c) 2013, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #ifndef ROCKSDB_LITE #include "utilities/persistent_cache/block_cache_tier_file.h" #ifndef OS_WIN #include <unistd.h> #endif #include <functional> #include <memory> #include <vector> #include "env/composite_env_wrapper.h" #include "logging/logging.h" #include "port/port.h" #include "util/crc32c.h" namespace rocksdb { // // File creation factories // Status NewWritableCacheFile(Env* const env, const std::string& filepath, std::unique_ptr<WritableFile>* file, const bool use_direct_writes = false) { EnvOptions opt; opt.use_direct_writes = use_direct_writes; Status s = env->NewWritableFile(filepath, file, opt); return s; } Status NewRandomAccessCacheFile(Env* const env, const std::string& filepath, std::unique_ptr<RandomAccessFile>* file, const bool use_direct_reads = true) { assert(env); EnvOptions opt; opt.use_direct_reads = use_direct_reads; Status s = env->NewRandomAccessFile(filepath, file, opt); return s; } // // BlockCacheFile // Status BlockCacheFile::Delete(uint64_t* size) { assert(env_); Status status = env_->GetFileSize(Path(), size); if (!status.ok()) { return status; } return env_->DeleteFile(Path()); } // // CacheRecord // // Cache record represents the record on disk // // +--------+---------+----------+------------+---------------+-------------+ // | magic | crc | key size | value size | key data | value data | // +--------+---------+----------+------------+---------------+-------------+ // <-- 4 --><-- 4 --><-- 4 --><-- 4 --><-- key size --><-- v-size --> // struct CacheRecordHeader { CacheRecordHeader() : magic_(0), crc_(0), key_size_(0), val_size_(0) {} CacheRecordHeader(const uint32_t magic, const uint32_t key_size, const uint32_t val_size) : magic_(magic), crc_(0), key_size_(key_size), val_size_(val_size) {} uint32_t magic_; uint32_t crc_; uint32_t key_size_; uint32_t val_size_; }; struct CacheRecord { CacheRecord() {} CacheRecord(const Slice& key, const Slice& val) : hdr_(MAGIC, static_cast<uint32_t>(key.size()), static_cast<uint32_t>(val.size())), key_(key), val_(val) { hdr_.crc_ = ComputeCRC(); } uint32_t ComputeCRC() const; bool Serialize(std::vector<CacheWriteBuffer*>* bufs, size_t* woff); bool Deserialize(const Slice& buf); static uint32_t CalcSize(const Slice& key, const Slice& val) { return static_cast<uint32_t>(sizeof(CacheRecordHeader) + key.size() + val.size()); } static const uint32_t MAGIC = 0xfefa; bool Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff, const char* data, const size_t size); CacheRecordHeader hdr_; Slice key_; Slice val_; }; static_assert(sizeof(CacheRecordHeader) == 16, "DataHeader is not aligned"); uint32_t CacheRecord::ComputeCRC() const { uint32_t crc = 0; CacheRecordHeader tmp = hdr_; tmp.crc_ = 0; crc = crc32c::Extend(crc, reinterpret_cast<const char*>(&tmp), sizeof(tmp)); crc = crc32c::Extend(crc, reinterpret_cast<const char*>(key_.data()), key_.size()); crc = crc32c::Extend(crc, reinterpret_cast<const char*>(val_.data()), val_.size()); return crc; } bool CacheRecord::Serialize(std::vector<CacheWriteBuffer*>* bufs, size_t* woff) { assert(bufs->size()); return Append(bufs, woff, reinterpret_cast<const char*>(&hdr_), sizeof(hdr_)) && Append(bufs, woff, reinterpret_cast<const char*>(key_.data()), key_.size()) && Append(bufs, woff, reinterpret_cast<const char*>(val_.data()), val_.size()); } bool CacheRecord::Append(std::vector<CacheWriteBuffer*>* bufs, size_t* woff, const char* data, const size_t data_size) { assert(*woff < bufs->size()); const char* p = data; size_t size = data_size; while (size && *woff < bufs->size()) { CacheWriteBuffer* buf = (*bufs)[*woff]; const size_t free = buf->Free(); if (size <= free) { buf->Append(p, size); size = 0; } else { buf->Append(p, free); p += free; size -= free; assert(!buf->Free()); assert(buf->Used() == buf->Capacity()); } if (!buf->Free()) { *woff += 1; } } assert(!size); return !size; } bool CacheRecord::Deserialize(const Slice& data) { assert(data.size() >= sizeof(CacheRecordHeader)); if (data.size() < sizeof(CacheRecordHeader)) { return false; } memcpy(&hdr_, data.data(), sizeof(hdr_)); assert(hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) == data.size()); if (hdr_.key_size_ + hdr_.val_size_ + sizeof(hdr_) != data.size()) { return false; } key_ = Slice(data.data_ + sizeof(hdr_), hdr_.key_size_); val_ = Slice(key_.data_ + hdr_.key_size_, hdr_.val_size_); if (!(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_)) { fprintf(stderr, "** magic %d ** \n", hdr_.magic_); fprintf(stderr, "** key_size %d ** \n", hdr_.key_size_); fprintf(stderr, "** val_size %d ** \n", hdr_.val_size_); fprintf(stderr, "** key %s ** \n", key_.ToString().c_str()); fprintf(stderr, "** val %s ** \n", val_.ToString().c_str()); for (size_t i = 0; i < hdr_.val_size_; ++i) { fprintf(stderr, "%d.", (uint8_t)val_.data()[i]); } fprintf(stderr, "\n** cksum %d != %d **", hdr_.crc_, ComputeCRC()); } assert(hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_); return hdr_.magic_ == MAGIC && ComputeCRC() == hdr_.crc_; } // // RandomAccessFile // bool RandomAccessCacheFile::Open(const bool enable_direct_reads) { WriteLock _(&rwlock_); return OpenImpl(enable_direct_reads); } bool RandomAccessCacheFile::OpenImpl(const bool enable_direct_reads) { rwlock_.AssertHeld(); ROCKS_LOG_DEBUG(log_, "Opening cache file %s", Path().c_str()); std::unique_ptr<RandomAccessFile> file; Status status = NewRandomAccessCacheFile(env_, Path(), &file, enable_direct_reads); if (!status.ok()) { Error(log_, "Error opening random access file %s. %s", Path().c_str(), status.ToString().c_str()); return false; } freader_.reset(new RandomAccessFileReader( NewLegacyRandomAccessFileWrapper(file), Path(), env_)); return true; } bool RandomAccessCacheFile::Read(const LBA& lba, Slice* key, Slice* val, char* scratch) { ReadLock _(&rwlock_); assert(lba.cache_id_ == cache_id_); if (!freader_) { return false; } Slice result; Status s = freader_->Read(lba.off_, lba.size_, &result, scratch); if (!s.ok()) { Error(log_, "Error reading from file %s. %s", Path().c_str(), s.ToString().c_str()); return false; } assert(result.data() == scratch); return ParseRec(lba, key, val, scratch); } bool RandomAccessCacheFile::ParseRec(const LBA& lba, Slice* key, Slice* val, char* scratch) { Slice data(scratch, lba.size_); CacheRecord rec; if (!rec.Deserialize(data)) { assert(!"Error deserializing data"); Error(log_, "Error de-serializing record from file %s off %d", Path().c_str(), lba.off_); return false; } *key = Slice(rec.key_); *val = Slice(rec.val_); return true; } // // WriteableCacheFile // WriteableCacheFile::~WriteableCacheFile() { WriteLock _(&rwlock_); if (!eof_) { // This file never flushed. We give priority to shutdown since this is a // cache // TODO(krad): Figure a way to flush the pending data if (file_) { assert(refs_ == 1); --refs_; } } assert(!refs_); ClearBuffers(); } bool WriteableCacheFile::Create(const bool /*enable_direct_writes*/, const bool enable_direct_reads) { WriteLock _(&rwlock_); enable_direct_reads_ = enable_direct_reads; ROCKS_LOG_DEBUG(log_, "Creating new cache %s (max size is %d B)", Path().c_str(), max_size_); assert(env_); Status s = env_->FileExists(Path()); if (s.ok()) { ROCKS_LOG_WARN(log_, "File %s already exists. %s", Path().c_str(), s.ToString().c_str()); } s = NewWritableCacheFile(env_, Path(), &file_); if (!s.ok()) { ROCKS_LOG_WARN(log_, "Unable to create file %s. %s", Path().c_str(), s.ToString().c_str()); return false; } assert(!refs_); ++refs_; return true; } bool WriteableCacheFile::Append(const Slice& key, const Slice& val, LBA* lba) { WriteLock _(&rwlock_); if (eof_) { // We can't append since the file is full return false; } // estimate the space required to store the (key, val) uint32_t rec_size = CacheRecord::CalcSize(key, val); if (!ExpandBuffer(rec_size)) { // unable to expand the buffer ROCKS_LOG_DEBUG(log_, "Error expanding buffers. size=%d", rec_size); return false; } lba->cache_id_ = cache_id_; lba->off_ = disk_woff_; lba->size_ = rec_size; CacheRecord rec(key, val); if (!rec.Serialize(&bufs_, &buf_woff_)) { // unexpected error: unable to serialize the data assert(!"Error serializing record"); return false; } disk_woff_ += rec_size; eof_ = disk_woff_ >= max_size_; // dispatch buffer for flush DispatchBuffer(); return true; } bool WriteableCacheFile::ExpandBuffer(const size_t size) { rwlock_.AssertHeld(); assert(!eof_); // determine if there is enough space size_t free = 0; // compute the free space left in buffer for (size_t i = buf_woff_; i < bufs_.size(); ++i) { free += bufs_[i]->Free(); if (size <= free) { // we have enough space in the buffer return true; } } // expand the buffer until there is enough space to write `size` bytes assert(free < size); assert(alloc_); while (free < size) { CacheWriteBuffer* const buf = alloc_->Allocate(); if (!buf) { ROCKS_LOG_DEBUG(log_, "Unable to allocate buffers"); return false; } size_ += static_cast<uint32_t>(buf->Free()); free += buf->Free(); bufs_.push_back(buf); } assert(free >= size); return true; } void WriteableCacheFile::DispatchBuffer() { rwlock_.AssertHeld(); assert(bufs_.size()); assert(buf_doff_ <= buf_woff_); assert(buf_woff_ <= bufs_.size()); if (pending_ios_) { return; } if (!eof_ && buf_doff_ == buf_woff_) { // dispatch buffer is pointing to write buffer and we haven't hit eof return; } assert(eof_ || buf_doff_ < buf_woff_); assert(buf_doff_ < bufs_.size()); assert(file_); assert(alloc_); auto* buf = bufs_[buf_doff_]; const uint64_t file_off = buf_doff_ * alloc_->BufferSize(); assert(!buf->Free() || (eof_ && buf_doff_ == buf_woff_ && buf_woff_ < bufs_.size())); // we have reached end of file, and there is space in the last buffer // pad it with zero for direct IO buf->FillTrailingZeros(); assert(buf->Used() % kFileAlignmentSize == 0); writer_->Write(file_.get(), buf, file_off, std::bind(&WriteableCacheFile::BufferWriteDone, this)); pending_ios_++; buf_doff_++; } void WriteableCacheFile::BufferWriteDone() { WriteLock _(&rwlock_); assert(bufs_.size()); pending_ios_--; if (buf_doff_ < bufs_.size()) { DispatchBuffer(); } if (eof_ && buf_doff_ >= bufs_.size() && !pending_ios_) { // end-of-file reached, move to read mode CloseAndOpenForReading(); } } void WriteableCacheFile::CloseAndOpenForReading() { // Our env abstraction do not allow reading from a file opened for appending // We need close the file and re-open it for reading Close(); RandomAccessCacheFile::OpenImpl(enable_direct_reads_); } bool WriteableCacheFile::ReadBuffer(const LBA& lba, Slice* key, Slice* block, char* scratch) { rwlock_.AssertHeld(); if (!ReadBuffer(lba, scratch)) { Error(log_, "Error reading from buffer. cache=%d off=%d", cache_id_, lba.off_); return false; } return ParseRec(lba, key, block, scratch); } bool WriteableCacheFile::ReadBuffer(const LBA& lba, char* data) { rwlock_.AssertHeld(); assert(lba.off_ < disk_woff_); assert(alloc_); // we read from the buffers like reading from a flat file. The list of buffers // are treated as contiguous stream of data char* tmp = data; size_t pending_nbytes = lba.size_; // start buffer size_t start_idx = lba.off_ / alloc_->BufferSize(); // offset into the start buffer size_t start_off = lba.off_ % alloc_->BufferSize(); assert(start_idx <= buf_woff_); for (size_t i = start_idx; pending_nbytes && i < bufs_.size(); ++i) { assert(i <= buf_woff_); auto* buf = bufs_[i]; assert(i == buf_woff_ || !buf->Free()); // bytes to write to the buffer size_t nbytes = pending_nbytes > (buf->Used() - start_off) ? (buf->Used() - start_off) : pending_nbytes; memcpy(tmp, buf->Data() + start_off, nbytes); // left over to be written pending_nbytes -= nbytes; start_off = 0; tmp += nbytes; } assert(!pending_nbytes); if (pending_nbytes) { return false; } assert(tmp == data + lba.size_); return true; } void WriteableCacheFile::Close() { rwlock_.AssertHeld(); assert(size_ >= max_size_); assert(disk_woff_ >= max_size_); assert(buf_doff_ == bufs_.size()); assert(bufs_.size() - buf_woff_ <= 1); assert(!pending_ios_); Info(log_, "Closing file %s. size=%d written=%d", Path().c_str(), size_, disk_woff_); ClearBuffers(); file_.reset(); assert(refs_); --refs_; } void WriteableCacheFile::ClearBuffers() { assert(alloc_); for (size_t i = 0; i < bufs_.size(); ++i) { alloc_->Deallocate(bufs_[i]); } bufs_.clear(); } // // ThreadedFileWriter implementation // ThreadedWriter::ThreadedWriter(PersistentCacheTier* const cache, const size_t qdepth, const size_t io_size) : Writer(cache), io_size_(io_size) { for (size_t i = 0; i < qdepth; ++i) { port::Thread th(&ThreadedWriter::ThreadMain, this); threads_.push_back(std::move(th)); } } void ThreadedWriter::Stop() { // notify all threads to exit for (size_t i = 0; i < threads_.size(); ++i) { q_.Push(IO(/*signal=*/true)); } // wait for all threads to exit for (auto& th : threads_) { th.join(); assert(!th.joinable()); } threads_.clear(); } void ThreadedWriter::Write(WritableFile* const file, CacheWriteBuffer* buf, const uint64_t file_off, const std::function<void()> callback) { q_.Push(IO(file, buf, file_off, callback)); } void ThreadedWriter::ThreadMain() { while (true) { // Fetch the IO to process IO io(q_.Pop()); if (io.signal_) { // that's secret signal to exit break; } // Reserve space for writing the buffer while (!cache_->Reserve(io.buf_->Used())) { // We can fail to reserve space if every file in the system // is being currently accessed /* sleep override */ Env::Default()->SleepForMicroseconds(1000000); } DispatchIO(io); io.callback_(); } } void ThreadedWriter::DispatchIO(const IO& io) { size_t written = 0; while (written < io.buf_->Used()) { Slice data(io.buf_->Data() + written, io_size_); Status s = io.file_->Append(data); assert(s.ok()); if (!s.ok()) { // That is definite IO error to device. There is not much we can // do but ignore the failure. This can lead to corruption of data on // disk, but the cache will skip while reading fprintf(stderr, "Error writing data to file. %s\n", s.ToString().c_str()); } written += io_size_; } } } // namespace rocksdb #endif