//  Copyright (c) 2013, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef ROCKSDB_LITE

#include "utilities/persistent_cache/persistent_cache_test.h"

#include <functional>
#include <memory>
#include <thread>

#include "utilities/persistent_cache/block_cache_tier.h"

namespace rocksdb {

static const double kStressFactor = .125;

#ifdef OS_LINUX
static void OnOpenForRead(void* arg) {
  int* val = static_cast<int*>(arg);
  *val &= ~O_DIRECT;
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "NewRandomAccessFile:O_DIRECT",
      std::bind(OnOpenForRead, std::placeholders::_1));
}

static void OnOpenForWrite(void* arg) {
  int* val = static_cast<int*>(arg);
  *val &= ~O_DIRECT;
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "NewWritableFile:O_DIRECT",
      std::bind(OnOpenForWrite, std::placeholders::_1));
}
#endif

static void RemoveDirectory(const std::string& folder) {
  std::vector<std::string> files;
  Status status = Env::Default()->GetChildren(folder, &files);
  if (!status.ok()) {
    // we assume the directory does not exist
    return;
  }

  // clean up files with the pattern :digit:.rc
  for (auto file : files) {
    if (file == "." || file == "..") {
      continue;
    }
    status = Env::Default()->DeleteFile(folder + "/" + file);
    assert(status.ok());
  }

  status = Env::Default()->DeleteDir(folder);
  assert(status.ok());
}

static void OnDeleteDir(void* arg) {
  char* dir = static_cast<char*>(arg);
  RemoveDirectory(std::string(dir));
}

//
// Simple logger that prints message on stdout
//
class ConsoleLogger : public Logger {
 public:
  using Logger::Logv;
  ConsoleLogger() : Logger(InfoLogLevel::ERROR_LEVEL) {}

  void Logv(const char* format, va_list ap) override {
    MutexLock _(&lock_);
    vprintf(format, ap);
    printf("\n");
  }

  port::Mutex lock_;
};

// construct a tiered RAM+Block cache
std::unique_ptr<PersistentTieredCache> NewTieredCache(
    const size_t mem_size, const PersistentCacheConfig& opt) {
  std::unique_ptr<PersistentTieredCache> tcache(new PersistentTieredCache());
  // create primary tier
  assert(mem_size);
  auto pcache = std::shared_ptr<PersistentCacheTier>(
      new VolatileCacheTier(/*is_compressed*/ true, mem_size));
  tcache->AddTier(pcache);
  // create secondary tier
  auto scache = std::shared_ptr<PersistentCacheTier>(new BlockCacheTier(opt));
  tcache->AddTier(scache);

  Status s = tcache->Open();
  assert(s.ok());
  return tcache;
}

// create block cache
std::unique_ptr<PersistentCacheTier> NewBlockCache(
    Env* env, const std::string& path,
    const uint64_t max_size = std::numeric_limits<uint64_t>::max(),
    const bool enable_direct_writes = false) {
  const uint32_t max_file_size =
      static_cast<uint32_t>(12 * 1024 * 1024 * kStressFactor);
  auto log = std::make_shared<ConsoleLogger>();
  PersistentCacheConfig opt(env, path, max_size, log);
  opt.cache_file_size = max_file_size;
  opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
  opt.enable_direct_writes = enable_direct_writes;
  std::unique_ptr<PersistentCacheTier> scache(new BlockCacheTier(opt));
  Status s = scache->Open();
  assert(s.ok());
  return scache;
}

// create a new tiered cache (volatile tier + block cache tier)
std::unique_ptr<PersistentTieredCache> NewTieredCache(
    Env* env, const std::string& path, const uint64_t max_volatile_cache_size,
    const uint64_t max_block_cache_size =
        std::numeric_limits<uint64_t>::max()) {
  const uint32_t max_file_size =
      static_cast<uint32_t>(12 * 1024 * 1024 * kStressFactor);
  auto log = std::make_shared<ConsoleLogger>();
  auto opt = PersistentCacheConfig(env, path, max_block_cache_size, log);
  opt.cache_file_size = max_file_size;
  opt.max_write_pipeline_backlog_size = std::numeric_limits<uint64_t>::max();
  // create tier out of the two caches
  auto cache = NewTieredCache(max_volatile_cache_size, opt);
  return cache;
}

PersistentCacheTierTest::PersistentCacheTierTest()
    : path_(test::TmpDir(Env::Default()) + "/cache_test") {
#ifdef OS_LINUX
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
  rocksdb::SyncPoint::GetInstance()->SetCallBack("NewRandomAccessFile:O_DIRECT",
                                                 OnOpenForRead);
  rocksdb::SyncPoint::GetInstance()->SetCallBack("NewWritableFile:O_DIRECT",
                                                 OnOpenForWrite);
#endif
}

// Block cache tests
TEST_F(PersistentCacheTierTest, DISABLED_BlockCacheInsertWithFileCreateError) {
  cache_ = NewBlockCache(Env::Default(), path_,
                         /*size=*/std::numeric_limits<uint64_t>::max(),
                         /*direct_writes=*/false);
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "BlockCacheTier::NewCacheFile:DeleteDir", OnDeleteDir);

  RunNegativeInsertTest(/*nthreads=*/1,
                        /*max_keys=*/
                        static_cast<size_t>(10 * 1024 * kStressFactor));

  rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
}

#ifdef TRAVIS
// Travis is unable to handle the normal version of the tests running out of
// fds, out of space and timeouts. This is an easier version of the test
// specifically written for Travis
TEST_F(PersistentCacheTierTest, BasicTest) {
  cache_ = std::make_shared<VolatileCacheTier>();
  RunInsertTest(/*nthreads=*/1, /*max_keys=*/1024);

  cache_ = NewBlockCache(Env::Default(), path_,
                         /*size=*/std::numeric_limits<uint64_t>::max(),
                         /*direct_writes=*/true);
  RunInsertTest(/*nthreads=*/1, /*max_keys=*/1024);

  cache_ = NewTieredCache(Env::Default(), path_,
                          /*memory_size=*/static_cast<size_t>(1 * 1024 * 1024));
  RunInsertTest(/*nthreads=*/1, /*max_keys=*/1024);
}
#else
// Volatile cache tests
TEST_F(PersistentCacheTierTest, VolatileCacheInsert) {
  for (auto nthreads : {1, 5}) {
    for (auto max_keys :
         {10 * 1024 * kStressFactor, 1 * 1024 * 1024 * kStressFactor}) {
      cache_ = std::make_shared<VolatileCacheTier>();
      RunInsertTest(nthreads, static_cast<size_t>(max_keys));
    }
  }
}

TEST_F(PersistentCacheTierTest, VolatileCacheInsertWithEviction) {
  for (auto nthreads : {1, 5}) {
    for (auto max_keys : {1 * 1024 * 1024 * kStressFactor}) {
      cache_ = std::make_shared<VolatileCacheTier>(
          /*compressed=*/true,
          /*size=*/static_cast<size_t>(1 * 1024 * 1024 * kStressFactor));
      RunInsertTestWithEviction(nthreads, static_cast<size_t>(max_keys));
    }
  }
}

// Block cache tests
TEST_F(PersistentCacheTierTest, BlockCacheInsert) {
  for (auto direct_writes : {true, false}) {
    for (auto nthreads : {1, 5}) {
      for (auto max_keys :
           {10 * 1024 * kStressFactor, 1 * 1024 * 1024 * kStressFactor}) {
        cache_ = NewBlockCache(Env::Default(), path_,
                               /*size=*/std::numeric_limits<uint64_t>::max(),
                               direct_writes);
        RunInsertTest(nthreads, static_cast<size_t>(max_keys));
      }
    }
  }
}

TEST_F(PersistentCacheTierTest, BlockCacheInsertWithEviction) {
  for (auto nthreads : {1, 5}) {
    for (auto max_keys : {1 * 1024 * 1024 * kStressFactor}) {
      cache_ = NewBlockCache(Env::Default(), path_,
                             /*max_size=*/static_cast<size_t>(
                                 200 * 1024 * 1024 * kStressFactor));
      RunInsertTestWithEviction(nthreads, static_cast<size_t>(max_keys));
    }
  }
}

// Tiered cache tests
TEST_F(PersistentCacheTierTest, TieredCacheInsert) {
  for (auto nthreads : {1, 5}) {
    for (auto max_keys :
         {10 * 1024 * kStressFactor, 1 * 1024 * 1024 * kStressFactor}) {
      cache_ = NewTieredCache(Env::Default(), path_,
                              /*memory_size=*/static_cast<size_t>(
                                  1 * 1024 * 1024 * kStressFactor));
      RunInsertTest(nthreads, static_cast<size_t>(max_keys));
    }
  }
}

// This test causes a lot of file deletions, which the limited Travis testing
// environment cannot handle.
TEST_F(PersistentCacheTierTest, TieredCacheInsertWithEviction) {
  for (auto nthreads : {1, 5}) {
    for (auto max_keys : {1 * 1024 * 1024 * kStressFactor}) {
      cache_ = NewTieredCache(
          Env::Default(), path_,
          /*memory_size=*/static_cast<size_t>(1 * 1024 * 1024 * kStressFactor),
          /*block_cache_size=*/
          static_cast<size_t>(200 * 1024 * 1024 * kStressFactor));
      RunInsertTestWithEviction(nthreads, static_cast<size_t>(max_keys));
    }
  }
}
#endif

std::shared_ptr<PersistentCacheTier> MakeVolatileCache(
    const std::string& /*dbname*/) {
  return std::make_shared<VolatileCacheTier>();
}

std::shared_ptr<PersistentCacheTier> MakeBlockCache(const std::string& dbname) {
  return NewBlockCache(Env::Default(), dbname);
}

std::shared_ptr<PersistentCacheTier> MakeTieredCache(
    const std::string& dbname) {
  const auto memory_size = 1 * 1024 * 1024 * kStressFactor;
  return NewTieredCache(Env::Default(), dbname,
                        static_cast<size_t>(memory_size));
}

#ifdef OS_LINUX
static void UniqueIdCallback(void* arg) {
  int* result = reinterpret_cast<int*>(arg);
  if (*result == -1) {
    *result = 0;
  }

  rocksdb::SyncPoint::GetInstance()->ClearTrace();
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
}
#endif

TEST_F(PersistentCacheTierTest, FactoryTest) {
  for (auto nvm_opt : {true, false}) {
    ASSERT_FALSE(cache_);
    auto log = std::make_shared<ConsoleLogger>();
    std::shared_ptr<PersistentCache> cache;
    ASSERT_OK(NewPersistentCache(Env::Default(), path_,
                                 /*size=*/1 * 1024 * 1024 * 1024, log, nvm_opt,
                                 &cache));
    ASSERT_TRUE(cache);
    ASSERT_EQ(cache->Stats().size(), 1);
    ASSERT_TRUE(cache->Stats()[0].size());
    cache.reset();
  }
}

PersistentCacheDBTest::PersistentCacheDBTest() : DBTestBase("/cache_test") {
#ifdef OS_LINUX
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "GetUniqueIdFromFile:FS_IOC_GETVERSION", UniqueIdCallback);
  rocksdb::SyncPoint::GetInstance()->SetCallBack("NewRandomAccessFile:O_DIRECT",
                                                 OnOpenForRead);
#endif
}

// test template
void PersistentCacheDBTest::RunTest(
    const std::function<std::shared_ptr<PersistentCacheTier>(bool)>& new_pcache,
    const size_t max_keys = 100 * 1024, const size_t max_usecase = 5) {
  if (!Snappy_Supported()) {
    return;
  }

  // number of insertion iterations
  int num_iter = static_cast<int>(max_keys * kStressFactor);

  for (size_t iter = 0; iter < max_usecase; iter++) {
    Options options;
    options.write_buffer_size =
        static_cast<size_t>(64 * 1024 * kStressFactor);  // small write buffer
    options.statistics = rocksdb::CreateDBStatistics();
    options = CurrentOptions(options);

    // setup page cache
    std::shared_ptr<PersistentCacheTier> pcache;
    BlockBasedTableOptions table_options;
    table_options.cache_index_and_filter_blocks = true;

    const size_t size_max = std::numeric_limits<size_t>::max();

    switch (iter) {
      case 0:
        // page cache, block cache, no-compressed cache
        pcache = new_pcache(/*is_compressed=*/true);
        table_options.persistent_cache = pcache;
        table_options.block_cache = NewLRUCache(size_max);
        table_options.block_cache_compressed = nullptr;
        options.table_factory.reset(NewBlockBasedTableFactory(table_options));
        break;
      case 1:
        // page cache, block cache, compressed cache
        pcache = new_pcache(/*is_compressed=*/true);
        table_options.persistent_cache = pcache;
        table_options.block_cache = NewLRUCache(size_max);
        table_options.block_cache_compressed = NewLRUCache(size_max);
        options.table_factory.reset(NewBlockBasedTableFactory(table_options));
        break;
      case 2:
        // page cache, block cache, compressed cache + kNoCompression
        // both block cache and compressed cache, but DB is not compressed
        // also, make block cache sizes bigger, to trigger block cache hits
        pcache = new_pcache(/*is_compressed=*/true);
        table_options.persistent_cache = pcache;
        table_options.block_cache = NewLRUCache(size_max);
        table_options.block_cache_compressed = NewLRUCache(size_max);
        options.table_factory.reset(NewBlockBasedTableFactory(table_options));
        options.compression = kNoCompression;
        break;
      case 3:
        // page cache, no block cache, no compressed cache
        pcache = new_pcache(/*is_compressed=*/false);
        table_options.persistent_cache = pcache;
        table_options.block_cache = nullptr;
        table_options.block_cache_compressed = nullptr;
        options.table_factory.reset(NewBlockBasedTableFactory(table_options));
        break;
      case 4:
        // page cache, no block cache, no compressed cache
        // Page cache caches compressed blocks
        pcache = new_pcache(/*is_compressed=*/true);
        table_options.persistent_cache = pcache;
        table_options.block_cache = nullptr;
        table_options.block_cache_compressed = nullptr;
        options.table_factory.reset(NewBlockBasedTableFactory(table_options));
        break;
      default:
        ASSERT_TRUE(false);
    }

    std::vector<std::string> values;
    // insert data
    Insert(options, table_options, num_iter, &values);
    // flush all data in cache to device
    pcache->TEST_Flush();
    // verify data
    Verify(num_iter, values);

    auto block_miss = TestGetTickerCount(options, BLOCK_CACHE_MISS);
    auto compressed_block_hit =
        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_HIT);
    auto compressed_block_miss =
        TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_MISS);
    auto page_hit = TestGetTickerCount(options, PERSISTENT_CACHE_HIT);
    auto page_miss = TestGetTickerCount(options, PERSISTENT_CACHE_MISS);

    // check that we triggered the appropriate code paths in the cache
    switch (iter) {
      case 0:
        // page cache, block cache, no-compressed cache
        ASSERT_GT(page_miss, 0);
        ASSERT_GT(page_hit, 0);
        ASSERT_GT(block_miss, 0);
        ASSERT_EQ(compressed_block_miss, 0);
        ASSERT_EQ(compressed_block_hit, 0);
        break;
      case 1:
        // page cache, block cache, compressed cache
        ASSERT_GT(page_miss, 0);
        ASSERT_GT(block_miss, 0);
        ASSERT_GT(compressed_block_miss, 0);
        break;
      case 2:
        // page cache, block cache, compressed cache + kNoCompression
        ASSERT_GT(page_miss, 0);
        ASSERT_GT(page_hit, 0);
        ASSERT_GT(block_miss, 0);
        ASSERT_GT(compressed_block_miss, 0);
        // remember kNoCompression
        ASSERT_EQ(compressed_block_hit, 0);
        break;
      case 3:
      case 4:
        // page cache, no block cache, no compressed cache
        ASSERT_GT(page_miss, 0);
        ASSERT_GT(page_hit, 0);
        ASSERT_EQ(compressed_block_hit, 0);
        ASSERT_EQ(compressed_block_miss, 0);
        break;
      default:
        ASSERT_TRUE(false);
    }

    options.create_if_missing = true;
    DestroyAndReopen(options);

    pcache->Close();
  }
}

#ifdef TRAVIS
// Travis is unable to handle the normal version of the tests running out of
// fds, out of space and timeouts. This is an easier version of the test
// specifically written for Travis
TEST_F(PersistentCacheDBTest, BasicTest) {
  RunTest(std::bind(&MakeBlockCache, dbname_), /*max_keys=*/1024,
          /*max_usecase=*/1);
}
#else
// test table with block page cache
TEST_F(PersistentCacheDBTest, BlockCacheTest) {
  RunTest(std::bind(&MakeBlockCache, dbname_));
}

// test table with volatile page cache
TEST_F(PersistentCacheDBTest, VolatileCacheTest) {
  RunTest(std::bind(&MakeVolatileCache, dbname_));
}

// test table with tiered page cache
TEST_F(PersistentCacheDBTest, TieredCacheTest) {
  RunTest(std::bind(&MakeTieredCache, dbname_));
}
#endif

}  // namespace rocksdb

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

#else
int main() { return 0; }
#endif