| Crates.io | tiny_kafka |
| lib.rs | tiny_kafka |
| version | 1.0.6 |
| created_at | 2023-10-21 20:32:13.018545+00 |
| updated_at | 2024-11-24 20:54:39.119337+00 |
| description | A tiny Kafka client library with producer and consumer functionalities. |
| homepage | https://github.com/cploutarchou/tiny_kafka |
| repository | https://github.com/cploutarchou/tiny_kafka |
| max_upload_size | |
| id | 1010225 |
| size | 67,276 |
A lightweight, async Rust implementation of a Kafka producer and consumer. This library provides a simple, reliable interface for interacting with Apache Kafka, with built-in timeout handling and connection retries.
Add this to your Cargo.toml:
    [dependencies]
    tiny_kafka = "1.0.6"
    tokio = { version = "1.0", features = ["full"] }
    use tiny_kafka::consumer::KafkaConsumer;

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        // Create and configure consumer
        let mut consumer = KafkaConsumer::new(
            "127.0.0.1:9092".to_string(),
            "my-group".to_string(),
            "my-topic".to_string(),
        ).await?;

        // Connect with automatic retries
        consumer.connect().await?;

        // Consume messages until an error occurs
        loop {
            match consumer.consume().await {
                Ok(messages) => {
                    for msg in messages {
                        println!("Received message: {:?}", msg);
                    }
                    // Commit offset after processing
                    consumer.commit().await?;
                }
                Err(e) => {
                    eprintln!("Error consuming messages: {}", e);
                    break;
                }
            }
        }

        // Clean up
        consumer.close().await?;
        Ok(())
    }
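Committing only after a batch has been processed, as the loop above does, generally gives at-least-once behaviour: if processing fails before the commit, the same messages can be delivered again. The following is a minimal in-memory sketch of that idea. `FakeLog`, `poll`, `commit`, and `process_batch` are hypothetical stand-ins for illustration and are not part of tiny_kafka.

```rust
/// Hypothetical in-memory stand-in for a topic partition plus a consumer
/// group's committed offset. Not part of tiny_kafka.
struct FakeLog {
    records: Vec<String>,
    committed: usize, // index of the next record not yet acknowledged
}

impl FakeLog {
    fn new(records: &[&str]) -> Self {
        FakeLog {
            records: records.iter().map(|r| r.to_string()).collect(),
            committed: 0,
        }
    }

    /// Return up to `max` records starting at the committed offset.
    fn poll(&self, max: usize) -> Vec<String> {
        self.records.iter().skip(self.committed).take(max).cloned().collect()
    }

    /// Acknowledge `count` records so they are not delivered again.
    fn commit(&mut self, count: usize) {
        self.committed = (self.committed + count).min(self.records.len());
    }
}

/// Process one batch; commit only if every record was handled successfully.
fn process_batch<F>(log: &mut FakeLog, max: usize, mut handle: F) -> Result<usize, String>
where
    F: FnMut(&str) -> Result<(), String>,
{
    let batch = log.poll(max);
    for record in &batch {
        handle(record.as_str())?; // early return leaves the offset uncommitted
    }
    log.commit(batch.len());
    Ok(batch.len())
}

fn main() {
    let mut log = FakeLog::new(&["a", "b", "c"]);

    // The first attempt fails on "b", so nothing is committed.
    let first = process_batch(&mut log, 2, |r| {
        if r == "b" { Err(format!("failed on {r}")) } else { Ok(()) }
    });
    println!("first attempt: {:?}, committed = {}", first, log.committed);

    // The retry sees "a" and "b" again (redelivery) and then succeeds.
    let second = process_batch(&mut log, 2, |_| Ok(()));
    println!("second attempt: {:?}, committed = {}", second, log.committed);
}
```

Because "a" is handled twice in this run, handlers in an at-least-once setup are usually written to tolerate duplicates.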
    use tiny_kafka::producer::KafkaProducer;
    // NOTE: `Message` must also be in scope; its import path is not shown here.

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        // Create producer
        let producer = KafkaProducer::new(
            "127.0.0.1:9092".to_string(),
            None, // Optional configurations
        );

        // Send a message
        producer.send_message(
            "my-topic",
            Message::new("key", "value"),
        ).await?;

        Ok(())
    }
    use std::time::Duration;

    const RESPONSE_TIMEOUT: Duration = Duration::from_secs(5);    // Operation timeout
    const CONNECTION_TIMEOUT: Duration = Duration::from_secs(10); // Initial connection timeout
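As a rough illustration of how a connection timeout and retries can fit together, here is a blocking sketch that uses only the standard library. The helper `retry_with_backoff`, the attempt count, and the backoff delays are assumptions made for this example; the library's own async retry logic is not shown in this README and may differ.

```rust
use std::io;
use std::net::{SocketAddr, TcpStream};
use std::thread;
use std::time::Duration;

const CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);

/// Hypothetical helper: run `op` up to `max_attempts` times, doubling the
/// delay after each failure. Returns the first success or the last error.
fn retry_with_backoff<T, F>(max_attempts: u32, initial_delay: Duration, mut op: F) -> io::Result<T>
where
    F: FnMut(u32) -> io::Result<T>,
{
    let mut delay = initial_delay;
    let mut last_err = io::Error::new(io::ErrorKind::Other, "no attempts were made");
    for attempt in 1..=max_attempts {
        match op(attempt) {
            Ok(value) => return Ok(value),
            Err(e) => {
                last_err = e;
                if attempt < max_attempts {
                    thread::sleep(delay); // wait before the next attempt
                    delay *= 2;
                }
            }
        }
    }
    Err(last_err)
}

/// Connect to a broker address, bounding each attempt by CONNECTION_TIMEOUT.
/// Three attempts and a 100 ms initial delay are arbitrary example values.
fn connect_with_retries(addr: SocketAddr) -> io::Result<TcpStream> {
    retry_with_backoff(3, Duration::from_millis(100), |_attempt| {
        TcpStream::connect_timeout(&addr, CONNECTION_TIMEOUT)
    })
}

fn main() {
    let addr: SocketAddr = "127.0.0.1:9092".parse().expect("valid socket address");
    match connect_with_retries(addr) {
        Ok(stream) => println!("connected to {:?}", stream.peer_addr()),
        Err(e) => eprintln!("could not connect after retries: {e}"),
    }
}
```

Passing the operation as a closure keeps the retry policy separate from the I/O, which also makes the policy easy to test without a running broker.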
The producer supports various configuration options, supplied through the optional second argument to KafkaProducer::new (None in the example above).
The library provides detailed error types for different scenarios:
- ConnectionError: Failed to establish connection
- TimeoutError: Operation exceeded configured timeout
- ProtocolError: Kafka protocol-related errors
- SerializationError: Message serialization failures

Run the test suite:
    cargo test
For integration tests with a running Kafka instance:
    cargo test --features integration-tests
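Returning to the four error categories listed earlier (connection, timeout, protocol, serialization): one way to model them is a single enum. The sketch below is illustrative only. The type name `KafkaError`, the payload of each variant, and the conversion into `std::io::Error` are assumptions, not the crate's actual definitions. The conversion is included because the examples in this README return `std::io::Result`.

```rust
use std::fmt;
use std::io;
use std::time::Duration;

/// Hypothetical error type mirroring the four categories in this README.
#[allow(dead_code)]
#[derive(Debug)]
enum KafkaError {
    Connection(String),
    Timeout(Duration),
    Protocol(String),
    Serialization(String),
}

impl fmt::Display for KafkaError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            KafkaError::Connection(msg) => write!(f, "failed to establish connection: {msg}"),
            KafkaError::Timeout(limit) => write!(f, "operation exceeded timeout of {limit:?}"),
            KafkaError::Protocol(msg) => write!(f, "Kafka protocol error: {msg}"),
            KafkaError::Serialization(msg) => write!(f, "message serialization failed: {msg}"),
        }
    }
}

impl std::error::Error for KafkaError {}

/// Map each category onto the closest io::ErrorKind (an arbitrary choice here).
impl From<KafkaError> for io::Error {
    fn from(err: KafkaError) -> Self {
        let kind = match &err {
            KafkaError::Connection(_) => io::ErrorKind::ConnectionRefused,
            KafkaError::Timeout(_) => io::ErrorKind::TimedOut,
            KafkaError::Protocol(_) | KafkaError::Serialization(_) => io::ErrorKind::InvalidData,
        };
        io::Error::new(kind, err)
    }
}

/// Stand-in for an operation that times out.
fn send() -> Result<(), KafkaError> {
    Err(KafkaError::Timeout(Duration::from_secs(5)))
}

fn run() -> io::Result<()> {
    send()?; // `?` converts KafkaError into io::Error through the From impl
    Ok(())
}

fn main() {
    if let Err(e) = run() {
        eprintln!("{:?}: {e}", e.kind());
    }
}
```

Matching on `e.kind()` then lets a caller decide, for example, to retry on `TimedOut` but give up on `InvalidData`.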
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
- Run docker-compose up -d to start Kafka
- Run cargo test

This project is licensed under the MIT License - see the LICENSE file for details.