// ©2020 Cameron Desrochers. // Distributed under the simplified BSD license (see the license file that // should have come with this header). // Provides a C++11 implementation of a single-producer, single-consumer wait-free concurrent // circular buffer (fixed-size queue). #pragma once #include #include #include #include #include #include // Note that this implementation is fully modern C++11 (not compatible with old MSVC versions) // but we still include atomicops.h for its LightweightSemaphore implementation. #include "atomicops.h" #ifndef MOODYCAMEL_CACHE_LINE_SIZE #define MOODYCAMEL_CACHE_LINE_SIZE 64 #endif namespace moodycamel { template class BlockingReaderWriterCircularBuffer { public: typedef T value_type; public: explicit BlockingReaderWriterCircularBuffer(std::size_t capacity) : maxcap(capacity), mask(), rawData(), data(), slots_(new spsc_sema::LightweightSemaphore(static_cast(capacity))), items(new spsc_sema::LightweightSemaphore(0)), nextSlot(0), nextItem(0) { // Round capacity up to power of two to compute modulo mask. // Adapted from http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2 --capacity; capacity |= capacity >> 1; capacity |= capacity >> 2; capacity |= capacity >> 4; for (std::size_t i = 1; i < sizeof(std::size_t); i <<= 1) capacity |= capacity >> (i << 3); mask = capacity++; rawData = static_cast(std::malloc(capacity * sizeof(T) + std::alignment_of::value - 1)); data = align_for(rawData); } BlockingReaderWriterCircularBuffer(BlockingReaderWriterCircularBuffer&& other) : maxcap(0), mask(0), rawData(nullptr), data(nullptr), slots_(new spsc_sema::LightweightSemaphore(0)), items(new spsc_sema::LightweightSemaphore(0)), nextSlot(), nextItem() { swap(other); } BlockingReaderWriterCircularBuffer(BlockingReaderWriterCircularBuffer const&) = delete; // Note: The queue should not be accessed concurrently while it's // being deleted. It's up to the user to synchronize this. ~BlockingReaderWriterCircularBuffer() { for (std::size_t i = 0, n = items->availableApprox(); i != n; ++i) reinterpret_cast(data)[(nextItem + i) & mask].~T(); std::free(rawData); } BlockingReaderWriterCircularBuffer& operator=(BlockingReaderWriterCircularBuffer&& other) noexcept { swap(other); return *this; } BlockingReaderWriterCircularBuffer& operator=(BlockingReaderWriterCircularBuffer const&) = delete; // Swaps the contents of this buffer with the contents of another. // Not thread-safe. void swap(BlockingReaderWriterCircularBuffer& other) noexcept { std::swap(maxcap, other.maxcap); std::swap(mask, other.mask); std::swap(rawData, other.rawData); std::swap(data, other.data); std::swap(slots_, other.slots_); std::swap(items, other.items); std::swap(nextSlot, other.nextSlot); std::swap(nextItem, other.nextItem); } // Enqueues a single item (by copying it). // Fails if not enough room to enqueue. // Thread-safe when called by producer thread. // No exception guarantee (state will be corrupted) if constructor of T throws. bool try_enqueue(T const& item) { if (!slots_->tryWait()) return false; inner_enqueue(item); return true; } // Enqueues a single item (by moving it, if possible). // Fails if not enough room to enqueue. // Thread-safe when called by producer thread. // No exception guarantee (state will be corrupted) if constructor of T throws. bool try_enqueue(T&& item) { if (!slots_->tryWait()) return false; inner_enqueue(std::move(item)); return true; } // Blocks the current thread until there's enough space to enqueue the given item, // then enqueues it (via copy). // Thread-safe when called by producer thread. // No exception guarantee (state will be corrupted) if constructor of T throws. void wait_enqueue(T const& item) { while (!slots_->wait()); inner_enqueue(item); } // Blocks the current thread until there's enough space to enqueue the given item, // then enqueues it (via move, if possible). // Thread-safe when called by producer thread. // No exception guarantee (state will be corrupted) if constructor of T throws. void wait_enqueue(T&& item) { while (!slots_->wait()); inner_enqueue(std::move(item)); } // Blocks the current thread until there's enough space to enqueue the given item, // or the timeout expires. Returns false without enqueueing the item if the timeout // expires, otherwise enqueues the item (via copy) and returns true. // Thread-safe when called by producer thread. // No exception guarantee (state will be corrupted) if constructor of T throws. bool wait_enqueue_timed(T const& item, std::int64_t timeout_usecs) { if (!slots_->wait(timeout_usecs)) return false; inner_enqueue(item); return true; } // Blocks the current thread until there's enough space to enqueue the given item, // or the timeout expires. Returns false without enqueueing the item if the timeout // expires, otherwise enqueues the item (via move, if possible) and returns true. // Thread-safe when called by producer thread. // No exception guarantee (state will be corrupted) if constructor of T throws. bool wait_enqueue_timed(T&& item, std::int64_t timeout_usecs) { if (!slots_->wait(timeout_usecs)) return false; inner_enqueue(std::move(item)); return true; } // Blocks the current thread until there's enough space to enqueue the given item, // or the timeout expires. Returns false without enqueueing the item if the timeout // expires, otherwise enqueues the item (via copy) and returns true. // Thread-safe when called by producer thread. // No exception guarantee (state will be corrupted) if constructor of T throws. template inline bool wait_enqueue_timed(T const& item, std::chrono::duration const& timeout) { return wait_enqueue_timed(item, std::chrono::duration_cast(timeout).count()); } // Blocks the current thread until there's enough space to enqueue the given item, // or the timeout expires. Returns false without enqueueing the item if the timeout // expires, otherwise enqueues the item (via move, if possible) and returns true. // Thread-safe when called by producer thread. // No exception guarantee (state will be corrupted) if constructor of T throws. template inline bool wait_enqueue_timed(T&& item, std::chrono::duration const& timeout) { return wait_enqueue_timed(std::move(item), std::chrono::duration_cast(timeout).count()); } // Attempts to dequeue a single item. // Returns false if the buffer is empty. // Thread-safe when called by consumer thread. // No exception guarantee (state will be corrupted) if assignment operator of U throws. template bool try_dequeue(U& item) { if (!items->tryWait()) return false; inner_dequeue(item); return true; } // Blocks the current thread until there's something to dequeue, then dequeues it. // Thread-safe when called by consumer thread. // No exception guarantee (state will be corrupted) if assignment operator of U throws. template void wait_dequeue(U& item) { while (!items->wait()); inner_dequeue(item); } // Blocks the current thread until either there's something to dequeue // or the timeout expires. Returns false without setting `item` if the // timeout expires, otherwise assigns to `item` and returns true. // Thread-safe when called by consumer thread. // No exception guarantee (state will be corrupted) if assignment operator of U throws. template bool wait_dequeue_timed(U& item, std::int64_t timeout_usecs) { if (!items->wait(timeout_usecs)) return false; inner_dequeue(item); return true; } // Blocks the current thread until either there's something to dequeue // or the timeout expires. Returns false without setting `item` if the // timeout expires, otherwise assigns to `item` and returns true. // Thread-safe when called by consumer thread. // No exception guarantee (state will be corrupted) if assignment operator of U throws. template inline bool wait_dequeue_timed(U& item, std::chrono::duration const& timeout) { return wait_dequeue_timed(item, std::chrono::duration_cast(timeout).count()); } // Returns a (possibly outdated) snapshot of the total number of elements currently in the buffer. // Thread-safe. inline std::size_t size_approx() const { return items->availableApprox(); } // Returns the maximum number of elements that this circular buffer can hold at once. // Thread-safe. inline std::size_t max_capacity() const { return maxcap; } private: template void inner_enqueue(U&& item) { std::size_t i = nextSlot++; new (reinterpret_cast(data) + (i & mask)) T(std::forward(item)); items->signal(); } template void inner_dequeue(U& item) { std::size_t i = nextItem++; T& element = reinterpret_cast(data)[i & mask]; item = std::move(element); element.~T(); slots_->signal(); } template static inline char* align_for(char* ptr) { const std::size_t alignment = std::alignment_of::value; return ptr + (alignment - (reinterpret_cast(ptr) % alignment)) % alignment; } private: std::size_t maxcap; // actual (non-power-of-two) capacity std::size_t mask; // circular buffer capacity mask (for cheap modulo) char* rawData; // raw circular buffer memory char* data; // circular buffer memory aligned to element alignment std::unique_ptr slots_; // number of slots currently free (named with underscore to accommodate Qt's 'slots' macro) std::unique_ptr items; // number of elements currently enqueued char cachelineFiller0[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(char*) * 2 - sizeof(std::size_t) * 2 - sizeof(std::unique_ptr) * 2]; std::size_t nextSlot; // index of next free slot to enqueue into char cachelineFiller1[MOODYCAMEL_CACHE_LINE_SIZE - sizeof(std::size_t)]; std::size_t nextItem; // index of next element to dequeue from }; }