zirv-kafka

Crates.iozirv-kafka
lib.rszirv-kafka
version
sourcesrc
created_at2025-04-11 18:50:10.54466+00
updated_at2025-04-13 10:55:58.860337+00
descriptionA convinient wrapper for rdkafka.
homepage
repositoryhttps://github.com/Glubiz/zirv-kafka
max_upload_size
id1630130
Cargo.toml error:TOML parse error at line 17, column 1 | 17 | autolib = false | ^^^^^^^ unknown field `autolib`, expected one of `name`, `version`, `edition`, `authors`, `description`, `readme`, `license`, `repository`, `homepage`, `documentation`, `build`, `resolver`, `links`, `default-run`, `default_dash_run`, `rust-version`, `rust_dash_version`, `rust_version`, `license-file`, `license_dash_file`, `license_file`, `licenseFile`, `license_capital_file`, `forced-target`, `forced_dash_target`, `autobins`, `autotests`, `autoexamples`, `autobenches`, `publish`, `metadata`, `keywords`, `categories`, `exclude`, `include`
size0
Jonathan Solskov (Glubiz)

documentation

https://docs.rs/zirv-kafka

README

zirv-kafka

A convenient wrapper around the rdkafka crate that simplifies working with Apache Kafka in Rust applications.

crates.io Documentation CI Pipeline License

Features

  • Easy Kafka producer initialization and management through global instance
  • Simplified message production with convenient macros
  • Streamlined consumer setup and message handling
  • Background thread management for continuous message polling

Installation

Add zirv-kafka to your Cargo.toml:

[dependencies]
zirv-kafka = "0.2.0"

Producer Usage

Initialize the producer

Initialize the Kafka producer early in your application:

use zirv_kafka::init_kafka_producer;

#[tokio::main]
async fn main() {
    init_kafka_producer!().await;
    
    // Your application logic...
}

Send messages

Send messages to Kafka topics using the produce_message! macro:

use zirv_kafka::produce_message;

async fn notify_user_updated(user_id: &str, data: &str) {
    produce_message!("user-updated", user_id, data).await;
}

For more direct control, you can access the producer directly:

use zirv_kafka::get_kafka_producer;
use rdkafka::producer::FutureRecord;

async fn send_custom_message() {
    let producer = get_kafka_producer!();
    let record = FutureRecord::to("my-topic")
        .payload("message content")
        .key("message-key");
    
    let result = producer.send(record, std::time::Duration::from_secs(1)).await;
    // Handle the result...
}

Consumer Usage

Create a consumer and start processing messages

Use the start_base_consumer! macro to initialize a consumer and process messages:

use zirv_kafka::start_base_consumer;

fn main() {
    // Start a consumer that processes messages from the "user-events" topic
    let handle = start_base_consumer!("user-service", &["user-events"], |msg_result| {
        match msg_result {
            Ok(msg) => {
                if let Some(payload) = msg.payload_view::<str>() {
                    if let Ok(content) = payload {
                        println!("Received message: {}", content);
                        // Process the message...
                    }
                }
            },
            Err(e) => eprintln!("Error while consuming message: {:?}", e),
        }
    });
    
    // The consumer runs in the background
    
    // When you're done with the consumer (optional)
    // handle.join().unwrap();
}

For more control over the consumer, you can use the lower-level functions:

use rdkafka::message::BorrowedMessage;
use std::sync::Arc;
use zirv_kafka::consumer::{init_base_consumer, start_consumer_thread};

fn main() {
    let consumer = init_base_consumer("my-app", &["events-topic"])
        .expect("Failed to create consumer");
    
    let arc_consumer = Arc::new(consumer);
    
    let handle = start_consumer_thread(arc_consumer, |msg| {
        // Custom message handling logic
    });
    
    // Later, when shutting down:
    // handle.join().unwrap();
}

Configuration

By default, the library uses the following configuration:

  • KAFKA_BROKERS environment variable with a default of "localhost:9092" for consumers
  • For producers, it uses zirv-config to read:
    • kafka.bootstrap_servers (default: "localhost:9092")
    • kafka.message_timeout_ms (default: 5000)

Requirements

  • Rust (2021 edition or later)
  • librdkafka development libraries if using dynamic linking

License

This project is licensed under either:

Commit count: 0

cargo fmt