Crates.io | feventbus |
lib.rs | feventbus |
version | 0.3.1 |
source | src |
created_at | 2024-11-19 09:55:17.388284 |
updated_at | 2024-12-04 01:46:09.273206 |
description | eventbus is implemented using nats,kafaka and rabbitmq client |
homepage | |
repository | |
max_upload_size | |
id | 1453080 |
size | 52,489 |
使用nats,kafaka,rabbitmq 实现的eventbus
/**
* Copyright(2024,)Institute of Software, Chinese Academy of Sciences
* author: jiangliuwei@iscas.ac.cn
* since: 0.1.0
*
**/
/**
* Copyright(2024,)Institute of Software, Chinese Academy of Sciences
* author: jiangliuwei@iscas.ac.cn
* since: 0.1.0
*
**/
mod err;
mod impls;
mod message;
mod tests;
mod traits;
mod config;
use crate::impls::nats::nats::NatsCli;
use crate::message::Message;
use crate::traits::consumer::Consumer;
use crate::traits::consumer::MessageHandler;
use crate::traits::controller::EventBus;
use crate::traits::producer::Producer;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::sync::Arc;
use std::time;
use std::time::Duration;
use tokio::time::sleep;
#[derive(Serialize, Deserialize, Debug, Clone)]
struct MyMessage {
content: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
struct YourMessage {
contentyou: String,
}
async fn ceshi(val: String)->String{
val
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let nats_cli = Arc::new(NatsCli::new().await.unwrap());
let reply_handler: MessageHandler<serde_json::Value> = Arc::new(move |msg: Message<serde_json::Value>| {
Box::pin(async move {
match msg.body {
Some(body) => {
if let Ok(my_message) = serde_json::from_value::<MyMessage>(body.clone()) {
ceshi(my_message.content.clone()).await;
Ok(my_message.content)
} else if let Ok(your_message) = serde_json::from_value::<YourMessage>(body) {
// println!("Received YourMessage: {:?}", your_message);
Ok(your_message.contentyou)
} else {
Err(crate::err::Error::MessageHandling(
"Cargo creation failed: metadata is null".to_string(),
))
}
}
None => {
Err(crate::err::Error::MessageHandling(
"Cargo creation failed: metadata is null".to_string(),
))
}
}
})
});
tokio::spawn(reply_to_topic(
"my_topic2",
Arc::clone(&nats_cli),
reply_handler,
));
// request消息
let request_message = Message::new(
"my_topic2".to_string(),
crate::message::NativeEventAction::Other,
None,
Some(MyMessage {
content: "request 123!".to_string(),
}),
None,
);
tokio::spawn(send_request(request_message, Arc::clone(&nats_cli)));
// request消息
let request_message2 = Message::new(
"my_topic2".to_string(),
crate::message::NativeEventAction::Other,
None,
Some(YourMessage {
contentyou: "request 234!".to_string(),
}),
None,
);
tokio::spawn(send_request(request_message2, Arc::clone(&nats_cli)));
let sub_handler: MessageHandler<serde_json::Value> = Arc::new(move |msg: Message<serde_json::Value>| {
Box::pin(async move {
match msg.body {
Some(body) => {
if let Ok(my_message) = serde_json::from_value::<MyMessage>(body.clone()) {
ceshi(my_message.content.clone()).await;
println!("-------{}----------",my_message.content);
Ok("---sub Response from MyMessage---".to_string())
} else if let Ok(your_message) = serde_json::from_value::<YourMessage>(body) {
// println!("Received YourMessage: {:?}", your_message);
println!("-------{}------",your_message.contentyou);
Ok("---sub Response from YourMessage---".to_string())
} else {
Err(crate::err::Error::MessageHandling(
"---Cargo creation failed: metadata is null---".to_string(),
))
}
}
None => {
Err(crate::err::Error::MessageHandling(
"---Cargo creation failed: metadata is null---".to_string(),
))
}
}
})
});
tokio::spawn(subscribe_to_topic(
"my_topic3",
Arc::clone(&nats_cli),
sub_handler,
));
// request消息
let pub_message = Message::new(
"my_topic3".to_string(),
crate::message::NativeEventAction::Other,
None,
Some(MyMessage {
content: "pub 123!".to_string(),
}),
None,
);
tokio::spawn(publish_message(pub_message, Arc::clone(&nats_cli)));
// request消息
let pub_message2 = Message::new(
"my_topic3".to_string(),
crate::message::NativeEventAction::Other,
None,
Some(YourMessage {
contentyou: "pub 234!".to_string(),
}),
None,
);
tokio::spawn(publish_message(pub_message2, Arc::clone(&nats_cli)));
sleep(Duration::from_secs(100)).await;
Ok(())
}
// 收到请求并等待响应
async fn reply_to_topic(
topic: &str,
nats_cli: Arc<NatsCli>,
handler: MessageHandler<serde_json::Value>,
) -> Result<(), Box<dyn std::error::Error + Send>>
{
nats_cli
.reply(topic, handler)
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send>)?;
Ok(())
}
// 订阅消息并处理
async fn subscribe_to_topic<T>(topic: &str, nats_cli: Arc<NatsCli>, handler: MessageHandler<T>)
where
T: Serialize + for<'de> Deserialize<'de> + Debug + Clone + Send + Sync + 'static,
{
nats_cli.subscribe(topic, handler).await;
println!("Subscribed to topic: {}", topic);
}
// 发布消息
async fn publish_message<T>(
message: Message<T>,
nats_cli: Arc<NatsCli>,
) -> Result<(), Box<dyn std::error::Error + Send>>
where
T: Serialize + for<'de> Deserialize<'de> + Debug + Send + Clone + Sync + 'static,
{
if let Err(e) = nats_cli.publish(message).await {
println!("Failed to publish message: {:?}", e);
return Err(Box::new(e) as Box<dyn std::error::Error + Send>);
}
Ok(())
}
// 发送请求并等待响应
async fn send_request<T>(
message: Message<T>,
nats_cli: Arc<NatsCli>,
) -> Result<(), Box<dyn std::error::Error + Send>>
where
T: Serialize + for<'de> Deserialize<'de> + Debug + Clone + Send + Sync + 'static,
{
match nats_cli
.request(message, time::Duration::from_secs(100))
.await
{
Ok(response) => {
println!(
"===============Received response: {:?}==================",
response
);
Ok(())
}
Err(e) => {
println!("Failed to get response: {:?}", e);
Err(Box::new(e) as Box<dyn std::error::Error + Send>)
}
}
}