// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #ifndef ROCKSDB_LITE #ifndef __STDC_FORMAT_MACROS #define __STDC_FORMAT_MACROS #endif #include "util/transaction_test_util.h" #include #include #include "rocksdb/db.h" #include "rocksdb/utilities/optimistic_transaction_db.h" #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" #include "util/random.h" #include "util/string_util.h" namespace rocksdb { RandomTransactionInserter::RandomTransactionInserter( Random64* rand, const WriteOptions& write_options, const ReadOptions& read_options, uint64_t num_keys, uint16_t num_sets) : rand_(rand), write_options_(write_options), read_options_(read_options), num_keys_(num_keys), num_sets_(num_sets) {} RandomTransactionInserter::~RandomTransactionInserter() { if (txn_ != nullptr) { delete txn_; } if (optimistic_txn_ != nullptr) { delete optimistic_txn_; } } bool RandomTransactionInserter::TransactionDBInsert( TransactionDB* db, const TransactionOptions& txn_options) { txn_ = db->BeginTransaction(write_options_, txn_options, txn_); return DoInsert(nullptr, txn_, false); } bool RandomTransactionInserter::OptimisticTransactionDBInsert( OptimisticTransactionDB* db, const OptimisticTransactionOptions& txn_options) { optimistic_txn_ = db->BeginTransaction(write_options_, txn_options, optimistic_txn_); return DoInsert(nullptr, optimistic_txn_, true); } bool RandomTransactionInserter::DBInsert(DB* db) { return DoInsert(db, nullptr, false); } bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn, bool is_optimistic) { Status s; WriteBatch batch; std::string value; // pick a random number to use to increment a key in each set uint64_t incr = (rand_->Next() % 100) + 1; bool unexpected_error = false; // For each set, pick a key at random and increment it for (uint8_t i = 0; i < num_sets_; i++) { uint64_t int_value = 0; char prefix_buf[5]; // prefix_buf needs to be large enough to hold a uint16 in string form // key format: [SET#][random#] std::string rand_key = ToString(rand_->Next() % num_keys_); Slice base_key(rand_key); // Pad prefix appropriately so we can iterate over each set snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", i + 1); std::string full_key = std::string(prefix_buf) + base_key.ToString(); Slice key(full_key); if (txn != nullptr) { s = txn->GetForUpdate(read_options_, key, &value); } else { s = db->Get(read_options_, key, &value); } if (s.ok()) { // Found key, parse its value int_value = std::stoull(value); if (int_value == 0 || int_value == ULONG_MAX) { unexpected_error = true; fprintf(stderr, "Get returned unexpected value: %s\n", value.c_str()); s = Status::Corruption(); } } else if (s.IsNotFound()) { // Have not yet written to this key, so assume its value is 0 int_value = 0; s = Status::OK(); } else { // Optimistic transactions should never return non-ok status here. // Non-optimistic transactions may return write-coflict/timeout errors. if (is_optimistic || !(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) { fprintf(stderr, "Get returned an unexpected error: %s\n", s.ToString().c_str()); unexpected_error = true; } break; } if (s.ok()) { // Increment key std::string sum = ToString(int_value + incr); if (txn != nullptr) { s = txn->Put(key, sum); if (!s.ok()) { // Since we did a GetForUpdate, Put should not fail. fprintf(stderr, "Put returned an unexpected error: %s\n", s.ToString().c_str()); unexpected_error = true; } } else { batch.Put(key, sum); } } } if (s.ok()) { if (txn != nullptr) { s = txn->Commit(); if (!s.ok()) { if (is_optimistic) { // Optimistic transactions can have write-conflict errors on commit. // Any other error is unexpected. if (!(s.IsBusy() || s.IsTimedOut() || s.IsTryAgain())) { unexpected_error = true; } } else { // Non-optimistic transactions should only fail due to expiration // or write failures. For testing purproses, we do not expect any // write failures. if (!s.IsExpired()) { unexpected_error = true; } } if (unexpected_error) { fprintf(stderr, "Commit returned an unexpected error: %s\n", s.ToString().c_str()); } } } else { s = db->Write(write_options_, &batch); if (!s.ok()) { unexpected_error = true; fprintf(stderr, "Write returned an unexpected error: %s\n", s.ToString().c_str()); } } } else { if (txn != nullptr) { txn->Rollback(); } } if (s.ok()) { success_count_++; } else { failure_count_++; } last_status_ = s; // return success if we didn't get any unexpected errors return !unexpected_error; } Status RandomTransactionInserter::Verify(DB* db, uint16_t num_sets) { uint64_t prev_total = 0; // For each set of keys with the same prefix, sum all the values for (uint32_t i = 0; i < num_sets; i++) { char prefix_buf[6]; snprintf(prefix_buf, sizeof(prefix_buf), "%.4u", i + 1); uint64_t total = 0; Iterator* iter = db->NewIterator(ReadOptions()); for (iter->Seek(Slice(prefix_buf, 4)); iter->Valid(); iter->Next()) { Slice key = iter->key(); // stop when we reach a different prefix if (key.ToString().compare(0, 4, prefix_buf) != 0) { break; } Slice value = iter->value(); uint64_t int_value = std::stoull(value.ToString()); if (int_value == 0 || int_value == ULONG_MAX) { fprintf(stderr, "Iter returned unexpected value: %s\n", value.ToString().c_str()); return Status::Corruption(); } total += int_value; } delete iter; if (i > 0) { if (total != prev_total) { fprintf(stderr, "RandomTransactionVerify found inconsistent totals. " "Set[%" PRIu32 "]: %" PRIu64 ", Set[%" PRIu32 "]: %" PRIu64 " \n", i - 1, prev_total, i, total); return Status::Corruption(); } } prev_total = total; } return Status::OK(); } } // namespace rocksdb #endif // ROCKSDB_LITE