#include #include #include #include #include #include #include template struct LFQThreadData { LFQThreadData() : count_(0) {} std::atomic count_; std::shared_ptr q_ = std::make_shared(); std::shared_ptr ready_ = std::make_shared(); std::mutex cs_map_; std::set thread_map_; }; template static int PushThread(const int id, std::shared_ptr> data) { ++data->count_; data->ready_->wait(); data->q_->enqueue(id); std::unique_lock lk(data->cs_map_); data->thread_map_.erase(id); return 0; } template static int PullThread(const int id, std::shared_ptr> data) { ++data->count_; data->ready_->wait(); int val; CHECK_EQ(data->q_->try_dequeue(val), true); std::unique_lock lk(data->cs_map_); data->thread_map_.erase(id); return 0; } template static int BlockingPullThread(const int id, std::shared_ptr> data) { ++data->count_; data->ready_->wait(); int val; data->q_->wait_dequeue(val); std::unique_lock lk(data->cs_map_); data->thread_map_.erase(id); return 0; } static inline std::string TName(const std::string& s, int x) { return s + "-" + std::to_string(x); } TEST(Lockfree, ConcurrentQueue) { dmlc::ThreadGroup threads; const size_t ITEM_COUNT = 100; auto data = std::make_shared>>(); for(size_t x = 0; x < ITEM_COUNT; ++x) { std::unique_lock lk(data->cs_map_); data->thread_map_.insert(x); threads.create(TName("PushThread", x), true, PushThread>, x, data); } while(data->count_ < ITEM_COUNT) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } data->ready_->signal(); size_t remaining = ITEM_COUNT; do { std::this_thread::sleep_for(std::chrono::milliseconds(10)); std::unique_lock lk(data->cs_map_); remaining = data->thread_map_.size(); } while (remaining); size_t count = data->q_->size_approx(); GTEST_ASSERT_EQ(count, ITEM_COUNT); threads.join_all(); GTEST_ASSERT_EQ(threads.size(), 0U); for(size_t x = 0; x < ITEM_COUNT; ++x) { std::unique_lock lk(data->cs_map_); data->thread_map_.insert(x); // Just to mix things up, don't auto-remove threads.create(TName("PullThread", x), false, PullThread>, x, data); } data->ready_->signal(); threads.join_all(); GTEST_ASSERT_EQ(threads.size(), 0U); count = data->q_->size_approx(); GTEST_ASSERT_EQ(count, 0UL); } TEST(Lockfree, BlockingConcurrentQueue) { using BlockingQueue = dmlc::moodycamel::BlockingConcurrentQueue< int, dmlc::moodycamel::ConcurrentQueueDefaultTraits>; using BlockingQueue = dmlc::moodycamel::BlockingConcurrentQueue< int, dmlc::moodycamel::ConcurrentQueueDefaultTraits>; dmlc::ThreadGroup threads; const size_t ITEM_COUNT = 100; auto data = std::make_shared>(); for(size_t x = 0; x < ITEM_COUNT; ++x) { std::unique_lock lk(data->cs_map_); data->thread_map_.insert(x); // Just to mix things up, don't auto-remove threads.create(TName("PushThread", x), false, PushThread, x, data); } while(data->count_ < ITEM_COUNT) { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } data->ready_->signal(); size_t remaining = ITEM_COUNT; do { std::this_thread::sleep_for(std::chrono::milliseconds(10)); std::unique_lock lk(data->cs_map_); remaining = data->thread_map_.size(); } while (remaining); size_t count = data->q_->size_approx(); GTEST_ASSERT_EQ(count, ITEM_COUNT); threads.join_all(); GTEST_ASSERT_EQ(threads.size(), 0U); for(size_t x = 0; x < ITEM_COUNT; ++x) { std::unique_lock lk(data->cs_map_); data->thread_map_.insert(x); threads.create(TName("BlockingPullThread", x), true, BlockingPullThread, x, data); } data->ready_->signal(); threads.join_all(); GTEST_ASSERT_EQ(threads.size(), 0U); count = data->q_->size_approx(); GTEST_ASSERT_EQ(count, 0UL); }