// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). // // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "table/merging_iterator.h" #include "db/arena_wrapped_db_iter.h" namespace ROCKSDB_NAMESPACE { // MergingIterator uses a min/max heap to combine data from point iterators. // Range tombstones can be added and keys covered by range tombstones will be // skipped. // // The following are implementation details and can be ignored by user. // For merging iterator to process range tombstones, it treats the start and end // keys of a range tombstone as two keys and put them into minHeap_ or maxHeap_ // together with regular point keys. Each range tombstone is active only within // its internal key range [start_key, end_key). An `active_` set is used to // track levels that have an active range tombstone. Take forward scanning // for example. Level j is in active_ if its current range tombstone has its // start_key popped from minHeap_ and its end_key in minHeap_. If the top of // minHeap_ is a point key from level L, we can determine if the point key is // covered by any range tombstone by checking if there is an l <= L in active_. // The case of l == L also involves checking range tombstone's sequence number. // // The following (non-exhaustive) list of invariants are maintained by // MergingIterator during forward scanning. After each InternalIterator API, // i.e., Seek*() and Next(), and FindNextVisibleKey(), if minHeap_ is not empty: // (1) minHeap_.top().type == ITERATOR // (2) minHeap_.top()->key() is not covered by any range tombstone. // // After each call to SeekImpl() in addition to the functions mentioned above: // (3) For all level i and j <= i, range_tombstone_iters_[j].prev.end_key() < // children_[i].iter.key(). That is, range_tombstone_iters_[j] is at or before // the first range tombstone from level j with end_key() > // children_[i].iter.key(). // (4) For all level i and j <= i, if j in active_, then // range_tombstone_iters_[j]->start_key() < children_[i].iter.key(). // - When range_tombstone_iters_[j] is !Valid(), we consider its `prev` to be // the last range tombstone from that range tombstone iterator. // - When referring to range tombstone start/end keys, assume it is the value of // HeapItem::tombstone_pik. This value has op_type = kMaxValid, which makes // range tombstone keys have distinct values from point keys. // // Applicable class variables have their own (forward scanning) invariants // listed in the comments above their definition. class MergingIterator : public InternalIterator { public: MergingIterator(const InternalKeyComparator* comparator, InternalIterator** children, int n, bool is_arena_mode, bool prefix_seek_mode, const Slice* iterate_upper_bound = nullptr) : is_arena_mode_(is_arena_mode), prefix_seek_mode_(prefix_seek_mode), direction_(kForward), comparator_(comparator), current_(nullptr), minHeap_(MinHeapItemComparator(comparator_)), pinned_iters_mgr_(nullptr), iterate_upper_bound_(iterate_upper_bound) { children_.resize(n); for (int i = 0; i < n; i++) { children_[i].level = i; children_[i].iter.Set(children[i]); } } void considerStatus(Status s) { if (!s.ok() && status_.ok()) { status_ = s; } } virtual void AddIterator(InternalIterator* iter) { children_.emplace_back(children_.size(), iter); if (pinned_iters_mgr_) { iter->SetPinnedItersMgr(pinned_iters_mgr_); } // Invalidate to ensure `Seek*()` is called to construct the heaps before // use. current_ = nullptr; } // There must be either no range tombstone iterator or the same number of // range tombstone iterators as point iterators after all iters are added. // The i-th added range tombstone iterator and the i-th point iterator // must point to the same LSM level. // Merging iterator takes ownership of `iter` and is responsible for freeing // it. One exception to this is when a LevelIterator moves to a different SST // file or when Iterator::Refresh() is called, the range tombstone iterator // could be updated. In that case, this merging iterator is only responsible // for freeing the new range tombstone iterator that it has pointers to in // range_tombstone_iters_. void AddRangeTombstoneIterator(TruncatedRangeDelIterator* iter) { range_tombstone_iters_.emplace_back(iter); } // Called by MergingIteratorBuilder when all point iterators and range // tombstone iterators are added. Initializes HeapItems for range tombstone // iterators. void Finish() { if (!range_tombstone_iters_.empty()) { assert(range_tombstone_iters_.size() == children_.size()); pinned_heap_item_.resize(range_tombstone_iters_.size()); for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) { pinned_heap_item_[i].level = i; // Range tombstone end key is exclusive. If a point internal key has the // same user key and sequence number as the start or end key of a range // tombstone, the order will be start < end key < internal key with the // following op_type change. This is helpful to ensure keys popped from // heap are in expected order since range tombstone start/end keys will // be distinct from point internal keys. Strictly speaking, this is only // needed for tombstone end points that are truncated in // TruncatedRangeDelIterator since untruncated tombstone end points // always have kMaxSequenceNumber and kTypeRangeDeletion (see // TruncatedRangeDelIterator::start_key()/end_key()). pinned_heap_item_[i].tombstone_pik.type = kTypeMaxValid; } } } ~MergingIterator() override { for (auto child : range_tombstone_iters_) { delete child; } for (auto& child : children_) { child.iter.DeleteIter(is_arena_mode_); } status_.PermitUncheckedError(); } bool Valid() const override { return current_ != nullptr && status_.ok(); } Status status() const override { return status_; } // Add range_tombstone_iters_[level] into min heap. // Updates active_ if the end key of a range tombstone is inserted. // pinned_heap_items_[level].type is updated based on `start_key`. // // If range_tombstone_iters_[level] is after iterate_upper_bound_, // it is removed from the heap. // @param start_key specifies which end point of the range tombstone to add. void InsertRangeTombstoneToMinHeap(size_t level, bool start_key = true, bool replace_top = false) { assert(!range_tombstone_iters_.empty() && range_tombstone_iters_[level]->Valid()); // Maintains Invariant(phi) if (start_key) { pinned_heap_item_[level].type = HeapItem::Type::DELETE_RANGE_START; ParsedInternalKey pik = range_tombstone_iters_[level]->start_key(); // iterate_upper_bound does not have timestamp if (iterate_upper_bound_ && comparator_->user_comparator()->CompareWithoutTimestamp( pik.user_key, true /* a_has_ts */, *iterate_upper_bound_, false /* b_has_ts */) >= 0) { if (replace_top) { // replace_top implies this range tombstone iterator is still in // minHeap_ and at the top. minHeap_.pop(); } return; } pinned_heap_item_[level].SetTombstoneKey(std::move(pik)); // Checks Invariant(active_) assert(active_.count(level) == 0); } else { // allow end key to go over upper bound (if present) since start key is // before upper bound and the range tombstone could still cover a // range before upper bound. // Maintains Invariant(active_) pinned_heap_item_[level].SetTombstoneKey( range_tombstone_iters_[level]->end_key()); pinned_heap_item_[level].type = HeapItem::Type::DELETE_RANGE_END; active_.insert(level); } if (replace_top) { minHeap_.replace_top(&pinned_heap_item_[level]); } else { minHeap_.push(&pinned_heap_item_[level]); } } // Add range_tombstone_iters_[level] into max heap. // Updates active_ if the start key of a range tombstone is inserted. // @param end_key specifies which end point of the range tombstone to add. void InsertRangeTombstoneToMaxHeap(size_t level, bool end_key = true, bool replace_top = false) { assert(!range_tombstone_iters_.empty() && range_tombstone_iters_[level]->Valid()); if (end_key) { pinned_heap_item_[level].SetTombstoneKey( range_tombstone_iters_[level]->end_key()); pinned_heap_item_[level].type = HeapItem::Type::DELETE_RANGE_END; assert(active_.count(level) == 0); } else { pinned_heap_item_[level].SetTombstoneKey( range_tombstone_iters_[level]->start_key()); pinned_heap_item_[level].type = HeapItem::Type::DELETE_RANGE_START; active_.insert(level); } if (replace_top) { maxHeap_->replace_top(&pinned_heap_item_[level]); } else { maxHeap_->push(&pinned_heap_item_[level]); } } // Remove HeapItems from top of minHeap_ that are of type DELETE_RANGE_START // until minHeap_ is empty or the top of the minHeap_ is not of type // DELETE_RANGE_START. Each such item means a range tombstone becomes active, // so `active_` is updated accordingly. void PopDeleteRangeStart() { while (!minHeap_.empty() && minHeap_.top()->type == HeapItem::Type::DELETE_RANGE_START) { TEST_SYNC_POINT_CALLBACK("MergeIterator::PopDeleteRangeStart", nullptr); // Invariant(rti) holds since // range_tombstone_iters_[minHeap_.top()->level] is still valid, and // parameter `replace_top` is set to true here to ensure only one such // HeapItem is in minHeap_. InsertRangeTombstoneToMinHeap( minHeap_.top()->level, false /* start_key */, true /* replace_top */); } } // Remove HeapItems from top of maxHeap_ that are of type DELETE_RANGE_END // until maxHeap_ is empty or the top of the maxHeap_ is not of type // DELETE_RANGE_END. Each such item means a range tombstone becomes active, // so `active_` is updated accordingly. void PopDeleteRangeEnd() { while (!maxHeap_->empty() && maxHeap_->top()->type == HeapItem::Type::DELETE_RANGE_END) { // insert start key of this range tombstone and updates active_ InsertRangeTombstoneToMaxHeap(maxHeap_->top()->level, false /* end_key */, true /* replace_top */); } } void SeekToFirst() override { ClearHeaps(); status_ = Status::OK(); for (auto& child : children_) { child.iter.SeekToFirst(); AddToMinHeapOrCheckStatus(&child); } for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) { if (range_tombstone_iters_[i]) { range_tombstone_iters_[i]->SeekToFirst(); if (range_tombstone_iters_[i]->Valid()) { // It is possible to be invalid due to snapshots. InsertRangeTombstoneToMinHeap(i); } } } FindNextVisibleKey(); direction_ = kForward; current_ = CurrentForward(); } void SeekToLast() override { ClearHeaps(); InitMaxHeap(); status_ = Status::OK(); for (auto& child : children_) { child.iter.SeekToLast(); AddToMaxHeapOrCheckStatus(&child); } for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) { if (range_tombstone_iters_[i]) { range_tombstone_iters_[i]->SeekToLast(); if (range_tombstone_iters_[i]->Valid()) { // It is possible to be invalid due to snapshots. InsertRangeTombstoneToMaxHeap(i); } } } FindPrevVisibleKey(); direction_ = kReverse; current_ = CurrentReverse(); } // Position this merging iterator at the first key >= target (internal key). // If range tombstones are present, keys covered by range tombstones are // skipped, and this merging iter points to the first non-range-deleted key >= // target after Seek(). If !Valid() and status().ok() then this iterator // reaches the end. // // If range tombstones are present, cascading seeks may be called (an // optimization adapted from Pebble https://github.com/cockroachdb/pebble). // Roughly, if there is a range tombstone [start, end) that covers the // target user key at level L, then this range tombstone must cover the range // [target key, end) in all levels > L. So for all levels > L, we can pretend // the target key is `end`. This optimization is applied at each level and // hence the name "cascading seek". void Seek(const Slice& target) override { // Define LevelNextVisible(i, k) to be the first key >= k in level i that is // not covered by any range tombstone. // After SeekImpl(target, 0), invariants (3) and (4) hold. // For all level i, target <= children_[i].iter.key() <= LevelNextVisible(i, // target). By the contract of FindNextVisibleKey(), Invariants (1)-(4) // holds after this call, and minHeap_.top().iter points to the // first key >= target among children_ that is not covered by any range // tombstone. status_ = Status::OK(); SeekImpl(target); FindNextVisibleKey(); direction_ = kForward; { PERF_TIMER_GUARD(seek_min_heap_time); current_ = CurrentForward(); } } void SeekForPrev(const Slice& target) override { assert(range_tombstone_iters_.empty() || range_tombstone_iters_.size() == children_.size()); status_ = Status::OK(); SeekForPrevImpl(target); FindPrevVisibleKey(); direction_ = kReverse; { PERF_TIMER_GUARD(seek_max_heap_time); current_ = CurrentReverse(); } } void Next() override { assert(Valid()); // Ensure that all children are positioned after key(). // If we are moving in the forward direction, it is already // true for all the non-current children since current_ is // the smallest child and key() == current_->key(). if (direction_ != kForward) { // The loop advanced all non-current children to be > key() so current_ // should still be strictly the smallest key. SwitchToForward(); } // For the heap modifications below to be correct, current_ must be the // current top of the heap. assert(current_ == CurrentForward()); // as the current points to the current record. move the iterator forward. current_->Next(); if (current_->Valid()) { // current is still valid after the Next() call above. Call // replace_top() to restore the heap property. When the same child // iterator yields a sequence of keys, this is cheap. assert(current_->status().ok()); minHeap_.replace_top(minHeap_.top()); } else { // current stopped being valid, remove it from the heap. considerStatus(current_->status()); minHeap_.pop(); } // Invariants (3) and (4) hold when after advancing current_. // Let k be the smallest key among children_[i].iter.key(). // k <= children_[i].iter.key() <= LevelNextVisible(i, k) holds for all // level i. After FindNextVisible(), Invariants (1)-(4) hold and // minHeap_.top()->key() is the first key >= k from any children_ that is // not covered by any range tombstone. FindNextVisibleKey(); current_ = CurrentForward(); } bool NextAndGetResult(IterateResult* result) override { Next(); bool is_valid = Valid(); if (is_valid) { result->key = key(); result->bound_check_result = UpperBoundCheckResult(); result->value_prepared = current_->IsValuePrepared(); } return is_valid; } void Prev() override { assert(Valid()); // Ensure that all children are positioned before key(). // If we are moving in the reverse direction, it is already // true for all the non-current children since current_ is // the largest child and key() == current_->key(). if (direction_ != kReverse) { // Otherwise, retreat the non-current children. We retreat current_ // just after the if-block. SwitchToBackward(); } // For the heap modifications below to be correct, current_ must be the // current top of the heap. assert(current_ == CurrentReverse()); current_->Prev(); if (current_->Valid()) { // current is still valid after the Prev() call above. Call // replace_top() to restore the heap property. When the same child // iterator yields a sequence of keys, this is cheap. assert(current_->status().ok()); maxHeap_->replace_top(maxHeap_->top()); } else { // current stopped being valid, remove it from the heap. considerStatus(current_->status()); maxHeap_->pop(); } FindPrevVisibleKey(); current_ = CurrentReverse(); } Slice key() const override { assert(Valid()); return current_->key(); } Slice value() const override { assert(Valid()); return current_->value(); } bool PrepareValue() override { assert(Valid()); if (current_->PrepareValue()) { return true; } considerStatus(current_->status()); assert(!status_.ok()); return false; } // Here we simply relay MayBeOutOfLowerBound/MayBeOutOfUpperBound result // from current child iterator. Potentially as long as one of child iterator // report out of bound is not possible, we know current key is within bound. bool MayBeOutOfLowerBound() override { assert(Valid()); return current_->MayBeOutOfLowerBound(); } IterBoundCheck UpperBoundCheckResult() override { assert(Valid()); return current_->UpperBoundCheckResult(); } void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override { pinned_iters_mgr_ = pinned_iters_mgr; for (auto& child : children_) { child.iter.SetPinnedItersMgr(pinned_iters_mgr); } } bool IsKeyPinned() const override { assert(Valid()); return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && current_->IsKeyPinned(); } bool IsValuePinned() const override { assert(Valid()); return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && current_->IsValuePinned(); } private: // Represents an element in the min/max heap. Each HeapItem corresponds to a // point iterator or a range tombstone iterator, differentiated by // HeapItem::type. struct HeapItem { HeapItem() = default; // corresponding point iterator IteratorWrapper iter; size_t level = 0; // corresponding range tombstone iterator's start or end key value // depending on value of `type`. ParsedInternalKey tombstone_pik; // Will be overwritten before use, initialize here so compiler does not // complain. enum class Type { ITERATOR, DELETE_RANGE_START, DELETE_RANGE_END }; Type type = Type::ITERATOR; explicit HeapItem(size_t _level, InternalIteratorBase* _iter) : level(_level), type(Type::ITERATOR) { iter.Set(_iter); } void SetTombstoneKey(ParsedInternalKey&& pik) { // op_type is already initialized in MergingIterator::Finish(). tombstone_pik.user_key = pik.user_key; tombstone_pik.sequence = pik.sequence; } }; class MinHeapItemComparator { public: explicit MinHeapItemComparator(const InternalKeyComparator* comparator) : comparator_(comparator) {} bool operator()(HeapItem* a, HeapItem* b) const { if (LIKELY(a->type == HeapItem::Type::ITERATOR)) { if (LIKELY(b->type == HeapItem::Type::ITERATOR)) { return comparator_->Compare(a->iter.key(), b->iter.key()) > 0; } else { return comparator_->Compare(a->iter.key(), b->tombstone_pik) > 0; } } else { if (LIKELY(b->type == HeapItem::Type::ITERATOR)) { return comparator_->Compare(a->tombstone_pik, b->iter.key()) > 0; } else { return comparator_->Compare(a->tombstone_pik, b->tombstone_pik) > 0; } } } private: const InternalKeyComparator* comparator_; }; class MaxHeapItemComparator { public: explicit MaxHeapItemComparator(const InternalKeyComparator* comparator) : comparator_(comparator) {} bool operator()(HeapItem* a, HeapItem* b) const { if (LIKELY(a->type == HeapItem::Type::ITERATOR)) { if (LIKELY(b->type == HeapItem::Type::ITERATOR)) { return comparator_->Compare(a->iter.key(), b->iter.key()) < 0; } else { return comparator_->Compare(a->iter.key(), b->tombstone_pik) < 0; } } else { if (LIKELY(b->type == HeapItem::Type::ITERATOR)) { return comparator_->Compare(a->tombstone_pik, b->iter.key()) < 0; } else { return comparator_->Compare(a->tombstone_pik, b->tombstone_pik) < 0; } } } private: const InternalKeyComparator* comparator_; }; using MergerMinIterHeap = BinaryHeap; using MergerMaxIterHeap = BinaryHeap; friend class MergeIteratorBuilder; // Clears heaps for both directions, used when changing direction or seeking void ClearHeaps(bool clear_active = true); // Ensures that maxHeap_ is initialized when starting to go in the reverse // direction void InitMaxHeap(); // Advance this merging iterator until the current key (minHeap_.top()) is // from a point iterator and is not covered by any range tombstone, // or that there is no more keys (heap is empty). SeekImpl() may be called // to seek to the end of a range tombstone as an optimization. void FindNextVisibleKey(); void FindPrevVisibleKey(); // Advance this merging iterators to the first key >= `target` for all // components from levels >= starting_level. All iterators before // starting_level are untouched. // // @param range_tombstone_reseek Whether target is some range tombstone // end, i.e., whether this SeekImpl() call is a part of a "cascading seek". // This is used only for recoding relevant perf_context. void SeekImpl(const Slice& target, size_t starting_level = 0, bool range_tombstone_reseek = false); // Seek to fist key <= target key (internal key) for // children_[starting_level:]. void SeekForPrevImpl(const Slice& target, size_t starting_level = 0, bool range_tombstone_reseek = false); bool is_arena_mode_; bool prefix_seek_mode_; // Which direction is the iterator moving? enum Direction : uint8_t { kForward, kReverse }; Direction direction_; const InternalKeyComparator* comparator_; // HeapItem for all child point iterators. // Invariant(children_): children_[i] is in minHeap_ iff // children_[i].iter.Valid(), and at most one children_[i] is in minHeap_. // TODO: We could use an autovector with a larger reserved size. std::vector children_; // HeapItem for range tombstone start and end keys. // pinned_heap_item_[i] corresponds to range_tombstone_iters_[i]. // Invariant(phi): If range_tombstone_iters_[i]->Valid(), // pinned_heap_item_[i].tombstone_pik is equal to // range_tombstone_iters_[i]->start_key() when // pinned_heap_item_[i].type is DELETE_RANGE_START and // range_tombstone_iters_[i]->end_key() when // pinned_heap_item_[i].type is DELETE_RANGE_END (ignoring op_type which is // kMaxValid for all pinned_heap_item_.tombstone_pik). // pinned_heap_item_[i].type is either DELETE_RANGE_START or DELETE_RANGE_END. std::vector pinned_heap_item_; // range_tombstone_iters_[i] contains range tombstones in the sorted run that // corresponds to children_[i]. range_tombstone_iters_.empty() means not // handling range tombstones in merging iterator. range_tombstone_iters_[i] == // nullptr means the sorted run of children_[i] does not have range // tombstones. // Invariant(rti): pinned_heap_item_[i] is in minHeap_ iff // range_tombstone_iters_[i]->Valid() and at most one pinned_heap_item_[i] is // in minHeap_. std::vector range_tombstone_iters_; // Levels (indices into range_tombstone_iters_/children_ ) that currently have // "active" range tombstones. See comments above MergingIterator for meaning // of "active". // Invariant(active_): i is in active_ iff range_tombstone_iters_[i]->Valid() // and pinned_heap_item_[i].type == DELETE_RANGE_END. std::set active_; bool SkipNextDeleted(); bool SkipPrevDeleted(); // Invariant: at the end of each InternalIterator API, // current_ points to minHeap_.top().iter (maxHeap_ if backward scanning) // or nullptr if no child iterator is valid. // This follows from that current_ = CurrentForward()/CurrentReverse() is // called at the end of each InternalIterator API. IteratorWrapper* current_; // If any of the children have non-ok status, this is one of them. Status status_; // Invariant: min heap property is maintained (parent is always <= child). // This holds by using only BinaryHeap APIs to modify heap. One // exception is to modify heap top item directly (by caller iter->Next()), and // it should be followed by a call to replace_top() or pop(). MergerMinIterHeap minHeap_; // Max heap is used for reverse iteration, which is way less common than // forward. Lazily initialize it to save memory. std::unique_ptr maxHeap_; PinnedIteratorsManager* pinned_iters_mgr_; // Used to bound range tombstones. For point keys, DBIter and SSTable iterator // take care of boundary checking. const Slice* iterate_upper_bound_; // In forward direction, process a child that is not in the min heap. // If valid, add to the min heap. Otherwise, check status. void AddToMinHeapOrCheckStatus(HeapItem*); // In backward direction, process a child that is not in the max heap. // If valid, add to the min heap. Otherwise, check status. void AddToMaxHeapOrCheckStatus(HeapItem*); void SwitchToForward(); // Switch the direction from forward to backward without changing the // position. Iterator should still be valid. void SwitchToBackward(); IteratorWrapper* CurrentForward() const { assert(direction_ == kForward); assert(minHeap_.empty() || minHeap_.top()->type == HeapItem::Type::ITERATOR); return !minHeap_.empty() ? &minHeap_.top()->iter : nullptr; } IteratorWrapper* CurrentReverse() const { assert(direction_ == kReverse); assert(maxHeap_); assert(maxHeap_->empty() || maxHeap_->top()->type == HeapItem::Type::ITERATOR); return !maxHeap_->empty() ? &maxHeap_->top()->iter : nullptr; } }; // Pre-condition: // - Invariants (3) and (4) hold for i < starting_level // - For i < starting_level, range_tombstone_iters_[i].prev.end_key() < // `target`. // - For i < starting_level, if i in active_, then // range_tombstone_iters_[i]->start_key() < `target`. // // Post-condition: // - Invariants (3) and (4) hold for all level i. // - (*) target <= children_[i].iter.key() <= LevelNextVisible(i, target) // for i >= starting_level // - (**) target < pinned_heap_item_[i].tombstone_pik if // range_tombstone_iters_[i].Valid() for i >= starting_level // // Proof sketch: // Invariant (3) holds for all level i. // For j <= i < starting_level, it follows from Pre-condition that (3) holds // and that SeekImpl(-, starting_level) does not update children_[i] or // range_tombstone_iters_[j]. // For j < starting_level and i >= starting_level, it follows from // - Pre-condition that range_tombstone_iters_[j].prev.end_key() < `target` // - range_tombstone_iters_[j] is not updated in SeekImpl(), and // - children_[i].iter.Seek(current_search_key) is called with // current_search_key >= target (shown below). // When current_search_key is updated, it is updated to some // range_tombstone_iter->end_key() after // range_tombstone_iter->SeekInternalKey(current_search_key) was called. So // current_search_key increases if updated and >= target. // For starting_level <= j <= i: // children_[i].iter.Seek(k1) and range_tombstone_iters_[j]->SeekInternalKey(k2) // are called in SeekImpl(). Seek(k1) positions children_[i] at the first key >= // k1 from level i. SeekInternalKey(k2) positions range_tombstone_iters_[j] at // the first range tombstone from level j with end_key() > k2. It suffices to // show that k1 >= k2. Since k1 and k2 are values of current_search_key where // k1 = k2 or k1 is value of a later current_search_key than k2, so k1 >= k2. // // Invariant (4) holds for all level >= 0. // By Pre-condition Invariant (4) holds for i < starting_level. // Since children_[i], range_tombstone_iters_[i] and contents of active_ for // i < starting_level do not change (4) holds for j <= i < starting_level. // By Pre-condition: for all j < starting_level, if j in active_, then // range_tombstone_iters_[j]->start_key() < target. For i >= starting_level, // children_[i].iter.Seek(k) is called for k >= target. So // children_[i].iter.key() >= target > range_tombstone_iters_[j]->start_key() // for j < starting_level and i >= starting_level. So invariant (4) holds for // j < starting_level and i >= starting_level. // For starting_level <= j <= i, j is added to active_ only if // - range_tombstone_iters_[j]->SeekInternalKey(k1) was called // - range_tombstone_iters_[j]->start_key() <= k1 // Since children_[i].iter.Seek(k2) is called for some k2 >= k1 and for all // starting_level <= j <= i, (4) also holds for all starting_level <= j <= i. // // Post-condition (*): target <= children_[i].iter.key() <= LevelNextVisible(i, // target) for i >= starting_level. // target <= children_[i].iter.key() follows from that Seek() is called on some // current_search_key >= target for children_[i].iter. If current_search_key // is updated from k1 to k2 when level = i, we show that the range [k1, k2) is // not visible for children_[j] for any j > i. When current_search_key is // updated from k1 to k2, // - range_tombstone_iters_[i]->SeekInternalKey(k1) was called // - range_tombstone_iters_[i]->Valid() // - range_tombstone_iters_[i]->start_key().user_key <= k1.user_key // - k2 = range_tombstone_iters_[i]->end_key() // We assume that range_tombstone_iters_[i]->start_key() has a higher sequence // number compared to any key from levels > i that has the same user key. So no // point key from levels > i in range [k1, k2) is visible. So // children_[i].iter.key() <= LevelNextVisible(i, target). // // Post-condition (**) target < pinned_heap_item_[i].tombstone_pik for i >= // starting_level if range_tombstone_iters_[i].Valid(). This follows from that // SeekInternalKey() being called for each range_tombstone_iters_ with some key // >= `target` and that we pick start/end key that is > `target` to insert to // minHeap_. void MergingIterator::SeekImpl(const Slice& target, size_t starting_level, bool range_tombstone_reseek) { // active range tombstones before `starting_level` remain active ClearHeaps(false /* clear_active */); ParsedInternalKey pik; if (!range_tombstone_iters_.empty()) { // pik is only used in InsertRangeTombstoneToMinHeap(). ParseInternalKey(target, &pik, false).PermitUncheckedError(); } // TODO: perhaps we could save some upheap cost by add all child iters first // and then do a single heapify. // Invariant(children_) for level < starting_level for (size_t level = 0; level < starting_level; ++level) { PERF_TIMER_GUARD(seek_min_heap_time); AddToMinHeapOrCheckStatus(&children_[level]); } if (!range_tombstone_iters_.empty()) { // Add range tombstones from levels < starting_level. We can insert from // pinned_heap_item_ for the following reasons: // - pinned_heap_item_[level] is in minHeap_ iff // range_tombstone_iters[level]->Valid(). // - If `level` is in active_, then range_tombstone_iters_[level]->Valid() // and pinned_heap_item_[level] is of type RANGE_DELETION_END. for (size_t level = 0; level < starting_level; ++level) { // Restores Invariants(rti), (phi) and (active_) for level < // starting_level if (range_tombstone_iters_[level] && range_tombstone_iters_[level]->Valid()) { // use an iterator on active_ if performance becomes an issue here if (active_.count(level) > 0) { assert(pinned_heap_item_[level].type == HeapItem::Type::DELETE_RANGE_END); // if it was active, then start key must be within upper_bound, // so we can add to minHeap_ directly. minHeap_.push(&pinned_heap_item_[level]); } else { assert(pinned_heap_item_[level].type == HeapItem::Type::DELETE_RANGE_START); // this takes care of checking iterate_upper_bound, but with an extra // key comparison if range_tombstone_iters_[level] was already out of // bound. Consider using a new HeapItem type or some flag to remember // boundary checking result. InsertRangeTombstoneToMinHeap(level); } } else { assert(!active_.count(level)); } } // levels >= starting_level will be reseeked below, so clearing their active // state here. active_.erase(active_.lower_bound(starting_level), active_.end()); } IterKey current_search_key; current_search_key.SetInternalKey(target, false /* copy */); // Seek target might change to some range tombstone end key, so // we need to remember them for async requests. // (level, target) pairs autovector> prefetched_target; for (auto level = starting_level; level < children_.size(); ++level) { { PERF_TIMER_GUARD(seek_child_seek_time); children_[level].iter.Seek(current_search_key.GetInternalKey()); } PERF_COUNTER_ADD(seek_child_seek_count, 1); if (!range_tombstone_iters_.empty()) { if (range_tombstone_reseek) { // This seek is to some range tombstone end key. // Should only happen when there are range tombstones. PERF_COUNTER_ADD(internal_range_del_reseek_count, 1); } if (children_[level].iter.status().IsTryAgain()) { prefetched_target.emplace_back( level, current_search_key.GetInternalKey().ToString()); } auto range_tombstone_iter = range_tombstone_iters_[level]; if (range_tombstone_iter) { range_tombstone_iter->SeekInternalKey( current_search_key.GetInternalKey()); // Invariants (rti) and (phi) if (range_tombstone_iter->Valid()) { // If range tombstone starts after `current_search_key`, // we should insert start key to heap as the range tombstone is not // active yet. InsertRangeTombstoneToMinHeap( level, comparator_->Compare(range_tombstone_iter->start_key(), pik) > 0 /* start_key */); // current_search_key < end_key guaranteed by the SeekInternalKey() // and Valid() calls above. Here we only need to compare user_key // since if target.user_key == // range_tombstone_iter->start_key().user_key and target < // range_tombstone_iter->start_key(), no older level would have any // key in range [target, range_tombstone_iter->start_key()], so no // keys in range [target, range_tombstone_iter->end_key()) from older // level would be visible. So it is safe to seek to // range_tombstone_iter->end_key(). // // TODO: range_tombstone_iter->Seek() finds the max covering // sequence number, can make it cheaper by not looking for max. if (comparator_->user_comparator()->Compare( range_tombstone_iter->start_key().user_key, current_search_key.GetUserKey()) <= 0) { range_tombstone_reseek = true; // Note that for prefix seek case, it is possible that the prefix // is not the same as the original target, it should not affect // correctness. Besides, in most cases, range tombstone start and // end key should have the same prefix? current_search_key.SetInternalKey(range_tombstone_iter->end_key()); } } } } // child.iter.status() is set to Status::TryAgain indicating asynchronous // request for retrieval of data blocks has been submitted. So it should // return at this point and Seek should be called again to retrieve the // requested block and add the child to min heap. if (children_[level].iter.status().IsTryAgain()) { continue; } { // Strictly, we timed slightly more than min heap operation, // but these operations are very cheap. PERF_TIMER_GUARD(seek_min_heap_time); AddToMinHeapOrCheckStatus(&children_[level]); } } if (range_tombstone_iters_.empty()) { for (auto& child : children_) { if (child.iter.status().IsTryAgain()) { child.iter.Seek(target); { PERF_TIMER_GUARD(seek_min_heap_time); AddToMinHeapOrCheckStatus(&child); } PERF_COUNTER_ADD(number_async_seek, 1); } } } else { for (auto& prefetch : prefetched_target) { // (level, target) pairs children_[prefetch.first].iter.Seek(prefetch.second); { PERF_TIMER_GUARD(seek_min_heap_time); AddToMinHeapOrCheckStatus(&children_[prefetch.first]); } PERF_COUNTER_ADD(number_async_seek, 1); } } } // Returns true iff the current key (min heap top) should not be returned // to user (of the merging iterator). This can be because the current key // is deleted by some range tombstone, the current key is some fake file // boundary sentinel key, or the current key is an end point of a range // tombstone. Advance the iterator at heap top if needed. Heap order is restored // and `active_` is updated accordingly. // See FindNextVisibleKey() for more detail on internal implementation // of advancing child iters. // When false is returned, if minHeap is not empty, then minHeap_.top().type // == ITERATOR // // REQUIRES: // - min heap is currently not empty, and iter is in kForward direction. // - minHeap_ top is not DELETE_RANGE_START (so that `active_` is current). bool MergingIterator::SkipNextDeleted() { // 3 types of keys: // - point key // - file boundary sentinel keys // - range deletion end key auto current = minHeap_.top(); if (current->type == HeapItem::Type::DELETE_RANGE_END) { // Invariant(active_): range_tombstone_iters_[current->level] is about to // become !Valid() or that its start key is going to be added to minHeap_. active_.erase(current->level); assert(range_tombstone_iters_[current->level] && range_tombstone_iters_[current->level]->Valid()); range_tombstone_iters_[current->level]->Next(); // Maintain Invariants (rti) and (phi) if (range_tombstone_iters_[current->level]->Valid()) { InsertRangeTombstoneToMinHeap(current->level, true /* start_key */, true /* replace_top */); } else { // TruncatedRangeDelIterator does not have status minHeap_.pop(); } return true /* current key deleted */; } if (current->iter.IsDeleteRangeSentinelKey()) { // If the file boundary is defined by a range deletion, the range // tombstone's end key must come before this sentinel key (see op_type in // SetTombstoneKey()). assert(ExtractValueType(current->iter.key()) != kTypeRangeDeletion || active_.count(current->level) == 0); // When entering a new file, range tombstone iter from the old file is // freed, but the last key from that range tombstone iter may still be in // the heap. We need to ensure the data underlying its corresponding key // Slice is still alive. We do so by popping the range tombstone key from // heap before calling iter->Next(). Technically, this change is not needed: // if there is a range tombstone end key that is after file boundary // sentinel key in minHeap_, the range tombstone end key must have been // truncated at file boundary. The underlying data of the range tombstone // end key Slice is the SST file's largest internal key stored as file // metadata in Version. However, since there are too many implicit // assumptions made, it is safer to just ensure range tombstone iter is // still alive. minHeap_.pop(); // Remove last SST file's range tombstone end key if there is one. // This means file boundary is before range tombstone end key, // which could happen when a range tombstone and a user key // straddle two SST files. Note that in TruncatedRangeDelIterator // constructor, parsed_largest.sequence is decremented 1 in this case. // Maintains Invariant(rti) that at most one // pinned_heap_item_[current->level] is in minHeap_. if (range_tombstone_iters_[current->level] && range_tombstone_iters_[current->level]->Valid()) { if (!minHeap_.empty() && minHeap_.top()->level == current->level) { assert(minHeap_.top()->type == HeapItem::Type::DELETE_RANGE_END); minHeap_.pop(); // Invariant(active_): we are about to enter a new SST file with new // range_tombstone_iters[current->level]. Either it is !Valid() or its // start key is going to be added to minHeap_. active_.erase(current->level); } else { // range tombstone is still valid, but it is not on heap. // This should only happen if the range tombstone is over iterator // upper bound. assert(iterate_upper_bound_ && comparator_->user_comparator()->CompareWithoutTimestamp( range_tombstone_iters_[current->level]->start_key().user_key, true /* a_has_ts */, *iterate_upper_bound_, false /* b_has_ts */) >= 0); } } // LevelIterator enters a new SST file current->iter.Next(); // Invariant(children_): current is popped from heap and added back only if // it is valid if (current->iter.Valid()) { assert(current->iter.status().ok()); minHeap_.push(current); } else { // TODO(cbi): check status and early return if non-ok. considerStatus(current->iter.status()); } // Invariants (rti) and (phi) if (range_tombstone_iters_[current->level] && range_tombstone_iters_[current->level]->Valid()) { InsertRangeTombstoneToMinHeap(current->level); } return true /* current key deleted */; } assert(current->type == HeapItem::Type::ITERATOR); // Point key case: check active_ for range tombstone coverage. ParsedInternalKey pik; ParseInternalKey(current->iter.key(), &pik, false).PermitUncheckedError(); if (!active_.empty()) { auto i = *active_.begin(); if (i < current->level) { // range tombstone is from a newer level, definitely covers assert(comparator_->Compare(range_tombstone_iters_[i]->start_key(), pik) <= 0); assert(comparator_->Compare(pik, range_tombstone_iters_[i]->end_key()) < 0); std::string target; AppendInternalKey(&target, range_tombstone_iters_[i]->end_key()); SeekImpl(target, current->level, true); return true /* current key deleted */; } else if (i == current->level) { // range tombstone is from the same level as current, check sequence // number. By `active_` we know current key is between start key and end // key. assert(comparator_->Compare(range_tombstone_iters_[i]->start_key(), pik) <= 0); assert(comparator_->Compare(pik, range_tombstone_iters_[i]->end_key()) < 0); if (pik.sequence < range_tombstone_iters_[current->level]->seq()) { // covered by range tombstone current->iter.Next(); // Invariant (children_) if (current->iter.Valid()) { minHeap_.replace_top(current); } else { considerStatus(current->iter.status()); minHeap_.pop(); } return true /* current key deleted */; } else { return false /* current key not deleted */; } } else { return false /* current key not deleted */; // range tombstone from an older sorted run with current key < end key. // current key is not deleted and the older sorted run will have its range // tombstone updated when the range tombstone's end key are popped from // minHeap_. } } // we can reach here only if active_ is empty assert(active_.empty()); assert(minHeap_.top()->type == HeapItem::Type::ITERATOR); return false /* current key not deleted */; } void MergingIterator::SeekForPrevImpl(const Slice& target, size_t starting_level, bool range_tombstone_reseek) { // active range tombstones before `starting_level` remain active ClearHeaps(false /* clear_active */); InitMaxHeap(); ParsedInternalKey pik; if (!range_tombstone_iters_.empty()) { ParseInternalKey(target, &pik, false).PermitUncheckedError(); } for (size_t level = 0; level < starting_level; ++level) { PERF_TIMER_GUARD(seek_max_heap_time); AddToMaxHeapOrCheckStatus(&children_[level]); } if (!range_tombstone_iters_.empty()) { // Add range tombstones before starting_level. for (size_t level = 0; level < starting_level; ++level) { if (range_tombstone_iters_[level] && range_tombstone_iters_[level]->Valid()) { assert(static_cast(active_.count(level)) == (pinned_heap_item_[level].type == HeapItem::Type::DELETE_RANGE_START)); maxHeap_->push(&pinned_heap_item_[level]); } else { assert(!active_.count(level)); } } // levels >= starting_level will be reseeked below, active_.erase(active_.lower_bound(starting_level), active_.end()); } IterKey current_search_key; current_search_key.SetInternalKey(target, false /* copy */); // Seek target might change to some range tombstone end key, so // we need to remember them for async requests. // (level, target) pairs autovector> prefetched_target; for (auto level = starting_level; level < children_.size(); ++level) { { PERF_TIMER_GUARD(seek_child_seek_time); children_[level].iter.SeekForPrev(current_search_key.GetInternalKey()); } PERF_COUNTER_ADD(seek_child_seek_count, 1); if (!range_tombstone_iters_.empty()) { if (range_tombstone_reseek) { // This seek is to some range tombstone end key. // Should only happen when there are range tombstones. PERF_COUNTER_ADD(internal_range_del_reseek_count, 1); } if (children_[level].iter.status().IsTryAgain()) { prefetched_target.emplace_back( level, current_search_key.GetInternalKey().ToString()); } auto range_tombstone_iter = range_tombstone_iters_[level]; if (range_tombstone_iter) { range_tombstone_iter->SeekForPrev(current_search_key.GetUserKey()); if (range_tombstone_iter->Valid()) { InsertRangeTombstoneToMaxHeap( level, comparator_->Compare(range_tombstone_iter->end_key(), pik) <= 0 /* end_key */); // start key <= current_search_key guaranteed by the Seek() call above // Only interested in user key coverage since older sorted runs must // have smaller sequence numbers than this tombstone. if (comparator_->user_comparator()->Compare( current_search_key.GetUserKey(), range_tombstone_iter->end_key().user_key) < 0) { range_tombstone_reseek = true; current_search_key.SetInternalKey( range_tombstone_iter->start_key().user_key, kMaxSequenceNumber, kValueTypeForSeekForPrev); } } } } // child.iter.status() is set to Status::TryAgain indicating asynchronous // request for retrieval of data blocks has been submitted. So it should // return at this point and Seek should be called again to retrieve the // requested block and add the child to min heap. if (children_[level].iter.status().IsTryAgain()) { continue; } { // Strictly, we timed slightly more than min heap operation, // but these operations are very cheap. PERF_TIMER_GUARD(seek_max_heap_time); AddToMaxHeapOrCheckStatus(&children_[level]); } } if (range_tombstone_iters_.empty()) { for (auto& child : children_) { if (child.iter.status().IsTryAgain()) { child.iter.SeekForPrev(target); { PERF_TIMER_GUARD(seek_min_heap_time); AddToMaxHeapOrCheckStatus(&child); } PERF_COUNTER_ADD(number_async_seek, 1); } } } else { for (auto& prefetch : prefetched_target) { // (level, target) pairs children_[prefetch.first].iter.SeekForPrev(prefetch.second); { PERF_TIMER_GUARD(seek_max_heap_time); AddToMaxHeapOrCheckStatus(&children_[prefetch.first]); } PERF_COUNTER_ADD(number_async_seek, 1); } } } // See more in comments above SkipNextDeleted(). // REQUIRES: // - max heap is currently not empty, and iter is in kReverse direction. // - maxHeap_ top is not DELETE_RANGE_END (so that `active_` is current). bool MergingIterator::SkipPrevDeleted() { // 3 types of keys: // - point key // - file boundary sentinel keys // - range deletion start key auto current = maxHeap_->top(); if (current->type == HeapItem::Type::DELETE_RANGE_START) { active_.erase(current->level); assert(range_tombstone_iters_[current->level] && range_tombstone_iters_[current->level]->Valid()); range_tombstone_iters_[current->level]->Prev(); if (range_tombstone_iters_[current->level]->Valid()) { InsertRangeTombstoneToMaxHeap(current->level, true /* end_key */, true /* replace_top */); } else { maxHeap_->pop(); } return true /* current key deleted */; } if (current->iter.IsDeleteRangeSentinelKey()) { // LevelIterator enters a new SST file maxHeap_->pop(); // Remove last SST file's range tombstone key if there is one. if (!maxHeap_->empty() && maxHeap_->top()->level == current->level && maxHeap_->top()->type == HeapItem::Type::DELETE_RANGE_START) { maxHeap_->pop(); active_.erase(current->level); } current->iter.Prev(); if (current->iter.Valid()) { assert(current->iter.status().ok()); maxHeap_->push(current); } else { considerStatus(current->iter.status()); } if (range_tombstone_iters_[current->level] && range_tombstone_iters_[current->level]->Valid()) { InsertRangeTombstoneToMaxHeap(current->level); } return true /* current key deleted */; } assert(current->type == HeapItem::Type::ITERATOR); // Point key case: check active_ for range tombstone coverage. ParsedInternalKey pik; ParseInternalKey(current->iter.key(), &pik, false).PermitUncheckedError(); if (!active_.empty()) { auto i = *active_.begin(); if (i < current->level) { // range tombstone is from a newer level, definitely covers assert(comparator_->Compare(range_tombstone_iters_[i]->start_key(), pik) <= 0); assert(comparator_->Compare(pik, range_tombstone_iters_[i]->end_key()) < 0); std::string target; AppendInternalKey(&target, range_tombstone_iters_[i]->start_key()); // This is different from SkipNextDeleted() which does reseek at sorted // runs >= level (instead of i+1 here). With min heap, if level L is at // top of the heap, then levels level L's // current internal key, which means levels level) { // By `active_` we know current key is between start key and end key. assert(comparator_->Compare(range_tombstone_iters_[i]->start_key(), pik) <= 0); assert(comparator_->Compare(pik, range_tombstone_iters_[i]->end_key()) < 0); if (pik.sequence < range_tombstone_iters_[current->level]->seq()) { current->iter.Prev(); if (current->iter.Valid()) { maxHeap_->replace_top(current); } else { considerStatus(current->iter.status()); maxHeap_->pop(); } return true /* current key deleted */; } else { return false /* current key not deleted */; } } else { return false /* current key not deleted */; } } assert(active_.empty()); assert(maxHeap_->top()->type == HeapItem::Type::ITERATOR); return false /* current key not deleted */; } void MergingIterator::AddToMinHeapOrCheckStatus(HeapItem* child) { // Invariant(children_) if (child->iter.Valid()) { assert(child->iter.status().ok()); minHeap_.push(child); } else { considerStatus(child->iter.status()); } } void MergingIterator::AddToMaxHeapOrCheckStatus(HeapItem* child) { if (child->iter.Valid()) { assert(child->iter.status().ok()); maxHeap_->push(child); } else { considerStatus(child->iter.status()); } } // Advance all non current_ child to > current_.key(). // We advance current_ after the this function call as it does not require // Seek(). // Advance all range tombstones iters, including the one corresponding to // current_, to the first tombstone with end_key > current_.key(). // TODO: potentially do cascading seek here too // TODO: show that invariants hold void MergingIterator::SwitchToForward() { ClearHeaps(); Slice target = key(); for (auto& child : children_) { if (&child.iter != current_) { child.iter.Seek(target); // child.iter.status() is set to Status::TryAgain indicating asynchronous // request for retrieval of data blocks has been submitted. So it should // return at this point and Seek should be called again to retrieve the // requested block and add the child to min heap. if (child.iter.status() == Status::TryAgain()) { continue; } if (child.iter.Valid() && comparator_->Equal(target, child.iter.key())) { assert(child.iter.status().ok()); child.iter.Next(); } } AddToMinHeapOrCheckStatus(&child); } for (auto& child : children_) { if (child.iter.status() == Status::TryAgain()) { child.iter.Seek(target); if (child.iter.Valid() && comparator_->Equal(target, child.iter.key())) { assert(child.iter.status().ok()); child.iter.Next(); } AddToMinHeapOrCheckStatus(&child); } } // Current range tombstone iter also needs to seek for the following case: // Previous direction is backward, so range tombstone iter may point to a // tombstone before current_. If there is no such tombstone, then the range // tombstone iter is !Valid(). Need to reseek here to make it valid again. if (!range_tombstone_iters_.empty()) { ParsedInternalKey pik; ParseInternalKey(target, &pik, false /* log_err_key */) .PermitUncheckedError(); for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) { auto iter = range_tombstone_iters_[i]; if (iter) { iter->Seek(pik.user_key); // The while loop is needed as the Seek() call above is only for user // key. We could have a range tombstone with end_key covering user_key, // but still is smaller than target. This happens when the range // tombstone is truncated at iter.largest_. while (iter->Valid() && comparator_->Compare(iter->end_key(), pik) <= 0) { iter->Next(); } if (range_tombstone_iters_[i]->Valid()) { InsertRangeTombstoneToMinHeap( i, comparator_->Compare(range_tombstone_iters_[i]->start_key(), pik) > 0 /* start_key */); } } } } direction_ = kForward; assert(current_ == CurrentForward()); } // Advance all range tombstones iters, including the one corresponding to // current_, to the first tombstone with start_key <= current_.key(). void MergingIterator::SwitchToBackward() { ClearHeaps(); InitMaxHeap(); Slice target = key(); for (auto& child : children_) { if (&child.iter != current_) { child.iter.SeekForPrev(target); TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child); if (child.iter.Valid() && comparator_->Equal(target, child.iter.key())) { assert(child.iter.status().ok()); child.iter.Prev(); } } AddToMaxHeapOrCheckStatus(&child); } ParsedInternalKey pik; ParseInternalKey(target, &pik, false /* log_err_key */) .PermitUncheckedError(); for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) { auto iter = range_tombstone_iters_[i]; if (iter) { iter->SeekForPrev(pik.user_key); // Since the SeekForPrev() call above is only for user key, // we may end up with some range tombstone with start key having the // same user key at current_, but with a smaller sequence number. This // makes current_ not at maxHeap_ top for the CurrentReverse() call // below. If there is a range tombstone start key with the same user // key and the same sequence number as current_.key(), it will be fine as // in InsertRangeTombstoneToMaxHeap() we change op_type to be the smallest // op_type. while (iter->Valid() && comparator_->Compare(iter->start_key(), pik) > 0) { iter->Prev(); } if (iter->Valid()) { InsertRangeTombstoneToMaxHeap( i, comparator_->Compare(range_tombstone_iters_[i]->end_key(), pik) <= 0 /* end_key */); } } } direction_ = kReverse; if (!prefix_seek_mode_) { // Note that we don't do assert(current_ == CurrentReverse()) here // because it is possible to have some keys larger than the seek-key // inserted between Seek() and SeekToLast(), which makes current_ not // equal to CurrentReverse(). current_ = CurrentReverse(); } assert(current_ == CurrentReverse()); } void MergingIterator::ClearHeaps(bool clear_active) { minHeap_.clear(); if (maxHeap_) { maxHeap_->clear(); } if (clear_active) { active_.clear(); } } void MergingIterator::InitMaxHeap() { if (!maxHeap_) { maxHeap_ = std::make_unique(MaxHeapItemComparator(comparator_)); } } // Assume there is a next key that is not covered by range tombstone. // Pre-condition: // - Invariants (3) and (4) // - There is some k where k <= children_[i].iter.key() <= LevelNextVisible(i, // k) for all levels i (LevelNextVisible() defined in Seek()). // // Define NextVisible(k) to be the first key >= k from among children_ that // is not covered by any range tombstone. // Post-condition: // - Invariants (1)-(4) hold // - (*): minHeap_->top()->key() == NextVisible(k) // // Loop invariants: // - Invariants (3) and (4) // - (*): k <= children_[i].iter.key() <= LevelNextVisible(i, k) // // Progress: minHeap_.top()->key() is non-decreasing and strictly increases in // a finite number of iterations. // TODO: it is possible to call SeekImpl(k2) after SeekImpl(k1) with // k2 < k1 in the same FindNextVisibleKey(). For example, l1 has a range // tombstone [2,3) and l2 has a range tombstone [1, 4). Point key 1 from l5 // triggers SeekImpl(4 /* target */, 5). Then point key 2 from l3 triggers // SeekImpl(3 /* target */, 3). // Ideally we should only move iterators forward in SeekImpl(), and the // progress condition can be made simpler: iterator only moves forward. // // Proof sketch: // Post-condition: // Invariant (1) holds when this method returns: // Ignoring the empty minHeap_ case, there are two cases: // Case 1: active_ is empty and !minHeap_.top()->iter.IsDeleteRangeSentinelKey() // By invariants (rti) and (active_), active_ being empty means if a // pinned_heap_item_[i] is in minHeap_, it has type DELETE_RANGE_START. Note // that PopDeleteRangeStart() was called right before the while loop condition, // so minHeap_.top() is not of type DELETE_RANGE_START. So minHeap_.top() must // be of type ITERATOR. // Case 2: SkipNextDeleted() returns false. The method returns false only when // minHeap_.top().type == ITERATOR. // // Invariant (2) holds when this method returns: // From Invariant (1), minHeap_.top().type == ITERATOR. Suppose it is // children_[i] for some i. Suppose that children_[i].iter.key() is covered by // some range tombstone. This means there is a j <= i and a range tombstone from // level j with start_key() < children_[i].iter.key() < end_key(). // - If range_tombstone_iters_[j]->Valid(), by Invariants (rti) and (phi), // pinned_heap_item_[j] is in minHeap_, and pinned_heap_item_[j].tombstone_pik // is either start or end key of this range tombstone. If // pinned_heap_item_[j].tombstone_pik < children_[i].iter.key(), it would be at // top of minHeap_ which would contradict Invariant (1). So // pinned_heap_item_[j].tombstone_pik > children_[i].iter.key(). // By Invariant (3), range_tombstone_iters_[j].prev.end_key() < // children_[i].iter.key(). We assume that in each level, range tombstones // cover non-overlapping ranges. So range_tombstone_iters_[j] is at // the range tombstone with start_key() < children_[i].iter.key() < end_key() // and has its end_key() in minHeap_. By Invariants (phi) and (active_), // j is in active_. From while loop condition, SkipNextDeleted() must have // returned false for this method to return. // - If j < i, then SeekImpl(range_tombstone_iters_[j']->end_key(), i) // was called for some j' < i and j' in active_. Note that since j' is in // active_, pinned_heap_item_[j'] is in minHeap_ and has tombstone_pik = // range_tombstone_iters_[j']->end_key(). So // range_tombstone_iters_[j']->end_key() must be larger than // children_[i].iter.key() to not be at top of minHeap_. This means after // SeekImpl(), children_[i] would be at a key > children_[i].iter.key() // -- contradiction. // - If j == i, children_[i]->Next() would have been called and children_[i] // would be at a key > children_[i].iter.key() -- contradiction. // - If !range_tombstone_iters_[j]->Valid(). Then range_tombstone_iters_[j] // points to an SST file with all range tombstones from that file exhausted. // The file must come before the file containing the first // range tombstone with start_key() < children_[i].iter.key() < end_key(). // Assume files from same level have non-overlapping ranges, the current file's // meta.largest is less than children_[i].iter.key(). So the file boundary key, // which has value meta.largest must have been popped from minHeap_ before // children_[i].iter.key(). So range_tombstone_iters_[j] would not point to // this SST file -- contradiction. // So it is impossible for children_[i].iter.key() to be covered by a range // tombstone. // // Post-condition (*) holds when the function returns: // From loop invariant (*) that k <= children_[i].iter.key() <= // LevelNextVisible(i, k) and Invariant (2) above, when the function returns, // minHeap_.top()->key() is the smallest LevelNextVisible(i, k) among all levels // i. This is equal to NextVisible(k). // // Invariant (3) holds after each iteration: // PopDeleteRangeStart() does not change range tombstone position. // In SkipNextDeleted(): // - If DELETE_RANGE_END is popped from minHeap_, it means the range // tombstone's end key is < all other point keys, so it is safe to advance to // next range tombstone. // - If file boundary is popped (current->iter.IsDeleteRangeSentinelKey()), // we assume that file's last range tombstone's // end_key <= file boundary key < all other point keys. So it is safe to // move to the first range tombstone in the next SST file. // - If children_[i]->Next() is called, then it is fine as it is advancing a // point iterator. // - If SeekImpl(target, l) is called, then (3) follows from SeekImpl()'s // post-condition if its pre-condition holds. First pre-condition follows // from loop invariant where Invariant (3) holds for all levels i. // Now we should second pre-condition holds. Since Invariant (3) holds for // all i, we have for all j <= l, range_tombstone_iters_[j].prev.end_key() // < children_[l].iter.key(). `target` is the value of // range_tombstone_iters_[j'].end_key() for some j' < l and j' in active_. // By Invariant (active_) and (rti), pinned_heap_item_[j'] is in minHeap_ and // pinned_heap_item_[j'].tombstone_pik = range_tombstone_iters_[j'].end_key(). // This end_key must be larger than children_[l].key() since it was not at top // of minHeap_. So for all levels j <= l, // range_tombstone_iters_[j].prev.end_key() < children_[l].iter.key() < target // // Invariant (4) holds after each iteration: // A level i is inserted into active_ during calls to PopDeleteRangeStart(). // In that case, range_tombstone_iters_[i].start_key() < all point keys // by heap property and the assumption that point keys and range tombstone keys // are distinct. // If SeekImpl(target, l) is called, then there is a range_tombstone_iters_[j] // where target = range_tombstone_iters_[j]->end_key() and children_[l]->key() // < target. By loop invariants, (3) and (4) holds for levels. // Since target > children_[l]->key(), it also holds that for j < l, // range_tombstone_iters_[j].prev.end_key() < target and that if j in active_, // range_tombstone_iters_[i]->start_key() < target. So all pre-conditions of // SeekImpl(target, l) holds, and (4) follow from its post-condition. // All other places either in this function either advance point iterators // or remove some level from active_, so (4) still holds. // // Look Invariant (*): for all level i, k <= children_[i] <= LevelNextVisible(i, // k). // k <= children_[i] follows from loop `progress` condition. // Consider when children_[i] is changed for any i. It is through // children_[i].iter.Next() or SeekImpl() in SkipNextDeleted(). // If children_[i].iter.Next() is called, there is a range tombstone from level // i where tombstone seqno > children_[i].iter.key()'s seqno and i in active_. // By Invariant (4), tombstone's start_key < children_[i].iter.key(). By // invariants (active_), (phi), and (rti), tombstone's end_key is in minHeap_ // and that children_[i].iter.key() < end_key. So children_[i].iter.key() is // not visible, and it is safe to call Next(). // If SeekImpl(target, l) is called, by its contract, when SeekImpl() returns, // target <= children_[i]->key() <= LevelNextVisible(i, target) for i >= l, // and children_[end_key() for some j < i and j is in active_. // By Invariant (4), range_tombstone_iters_[j]->start_key() < // children_[i].iter.key() for all i >= l. So for each level i >= l, the range // [children_[i].iter.key(), target) is not visible. So after SeekImpl(), // children_[i].iter.key() <= LevelNextVisible(i, target) <= // LevelNextVisible(i, k). // // `Progress` holds for each iteration: // Very sloppy intuition: // - in PopDeleteRangeStart(): the value of a pinned_heap_item_.tombstone_pik_ // is updated from the start key to the end key of the same range tombstone. // We assume that start key <= end key for the same range tombstone. // - in SkipNextDeleted() // - If the top of heap is DELETE_RANGE_END, the range tombstone is advanced // and the relevant pinned_heap_item_.tombstone_pik is increased or popped // from minHeap_. // - If the top of heap is a file boundary key, then both point iter and // range tombstone iter are advanced to the next file. // - If the top of heap is ITERATOR and current->iter.Next() is called, it // moves to a larger point key. // - If the top of heap is ITERATOR and SeekImpl(k, l) is called, then all // iterators from levels >= l are advanced to some key >= k by its contract. // And top of minHeap_ before SeekImpl(k, l) was less than k. // There are special cases where different heap items have the same key, // e.g. when two range tombstone end keys share the same value). In // these cases, iterators are being advanced, so the minimum key should increase // in a finite number of steps. inline void MergingIterator::FindNextVisibleKey() { PopDeleteRangeStart(); // PopDeleteRangeStart() implies heap top is not DELETE_RANGE_START // active_ being empty implies no DELETE_RANGE_END in heap. // So minHeap_->top() must be of type ITERATOR. while ( !minHeap_.empty() && (!active_.empty() || minHeap_.top()->iter.IsDeleteRangeSentinelKey()) && SkipNextDeleted()) { PopDeleteRangeStart(); } // Checks Invariant (1) assert(minHeap_.empty() || minHeap_.top()->type == HeapItem::Type::ITERATOR); } inline void MergingIterator::FindPrevVisibleKey() { PopDeleteRangeEnd(); // PopDeleteRangeEnd() implies heap top is not DELETE_RANGE_END // active_ being empty implies no DELETE_RANGE_START in heap. // So maxHeap_->top() must be of type ITERATOR. while ( !maxHeap_->empty() && (!active_.empty() || maxHeap_->top()->iter.IsDeleteRangeSentinelKey()) && SkipPrevDeleted()) { PopDeleteRangeEnd(); } } InternalIterator* NewMergingIterator(const InternalKeyComparator* cmp, InternalIterator** list, int n, Arena* arena, bool prefix_seek_mode) { assert(n >= 0); if (n == 0) { return NewEmptyInternalIterator(arena); } else if (n == 1) { return list[0]; } else { if (arena == nullptr) { return new MergingIterator(cmp, list, n, false, prefix_seek_mode); } else { auto mem = arena->AllocateAligned(sizeof(MergingIterator)); return new (mem) MergingIterator(cmp, list, n, true, prefix_seek_mode); } } } MergeIteratorBuilder::MergeIteratorBuilder( const InternalKeyComparator* comparator, Arena* a, bool prefix_seek_mode, const Slice* iterate_upper_bound) : first_iter(nullptr), use_merging_iter(false), arena(a) { auto mem = arena->AllocateAligned(sizeof(MergingIterator)); merge_iter = new (mem) MergingIterator(comparator, nullptr, 0, true, prefix_seek_mode, iterate_upper_bound); } MergeIteratorBuilder::~MergeIteratorBuilder() { if (first_iter != nullptr) { first_iter->~InternalIterator(); } if (merge_iter != nullptr) { merge_iter->~MergingIterator(); } } void MergeIteratorBuilder::AddIterator(InternalIterator* iter) { if (!use_merging_iter && first_iter != nullptr) { merge_iter->AddIterator(first_iter); use_merging_iter = true; first_iter = nullptr; } if (use_merging_iter) { merge_iter->AddIterator(iter); } else { first_iter = iter; } } void MergeIteratorBuilder::AddPointAndTombstoneIterator( InternalIterator* point_iter, TruncatedRangeDelIterator* tombstone_iter, TruncatedRangeDelIterator*** tombstone_iter_ptr) { // tombstone_iter_ptr != nullptr means point_iter is a LevelIterator. bool add_range_tombstone = tombstone_iter || !merge_iter->range_tombstone_iters_.empty() || tombstone_iter_ptr; if (!use_merging_iter && (add_range_tombstone || first_iter)) { use_merging_iter = true; if (first_iter) { merge_iter->AddIterator(first_iter); first_iter = nullptr; } } if (use_merging_iter) { merge_iter->AddIterator(point_iter); if (add_range_tombstone) { // If there was a gap, fill in nullptr as empty range tombstone iterators. while (merge_iter->range_tombstone_iters_.size() < merge_iter->children_.size() - 1) { merge_iter->AddRangeTombstoneIterator(nullptr); } merge_iter->AddRangeTombstoneIterator(tombstone_iter); } if (tombstone_iter_ptr) { // This is needed instead of setting to &range_tombstone_iters_[i] // directly here since the memory address of range_tombstone_iters_[i] // might change during vector resizing. range_del_iter_ptrs_.emplace_back( merge_iter->range_tombstone_iters_.size() - 1, tombstone_iter_ptr); } } else { first_iter = point_iter; } } InternalIterator* MergeIteratorBuilder::Finish(ArenaWrappedDBIter* db_iter) { InternalIterator* ret = nullptr; if (!use_merging_iter) { ret = first_iter; first_iter = nullptr; } else { for (auto& p : range_del_iter_ptrs_) { *(p.second) = &(merge_iter->range_tombstone_iters_[p.first]); } if (db_iter && !merge_iter->range_tombstone_iters_.empty()) { // memtable is always the first level db_iter->SetMemtableRangetombstoneIter( &merge_iter->range_tombstone_iters_.front()); } merge_iter->Finish(); ret = merge_iter; merge_iter = nullptr; } return ret; } } // namespace ROCKSDB_NAMESPACE