// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #include "utilities/blob_db/blob_file.h" #include #include #include #include #include "db/column_family.h" #include "db/db_impl/db_impl.h" #include "db/dbformat.h" #include "file/filename.h" #include "file/readahead_raf.h" #include "logging/logging.h" #include "utilities/blob_db/blob_db_impl.h" namespace ROCKSDB_NAMESPACE::blob_db { BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn, Logger* info_log) : parent_(p), path_to_dir_(bdir), file_number_(fn), info_log_(info_log) {} BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn, Logger* info_log, uint32_t column_family_id, CompressionType compression, bool has_ttl, const ExpirationRange& expiration_range) : parent_(p), path_to_dir_(bdir), file_number_(fn), info_log_(info_log), column_family_id_(column_family_id), compression_(compression), has_ttl_(has_ttl), expiration_range_(expiration_range), header_(column_family_id, compression, has_ttl, expiration_range), header_valid_(true) {} BlobFile::~BlobFile() { if (obsolete_) { std::string pn(PathName()); Status s = Env::Default()->DeleteFile(PathName()); if (!s.ok()) { // ROCKS_LOG_INFO(db_options_.info_log, // "File could not be deleted %s", pn.c_str()); } } } uint32_t BlobFile::GetColumnFamilyId() const { return column_family_id_; } std::string BlobFile::PathName() const { return BlobFileName(path_to_dir_, file_number_); } std::string BlobFile::DumpState() const { char str[1000]; snprintf( str, sizeof(str), "path: %s fn: %" PRIu64 " blob_count: %" PRIu64 " file_size: %" PRIu64 " closed: %d obsolete: %d expiration_range: (%" PRIu64 ", %" PRIu64 "), writer: %d reader: %d", path_to_dir_.c_str(), file_number_, blob_count_.load(), file_size_.load(), closed_.load(), obsolete_.load(), expiration_range_.first, expiration_range_.second, (!!log_writer_), (!!ra_file_reader_)); return str; } void BlobFile::MarkObsolete(SequenceNumber sequence) { assert(Immutable()); obsolete_sequence_ = sequence; obsolete_.store(true); } Status BlobFile::WriteFooterAndCloseLocked(const WriteOptions& write_options, SequenceNumber sequence) { BlobLogFooter footer; footer.blob_count = blob_count_; if (HasTTL()) { footer.expiration_range = expiration_range_; } // this will close the file and reset the Writable File Pointer. Status s = log_writer_->AppendFooter(write_options, footer, /* checksum_method */ nullptr, /* checksum_value */ nullptr); if (s.ok()) { closed_ = true; immutable_sequence_ = sequence; file_size_ += BlobLogFooter::kSize; } // delete the sequential writer log_writer_.reset(); return s; } Status BlobFile::ReadFooter(BlobLogFooter* bf) { if (file_size_ < (BlobLogHeader::kSize + BlobLogFooter::kSize)) { return Status::IOError("File does not have footer", PathName()); } uint64_t footer_offset = file_size_ - BlobLogFooter::kSize; // assume that ra_file_reader_ is valid before we enter this assert(ra_file_reader_); Slice result; std::string buf; AlignedBuf aligned_buf; Status s; // TODO: rate limit reading footers from blob files. if (ra_file_reader_->use_direct_io()) { s = ra_file_reader_->Read(IOOptions(), footer_offset, BlobLogFooter::kSize, &result, nullptr, &aligned_buf); } else { buf.reserve(BlobLogFooter::kSize + 10); s = ra_file_reader_->Read(IOOptions(), footer_offset, BlobLogFooter::kSize, &result, buf.data(), nullptr); } if (!s.ok()) { return s; } if (result.size() != BlobLogFooter::kSize) { // should not happen return Status::IOError("EOF reached before footer"); } s = bf->DecodeFrom(result); return s; } Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) { blob_count_ = footer.blob_count; expiration_range_ = footer.expiration_range; closed_ = true; return Status::OK(); } Status BlobFile::Fsync(const WriteOptions& write_options) { Status s; if (log_writer_.get()) { s = log_writer_->Sync(write_options); } return s; } void BlobFile::CloseRandomAccessLocked() { ra_file_reader_.reset(); last_access_ = -1; } Status BlobFile::GetReader(Env* env, const FileOptions& file_options, std::shared_ptr* reader, bool* fresh_open) { assert(reader != nullptr); assert(fresh_open != nullptr); *fresh_open = false; int64_t current_time = 0; if (env->GetCurrentTime(¤t_time).ok()) { last_access_.store(current_time); } Status s; { ReadLock lockbfile_r(&mutex_); if (ra_file_reader_) { *reader = ra_file_reader_; return s; } } WriteLock lockbfile_w(&mutex_); // Double check. if (ra_file_reader_) { *reader = ra_file_reader_; return s; } std::unique_ptr rfile; s = env->GetFileSystem()->NewRandomAccessFile(PathName(), file_options, &rfile, nullptr); if (!s.ok()) { ROCKS_LOG_ERROR(info_log_, "Failed to open blob file for random-read: %s status: '%s'" " exists: '%s'", PathName().c_str(), s.ToString().c_str(), env->FileExists(PathName()).ToString().c_str()); return s; } ra_file_reader_ = std::make_shared(std::move(rfile), PathName()); *reader = ra_file_reader_; *fresh_open = true; return s; } Status BlobFile::ReadMetadata(const std::shared_ptr& fs, const FileOptions& file_options) { assert(Immutable()); // Get file size. uint64_t file_size = 0; Status s = fs->GetFileSize(PathName(), file_options.io_options, &file_size, nullptr); if (s.ok()) { file_size_ = file_size; } else { ROCKS_LOG_ERROR(info_log_, "Failed to get size of blob file %" PRIu64 ", status: %s", file_number_, s.ToString().c_str()); return s; } if (file_size < BlobLogHeader::kSize) { ROCKS_LOG_ERROR( info_log_, "Incomplete blob file blob file %" PRIu64 ", size: %" PRIu64, file_number_, file_size); return Status::Corruption("Incomplete blob file header."); } // Create file reader. std::unique_ptr file_reader; s = RandomAccessFileReader::Create(fs, PathName(), file_options, &file_reader, nullptr); if (!s.ok()) { ROCKS_LOG_ERROR(info_log_, "Failed to open blob file %" PRIu64 ", status: %s", file_number_, s.ToString().c_str()); return s; } // Read file header. std::string header_buf; AlignedBuf aligned_buf; Slice header_slice; // TODO: rate limit reading headers from blob files. if (file_reader->use_direct_io()) { s = file_reader->Read(IOOptions(), 0, BlobLogHeader::kSize, &header_slice, nullptr, &aligned_buf); } else { header_buf.reserve(BlobLogHeader::kSize); s = file_reader->Read(IOOptions(), 0, BlobLogHeader::kSize, &header_slice, header_buf.data(), nullptr); } if (!s.ok()) { ROCKS_LOG_ERROR( info_log_, "Failed to read header of blob file %" PRIu64 ", status: %s", file_number_, s.ToString().c_str()); return s; } BlobLogHeader header; s = header.DecodeFrom(header_slice); if (!s.ok()) { ROCKS_LOG_ERROR(info_log_, "Failed to decode header of blob file %" PRIu64 ", status: %s", file_number_, s.ToString().c_str()); return s; } column_family_id_ = header.column_family_id; compression_ = header.compression; has_ttl_ = header.has_ttl; if (has_ttl_) { expiration_range_ = header.expiration_range; } header_valid_ = true; // Read file footer. if (file_size_ < BlobLogHeader::kSize + BlobLogFooter::kSize) { // OK not to have footer. assert(!footer_valid_); return Status::OK(); } std::string footer_buf; Slice footer_slice; // TODO: rate limit reading footers from blob files. if (file_reader->use_direct_io()) { s = file_reader->Read(IOOptions(), file_size - BlobLogFooter::kSize, BlobLogFooter::kSize, &footer_slice, nullptr, &aligned_buf); } else { footer_buf.reserve(BlobLogFooter::kSize); s = file_reader->Read(IOOptions(), file_size - BlobLogFooter::kSize, BlobLogFooter::kSize, &footer_slice, footer_buf.data(), nullptr); } if (!s.ok()) { ROCKS_LOG_ERROR( info_log_, "Failed to read footer of blob file %" PRIu64 ", status: %s", file_number_, s.ToString().c_str()); return s; } BlobLogFooter footer; s = footer.DecodeFrom(footer_slice); if (!s.ok()) { // OK not to have footer. assert(!footer_valid_); return Status::OK(); } blob_count_ = footer.blob_count; if (has_ttl_) { assert(header.expiration_range.first <= footer.expiration_range.first); assert(header.expiration_range.second >= footer.expiration_range.second); expiration_range_ = footer.expiration_range; } footer_valid_ = true; return Status::OK(); } } // namespace ROCKSDB_NAMESPACE::blob_db