// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "utilities/ttl/db_ttl_impl.h" #include "db/write_batch_internal.h" #include "file/filename.h" #include "logging/logging.h" #include "rocksdb/convenience.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "rocksdb/system_clock.h" #include "rocksdb/utilities/db_ttl.h" #include "rocksdb/utilities/object_registry.h" #include "rocksdb/utilities/options_type.h" #include "util/coding.h" namespace ROCKSDB_NAMESPACE { static std::unordered_map ttl_merge_op_type_info = {{"user_operator", OptionTypeInfo::AsCustomSharedPtr( 0, OptionVerificationType::kByName, OptionTypeFlags::kNone)}}; TtlMergeOperator::TtlMergeOperator( const std::shared_ptr& merge_op, SystemClock* clock) : user_merge_op_(merge_op), clock_(clock) { RegisterOptions("TtlMergeOptions", &user_merge_op_, &ttl_merge_op_type_info); } bool TtlMergeOperator::FullMergeV2(const MergeOperationInput& merge_in, MergeOperationOutput* merge_out) const { const uint32_t ts_len = DBWithTTLImpl::kTSLength; if (merge_in.existing_value && merge_in.existing_value->size() < ts_len) { ROCKS_LOG_ERROR(merge_in.logger, "Error: Could not remove timestamp from existing value."); return false; } // Extract time-stamp from each operand to be passed to user_merge_op_ std::vector operands_without_ts; for (const auto& operand : merge_in.operand_list) { if (operand.size() < ts_len) { ROCKS_LOG_ERROR(merge_in.logger, "Error: Could not remove timestamp from operand value."); return false; } operands_without_ts.push_back(operand); operands_without_ts.back().remove_suffix(ts_len); } // Apply the user merge operator (store result in *new_value) bool good = true; MergeOperationOutput user_merge_out(merge_out->new_value, merge_out->existing_operand); if (merge_in.existing_value) { Slice existing_value_without_ts(merge_in.existing_value->data(), merge_in.existing_value->size() - ts_len); good = user_merge_op_->FullMergeV2( MergeOperationInput(merge_in.key, &existing_value_without_ts, operands_without_ts, merge_in.logger), &user_merge_out); } else { good = user_merge_op_->FullMergeV2( MergeOperationInput(merge_in.key, nullptr, operands_without_ts, merge_in.logger), &user_merge_out); } merge_out->op_failure_scope = user_merge_out.op_failure_scope; // Return false if the user merge operator returned false if (!good) { return false; } if (merge_out->existing_operand.data()) { merge_out->new_value.assign(merge_out->existing_operand.data(), merge_out->existing_operand.size()); merge_out->existing_operand = Slice(nullptr, 0); } // Augment the *new_value with the ttl time-stamp int64_t curtime; if (!clock_->GetCurrentTime(&curtime).ok()) { ROCKS_LOG_ERROR( merge_in.logger, "Error: Could not get current time to be attached internally " "to the new value."); return false; } else { char ts_string[ts_len]; EncodeFixed32(ts_string, (int32_t)curtime); merge_out->new_value.append(ts_string, ts_len); return true; } } bool TtlMergeOperator::PartialMergeMulti(const Slice& key, const std::deque& operand_list, std::string* new_value, Logger* logger) const { const uint32_t ts_len = DBWithTTLImpl::kTSLength; std::deque operands_without_ts; for (const auto& operand : operand_list) { if (operand.size() < ts_len) { ROCKS_LOG_ERROR(logger, "Error: Could not remove timestamp from value."); return false; } operands_without_ts.push_back( Slice(operand.data(), operand.size() - ts_len)); } // Apply the user partial-merge operator (store result in *new_value) assert(new_value); if (!user_merge_op_->PartialMergeMulti(key, operands_without_ts, new_value, logger)) { return false; } // Augment the *new_value with the ttl time-stamp int64_t curtime; if (!clock_->GetCurrentTime(&curtime).ok()) { ROCKS_LOG_ERROR( logger, "Error: Could not get current time to be attached internally " "to the new value."); return false; } else { char ts_string[ts_len]; EncodeFixed32(ts_string, (int32_t)curtime); new_value->append(ts_string, ts_len); return true; } } Status TtlMergeOperator::PrepareOptions(const ConfigOptions& config_options) { if (clock_ == nullptr) { clock_ = config_options.env->GetSystemClock().get(); } return MergeOperator::PrepareOptions(config_options); } Status TtlMergeOperator::ValidateOptions( const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const { if (user_merge_op_ == nullptr) { return Status::InvalidArgument( "UserMergeOperator required by TtlMergeOperator"); } else if (clock_ == nullptr) { return Status::InvalidArgument("SystemClock required by TtlMergeOperator"); } else { return MergeOperator::ValidateOptions(db_opts, cf_opts); } } void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options, SystemClock* clock) { if (options->compaction_filter) { options->compaction_filter = new TtlCompactionFilter(ttl, clock, options->compaction_filter); } else { options->compaction_filter_factory = std::shared_ptr(new TtlCompactionFilterFactory( ttl, clock, options->compaction_filter_factory)); } if (options->merge_operator) { options->merge_operator.reset( new TtlMergeOperator(options->merge_operator, clock)); } } static std::unordered_map ttl_type_info = { {"ttl", {0, OptionType::kInt32T}}, }; static std::unordered_map ttl_cff_type_info = { {"user_filter_factory", OptionTypeInfo::AsCustomSharedPtr( 0, OptionVerificationType::kByNameAllowFromNull, OptionTypeFlags::kNone)}}; static std::unordered_map user_cf_type_info = { {"user_filter", OptionTypeInfo::AsCustomRawPtr( 0, OptionVerificationType::kByName, OptionTypeFlags::kAllowNull)}}; TtlCompactionFilter::TtlCompactionFilter( int32_t ttl, SystemClock* clock, const CompactionFilter* _user_comp_filter, std::unique_ptr _user_comp_filter_from_factory) : LayeredCompactionFilterBase(_user_comp_filter, std::move(_user_comp_filter_from_factory)), ttl_(ttl), clock_(clock) { RegisterOptions("TTL", &ttl_, &ttl_type_info); RegisterOptions("UserFilter", &user_comp_filter_, &user_cf_type_info); } bool TtlCompactionFilter::Filter(int level, const Slice& key, const Slice& old_val, std::string* new_val, bool* value_changed) const { if (DBWithTTLImpl::IsStale(old_val, ttl_, clock_)) { return true; } if (user_comp_filter() == nullptr) { return false; } assert(old_val.size() >= DBWithTTLImpl::kTSLength); Slice old_val_without_ts(old_val.data(), old_val.size() - DBWithTTLImpl::kTSLength); if (user_comp_filter()->Filter(level, key, old_val_without_ts, new_val, value_changed)) { return true; } if (*value_changed) { new_val->append(old_val.data() + old_val.size() - DBWithTTLImpl::kTSLength, DBWithTTLImpl::kTSLength); } return false; } Status TtlCompactionFilter::PrepareOptions( const ConfigOptions& config_options) { if (clock_ == nullptr) { clock_ = config_options.env->GetSystemClock().get(); } return LayeredCompactionFilterBase::PrepareOptions(config_options); } Status TtlCompactionFilter::ValidateOptions( const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const { if (clock_ == nullptr) { return Status::InvalidArgument( "SystemClock required by TtlCompactionFilter"); } else { return LayeredCompactionFilterBase::ValidateOptions(db_opts, cf_opts); } } TtlCompactionFilterFactory::TtlCompactionFilterFactory( int32_t ttl, SystemClock* clock, std::shared_ptr comp_filter_factory) : ttl_(ttl), clock_(clock), user_comp_filter_factory_(comp_filter_factory) { RegisterOptions("UserOptions", &user_comp_filter_factory_, &ttl_cff_type_info); RegisterOptions("TTL", &ttl_, &ttl_type_info); } std::unique_ptr TtlCompactionFilterFactory::CreateCompactionFilter( const CompactionFilter::Context& context) { std::unique_ptr user_comp_filter_from_factory = nullptr; if (user_comp_filter_factory_) { user_comp_filter_from_factory = user_comp_filter_factory_->CreateCompactionFilter(context); } return std::unique_ptr(new TtlCompactionFilter( ttl_, clock_, nullptr, std::move(user_comp_filter_from_factory))); } Status TtlCompactionFilterFactory::PrepareOptions( const ConfigOptions& config_options) { if (clock_ == nullptr) { clock_ = config_options.env->GetSystemClock().get(); } return CompactionFilterFactory::PrepareOptions(config_options); } Status TtlCompactionFilterFactory::ValidateOptions( const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const { if (clock_ == nullptr) { return Status::InvalidArgument( "SystemClock required by TtlCompactionFilterFactory"); } else { return CompactionFilterFactory::ValidateOptions(db_opts, cf_opts); } } int RegisterTtlObjects(ObjectLibrary& library, const std::string& /*arg*/) { library.AddFactory( TtlMergeOperator::kClassName(), [](const std::string& /*uri*/, std::unique_ptr* guard, std::string* /* errmsg */) { guard->reset(new TtlMergeOperator(nullptr, nullptr)); return guard->get(); }); library.AddFactory( TtlCompactionFilterFactory::kClassName(), [](const std::string& /*uri*/, std::unique_ptr* guard, std::string* /* errmsg */) { guard->reset(new TtlCompactionFilterFactory(0, nullptr, nullptr)); return guard->get(); }); library.AddFactory( TtlCompactionFilter::kClassName(), [](const std::string& /*uri*/, std::unique_ptr* /*guard*/, std::string* /* errmsg */) { return new TtlCompactionFilter(0, nullptr, nullptr); }); size_t num_types; return static_cast(library.GetFactoryCount(&num_types)); } // Open the db inside DBWithTTLImpl because options needs pointer to its ttl DBWithTTLImpl::DBWithTTLImpl(DB* db) : DBWithTTL(db), closed_(false) {} DBWithTTLImpl::~DBWithTTLImpl() { if (!closed_) { Close().PermitUncheckedError(); } } Status DBWithTTLImpl::Close() { Status ret = Status::OK(); if (!closed_) { Options default_options = GetOptions(); // Need to stop background compaction before getting rid of the filter CancelAllBackgroundWork(db_, /* wait = */ true); ret = db_->Close(); delete default_options.compaction_filter; closed_ = true; } return ret; } void DBWithTTLImpl::RegisterTtlClasses() { static std::once_flag once; std::call_once(once, [&]() { ObjectRegistry::Default()->AddLibrary("TTL", RegisterTtlObjects, ""); }); } Status DBWithTTL::Open(const Options& options, const std::string& dbname, DBWithTTL** dbptr, int32_t ttl, bool read_only) { DBOptions db_options(options); ColumnFamilyOptions cf_options(options); std::vector column_families; column_families.push_back( ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); std::vector handles; Status s = DBWithTTL::Open(db_options, dbname, column_families, &handles, dbptr, {ttl}, read_only); if (s.ok()) { assert(handles.size() == 1); // i can delete the handle since DBImpl is always holding a reference to // default column family delete handles[0]; } return s; } Status DBWithTTL::Open( const DBOptions& db_options, const std::string& dbname, const std::vector& column_families, std::vector* handles, DBWithTTL** dbptr, const std::vector& ttls, bool read_only) { DBWithTTLImpl::RegisterTtlClasses(); if (ttls.size() != column_families.size()) { return Status::InvalidArgument( "ttls size has to be the same as number of column families"); } SystemClock* clock = (db_options.env == nullptr) ? SystemClock::Default().get() : db_options.env->GetSystemClock().get(); std::vector column_families_sanitized = column_families; for (size_t i = 0; i < column_families_sanitized.size(); ++i) { DBWithTTLImpl::SanitizeOptions( ttls[i], &column_families_sanitized[i].options, clock); } DB* db; Status st; if (read_only) { st = DB::OpenForReadOnly(db_options, dbname, column_families_sanitized, handles, &db); } else { st = DB::Open(db_options, dbname, column_families_sanitized, handles, &db); } if (st.ok()) { *dbptr = new DBWithTTLImpl(db); } else { *dbptr = nullptr; } return st; } Status DBWithTTLImpl::CreateColumnFamilyWithTtl( const ColumnFamilyOptions& options, const std::string& column_family_name, ColumnFamilyHandle** handle, int ttl) { RegisterTtlClasses(); ColumnFamilyOptions sanitized_options = options; DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options, GetEnv()->GetSystemClock().get()); return DBWithTTL::CreateColumnFamily(sanitized_options, column_family_name, handle); } Status DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions& options, const std::string& column_family_name, ColumnFamilyHandle** handle) { return CreateColumnFamilyWithTtl(options, column_family_name, handle, 0); } // Appends the current timestamp to the string. // Returns false if could not get the current_time, true if append succeeds Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts, SystemClock* clock) { val_with_ts->reserve(kTSLength + val.size()); char ts_string[kTSLength]; int64_t curtime; Status st = clock->GetCurrentTime(&curtime); if (!st.ok()) { return st; } EncodeFixed32(ts_string, (int32_t)curtime); val_with_ts->append(val.data(), val.size()); val_with_ts->append(ts_string, kTSLength); return st; } // Returns corruption if the length of the string is lesser than timestamp, or // timestamp refers to a time lesser than ttl-feature release time Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) { if (str.size() < kTSLength) { return Status::Corruption("Error: value's length less than timestamp's\n"); } // Checks that TS is not lesser than kMinTimestamp // Guards against corruption & normal database opened incorrectly in ttl mode int32_t timestamp_value = DecodeFixed32(str.data() + str.size() - kTSLength); if (timestamp_value < kMinTimestamp) { return Status::Corruption("Error: Timestamp < ttl feature release time!\n"); } return Status::OK(); } // Checks if the string is stale or not according to TTl provided // Generic IsStale implementation bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl, SystemClock* clock) { if (ttl <= 0) { // Data is fresh if TTL is non-positive return false; } int64_t curtime; if (!clock->GetCurrentTime(&curtime).ok()) { return false; // Treat the data as fresh if could not get current time } /* int32_t may overflow when timestamp_value + ttl * for example ttl = 86400 * 365 * 15 * convert timestamp_value to int64_t */ int64_t timestamp_value = DecodeFixed32(value.data() + value.size() - kTSLength); return (timestamp_value + ttl) < curtime; } // IsStale for strict ttl bool DBWithTTLImpl::IsStaleStrictTtl(const Slice& value, ColumnFamilyHandle* column_family, const ReadOptions& options) { Options opts = GetOptions(column_family); auto filter = std::static_pointer_cast( opts.compaction_filter_factory); int32_t ttl = filter->GetTtl(); if (ttl <= 0) { return false; } if (options.snapshot == nullptr) { SystemClock* clock = (opts.env == nullptr) ? SystemClock::Default().get() : opts.env->GetSystemClock().get(); return IsStale(value, ttl, clock); } else { int64_t snapshot_time = options.snapshot->GetUnixTime(); int32_t timestamp_value = DecodeFixed32(value.data() + value.size() - kTSLength); return (timestamp_value + ttl) < snapshot_time; } } // Strips the TS from the end of the slice Status DBWithTTLImpl::StripTS(PinnableSlice* pinnable_val) { if (pinnable_val->size() < kTSLength) { return Status::Corruption("Bad timestamp in key-value"); } // Erasing characters which hold the TS pinnable_val->remove_suffix(kTSLength); return Status::OK(); } // Strips the TS from the end of the string Status DBWithTTLImpl::StripTS(std::string* str) { if (str->length() < kTSLength) { return Status::Corruption("Bad timestamp in key-value"); } // Erasing characters which hold the TS str->erase(str->length() - kTSLength, kTSLength); return Status::OK(); } Status DBWithTTLImpl::Put(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& val) { WriteBatch batch; Status st = batch.Put(column_family, key, val); if (st.ok()) { st = Write(options, &batch); } return st; } Status DBWithTTLImpl::Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, PinnableSlice* value) { Status st = db_->Get(options, column_family, key, value); if (!st.ok()) { return st; } st = SanityCheckTimestamp(*value); if (!st.ok()) { return st; } if (options.skip_expired_data) { if (IsStaleStrictTtl(*value, column_family, options)) { return Status::NotFound(); } } return StripTS(value); } std::vector DBWithTTLImpl::MultiGet( const ReadOptions& options, const std::vector& column_family, const std::vector& keys, std::vector* values) { auto statuses = db_->MultiGet(options, column_family, keys, values); for (size_t i = 0; i < keys.size(); ++i) { if (!statuses[i].ok()) { continue; } statuses[i] = SanityCheckTimestamp((*values)[i]); if (!statuses[i].ok()) { continue; } // check if the key has been expired if is_stale == true it's expired // re-check if the key expired for each key requested by the multiget bool is_stale = false; if (options.skip_expired_data) { if (IsStaleStrictTtl((*values)[i], column_family[i], options)) { statuses[i] = Status::NotFound(); is_stale = true; } } if (!is_stale) { statuses[i] = StripTS(&(*values)[i]); } else { (*values)[i] = ""; } } return statuses; } bool DBWithTTLImpl::KeyMayExist(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value, bool* value_found) { bool ret = db_->KeyMayExist(options, column_family, key, value, value_found); if (ret && value != nullptr && value_found != nullptr && *value_found) { if (!SanityCheckTimestamp(*value).ok() || !StripTS(value).ok()) { return false; } } return ret; } Status DBWithTTLImpl::Merge(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { WriteBatch batch; Status st = batch.Merge(column_family, key, value); if (st.ok()) { st = Write(options, &batch); } return st; } Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) { class Handler : public WriteBatch::Handler { public: explicit Handler(SystemClock* clock) : clock_(clock) {} WriteBatch updates_ttl; Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { std::string value_with_ts; Status st = AppendTS(value, &value_with_ts, clock_); if (!st.ok()) { return st; } return WriteBatchInternal::Put(&updates_ttl, column_family_id, key, value_with_ts); } Status MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { std::string value_with_ts; Status st = AppendTS(value, &value_with_ts, clock_); if (!st.ok()) { return st; } return WriteBatchInternal::Merge(&updates_ttl, column_family_id, key, value_with_ts); } Status DeleteCF(uint32_t column_family_id, const Slice& key) override { return WriteBatchInternal::Delete(&updates_ttl, column_family_id, key); } Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key, const Slice& end_key) override { return WriteBatchInternal::DeleteRange(&updates_ttl, column_family_id, begin_key, end_key); } void LogData(const Slice& blob) override { updates_ttl.PutLogData(blob); } private: SystemClock* clock_; }; Handler handler(GetEnv()->GetSystemClock().get()); Status st = updates->Iterate(&handler); if (!st.ok()) { return st; } else { return db_->Write(opts, &(handler.updates_ttl)); } } Iterator* DBWithTTLImpl::NewIterator(const ReadOptions& opts, ColumnFamilyHandle* column_family) { Options cfopts = GetOptions(column_family); auto filter = std::static_pointer_cast( cfopts.compaction_filter_factory); int32_t ttl = filter->GetTtl(); bool skip_expired = opts.skip_expired_data; int64_t creation_time; if (opts.snapshot == nullptr) { auto status = cfopts.env->GetSystemClock().get()->GetCurrentTime(&creation_time); if (!status.ok()) { return NewErrorIterator(status); } } else { creation_time = opts.snapshot->GetUnixTime(); } return new TtlIterator(db_->NewIterator(opts, column_family), ttl, skip_expired, creation_time); } void TtlIterator::HandleExpired(bool move_forward) { if (!skip_expired_data_) { return; } while (Valid()) { if ((ttl_timestamp() + ttl_) < creation_time_) { if (move_forward) { iter_->Next(); } else { iter_->Prev(); } } else { return; } } } void DBWithTTLImpl::SetTtl(ColumnFamilyHandle* h, int32_t ttl) { std::shared_ptr filter; Options opts; opts = GetOptions(h); filter = std::static_pointer_cast( opts.compaction_filter_factory); if (!filter) return; filter->SetTtl(ttl); } } // namespace ROCKSDB_NAMESPACE