danube-client

Crates.iodanube-client
lib.rsdanube-client
version0.1.5
sourcesrc
created_at2024-06-29 13:57:02.337744
updated_at2024-08-24 12:18:38.818261
descriptionThe async client for Danube Pub/Sub messsaging platform
homepage
repositoryhttps://github.com/danrusei/danube
max_upload_size
id1287389
size196,061
Dan Rusei (danrusei)

documentation

https://docs.rs/danube-client

README

Danube-client

An async Rust client library for interacting with Danube Pub/Sub messaging platform.

Danube is an open-source distributed Pub/Sub messaging platform written in Rust. Consult the documentation for supported concepts and the platform architecture.

I'm working on improving it and adding new features. Please feel free to contribute or report any issues you encounter.

Example usage

Check out the example files.

Producer

let client = DanubeClient::builder()
    .service_url("http://127.0.0.1:6650")
    .build()
    .unwrap();

let topic_name = "/default/test_topic";
let producer_name = "test_prod";

let mut producer = client
    .new_producer()
    .with_topic(topic_name)
    .with_name(producer_name)
    .build();

producer.create().await?;
println!("The Producer {} was created", producer_name);

let encoded_data = "Hello Danube".as_bytes().to_vec();

let message_id = producer.send(encoded_data, None).await?;
println!("The Message with id {} was sent", message_id);

Consumer

let client = DanubeClient::builder()
        .service_url("http://127.0.0.1:6650")
        .build()
        .unwrap();

    let topic = "/default/test_topic";
    let consumer_name = "test_cons";
    let subscription_name = "test_subs";

    let mut consumer = client
        .new_consumer()
        .with_topic(topic)
        .with_consumer_name(consumer_name)
        .with_subscription(subscription_name)
        .with_subscription_type(SubType::Exclusive)
        .build();

    // Subscribe to the topic
    consumer.subscribe().await?;
    println!("The Consumer {} was created", consumer_name);

    // Start receiving messages
    let mut message_stream = consumer.receive().await?;

    while let Some(message) = message_stream.recv().await {
        let payload = message.payload;

        let result = String::from_utf8(payload);

        match result {
            Ok(message) => println!("Received message: {:?}", message),
            Err(e) => println!("Failed to convert Payload to String: {}", e),
        }
    }

Contribution

Check the documentation on how to setup a Danube Broker.

Commit count: 177

cargo fmt