tiny_kafka

Crate: tiny_kafka (crates.io)
Version: 1.0.6
Created: 2023-10-21
Updated: 2024-11-24
Description: A tiny Kafka client library with producer and consumer functionalities.
Homepage: https://github.com/cploutarchou/tiny_kafka
Repository: https://github.com/cploutarchou/tiny_kafka
Author: Christos Ploutarchou (cploutarchou)
Documentation: https://docs.rs/tiny_kafka

README

Tiny Kafka - Lightweight Kafka Client in Rust


A lightweight, async Rust implementation of a Kafka producer and consumer. This library provides a simple, reliable interface for interacting with Apache Kafka, with built-in timeout handling and connection retries.

Features

  • Async/Await Support: Built on tokio for high-performance asynchronous operations
  • Timeout Handling: Configurable timeouts for all operations with sensible defaults
  • Connection Retries: Automatic retry logic for failed connections with exponential backoff
  • Error Handling: Comprehensive error handling with detailed error types
  • Simple API: Easy-to-use interface for both producer and consumer
  • Zero-Copy: Efficient message handling with minimal memory overhead
  • Type Safety: Leverages Rust's type system to catch errors at compile time
  • Logging: Integrated tracing support for debugging and monitoring
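The retry-with-exponential-backoff behavior listed above can be sketched as a pure function. This is an illustration of the technique, not the crate's internal code; the base delay and cap are made-up values:

```rust
use std::time::Duration;

/// Delay before the nth retry: base * 2^attempt, capped at `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // saturating_* avoids overflow for large attempt counts.
    let delay = base.saturating_mul(2u32.saturating_pow(attempt));
    delay.min(max)
}

fn main() {
    let base = Duration::from_millis(100);
    let max = Duration::from_secs(5);
    assert_eq!(backoff_delay(0, base, max), Duration::from_millis(100));
    assert_eq!(backoff_delay(3, base, max), Duration::from_millis(800));
    assert_eq!(backoff_delay(10, base, max), max); // capped at `max`
}
```

In practice a retry loop would sleep for `backoff_delay(attempt, ...)` between connection attempts, often with added jitter to avoid thundering-herd reconnects.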

Prerequisites

  • Rust: version 1.70.0 or higher
  • Kafka: A running Kafka broker (default: localhost:9092)

Installation

Add this to your Cargo.toml:

[dependencies]
tiny_kafka = "1.0.6"
tokio = { version = "1.0", features = ["full"] }

Quick Start

Consumer Example

use tiny_kafka::consumer::KafkaConsumer;
use tokio;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Create and configure consumer
    let mut consumer = KafkaConsumer::new(
        "127.0.0.1:9092".to_string(),
        "my-group".to_string(),
        "my-topic".to_string(),
    ).await?;
    
    // Connect with automatic retries
    consumer.connect().await?;
    
    // Consume messages
    loop {
        match consumer.consume().await {
            Ok(messages) => {
                for msg in messages {
                    println!("Received message: {:?}", msg);
                }
                // Commit offset after processing
                consumer.commit().await?;
            }
            Err(e) => {
                eprintln!("Error consuming messages: {}", e);
                break;
            }
        }
    }
    
    // Clean up
    consumer.close().await?;
    Ok(())
}

Producer Example

use tiny_kafka::producer::{KafkaProducer, Message};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Create producer
    let producer = KafkaProducer::new(
        "127.0.0.1:9092".to_string(),
        None, // Optional configurations
    );
    
    // Send a message
    producer.send_message(
        "my-topic",
        Message::new("key", "value"),
    ).await?;
    
    Ok(())
}

Configuration

Consumer Configuration

const RESPONSE_TIMEOUT: Duration = Duration::from_secs(5);    // Operation timeout
const CONNECTION_TIMEOUT: Duration = Duration::from_secs(10);  // Initial connection timeout
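One way to enforce limits like these is a deadline check against the operation's start time. The sketch below uses only the standard library to illustrate the idea; it is not the crate's implementation, which runs on tokio's async timers:

```rust
use std::time::{Duration, Instant};

const RESPONSE_TIMEOUT: Duration = Duration::from_secs(5);

/// Err if more than `timeout` has elapsed since `started` (illustrative).
fn check_deadline(started: Instant, timeout: Duration) -> Result<(), &'static str> {
    if started.elapsed() > timeout {
        Err("operation timed out")
    } else {
        Ok(())
    }
}

fn main() {
    let started = Instant::now();
    // Immediately after starting, the deadline has not passed.
    assert!(check_deadline(started, RESPONSE_TIMEOUT).is_ok());
    // After any measurable delay, a zero timeout has expired.
    std::thread::sleep(Duration::from_millis(10));
    assert!(check_deadline(started, Duration::ZERO).is_err());
}
```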

Producer Configuration

The producer supports various configuration options including:

  • Batch size
  • Compression
  • Acknowledgment level
  • Retry settings
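The exact configuration type for the producer's optional second argument isn't shown here. Assuming it accepts string key/value pairs (an assumption; check the crate docs), and using Kafka-style option names purely as placeholders, a configuration might be built like this:

```rust
use std::collections::HashMap;

fn main() {
    // Hypothetical option names; verify against the crate's documentation.
    let mut config: HashMap<String, String> = HashMap::new();
    config.insert("batch.size".to_string(), "16384".to_string());
    config.insert("compression.type".to_string(), "gzip".to_string());
    config.insert("acks".to_string(), "all".to_string());
    config.insert("retries".to_string(), "3".to_string());

    // Would then be passed as the producer's optional configuration, e.g.:
    // let producer = KafkaProducer::new("127.0.0.1:9092".to_string(), Some(config));
    assert_eq!(config.get("acks").map(String::as_str), Some("all"));
}
```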

Error Handling

The library provides detailed error types for different scenarios:

  • ConnectionError: Failed to establish connection
  • TimeoutError: Operation exceeded configured timeout
  • ProtocolError: Kafka protocol-related errors
  • SerializationError: Message serialization failures
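A common pattern is to branch on the error category, retrying transient failures and surfacing permanent ones. The enum below is a hypothetical stand-in mirroring the list above, not the crate's actual type:

```rust
use std::time::Duration;

// Hypothetical error type mirroring the categories listed above.
#[derive(Debug)]
enum KafkaError {
    ConnectionError(String),
    TimeoutError(Duration),
    ProtocolError(String),
    SerializationError(String),
}

/// Illustrative policy: connection and timeout failures are transient
/// and worth retrying; protocol and serialization failures are not.
fn is_retryable(err: &KafkaError) -> bool {
    match err {
        KafkaError::ConnectionError(_) | KafkaError::TimeoutError(_) => true,
        KafkaError::ProtocolError(_) | KafkaError::SerializationError(_) => false,
    }
}

fn main() {
    assert!(is_retryable(&KafkaError::TimeoutError(Duration::from_secs(5))));
    assert!(!is_retryable(&KafkaError::SerializationError("bad payload".into())));
}
```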

Performance Considerations

  • Uses zero-copy operations where possible
  • Efficient buffer management with BytesMut
  • Configurable batch sizes for optimal throughput
  • Connection pooling for better resource utilization

Testing

Run the test suite:

cargo test

For integration tests with a running Kafka instance:

cargo test --features integration-tests

Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.

Development Setup

  1. Clone the repository
  2. Install Rust (1.70.0 or higher)
  3. Install Docker and Docker Compose (for running integration tests)
  4. Run docker-compose up -d to start Kafka
  5. Run tests with cargo test

Guidelines

  • Write clear commit messages
  • Add tests for new features
  • Update documentation as needed
  • Follow Rust best practices and idioms

License

This project is licensed under the MIT License - see the LICENSE file for details.

Acknowledgments

  • Built with tokio for async runtime
  • Uses bytes for efficient buffer management
  • Logging provided by tracing