//! Example of using `CallbackCell` to elevate crossbeam `SegQueue` (a concurrent queue which does //! not support blocking, only polling) into an mpsc queue which supports both blocking and //! async-awaiting. use callback_cell::CallbackCell; use std::{ sync::Arc, thread::{ self, sleep, }, time::Duration, }; use crossbeam::{ queue::SegQueue, sync::Parker, }; use tokio::{ sync::Notify, runtime::Runtime, }; /// Sender half to mpsc queue. pub struct Sender(Arc>); /// Receiver half to mpsc queue. pub struct Receiver(Arc>); struct State { queue: SegQueue, callback_cell: CallbackCell, } /// Create an mpsc queue. fn new_queue() -> (Sender, Receiver) { let state_1 = Arc::new(State { queue: SegQueue::new(), callback_cell: CallbackCell::new(), }); let state_2 = Arc::clone(&state_1); (Sender(state_1), Receiver(state_2)) } impl Sender { /// Send item into queue. fn send(&self, item: T) { self.0.queue.push(item); self.0.callback_cell.take_call(); } } impl Clone for Sender { fn clone(&self) -> Self { Sender(Arc::clone(&self.0)) } } impl Receiver { /// Take item from queue, blocking until one is present. fn recv_blocking(&mut self) -> T { if let Some(item) = self.0.queue.pop() { return item; } let parker = Parker::new(); loop { let unparker = parker.unparker().clone(); self.0.callback_cell.put(move || unparker.unpark()); if let Some(item) = self.0.queue.pop() { return item; } parker.park(); } } /// Future to take item from queue, yielding until one is present. async fn recv_async(&mut self) -> T { if let Some(item) = self.0.queue.pop() { return item; } let notify_1 = Arc::new(Notify::new()); loop { let notify_2 = Arc::clone(¬ify_1); self.0.callback_cell.put(move || notify_2.notify_one()); if let Some(item) = self.0.queue.pop() { return item; } notify_1.notified().await; } } } fn main() { let (send, mut recv) = new_queue(); thread::spawn(move || { for i in 0..10 { send.send(i); sleep(Duration::from_millis(100)); } }); for _ in 0..5 { let i = recv.recv_blocking(); println!("received through blocking: {}", i); } println!("entering tokio runtime"); Runtime::new().unwrap().block_on(async move { for _ in 0..5 { let i = recv.recv_async().await; println!("receiving through awaiting: {}", i); } }); }