use std::pin::pin; use async_main::{async_main, LocalSpawner, Spawn}; use futures::future; use shared_cell::{Shared, TaskGroup}; use whisk::Channel; enum Command { Increment(u32, Channel), Double(Channel), } struct Context { counter: u32, } struct Actor(Channel>); impl Actor { /// Create a new `Actor` fn new(spawner: &LocalSpawner) -> Self { let channel = Channel::new(); spawner.spawn_local({ let channel = channel.clone(); async move { Self::worker(channel).await } }); Self(channel) } pub async fn increment(&self, x: u32) -> u32 { let oneshot = Channel::new(); self.0 .send(Some(Command::Increment(x, oneshot.clone()))) .await; oneshot.await } pub async fn double(&self) -> u32 { let oneshot = Channel::new(); self.0.send(Some(Command::Double(oneshot.clone()))).await; oneshot.await } pub async fn shutdown(&self) { self.0.send(None).await; } async fn next( tasks: &mut TaskGroup<'_, Context, ()>, channel: &mut Channel>, ) -> Option { loop { let mut advance = pin!(tasks.advance()); if let future::Either::Left((command, _)) = future::select(&mut *channel, &mut advance).await { break command; } } } /// Worker thread for this actor async fn worker(mut channel: Channel>) { let mut context = Context { counter: 0 }; let mut tasks = TaskGroup::new(&mut context); while let Some(command) = Self::next(&mut tasks, &mut channel).await { use Command::*; match command { Increment(x, oneshot) => { shared_cell::spawn!(tasks, increment(x, oneshot)) } Double(oneshot) => { shared_cell::spawn!(tasks, double(oneshot)) } } } tasks.finish().await; println!("Worker task is going down!"); } } async fn increment( cx: &mut Shared<'_, Context>, x: u32, oneshot: Channel, ) { let counter = cx.with(|cx| { cx.counter += x; cx.counter }); oneshot.send(counter).await; } async fn double(cx: &mut Shared<'_, Context>, oneshot: Channel) { let counter = cx.with(|cx| { cx.counter *= 2; cx.counter }); oneshot.send(counter).await; } #[async_main] async fn main(spawner: LocalSpawner) { let actor = Actor::new(&spawner); let mut list = Vec::new(); list.push(actor.increment(1).await); list.push(actor.double().await); list.push(actor.increment(2).await); list.push(actor.double().await); list.push(actor.increment(5).await); list.push(actor.double().await); assert_eq!(list, [1, 2, 4, 8, 13, 26]); actor.shutdown().await; println!("Sent shutdown event"); }