// Copyright (C) 2023 Quickwit, Inc. // // Quickwit is offered under the AGPL v3.0 and as commercial software. // For commercial licensing, contact us at hello@quickwit.io. // // AGPL: // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as // published by the Free Software Foundation, either version 3 of the // License, or (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . use std::time::Duration; use async_trait::async_trait; use rand::prelude::IteratorRandom; use witty_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, Universe}; struct PingReceiver { name: &'static str, num_ping_received: usize, } impl PingReceiver { pub fn with_name(name: &'static str) -> Self { PingReceiver { name, num_ping_received: 0, } } } impl Actor for PingReceiver { type ObservableState = usize; fn observable_state(&self) -> Self::ObservableState { self.num_ping_received } } #[async_trait] impl Handler for PingReceiver { type Reply = String; async fn handle( &mut self, _msg: Ping, _ctx: &ActorContext, ) -> Result { self.num_ping_received += 1; Ok(format!( "Actor `{}` received {} pings", self.name, self.num_ping_received )) } } // ------------------ #[derive(Default)] struct PingSender { peers: Vec>, num_ping_emitted: usize, } #[derive(Debug)] struct Loop; #[derive(Debug)] struct Ping; #[derive(Debug)] pub struct AddPeer(Mailbox); #[async_trait] impl Actor for PingSender { type ObservableState = (); fn observable_state(&self) -> Self::ObservableState {} async fn initialize(&mut self, ctx: &ActorContext) -> Result<(), ActorExitStatus> { ctx.send_self_message(Loop).await?; Ok(()) } } #[async_trait] impl Handler for PingSender { type Reply = (); async fn handle(&mut self, _: Loop, ctx: &ActorContext) -> Result<(), ActorExitStatus> { let random_peer_id_opt = (0..self.peers.len()).choose(&mut rand::thread_rng()); if let Some(random_peer_id) = random_peer_id_opt { match ctx.ask(&self.peers[random_peer_id], Ping).await { Ok(reply_msg) => { println!("{reply_msg}"); } Err(_send_error) => { self.peers.swap_remove(random_peer_id); } } } self.num_ping_emitted += 1; if self.num_ping_emitted == 10 { return Err(ActorExitStatus::Success); } ctx.schedule_self_msg(Duration::from_secs(1), Loop).await; Ok(()) } } #[async_trait] impl Handler for PingSender { type Reply = (); async fn handle( &mut self, add_peer_msg: AddPeer, _ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { let AddPeer(peer_mailbox) = add_peer_msg; self.peers.push(peer_mailbox); Ok(()) } } #[tokio::main] async fn main() { let universe = Universe::new(); let (roger_mailbox, _) = universe .spawn_builder() .spawn(PingReceiver::with_name("Roger")); let (myriam_mailbox, _) = universe .spawn_builder() .spawn(PingReceiver::with_name("Myriam")); let (ping_sender_mailbox, ping_sender_handler) = universe.spawn_builder().spawn(PingSender::default()); ping_sender_mailbox .send_message(AddPeer(roger_mailbox)) .await .unwrap(); ping_sender_mailbox .send_message(AddPeer(myriam_mailbox)) .await .unwrap(); ping_sender_handler.join().await; }