// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). // // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/compaction_job.h" #ifndef __STDC_FORMAT_MACROS #define __STDC_FORMAT_MACROS #endif #include #include #include #include #include #include #include #include #include #include #include "db/builder.h" #include "db/db_iter.h" #include "db/dbformat.h" #include "db/event_helpers.h" #include "db/log_reader.h" #include "db/log_writer.h" #include "db/memtable.h" #include "db/memtable_list.h" #include "db/merge_context.h" #include "db/merge_helper.h" #include "db/version_set.h" #include "monitoring/iostats_context_imp.h" #include "monitoring/perf_context_imp.h" #include "monitoring/thread_status_util.h" #include "port/likely.h" #include "port/port.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/statistics.h" #include "rocksdb/status.h" #include "rocksdb/table.h" #include "table/block.h" #include "table/block_based_table_factory.h" #include "table/merging_iterator.h" #include "table/table_builder.h" #include "util/coding.h" #include "util/file_reader_writer.h" #include "util/filename.h" #include "util/log_buffer.h" #include "util/logging.h" #include "util/mutexlock.h" #include "util/random.h" #include "util/sst_file_manager_impl.h" #include "util/stop_watch.h" #include "util/string_util.h" #include "util/sync_point.h" namespace rocksdb { // Maintains state for each sub-compaction struct CompactionJob::SubcompactionState { const Compaction* compaction; std::unique_ptr c_iter; // The boundaries of the key-range this compaction is interested in. No two // subcompactions may have overlapping key-ranges. // 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded Slice *start, *end; // The return status of this subcompaction Status status; // Files produced by this subcompaction struct Output { FileMetaData meta; bool finished; std::shared_ptr table_properties; }; // State kept for output being generated std::vector outputs; std::unique_ptr outfile; std::unique_ptr builder; Output* current_output() { if (outputs.empty()) { // This subcompaction's outptut could be empty if compaction was aborted // before this subcompaction had a chance to generate any output files. // When subcompactions are executed sequentially this is more likely and // will be particulalry likely for the later subcompactions to be empty. // Once they are run in parallel however it should be much rarer. return nullptr; } else { return &outputs.back(); } } uint64_t current_output_file_size; // State during the subcompaction uint64_t total_bytes; uint64_t num_input_records; uint64_t num_output_records; CompactionJobStats compaction_job_stats; uint64_t approx_size; // An index that used to speed up ShouldStopBefore(). size_t grandparent_index = 0; // The number of bytes overlapping between the current output and // grandparent files used in ShouldStopBefore(). uint64_t overlapped_bytes = 0; // A flag determine whether the key has been seen in ShouldStopBefore() bool seen_key = false; std::string compression_dict; SubcompactionState(Compaction* c, Slice* _start, Slice* _end, uint64_t size = 0) : compaction(c), start(_start), end(_end), outfile(nullptr), builder(nullptr), current_output_file_size(0), total_bytes(0), num_input_records(0), num_output_records(0), approx_size(size), grandparent_index(0), overlapped_bytes(0), seen_key(false), compression_dict() { assert(compaction != nullptr); } SubcompactionState(SubcompactionState&& o) { *this = std::move(o); } SubcompactionState& operator=(SubcompactionState&& o) { compaction = std::move(o.compaction); start = std::move(o.start); end = std::move(o.end); status = std::move(o.status); outputs = std::move(o.outputs); outfile = std::move(o.outfile); builder = std::move(o.builder); current_output_file_size = std::move(o.current_output_file_size); total_bytes = std::move(o.total_bytes); num_input_records = std::move(o.num_input_records); num_output_records = std::move(o.num_output_records); compaction_job_stats = std::move(o.compaction_job_stats); approx_size = std::move(o.approx_size); grandparent_index = std::move(o.grandparent_index); overlapped_bytes = std::move(o.overlapped_bytes); seen_key = std::move(o.seen_key); compression_dict = std::move(o.compression_dict); return *this; } // Because member unique_ptrs do not have these. SubcompactionState(const SubcompactionState&) = delete; SubcompactionState& operator=(const SubcompactionState&) = delete; // Returns true iff we should stop building the current output // before processing "internal_key". bool ShouldStopBefore(const Slice& internal_key, uint64_t curr_file_size) { const InternalKeyComparator* icmp = &compaction->column_family_data()->internal_comparator(); const std::vector& grandparents = compaction->grandparents(); // Scan to find earliest grandparent file that contains key. while (grandparent_index < grandparents.size() && icmp->Compare(internal_key, grandparents[grandparent_index]->largest.Encode()) > 0) { if (seen_key) { overlapped_bytes += grandparents[grandparent_index]->fd.GetFileSize(); } assert(grandparent_index + 1 >= grandparents.size() || icmp->Compare( grandparents[grandparent_index]->largest.Encode(), grandparents[grandparent_index + 1]->smallest.Encode()) <= 0); grandparent_index++; } seen_key = true; if (overlapped_bytes + curr_file_size > compaction->max_compaction_bytes()) { // Too much overlap for current output; start new output overlapped_bytes = 0; return true; } return false; } }; // Maintains state for the entire compaction struct CompactionJob::CompactionState { Compaction* const compaction; // REQUIRED: subcompaction states are stored in order of increasing // key-range std::vector sub_compact_states; Status status; uint64_t total_bytes; uint64_t num_input_records; uint64_t num_output_records; explicit CompactionState(Compaction* c) : compaction(c), total_bytes(0), num_input_records(0), num_output_records(0) {} size_t NumOutputFiles() { size_t total = 0; for (auto& s : sub_compact_states) { total += s.outputs.size(); } return total; } Slice SmallestUserKey() { for (const auto& sub_compact_state : sub_compact_states) { if (!sub_compact_state.outputs.empty() && sub_compact_state.outputs[0].finished) { return sub_compact_state.outputs[0].meta.smallest.user_key(); } } // If there is no finished output, return an empty slice. return Slice(nullptr, 0); } Slice LargestUserKey() { for (auto it = sub_compact_states.rbegin(); it < sub_compact_states.rend(); ++it) { if (!it->outputs.empty() && it->current_output()->finished) { assert(it->current_output() != nullptr); return it->current_output()->meta.largest.user_key(); } } // If there is no finished output, return an empty slice. return Slice(nullptr, 0); } }; void CompactionJob::AggregateStatistics() { for (SubcompactionState& sc : compact_->sub_compact_states) { compact_->total_bytes += sc.total_bytes; compact_->num_input_records += sc.num_input_records; compact_->num_output_records += sc.num_output_records; } if (compaction_job_stats_) { for (SubcompactionState& sc : compact_->sub_compact_states) { compaction_job_stats_->Add(sc.compaction_job_stats); } } } CompactionJob::CompactionJob( int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, const EnvOptions& env_options, VersionSet* versions, const std::atomic* shutting_down, LogBuffer* log_buffer, Directory* db_directory, Directory* output_directory, Statistics* stats, InstrumentedMutex* db_mutex, Status* db_bg_error, std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, std::shared_ptr table_cache, EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname, CompactionJobStats* compaction_job_stats) : job_id_(job_id), compact_(new CompactionState(compaction)), compaction_job_stats_(compaction_job_stats), compaction_stats_(1), dbname_(dbname), db_options_(db_options), env_options_(env_options), env_(db_options.env), versions_(versions), shutting_down_(shutting_down), log_buffer_(log_buffer), db_directory_(db_directory), output_directory_(output_directory), stats_(stats), db_mutex_(db_mutex), db_bg_error_(db_bg_error), existing_snapshots_(std::move(existing_snapshots)), earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot), table_cache_(std::move(table_cache)), event_logger_(event_logger), paranoid_file_checks_(paranoid_file_checks), measure_io_stats_(measure_io_stats) { assert(log_buffer_ != nullptr); const auto* cfd = compact_->compaction->column_family_data(); ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env, db_options_.enable_thread_tracking); ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION); ReportStartedCompaction(compaction); } CompactionJob::~CompactionJob() { assert(compact_ == nullptr); ThreadStatusUtil::ResetThreadStatus(); } void CompactionJob::ReportStartedCompaction( Compaction* compaction) { const auto* cfd = compact_->compaction->column_family_data(); ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env, db_options_.enable_thread_tracking); ThreadStatusUtil::SetThreadOperationProperty( ThreadStatus::COMPACTION_JOB_ID, job_id_); ThreadStatusUtil::SetThreadOperationProperty( ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL, (static_cast(compact_->compaction->start_level()) << 32) + compact_->compaction->output_level()); // In the current design, a CompactionJob is always created // for non-trivial compaction. assert(compaction->IsTrivialMove() == false || compaction->is_manual_compaction() == true); ThreadStatusUtil::SetThreadOperationProperty( ThreadStatus::COMPACTION_PROP_FLAGS, compaction->is_manual_compaction() + (compaction->deletion_compaction() << 1)); ThreadStatusUtil::SetThreadOperationProperty( ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES, compaction->CalculateTotalInputSize()); IOSTATS_RESET(bytes_written); IOSTATS_RESET(bytes_read); ThreadStatusUtil::SetThreadOperationProperty( ThreadStatus::COMPACTION_BYTES_WRITTEN, 0); ThreadStatusUtil::SetThreadOperationProperty( ThreadStatus::COMPACTION_BYTES_READ, 0); // Set the thread operation after operation properties // to ensure GetThreadList() can always show them all together. ThreadStatusUtil::SetThreadOperation( ThreadStatus::OP_COMPACTION); if (compaction_job_stats_) { compaction_job_stats_->is_manual_compaction = compaction->is_manual_compaction(); } } void CompactionJob::Prepare() { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_PREPARE); // Generate file_levels_ for compaction berfore making Iterator auto* c = compact_->compaction; assert(c->column_family_data() != nullptr); assert(c->column_family_data()->current()->storage_info() ->NumLevelFiles(compact_->compaction->level()) > 0); // Is this compaction producing files at the bottommost level? bottommost_level_ = c->bottommost_level(); if (c->ShouldFormSubcompactions()) { const uint64_t start_micros = env_->NowMicros(); GenSubcompactionBoundaries(); MeasureTime(stats_, SUBCOMPACTION_SETUP_TIME, env_->NowMicros() - start_micros); assert(sizes_.size() == boundaries_.size() + 1); for (size_t i = 0; i <= boundaries_.size(); i++) { Slice* start = i == 0 ? nullptr : &boundaries_[i - 1]; Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i]; compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]); } MeasureTime(stats_, NUM_SUBCOMPACTIONS_SCHEDULED, compact_->sub_compact_states.size()); } else { compact_->sub_compact_states.emplace_back(c, nullptr, nullptr); } } struct RangeWithSize { Range range; uint64_t size; RangeWithSize(const Slice& a, const Slice& b, uint64_t s = 0) : range(a, b), size(s) {} }; // Generates a histogram representing potential divisions of key ranges from // the input. It adds the starting and/or ending keys of certain input files // to the working set and then finds the approximate size of data in between // each consecutive pair of slices. Then it divides these ranges into // consecutive groups such that each group has a similar size. void CompactionJob::GenSubcompactionBoundaries() { auto* c = compact_->compaction; auto* cfd = c->column_family_data(); const Comparator* cfd_comparator = cfd->user_comparator(); std::vector bounds; int start_lvl = c->start_level(); int out_lvl = c->output_level(); // Add the starting and/or ending key of certain input files as a potential // boundary for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) { int lvl = c->level(lvl_idx); if (lvl >= start_lvl && lvl <= out_lvl) { const LevelFilesBrief* flevel = c->input_levels(lvl_idx); size_t num_files = flevel->num_files; if (num_files == 0) { continue; } if (lvl == 0) { // For level 0 add the starting and ending key of each file since the // files may have greatly differing key ranges (not range-partitioned) for (size_t i = 0; i < num_files; i++) { bounds.emplace_back(flevel->files[i].smallest_key); bounds.emplace_back(flevel->files[i].largest_key); } } else { // For all other levels add the smallest/largest key in the level to // encompass the range covered by that level bounds.emplace_back(flevel->files[0].smallest_key); bounds.emplace_back(flevel->files[num_files - 1].largest_key); if (lvl == out_lvl) { // For the last level include the starting keys of all files since // the last level is the largest and probably has the widest key // range. Since it's range partitioned, the ending key of one file // and the starting key of the next are very close (or identical). for (size_t i = 1; i < num_files; i++) { bounds.emplace_back(flevel->files[i].smallest_key); } } } } } std::sort(bounds.begin(), bounds.end(), [cfd_comparator] (const Slice& a, const Slice& b) -> bool { return cfd_comparator->Compare(ExtractUserKey(a), ExtractUserKey(b)) < 0; }); // Remove duplicated entries from bounds bounds.erase(std::unique(bounds.begin(), bounds.end(), [cfd_comparator] (const Slice& a, const Slice& b) -> bool { return cfd_comparator->Compare(ExtractUserKey(a), ExtractUserKey(b)) == 0; }), bounds.end()); // Combine consecutive pairs of boundaries into ranges with an approximate // size of data covered by keys in that range uint64_t sum = 0; std::vector ranges; auto* v = cfd->current(); for (auto it = bounds.begin();;) { const Slice a = *it; it++; if (it == bounds.end()) { break; } const Slice b = *it; uint64_t size = versions_->ApproximateSize(v, a, b, start_lvl, out_lvl + 1); ranges.emplace_back(a, b, size); sum += size; } // Group the ranges into subcompactions const double min_file_fill_percent = 4.0 / 5; uint64_t max_output_files = static_cast( std::ceil(sum / min_file_fill_percent / c->mutable_cf_options()->MaxFileSizeForLevel(out_lvl))); uint64_t subcompactions = std::min({static_cast(ranges.size()), static_cast(db_options_.max_subcompactions), max_output_files}); if (subcompactions > 1) { double mean = sum * 1.0 / subcompactions; // Greedily add ranges to the subcompaction until the sum of the ranges' // sizes becomes >= the expected mean size of a subcompaction sum = 0; for (size_t i = 0; i < ranges.size() - 1; i++) { sum += ranges[i].size; if (subcompactions == 1) { // If there's only one left to schedule then it goes to the end so no // need to put an end boundary continue; } if (sum >= mean) { boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit)); sizes_.emplace_back(sum); subcompactions--; sum = 0; } } sizes_.emplace_back(sum + ranges.back().size); } else { // Only one range so its size is the total sum of sizes computed above sizes_.emplace_back(sum); } } Status CompactionJob::Run() { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_RUN); TEST_SYNC_POINT("CompactionJob::Run():Start"); log_buffer_->FlushBufferToLog(); LogCompaction(); const size_t num_threads = compact_->sub_compact_states.size(); assert(num_threads > 0); const uint64_t start_micros = env_->NowMicros(); // Launch a thread for each of subcompactions 1...num_threads-1 std::vector thread_pool; thread_pool.reserve(num_threads - 1); for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) { thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this, &compact_->sub_compact_states[i]); } // Always schedule the first subcompaction (whether or not there are also // others) in the current thread to be efficient with resources ProcessKeyValueCompaction(&compact_->sub_compact_states[0]); // Wait for all other threads (if there are any) to finish execution for (auto& thread : thread_pool) { thread.join(); } if (output_directory_) { output_directory_->Fsync(); } compaction_stats_.micros = env_->NowMicros() - start_micros; MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros); // Check if any thread encountered an error during execution Status status; for (const auto& state : compact_->sub_compact_states) { if (!state.status.ok()) { status = state.status; break; } } TablePropertiesCollection tp; for (const auto& state : compact_->sub_compact_states) { for (const auto& output : state.outputs) { auto fn = TableFileName(db_options_.db_paths, output.meta.fd.GetNumber(), output.meta.fd.GetPathId()); tp[fn] = output.table_properties; } } compact_->compaction->SetOutputTableProperties(std::move(tp)); // Finish up all book-keeping to unify the subcompaction results AggregateStatistics(); UpdateCompactionStats(); RecordCompactionIOStats(); LogFlush(db_options_.info_log); TEST_SYNC_POINT("CompactionJob::Run():End"); compact_->status = status; return status; } Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_INSTALL); db_mutex_->AssertHeld(); Status status = compact_->status; ColumnFamilyData* cfd = compact_->compaction->column_family_data(); cfd->internal_stats()->AddCompactionStats( compact_->compaction->output_level(), compaction_stats_); if (status.ok()) { status = InstallCompactionResults(mutable_cf_options); } VersionStorageInfo::LevelSummaryStorage tmp; auto vstorage = cfd->current()->storage_info(); const auto& stats = compaction_stats_; double read_write_amp = 0.0; double write_amp = 0.0; if (stats.bytes_read_non_output_levels > 0) { read_write_amp = (stats.bytes_written + stats.bytes_read_output_level + stats.bytes_read_non_output_levels) / static_cast(stats.bytes_read_non_output_levels); write_amp = stats.bytes_written / static_cast(stats.bytes_read_non_output_levels); } ROCKS_LOG_BUFFER( log_buffer_, "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, " "files in(%d, %d) out(%d) " "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) " "write-amplify(%.1f) %s, records in: %d, records dropped: %d\n", cfd->GetName().c_str(), vstorage->LevelSummary(&tmp), (stats.bytes_read_non_output_levels + stats.bytes_read_output_level) / static_cast(stats.micros), stats.bytes_written / static_cast(stats.micros), compact_->compaction->output_level(), stats.num_input_files_in_non_output_levels, stats.num_input_files_in_output_level, stats.num_output_files, stats.bytes_read_non_output_levels / 1048576.0, stats.bytes_read_output_level / 1048576.0, stats.bytes_written / 1048576.0, read_write_amp, write_amp, status.ToString().c_str(), stats.num_input_records, stats.num_dropped_records); UpdateCompactionJobStats(stats); auto stream = event_logger_->LogToBuffer(log_buffer_); stream << "job" << job_id_ << "event" << "compaction_finished" << "compaction_time_micros" << compaction_stats_.micros << "output_level" << compact_->compaction->output_level() << "num_output_files" << compact_->NumOutputFiles() << "total_output_size" << compact_->total_bytes << "num_input_records" << compact_->num_input_records << "num_output_records" << compact_->num_output_records << "num_subcompactions" << compact_->sub_compact_states.size(); if (compaction_job_stats_ != nullptr) { stream << "num_single_delete_mismatches" << compaction_job_stats_->num_single_del_mismatch; stream << "num_single_delete_fallthrough" << compaction_job_stats_->num_single_del_fallthru; } if (measure_io_stats_ && compaction_job_stats_ != nullptr) { stream << "file_write_nanos" << compaction_job_stats_->file_write_nanos; stream << "file_range_sync_nanos" << compaction_job_stats_->file_range_sync_nanos; stream << "file_fsync_nanos" << compaction_job_stats_->file_fsync_nanos; stream << "file_prepare_write_nanos" << compaction_job_stats_->file_prepare_write_nanos; } stream << "lsm_state"; stream.StartArray(); for (int level = 0; level < vstorage->num_levels(); ++level) { stream << vstorage->NumLevelFiles(level); } stream.EndArray(); CleanupCompaction(); return status; } void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { assert(sub_compact != nullptr); ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); std::unique_ptr range_del_agg( new RangeDelAggregator(cfd->internal_comparator(), existing_snapshots_)); std::unique_ptr input(versions_->MakeInputIterator( sub_compact->compaction, range_del_agg.get())); AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_PROCESS_KV); // I/O measurement variables PerfLevel prev_perf_level = PerfLevel::kEnableTime; const uint64_t kRecordStatsEvery = 1000; uint64_t prev_write_nanos = 0; uint64_t prev_fsync_nanos = 0; uint64_t prev_range_sync_nanos = 0; uint64_t prev_prepare_write_nanos = 0; if (measure_io_stats_) { prev_perf_level = GetPerfLevel(); SetPerfLevel(PerfLevel::kEnableTime); prev_write_nanos = IOSTATS(write_nanos); prev_fsync_nanos = IOSTATS(fsync_nanos); prev_range_sync_nanos = IOSTATS(range_sync_nanos); prev_prepare_write_nanos = IOSTATS(prepare_write_nanos); } const MutableCFOptions* mutable_cf_options = sub_compact->compaction->mutable_cf_options(); // To build compression dictionary, we sample the first output file, assuming // it'll reach the maximum length, and then use the dictionary for compressing // subsequent output files. The dictionary may be less than max_dict_bytes if // the first output file's length is less than the maximum. const int kSampleLenShift = 6; // 2^6 = 64-byte samples std::set sample_begin_offsets; if (bottommost_level_ && cfd->ioptions()->compression_opts.max_dict_bytes > 0) { const size_t kMaxSamples = cfd->ioptions()->compression_opts.max_dict_bytes >> kSampleLenShift; const size_t kOutFileLen = mutable_cf_options->MaxFileSizeForLevel( compact_->compaction->output_level()); if (kOutFileLen != port::kMaxSizet) { const size_t kOutFileNumSamples = kOutFileLen >> kSampleLenShift; Random64 generator{versions_->NewFileNumber()}; for (size_t i = 0; i < kMaxSamples; ++i) { sample_begin_offsets.insert(generator.Uniform(kOutFileNumSamples) << kSampleLenShift); } } } auto compaction_filter = cfd->ioptions()->compaction_filter; std::unique_ptr compaction_filter_from_factory = nullptr; if (compaction_filter == nullptr) { compaction_filter_from_factory = sub_compact->compaction->CreateCompactionFilter(); compaction_filter = compaction_filter_from_factory.get(); } MergeHelper merge( env_, cfd->user_comparator(), cfd->ioptions()->merge_operator, compaction_filter, db_options_.info_log.get(), false /* internal key corruption is expected */, existing_snapshots_.empty() ? 0 : existing_snapshots_.back(), compact_->compaction->level(), db_options_.statistics.get(), shutting_down_); TEST_SYNC_POINT("CompactionJob::Run():Inprogress"); Slice* start = sub_compact->start; Slice* end = sub_compact->end; if (start != nullptr) { IterKey start_iter; start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek); input->Seek(start_iter.GetInternalKey()); } else { input->SeekToFirst(); } // we allow only 1 compaction event listener. Used by blob storage CompactionEventListener* comp_event_listener = nullptr; #ifndef ROCKSDB_LITE for (auto& celitr : cfd->ioptions()->listeners) { comp_event_listener = celitr->GetCompactionEventListener(); if (comp_event_listener != nullptr) { break; } } #endif // ROCKSDB_LITE Status status; sub_compact->c_iter.reset(new CompactionIterator( input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(), &existing_snapshots_, earliest_write_conflict_snapshot_, env_, false, range_del_agg.get(), sub_compact->compaction, compaction_filter, comp_event_listener, shutting_down_)); auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) { // ShouldStopBefore() maintains state based on keys processed so far. The // compaction loop always calls it on the "next" key, thus won't tell it the // first key. So we do that here. sub_compact->ShouldStopBefore( c_iter->key(), sub_compact->current_output_file_size); } const auto& c_iter_stats = c_iter->iter_stats(); auto sample_begin_offset_iter = sample_begin_offsets.cbegin(); // data_begin_offset and compression_dict are only valid while generating // dictionary from the first output file. size_t data_begin_offset = 0; std::string compression_dict; compression_dict.reserve(cfd->ioptions()->compression_opts.max_dict_bytes); while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) { // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid() // returns true. const Slice& key = c_iter->key(); const Slice& value = c_iter->value(); // If an end key (exclusive) is specified, check if the current key is // >= than it and exit if it is because the iterator is out of its range if (end != nullptr && cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) { break; } if (c_iter_stats.num_input_records % kRecordStatsEvery == kRecordStatsEvery - 1) { RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats); c_iter->ResetRecordCounts(); RecordCompactionIOStats(); } // Open output file if necessary if (sub_compact->builder == nullptr) { status = OpenCompactionOutputFile(sub_compact); if (!status.ok()) { break; } } assert(sub_compact->builder != nullptr); assert(sub_compact->current_output() != nullptr); sub_compact->builder->Add(key, value); sub_compact->current_output_file_size = sub_compact->builder->FileSize(); sub_compact->current_output()->meta.UpdateBoundaries( key, c_iter->ikey().sequence); sub_compact->num_output_records++; if (sub_compact->outputs.size() == 1) { // first output file // Check if this key/value overlaps any sample intervals; if so, appends // overlapping portions to the dictionary. for (const auto& data_elmt : {key, value}) { size_t data_end_offset = data_begin_offset + data_elmt.size(); while (sample_begin_offset_iter != sample_begin_offsets.cend() && *sample_begin_offset_iter < data_end_offset) { size_t sample_end_offset = *sample_begin_offset_iter + (1 << kSampleLenShift); // Invariant: Because we advance sample iterator while processing the // data_elmt containing the sample's last byte, the current sample // cannot end before the current data_elmt. assert(data_begin_offset < sample_end_offset); size_t data_elmt_copy_offset, data_elmt_copy_len; if (*sample_begin_offset_iter <= data_begin_offset) { // The sample starts before data_elmt starts, so take bytes starting // at the beginning of data_elmt. data_elmt_copy_offset = 0; } else { // data_elmt starts before the sample starts, so take bytes starting // at the below offset into data_elmt. data_elmt_copy_offset = *sample_begin_offset_iter - data_begin_offset; } if (sample_end_offset <= data_end_offset) { // The sample ends before data_elmt ends, so take as many bytes as // needed. data_elmt_copy_len = sample_end_offset - (data_begin_offset + data_elmt_copy_offset); } else { // data_elmt ends before the sample ends, so take all remaining // bytes in data_elmt. data_elmt_copy_len = data_end_offset - (data_begin_offset + data_elmt_copy_offset); } compression_dict.append(&data_elmt.data()[data_elmt_copy_offset], data_elmt_copy_len); if (sample_end_offset > data_end_offset) { // Didn't finish sample. Try to finish it with the next data_elmt. break; } // Next sample may require bytes from same data_elmt. sample_begin_offset_iter++; } data_begin_offset = data_end_offset; } } // Close output file if it is big enough. Two possibilities determine it's // time to close it: (1) the current key should be this file's last key, (2) // the next key should not be in this file. // // TODO(aekmekji): determine if file should be closed earlier than this // during subcompactions (i.e. if output size, estimated by input size, is // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB // and 0.6MB instead of 1MB and 0.2MB) bool output_file_ended = false; Status input_status; if (sub_compact->compaction->output_level() != 0 && sub_compact->current_output_file_size >= sub_compact->compaction->max_output_file_size()) { // (1) this key terminates the file. For historical reasons, the iterator // status before advancing will be given to FinishCompactionOutputFile(). input_status = input->status(); output_file_ended = true; } c_iter->Next(); if (!output_file_ended && c_iter->Valid() && sub_compact->compaction->output_level() != 0 && sub_compact->ShouldStopBefore( c_iter->key(), sub_compact->current_output_file_size) && sub_compact->builder != nullptr) { // (2) this key belongs to the next file. For historical reasons, the // iterator status after advancing will be given to // FinishCompactionOutputFile(). input_status = input->status(); output_file_ended = true; } if (output_file_ended) { const Slice* next_key = nullptr; if (c_iter->Valid()) { next_key = &c_iter->key(); } CompactionIterationStats range_del_out_stats; status = FinishCompactionOutputFile(input_status, sub_compact, range_del_agg.get(), &range_del_out_stats, next_key); RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats); if (sub_compact->outputs.size() == 1) { // Use dictionary from first output file for compression of subsequent // files. sub_compact->compression_dict = std::move(compression_dict); } } } sub_compact->num_input_records = c_iter_stats.num_input_records; sub_compact->compaction_job_stats.num_input_deletion_records = c_iter_stats.num_input_deletion_records; sub_compact->compaction_job_stats.num_corrupt_keys = c_iter_stats.num_input_corrupt_records; sub_compact->compaction_job_stats.num_single_del_fallthru = c_iter_stats.num_single_del_fallthru; sub_compact->compaction_job_stats.num_single_del_mismatch = c_iter_stats.num_single_del_mismatch; sub_compact->compaction_job_stats.total_input_raw_key_bytes += c_iter_stats.total_input_raw_key_bytes; sub_compact->compaction_job_stats.total_input_raw_value_bytes += c_iter_stats.total_input_raw_value_bytes; RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, c_iter_stats.total_filter_time); RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats); RecordCompactionIOStats(); if (status.ok() && (shutting_down_->load(std::memory_order_relaxed) || cfd->IsDropped())) { status = Status::ShutdownInProgress( "Database shutdown or Column family drop during compaction"); } if (status.ok()) { status = input->status(); } if (status.ok()) { status = c_iter->status(); } if (status.ok() && sub_compact->builder == nullptr && sub_compact->outputs.size() == 0 && range_del_agg->ShouldAddTombstones(bottommost_level_)) { // handle subcompaction containing only range deletions status = OpenCompactionOutputFile(sub_compact); } // Call FinishCompactionOutputFile() even if status is not ok: it needs to // close the output file. if (sub_compact->builder != nullptr) { CompactionIterationStats range_del_out_stats; Status s = FinishCompactionOutputFile( status, sub_compact, range_del_agg.get(), &range_del_out_stats); if (status.ok()) { status = s; } RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats); } if (measure_io_stats_) { sub_compact->compaction_job_stats.file_write_nanos += IOSTATS(write_nanos) - prev_write_nanos; sub_compact->compaction_job_stats.file_fsync_nanos += IOSTATS(fsync_nanos) - prev_fsync_nanos; sub_compact->compaction_job_stats.file_range_sync_nanos += IOSTATS(range_sync_nanos) - prev_range_sync_nanos; sub_compact->compaction_job_stats.file_prepare_write_nanos += IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos; if (prev_perf_level != PerfLevel::kEnableTime) { SetPerfLevel(prev_perf_level); } } sub_compact->c_iter.reset(); input.reset(); sub_compact->status = status; } void CompactionJob::RecordDroppedKeys( const CompactionIterationStats& c_iter_stats, CompactionJobStats* compaction_job_stats) { if (c_iter_stats.num_record_drop_user > 0) { RecordTick(stats_, COMPACTION_KEY_DROP_USER, c_iter_stats.num_record_drop_user); } if (c_iter_stats.num_record_drop_hidden > 0) { RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY, c_iter_stats.num_record_drop_hidden); if (compaction_job_stats) { compaction_job_stats->num_records_replaced += c_iter_stats.num_record_drop_hidden; } } if (c_iter_stats.num_record_drop_obsolete > 0) { RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, c_iter_stats.num_record_drop_obsolete); if (compaction_job_stats) { compaction_job_stats->num_expired_deletion_records += c_iter_stats.num_record_drop_obsolete; } } if (c_iter_stats.num_record_drop_range_del > 0) { RecordTick(stats_, COMPACTION_KEY_DROP_RANGE_DEL, c_iter_stats.num_record_drop_range_del); } if (c_iter_stats.num_range_del_drop_obsolete > 0) { RecordTick(stats_, COMPACTION_RANGE_DEL_DROP_OBSOLETE, c_iter_stats.num_range_del_drop_obsolete); } } Status CompactionJob::FinishCompactionOutputFile( const Status& input_status, SubcompactionState* sub_compact, RangeDelAggregator* range_del_agg, CompactionIterationStats* range_del_out_stats, const Slice* next_table_min_key /* = nullptr */) { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_COMPACTION_SYNC_FILE); assert(sub_compact != nullptr); assert(sub_compact->outfile); assert(sub_compact->builder != nullptr); assert(sub_compact->current_output() != nullptr); uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber(); assert(output_number != 0); TableProperties table_properties; // Check for iterator errors Status s = input_status; auto meta = &sub_compact->current_output()->meta; if (s.ok()) { Slice lower_bound_guard, upper_bound_guard; const Slice *lower_bound, *upper_bound; if (sub_compact->outputs.size() == 1) { // For the first output table, include range tombstones before the min key // but after the subcompaction boundary. lower_bound = sub_compact->start; } else if (meta->smallest.size() > 0) { // For subsequent output tables, only include range tombstones from min // key onwards since the previous file was extended to contain range // tombstones falling before min key. lower_bound_guard = meta->smallest.user_key(); lower_bound = &lower_bound_guard; } else { lower_bound = nullptr; } if (next_table_min_key != nullptr) { // This isn't the last file in the subcompaction, so extend until the next // file starts. upper_bound_guard = ExtractUserKey(*next_table_min_key); upper_bound = &upper_bound_guard; } else { // This is the last file in the subcompaction, so extend until the // subcompaction ends. upper_bound = sub_compact->end; } range_del_agg->AddToBuilder(sub_compact->builder.get(), lower_bound, upper_bound, meta, range_del_out_stats, bottommost_level_); } const uint64_t current_entries = sub_compact->builder->NumEntries(); meta->marked_for_compaction = sub_compact->builder->NeedCompact(); if (s.ok()) { s = sub_compact->builder->Finish(); } else { sub_compact->builder->Abandon(); } const uint64_t current_bytes = sub_compact->builder->FileSize(); meta->fd.file_size = current_bytes; sub_compact->current_output()->finished = true; sub_compact->total_bytes += current_bytes; // Finish and check for file errors if (s.ok()) { StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS); s = sub_compact->outfile->Sync(db_options_.use_fsync); } if (s.ok()) { s = sub_compact->outfile->Close(); } sub_compact->outfile.reset(); ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); TableProperties tp; if (s.ok() && current_entries > 0) { // Verify that the table is usable // We set for_compaction to false and don't OptimizeForCompactionTableRead // here because this is a special case after we finish the table building // No matter whether use_direct_io_for_flush_and_compaction is true, // we will regrad this verification as user reads since the goal is // to cache it here for further user reads InternalIterator* iter = cfd->table_cache()->NewIterator( ReadOptions(), env_options_, cfd->internal_comparator(), meta->fd, nullptr /* range_del_agg */, nullptr, cfd->internal_stats()->GetFileReadHist( compact_->compaction->output_level()), false); s = iter->status(); if (s.ok() && paranoid_file_checks_) { for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {} s = iter->status(); } delete iter; // Output to event logger and fire events. if (s.ok()) { tp = sub_compact->builder->GetTableProperties(); sub_compact->current_output()->table_properties = std::make_shared(tp); ROCKS_LOG_INFO(db_options_.info_log, "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64 " keys, %" PRIu64 " bytes%s", cfd->GetName().c_str(), job_id_, output_number, current_entries, current_bytes, meta->marked_for_compaction ? " (need compaction)" : ""); } } std::string fname = TableFileName(db_options_.db_paths, meta->fd.GetNumber(), meta->fd.GetPathId()); EventHelpers::LogAndNotifyTableFileCreationFinished( event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname, job_id_, meta->fd, tp, TableFileCreationReason::kCompaction, s); #ifndef ROCKSDB_LITE // Report new file to SstFileManagerImpl auto sfm = static_cast(db_options_.sst_file_manager.get()); if (sfm && meta->fd.GetPathId() == 0) { auto fn = TableFileName(cfd->ioptions()->db_paths, meta->fd.GetNumber(), meta->fd.GetPathId()); sfm->OnAddFile(fn); if (sfm->IsMaxAllowedSpaceReached()) { InstrumentedMutexLock l(db_mutex_); if (db_bg_error_->ok()) { s = Status::IOError("Max allowed space was reached"); *db_bg_error_ = s; TEST_SYNC_POINT( "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached"); } } } #endif sub_compact->builder.reset(); sub_compact->current_output_file_size = 0; return s; } Status CompactionJob::InstallCompactionResults( const MutableCFOptions& mutable_cf_options) { db_mutex_->AssertHeld(); auto* compaction = compact_->compaction; // paranoia: verify that the files that we started with // still exist in the current version and in the same original level. // This ensures that a concurrent compaction did not erroneously // pick the same files to compact_. if (!versions_->VerifyCompactionFileConsistency(compaction)) { Compaction::InputLevelSummaryBuffer inputs_summary; ROCKS_LOG_ERROR(db_options_.info_log, "[%s] [JOB %d] Compaction %s aborted", compaction->column_family_data()->GetName().c_str(), job_id_, compaction->InputLevelSummary(&inputs_summary)); return Status::Corruption("Compaction input files inconsistent"); } { Compaction::InputLevelSummaryBuffer inputs_summary; ROCKS_LOG_INFO( db_options_.info_log, "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes", compaction->column_family_data()->GetName().c_str(), job_id_, compaction->InputLevelSummary(&inputs_summary), compact_->total_bytes); } // Add compaction outputs compaction->AddInputDeletions(compact_->compaction->edit()); for (const auto& sub_compact : compact_->sub_compact_states) { for (const auto& out : sub_compact.outputs) { compaction->edit()->AddFile(compaction->output_level(), out.meta); } } return versions_->LogAndApply(compaction->column_family_data(), mutable_cf_options, compaction->edit(), db_mutex_, db_directory_); } void CompactionJob::RecordCompactionIOStats() { RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read)); ThreadStatusUtil::IncreaseThreadOperationProperty( ThreadStatus::COMPACTION_BYTES_READ, IOSTATS(bytes_read)); IOSTATS_RESET(bytes_read); RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written)); ThreadStatusUtil::IncreaseThreadOperationProperty( ThreadStatus::COMPACTION_BYTES_WRITTEN, IOSTATS(bytes_written)); IOSTATS_RESET(bytes_written); } Status CompactionJob::OpenCompactionOutputFile( SubcompactionState* sub_compact) { assert(sub_compact != nullptr); assert(sub_compact->builder == nullptr); // no need to lock because VersionSet::next_file_number_ is atomic uint64_t file_number = versions_->NewFileNumber(); std::string fname = TableFileName(db_options_.db_paths, file_number, sub_compact->compaction->output_path_id()); // Fire events. ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); #ifndef ROCKSDB_LITE EventHelpers::NotifyTableFileCreationStarted( cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname, job_id_, TableFileCreationReason::kCompaction); #endif // !ROCKSDB_LITE // Make the output file unique_ptr writable_file; EnvOptions opt_env_opts = env_->OptimizeForCompactionTableWrite(env_options_, db_options_); TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile", &opt_env_opts.use_direct_writes); Status s = NewWritableFile(env_, fname, &writable_file, opt_env_opts); if (!s.ok()) { ROCKS_LOG_ERROR( db_options_.info_log, "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64 " fails at NewWritableFile with status %s", sub_compact->compaction->column_family_data()->GetName().c_str(), job_id_, file_number, s.ToString().c_str()); LogFlush(db_options_.info_log); EventHelpers::LogAndNotifyTableFileCreationFinished( event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname, job_id_, FileDescriptor(), TableProperties(), TableFileCreationReason::kCompaction, s); return s; } SubcompactionState::Output out; out.meta.fd = FileDescriptor(file_number, sub_compact->compaction->output_path_id(), 0); out.finished = false; sub_compact->outputs.push_back(out); writable_file->SetIOPriority(Env::IO_LOW); writable_file->SetPreallocationBlockSize(static_cast( sub_compact->compaction->OutputFilePreallocationSize())); sub_compact->outfile.reset(new WritableFileWriter( std::move(writable_file), env_options_, db_options_.statistics.get())); // If the Column family flag is to only optimize filters for hits, // we can skip creating filters if this is the bottommost_level where // data is going to be found bool skip_filters = cfd->ioptions()->optimize_filters_for_hits && bottommost_level_; sub_compact->builder.reset(NewTableBuilder( *cfd->ioptions(), cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(), sub_compact->compaction->output_compression(), cfd->ioptions()->compression_opts, sub_compact->compaction->output_level(), &sub_compact->compression_dict, skip_filters)); LogFlush(db_options_.info_log); return s; } void CompactionJob::CleanupCompaction() { for (SubcompactionState& sub_compact : compact_->sub_compact_states) { const auto& sub_status = sub_compact.status; if (sub_compact.builder != nullptr) { // May happen if we get a shutdown call in the middle of compaction sub_compact.builder->Abandon(); sub_compact.builder.reset(); } else { assert(!sub_status.ok() || sub_compact.outfile == nullptr); } for (const auto& out : sub_compact.outputs) { // If this file was inserted into the table cache then remove // them here because this compaction was not committed. if (!sub_status.ok()) { TableCache::Evict(table_cache_.get(), out.meta.fd.GetNumber()); } } } delete compact_; compact_ = nullptr; } #ifndef ROCKSDB_LITE namespace { void CopyPrefix( const Slice& src, size_t prefix_length, std::string* dst) { assert(prefix_length > 0); size_t length = src.size() > prefix_length ? prefix_length : src.size(); dst->assign(src.data(), length); } } // namespace #endif // !ROCKSDB_LITE void CompactionJob::UpdateCompactionStats() { Compaction* compaction = compact_->compaction; compaction_stats_.num_input_files_in_non_output_levels = 0; compaction_stats_.num_input_files_in_output_level = 0; for (int input_level = 0; input_level < static_cast(compaction->num_input_levels()); ++input_level) { if (compaction->level(input_level) != compaction->output_level()) { UpdateCompactionInputStatsHelper( &compaction_stats_.num_input_files_in_non_output_levels, &compaction_stats_.bytes_read_non_output_levels, input_level); } else { UpdateCompactionInputStatsHelper( &compaction_stats_.num_input_files_in_output_level, &compaction_stats_.bytes_read_output_level, input_level); } } for (const auto& sub_compact : compact_->sub_compact_states) { size_t num_output_files = sub_compact.outputs.size(); if (sub_compact.builder != nullptr) { // An error occurred so ignore the last output. assert(num_output_files > 0); --num_output_files; } compaction_stats_.num_output_files += static_cast(num_output_files); for (const auto& out : sub_compact.outputs) { compaction_stats_.bytes_written += out.meta.fd.file_size; } if (sub_compact.num_input_records > sub_compact.num_output_records) { compaction_stats_.num_dropped_records += sub_compact.num_input_records - sub_compact.num_output_records; } } } void CompactionJob::UpdateCompactionInputStatsHelper( int* num_files, uint64_t* bytes_read, int input_level) { const Compaction* compaction = compact_->compaction; auto num_input_files = compaction->num_input_files(input_level); *num_files += static_cast(num_input_files); for (size_t i = 0; i < num_input_files; ++i) { const auto* file_meta = compaction->input(input_level, i); *bytes_read += file_meta->fd.GetFileSize(); compaction_stats_.num_input_records += static_cast(file_meta->num_entries); } } void CompactionJob::UpdateCompactionJobStats( const InternalStats::CompactionStats& stats) const { #ifndef ROCKSDB_LITE if (compaction_job_stats_) { compaction_job_stats_->elapsed_micros = stats.micros; // input information compaction_job_stats_->total_input_bytes = stats.bytes_read_non_output_levels + stats.bytes_read_output_level; compaction_job_stats_->num_input_records = compact_->num_input_records; compaction_job_stats_->num_input_files = stats.num_input_files_in_non_output_levels + stats.num_input_files_in_output_level; compaction_job_stats_->num_input_files_at_output_level = stats.num_input_files_in_output_level; // output information compaction_job_stats_->total_output_bytes = stats.bytes_written; compaction_job_stats_->num_output_records = compact_->num_output_records; compaction_job_stats_->num_output_files = stats.num_output_files; if (compact_->NumOutputFiles() > 0U) { CopyPrefix( compact_->SmallestUserKey(), CompactionJobStats::kMaxPrefixLength, &compaction_job_stats_->smallest_output_key_prefix); CopyPrefix( compact_->LargestUserKey(), CompactionJobStats::kMaxPrefixLength, &compaction_job_stats_->largest_output_key_prefix); } } #endif // !ROCKSDB_LITE } void CompactionJob::LogCompaction() { Compaction* compaction = compact_->compaction; ColumnFamilyData* cfd = compaction->column_family_data(); // Let's check if anything will get logged. Don't prepare all the info if // we're not logging if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) { Compaction::InputLevelSummaryBuffer inputs_summary; ROCKS_LOG_INFO( db_options_.info_log, "[%s] [JOB %d] Compacting %s, score %.2f", cfd->GetName().c_str(), job_id_, compaction->InputLevelSummary(&inputs_summary), compaction->score()); char scratch[2345]; compaction->Summary(scratch, sizeof(scratch)); ROCKS_LOG_INFO(db_options_.info_log, "[%s] Compaction start summary: %s\n", cfd->GetName().c_str(), scratch); // build event logger report auto stream = event_logger_->Log(); stream << "job" << job_id_ << "event" << "compaction_started"; for (size_t i = 0; i < compaction->num_input_levels(); ++i) { stream << ("files_L" + ToString(compaction->level(i))); stream.StartArray(); for (auto f : *compaction->inputs(i)) { stream << f->fd.GetNumber(); } stream.EndArray(); } stream << "score" << compaction->score() << "input_data_size" << compaction->CalculateTotalInputSize(); } } } // namespace rocksdb