| Crates.io | zirv-kafka |
| lib.rs | zirv-kafka |
| version | 0.2.1 |
| created_at | 2025-04-11 18:50:10.54466+00 |
| updated_at | 2025-04-13 10:55:58.860337+00 |
| description | A convinient wrapper for rdkafka. |
| homepage | |
| repository | https://github.com/Glubiz/zirv-kafka |
| max_upload_size | |
| id | 1630130 |
| size | 35,695 |
A convenient wrapper around the rdkafka crate that simplifies working with Apache Kafka in Rust applications.
Add zirv-kafka to your Cargo.toml:
[dependencies]
zirv-kafka = "0.2.0"
Initialize the Kafka producer early in your application:
use zirv_kafka::init_kafka_producer;
#[tokio::main]
async fn main() {
init_kafka_producer!().await;
// Your application logic...
}
Send messages to Kafka topics using the produce_message! macro:
use zirv_kafka::produce_message;
async fn notify_user_updated(user_id: &str, data: &str) {
produce_message!("user-updated", user_id, data).await;
}
For more direct control, you can access the producer directly:
use zirv_kafka::get_kafka_producer;
use rdkafka::producer::FutureRecord;
async fn send_custom_message() {
let producer = get_kafka_producer!();
let record = FutureRecord::to("my-topic")
.payload("message content")
.key("message-key");
let result = producer.send(record, std::time::Duration::from_secs(1)).await;
// Handle the result...
}
Use the start_base_consumer! macro to initialize a consumer and process messages:
use zirv_kafka::start_base_consumer;
fn main() {
// Start a consumer that processes messages from the "user-events" topic
let handle = start_base_consumer!("user-service", &["user-events"], |msg_result| {
match msg_result {
Ok(msg) => {
if let Some(payload) = msg.payload_view::<str>() {
if let Ok(content) = payload {
println!("Received message: {}", content);
// Process the message...
}
}
},
Err(e) => eprintln!("Error while consuming message: {:?}", e),
}
});
// The consumer runs in the background
// When you're done with the consumer (optional)
// handle.join().unwrap();
}
For more control over the consumer, you can use the lower-level functions:
use rdkafka::message::BorrowedMessage;
use std::sync::Arc;
use zirv_kafka::consumer::{init_base_consumer, start_consumer_thread};
fn main() {
let consumer = init_base_consumer("my-app", &["events-topic"])
.expect("Failed to create consumer");
let arc_consumer = Arc::new(consumer);
let handle = start_consumer_thread(arc_consumer, |msg| {
// Custom message handling logic
});
// Later, when shutting down:
// handle.join().unwrap();
}
By default, the library uses the following configuration:
KAFKA_BROKERS environment variable with a default of "localhost:9092" for consumerskafka.bootstrap_servers (default: "localhost:9092")kafka.message_timeout_ms (default: 5000)This project is licensed under either: