// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "rocksdb/experimental.h"

// NOTE: the standard headers below were blank in the damaged source; this set
// is a best-effort reconstruction covering what the code in this file uses.
#include <algorithm>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <vector>

#include "db/db_impl/db_impl.h"
#include "db/version_util.h"
#include "logging/logging.h"
#include "util/atomic.h"

namespace ROCKSDB_NAMESPACE {
namespace experimental {

Status SuggestCompactRange(DB* db, ColumnFamilyHandle* column_family,
                           const Slice* begin, const Slice* end) {
  if (db == nullptr) {
    return Status::InvalidArgument("DB is empty");
  }

  return db->SuggestCompactRange(column_family, begin, end);
}

Status PromoteL0(DB* db, ColumnFamilyHandle* column_family, int target_level) {
  if (db == nullptr) {
    return Status::InvalidArgument("Didn't recognize DB object");
  }
  return db->PromoteL0(column_family, target_level);
}

Status SuggestCompactRange(DB* db, const Slice* begin, const Slice* end) {
  return SuggestCompactRange(db, db->DefaultColumnFamily(), begin, end);
}

Status UpdateManifestForFilesState(
    const DBOptions& db_opts, const std::string& db_name,
    const std::vector<ColumnFamilyDescriptor>& column_families,
    const UpdateManifestForFilesStateOptions& opts) {
  // TODO: plumb Env::IOActivity, Env::IOPriority
  const ReadOptions read_options;
  const WriteOptions write_options;
  OfflineManifestWriter w(db_opts, db_name);
  Status s = w.Recover(column_families);

  size_t files_updated = 0;
  size_t cfs_updated = 0;
  auto fs = db_opts.env->GetFileSystem();

  for (auto cfd : *w.Versions().GetColumnFamilySet()) {
    if (!s.ok()) {
      break;
    }
    assert(cfd);

    if (cfd->IsDropped() || !cfd->initialized()) {
      continue;
    }

    const auto* current = cfd->current();
    assert(current);

    const auto* vstorage = current->storage_info();
    assert(vstorage);

    VersionEdit edit;
    edit.SetColumnFamily(cfd->GetID());

    /* SST files */
    for (int level = 0; level < cfd->NumberLevels(); level++) {
      if (!s.ok()) {
        break;
      }
      const auto& level_files = vstorage->LevelFiles(level);

      for (const auto& lf : level_files) {
        assert(lf);
        uint64_t number = lf->fd.GetNumber();
        std::string fname =
            TableFileName(w.IOptions().db_paths, number, lf->fd.GetPathId());

        std::unique_ptr<FSSequentialFile> f;
        FileOptions fopts;
        // Use kUnknown to signal the FileSystem to search all tiers for the
        // file.
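        // (The file is opened only to learn its current temperature; with a
        // tiered FileSystem, the opened file's GetTemperature() is assumed to
        // report the tier where the file was actually found, which is what
        // gets compared against the manifest below.)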
        fopts.temperature = Temperature::kUnknown;
        IOStatus file_ios =
            fs->NewSequentialFile(fname, fopts, &f, /*dbg*/ nullptr);
        if (file_ios.ok()) {
          if (opts.update_temperatures) {
            Temperature temp = f->GetTemperature();
            if (temp != Temperature::kUnknown && temp != lf->temperature) {
              // Current state inconsistent with manifest
              ++files_updated;
              edit.DeleteFile(level, number);
              edit.AddFile(
                  level, number, lf->fd.GetPathId(), lf->fd.GetFileSize(),
                  lf->smallest, lf->largest, lf->fd.smallest_seqno,
                  lf->fd.largest_seqno, lf->marked_for_compaction, temp,
                  lf->oldest_blob_file_number, lf->oldest_ancester_time,
                  lf->file_creation_time, lf->epoch_number, lf->file_checksum,
                  lf->file_checksum_func_name, lf->unique_id,
                  lf->compensated_range_deletion_size, lf->tail_size,
                  lf->user_defined_timestamps_persisted);
            }
          }
        } else {
          s = file_ios;
          break;
        }
      }
    }
    if (s.ok() && edit.NumEntries() > 0) {
      std::unique_ptr<FSDirectory> db_dir;
      s = fs->NewDirectory(db_name, IOOptions(), &db_dir, nullptr);
      if (s.ok()) {
        s = w.LogAndApply(read_options, write_options, cfd, &edit,
                          db_dir.get());
      }
      if (s.ok()) {
        ++cfs_updated;
      }
    }
  }

  if (cfs_updated > 0) {
    ROCKS_LOG_INFO(db_opts.info_log,
                   "UpdateManifestForFilesState: updated %zu files in %zu CFs",
                   files_updated, cfs_updated);
  } else if (s.ok()) {
    ROCKS_LOG_INFO(db_opts.info_log,
                   "UpdateManifestForFilesState: no updates needed");
  }
  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_opts.info_log, "UpdateManifestForFilesState failed: %s",
                    s.ToString().c_str());
  }

  return s;
}

// EXPERIMENTAL new filtering features

namespace {

void GetFilterInput(FilterInput select, const Slice& key,
                    const KeySegmentsExtractor::Result& extracted,
                    Slice* out_input, Slice* out_leadup) {
  struct FilterInputGetter {
    explicit FilterInputGetter(const Slice& _key,
                               const KeySegmentsExtractor::Result& _extracted)
        : key(_key), extracted(_extracted) {}
    const Slice& key;
    const KeySegmentsExtractor::Result& extracted;

    Slice operator()(SelectKeySegment select) {
      size_t count = extracted.segment_ends.size();
      if (count <= select.segment_index) {
        return Slice();
      }
      assert(count > 0);
      size_t start = select.segment_index > 0
                         ? extracted.segment_ends[select.segment_index - 1]
                         : 0;
      size_t end =
          extracted
              .segment_ends[std::min(size_t{select.segment_index}, count - 1)];
      return Slice(key.data() + start, end - start);
    }

    Slice operator()(SelectKeySegmentRange select) {
      assert(select.from_segment_index <= select.to_segment_index);
      size_t count = extracted.segment_ends.size();
      if (count <= select.from_segment_index) {
        return Slice();
      }
      assert(count > 0);
      size_t start =
          select.from_segment_index > 0
              ? extracted.segment_ends[select.from_segment_index - 1]
              : 0;
      size_t end = extracted.segment_ends[std::min(
          size_t{select.to_segment_index}, count - 1)];
      return Slice(key.data() + start, end - start);
    }

    Slice operator()(SelectWholeKey) { return key; }

    Slice operator()(SelectLegacyKeyPrefix) {
      // TODO
      assert(false);
      return Slice();
    }

    Slice operator()(SelectUserTimestamp) {
      // TODO
      assert(false);
      return Slice();
    }

    Slice operator()(SelectColumnName) {
      // TODO
      assert(false);
      return Slice();
    }

    Slice operator()(SelectValue) {
      // TODO
      assert(false);
      return Slice();
    }
  };

  Slice input = std::visit(FilterInputGetter(key, extracted), select);
  *out_input = input;
  if (input.empty() || input.data() < key.data() ||
      input.data() > key.data() + key.size()) {
    *out_leadup = key;
  } else {
    *out_leadup = Slice(key.data(), input.data() - key.data());
  }
}

const char* DeserializeFilterInput(const char* p, const char* limit,
                                   FilterInput* out) {
  if (p >= limit) {
    return nullptr;
  }
  uint8_t b = static_cast<uint8_t>(*p++);
  if (b & 0x80) {
    // Reserved for future use to read more bytes
    return nullptr;
  }
  switch (b >> 4) {
    case 0:
      // Various cases that don't have an argument
      switch (b) {
        case 0:
          *out = SelectWholeKey{};
          return p;
        case 1:
          *out = SelectLegacyKeyPrefix{};
          return p;
        case 2:
          *out = SelectUserTimestamp{};
          return p;
        case 3:
          *out = SelectColumnName{};
          return p;
        case 4:
          *out = SelectValue{};
          return p;
        default:
          // Reserved for future use
          return nullptr;
      }
    case 1:
      // First 16 cases of SelectKeySegment
      *out = SelectKeySegment{BitwiseAnd(b, 0xf)};
      return p;
    case 2:
      // First 16 cases of SelectKeySegmentRange
      // that are not a single key segment
      // 0: 0-1
      // 1: 0-2
      // 2: 1-2
      // 3: 0-3
      // 4: 1-3
      // 5: 2-3
      // 6: 0-4
      // 7: 1-4
      // 8: 2-4
      // 9: 3-4
      // 10: 0-5
      // 11: 1-5
      // 12: 2-5
      // 13: 3-5
      // 14: 4-5
      // 15: 0-6
      // Use only the low nibble (the argument), as in case 1 above, so the
      // comparisons below match what SerializeFilterInput produces.
      b = BitwiseAnd(b, 0xf);
      if (b < 6) {
        if (b >= 3) {
          *out = SelectKeySegmentRange{static_cast<uint8_t>(b - 3), 3};
        } else if (b >= 1) {
          *out = SelectKeySegmentRange{static_cast<uint8_t>(b - 1), 2};
        } else {
          *out = SelectKeySegmentRange{0, 1};
        }
      } else if (b < 10) {
        *out = SelectKeySegmentRange{static_cast<uint8_t>(b - 6), 4};
      } else if (b < 15) {
        *out = SelectKeySegmentRange{static_cast<uint8_t>(b - 10), 5};
      } else {
        *out = SelectKeySegmentRange{0, 6};
      }
      return p;
    default:
      // Reserved for future use
      return nullptr;
  }
}

void SerializeFilterInput(std::string* out, const FilterInput& select) {
  struct FilterInputSerializer {
    std::string* out;
    void operator()(SelectWholeKey) { out->push_back(0); }
    void operator()(SelectLegacyKeyPrefix) { out->push_back(1); }
    void operator()(SelectUserTimestamp) { out->push_back(2); }
    void operator()(SelectColumnName) { out->push_back(3); }
    void operator()(SelectValue) { out->push_back(4); }
    void operator()(SelectKeySegment select) {
      // TODO: expand supported cases
      assert(select.segment_index < 16);
      out->push_back(static_cast<char>((1 << 4) | select.segment_index));
    }
    void operator()(SelectKeySegmentRange select) {
      auto from = select.from_segment_index;
      auto to = select.to_segment_index;
      // TODO: expand supported cases
      assert(from < 6);
      assert(to < 6 || (to == 6 && from == 0));
      assert(from < to);
      int start = (to - 1) * to / 2;
      assert(start + from < 16);
      out->push_back(static_cast<char>((2 << 4) | (start + from)));
    }
  };
  std::visit(FilterInputSerializer{out}, select);
}

size_t GetFilterInputSerializedLength(const FilterInput& /*select*/) {
  // TODO: expand supported cases
  return 1;
}

uint64_t CategorySetToUint(const KeySegmentsExtractor::KeyCategorySet& s) {
  static_assert(sizeof(KeySegmentsExtractor::KeyCategorySet) ==
                sizeof(uint64_t));
  return *reinterpret_cast<const uint64_t*>(&s);
}
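// Note on the two conversion helpers (CategorySetToUint above and
// UintToCategorySet below): they assume KeyCategorySet is effectively a plain
// 64-bit bit set (one bit per category) with no other state, so it can be
// reinterpreted as a uint64_t for serialization; the static_asserts verify
// only the size portion of that assumption.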
KeySegmentsExtractor::KeyCategorySet UintToCategorySet(uint64_t s) {
  static_assert(sizeof(KeySegmentsExtractor::KeyCategorySet) ==
                sizeof(uint64_t));
  return *reinterpret_cast<KeySegmentsExtractor::KeyCategorySet*>(&s);
}

enum BuiltinSstQueryFilters : char {
  // Wraps a set of filters such that they use a particular
  // KeySegmentsExtractor and a set of categories covering all keys seen.
  // TODO: unit test category covering filtering
  kExtrAndCatFilterWrapper = 0x1,

  // Wraps a set of filters to limit their scope to a particular set of
  // categories. (Unlike kExtrAndCatFilterWrapper,
  // keys in other categories may have been seen so are not filtered here.)
  // TODO: unit test more subtleties
  kCategoryScopeFilterWrapper = 0x2,

  // ... (reserve some values for more wrappers)

  // A filter representing the bytewise min and max values of a numbered
  // segment or composite (range of segments). The empty value is tracked
  // and filtered independently because it might be a special case that is
  // not representative of the minimum in a spread of values.
  kBytewiseMinMaxFilter = 0x10,
};

class SstQueryFilterBuilder {
 public:
  virtual ~SstQueryFilterBuilder() {}
  virtual void Add(const Slice& key,
                   const KeySegmentsExtractor::Result& extracted,
                   const Slice* prev_key,
                   const KeySegmentsExtractor::Result* prev_extracted) = 0;
  virtual Status GetStatus() const = 0;
  virtual size_t GetEncodedLength() const = 0;
  virtual void Finish(std::string& append_to) = 0;
};

class SstQueryFilterConfigImpl : public SstQueryFilterConfig {
 public:
  explicit SstQueryFilterConfigImpl(
      const FilterInput& input,
      const KeySegmentsExtractor::KeyCategorySet& categories)
      : input_(input), categories_(categories) {}

  virtual ~SstQueryFilterConfigImpl() {}

  virtual std::unique_ptr<SstQueryFilterBuilder> NewBuilder(
      bool sanity_checks) const = 0;

 protected:
  FilterInput input_;
  KeySegmentsExtractor::KeyCategorySet categories_;
};

class CategoryScopeFilterWrapperBuilder : public SstQueryFilterBuilder {
 public:
  explicit CategoryScopeFilterWrapperBuilder(
      KeySegmentsExtractor::KeyCategorySet categories,
      std::unique_ptr<SstQueryFilterBuilder> wrapped)
      : categories_(categories), wrapped_(std::move(wrapped)) {}

  void Add(const Slice& key, const KeySegmentsExtractor::Result& extracted,
           const Slice* prev_key,
           const KeySegmentsExtractor::Result* prev_extracted) override {
    if (!categories_.Contains(extracted.category)) {
      // Category not in scope of the constituent filters
      return;
    }
    wrapped_->Add(key, extracted, prev_key, prev_extracted);
  }

  Status GetStatus() const override { return wrapped_->GetStatus(); }

  size_t GetEncodedLength() const override {
    size_t wrapped_length = wrapped_->GetEncodedLength();
    if (wrapped_length == 0) {
      // Use empty filter
      // FIXME: needs unit test
      return 0;
    } else {
      // For now in the code, wraps only 1 filter, but schema supports multiple
      return 1 + VarintLength(CategorySetToUint(categories_)) +
             VarintLength(1) + wrapped_length;
    }
  }

  void Finish(std::string& append_to) override {
    size_t encoded_length = GetEncodedLength();
    if (encoded_length == 0) {
      // Nothing to do
      return;
    }
    size_t old_append_to_size = append_to.size();
    append_to.reserve(old_append_to_size + encoded_length);
    append_to.push_back(kCategoryScopeFilterWrapper);

    PutVarint64(&append_to, CategorySetToUint(categories_));

    // Wrapping just 1 filter for now
    PutVarint64(&append_to, 1);
    wrapped_->Finish(append_to);
  }

 private:
  KeySegmentsExtractor::KeyCategorySet categories_;
  std::unique_ptr<SstQueryFilterBuilder> wrapped_;
};

class BytewiseMinMaxSstQueryFilterConfig : public SstQueryFilterConfigImpl {
 public:
  using SstQueryFilterConfigImpl::SstQueryFilterConfigImpl;

  std::unique_ptr<SstQueryFilterBuilder> NewBuilder(
      bool sanity_checks) const override {
    auto b = std::make_unique<MyBuilder>(*this, sanity_checks);
    if (categories_ != KeySegmentsExtractor::KeyCategorySet::All()) {
      return std::make_unique<CategoryScopeFilterWrapperBuilder>(categories_,
                                                                 std::move(b));
    } else {
      return b;
    }
  }

  static bool RangeMayMatch(
      const Slice& filter, const Slice& lower_bound_incl,
      const KeySegmentsExtractor::Result& lower_bound_extracted,
      const Slice& upper_bound_excl,
      const KeySegmentsExtractor::Result& upper_bound_extracted) {
    assert(!filter.empty() && filter[0] == kBytewiseMinMaxFilter);
    if (filter.size() <= 4) {
      // Missing some data
      return true;
    }
    bool empty_included = (filter[1] & kEmptySeenFlag) != 0;
    const char* p = filter.data() + 2;
    const char* limit = filter.data() + filter.size();

    FilterInput in;
    p = DeserializeFilterInput(p, limit, &in);
    if (p == nullptr) {
      // Corrupt or unsupported
      return true;
    }

    uint32_t smallest_size;
    p = GetVarint32Ptr(p, limit, &smallest_size);
    if (p == nullptr || static_cast<size_t>(limit - p) <= smallest_size) {
      // Corrupt
      return true;
    }
    Slice smallest = Slice(p, smallest_size);
    p += smallest_size;

    size_t largest_size = static_cast<size_t>(limit - p);
    Slice largest = Slice(p, largest_size);

    Slice lower_bound_input, lower_bound_leadup;
    Slice upper_bound_input, upper_bound_leadup;
    GetFilterInput(in, lower_bound_incl, lower_bound_extracted,
                   &lower_bound_input, &lower_bound_leadup);
    GetFilterInput(in, upper_bound_excl, upper_bound_extracted,
                   &upper_bound_input, &upper_bound_leadup);

    if (lower_bound_leadup.compare(upper_bound_leadup) != 0) {
      // Unable to filter range when bounds have different lead-up to key
      // segment
      return true;
    }

    if (empty_included && lower_bound_input.empty()) {
      // May match on 0-length segment
      return true;
    }
    // TODO: potentially fix upper bound to actually be exclusive, but it's not
    // as simple as changing >= to > below, because it's upper_bound_excl that's
    // exclusive, and the upper_bound_input part extracted from it might not be.

    // May match if both the upper bound and lower bound indicate there could
    // be overlap
    return upper_bound_input.compare(smallest) >= 0 &&
           lower_bound_input.compare(largest) <= 0;
  }

 protected:
  struct MyBuilder : public SstQueryFilterBuilder {
    MyBuilder(const BytewiseMinMaxSstQueryFilterConfig& _parent,
              bool _sanity_checks)
        : parent(_parent), sanity_checks(_sanity_checks) {}

    void Add(const Slice& key, const KeySegmentsExtractor::Result& extracted,
             const Slice* prev_key,
             const KeySegmentsExtractor::Result* prev_extracted) override {
      Slice input, leadup;
      GetFilterInput(parent.input_, key, extracted, &input, &leadup);

      if (sanity_checks && prev_key && prev_extracted) {
        // Opportunistic checking of segment ordering invariant
        Slice prev_input, prev_leadup;
        GetFilterInput(parent.input_, *prev_key, *prev_extracted, &prev_input,
                       &prev_leadup);

        int compare = prev_leadup.compare(leadup);
        if (compare > 0) {
          status = Status::Corruption(
              "Ordering invariant violated from 0x" +
              prev_key->ToString(/*hex=*/true) + " with prefix 0x" +
              prev_leadup.ToString(/*hex=*/true) + " to 0x" +
              key.ToString(/*hex=*/true) + " with prefix 0x" +
              leadup.ToString(/*hex=*/true));
          return;
        } else if (compare == 0) {
          // On the same prefix leading up to the segment, the segments must
          // not be out of order.
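          // For example (illustrative only, assuming '|'-delimited segments
          // and the segment after '|' as the filter input): keys "a|1", "a|3",
          // "b|2" satisfy the invariant because the lead-ups "a|", "a|", "b|"
          // are non-decreasing and, within the equal lead-up "a|", the
          // segments "1", "3" are non-decreasing; "a|3" followed by "a|1"
          // would trip the check below.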
          compare = prev_input.compare(input);
          if (compare > 0) {
            status = Status::Corruption(
                "Ordering invariant violated from 0x" +
                prev_key->ToString(/*hex=*/true) + " with segment 0x" +
                prev_input.ToString(/*hex=*/true) + " to 0x" +
                key.ToString(/*hex=*/true) + " with segment 0x" +
                input.ToString(/*hex=*/true));
            return;
          }
        }
      }

      // Now actually update state for the filter inputs
      // TODO: shorten largest and smallest if appropriate
      if (input.empty()) {
        empty_seen = true;
      } else if (largest.empty()) {
        // Step for first non-empty input
        smallest = largest = input.ToString();
      } else if (input.compare(largest) > 0) {
        largest = input.ToString();
      } else if (input.compare(smallest) < 0) {
        smallest = input.ToString();
      }
    }

    Status GetStatus() const override { return status; }

    size_t GetEncodedLength() const override {
      if (largest.empty()) {
        // Not an interesting filter -> 0 to indicate no filter
        // FIXME: needs unit test
        return 0;
      }
      return 2 + GetFilterInputSerializedLength(parent.input_) +
             VarintLength(smallest.size()) + smallest.size() + largest.size();
    }

    void Finish(std::string& append_to) override {
      assert(status.ok());
      size_t encoded_length = GetEncodedLength();
      if (encoded_length == 0) {
        // Nothing to do
        return;
      }
      size_t old_append_to_size = append_to.size();
      append_to.reserve(old_append_to_size + encoded_length);
      append_to.push_back(kBytewiseMinMaxFilter);

      append_to.push_back(empty_seen ? kEmptySeenFlag : 0);

      SerializeFilterInput(&append_to, parent.input_);

      PutVarint32(&append_to, static_cast<uint32_t>(smallest.size()));
      append_to.append(smallest);

      // The end of `largest` is given by the end of the filter
      append_to.append(largest);

      assert(append_to.size() == old_append_to_size + encoded_length);
    }

    const BytewiseMinMaxSstQueryFilterConfig& parent;
    const bool sanity_checks;
    // Smallest and largest segment seen, excluding the empty segment which
    // is tracked separately
    std::string smallest;
    std::string largest;
    bool empty_seen = false;

    // Only for sanity checks
    Status status;
  };

 private:
  static constexpr char kEmptySeenFlag = 0x1;
};

const SstQueryFilterConfigs kEmptyNotFoundSQFC{};

class SstQueryFilterConfigsManagerImpl : public SstQueryFilterConfigsManager {
 public:
  using ConfigVersionMap = std::map<FilteringVersion, SstQueryFilterConfigs>;

  Status Populate(const Data& data) {
    if (data.empty()) {
      return Status::OK();
    }
    // Populate only once
    assert(min_ver_ == 0 && max_ver_ == 0);
    min_ver_ = max_ver_ = data.begin()->first;

    FilteringVersion prev_ver = 0;
    bool first_entry = true;
    for (const auto& ver_info : data) {
      if (ver_info.first == 0) {
        return Status::InvalidArgument(
            "Filtering version 0 is reserved for empty configuration and may "
            "not be overridden");
      }
      if (first_entry) {
        min_ver_ = ver_info.first;
        first_entry = false;
      } else if (ver_info.first != prev_ver + 1) {
        return Status::InvalidArgument(
            "Filtering versions must increase by 1 without repeating: " +
            std::to_string(prev_ver) + " -> " + std::to_string(ver_info.first));
      }
      max_ver_ = ver_info.first;
      UnorderedSet<std::string> names_seen_this_ver;
      for (const auto& config : ver_info.second) {
        if (!names_seen_this_ver.insert(config.first).second) {
          return Status::InvalidArgument(
              "Duplicate name in filtering version " +
              std::to_string(ver_info.first) + ": " + config.first);
        }
        auto& ver_map = name_map_[config.first];
        ver_map[ver_info.first] = config.second;
        if (config.second.extractor) {
          extractor_map_[config.second.extractor->GetId()] =
              config.second.extractor;
        }
      }
      prev_ver = ver_info.first;
    }
    return Status::OK();
  }

  struct MyCollector : public TablePropertiesCollector {
    // Keeps a reference to `configs` which should be kept alive by
    // SstQueryFilterConfigsManagerImpl, which should be kept alive by
    // any factories
    // TODO: sanity_checks option
    explicit MyCollector(const SstQueryFilterConfigs& configs,
                         const SstQueryFilterConfigsManagerImpl& _parent)
        : parent(_parent),
          extractor(configs.extractor.get()),
          sanity_checks(true) {
      for (const auto& c : configs.filters) {
        builders.push_back(
            static_cast<const SstQueryFilterConfigImpl&>(*c).NewBuilder(
                sanity_checks));
      }
    }

    Status AddUserKey(const Slice& key, const Slice& /*value*/,
                      EntryType /*type*/, SequenceNumber /*seq*/,
                      uint64_t /*file_size*/) override {
      // FIXME later: `key` might contain user timestamp. That should be
      // exposed properly in a future update to TablePropertiesCollector
      KeySegmentsExtractor::Result extracted;
      if (extractor) {
        extractor->Extract(key, KeySegmentsExtractor::kFullUserKey, &extracted);
        if (UNLIKELY(extracted.category >=
                     KeySegmentsExtractor::kMinErrorCategory)) {
          // TODO: proper failure scopes
          Status s = Status::Corruption(
              "Extractor returned error category from key 0x" +
              Slice(key).ToString(/*hex=*/true));
          overall_status.UpdateIfOk(s);
          return s;
        }
        assert(extracted.category <= KeySegmentsExtractor::kMaxUsableCategory);

        bool new_category = categories_seen.Add(extracted.category);
        if (sanity_checks) {
          // Opportunistic checking of category ordering invariant
          if (!first_key) {
            if (prev_extracted.category != extracted.category &&
                !new_category) {
              Status s = Status::Corruption(
                  "Category ordering invariant violated from key 0x" +
                  Slice(prev_key).ToString(/*hex=*/true) + " to 0x" +
                  key.ToString(/*hex=*/true));
              overall_status.UpdateIfOk(s);
              return s;
            }
          }
        }
      }
      for (const auto& b : builders) {
        if (first_key) {
          b->Add(key, extracted, nullptr, nullptr);
        } else {
          Slice prev_key_slice = Slice(prev_key);
          b->Add(key, extracted, &prev_key_slice, &prev_extracted);
        }
      }
      prev_key.assign(key.data(), key.size());
      prev_extracted = std::move(extracted);
      first_key = false;
      return Status::OK();
    }

    Status Finish(UserCollectedProperties* properties) override {
      assert(properties != nullptr);

      if (!overall_status.ok()) {
        return overall_status;
      }

      size_t total_size = 1;
      autovector<std::pair<SstQueryFilterBuilder&, size_t>> filters_to_finish;
      // Need to determine number of filters before serializing them. Might
      // as well determine full length also.
      for (const auto& b : builders) {
        Status s = b->GetStatus();
        if (s.ok()) {
          size_t len = b->GetEncodedLength();
          if (len > 0) {
            total_size += VarintLength(len) + len;
            filters_to_finish.emplace_back(*b, len);
          }
        } else {
          // FIXME: no way to report partial failure without getting
          // remaining filters thrown out
        }
      }
      total_size += VarintLength(filters_to_finish.size());
      if (filters_to_finish.empty()) {
        // No filters to add
        return Status::OK();
      }
      // Length of the last filter is omitted
      total_size -= VarintLength(filters_to_finish.back().second);

      // Need to determine size of kExtrAndCatFilterWrapper if used
      std::string extractor_id;
      if (extractor) {
        extractor_id = extractor->GetId();
        // identifier byte
        total_size += 1;
        // fields of the wrapper
        total_size += VarintLength(extractor_id.size()) + extractor_id.size() +
                      VarintLength(CategorySetToUint(categories_seen));
        // outer layer will have just 1 filter in its count (added here)
        // and this filter wrapper will have filters_to_finish.size()
        // (added above).
        total_size += VarintLength(1);
      }

      std::string filters;
      filters.reserve(total_size);

      // Leave room for drastic changes in the future.
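      // Layout assembled below, as implemented by this function: a schema
      // version byte, then a varint count of top-level filters, then each
      // filter prefixed by its varint length except the last one (whose end
      // is the end of the property value). When an extractor is configured,
      // the single top-level filter is a kExtrAndCatFilterWrapper carrying
      // the extractor id and the category set, which in turn wraps the
      // per-config filters using the same count-and-length scheme.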
      filters.push_back(kSchemaVersion);

      if (extractor) {
        // Wrap everything in a kExtrAndCatFilterWrapper
        // TODO in future: put whole key filters outside of this wrapper.
        // Also TODO in future: order the filters starting with broadest
        // applicability.

        // Just one top-level filter (wrapper). Because it's last, we don't
        // need to encode its length.
        PutVarint64(&filters, 1);
        // The filter(s) wrapper itself
        filters.push_back(kExtrAndCatFilterWrapper);
        PutVarint64(&filters, extractor_id.size());
        filters += extractor_id;
        PutVarint64(&filters, CategorySetToUint(categories_seen));
      }

      PutVarint64(&filters, filters_to_finish.size());
      for (const auto& e : filters_to_finish) {
        // Encode filter length, except last filter
        if (&e != &filters_to_finish.back()) {
          PutVarint64(&filters, e.second);
        }
        // Encode filter
        e.first.Finish(filters);
      }
      if (filters.size() != total_size) {
        assert(false);
        return Status::Corruption(
            "Internal inconsistency building SST query filters");
      }

      (*properties)[kTablePropertyName] = std::move(filters);
      return Status::OK();
    }

    UserCollectedProperties GetReadableProperties() const override {
      // TODO?
      return {};
    }

    const char* Name() const override {
      // placeholder
      return "SstQueryFilterConfigsImpl::MyCollector";
    }

    Status overall_status;
    const SstQueryFilterConfigsManagerImpl& parent;
    const KeySegmentsExtractor* const extractor;
    const bool sanity_checks;
    std::vector<std::unique_ptr<SstQueryFilterBuilder>> builders;
    bool first_key = true;
    std::string prev_key;
    KeySegmentsExtractor::Result prev_extracted;
    KeySegmentsExtractor::KeyCategorySet categories_seen;
  };

  struct RangeQueryFilterReader {
    Slice lower_bound_incl;
    Slice upper_bound_excl;
    const KeySegmentsExtractor* extractor;
    const UnorderedMap<std::string, std::shared_ptr<const KeySegmentsExtractor>>&
        extractor_map;

    struct State {
      KeySegmentsExtractor::Result lb_extracted;
      KeySegmentsExtractor::Result ub_extracted;
    };

    bool MayMatch_CategoryScopeFilterWrapper(Slice wrapper,
                                             State& state) const {
      assert(!wrapper.empty() && wrapper[0] == kCategoryScopeFilterWrapper);

      // Regardless of the filter values (which we assume is not all
      // categories; that should skip the wrapper), we need upper bound and
      // lower bound to be in the same category to do any range filtering.
      // (There could be another category in range between the bounds.)
      if (state.lb_extracted.category != state.ub_extracted.category) {
        // Can't filter between categories
        return true;
      }

      const char* p = wrapper.data() + 1;
      const char* limit = wrapper.data() + wrapper.size();

      uint64_t cats_raw;
      p = GetVarint64Ptr(p, limit, &cats_raw);
      if (p == nullptr) {
        // Missing categories
        return true;
      }
      KeySegmentsExtractor::KeyCategorySet categories =
          UintToCategorySet(cats_raw);

      // Check category against those in scope
      if (!categories.Contains(state.lb_extracted.category)) {
        // Can't filter this category
        return true;
      }

      // Process the wrapped filters
      return MayMatch(Slice(p, limit - p), &state);
    }

    bool MayMatch_ExtrAndCatFilterWrapper(Slice wrapper) const {
      assert(!wrapper.empty() && wrapper[0] == kExtrAndCatFilterWrapper);
      if (wrapper.size() <= 4) {
        // Missing some data
        // (1 byte marker, >= 1 byte name length, >= 1 byte name, >= 1 byte
        // categories, ...)
        return true;
      }
      const char* p = wrapper.data() + 1;
      const char* limit = wrapper.data() + wrapper.size();
      uint64_t name_len;
      p = GetVarint64Ptr(p, limit, &name_len);
      if (p == nullptr || name_len == 0 ||
          static_cast<uint64_t>(limit - p) < name_len) {
        // Missing some data
        return true;
      }
      Slice name(p, name_len);
      p += name_len;
      const KeySegmentsExtractor* ex = nullptr;
      if (extractor && name == Slice(extractor->GetId())) {
        ex = extractor;
      } else {
        auto it = extractor_map.find(name.ToString());
        if (it != extractor_map.end()) {
          ex = it->second.get();
        } else {
          // Extractor mismatch / not found
          // TODO future: try to get the extractor from the ObjectRegistry
          return true;
        }
      }

      // TODO future: cache extraction?

      // Ready to run extractor
      assert(ex);
      State state;
      ex->Extract(lower_bound_incl, KeySegmentsExtractor::kInclusiveLowerBound,
                  &state.lb_extracted);
      if (UNLIKELY(state.lb_extracted.category >=
                   KeySegmentsExtractor::kMinErrorCategory)) {
        // TODO? Report problem
        // No filtering
        return true;
      }
      assert(state.lb_extracted.category <=
             KeySegmentsExtractor::kMaxUsableCategory);

      ex->Extract(upper_bound_excl, KeySegmentsExtractor::kExclusiveUpperBound,
                  &state.ub_extracted);
      if (UNLIKELY(state.ub_extracted.category >=
                   KeySegmentsExtractor::kMinErrorCategory)) {
        // TODO? Report problem
        // No filtering
        return true;
      }
      assert(state.ub_extracted.category <=
             KeySegmentsExtractor::kMaxUsableCategory);

      uint64_t cats_raw;
      p = GetVarint64Ptr(p, limit, &cats_raw);
      if (p == nullptr) {
        // Missing categories
        return true;
      }
      KeySegmentsExtractor::KeyCategorySet categories =
          UintToCategorySet(cats_raw);

      // Can only filter out based on category if upper and lower bound have
      // the same category. (Each category is contiguous by key order, but we
      // don't know the order between categories.)
      if (state.lb_extracted.category == state.ub_extracted.category &&
          !categories.Contains(state.lb_extracted.category)) {
        // Filtered out
        return false;
      }

      // Process the wrapped filters
      return MayMatch(Slice(p, limit - p), &state);
    }

    bool MayMatch(Slice filters, State* state = nullptr) const {
      const char* p = filters.data();
      const char* limit = p + filters.size();
      uint64_t filter_count;
      p = GetVarint64Ptr(p, limit, &filter_count);
      if (p == nullptr || filter_count == 0) {
        // TODO? Report problem
        // No filtering
        return true;
      }

      for (size_t i = 0; i < filter_count; ++i) {
        uint64_t filter_len;
        if (i + 1 == filter_count) {
          // Last filter
          filter_len = static_cast<uint64_t>(limit - p);
        } else {
          p = GetVarint64Ptr(p, limit, &filter_len);
          if (p == nullptr || filter_len == 0 ||
              static_cast<uint64_t>(limit - p) < filter_len) {
            // TODO? Report problem
            // No filtering
            return true;
          }
        }
        Slice filter = Slice(p, filter_len);
        p += filter_len;
        bool may_match = true;
        char type = filter[0];
        switch (type) {
          case kExtrAndCatFilterWrapper:
            may_match = MayMatch_ExtrAndCatFilterWrapper(filter);
            break;
          case kCategoryScopeFilterWrapper:
            if (state == nullptr) {
              // TODO? Report problem
              // No filtering
              return true;
            }
            may_match = MayMatch_CategoryScopeFilterWrapper(filter, *state);
            break;
          case kBytewiseMinMaxFilter:
            if (state == nullptr) {
              // TODO? Report problem
              // No filtering
              return true;
            }
            may_match = BytewiseMinMaxSstQueryFilterConfig::RangeMayMatch(
                filter, lower_bound_incl, state->lb_extracted, upper_bound_excl,
                state->ub_extracted);
            break;
          default:
            // TODO? Report problem
            {}  // Unknown filter type
        }
        if (!may_match) {
          // Successfully filtered
          return false;
        }
      }

      // Wasn't filtered
      return true;
    }
  };

  struct MyFactory : public Factory {
    explicit MyFactory(
        std::shared_ptr<const SstQueryFilterConfigsManagerImpl> _parent,
        const std::string& _configs_name)
        : parent(std::move(_parent)),
          ver_map(parent->GetVerMap(_configs_name)),
          configs_name(_configs_name) {}

    TablePropertiesCollector* CreateTablePropertiesCollector(
        TablePropertiesCollectorFactory::Context /*context*/) override {
      auto& configs = GetConfigs();
      if (configs.IsEmptyNotFound()) {
        return nullptr;
      }
      return new MyCollector(configs, *parent);
    }

    const char* Name() const override {
      // placeholder
      return "SstQueryFilterConfigsManagerImpl::MyFactory";
    }

    Status SetFilteringVersion(FilteringVersion ver) override {
      if (ver > 0 && ver < parent->min_ver_) {
        return Status::InvalidArgument(
            "Filtering version is before earliest known configuration: " +
            std::to_string(ver) + " < " + std::to_string(parent->min_ver_));
      }
      if (ver > parent->max_ver_) {
        return Status::InvalidArgument(
            "Filtering version is after latest known configuration: " +
            std::to_string(ver) + " > " + std::to_string(parent->max_ver_));
      }
      version.StoreRelaxed(ver);
      return Status::OK();
    }

    FilteringVersion GetFilteringVersion() const override {
      return version.LoadRelaxed();
    }

    const std::string& GetConfigsName() const override { return configs_name; }

    const SstQueryFilterConfigs& GetConfigs() const override {
      FilteringVersion ver = version.LoadRelaxed();
      if (ver == 0) {
        // Special case
        return kEmptyNotFoundSQFC;
      }
      assert(ver >= parent->min_ver_);
      assert(ver <= parent->max_ver_);
      auto it = ver_map.upper_bound(ver);
      if (it == ver_map.begin()) {
        return kEmptyNotFoundSQFC;
      } else {
        --it;
        return it->second;
      }
    }

    // The buffers pointed to by the Slices must live as long as any read
    // operations using this table filter function.
    std::function<bool(const TableProperties&)> GetTableFilterForRangeQuery(
        Slice lower_bound_incl, Slice upper_bound_excl) const override {
      // TODO: cache extractor results between SST files, assuming most will
      // use the same version
      return [rqf = RangeQueryFilterReader{
                  lower_bound_incl, upper_bound_excl,
                  GetConfigs().extractor.get(),
                  parent->extractor_map_}](const TableProperties& props)
                 -> bool {
        auto it = props.user_collected_properties.find(kTablePropertyName);
        if (it == props.user_collected_properties.end()) {
          // No filtering
          return true;
        }
        auto& filters = it->second;
        // Parse the serialized filters string
        if (filters.size() < 2 || filters[0] != kSchemaVersion) {
          // TODO? Report problem
          // No filtering
          return true;
        }
        return rqf.MayMatch(Slice(filters.data() + 1, filters.size() - 1));
      };
    }

    const std::shared_ptr<const SstQueryFilterConfigsManagerImpl> parent;
    const ConfigVersionMap& ver_map;
    const std::string configs_name;
    RelaxedAtomic<FilteringVersion> version;
  };

  Status MakeSharedFactory(const std::string& configs_name,
                           FilteringVersion ver,
                           std::shared_ptr<Factory>* out) const override {
    auto obj = std::make_shared<MyFactory>(
        static_cast_with_check<const SstQueryFilterConfigsManagerImpl>(
            shared_from_this()),
        configs_name);
    Status s = obj->SetFilteringVersion(ver);
    if (s.ok()) {
      *out = std::move(obj);
    }
    return s;
  }

  const ConfigVersionMap& GetVerMap(const std::string& configs_name) const {
    static const ConfigVersionMap kEmptyMap;
    auto it = name_map_.find(configs_name);
    if (it == name_map_.end()) {
      return kEmptyMap;
    }
    return it->second;
  }

 private:
  static const std::string kTablePropertyName;
  static constexpr char kSchemaVersion = 1;

 private:
  UnorderedMap<std::string, ConfigVersionMap> name_map_;
  UnorderedMap<std::string, std::shared_ptr<const KeySegmentsExtractor>>
      extractor_map_;
  FilteringVersion min_ver_ = 0;
  FilteringVersion max_ver_ = 0;
};  // class SstQueryFilterConfigsManagerImpl

const std::string SstQueryFilterConfigsManagerImpl::kTablePropertyName =
    "rocksdb.sqfc";

}  // namespace

bool SstQueryFilterConfigs::IsEmptyNotFound() const {
  return this == &kEmptyNotFoundSQFC;
}

std::shared_ptr<SstQueryFilterConfig> MakeSharedBytewiseMinMaxSQFC(
    FilterInput input, KeySegmentsExtractor::KeyCategorySet categories) {
  return std::make_shared<BytewiseMinMaxSstQueryFilterConfig>(input,
                                                              categories);
}

Status SstQueryFilterConfigsManager::MakeShared(
    const Data& data, std::shared_ptr<SstQueryFilterConfigsManager>* out) {
  auto obj = std::make_shared<SstQueryFilterConfigsManagerImpl>();
  Status s = obj->Populate(data);
  if (s.ok()) {
    *out = std::move(obj);
  }
  return s;
}

}  // namespace experimental
}  // namespace ROCKSDB_NAMESPACE
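// Illustrative usage sketch (comments only, not compiled; `MyExtractor`,
// `cf_options`, `lower`, and `upper` are hypothetical names, and exact
// container types for `Data` follow rocksdb/experimental.h):
//
//   using namespace ROCKSDB_NAMESPACE::experimental;
//   SstQueryFilterConfigs configs;
//   configs.extractor = std::make_shared<MyExtractor>();  // KeySegmentsExtractor
//   configs.filters = {MakeSharedBytewiseMinMaxSQFC(
//       SelectKeySegment{1}, KeySegmentsExtractor::KeyCategorySet::All())};
//
//   std::shared_ptr<SstQueryFilterConfigsManager> mgr;
//   Status s = SstQueryFilterConfigsManager::MakeShared(
//       {{/*FilteringVersion*/ 1, {{"my_configs", configs}}}}, &mgr);
//
//   std::shared_ptr<SstQueryFilterConfigsManager::Factory> factory;
//   s = mgr->MakeSharedFactory("my_configs", /*ver=*/1, &factory);
//
//   // Write side: attach the factory so table properties carry the filters.
//   cf_options.table_properties_collector_factories.push_back(factory);
//
//   // Read side: narrow which SST files a range scan touches. The buffers
//   // behind `lower`/`upper` must outlive the read, per the comment on
//   // GetTableFilterForRangeQuery above.
//   ReadOptions ro;
//   ro.table_filter = factory->GetTableFilterForRangeQuery(lower, upper);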