rocketmq-client-v4

Crates.io: rocketmq-client-v4
lib.rs: rocketmq-client-v4
version: 0.4.2
source: src
created_at: 2024-09-19 07:29:50.713808
updated_at: 2024-10-31 05:52:05.846527
description: rocket mq rust client for remote protocol. works on rocket mq V4
homepage: https://github.com/zhangyangyang3/rocketmq-client-v4
repository:
max_upload_size:
id: 1380018
size: 175,793
thomas.yang (zhangyangyang3)

documentation

README

desc

This is a Rust RocketMQ client for RocketMQ version 4.
If you use RocketMQ 5, you should use rocketmq-client-rs instead.

how to use

consumer


use log::{info, LevelFilter};
use rocketmq_client_v4::consumer::message_handler::MessageHandler;
use rocketmq_client_v4::consumer::pull_consumer::MqConsumer;
use rocketmq_client_v4::protocols::body::message_body::MessageBody;
use std::sync::Arc;
use std::time::Duration;
use time::UtcOffset;
use tokio::sync::RwLock;
use rocketmq_client_v4::consumer::pull_consumer_v2::PullConsumer;

/// Message handler that logs the payload of every message passed to it.
///
/// `Handler` has no fields, so the compiler already treats it as `Send` and
/// `Sync`; no `unsafe impl` is needed to share it between tasks.
struct Handler {}

impl MessageHandler for Handler {
    /// Logs the result of decoding the message body as UTF-8 at `info` level.
    ///
    /// The body is only borrowed for decoding, so the payload is not copied
    /// for each message. When the bytes are not valid UTF-8 the decoding error
    /// is logged in place of the text.
    async fn handle(&self, message: &MessageBody) {
        info!("read message:{:?}", std::str::from_utf8(&message.body))
    }
}

/// Demo entry point: starts a pull consumer on `pushNoticeMessage_To`, lets it
/// run for 40 seconds, then sets the shared run flag to `false` and exits.
#[tokio::main]
pub async fn main() {
    // Logger setup: +08:00 UTC offset, debug level by default.
    let utc_plus_8 = UtcOffset::from_hms(8, 0, 0).unwrap();
    simple_logger::SimpleLogger::new()
        .with_utc_offset(utc_plus_8)
        .with_level(LevelFilter::Debug)
        .env()
        .init()
        .unwrap();

    // Arguments are: name server address, consumer group, topic.
    let consumer = PullConsumer::new(
        String::from("192.168.3.49:9876"),
        String::from("consume_pushNoticeMessage_test_2"),
        String::from("pushNoticeMessage_To"),
    );

    let handler = Arc::new(Handler {});
    // Shared flag handed to the consumer; presumably it keeps consuming while
    // the value is `true` (confirm against `start_consume`).
    let keep_running = Arc::new(RwLock::new(true));
    let task_flag = Arc::clone(&keep_running);
    tokio::spawn(async move {
        consumer.start_consume(handler, task_flag).await;
    });

    tokio::time::sleep(Duration::from_secs(40)).await;
    // The write guard is a temporary and is released at the end of this statement.
    *keep_running.write().await = false;
    // Short grace period before the runtime shuts down.
    tokio::time::sleep(Duration::from_secs(2)).await;
    info!("quit the test")
}


broadcast consumer

use std::sync::Arc;
use std::time::Duration;
use log::{info, LevelFilter};
use time::UtcOffset;
use tokio::sync::RwLock;
use rocketmq_client_v4::consumer::message_handler::MessageHandler;
use rocketmq_client_v4::consumer::pull_consumer_v2::PullConsumer;
use rocketmq_client_v4::protocols::body::message_body::MessageBody;


/// Message handler that logs the payload of every message passed to it.
///
/// `Handler` has no fields, so the compiler already treats it as `Send` and
/// `Sync`; no `unsafe impl` is needed to share it between tasks.
struct Handler {}

impl MessageHandler for Handler {
    /// Logs the result of decoding the message body as UTF-8 at `info` level.
    ///
    /// The body is only borrowed for decoding, so the payload is not copied
    /// for each message. When the bytes are not valid UTF-8 the decoding error
    /// is logged in place of the text.
    async fn handle(&self, message: &MessageBody) {
        info!("read message:{:?}", std::str::from_utf8(&message.body))
    }
}

/// Demo entry point for broadcast consumption: starts a broadcast consumer on
/// `MessageCluster_To`, lets it run for 180 seconds, then sets the shared run
/// flag to `false` and exits.
#[tokio::main]
pub async fn main() {
    // Logger setup: +08:00 UTC offset, debug level by default.
    let offset = UtcOffset::from_hms(8, 0, 0).unwrap();
    simple_logger::SimpleLogger::new()
        .with_utc_offset(offset)
        .with_level(LevelFilter::Debug)
        .env()
        .init()
        .unwrap();

    let name_addr = "192.168.3.49:9876".to_string();
    let topic = "MessageCluster_To".to_string();
    let consume_group = "Message_messageClusterInput_group".to_string();
    // `name_addr` is not used again, so it is moved rather than cloned.
    let consumer = PullConsumer::new_broadcast_consumer(name_addr, consume_group, topic);

    let handle = Arc::new(Handler {});
    // Shared flag handed to the consumer; presumably it keeps consuming while
    // the value is `true` (confirm against `start_consume`).
    let lock = Arc::new(RwLock::new(true));
    let run = Arc::clone(&lock);

    // `handle` and `run` are moved into the task, so no extra clones are needed.
    tokio::spawn(async move {
        consumer.start_consume(handle, run).await;
    });

    tokio::time::sleep(Duration::from_secs(180)).await;
    {
        // Scope the write guard so the lock is released before sleeping.
        let mut run = lock.write().await;
        *run = false;
    }
    // Short grace period before the runtime shuts down.
    tokio::time::sleep(Duration::from_secs(2)).await;
    info!("quit the test")
}



producer

/// Sends the same small JSON payload to `topic_test_007` ten times, pausing
/// five seconds after each send.
///
/// Panics if logger initialisation or any send fails.
#[tokio::test]
async fn send_message_test() {
    // Logger setup: +08:00 UTC offset, debug level by default.
    let offset = UtcOffset::from_hms(8, 0, 0).unwrap();
    simple_logger::SimpleLogger::new()
        .with_utc_offset(offset)
        .with_level(LevelFilter::Debug)
        .env()
        .init()
        .unwrap();

    let message_body = r#"{"id":"3910000000000056508"}"#;
    let body = message_body.as_bytes().to_vec();

    let name_addr = "192.168.3.49:9876".to_string();
    let topic = "topic_test_007".to_string();

    // `name_addr` is not used again, so it is moved rather than cloned.
    let mut producer = Producer::new("rust_send_group_1".to_string(), name_addr).await;
    for i in 0..10 {
        // The loop index is passed as the third argument in string form.
        producer
            .send_message(topic.clone(), body.clone(), i.to_string())
            .await
            .unwrap();
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
}


TODO

  • tag support
  • contact me

    zyy20101289@outlook.com

    Commit count: 0

    cargo fmt