use std::future::Future; use std::time::Duration; use tokio::join; use tokio::sync::{mpsc, oneshot}; use msg_channel::*; pub struct MsgHandler; impl MsgHandler { pub fn test(&mut self) {} } impl HandleSync for MsgHandler { type Replay = (); fn handle(&mut self, _msg: SyncMsgA) -> Self::Replay { println!("SyncMsg"); } } impl HandleSync for MsgHandler { type Replay = (); fn handle(&mut self, _msg: SyncMsgB) -> Self::Replay { println!("SyncMsgB"); } } impl HandleAsync for MsgHandler { type Replay = (); async fn handle(&mut self, _msg: AsyncMsg) -> Self::Replay { tokio::time::sleep(Duration::from_secs(2)).await; println!("AsyncMsg") } } impl HandleAsyncConcurrent for MsgHandler { type Replay = (); async fn handle(&self, _msg: AsyncConcurrentMsgA) -> Self::Replay { tokio::time::sleep(Duration::from_secs(2)).await; println!("AsyncConcurrentMsgA") } } impl HandleAsyncConcurrent for MsgHandler { type Replay = (); async fn handle(&self, _msg: AsyncConcurrentMsgB) -> Self::Replay { tokio::time::sleep(Duration::from_secs(1)).await; println!("AsyncConcurrentMsgB") } } impl HandleSyncConcurrent for MsgHandler { type Replay = (); fn handle(&self, _msg: SyncConcurrentMsgA) -> Self::Replay { println!("SyncConcurrentMsgA") } } impl HandleSyncConcurrent for MsgHandler { type Replay = (); fn handle(&self, _msg: SyncConcurrentMsgB) -> Self::Replay { println!("SyncConcurrentMsgB") } } pub struct SyncMsgA; pub struct SyncMsgB; pub struct AsyncMsg; pub struct SyncConcurrentMsgA; pub struct SyncConcurrentMsgB; pub struct AsyncConcurrentMsgA; pub struct AsyncConcurrentMsgB; pub struct TestMsgSet; pub struct TestMsgSet2; #[msg_set] impl MessageSet for TestMsgSet { type Handler = MsgHandler; type Async = (AsyncMsg,); type Sync = (SyncMsgA, SyncMsgB); type AsyncConcurrent = (AsyncConcurrentMsgA, AsyncConcurrentMsgB); type SyncConcurrent = (SyncConcurrentMsgA, SyncConcurrentMsgB); } pub struct MsgHandler2 { x: MessageSetSender, } pub struct TestMsgSetMsg(MessageSetItem); impl From for TestMsgSetMsg where T: Into>, { fn from(value: T) -> Self { Self(value.into()) } } #[msg_set] impl MessageSet for TestMsgSet2 { type Handler = MsgHandler2; type Async = (TestMsgSetMsg,); type Sync = (); type AsyncConcurrent = (); type SyncConcurrent = (); } impl HandleAsync for MsgHandler2 { type Replay = Result< MessageSetReplayItem, mpsc::error::SendError<( MessageSetItem, oneshot::Sender>, )>, >; async fn handle(&mut self, _msg: TestMsgSetMsg) -> Self::Replay { Ok(self.x.send(_msg.0)?.await) } } #[tokio::main] async fn main() -> color_eyre::Result<()> { let (sender, mut handler) = msg_channel::(); tokio::spawn(async move { let _r = sender.send(SyncMsgA)?.await; let _r = sender.send(SyncMsgB)?.await; let _r = sender.send(AsyncMsg)?.await; join!( sender.send(SyncConcurrentMsgA)?, sender.send(SyncConcurrentMsgB)? ); let r = sender .send(TestMsgSetSyncVariant::SyncMsgA(SyncMsgA))? .await; match r { TestMsgSetSyncReplayVariant::SyncMsgA(_) => {} TestMsgSetSyncReplayVariant::SyncMsgB(_) => {} } let r = sender .send(TestMsgSetAsyncConcurrentVariant::AsyncConcurrentMsgB( AsyncConcurrentMsgB, ))? .await; match r { TestMsgSetAsyncConcurrentReplayVariant::AsyncConcurrentMsgA(_) => {} TestMsgSetAsyncConcurrentReplayVariant::AsyncConcurrentMsgB(_) => {} } Ok::<(), color_eyre::Report>(()) }); let mut msg_handler = MsgHandler; loop { tokio::select! { _ = std::future::pending() => { msg_handler.test(); } result = async { handler.handle_next(&mut msg_handler).await } => { if let Some(_result) = result? { msg_handler.test(); }else{ break; } } } } Ok(()) }