use std::time::{Duration, SystemTime}; use futures::{executor::block_on, join}; use once_cell::sync::OnceCell; use tracing::info; use stage::{ actor_msg, actors::{Actor, ActorCtx, ActorError, ActorResult, BuildActorOp, Message, Response}, io::timers::Timer, sys_msgs::{ActorStart, Request}, system::{ActorSystemBuilder, ActorSystemConfig}, }; static TIME_TO_EAT: OnceCell = OnceCell::new(); // START - State objects struct Fork; struct Philosopher { name: &'static str, forks: (Actor, Actor), times_eaten: u16, time_thinking: Duration, start_thinking: SystemTime, } // END - State objects // START - Message objects struct Continue; struct ForkGrant; struct ForkRelease; struct ForkRequest; struct StopSession; actor_msg!(Continue, ForkGrant, ForkRelease, ForkRequest, StopSession); // END - Message objects impl Fork { async fn handle(_: (), _: ActorCtx, msg: Message) -> ActorResult<()> { if let Some(req) = msg.try_cast::() { if req.msg().try_cast::().is_some() { let _ = req.ask_back(ForkGrant, Response::Wait).await; } } Ok(()) } } impl Philosopher { async fn handle(mut self, ctx: ActorCtx, msg: Message) -> ActorResult { if msg.try_cast::().is_some() { info!( "Philosopher {}'s survey | Total time thinking: {}ms | Servings eaten: {}", self.name, self.time_thinking.as_millis(), self.times_eaten, ); return Err(ActorError::Stopping); } else if msg.try_cast::().is_some() { info!("Philosopher {} is seated at seat (id) {}", self.name, ctx.this.get_ref()); self.start_thinking = SystemTime::now(); } let (left_fork, right_fork) = &self.forks; // Get the forks let (left_grant, right_grant) = join!( left_fork.ask(ForkRequest, Response::Wait), right_fork.ask(ForkRequest, Response::Wait) ); let (left_grant, right_grant) = (left_grant.unwrap().unwrap(), right_grant.unwrap().unwrap()); let (left_grant, right_grant): (&Request, &Request) = (left_grant.try_cast::().unwrap(), right_grant.try_cast::().unwrap()); // Collect our thoughts let time_thinking = SystemTime::now().duration_since(self.start_thinking).unwrap(); self.time_thinking += time_thinking; // Eat let stop_at = SystemTime::now() + *TIME_TO_EAT.get().unwrap(); Timer::new(stop_at, ()).await; self.times_eaten += 1; // Return forks left_grant.respond(ForkRelease); right_grant.respond(ForkRelease); // Continue thinking ctx.this.send(Continue).unwrap(); self.start_thinking = SystemTime::now(); Ok(self) } } pub fn main() { let _ = TIME_TO_EAT.set(Duration::from_millis(10)); let sub = tracing_subscriber::FmtSubscriber::builder() .with_max_level(tracing::Level::INFO) // .with_env_filter(EnvFilter::from_env("stage=info")) // .with_env_filter(EnvFilter::from_env("stage=info")) .finish(); let _ = tracing::subscriber::set_global_default(sub); let cfg = ActorSystemConfig::default().threads(5); let sys = ActorSystemBuilder::new(cfg).start(); let handle = sys.get_handle(); let forks = (0..5) .map(|_| { let build = BuildActorOp::new(None, (), Fork::handle); block_on(handle.new_actor(build)).unwrap() }) .collect::>(); let philosophers = [ ("Alpha", 0_usize, 1_usize), ("Beta", 1_usize, 2_usize), ("Gamma", 2_usize, 3_usize), ("Delta", 3_usize, 4_usize), ("Epsilon", 4_usize, 0_usize), ] .iter() .map(|(name, left, right)| { let left = forks.get(*left).unwrap().clone(); let right = forks.get(*right).unwrap().clone(); let p = Philosopher { name: *name, forks: (left, right), times_eaten: 0, time_thinking: Duration::new(0, 0), start_thinking: SystemTime::now(), }; let build = BuildActorOp::new(None, p, Philosopher::handle); block_on(handle.new_actor(build)).unwrap() }) .collect::>(); philosophers.iter().for_each(|p| { let _ = p.send_after(StopSession, Duration::from_secs(5)); }); std::thread::sleep(Duration::from_secs(6)) }