# eventbus #### 介绍 使用nats,kafaka,rabbitmq 实现的eventbus #### 使用说明 ```rust #[derive(Serialize, Deserialize, Debug, Clone)] struct MyMessage { content: String, } #[tokio::main] async fn main() -> Result<(), Box> { // 订阅消息 let sub_handler: MessageHandler = Arc::new(move |msg: Message| { println!( "---------Received message in subscriber: {:?}--------------", msg.body ); Ok("".to_string()) }); tokio::spawn(subscribe_to_topic("my_topic", sub_handler)); // 发布消息 let message = Message::new( "my_topic".to_string(), crate::message::NativeEventAction::Other, None, Some(MyMessage { content: "Hello from publisher!".to_string(), }), None, ); tokio::spawn(publish_message(message)); // reply消息 let reply_handler: MessageHandler = Arc::new(move |msg: Message| { println!( "============Received request, preparing response: {:?}================", msg ); Ok("lw".to_string()) }); tokio::spawn(reply_to_topic("my_topic2", reply_handler)); // request消息 let request_message = Message::new( "my_topic2".to_string(), crate::message::NativeEventAction::Other, None, Some(MyMessage { content: "Hello from publisher!".to_string(), }), None, ); tokio::spawn(send_request(request_message)); sleep(Duration::from_secs(1000)).await; println!("Disconnected from NATS!"); Ok(()) } // 订阅消息并处理 async fn subscribe_to_topic(topic: &str, handler: MessageHandler) where T: Serialize + for<'de> Deserialize<'de> + Debug + Clone + Send + Sync + 'static, { let mut nats_cli = NatsCli::new().await.unwrap(); // 创建并初始化 NATS 客户端 nats_cli.subscribe(topic, handler).await; println!("Subscribed to topic: {}", topic); } // 发布消息 async fn publish_message(message: Message) -> Result<(), Box> where T: Serialize + for<'de> Deserialize<'de> + Debug + Send + Clone + Sync + 'static, { // 使用 if let 处理 NatsCli::new() let mut nats_cli = NatsCli::new().await.unwrap(); // 创建并初始化 NATS 客户端 sleep(Duration::from_secs(5)); if let Err(e) = nats_cli.publish(message).await { println!("Failed to publish message: {:?}", e); return Err(Box::new(e) as Box); } println!("---------Message published!---------------"); Ok(()) } // 收到请求并等待响应 async fn reply_to_topic( topic: &str, handler: MessageHandler, ) -> Result<(), Box> where T: Serialize + for<'de> Deserialize<'de> + Debug + Send + Clone + Sync + 'static, { let mut nats_cli = NatsCli::new().await.unwrap(); // 创建并初始化 NATS 客户端 nats_cli .reply(topic, handler) .await .map_err(|e| Box::new(e) as Box)?; Ok(()) } // 发送请求并等待响应 async fn send_request(message: Message) -> Result<(), Box> where T: Serialize + for<'de> Deserialize<'de> + Debug + Clone + Send + Sync + 'static, { let mut nats_cli = NatsCli::new().await.unwrap(); sleep(Duration::from_secs(15)); match nats_cli .request(message, time::Duration::from_secs(100)) .await { Ok(response) => { println!( "===============Received response: {:?}==================", response ); Ok(()) } Err(e) => { println!("Failed to get response: {:?}", e); Err(Box::new(e) as Box) } } } ```