// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #include "db/db_test_util.h" #include "port/stack_trace.h" #include "table/unique_id_impl.h" namespace ROCKSDB_NAMESPACE { class MyTestCompactionService : public CompactionService { public: MyTestCompactionService( std::string db_path, Options& options, std::shared_ptr& statistics, std::vector>& listeners, std::vector> table_properties_collector_factories) : db_path_(std::move(db_path)), options_(options), statistics_(statistics), start_info_("na", "na", "na", 0, Env::TOTAL), wait_info_("na", "na", "na", 0, Env::TOTAL), listeners_(listeners), table_properties_collector_factories_( std::move(table_properties_collector_factories)) {} static const char* kClassName() { return "MyTestCompactionService"; } const char* Name() const override { return kClassName(); } CompactionServiceJobStatus StartV2( const CompactionServiceJobInfo& info, const std::string& compaction_service_input) override { InstrumentedMutexLock l(&mutex_); start_info_ = info; assert(info.db_name == db_path_); jobs_.emplace(info.job_id, compaction_service_input); CompactionServiceJobStatus s = CompactionServiceJobStatus::kSuccess; if (is_override_start_status_) { return override_start_status_; } return s; } CompactionServiceJobStatus WaitForCompleteV2( const CompactionServiceJobInfo& info, std::string* compaction_service_result) override { std::string compaction_input; assert(info.db_name == db_path_); { InstrumentedMutexLock l(&mutex_); wait_info_ = info; auto i = jobs_.find(info.job_id); if (i == jobs_.end()) { return CompactionServiceJobStatus::kFailure; } compaction_input = std::move(i->second); jobs_.erase(i); } if (is_override_wait_status_) { return override_wait_status_; } CompactionServiceOptionsOverride options_override; options_override.env = options_.env; options_override.file_checksum_gen_factory = options_.file_checksum_gen_factory; options_override.comparator = options_.comparator; options_override.merge_operator = options_.merge_operator; options_override.compaction_filter = options_.compaction_filter; options_override.compaction_filter_factory = options_.compaction_filter_factory; options_override.prefix_extractor = options_.prefix_extractor; options_override.table_factory = options_.table_factory; options_override.sst_partitioner_factory = options_.sst_partitioner_factory; options_override.statistics = statistics_; if (!listeners_.empty()) { options_override.listeners = listeners_; } if (!table_properties_collector_factories_.empty()) { options_override.table_properties_collector_factories = table_properties_collector_factories_; } OpenAndCompactOptions options; options.canceled = &canceled_; Status s = DB::OpenAndCompact( options, db_path_, db_path_ + "/" + std::to_string(info.job_id), compaction_input, compaction_service_result, options_override); if (is_override_wait_result_) { *compaction_service_result = override_wait_result_; } compaction_num_.fetch_add(1); if (s.ok()) { return CompactionServiceJobStatus::kSuccess; } else { return CompactionServiceJobStatus::kFailure; } } int GetCompactionNum() { return compaction_num_.load(); } CompactionServiceJobInfo GetCompactionInfoForStart() { return start_info_; } CompactionServiceJobInfo GetCompactionInfoForWait() { return wait_info_; } void OverrideStartStatus(CompactionServiceJobStatus s) { is_override_start_status_ = true; override_start_status_ = s; } void OverrideWaitStatus(CompactionServiceJobStatus s) { is_override_wait_status_ = true; override_wait_status_ = s; } void OverrideWaitResult(std::string str) { is_override_wait_result_ = true; override_wait_result_ = std::move(str); } void ResetOverride() { is_override_wait_result_ = false; is_override_start_status_ = false; is_override_wait_status_ = false; } void SetCanceled(bool canceled) { canceled_ = canceled; } private: InstrumentedMutex mutex_; std::atomic_int compaction_num_{0}; std::map jobs_; const std::string db_path_; Options options_; std::shared_ptr statistics_; CompactionServiceJobInfo start_info_; CompactionServiceJobInfo wait_info_; bool is_override_start_status_ = false; CompactionServiceJobStatus override_start_status_ = CompactionServiceJobStatus::kFailure; bool is_override_wait_status_ = false; CompactionServiceJobStatus override_wait_status_ = CompactionServiceJobStatus::kFailure; bool is_override_wait_result_ = false; std::string override_wait_result_; std::vector> listeners_; std::vector> table_properties_collector_factories_; std::atomic_bool canceled_{false}; }; class CompactionServiceTest : public DBTestBase { public: explicit CompactionServiceTest() : DBTestBase("compaction_service_test", true) {} protected: void ReopenWithCompactionService(Options* options) { options->env = env_; primary_statistics_ = CreateDBStatistics(); options->statistics = primary_statistics_; compactor_statistics_ = CreateDBStatistics(); compaction_service_ = std::make_shared( dbname_, *options, compactor_statistics_, remote_listeners, remote_table_properties_collector_factories); options->compaction_service = compaction_service_; DestroyAndReopen(*options); } Statistics* GetCompactorStatistics() { return compactor_statistics_.get(); } Statistics* GetPrimaryStatistics() { return primary_statistics_.get(); } MyTestCompactionService* GetCompactionService() { CompactionService* cs = compaction_service_.get(); return static_cast_with_check(cs); } void GenerateTestData() { // Generate 20 files @ L2 for (int i = 0; i < 20; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 10 + j; ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id))); } ASSERT_OK(Flush()); } MoveFilesToLevel(2); // Generate 10 files @ L1 overlap with all 20 files @ L2 for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 20 + j * 2; ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id))); } ASSERT_OK(Flush()); } MoveFilesToLevel(1); ASSERT_EQ(FilesPerLevel(), "0,10,20"); } void VerifyTestData() { for (int i = 0; i < 200; i++) { auto result = Get(Key(i)); if (i % 2) { ASSERT_EQ(result, "value" + std::to_string(i)); } else { ASSERT_EQ(result, "value_new" + std::to_string(i)); } } } std::vector> remote_listeners; std::vector> remote_table_properties_collector_factories; private: std::shared_ptr compactor_statistics_; std::shared_ptr primary_statistics_; std::shared_ptr compaction_service_; }; TEST_F(CompactionServiceTest, BasicCompactions) { Options options = CurrentOptions(); ReopenWithCompactionService(&options); Statistics* primary_statistics = GetPrimaryStatistics(); Statistics* compactor_statistics = GetCompactorStatistics(); for (int i = 0; i < 20; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 10 + j; ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id))); } ASSERT_OK(Flush()); } for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 20 + j * 2; ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id))); } ASSERT_OK(Flush()); } ASSERT_OK(dbfull()->TEST_WaitForCompact()); // verify result for (int i = 0; i < 200; i++) { auto result = Get(Key(i)); if (i % 2) { ASSERT_EQ(result, "value" + std::to_string(i)); } else { ASSERT_EQ(result, "value_new" + std::to_string(i)); } } auto my_cs = GetCompactionService(); ASSERT_GE(my_cs->GetCompactionNum(), 1); // make sure the compaction statistics is only recorded on the remote side ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), 1); ASSERT_GE(compactor_statistics->getTickerCount(COMPACT_READ_BYTES), 1); ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES), 0); // even with remote compaction, primary host still needs to read SST files to // `verify_table()`. ASSERT_GE(primary_statistics->getTickerCount(COMPACT_READ_BYTES), 1); // all the compaction write happens on the remote side ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES), compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES)); ASSERT_GE(primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 1); ASSERT_GT(primary_statistics->getTickerCount(COMPACT_READ_BYTES), primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES)); // compactor is already the remote side, which doesn't have remote ASSERT_EQ(compactor_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 0); ASSERT_EQ(compactor_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES), 0); // Test failed compaction SyncPoint::GetInstance()->SetCallBack( "DBImplSecondary::CompactWithoutInstallation::End", [&](void* status) { // override job status auto s = static_cast(status); *s = Status::Aborted("MyTestCompactionService failed to compact!"); }); SyncPoint::GetInstance()->EnableProcessing(); Status s; for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 20 + j * 2; s = Put(Key(key_id), "value_new" + std::to_string(key_id)); if (s.IsAborted()) { break; } } if (s.IsAborted()) { break; } s = Flush(); if (s.IsAborted()) { break; } s = dbfull()->TEST_WaitForCompact(); if (s.IsAborted()) { break; } } ASSERT_TRUE(s.IsAborted()); // Test re-open and successful unique id verification std::atomic_int verify_passed{0}; SyncPoint::GetInstance()->SetCallBack( "BlockBasedTable::Open::PassedVerifyUniqueId", [&](void* arg) { // override job status auto id = static_cast(arg); assert(*id != kNullUniqueId64x2); verify_passed++; }); Reopen(options); ASSERT_GT(verify_passed, 0); Close(); } TEST_F(CompactionServiceTest, ManualCompaction) { Options options = CurrentOptions(); options.disable_auto_compactions = true; ReopenWithCompactionService(&options); GenerateTestData(); auto my_cs = GetCompactionService(); std::string start_str = Key(15); std::string end_str = Key(45); Slice start(start_str); Slice end(end_str); uint64_t comp_num = my_cs->GetCompactionNum(); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end)); ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); VerifyTestData(); start_str = Key(120); start = start_str; comp_num = my_cs->GetCompactionNum(); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr)); ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); VerifyTestData(); end_str = Key(92); end = end_str; comp_num = my_cs->GetCompactionNum(); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, &end)); ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); VerifyTestData(); comp_num = my_cs->GetCompactionNum(); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); VerifyTestData(); } TEST_F(CompactionServiceTest, CancelCompactionOnRemoteSide) { Options options = CurrentOptions(); options.disable_auto_compactions = true; ReopenWithCompactionService(&options); GenerateTestData(); auto my_cs = GetCompactionService(); std::string start_str = Key(15); std::string end_str = Key(45); Slice start(start_str); Slice end(end_str); uint64_t comp_num = my_cs->GetCompactionNum(); // Test cancel compaction at the beginning my_cs->SetCanceled(true); auto s = db_->CompactRange(CompactRangeOptions(), &start, &end); ASSERT_TRUE(s.IsIncomplete()); // compaction number is not increased ASSERT_GE(my_cs->GetCompactionNum(), comp_num); VerifyTestData(); // Test cancel compaction in progress ReopenWithCompactionService(&options); GenerateTestData(); my_cs = GetCompactionService(); my_cs->SetCanceled(false); std::atomic_bool cancel_issued{false}; SyncPoint::GetInstance()->SetCallBack("CompactionJob::Run():Inprogress", [&](void* /*arg*/) { cancel_issued = true; my_cs->SetCanceled(true); }); SyncPoint::GetInstance()->EnableProcessing(); s = db_->CompactRange(CompactRangeOptions(), &start, &end); ASSERT_TRUE(s.IsIncomplete()); ASSERT_TRUE(cancel_issued); // compaction number is not increased ASSERT_GE(my_cs->GetCompactionNum(), comp_num); VerifyTestData(); } TEST_F(CompactionServiceTest, FailedToStart) { Options options = CurrentOptions(); options.disable_auto_compactions = true; ReopenWithCompactionService(&options); GenerateTestData(); auto my_cs = GetCompactionService(); my_cs->OverrideStartStatus(CompactionServiceJobStatus::kFailure); std::string start_str = Key(15); std::string end_str = Key(45); Slice start(start_str); Slice end(end_str); Status s = db_->CompactRange(CompactRangeOptions(), &start, &end); ASSERT_TRUE(s.IsIncomplete()); } TEST_F(CompactionServiceTest, InvalidResult) { Options options = CurrentOptions(); options.disable_auto_compactions = true; ReopenWithCompactionService(&options); GenerateTestData(); auto my_cs = GetCompactionService(); my_cs->OverrideWaitResult("Invalid Str"); std::string start_str = Key(15); std::string end_str = Key(45); Slice start(start_str); Slice end(end_str); Status s = db_->CompactRange(CompactRangeOptions(), &start, &end); ASSERT_FALSE(s.ok()); } TEST_F(CompactionServiceTest, SubCompaction) { Options options = CurrentOptions(); options.max_subcompactions = 10; options.target_file_size_base = 1 << 10; // 1KB options.disable_auto_compactions = true; ReopenWithCompactionService(&options); GenerateTestData(); VerifyTestData(); auto my_cs = GetCompactionService(); int compaction_num_before = my_cs->GetCompactionNum(); auto cro = CompactRangeOptions(); cro.max_subcompactions = 10; Status s = db_->CompactRange(cro, nullptr, nullptr); ASSERT_OK(s); VerifyTestData(); int compaction_num = my_cs->GetCompactionNum() - compaction_num_before; // make sure there's sub-compaction by checking the compaction number ASSERT_GE(compaction_num, 2); } class PartialDeleteCompactionFilter : public CompactionFilter { public: CompactionFilter::Decision FilterV2( int /*level*/, const Slice& key, ValueType /*value_type*/, const Slice& /*existing_value*/, std::string* /*new_value*/, std::string* /*skip_until*/) const override { int i = std::stoi(key.ToString().substr(3)); if (i > 5 && i <= 105) { return CompactionFilter::Decision::kRemove; } return CompactionFilter::Decision::kKeep; } const char* Name() const override { return "PartialDeleteCompactionFilter"; } }; TEST_F(CompactionServiceTest, CompactionFilter) { Options options = CurrentOptions(); std::unique_ptr delete_comp_filter( new PartialDeleteCompactionFilter()); options.compaction_filter = delete_comp_filter.get(); ReopenWithCompactionService(&options); for (int i = 0; i < 20; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 10 + j; ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id))); } ASSERT_OK(Flush()); } for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 20 + j * 2; ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id))); } ASSERT_OK(Flush()); } ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); // verify result for (int i = 0; i < 200; i++) { auto result = Get(Key(i)); if (i > 5 && i <= 105) { ASSERT_EQ(result, "NOT_FOUND"); } else if (i % 2) { ASSERT_EQ(result, "value" + std::to_string(i)); } else { ASSERT_EQ(result, "value_new" + std::to_string(i)); } } auto my_cs = GetCompactionService(); ASSERT_GE(my_cs->GetCompactionNum(), 1); } TEST_F(CompactionServiceTest, Snapshot) { Options options = CurrentOptions(); ReopenWithCompactionService(&options); ASSERT_OK(Put(Key(1), "value1")); ASSERT_OK(Put(Key(2), "value1")); const Snapshot* s1 = db_->GetSnapshot(); ASSERT_OK(Flush()); ASSERT_OK(Put(Key(1), "value2")); ASSERT_OK(Put(Key(3), "value2")); ASSERT_OK(Flush()); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); auto my_cs = GetCompactionService(); ASSERT_GE(my_cs->GetCompactionNum(), 1); ASSERT_EQ("value1", Get(Key(1), s1)); ASSERT_EQ("value2", Get(Key(1))); db_->ReleaseSnapshot(s1); } TEST_F(CompactionServiceTest, ConcurrentCompaction) { Options options = CurrentOptions(); options.level0_file_num_compaction_trigger = 100; options.max_background_jobs = 20; ReopenWithCompactionService(&options); GenerateTestData(); ColumnFamilyMetaData meta; db_->GetColumnFamilyMetaData(&meta); std::vector threads; for (const auto& file : meta.levels[1].files) { threads.emplace_back(std::thread([&]() { std::string fname = file.db_path + "/" + file.name; ASSERT_OK(db_->CompactFiles(CompactionOptions(), {fname}, 2)); })); } for (auto& thread : threads) { thread.join(); } ASSERT_OK(dbfull()->TEST_WaitForCompact()); // verify result for (int i = 0; i < 200; i++) { auto result = Get(Key(i)); if (i % 2) { ASSERT_EQ(result, "value" + std::to_string(i)); } else { ASSERT_EQ(result, "value_new" + std::to_string(i)); } } auto my_cs = GetCompactionService(); ASSERT_EQ(my_cs->GetCompactionNum(), 10); ASSERT_EQ(FilesPerLevel(), "0,0,10"); } TEST_F(CompactionServiceTest, CompactionInfo) { Options options = CurrentOptions(); ReopenWithCompactionService(&options); for (int i = 0; i < 20; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 10 + j; ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id))); } ASSERT_OK(Flush()); } for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 20 + j * 2; ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id))); } ASSERT_OK(Flush()); } ASSERT_OK(dbfull()->TEST_WaitForCompact()); auto my_cs = static_cast_with_check(GetCompactionService()); uint64_t comp_num = my_cs->GetCompactionNum(); ASSERT_GE(comp_num, 1); CompactionServiceJobInfo info = my_cs->GetCompactionInfoForStart(); ASSERT_EQ(dbname_, info.db_name); std::string db_id, db_session_id; ASSERT_OK(db_->GetDbIdentity(db_id)); ASSERT_EQ(db_id, info.db_id); ASSERT_OK(db_->GetDbSessionId(db_session_id)); ASSERT_EQ(db_session_id, info.db_session_id); ASSERT_EQ(Env::LOW, info.priority); info = my_cs->GetCompactionInfoForWait(); ASSERT_EQ(dbname_, info.db_name); ASSERT_EQ(db_id, info.db_id); ASSERT_EQ(db_session_id, info.db_session_id); ASSERT_EQ(Env::LOW, info.priority); // Test priority USER ColumnFamilyMetaData meta; db_->GetColumnFamilyMetaData(&meta); SstFileMetaData file = meta.levels[1].files[0]; ASSERT_OK(db_->CompactFiles(CompactionOptions(), {file.db_path + "/" + file.name}, 2)); info = my_cs->GetCompactionInfoForStart(); ASSERT_EQ(Env::USER, info.priority); info = my_cs->GetCompactionInfoForWait(); ASSERT_EQ(Env::USER, info.priority); // Test priority BOTTOM env_->SetBackgroundThreads(1, Env::BOTTOM); options.num_levels = 2; ReopenWithCompactionService(&options); my_cs = static_cast_with_check(GetCompactionService()); for (int i = 0; i < 20; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 10 + j; ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id))); } ASSERT_OK(Flush()); } for (int i = 0; i < 4; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 20 + j * 2; ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id))); } ASSERT_OK(Flush()); } ASSERT_OK(dbfull()->TEST_WaitForCompact()); info = my_cs->GetCompactionInfoForStart(); ASSERT_EQ(Env::BOTTOM, info.priority); info = my_cs->GetCompactionInfoForWait(); ASSERT_EQ(Env::BOTTOM, info.priority); } TEST_F(CompactionServiceTest, FallbackLocalAuto) { Options options = CurrentOptions(); ReopenWithCompactionService(&options); auto my_cs = GetCompactionService(); Statistics* compactor_statistics = GetCompactorStatistics(); Statistics* primary_statistics = GetPrimaryStatistics(); uint64_t compactor_write_bytes = compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES); uint64_t primary_write_bytes = primary_statistics->getTickerCount(COMPACT_WRITE_BYTES); my_cs->OverrideStartStatus(CompactionServiceJobStatus::kUseLocal); for (int i = 0; i < 20; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 10 + j; ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id))); } ASSERT_OK(Flush()); } for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 20 + j * 2; ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id))); } ASSERT_OK(Flush()); } ASSERT_OK(dbfull()->TEST_WaitForCompact()); // verify result for (int i = 0; i < 200; i++) { auto result = Get(Key(i)); if (i % 2) { ASSERT_EQ(result, "value" + std::to_string(i)); } else { ASSERT_EQ(result, "value_new" + std::to_string(i)); } } ASSERT_EQ(my_cs->GetCompactionNum(), 0); // make sure the compaction statistics is only recorded on the local side ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), compactor_write_bytes); ASSERT_GT(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES), primary_write_bytes); ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_READ_BYTES), 0); ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES), 0); } TEST_F(CompactionServiceTest, FallbackLocalManual) { Options options = CurrentOptions(); options.disable_auto_compactions = true; ReopenWithCompactionService(&options); GenerateTestData(); VerifyTestData(); auto my_cs = GetCompactionService(); Statistics* compactor_statistics = GetCompactorStatistics(); Statistics* primary_statistics = GetPrimaryStatistics(); uint64_t compactor_write_bytes = compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES); uint64_t primary_write_bytes = primary_statistics->getTickerCount(COMPACT_WRITE_BYTES); // re-enable remote compaction my_cs->ResetOverride(); std::string start_str = Key(15); std::string end_str = Key(45); Slice start(start_str); Slice end(end_str); uint64_t comp_num = my_cs->GetCompactionNum(); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &end)); ASSERT_GE(my_cs->GetCompactionNum(), comp_num + 1); // make sure the compaction statistics is only recorded on the remote side ASSERT_GT(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), compactor_write_bytes); ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES), compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES)); ASSERT_EQ(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES), primary_write_bytes); // return run local again with API WaitForComplete my_cs->OverrideWaitStatus(CompactionServiceJobStatus::kUseLocal); start_str = Key(120); start = start_str; comp_num = my_cs->GetCompactionNum(); compactor_write_bytes = compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES); primary_write_bytes = primary_statistics->getTickerCount(COMPACT_WRITE_BYTES); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, nullptr)); ASSERT_EQ(my_cs->GetCompactionNum(), comp_num); // no remote compaction is run // make sure the compaction statistics is only recorded on the local side ASSERT_EQ(compactor_statistics->getTickerCount(COMPACT_WRITE_BYTES), compactor_write_bytes); ASSERT_GT(primary_statistics->getTickerCount(COMPACT_WRITE_BYTES), primary_write_bytes); ASSERT_EQ(primary_statistics->getTickerCount(REMOTE_COMPACT_WRITE_BYTES), compactor_write_bytes); // verify result after 2 manual compactions VerifyTestData(); } TEST_F(CompactionServiceTest, RemoteEventListener) { class RemoteEventListenerTest : public EventListener { public: const char* Name() const override { return "RemoteEventListenerTest"; } void OnSubcompactionBegin(const SubcompactionJobInfo& info) override { auto result = on_going_compactions.emplace(info.job_id); ASSERT_TRUE(result.second); // make sure there's no duplication compaction_num++; EventListener::OnSubcompactionBegin(info); } void OnSubcompactionCompleted(const SubcompactionJobInfo& info) override { auto num = on_going_compactions.erase(info.job_id); ASSERT_TRUE(num == 1); // make sure the compaction id exists EventListener::OnSubcompactionCompleted(info); } void OnTableFileCreated(const TableFileCreationInfo& info) override { ASSERT_EQ(on_going_compactions.count(info.job_id), 1); file_created++; EventListener::OnTableFileCreated(info); } void OnTableFileCreationStarted( const TableFileCreationBriefInfo& info) override { ASSERT_EQ(on_going_compactions.count(info.job_id), 1); file_creation_started++; EventListener::OnTableFileCreationStarted(info); } bool ShouldBeNotifiedOnFileIO() override { file_io_notified++; return EventListener::ShouldBeNotifiedOnFileIO(); } std::atomic_uint64_t file_io_notified{0}; std::atomic_uint64_t file_creation_started{0}; std::atomic_uint64_t file_created{0}; std::set on_going_compactions; // store the job_id std::atomic_uint64_t compaction_num{0}; }; auto listener = new RemoteEventListenerTest(); remote_listeners.emplace_back(listener); Options options = CurrentOptions(); options.max_background_compactions = 1; ReopenWithCompactionService(&options); // multiple compactions might notify on OnSubcompactionBegin simultaneously // which will lead to duplicates in the set. job_id is always 1. was the // intention that no two compaction service jobs run in parallel? or that the // job_id should be unique? env_->SetBackgroundThreads(1, Env::LOW); for (int i = 0; i < 20; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 10 + j; ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id))); } ASSERT_OK(Flush()); } for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) { int key_id = i * 20 + j * 2; ASSERT_OK(Put(Key(key_id), "value_new" + std::to_string(key_id))); } ASSERT_OK(Flush()); } ASSERT_OK(dbfull()->TEST_WaitForCompact()); // check the events are triggered ASSERT_TRUE(listener->file_io_notified > 0); ASSERT_TRUE(listener->file_creation_started > 0); ASSERT_TRUE(listener->file_created > 0); ASSERT_TRUE(listener->compaction_num > 0); ASSERT_TRUE(listener->on_going_compactions.empty()); // verify result for (int i = 0; i < 200; i++) { auto result = Get(Key(i)); if (i % 2) { ASSERT_EQ(result, "value" + std::to_string(i)); } else { ASSERT_EQ(result, "value_new" + std::to_string(i)); } } } TEST_F(CompactionServiceTest, TablePropertiesCollector) { const static std::string kUserPropertyName = "TestCount"; class TablePropertiesCollectorTest : public TablePropertiesCollector { public: Status Finish(UserCollectedProperties* properties) override { *properties = UserCollectedProperties{ {kUserPropertyName, std::to_string(count_)}, }; return Status::OK(); } UserCollectedProperties GetReadableProperties() const override { return UserCollectedProperties(); } const char* Name() const override { return "TablePropertiesCollectorTest"; } Status AddUserKey(const Slice& /*user_key*/, const Slice& /*value*/, EntryType /*type*/, SequenceNumber /*seq*/, uint64_t /*file_size*/) override { count_++; return Status::OK(); } private: uint32_t count_ = 0; }; class TablePropertiesCollectorFactoryTest : public TablePropertiesCollectorFactory { public: TablePropertiesCollector* CreateTablePropertiesCollector( TablePropertiesCollectorFactory::Context /*context*/) override { return new TablePropertiesCollectorTest(); } const char* Name() const override { return "TablePropertiesCollectorFactoryTest"; } }; auto factory = new TablePropertiesCollectorFactoryTest(); remote_table_properties_collector_factories.emplace_back(factory); const int kNumSst = 3; const int kLevel0Trigger = 4; Options options = CurrentOptions(); options.level0_file_num_compaction_trigger = kLevel0Trigger; ReopenWithCompactionService(&options); // generate a few SSTs locally which should not have user property for (int i = 0; i < kNumSst; i++) { for (int j = 0; j < 100; j++) { ASSERT_OK(Put(Key(i * 10 + j), "value")); } ASSERT_OK(Flush()); } TablePropertiesCollection fname_to_props; ASSERT_OK(db_->GetPropertiesOfAllTables(&fname_to_props)); for (const auto& file_props : fname_to_props) { auto properties = file_props.second->user_collected_properties; auto it = properties.find(kUserPropertyName); ASSERT_EQ(it, properties.end()); } // trigger compaction for (int i = kNumSst; i < kLevel0Trigger; i++) { for (int j = 0; j < 100; j++) { ASSERT_OK(Put(Key(i * 10 + j), "value")); } ASSERT_OK(Flush()); } ASSERT_OK(dbfull()->TEST_WaitForCompact(true)); ASSERT_OK(db_->GetPropertiesOfAllTables(&fname_to_props)); bool has_user_property = false; for (const auto& file_props : fname_to_props) { auto properties = file_props.second->user_collected_properties; auto it = properties.find(kUserPropertyName); if (it != properties.end()) { has_user_property = true; ASSERT_GT(std::stoi(it->second), 0); } } ASSERT_TRUE(has_user_property); } } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); ::testing::InitGoogleTest(&argc, argv); RegisterCustomObjects(argc, argv); return RUN_ALL_TESTS(); }