Crates.io | tiny_kafka |
lib.rs | tiny_kafka |
version | 1.0.6 |
source | src |
created_at | 2023-10-21 20:32:13.018545 |
updated_at | 2024-11-24 20:54:39.119337 |
description | A tiny Kafka client library with producer and consumer functionalities. |
homepage | https://github.com/cploutarchou/tiny_kafka |
repository | https://github.com/cploutarchou/tiny_kafka |
max_upload_size | |
id | 1010225 |
size | 67,276 |
A lightweight, async Rust implementation of a Kafka producer and consumer. This library provides a simple, reliable interface for interacting with Apache Kafka, with built-in timeout handling and connection retries.
Add this to your Cargo.toml:
[dependencies]
tiny_kafka = "1.0.6"
tokio = { version = "1.0", features = ["full"] }
use tiny_kafka::consumer::KafkaConsumer;
use tokio;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Create and configure consumer
    let mut consumer = KafkaConsumer::new(
        "127.0.0.1:9092".to_string(),
        "my-group".to_string(),
        "my-topic".to_string(),
    ).await?;

    // Connect with automatic retries
    consumer.connect().await?;

    // Consume messages
    loop {
        match consumer.consume().await {
            Ok(messages) => {
                for msg in messages {
                    println!("Received message: {:?}", msg);
                }
                // Commit offset after processing
                consumer.commit().await?;
            }
            Err(e) => {
                eprintln!("Error consuming messages: {}", e);
                break;
            }
        }
    }

    // Clean up
    consumer.close().await?;
    Ok(())
}
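The consumer loop above commits only after a batch has been processed, which gives at-least-once delivery: a batch that was read but never committed is read again on the next start. The following is a minimal, std-only sketch of that idea. `InMemoryPartition` and `SketchConsumer` are hypothetical names invented for illustration and are not part of tiny_kafka.

```rust
/// Hypothetical in-memory stand-in for one topic partition plus the
/// consumer group's committed offset. Not a tiny_kafka type.
struct InMemoryPartition {
    records: Vec<String>,
    committed: usize, // offset the group resumes from after a restart
}

/// Hypothetical consumer that keeps its read position separate from
/// the committed offset.
struct SketchConsumer<'a> {
    partition: &'a mut InMemoryPartition,
    position: usize,
}

impl<'a> SketchConsumer<'a> {
    /// Start reading from the last committed offset.
    fn new(partition: &'a mut InMemoryPartition) -> Self {
        let position = partition.committed;
        SketchConsumer { partition, position }
    }

    /// Return up to `max` records and advance the read position only.
    fn consume(&mut self, max: usize) -> Vec<String> {
        let end = (self.position + max).min(self.partition.records.len());
        let batch = self.partition.records[self.position..end].to_vec();
        self.position = end;
        batch
    }

    /// Persist the read position as the committed offset.
    fn commit(&mut self) {
        self.partition.committed = self.position;
    }
}
```

If a `SketchConsumer` is dropped after `consume` but before `commit`, a new consumer created on the same partition returns the same records again, so message handlers should tolerate duplicates.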
use tiny_kafka::producer::KafkaProducer;
// `Message` must also be in scope; the original example omits its import path.

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Create producer
    let producer = KafkaProducer::new(
        "127.0.0.1:9092".to_string(),
        None, // Optional configurations
    );

    // Send a message
    producer.send_message(
        "my-topic",
        Message::new("key", "value"),
    ).await?;

    Ok(())
}
use std::time::Duration;

const RESPONSE_TIMEOUT: Duration = Duration::from_secs(5); // Operation timeout
const CONNECTION_TIMEOUT: Duration = Duration::from_secs(10); // Initial connection timeout
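To illustrate how a connection timeout and retries can work together, here is a minimal sketch using only the standard library. It is blocking rather than async, and it is not the library's implementation: `retry` and `connect_with_retries` are hypothetical helpers, and the 200 ms delay between attempts is an assumed value.

```rust
use std::io;
use std::net::{SocketAddr, TcpStream};
use std::thread::sleep;
use std::time::Duration;

// Same value as the constant shown above.
const CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);

/// Hypothetical helper: call `attempt` up to `max_attempts` times,
/// sleeping `delay` between failures, and return the last error if
/// every attempt fails.
fn retry<T, F>(max_attempts: u32, delay: Duration, mut attempt: F) -> io::Result<T>
where
    F: FnMut() -> io::Result<T>,
{
    let mut last_err = io::Error::new(io::ErrorKind::Other, "no attempts made");
    for n in 1..=max_attempts {
        match attempt() {
            Ok(value) => return Ok(value),
            Err(e) => {
                last_err = e;
                // Do not sleep after the final failed attempt.
                if n < max_attempts {
                    sleep(delay);
                }
            }
        }
    }
    Err(last_err)
}

/// Hypothetical helper: open a TCP connection to a broker, bounding
/// each attempt by CONNECTION_TIMEOUT. The delay is an assumed value.
fn connect_with_retries(addr: SocketAddr, max_attempts: u32) -> io::Result<TcpStream> {
    retry(max_attempts, Duration::from_millis(200), || {
        TcpStream::connect_timeout(&addr, CONNECTION_TIMEOUT)
    })
}
```

A caller might use `connect_with_retries("127.0.0.1:9092".parse().unwrap(), 3)`. An async client would typically wrap each attempt in a timeout from its runtime instead of blocking the thread.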
The producer supports various configuration options, passed as the optional second argument to KafkaProducer::new.
The library provides detailed error types for different scenarios:
ConnectionError: Failed to establish connection
TimeoutError: Operation exceeded configured timeout
ProtocolError: Kafka protocol-related errors
SerializationError: Message serialization failures
BytesMut
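As a rough illustration of how the four error categories listed above could be modelled and mapped onto `std::io::Error` (the examples here return `std::io::Result`), consider the sketch below. The `ClientError` enum, its variant payloads, and the chosen `ErrorKind` mappings are assumptions made for illustration. The library's actual type definitions are not shown in this document.

```rust
use std::fmt;
use std::io;

/// Hypothetical error type mirroring the four categories listed above.
#[derive(Debug)]
enum ClientError {
    Connection(String),
    Timeout(String),
    Protocol(String),
    Serialization(String),
}

impl fmt::Display for ClientError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ClientError::Connection(m) => write!(f, "connection failed: {}", m),
            ClientError::Timeout(m) => write!(f, "timeout: {}", m),
            ClientError::Protocol(m) => write!(f, "protocol error: {}", m),
            ClientError::Serialization(m) => write!(f, "serialization failed: {}", m),
        }
    }
}

impl std::error::Error for ClientError {}

/// Assumed mapping onto io::ErrorKind so that `?` works in functions
/// returning std::io::Result.
impl From<ClientError> for io::Error {
    fn from(e: ClientError) -> Self {
        let kind = match e {
            ClientError::Connection(_) => io::ErrorKind::ConnectionRefused,
            ClientError::Timeout(_) => io::ErrorKind::TimedOut,
            ClientError::Protocol(_) => io::ErrorKind::InvalidData,
            ClientError::Serialization(_) => io::ErrorKind::InvalidInput,
        };
        io::Error::new(kind, e)
    }
}
```

With a conversion like this, callers can match on `io::Error::kind()` to decide whether an operation is worth retrying (for example on `TimedOut`) or should be reported immediately.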
Run the test suite:
cargo test
For integration tests with a running Kafka instance:
cargo test --features integration-tests
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
Run docker-compose up -d to start Kafka.
Run cargo test.
This project is licensed under the MIT License - see the LICENSE file for details.