high-level-kafka

version: 0.2.4
created_at: 2023-06-11 14:53:20.143157
updated_at: 2023-12-31 20:29:30.588188
description: High Level Kafka client for Rust
repository: https://github.com/keaz/high-level-kafka
size: 58,196
Kasun Ranasinghe (keaz)


README

A High Level Kafka client library for Rust

A Kafka client library that wraps rdkafka to make it easier to use. Message serialization and deserialization are handled with serde.

Status of the project

This project is currently in an early beta stage. The library only supports asynchronous use.

Usage

cargo add high-level-kafka
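
Or declare the dependency in Cargo.toml yourself; the version below matches the release this README ships with:

[dependencies]
high-level-kafka = "0.2.4"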

Examples

Producer

use std::collections::HashMap;

use serde::{Deserialize, Serialize};

// `Result` here is any boxed-error result type, e.g. anyhow::Result.
#[tokio::main]
async fn main() -> Result<()> {
    // Broker address, timeout, retry count, and any additional config entries.
    let producer_options = publisher::ProducerOptiopns::from(
        "localhost:9092".to_string(),
        "5000".to_string(),
        5,
        HashMap::new(),
    );
    let publisher = publisher::KafkaProducer::with_options(producer_options).unwrap();
    let data = Data {
        attra_one: "123".to_string(),
        attra_two: 12,
    };

    let mut headers = HashMap::new();
    headers.insert("header_one".to_string(), "value_one".to_string());
    headers.insert("header_two".to_string(), "value_two".to_string());

    let message = Message::new("topic".to_string(), headers, data, "key".to_string());
    let _result = publisher.produce(message).await;

    Ok(())
}

#[derive(Serialize, Deserialize, Debug)]
struct Data {
    attra_one: String,
    attra_two: i8,
}

Consumer

use std::sync::Arc;

use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use tracing::info; // or the `log` crate's `info!`

#[tokio::main]
async fn main() -> Result<()> {
    let consumer = Consumer::from("group_id", "localhost:9092");
    // Wrap the consumer so an owned guard can be held across .await points.
    let mut_consumer = Arc::new(Mutex::new(consumer));
    let mut con = mut_consumer.clone().lock_owned().await;
    con.subscribe_to_topic(
        "topic".to_string(),
        |data: Data, metadata: Metadata| async move {
            info!("data: {:?}, metadata: {:?}", data, metadata);
        },
    )
    .await;

    Ok(())
}

#[derive(Serialize, Deserialize, Debug)]
struct Data {
    attra_one: String,
    attra_two: i8,
}

PausableConsumer

This consumer can be paused and resumed. It is useful when you want to pause consumption for a while and then resume it. Note: this feature is not production ready (version 0.0.1). A hypothetical pause/resume sketch follows the example below.

use std::collections::HashMap;

use serde::{Deserialize, Serialize};

#[tokio::main]
async fn main() -> Result<()> {
    let publisher = publisher::KafkaProducer::from("localhost:9092");
    let data = Data {
        attra_one: "one".to_string(),
        attra_two: 2,
    };
    let message = publisher::Message::new(
        "topic".to_string(),
        HashMap::new(),
        data,
        "some_key".to_string(),
    );
    let _result = publisher.produce(message).await;

    Ok(())
}

#[derive(Serialize, Deserialize, Debug)]
struct Data {
    attra_one: String,
    attra_two: i8,
}
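
The example above only publishes messages for the consumer to pick up; the pause and resume calls themselves are not shown in this README. As a rough illustration only, here is a minimal sketch: the constructor and the pause/resume method names are assumptions modeled on the Consumer API above, not the crate's confirmed API.

// Hypothetical sketch: every call below is an assumption modeled on the
// Consumer API shown earlier, not a documented PausableConsumer API.
#[tokio::main]
async fn main() {
    // Assumed constructor, mirroring Consumer::from above.
    let mut consumer = PausableConsumer::from("group_id", "localhost:9092");

    consumer.pause();  // assumed: temporarily stop delivering messages
    consumer.resume(); // assumed: continue consuming where it left off
}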