use std::sync::atomic::AtomicUsize; use anyhow::Result; use futures::{join, try_join}; use once_cell::sync::OnceCell; use tracing::info; use xtor::actor::{addr::WeakAddr, context::Context, message::Handler, runner::Actor}; #[derive(Debug, Clone)] #[xtor::message(result = "isize")] struct Ping(isize); #[xtor::message(result = "()")] struct SetPingAddress(WeakAddr); struct PingActor { sleeper: isize, n: isize, counter: AtomicUsize, ping_address: OnceCell, } #[async_trait::async_trait] impl Actor for PingActor {} #[async_trait::async_trait] impl Handler for PingActor { #[tracing::instrument( skip(self, ctx), name = "PingActor::Ping", fields(addr = self.get_name_or_id_string(ctx).as_str()) )] async fn handle(&self, ctx: &Context, msg: Ping) -> Result { info!("{:?} received", &msg); match self.ping_address.get().expect("fail to get").upgrade() { Some(addr) => { let n = self.n; let _ = tokio::task::spawn(async move { addr.call_unblock::(Ping(msg.0 + n)) .await .await }); tokio::time::sleep(std::time::Duration::from_millis(self.sleeper as u64)).await; self.counter .fetch_add(1, std::sync::atomic::Ordering::SeqCst); Ok(msg.0 + 1) } None => { ctx.stop(); Ok(msg.0) } } } } #[async_trait::async_trait] impl Handler for PingActor { async fn handle(&self, _ctx: &Context, msg: SetPingAddress) -> Result<()> { self.ping_address.set(msg.0).expect("fail to set"); Ok(()) } } #[xtor::main] async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::init(); // create actors let (ping_addr, pong_addr) = try_join!( PingActor { sleeper: 100, n: 1, counter: AtomicUsize::new(0), ping_address: OnceCell::new(), } .spawn(), PingActor { sleeper: 200, n: -1, counter: AtomicUsize::new(0), ping_address: OnceCell::new(), } .spawn(), )?; // set their name join!(ping_addr.set_name("ping"), pong_addr.set_name("pong")); // set their address to each other try_join!( ping_addr.call::(SetPingAddress(pong_addr.downgrade())), pong_addr.call::(SetPingAddress(ping_addr.downgrade())), )?; ping_addr.call::(Ping(0)).await?; tokio::time::sleep(std::time::Duration::from_millis(10000)).await; ping_addr.stop(Ok(())); pong_addr.stop(Ok(())); Ok(()) }