// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #include "utilities/cassandra/cassandra_compaction_filter.h" #include #include "rocksdb/slice.h" #include "rocksdb/utilities/object_registry.h" #include "rocksdb/utilities/options_type.h" #include "utilities/cassandra/format.h" #include "utilities/cassandra/merge_operator.h" namespace ROCKSDB_NAMESPACE::cassandra { static std::unordered_map cassandra_filter_type_info = { {"purge_ttl_on_expiration", {offsetof(struct CassandraOptions, purge_ttl_on_expiration), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, {"gc_grace_period_in_seconds", {offsetof(struct CassandraOptions, gc_grace_period_in_seconds), OptionType::kUInt32T, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, }; CassandraCompactionFilter::CassandraCompactionFilter( bool purge_ttl_on_expiration, int32_t gc_grace_period_in_seconds) : options_(gc_grace_period_in_seconds, 0, purge_ttl_on_expiration) { RegisterOptions(&options_, &cassandra_filter_type_info); } CompactionFilter::Decision CassandraCompactionFilter::FilterV2( int /*level*/, const Slice& /*key*/, ValueType value_type, const Slice& existing_value, std::string* new_value, std::string* /*skip_until*/) const { bool value_changed = false; RowValue row_value = RowValue::Deserialize(existing_value.data(), existing_value.size()); RowValue compacted = options_.purge_ttl_on_expiration ? row_value.RemoveExpiredColumns(&value_changed) : row_value.ConvertExpiredColumnsToTombstones(&value_changed); if (value_type == ValueType::kValue) { compacted = compacted.RemoveTombstones(options_.gc_grace_period_in_seconds); } if (compacted.Empty()) { return Decision::kRemove; } if (value_changed) { compacted.Serialize(new_value); return Decision::kChangeValue; } return Decision::kKeep; } CassandraCompactionFilterFactory::CassandraCompactionFilterFactory( bool purge_ttl_on_expiration, int32_t gc_grace_period_in_seconds) : options_(gc_grace_period_in_seconds, 0, purge_ttl_on_expiration) { RegisterOptions(&options_, &cassandra_filter_type_info); } std::unique_ptr CassandraCompactionFilterFactory::CreateCompactionFilter( const CompactionFilter::Context&) { std::unique_ptr result(new CassandraCompactionFilter( options_.purge_ttl_on_expiration, options_.gc_grace_period_in_seconds)); return result; } int RegisterCassandraObjects(ObjectLibrary& library, const std::string& /*arg*/) { library.AddFactory( CassandraValueMergeOperator::kClassName(), [](const std::string& /*uri*/, std::unique_ptr* guard, std::string* /* errmsg */) { guard->reset(new CassandraValueMergeOperator(0)); return guard->get(); }); library.AddFactory( CassandraCompactionFilter::kClassName(), [](const std::string& /*uri*/, std::unique_ptr* /*guard */, std::string* /* errmsg */) { return new CassandraCompactionFilter(false, 0); }); library.AddFactory( CassandraCompactionFilterFactory::kClassName(), [](const std::string& /*uri*/, std::unique_ptr* guard, std::string* /* errmsg */) { guard->reset(new CassandraCompactionFilterFactory(false, 0)); return guard->get(); }); size_t num_types; return static_cast(library.GetFactoryCount(&num_types)); } } // namespace ROCKSDB_NAMESPACE::cassandra