// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #include "db/write_controller.h" #include <atomic> #include <cassert> #include <ratio> #include "rocksdb/env.h" namespace rocksdb { std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() { ++total_stopped_; return std::unique_ptr<WriteControllerToken>(new StopWriteToken(this)); } std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken( uint64_t write_rate) { total_delayed_++; // Reset counters. last_refill_time_ = 0; bytes_left_ = 0; set_delayed_write_rate(write_rate); return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this)); } std::unique_ptr<WriteControllerToken> WriteController::GetCompactionPressureToken() { ++total_compaction_pressure_; return std::unique_ptr<WriteControllerToken>( new CompactionPressureToken(this)); } bool WriteController::IsStopped() const { return total_stopped_.load(std::memory_order_relaxed) > 0; } // This is inside DB mutex, so we can't sleep and need to minimize // frequency to get time. // If it turns out to be a performance issue, we can redesign the thread // synchronization model here. // The function trust caller will sleep micros returned. uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) { if (total_stopped_.load(std::memory_order_relaxed) > 0) { return 0; } if (total_delayed_.load(std::memory_order_relaxed) == 0) { return 0; } const uint64_t kMicrosPerSecond = 1000000; const uint64_t kRefillInterval = 1024U; if (bytes_left_ >= num_bytes) { bytes_left_ -= num_bytes; return 0; } // The frequency to get time inside DB mutex is less than one per refill // interval. auto time_now = NowMicrosMonotonic(env); uint64_t sleep_debt = 0; uint64_t time_since_last_refill = 0; if (last_refill_time_ != 0) { if (last_refill_time_ > time_now) { sleep_debt = last_refill_time_ - time_now; } else { time_since_last_refill = time_now - last_refill_time_; bytes_left_ += static_cast<uint64_t>(static_cast<double>(time_since_last_refill) / kMicrosPerSecond * delayed_write_rate_); if (time_since_last_refill >= kRefillInterval && bytes_left_ > num_bytes) { // If refill interval already passed and we have enough bytes // return without extra sleeping. last_refill_time_ = time_now; bytes_left_ -= num_bytes; return 0; } } } uint64_t single_refill_amount = delayed_write_rate_ * kRefillInterval / kMicrosPerSecond; if (bytes_left_ + single_refill_amount >= num_bytes) { // Wait until a refill interval // Never trigger expire for less than one refill interval to avoid to get // time. bytes_left_ = bytes_left_ + single_refill_amount - num_bytes; last_refill_time_ = time_now + kRefillInterval; return kRefillInterval + sleep_debt; } // Need to refill more than one interval. Need to sleep longer. Check // whether expiration will hit // Sleep just until `num_bytes` is allowed. uint64_t sleep_amount = static_cast<uint64_t>(num_bytes / static_cast<long double>(delayed_write_rate_) * kMicrosPerSecond) + sleep_debt; last_refill_time_ = time_now + sleep_amount; return sleep_amount; } uint64_t WriteController::NowMicrosMonotonic(Env* env) { return env->NowNanos() / std::milli::den; } StopWriteToken::~StopWriteToken() { assert(controller_->total_stopped_ >= 1); --controller_->total_stopped_; } DelayWriteToken::~DelayWriteToken() { controller_->total_delayed_--; assert(controller_->total_delayed_.load() >= 0); } CompactionPressureToken::~CompactionPressureToken() { controller_->total_compaction_pressure_--; assert(controller_->total_compaction_pressure_ >= 0); } } // namespace rocksdb