// Copyright (c) Meta Platforms, Inc. and affiliates. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #include #include "util/cast_util.h" #include "utilities/transactions/transaction_test.h" namespace ROCKSDB_NAMESPACE { INSTANTIATE_TEST_CASE_P( Unsupported, TimestampedSnapshotWithTsSanityCheck, ::testing::Values( std::make_tuple(false, false, WRITE_PREPARED, kOrderedWrite), std::make_tuple(false, true, WRITE_PREPARED, kUnorderedWrite), std::make_tuple(false, false, WRITE_UNPREPARED, kOrderedWrite))); INSTANTIATE_TEST_CASE_P(WriteCommitted, TransactionTest, ::testing::Combine(::testing::Bool(), ::testing::Bool(), ::testing::Values(WRITE_COMMITTED), ::testing::Values(kOrderedWrite))); namespace { // Not thread-safe. Caller needs to provide external synchronization. class TsCheckingTxnNotifier : public TransactionNotifier { public: explicit TsCheckingTxnNotifier() = default; ~TsCheckingTxnNotifier() override = default; void SnapshotCreated(const Snapshot* new_snapshot) override { assert(new_snapshot); if (prev_snapshot_seq_ != kMaxSequenceNumber) { assert(prev_snapshot_seq_ <= new_snapshot->GetSequenceNumber()); } prev_snapshot_seq_ = new_snapshot->GetSequenceNumber(); if (prev_snapshot_ts_ != kMaxTxnTimestamp) { assert(prev_snapshot_ts_ <= new_snapshot->GetTimestamp()); } prev_snapshot_ts_ = new_snapshot->GetTimestamp(); } TxnTimestamp prev_snapshot_ts() const { return prev_snapshot_ts_; } private: SequenceNumber prev_snapshot_seq_ = kMaxSequenceNumber; TxnTimestamp prev_snapshot_ts_ = kMaxTxnTimestamp; }; } // anonymous namespace TEST_P(TimestampedSnapshotWithTsSanityCheck, WithoutCommitTs) { std::unique_ptr txn( db->BeginTransaction(WriteOptions(), TransactionOptions())); assert(txn); ASSERT_OK(txn->SetName("txn0")); ASSERT_OK(txn->Put("a", "v")); ASSERT_OK(txn->Prepare()); Status s = txn->CommitAndTryCreateSnapshot(); ASSERT_TRUE(s.IsInvalidArgument()); ASSERT_OK(txn->Rollback()); txn.reset(db->BeginTransaction(WriteOptions(), TransactionOptions())); assert(txn); ASSERT_OK(txn->SetName("txn0")); ASSERT_OK(txn->Put("a", "v")); s = txn->CommitAndTryCreateSnapshot(); ASSERT_TRUE(s.IsInvalidArgument()); } TEST_P(TimestampedSnapshotWithTsSanityCheck, SetCommitTs) { std::unique_ptr txn( db->BeginTransaction(WriteOptions(), TransactionOptions())); assert(txn); ASSERT_OK(txn->SetName("txn0")); ASSERT_OK(txn->Put("a", "v")); ASSERT_OK(txn->Prepare()); std::shared_ptr snapshot; Status s = txn->CommitAndTryCreateSnapshot(nullptr, 10, &snapshot); ASSERT_TRUE(s.IsNotSupported()); ASSERT_OK(txn->Rollback()); txn.reset(db->BeginTransaction(WriteOptions(), TransactionOptions())); assert(txn); ASSERT_OK(txn->SetName("txn0")); ASSERT_OK(txn->Put("a", "v")); s = txn->CommitAndTryCreateSnapshot(nullptr, 10, &snapshot); ASSERT_TRUE(s.IsNotSupported()); } TEST_P(TransactionTest, WithoutCommitTs) { std::unique_ptr txn( db->BeginTransaction(WriteOptions(), TransactionOptions())); assert(txn); ASSERT_OK(txn->SetName("txn0")); ASSERT_OK(txn->Put("a", "v")); ASSERT_OK(txn->Prepare()); Status s = txn->CommitAndTryCreateSnapshot(); ASSERT_TRUE(s.IsInvalidArgument()); ASSERT_OK(txn->Rollback()); txn.reset(db->BeginTransaction(WriteOptions(), TransactionOptions())); assert(txn); ASSERT_OK(txn->SetName("txn0")); ASSERT_OK(txn->Put("a", "v")); s = txn->CommitAndTryCreateSnapshot(); ASSERT_TRUE(s.IsInvalidArgument()); } TEST_P(TransactionTest, ReuseExistingTxn) { Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions()); assert(txn); ASSERT_OK(txn->SetName("txn0")); ASSERT_OK(txn->Put("a", "v1")); ASSERT_OK(txn->Prepare()); auto notifier = std::make_shared(); std::shared_ptr snapshot1; Status s = txn->CommitAndTryCreateSnapshot(notifier, /*commit_ts=*/100, &snapshot1); ASSERT_OK(s); ASSERT_EQ(100, snapshot1->GetTimestamp()); Transaction* txn1 = db->BeginTransaction(WriteOptions(), TransactionOptions(), txn); assert(txn1 == txn); ASSERT_OK(txn1->SetName("txn1")); ASSERT_OK(txn->Put("a", "v2")); ASSERT_OK(txn->Prepare()); std::shared_ptr snapshot2; s = txn->CommitAndTryCreateSnapshot(notifier, /*commit_ts=*/110, &snapshot2); ASSERT_OK(s); ASSERT_EQ(110, snapshot2->GetTimestamp()); delete txn; { std::string value; ReadOptions read_opts; read_opts.snapshot = snapshot1.get(); ASSERT_OK(db->Get(read_opts, "a", &value)); ASSERT_EQ("v1", value); read_opts.snapshot = snapshot2.get(); ASSERT_OK(db->Get(read_opts, "a", &value)); ASSERT_EQ("v2", value); } } TEST_P(TransactionTest, CreateSnapshotWhenCommit) { std::unique_ptr txn( db->BeginTransaction(WriteOptions(), TransactionOptions())); assert(txn); constexpr int batch_size = 10; for (int i = 0; i < batch_size; ++i) { ASSERT_OK(db->Put(WriteOptions(), "k" + std::to_string(i), "v0")); } const SequenceNumber seq0 = db->GetLatestSequenceNumber(); ASSERT_EQ(static_cast(batch_size), seq0); txn->SetSnapshot(); { const Snapshot* const snapshot = txn->GetSnapshot(); assert(snapshot); ASSERT_EQ(seq0, snapshot->GetSequenceNumber()); } for (int i = 0; i < batch_size; ++i) { ASSERT_OK(txn->Put("k" + std::to_string(i), "v1")); } ASSERT_OK(txn->SetName("txn0")); ASSERT_OK(txn->Prepare()); std::shared_ptr snapshot; constexpr TxnTimestamp timestamp = 1; auto notifier = std::make_shared(); Status s = txn->CommitAndTryCreateSnapshot(notifier, timestamp, &snapshot); ASSERT_OK(s); ASSERT_LT(notifier->prev_snapshot_ts(), kMaxTxnTimestamp); assert(snapshot); ASSERT_EQ(timestamp, snapshot->GetTimestamp()); ASSERT_EQ(seq0 + batch_size, snapshot->GetSequenceNumber()); const Snapshot* const raw_snapshot_ptr = txn->GetSnapshot(); ASSERT_EQ(raw_snapshot_ptr, snapshot.get()); ASSERT_EQ(snapshot, txn->GetTimestampedSnapshot()); { std::shared_ptr snapshot1 = db->GetLatestTimestampedSnapshot(); ASSERT_EQ(snapshot, snapshot1); } { std::shared_ptr snapshot1 = db->GetTimestampedSnapshot(timestamp); ASSERT_EQ(snapshot, snapshot1); } { std::vector > snapshots; s = db->GetAllTimestampedSnapshots(snapshots); ASSERT_OK(s); ASSERT_EQ(std::vector >{snapshot}, snapshots); } } TEST_P(TransactionTest, CreateSnapshot) { // First create a non-timestamped snapshot ManagedSnapshot snapshot_guard(db); for (int i = 0; i < 10; ++i) { ASSERT_OK(db->Put(WriteOptions(), "k" + std::to_string(i), "v0_" + std::to_string(i))); } { auto ret = db->CreateTimestampedSnapshot(kMaxTxnTimestamp); ASSERT_TRUE(ret.first.IsInvalidArgument()); auto snapshot = ret.second; ASSERT_EQ(nullptr, snapshot.get()); } constexpr TxnTimestamp timestamp = 100; Status s; std::shared_ptr ts_snap0; std::tie(s, ts_snap0) = db->CreateTimestampedSnapshot(timestamp); ASSERT_OK(s); assert(ts_snap0); ASSERT_EQ(timestamp, ts_snap0->GetTimestamp()); for (int i = 0; i < 10; ++i) { ASSERT_OK(db->Delete(WriteOptions(), "k" + std::to_string(i))); } { ReadOptions read_opts; read_opts.snapshot = ts_snap0.get(); for (int i = 0; i < 10; ++i) { std::string value; s = db->Get(read_opts, "k" + std::to_string(i), &value); ASSERT_OK(s); ASSERT_EQ("v0_" + std::to_string(i), value); } } { std::shared_ptr snapshot = db->GetLatestTimestampedSnapshot(); ASSERT_EQ(ts_snap0, snapshot); } { std::shared_ptr snapshot = db->GetTimestampedSnapshot(timestamp); ASSERT_OK(s); ASSERT_EQ(ts_snap0, snapshot); } { std::vector > snapshots; s = db->GetAllTimestampedSnapshots(snapshots); ASSERT_OK(s); ASSERT_EQ(std::vector >{ts_snap0}, snapshots); } } TEST_P(TransactionTest, SequenceAndTsOrder) { Status s; std::shared_ptr snapshot; std::tie(s, snapshot) = db->CreateTimestampedSnapshot(100); ASSERT_OK(s); assert(snapshot); { // Cannot request smaller timestamp for the new timestamped snapshot. std::shared_ptr tmp_snapshot; std::tie(s, tmp_snapshot) = db->CreateTimestampedSnapshot(50); ASSERT_TRUE(s.IsInvalidArgument()); ASSERT_EQ(nullptr, tmp_snapshot.get()); } // If requesting a new timestamped snapshot with the same timestamp and // sequence number, we avoid creating new snapshot object but reuse // exisisting one. std::shared_ptr snapshot1; std::tie(s, snapshot1) = db->CreateTimestampedSnapshot(100); ASSERT_OK(s); ASSERT_EQ(snapshot.get(), snapshot1.get()); // If there is no write, but we request a larger timestamp, we still create // a new snapshot object. std::shared_ptr snapshot2; std::tie(s, snapshot2) = db->CreateTimestampedSnapshot(200); ASSERT_OK(s); assert(snapshot2); ASSERT_NE(snapshot.get(), snapshot2.get()); ASSERT_EQ(snapshot2->GetSequenceNumber(), snapshot->GetSequenceNumber()); ASSERT_EQ(200, snapshot2->GetTimestamp()); // Increase sequence number. ASSERT_OK(db->Put(WriteOptions(), "foo", "v0")); { // We are requesting the same timestamp for a larger sequence number, thus // we cannot create timestamped snapshot. std::shared_ptr tmp_snapshot; std::tie(s, tmp_snapshot) = db->CreateTimestampedSnapshot(200); ASSERT_TRUE(s.IsInvalidArgument()); ASSERT_EQ(nullptr, tmp_snapshot.get()); } { std::unique_ptr txn1( db->BeginTransaction(WriteOptions(), TransactionOptions())); ASSERT_OK(txn1->Put("bar", "v0")); std::shared_ptr ss; ASSERT_OK(txn1->CommitAndTryCreateSnapshot(nullptr, 200, &ss)); // Cannot create snapshot because requested timestamp is the same as the // latest timestamped snapshot while sequence number is strictly higher. ASSERT_EQ(nullptr, ss); } { std::unique_ptr txn2( db->BeginTransaction(WriteOptions(), TransactionOptions())); ASSERT_OK(txn2->Put("bar", "v0")); std::shared_ptr ss; // Application should never do this. This is just to demonstrate error // handling. ASSERT_OK(txn2->CommitAndTryCreateSnapshot(nullptr, 100, &ss)); // Cannot create snapshot because requested timestamp is smaller than // latest timestamped snapshot. ASSERT_EQ(nullptr, ss); } } TEST_P(TransactionTest, CloseDbWithSnapshots) { std::unique_ptr txn( db->BeginTransaction(WriteOptions(), TransactionOptions())); ASSERT_OK(txn->SetName("txn0")); ASSERT_OK(txn->Put("foo", "v")); ASSERT_OK(txn->Prepare()); std::shared_ptr snapshot; constexpr TxnTimestamp timestamp = 121; auto notifier = std::make_shared(); ASSERT_OK(txn->CommitAndTryCreateSnapshot(notifier, timestamp, &snapshot)); assert(snapshot); ASSERT_LT(notifier->prev_snapshot_ts(), kMaxTxnTimestamp); ASSERT_EQ(timestamp, snapshot->GetTimestamp()); ASSERT_TRUE(db->Close().IsAborted()); } TEST_P(TransactionTest, MultipleTimestampedSnapshots) { auto* dbimpl = static_cast_with_check(db->GetRootDB()); assert(dbimpl); const bool seq_per_batch = dbimpl->seq_per_batch(); // TODO: remove the following assert(!seq_per_batch) once timestamped snapshot // is supported in write-prepared/write-unprepared transactions. assert(!seq_per_batch); constexpr size_t txn_size = 10; constexpr TxnTimestamp ts_delta = 10; constexpr size_t num_txns = 100; std::vector > snapshots(num_txns); constexpr TxnTimestamp start_ts = 10000; auto notifier = std::make_shared(); for (size_t i = 0; i < num_txns; ++i) { std::unique_ptr txn( db->BeginTransaction(WriteOptions(), TransactionOptions())); ASSERT_OK(txn->SetName("txn" + std::to_string(i))); for (size_t j = 0; j < txn_size; ++j) { ASSERT_OK(txn->Put("k" + std::to_string(j), "v" + std::to_string(j) + "_" + std::to_string(i))); } if (0 == (i % 2)) { ASSERT_OK(txn->Prepare()); } ASSERT_OK(txn->CommitAndTryCreateSnapshot(notifier, start_ts + i * ts_delta, &snapshots[i])); assert(snapshots[i]); ASSERT_LT(notifier->prev_snapshot_ts(), kMaxTxnTimestamp); ASSERT_EQ(start_ts + i * ts_delta, snapshots[i]->GetTimestamp()); } { auto snapshot = db->GetTimestampedSnapshot(start_ts + 1); ASSERT_EQ(nullptr, snapshot); } constexpr TxnTimestamp max_ts = start_ts + num_txns * ts_delta; for (size_t i = 0; i < num_txns; ++i) { auto snapshot = db->GetTimestampedSnapshot(start_ts + i * ts_delta); ASSERT_EQ(snapshots[i], snapshot); std::vector > tmp_snapshots; Status s = db->GetTimestampedSnapshots(max_ts, start_ts + i * ts_delta, tmp_snapshots); ASSERT_TRUE(s.IsInvalidArgument()); ASSERT_TRUE(tmp_snapshots.empty()); for (size_t j = i; j < num_txns; ++j) { std::vector > expected_snapshots( snapshots.begin() + i, snapshots.begin() + j); tmp_snapshots.clear(); s = db->GetTimestampedSnapshots(start_ts + i * ts_delta, start_ts + j * ts_delta, tmp_snapshots); if (i < j) { ASSERT_OK(s); } else { ASSERT_TRUE(s.IsInvalidArgument()); } ASSERT_EQ(expected_snapshots, tmp_snapshots); } } { std::vector > tmp_snapshots; const Status s = db->GetAllTimestampedSnapshots(tmp_snapshots); ASSERT_OK(s); ASSERT_EQ(snapshots, tmp_snapshots); const std::shared_ptr latest_snapshot = db->GetLatestTimestampedSnapshot(); ASSERT_EQ(snapshots.back(), latest_snapshot); } for (size_t i = 0; i <= num_txns; ++i) { std::vector > snapshots1( snapshots.begin() + i, snapshots.end()); if (i > 0) { auto snapshot1 = db->GetTimestampedSnapshot(start_ts + (i - 1) * ts_delta); assert(snapshot1); ASSERT_EQ(start_ts + (i - 1) * ts_delta, snapshot1->GetTimestamp()); } db->ReleaseTimestampedSnapshotsOlderThan(start_ts + i * ts_delta); if (i > 0) { auto snapshot1 = db->GetTimestampedSnapshot(start_ts + (i - 1) * ts_delta); ASSERT_EQ(nullptr, snapshot1); } std::vector > tmp_snapshots; const Status s = db->GetAllTimestampedSnapshots(tmp_snapshots); ASSERT_OK(s); ASSERT_EQ(snapshots1, tmp_snapshots); } // Even after released by db, the applications still hold reference to shared // snapshots. for (size_t i = 0; i < num_txns; ++i) { assert(snapshots[i]); ASSERT_EQ(start_ts + i * ts_delta, snapshots[i]->GetTimestamp()); } snapshots.clear(); ASSERT_OK(db->Close()); delete db; db = nullptr; } } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }