zirv-kafka

Repository: https://github.com/Glubiz/zirv-kafka
A convenient wrapper around the rdkafka crate that simplifies working with Apache Kafka in Rust applications.
Add `zirv-kafka` to your `Cargo.toml`:

```toml
[dependencies]
zirv-kafka = "0.2.0"
```
Initialize the Kafka producer early in your application:
```rust
use zirv_kafka::init_kafka_producer;

#[tokio::main]
async fn main() {
    init_kafka_producer!().await;

    // Your application logic...
}
```
Send messages to Kafka topics using the `produce_message!` macro:
```rust
use zirv_kafka::produce_message;

async fn notify_user_updated(user_id: &str, data: &str) {
    produce_message!("user-updated", user_id, data).await;
}
```
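Message payloads are plain strings or bytes, so structured data has to be serialized before producing. A minimal std-only sketch of building such a payload (a real application would more likely use `serde_json`; the `email` field and the helper name are illustrative, not part of the crate), which would then be passed to `produce_message!` as the `data` argument above:

```rust
// Hypothetical helper that renders a user-update event as a JSON string
// by hand. In practice, prefer serde_json for escaping and nesting.
fn user_updated_payload(user_id: &str, email: &str) -> String {
    format!(r#"{{"user_id":"{user_id}","email":"{email}"}}"#)
}

fn main() {
    let payload = user_updated_payload("u-123", "a@example.com");
    println!("{payload}");
}
```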
For more direct control, you can access the producer directly:
```rust
use zirv_kafka::get_kafka_producer;
use rdkafka::producer::FutureRecord;

async fn send_custom_message() {
    let producer = get_kafka_producer!();

    let record = FutureRecord::to("my-topic")
        .payload("message content")
        .key("message-key");

    let result = producer.send(record, std::time::Duration::from_secs(1)).await;

    // Handle the result...
}
```
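The "handle the result" step typically branches on delivery outcome. A hedged sketch of that branching, using a stand-in type for rdkafka's delivery result (in rdkafka the success variant reports where the message landed, and the error variant carries the undelivered message back so it can be retried; the names below are illustrative):

```rust
// Stand-in for a delivery result: Ok carries the (partition, offset)
// the broker assigned, Err carries the error text plus the payload
// that was not delivered. Illustrative only, not rdkafka's types.
type DeliveryResult = Result<(i32, i64), (String, Vec<u8>)>;

fn handle_delivery(result: DeliveryResult) -> String {
    match result {
        Ok((partition, offset)) => {
            format!("delivered to partition {partition} at offset {offset}")
        }
        Err((err, payload)) => {
            // The payload comes back with the error, so it can be
            // retried or routed to a dead-letter topic.
            format!("failed ({err}); {} bytes not delivered", payload.len())
        }
    }
}

fn main() {
    println!("{}", handle_delivery(Ok((0, 42))));
    println!("{}", handle_delivery(Err(("timeout".into(), vec![1, 2, 3]))));
}
```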
Use the `start_base_consumer!` macro to initialize a consumer and process messages:
```rust
use zirv_kafka::start_base_consumer;

fn main() {
    // Start a consumer that processes messages from the "user-events" topic
    let handle = start_base_consumer!("user-service", &["user-events"], |msg_result| {
        match msg_result {
            Ok(msg) => {
                if let Some(payload) = msg.payload_view::<str>() {
                    if let Ok(content) = payload {
                        println!("Received message: {}", content);
                        // Process the message...
                    }
                }
            }
            Err(e) => eprintln!("Error while consuming message: {:?}", e),
        }
    });

    // The consumer runs in the background.
    // When you're done with the consumer (optional):
    // handle.join().unwrap();
}
```
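Keeping the decoding logic out of the closure makes it testable without a broker. A minimal std-only sketch (the helper name is hypothetical; the closure above would call it with the raw payload bytes, e.g. from `msg.payload()`):

```rust
// Hypothetical helper: turn an optional raw payload into a String,
// skipping empty and non-UTF-8 messages. No Kafka dependency, so it
// can be unit-tested in isolation.
fn handle_payload(payload: Option<&[u8]>) -> Option<String> {
    let bytes = payload?;
    match std::str::from_utf8(bytes) {
        Ok(s) => Some(s.to_string()),
        Err(_) => None, // non-UTF-8 payloads are skipped
    }
}

fn main() {
    println!("{:?}", handle_payload(Some(b"hello".as_slice())));
    println!("{:?}", handle_payload(None));
}
```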
For more control over the consumer, you can use the lower-level functions:
```rust
use rdkafka::message::BorrowedMessage;
use std::sync::Arc;
use zirv_kafka::consumer::{init_base_consumer, start_consumer_thread};

fn main() {
    let consumer = init_base_consumer("my-app", &["events-topic"])
        .expect("Failed to create consumer");
    let arc_consumer = Arc::new(consumer);

    let handle = start_consumer_thread(arc_consumer, |msg| {
        // Custom message handling logic
    });

    // Later, when shutting down:
    // handle.join().unwrap();
}
```
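Consumers resolve their broker list from the `KAFKA_BROKERS` environment variable, falling back to `"localhost:9092"` when it is unset (see the configuration defaults below). That lookup pattern can be sketched as follows; this is an illustration of the documented behavior, not the crate's actual code:

```rust
use std::env;

// Illustrative fallback: use the supplied value when present,
// otherwise the documented default "localhost:9092".
fn broker_list(env_value: Option<String>) -> String {
    env_value.unwrap_or_else(|| "localhost:9092".to_string())
}

fn main() {
    // In the crate this would be driven by the real environment:
    let brokers = broker_list(env::var("KAFKA_BROKERS").ok());
    println!("{brokers}");
}
```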
By default, the library uses the following configuration:

- `KAFKA_BROKERS` environment variable with a default of "localhost:9092" for consumers
- `kafka.bootstrap_servers` (default: "localhost:9092")
- `kafka.message_timeout_ms` (default: 5000)

This project is licensed under either: