Crates.io | rocketmq-client-v4 |
lib.rs | rocketmq-client-v4 |
version | 0.4.2 |
source | src |
created_at | 2024-09-19 07:29:50.713808 |
updated_at | 2024-10-31 05:52:05.846527 |
description | rocket mq rust client for remote protocol. works on rocket mq V4 |
homepage | https://github.com/zhangyangyang3/rocketmq-client-v4 |
repository | |
max_upload_size | |
id | 1380018 |
size | 175,793 |
This is a rust rocket mq client for rocket mq version 4.
if you use rocket mq 5. you should use rocketmq-client-rs
use log::{info, LevelFilter};
use rocketmq_client_v4::consumer::message_handler::MessageHandler;
use rocketmq_client_v4::consumer::pull_consumer::MqConsumer;
use rocketmq_client_v4::protocols::body::message_body::MessageBody;
use std::sync::Arc;
use std::time::Duration;
use time::UtcOffset;
use tokio::sync::RwLock;
use rocketmq_client_v4::consumer::pull_consumer_v2::PullConsumer;
struct Handler {}
impl MessageHandler for Handler {
async fn handle(&self, message: &MessageBody) {
info!("read message:{:?}", String::from_utf8(message.body.clone()))
}
}
unsafe impl Send for Handler {
}
unsafe impl Sync for Handler {}
#[tokio::main]
pub async fn main() {
let offset = UtcOffset::from_hms(8, 0, 0).unwrap();
simple_logger::SimpleLogger::new()
.with_utc_offset(offset)
.with_level(LevelFilter::Debug)
.env()
.init()
.unwrap();
let name_addr = "192.168.3.49:9876".to_string();
let topic = "pushNoticeMessage_To".to_string();
let consume_group = "consume_pushNoticeMessage_test_2".to_string();
let consumer = PullConsumer::new(name_addr, consume_group, topic);
let handle = Arc::new(Handler {});
let lock = Arc::new(RwLock::new(true));
let run = lock.clone();
tokio::spawn(async move {
consumer.start_consume(handle, run).await;
});
tokio::time::sleep(Duration::from_secs(40)).await;
{
let mut run = lock.write().await;
*run = false;
}
tokio::time::sleep(Duration::from_secs(2)).await;
info!("quit the test")
}
use std::sync::Arc;
use std::time::Duration;
use log::{info, LevelFilter};
use time::UtcOffset;
use tokio::sync::RwLock;
use rocketmq_client_v4::consumer::message_handler::MessageHandler;
use rocketmq_client_v4::consumer::pull_consumer_v2::PullConsumer;
use rocketmq_client_v4::protocols::body::message_body::MessageBody;
struct Handler {}
impl MessageHandler for Handler {
async fn handle(&self, message: &MessageBody) {
info!("read message:{:?}", String::from_utf8(message.body.clone()))
}
}
unsafe impl Send for Handler {}
unsafe impl Sync for Handler {}
#[tokio::main]
pub async fn main() {
let offset = UtcOffset::from_hms(8, 0, 0).unwrap();
simple_logger::SimpleLogger::new()
.with_utc_offset(offset)
.with_level(LevelFilter::Debug)
.env()
.init()
.unwrap();
let name_addr = "192.168.3.49:9876".to_string();
let topic = "MessageCluster_To".to_string();
let consume_group = "Message_messageClusterInput_group".to_string();
let consumer = PullConsumer::new_broadcast_consumer(name_addr.clone(), consume_group, topic);
let handle = Arc::new(Handler {});
let lock = Arc::new(RwLock::new(true));
let run = lock.clone();
tokio::spawn(async move {
consumer.start_consume(handle.clone(), run.clone()).await;
});
tokio::time::sleep(Duration::from_secs(180)).await;
{
let mut run = lock.write().await;
*run = false;
}
tokio::time::sleep(Duration::from_secs(2)).await;
info!("quit the test")
}
#[tokio::test]
async fn send_message_test() {
let offset = UtcOffset::from_hms(8, 0, 0).unwrap();
simple_logger::SimpleLogger::new().with_utc_offset(offset).with_level(LevelFilter::Debug).env().init().unwrap();
let message_body = r#"{"id":"3910000000000056508"}"#;
let body = message_body.as_bytes().to_vec();
let name_addr = "192.168.3.49:9876".to_string();
let topic = "topic_test_007".to_string();
let mut producer = Producer::new("rust_send_group_1".to_string(), name_addr.clone()).await;
for i in 0..10 {
producer.send_message(topic.clone(), body.clone(), format!("{i}")).await.unwrap();
tokio::time::sleep(Duration::from_secs(5)).await;
}
}