# eventbus

#### 介绍

使用nats,kafka,rabbitmq 实现的eventbus

#### 使用说明

```rust
/**
 * Copyright(2024,)Institute of Software, Chinese Academy of Sciences
 * author: jiangliuwei@iscas.ac.cn
 * since: 0.1.0
 *
 **/
mod err;
mod impls;
mod message;
mod tests;
mod traits;
mod config;

use crate::impls::nats::nats::NatsCli;
use crate::message::Message;
use crate::traits::consumer::Consumer;
use crate::traits::consumer::MessageHandler;
use crate::traits::controller::EventBus;
use crate::traits::producer::Producer;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::sync::Arc;
use std::time;
use std::time::Duration;
use tokio::time::sleep;

/// First example payload; carries a single text field.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct MyMessage {
    content: String,
}

/// Second example payload; its field name differs from `MyMessage` so the
/// handlers can tell the two apart when deserializing.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct YourMessage {
    contentyou: String,
}

/// Trivial async helper that returns its argument unchanged; the handlers
/// await it to show that a handler body may itself await.
async fn ceshi(val: String) -> String {
    val
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let nats_cli = Arc::new(NatsCli::new().await.unwrap());

    // Handler registered through `reply` on "my_topic2": returns the text
    // field of whichever payload type the body deserializes into.
    let reply_handler: MessageHandler<serde_json::Value> =
        Arc::new(move |msg: Message<serde_json::Value>| {
            Box::pin(async move {
                match msg.body {
                    Some(body) => {
                        if let Ok(my_message) =
                            serde_json::from_value::<MyMessage>(body.clone())
                        {
                            ceshi(my_message.content.clone()).await;
                            Ok(my_message.content)
                        } else if let Ok(your_message) =
                            serde_json::from_value::<YourMessage>(body)
                        {
                            Ok(your_message.contentyou)
                        } else {
                            Err(crate::err::Error::MessageHandling(
                                "Cargo creation failed: metadata is null".to_string(),
                            ))
                        }
                    }
                    None => Err(crate::err::Error::MessageHandling(
                        "Cargo creation failed: metadata is null".to_string(),
                    )),
                }
            })
        });

    tokio::spawn(reply_to_topic(
        "my_topic2",
        Arc::clone(&nats_cli),
        reply_handler,
    ));

    // Request message carrying a `MyMessage` body.
    let request_message = Message::new(
        "my_topic2".to_string(),
        crate::message::NativeEventAction::Other,
        None,
        Some(MyMessage {
            content: "request 123!".to_string(),
        }),
        None,
    );
    tokio::spawn(send_request(request_message, Arc::clone(&nats_cli)));

    // Request message carrying a `YourMessage` body.
    let request_message2 = Message::new(
        "my_topic2".to_string(),
        crate::message::NativeEventAction::Other,
        None,
        Some(YourMessage {
            contentyou: "request 234!".to_string(),
        }),
        None,
    );
    tokio::spawn(send_request(request_message2, Arc::clone(&nats_cli)));

    // Handler registered through `subscribe` on "my_topic3": prints the text
    // field of the received payload and returns a fixed string.
    let sub_handler: MessageHandler<serde_json::Value> =
        Arc::new(move |msg: Message<serde_json::Value>| {
            Box::pin(async move {
                match msg.body {
                    Some(body) => {
                        if let Ok(my_message) =
                            serde_json::from_value::<MyMessage>(body.clone())
                        {
                            ceshi(my_message.content.clone()).await;
                            println!("-------{}----------", my_message.content);
                            Ok("---sub Response from MyMessage---".to_string())
                        } else if let Ok(your_message) =
                            serde_json::from_value::<YourMessage>(body)
                        {
                            println!("-------{}------", your_message.contentyou);
                            Ok("---sub Response from YourMessage---".to_string())
                        } else {
                            Err(crate::err::Error::MessageHandling(
                                "---Cargo creation failed: metadata is null---".to_string(),
                            ))
                        }
                    }
                    None => Err(crate::err::Error::MessageHandling(
                        "---Cargo creation failed: metadata is null---".to_string(),
                    )),
                }
            })
        });

    tokio::spawn(subscribe_to_topic(
        "my_topic3",
        Arc::clone(&nats_cli),
        sub_handler,
    ));

    // Published message carrying a `MyMessage` body.
    let pub_message = Message::new(
        "my_topic3".to_string(),
        crate::message::NativeEventAction::Other,
        None,
        Some(MyMessage {
            content: "pub 123!".to_string(),
        }),
        None,
    );
    tokio::spawn(publish_message(pub_message, Arc::clone(&nats_cli)));

    // Published message carrying a `YourMessage` body.
    let pub_message2 = Message::new(
        "my_topic3".to_string(),
        crate::message::NativeEventAction::Other,
        None,
        Some(YourMessage {
            contentyou: "pub 234!".to_string(),
        }),
        None,
    );
    tokio::spawn(publish_message(pub_message2, Arc::clone(&nats_cli)));

    // The tasks above are spawned and never joined, so wait here before
    // returning from `main`.
    sleep(Duration::from_secs(100)).await;
    Ok(())
}

/// Registers `handler` for requests on `topic` via `NatsCli::reply`.
///
/// # Errors
/// Returns the boxed error produced by `reply`.
async fn reply_to_topic(
    topic: &str,
    nats_cli: Arc<NatsCli>,
    handler: MessageHandler<serde_json::Value>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    nats_cli
        .reply(topic, handler)
        .await
        .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
    Ok(())
}

/// Subscribes `handler` to `topic` via `NatsCli::subscribe`, then logs the
/// topic name.
async fn subscribe_to_topic<T>(topic: &str, nats_cli: Arc<NatsCli>, handler: MessageHandler<T>)
where
    T: Serialize + for<'de> Deserialize<'de> + Debug + Clone + Send + Sync + 'static,
{
    nats_cli.subscribe(topic, handler).await;
    println!("Subscribed to topic: {}", topic);
}

/// Publishes `message` via `NatsCli::publish`.
///
/// # Errors
/// Logs and returns the boxed error produced by `publish`.
async fn publish_message<T>(
    message: Message<T>,
    nats_cli: Arc<NatsCli>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
    T: Serialize + for<'de> Deserialize<'de> + Debug + Send + Clone + Sync + 'static,
{
    if let Err(e) = nats_cli.publish(message).await {
        println!("Failed to publish message: {:?}", e);
        return Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
    }
    Ok(())
}

/// Sends `message` via `NatsCli::request` with a 100-second timeout and
/// prints the response.
///
/// # Errors
/// Logs and returns the boxed error produced by `request`.
async fn send_request<T>(
    message: Message<T>,
    nats_cli: Arc<NatsCli>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
    T: Serialize + for<'de> Deserialize<'de> + Debug + Clone + Send + Sync + 'static,
{
    match nats_cli
        .request(message, time::Duration::from_secs(100))
        .await
    {
        Ok(response) => {
            println!(
                "===============Received response: {:?}==================",
                response
            );
            Ok(())
        }
        Err(e) => {
            println!("Failed to get response: {:?}", e);
            Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
        }
    }
}
```