//! Ring benchmark: //! //! > Create N processes in a ring. Send a message round the ring M times so //! > that a total of N*M messages get sent. Time how long this takes for //! > different values of N and M. //! //! For more info, see `print_help`. // NOTE: Unbounded inboxes are used in this example for speed. use std::env; use std::process; use std::time::SystemTime; use aktrs::actor::{self, Behavior, Behaviors, InboxKind, Pid, SpawnOptions}; use anyhow::Context; fn print_help() { println!( r#"aktrs-example-ring Spawns a group of actors in a ring and times how long it takes to loop a message around the ring a certain number of times. USAGE: cargo run --release --example ring ARGUMENTS: N The number of worker actors to spawn in the ring M The number of times the message should be looped "# ); } /// The payload that will be sent around the actor ring. /// /// This contains a count that will be incremented every time the message is /// sent, and a `Pid` pointing to the parent actor that is monitoring the /// workers. 
struct Payload(usize, Pid<()>); fn main() -> aktrs::Result<()> { let mut args = env::args(); let _ = args.next(); let (n, m) = match (args.next(), args.next()) { (Some(n), Some(m)) => (n, m), _ => { print_help(); process::exit(0); } }; let n: usize = n.parse().with_context(|| format!("failed to parse '{}' as a usize", n))?; let m: usize = m.parse().with_context(|| format!("failed to parse '{}' as a usize", m))?; if n < 2 { eprintln!("Error: N must be at least 2!"); process::exit(1); } if m < 1 { eprintln!("Error: M must be at least 1!"); process::exit(1); } let mut sys = actor::System::new()?; sys.spawn( coordinator(n, m), SpawnOptions { inbox_kind: InboxKind::Unbounded, ..SpawnOptions::default() }, ); let now = SystemTime::now(); sys.run()?; let elapsed = now.elapsed()?; println!("Time taken: {}.{:06} seconds", elapsed.as_secs(), elapsed.subsec_micros()); Ok(()) } // Creates a coordinator actor that will spawn the actor ring and wait for it // to complete. pub fn coordinator(num_actors: usize, num_msgs: usize) -> Behavior<()> { let lim = num_actors * num_msgs; Behaviors::with_context(move |cx| { let first = Behaviors::with_context(move |cx| { let mut next = cx.spawn( worker(lim, cx.this()), SpawnOptions { inbox_kind: InboxKind::Unbounded, ..SpawnOptions::default() }, ); for _ in 0..num_actors - 2 { next = cx.spawn( worker(lim, next), SpawnOptions { inbox_kind: InboxKind::Unbounded, ..SpawnOptions::default() }, ); } Ok(worker(lim, next)) }); // Spawn the ring of actors. let mut ring = cx.spawn( first, SpawnOptions { inbox_kind: InboxKind::Unbounded, ..SpawnOptions::default() }, ); // Kick off the round robin. ring.try_tell(Payload(0, cx.this()))?; // Wait for a worker to signal that the benchmark is complete. Ok(Behaviors::receive_message(|_, _: ()| Ok(Behaviors::stopped())).into()) }) } // Creates a worker that forwards a message to the next actor in the ring. 
// Each received `Payload` carries a hop counter and the `Pid` of the parent
// actor. Once the counter has reached `lim`, the worker sends `()` to the
// parent and stops; otherwise it sends the payload on to `next` with the
// counter incremented by one and keeps its current behavior.
fn worker(lim: usize, mut next: Pid<Payload>) -> Behavior<Payload> {
    Behaviors::receive_message(move |_, Payload(count, mut parent): Payload| {
        if count >= lim {
            // Hop limit reached: notify the parent and stop this worker.
            parent.try_tell(())?;
            Ok(Behaviors::stopped())
        } else {
            next.try_tell(Payload(count + 1, parent))?;
            Ok(Behaviors::same())
        }
    })
    .into()
}