// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #pragma once #include #include #include #include #include #include #include #include #include #include #include #include "db/blob/blob_log_format.h" #include "db/blob/blob_log_writer.h" #include "db/db_iter.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/db.h" #include "rocksdb/file_system.h" #include "rocksdb/listener.h" #include "rocksdb/options.h" #include "rocksdb/statistics.h" #include "rocksdb/wal_filter.h" #include "util/mutexlock.h" #include "util/timer_queue.h" #include "utilities/blob_db/blob_db.h" #include "utilities/blob_db/blob_file.h" namespace ROCKSDB_NAMESPACE { class DBImpl; class ColumnFamilyHandle; class ColumnFamilyData; class SystemClock; struct FlushJobInfo; namespace blob_db { struct BlobCompactionContext; struct BlobCompactionContextGC; class BlobDBImpl; class BlobFile; // Comparator to sort "TTL" aware Blob files based on the lower value of // TTL range. struct BlobFileComparatorTTL { bool operator()(const std::shared_ptr& lhs, const std::shared_ptr& rhs) const; }; struct BlobFileComparator { bool operator()(const std::shared_ptr& lhs, const std::shared_ptr& rhs) const; }; /** * The implementation class for BlobDB. It manages the blob logs, which * are sequentially written files. Blob logs can be of the TTL or non-TTL * varieties; the former are cleaned up when they expire, while the latter * are (optionally) garbage collected. */ class BlobDBImpl : public BlobDB { friend class BlobFile; friend class BlobDBIterator; friend class BlobDBListener; friend class BlobDBListenerGC; friend class BlobIndexCompactionFilterBase; friend class BlobIndexCompactionFilterGC; public: // deletions check period static constexpr uint32_t kDeleteCheckPeriodMillisecs = 2 * 1000; // sanity check task static constexpr uint32_t kSanityCheckPeriodMillisecs = 20 * 60 * 1000; // how many random access open files can we tolerate static constexpr uint32_t kOpenFilesTrigger = 100; // how often to schedule reclaim open files. static constexpr uint32_t kReclaimOpenFilesPeriodMillisecs = 1 * 1000; // how often to schedule delete obs files periods static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs = 10 * 1000; // how often to schedule expired files eviction. static constexpr uint32_t kEvictExpiredFilesPeriodMillisecs = 10 * 1000; // when should oldest file be evicted: // on reaching 90% of blob_dir_size static constexpr double kEvictOldestFileAtSize = 0.9; using BlobDB::Put; Status Put(const WriteOptions& options, const Slice& key, const Slice& value) override; using BlobDB::Get; Status Get(const ReadOptions& _read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value, std::string* timestamp) override; Status Get(const ReadOptions& _read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value, uint64_t* expiration) override; using BlobDB::NewIterator; Iterator* NewIterator(const ReadOptions& read_options) override; using BlobDB::NewIterators; Status NewIterators( const ReadOptions& /*read_options*/, const std::vector& /*column_families*/, std::vector* /*iterators*/) override { return Status::NotSupported("Not implemented"); } using BlobDB::MultiGet; void MultiGet(const ReadOptions& _read_options, size_t num_keys, ColumnFamilyHandle** column_families, const Slice* keys, PinnableSlice* values, std::string* timestamps, Status* statuses, const bool sorted_input) override; using BlobDB::Write; Status Write(const WriteOptions& opts, WriteBatch* updates) override; Status Close() override; using BlobDB::PutWithTTL; Status PutWithTTL(const WriteOptions& options, const Slice& key, const Slice& value, uint64_t ttl) override; using BlobDB::PutUntil; Status PutUntil(const WriteOptions& options, const Slice& key, const Slice& value, uint64_t expiration) override; using BlobDB::CompactFiles; Status CompactFiles( const CompactionOptions& compact_options, const std::vector& input_file_names, const int output_level, const int output_path_id = -1, std::vector* const output_file_names = nullptr, CompactionJobInfo* compaction_job_info = nullptr) override; BlobDBOptions GetBlobDBOptions() const override; BlobDBImpl(const std::string& dbname, const BlobDBOptions& bdb_options, const DBOptions& db_options, const ColumnFamilyOptions& cf_options); Status DisableFileDeletions() override; Status EnableFileDeletions() override; Status GetLiveFiles(std::vector&, uint64_t* manifest_file_size, bool flush_memtable = true) override; void GetLiveFilesMetaData(std::vector*) override; Status GetLiveFilesStorageInfo( const LiveFilesStorageInfoOptions& opts, std::vector* files) override; ~BlobDBImpl(); Status Open(std::vector* handles); Status SyncBlobFiles(const WriteOptions& write_options) override; // Common part of the two GetCompactionContext methods below. // REQUIRES: read lock on mutex_ void GetCompactionContextCommon(BlobCompactionContext* context); void GetCompactionContext(BlobCompactionContext* context); void GetCompactionContext(BlobCompactionContext* context, BlobCompactionContextGC* context_gc); #ifndef NDEBUG Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry, PinnableSlice* value); void TEST_AddDummyBlobFile(uint64_t blob_file_number, SequenceNumber immutable_sequence); std::vector> TEST_GetBlobFiles() const; std::vector> TEST_GetLiveImmNonTTLFiles() const; std::vector> TEST_GetObsoleteFiles() const; Status TEST_CloseBlobFile(std::shared_ptr& bfile); void TEST_ObsoleteBlobFile(std::shared_ptr& blob_file, SequenceNumber obsolete_seq = 0, bool update_size = true); void TEST_EvictExpiredFiles(); void TEST_DeleteObsoleteFiles(); uint64_t TEST_live_sst_size(); const std::string& TEST_blob_dir() const { return blob_dir_; } void TEST_InitializeBlobFileToSstMapping( const std::vector& live_files); void TEST_ProcessFlushJobInfo(const FlushJobInfo& info); void TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info); #endif // !NDEBUG private: class BlobInserter; // Create a snapshot if there isn't one in read options. // Return true if a snapshot is created. bool SetSnapshotIfNeeded(ReadOptions* read_options); Status GetImpl(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value, uint64_t* expiration = nullptr); Status GetBlobValue(const Slice& key, const Slice& index_entry, PinnableSlice* value, uint64_t* expiration = nullptr); Status GetRawBlobFromFile(const Slice& key, uint64_t file_number, uint64_t offset, uint64_t size, PinnableSlice* value, CompressionType* compression_type); Slice GetCompressedSlice(const Slice& raw, std::string* compression_output) const; Status DecompressSlice(const Slice& compressed_value, CompressionType compression_type, PinnableSlice* value_output) const; // Close a file by appending a footer, and removes file from open files list. // REQUIRES: lock held on write_mutex_, write lock held on both the db mutex_ // and the blob file's mutex_. If called on a blob file which is visible only // to a single thread (like in the case of new files written during // compaction/GC), the locks on write_mutex_ and the blob file's mutex_ can be // avoided. Status CloseBlobFile(const WriteOptions& write_options, std::shared_ptr bfile); // Close a file if its size exceeds blob_file_size // REQUIRES: lock held on write_mutex_. Status CloseBlobFileIfNeeded(const WriteOptions& write_options, std::shared_ptr& bfile); // Mark file as obsolete and move the file to obsolete file list. // // REQUIRED: hold write lock of mutex_ or during DB open. void ObsoleteBlobFile(std::shared_ptr blob_file, SequenceNumber obsolete_seq, bool update_size); Status PutBlobValue(const WriteOptions& options, const Slice& key, const Slice& value, uint64_t expiration, WriteBatch* batch); Status AppendBlob(const WriteOptions& write_options, const std::shared_ptr& bfile, const std::string& headerbuf, const Slice& key, const Slice& value, uint64_t expiration, std::string* index_entry); // Create a new blob file and associated writer. Status CreateBlobFileAndWriter(const WriteOptions& write_options, bool has_ttl, const ExpirationRange& expiration_range, const std::string& reason, std::shared_ptr* blob_file, std::shared_ptr* writer); // Get the open non-TTL blob log file, or create a new one if no such file // exists. Status SelectBlobFile(const WriteOptions& write_options, std::shared_ptr* blob_file); // Get the open TTL blob log file for a certain expiration, or create a new // one if no such file exists. Status SelectBlobFileTTL(const WriteOptions& write_options, uint64_t expiration, std::shared_ptr* blob_file); std::shared_ptr FindBlobFileLocked(uint64_t expiration) const; // periodic sanity check. Bunch of checks std::pair SanityCheck(bool aborted); // Delete files that have been marked obsolete (either because of TTL // or GC). Check whether any snapshots exist which refer to the same. std::pair DeleteObsoleteFiles(bool aborted); // periodically check if open blob files and their TTL's has expired // if expired, close the sequential writer and make the file immutable std::pair EvictExpiredFiles(bool aborted); // if the number of open files, approaches ULIMIT's this // task will close random readers, which are kept around for // efficiency std::pair ReclaimOpenFiles(bool aborted); std::pair RemoveTimerQ(TimerQueue* tq, bool aborted); // Adds the background tasks to the timer queue void StartBackgroundTasks(); // add a new Blob File std::shared_ptr NewBlobFile(bool has_ttl, const ExpirationRange& expiration_range, const std::string& reason); // Register a new blob file. // REQUIRES: write lock on mutex_. void RegisterBlobFile(std::shared_ptr blob_file); // collect all the blob log files from the blob directory Status GetAllBlobFiles(std::set* file_numbers); // Open all blob files found in blob_dir. Status OpenAllBlobFiles(); // Link an SST to a blob file. Comes in locking and non-locking varieties // (the latter is used during Open). template void LinkSstToBlobFileImpl(uint64_t sst_file_number, uint64_t blob_file_number, Linker linker); void LinkSstToBlobFile(uint64_t sst_file_number, uint64_t blob_file_number); void LinkSstToBlobFileNoLock(uint64_t sst_file_number, uint64_t blob_file_number); // Unlink an SST from a blob file. void UnlinkSstFromBlobFile(uint64_t sst_file_number, uint64_t blob_file_number); // Initialize the mapping between blob files and SSTs during Open. void InitializeBlobFileToSstMapping( const std::vector& live_files); // Update the mapping between blob files and SSTs after a flush and mark // any unneeded blob files obsolete. void ProcessFlushJobInfo(const FlushJobInfo& info); // Update the mapping between blob files and SSTs after a compaction and // mark any unneeded blob files obsolete. void ProcessCompactionJobInfo(const CompactionJobInfo& info); // Mark an immutable non-TTL blob file obsolete assuming it has no more SSTs // linked to it, and all memtables from before the blob file became immutable // have been flushed. Note: should only be called if the condition holds for // all lower-numbered non-TTL blob files as well. bool MarkBlobFileObsoleteIfNeeded(const std::shared_ptr& blob_file, SequenceNumber obsolete_seq); // Mark all immutable non-TTL blob files that aren't needed by any SSTs as // obsolete. Comes in two varieties; the version used during Open need not // worry about locking or snapshots. template void MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed); void MarkUnreferencedBlobFilesObsolete(); void MarkUnreferencedBlobFilesObsoleteDuringOpen(); void UpdateLiveSSTSize(const WriteOptions& write_options); Status GetBlobFileReader(const std::shared_ptr& blob_file, std::shared_ptr* reader); // hold write mutex on file and call. // Close the above Random Access reader void CloseRandomAccessLocked(const std::shared_ptr& bfile); // hold write mutex on file and call // creates a sequential (append) writer for this blobfile Status CreateWriterLocked(const std::shared_ptr& bfile); // returns a BlobLogWriter object for the file. If writer is not // already present, creates one. Needs Write Mutex to be held Status CheckOrCreateWriterLocked(const std::shared_ptr& blob_file, std::shared_ptr* writer); // checks if there is no snapshot which is referencing the // blobs bool VisibleToActiveSnapshot(const std::shared_ptr& file); bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr& bfile); void CopyBlobFiles(std::vector>* bfiles_copy); uint64_t EpochNow() { return clock_->NowMicros() / 1000000; } // Check if inserting a new blob will make DB grow out of space. // If is_fifo = true, FIFO eviction will be triggered to make room for the // new blob. If force_evict = true, FIFO eviction will evict blob files // even eviction will not make enough room for the new blob. Status CheckSizeAndEvictBlobFiles(const WriteOptions& write_options, uint64_t blob_size, bool force_evict = false); Status CloseImpl(); // name of the database directory std::string dbname_; // the base DB DBImpl* db_impl_; Env* env_; SystemClock* clock_; // the options that govern the behavior of Blob Storage BlobDBOptions bdb_options_; DBOptions db_options_; ColumnFamilyOptions cf_options_; FileOptions file_options_; // Raw pointer of statistic. db_options_ has a std::shared_ptr to hold // ownership. Statistics* statistics_; // by default this is "blob_dir" under dbname_ // but can be configured std::string blob_dir_; // pointer to directory std::unique_ptr dir_ent_; // Read Write Mutex, which protects all the data structures // HEAVILY TRAFFICKED mutable port::RWMutex mutex_; // Writers has to hold write_mutex_ before writing. mutable port::Mutex write_mutex_; // counter for blob file number std::atomic next_file_number_; // entire metadata of all the BLOB files memory std::map> blob_files_; // All live immutable non-TTL blob files. std::map> live_imm_non_ttl_blob_files_; // The largest sequence number that has been flushed. SequenceNumber flush_sequence_; // opened non-TTL blob file. std::shared_ptr open_non_ttl_file_; // all the blob files which are currently being appended to based // on variety of incoming TTL's std::set, BlobFileComparatorTTL> open_ttl_files_; // Flag to check whether Close() has been called on this DB bool closed_; // timer based queue to execute tasks TimerQueue tqueue_; // number of files opened for random access/GET // counter is used to monitor and close excess RA files. std::atomic open_file_count_; // Total size of all live blob files (i.e. exclude obsolete files). std::atomic total_blob_size_; // total size of SST files. std::atomic live_sst_size_; // Latest FIFO eviction timestamp // // REQUIRES: access with metex_ lock held. uint64_t fifo_eviction_seq_; // The expiration up to which latest FIFO eviction evicts. // // REQUIRES: access with metex_ lock held. uint64_t evict_expiration_up_to_; std::list> obsolete_files_; // DeleteObsoleteFiles, DiableFileDeletions and EnableFileDeletions block // on the mutex to avoid contention. // // While DeleteObsoleteFiles hold both mutex_ and delete_file_mutex_, note // the difference. mutex_ only needs to be held when access the // data-structure, and delete_file_mutex_ needs to be held the whole time // during DeleteObsoleteFiles to avoid being run simultaneously with // DisableFileDeletions. // // If both of mutex_ and delete_file_mutex_ needs to be held, it is adviced // to hold delete_file_mutex_ first to avoid deadlock. mutable port::Mutex delete_file_mutex_; // Each call of DisableFileDeletions will increase disable_file_deletion_ // by 1. EnableFileDeletions will either decrease the count by 1 or reset // it to zero, depending on the force flag. // // REQUIRES: access with delete_file_mutex_ held. int disable_file_deletions_ = 0; uint32_t debug_level_; }; } // namespace blob_db } // namespace ROCKSDB_NAMESPACE