Crates.io | testeventbus |
lib.rs | testeventbus |
version | 0.1.0 |
source | src |
created_at | 2024-11-21 03:08:13.313615 |
updated_at | 2024-11-21 03:08:13.313615 |
description | eventbus is implemented using nats,kafaka and rabbitmq client |
homepage | |
repository | |
max_upload_size | |
id | 1455644 |
size | 46,043 |
使用nats,kafaka,rabbitmq 实现的eventbus
#[derive(Serialize, Deserialize, Debug, Clone)]
struct MyMessage {
content: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 订阅消息
let sub_handler: MessageHandler<MyMessage> = Arc::new(move |msg: Message<MyMessage>| {
println!(
"---------Received message in subscriber: {:?}--------------",
msg.body
);
Ok("".to_string())
});
tokio::spawn(subscribe_to_topic("my_topic", sub_handler));
// 发布消息
let message = Message::new(
"my_topic".to_string(),
crate::message::NativeEventAction::Other,
None,
Some(MyMessage {
content: "Hello from publisher!".to_string(),
}),
None,
);
tokio::spawn(publish_message(message));
// reply消息
let reply_handler: MessageHandler<MyMessage> = Arc::new(move |msg: Message<MyMessage>| {
println!(
"============Received request, preparing response: {:?}================",
msg
);
Ok("lw".to_string())
});
tokio::spawn(reply_to_topic("my_topic2", reply_handler));
// request消息
let request_message = Message::new(
"my_topic2".to_string(),
crate::message::NativeEventAction::Other,
None,
Some(MyMessage {
content: "Hello from publisher!".to_string(),
}),
None,
);
tokio::spawn(send_request(request_message));
sleep(Duration::from_secs(1000)).await;
println!("Disconnected from NATS!");
Ok(())
}
// 订阅消息并处理
async fn subscribe_to_topic<T>(topic: &str, handler: MessageHandler<T>)
where
T: Serialize + for<'de> Deserialize<'de> + Debug + Clone + Send + Sync + 'static,
{
let mut nats_cli = NatsCli::new().await.unwrap(); // 创建并初始化 NATS 客户端
nats_cli.subscribe(topic, handler).await;
println!("Subscribed to topic: {}", topic);
}
// 发布消息
async fn publish_message<T>(message: Message<T>) -> Result<(), Box<dyn std::error::Error + Send>>
where
T: Serialize + for<'de> Deserialize<'de> + Debug + Send + Clone + Sync + 'static,
{
// 使用 if let 处理 NatsCli::new()
let mut nats_cli = NatsCli::new().await.unwrap(); // 创建并初始化 NATS 客户端
sleep(Duration::from_secs(5));
if let Err(e) = nats_cli.publish(message).await {
println!("Failed to publish message: {:?}", e);
return Err(Box::new(e) as Box<dyn std::error::Error + Send>);
}
println!("---------Message published!---------------");
Ok(())
}
// 收到请求并等待响应
async fn reply_to_topic<T>(
topic: &str,
handler: MessageHandler<T>,
) -> Result<(), Box<dyn std::error::Error + Send>>
where
T: Serialize + for<'de> Deserialize<'de> + Debug + Send + Clone + Sync + 'static,
{
let mut nats_cli = NatsCli::new().await.unwrap(); // 创建并初始化 NATS 客户端
nats_cli
.reply(topic, handler)
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send>)?;
Ok(())
}
// 发送请求并等待响应
async fn send_request<T>(message: Message<T>) -> Result<(), Box<dyn std::error::Error + Send>>
where
T: Serialize + for<'de> Deserialize<'de> + Debug + Clone + Send + Sync + 'static,
{
let mut nats_cli = NatsCli::new().await.unwrap();
sleep(Duration::from_secs(15));
match nats_cli
.request(message, time::Duration::from_secs(100))
.await
{
Ok(response) => {
println!(
"===============Received response: {:?}==================",
response
);
Ok(())
}
Err(e) => {
println!("Failed to get response: {:?}", e);
Err(Box::new(e) as Box<dyn std::error::Error + Send>)
}
}
}