// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #include "cache/compressed_secondary_cache.h" #include #include #include #include "memory/memory_allocator_impl.h" #include "monitoring/perf_context_imp.h" #include "util/coding.h" #include "util/compression.h" #include "util/string_util.h" namespace ROCKSDB_NAMESPACE { CompressedSecondaryCache::CompressedSecondaryCache( const CompressedSecondaryCacheOptions& opts) : cache_(opts.LRUCacheOptions::MakeSharedCache()), cache_options_(opts), cache_res_mgr_(std::make_shared( std::make_shared>( cache_))), disable_cache_(opts.capacity == 0) {} CompressedSecondaryCache::~CompressedSecondaryCache() {} std::unique_ptr CompressedSecondaryCache::Lookup( const Slice& key, const Cache::CacheItemHelper* helper, Cache::CreateContext* create_context, bool /*wait*/, bool advise_erase, Statistics* stats, bool& kept_in_sec_cache) { assert(helper); // This is a minor optimization. Its ok to skip it in TSAN in order to // avoid a false positive. #ifndef __SANITIZE_THREAD__ if (disable_cache_) { return nullptr; } #endif std::unique_ptr handle; kept_in_sec_cache = false; Cache::Handle* lru_handle = cache_->Lookup(key); if (lru_handle == nullptr) { return nullptr; } void* handle_value = cache_->Value(lru_handle); if (handle_value == nullptr) { cache_->Release(lru_handle, /*erase_if_last_ref=*/false); RecordTick(stats, COMPRESSED_SECONDARY_CACHE_DUMMY_HITS); return nullptr; } CacheAllocationPtr* ptr{nullptr}; CacheAllocationPtr merged_value; size_t handle_value_charge{0}; const char* data_ptr = nullptr; CacheTier source = CacheTier::kVolatileCompressedTier; CompressionType type = cache_options_.compression_type; if (cache_options_.enable_custom_split_merge) { CacheValueChunk* value_chunk_ptr = reinterpret_cast(handle_value); merged_value = MergeChunksIntoValue(value_chunk_ptr, handle_value_charge); ptr = &merged_value; data_ptr = ptr->get(); } else { uint32_t type_32 = static_cast(type); uint32_t source_32 = static_cast(source); ptr = reinterpret_cast(handle_value); handle_value_charge = cache_->GetCharge(lru_handle); data_ptr = ptr->get(); data_ptr = GetVarint32Ptr(data_ptr, data_ptr + 1, static_cast(&type_32)); type = static_cast(type_32); data_ptr = GetVarint32Ptr(data_ptr, data_ptr + 1, static_cast(&source_32)); source = static_cast(source_32); handle_value_charge -= (data_ptr - ptr->get()); } MemoryAllocator* allocator = cache_options_.memory_allocator.get(); Status s; Cache::ObjectPtr value{nullptr}; size_t charge{0}; if (source == CacheTier::kVolatileCompressedTier) { if (cache_options_.compression_type == kNoCompression || cache_options_.do_not_compress_roles.Contains(helper->role)) { s = helper->create_cb(Slice(data_ptr, handle_value_charge), kNoCompression, CacheTier::kVolatileTier, create_context, allocator, &value, &charge); } else { UncompressionContext uncompression_context( cache_options_.compression_type); UncompressionInfo uncompression_info(uncompression_context, UncompressionDict::GetEmptyDict(), cache_options_.compression_type); size_t uncompressed_size{0}; CacheAllocationPtr uncompressed = UncompressData(uncompression_info, (char*)data_ptr, handle_value_charge, &uncompressed_size, cache_options_.compress_format_version, allocator); if (!uncompressed) { cache_->Release(lru_handle, /*erase_if_last_ref=*/true); return nullptr; } s = helper->create_cb(Slice(uncompressed.get(), uncompressed_size), kNoCompression, CacheTier::kVolatileTier, create_context, allocator, &value, &charge); } } else { // The item was not compressed by us. Let the helper create_cb // uncompress it s = helper->create_cb(Slice(data_ptr, handle_value_charge), type, source, create_context, allocator, &value, &charge); } if (!s.ok()) { cache_->Release(lru_handle, /*erase_if_last_ref=*/true); return nullptr; } if (advise_erase) { cache_->Release(lru_handle, /*erase_if_last_ref=*/true); // Insert a dummy handle. cache_ ->Insert(key, /*obj=*/nullptr, GetHelper(cache_options_.enable_custom_split_merge), /*charge=*/0) .PermitUncheckedError(); } else { kept_in_sec_cache = true; cache_->Release(lru_handle, /*erase_if_last_ref=*/false); } handle.reset(new CompressedSecondaryCacheResultHandle(value, charge)); RecordTick(stats, COMPRESSED_SECONDARY_CACHE_HITS); return handle; } bool CompressedSecondaryCache::MaybeInsertDummy(const Slice& key) { auto internal_helper = GetHelper(cache_options_.enable_custom_split_merge); Cache::Handle* lru_handle = cache_->Lookup(key); if (lru_handle == nullptr) { PERF_COUNTER_ADD(compressed_sec_cache_insert_dummy_count, 1); // Insert a dummy handle if the handle is evicted for the first time. cache_->Insert(key, /*obj=*/nullptr, internal_helper, /*charge=*/0) .PermitUncheckedError(); return true; } else { cache_->Release(lru_handle, /*erase_if_last_ref=*/false); } return false; } Status CompressedSecondaryCache::InsertInternal( const Slice& key, Cache::ObjectPtr value, const Cache::CacheItemHelper* helper, CompressionType type, CacheTier source) { if (source != CacheTier::kVolatileCompressedTier && cache_options_.enable_custom_split_merge) { // We don't support custom split/merge for the tiered case return Status::OK(); } auto internal_helper = GetHelper(cache_options_.enable_custom_split_merge); char header[10]; char* payload = header; payload = EncodeVarint32(payload, static_cast(type)); payload = EncodeVarint32(payload, static_cast(source)); size_t header_size = payload - header; size_t data_size = (*helper->size_cb)(value); size_t total_size = data_size + header_size; CacheAllocationPtr ptr = AllocateBlock(total_size, cache_options_.memory_allocator.get()); char* data_ptr = ptr.get() + header_size; Status s = (*helper->saveto_cb)(value, 0, data_size, data_ptr); if (!s.ok()) { return s; } Slice val(data_ptr, data_size); std::string compressed_val; if (cache_options_.compression_type != kNoCompression && type == kNoCompression && !cache_options_.do_not_compress_roles.Contains(helper->role)) { PERF_COUNTER_ADD(compressed_sec_cache_uncompressed_bytes, data_size); CompressionContext compression_context(cache_options_.compression_type, cache_options_.compression_opts); uint64_t sample_for_compression{0}; CompressionInfo compression_info( cache_options_.compression_opts, compression_context, CompressionDict::GetEmptyDict(), cache_options_.compression_type, sample_for_compression); bool success = CompressData(val, compression_info, cache_options_.compress_format_version, &compressed_val); if (!success) { return Status::Corruption("Error compressing value."); } val = Slice(compressed_val); data_size = compressed_val.size(); total_size = header_size + data_size; PERF_COUNTER_ADD(compressed_sec_cache_compressed_bytes, data_size); if (!cache_options_.enable_custom_split_merge) { ptr = AllocateBlock(total_size, cache_options_.memory_allocator.get()); data_ptr = ptr.get() + header_size; memcpy(data_ptr, compressed_val.data(), data_size); } } PERF_COUNTER_ADD(compressed_sec_cache_insert_real_count, 1); if (cache_options_.enable_custom_split_merge) { size_t charge{0}; CacheValueChunk* value_chunks_head = SplitValueIntoChunks(val, cache_options_.compression_type, charge); return cache_->Insert(key, value_chunks_head, internal_helper, charge); } else { std::memcpy(ptr.get(), header, header_size); CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr)); return cache_->Insert(key, buf, internal_helper, total_size); } } Status CompressedSecondaryCache::Insert(const Slice& key, Cache::ObjectPtr value, const Cache::CacheItemHelper* helper, bool force_insert) { if (value == nullptr) { return Status::InvalidArgument(); } if (!force_insert && MaybeInsertDummy(key)) { return Status::OK(); } return InsertInternal(key, value, helper, kNoCompression, CacheTier::kVolatileCompressedTier); } Status CompressedSecondaryCache::InsertSaved( const Slice& key, const Slice& saved, CompressionType type = kNoCompression, CacheTier source = CacheTier::kVolatileTier) { if (type == kNoCompression) { return Status::OK(); } auto slice_helper = &kSliceCacheItemHelper; if (MaybeInsertDummy(key)) { return Status::OK(); } return InsertInternal( key, static_cast(const_cast(&saved)), slice_helper, type, source); } void CompressedSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); } Status CompressedSecondaryCache::SetCapacity(size_t capacity) { MutexLock l(&capacity_mutex_); cache_options_.capacity = capacity; cache_->SetCapacity(capacity); disable_cache_ = capacity == 0; return Status::OK(); } Status CompressedSecondaryCache::GetCapacity(size_t& capacity) { MutexLock l(&capacity_mutex_); capacity = cache_options_.capacity; return Status::OK(); } std::string CompressedSecondaryCache::GetPrintableOptions() const { std::string ret; ret.reserve(20000); const int kBufferSize{200}; char buffer[kBufferSize]; ret.append(cache_->GetPrintableOptions()); snprintf(buffer, kBufferSize, " compression_type : %s\n", CompressionTypeToString(cache_options_.compression_type).c_str()); ret.append(buffer); snprintf(buffer, kBufferSize, " compression_opts : %s\n", CompressionOptionsToString( const_cast(cache_options_.compression_opts)) .c_str()); ret.append(buffer); snprintf(buffer, kBufferSize, " compress_format_version : %d\n", cache_options_.compress_format_version); ret.append(buffer); return ret; } CompressedSecondaryCache::CacheValueChunk* CompressedSecondaryCache::SplitValueIntoChunks(const Slice& value, CompressionType compression_type, size_t& charge) { assert(!value.empty()); const char* src_ptr = value.data(); size_t src_size{value.size()}; CacheValueChunk dummy_head = CacheValueChunk(); CacheValueChunk* current_chunk = &dummy_head; // Do not split when value size is large or there is no compression. size_t predicted_chunk_size{0}; size_t actual_chunk_size{0}; size_t tmp_size{0}; while (src_size > 0) { predicted_chunk_size = sizeof(CacheValueChunk) - 1 + src_size; auto upper = std::upper_bound(malloc_bin_sizes_.begin(), malloc_bin_sizes_.end(), predicted_chunk_size); // Do not split when value size is too small, too large, close to a bin // size, or there is no compression. if (upper == malloc_bin_sizes_.begin() || upper == malloc_bin_sizes_.end() || *upper - predicted_chunk_size < malloc_bin_sizes_.front() || compression_type == kNoCompression) { tmp_size = predicted_chunk_size; } else { tmp_size = *(--upper); } CacheValueChunk* new_chunk = reinterpret_cast(new char[tmp_size]); current_chunk->next = new_chunk; current_chunk = current_chunk->next; actual_chunk_size = tmp_size - sizeof(CacheValueChunk) + 1; memcpy(current_chunk->data, src_ptr, actual_chunk_size); current_chunk->size = actual_chunk_size; src_ptr += actual_chunk_size; src_size -= actual_chunk_size; charge += tmp_size; } current_chunk->next = nullptr; return dummy_head.next; } CacheAllocationPtr CompressedSecondaryCache::MergeChunksIntoValue( const void* chunks_head, size_t& charge) { const CacheValueChunk* head = reinterpret_cast(chunks_head); const CacheValueChunk* current_chunk = head; charge = 0; while (current_chunk != nullptr) { charge += current_chunk->size; current_chunk = current_chunk->next; } CacheAllocationPtr ptr = AllocateBlock(charge, cache_options_.memory_allocator.get()); current_chunk = head; size_t pos{0}; while (current_chunk != nullptr) { memcpy(ptr.get() + pos, current_chunk->data, current_chunk->size); pos += current_chunk->size; current_chunk = current_chunk->next; } return ptr; } const Cache::CacheItemHelper* CompressedSecondaryCache::GetHelper( bool enable_custom_split_merge) const { if (enable_custom_split_merge) { static const Cache::CacheItemHelper kHelper{ CacheEntryRole::kMisc, [](Cache::ObjectPtr obj, MemoryAllocator* /*alloc*/) { CacheValueChunk* chunks_head = static_cast(obj); while (chunks_head != nullptr) { CacheValueChunk* tmp_chunk = chunks_head; chunks_head = chunks_head->next; tmp_chunk->Free(); obj = nullptr; }; }}; return &kHelper; } else { static const Cache::CacheItemHelper kHelper{ CacheEntryRole::kMisc, [](Cache::ObjectPtr obj, MemoryAllocator* /*alloc*/) { delete static_cast(obj); obj = nullptr; }}; return &kHelper; } } std::shared_ptr CompressedSecondaryCacheOptions::MakeSharedSecondaryCache() const { return std::make_shared(*this); } Status CompressedSecondaryCache::Deflate(size_t decrease) { return cache_res_mgr_->UpdateCacheReservation(decrease, /*increase=*/true); } Status CompressedSecondaryCache::Inflate(size_t increase) { return cache_res_mgr_->UpdateCacheReservation(increase, /*increase=*/false); } } // namespace ROCKSDB_NAMESPACE