kawadb-broker

version: 0.1.0
description: High-performance Kafka-compatible message broker
repository: https://github.com/jun784/kawadb.com
author: Jun Kawasaki (jun784)

README

🎯 Kawa Broker - Kafka-Compatible High-Performance Message Broker

kawa-broker is a high-performance, Apache Kafka-compatible message broker. It achieves high throughput through Rust's asynchronous processing and zero-copy optimizations.

🎯 Key Features

  • 📡 Full Kafka Protocol Support: Existing Kafka clients can be used as-is (see the sketch after this list)
  • 🚀 High-Performance Processing: Asynchronous I/O & concurrent session management
  • 🔧 Producer/Consumer API: Fully implemented
  • ⚙️ Management Features: Topic, offset, and statistics management
  • 🌐 TCP/TLS Support: Secure communication (future implementation)
  • 🗄️ DuckDB-style Database: Local + WASM-compatible SQL processing engine
  • 📊 Real-time Analytics: SQL transformation & aggregation of streaming data
  • 🌐 WASM Support: Database operations in the browser

๐Ÿ—๏ธ Architecture

┌────────────────────────────────────────────────────────────┐
│                       Kawa Ecosystem                       │
├────────────────────────────────────────────────────────────┤
│  🌐 Network Layer (TCP Server)                             │
│   ├── TcpListener (asynchronous)                           │
│   ├── Session management                                   │
│   ├── Connection pool                                      │
│   └── Dynamic port allocation                              │
├────────────────────────────────────────────────────────────┤
│  📡 Kafka Protocol Layer                                   │
│   ├── Binary protocol parsing                              │
│   ├── Produce API (0)                                      │
│   ├── Fetch API (1)                                        │
│   ├── Metadata API (3)                                     │
│   └── ApiVersions API (18)                                 │
├────────────────────────────────────────────────────────────┤
│  📨 Messaging Services                                     │
│   ├── ProducerService                                      │
│   ├── ConsumerService                                      │
│   ├── TopicManager                                         │
│   └── OffsetManager                                        │
├────────────────────────────────────────────────────────────┤
│  🗄️ Database Layer (kawa-db)                               │
│   ├── SQL Query Engine (DataFusion)                        │
│   ├── Table Management                                     │
│   ├── Schema Definition                                    │
│   ├── Arrow Integration                                    │
│   └── Stream Processing                                    │
├────────────────────────────────────────────────────────────┤
│  🌐 WASM Interface (kawa-wasm)                             │
│   ├── JavaScript Bindings                                  │
│   ├── Browser Storage                                      │
│   ├── Web Workers Support                                  │
│   └── Local Database Access                                │
├────────────────────────────────────────────────────────────┤
│  💾 Storage Integration (kawa-storage)                     │
│   ├── Write-Ahead Log (WAL)                                │
│   ├── Segment Management                                   │
│   ├── Event Sourcing                                       │
│   └── Parquet Integration                                  │
└────────────────────────────────────────────────────────────┘

🚀 Quick Start

Basic Server Startup

use kawa_broker::{MessageBroker, BrokerConfig, NetworkConfig, StorageConfig, ClusterConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Broker configuration
    let config = BrokerConfig {
        network: NetworkConfig {
            bind_host: "0.0.0.0".to_string(),
            bind_port: 9092,
        },
        storage: StorageConfig::default(),
        cluster: ClusterConfig::default(),
    };
    
    // Start broker
    let mut broker = MessageBroker::new(config).await?;
    let addr = broker.start().await?;
    
    println!("Kawa broker started on {}", addr);
    
    // Stop with Ctrl+C
    tokio::signal::ctrl_c().await?;
    broker.stop().await?;
    
    Ok(())
}

Producer API

use kawa_broker::{MessageBroker, SessionId};

// Send message
let session_id = SessionId::new();
let topic = "user-events";
let partition = 0;
let message = b"Hello, Kawa!";

let offset = broker.produce_message(topic, partition, message, session_id).await?;
println!("Message sent at offset: {}", offset);

// Batch send
let messages = vec![
    b"Message 1".to_vec(),
    b"Message 2".to_vec(), 
    b"Message 3".to_vec(),
];

let offsets = broker.produce_batch(topic, partition, messages, session_id).await?;
println!("Batch sent at offsets: {:?}", offsets);

Consumer API

// Receive messages
let messages = broker.fetch_messages(
    topic,
    partition,
    0,            // start offset
    10,           // max messages
    session_id
).await?;

for message in messages {
    println!("Offset: {}, Data: {:?}", message.offset, message.message);
}

// Consumer group
let group_id = "my-consumer-group";
let group_messages = broker.fetch_group_messages(
    topic,
    partition, 
    group_id,
    10,
    session_id
).await?;
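
The snippets above fetch a single batch. Below is a minimal polling loop, assuming the same fetch_messages signature and that it returns an empty Vec when no messages are available (an assumption, not verified here):

// Hypothetical consume loop: advance a locally tracked offset past
// the last record of each batch, and back off briefly when caught up.
let mut next_offset = 0;
loop {
    let batch = broker
        .fetch_messages(topic, partition, next_offset, 10, session_id)
        .await?;

    if batch.is_empty() {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        continue;
    }

    for record in &batch {
        println!("Offset: {}, Data: {:?}", record.offset, record.message);
    }
    next_offset = batch.last().unwrap().offset + 1;
}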

📡 Kafka Protocol Support

Supported APIs

API          Code  Versions  Status
Produce      0     0-7       ✅ Fully supported
Fetch        1     0-11      ✅ Fully supported
Metadata     3     0-9       ✅ Fully supported
ApiVersions  18    0-3       ✅ Fully supported

Future Support Planned

API           Code  Planned Version
CreateTopics  19    v0.2.0
DeleteTopics  20    v0.2.0
JoinGroup     11    v0.3.0
SyncGroup     14    v0.3.0

Binary Protocol Example

use kawa_broker::protocol::{ProtocolReader, ProtocolWriter};

// Request parsing
let mut reader = ProtocolReader::new(&request_data);
let api_key = reader.read_i16()?;
let api_version = reader.read_i16()?;
let correlation_id = reader.read_u32()?;

// Response creation
let mut writer = ProtocolWriter::new();
writer.write_u32(correlation_id);
writer.write_i16(0); // error code = success
let response_data = writer.into_bytes();
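
Once the header is parsed, a broker must route the request by api_key. A hypothetical dispatch sketch using the codes from the table above; the handle_* functions are placeholders, not this crate's API:

// Route by api_key (Produce = 0, Fetch = 1, Metadata = 3, ApiVersions = 18)
let response_body = match api_key {
    0 => handle_produce(&mut reader)?,
    1 => handle_fetch(&mut reader)?,
    3 => handle_metadata(&mut reader)?,
    18 => handle_api_versions(api_version)?,
    other => return Err(format!("unsupported api_key: {}", other).into()),
};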

๐Ÿ—„๏ธ DuckDB-style Database Features

Local Database Usage

use kawa_db::{DatabaseEngine, DbConfig, TableSchema, ColumnDefinition, DataType};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize database engine
    let config = DbConfig::new("./kawa-data".into());
    let db = DatabaseEngine::new(config).await?;
    
    // Create table
    let schema = TableSchema::new(
        "users".to_string(),
        vec![
            ColumnDefinition::new("id".to_string(), DataType::Int64, false),
            ColumnDefinition::new("name".to_string(), DataType::String, false),
            ColumnDefinition::new("email".to_string(), DataType::String, true),
            ColumnDefinition::new("created_at".to_string(), DataType::Timestamp, false),
        ],
    );
    db.create_table(schema).await?;
    
    // Insert data
    let data = vec![
        serde_json::json!({
            "id": 1,
            "name": "Alice",
            "email": "alice@example.com",
            "created_at": "2024-01-01T00:00:00Z"
        }),
        serde_json::json!({
            "id": 2,
            "name": "Bob",
            "email": "bob@example.com", 
            "created_at": "2024-01-02T00:00:00Z"
        }),
    ];
    db.insert_data("users", data).await?;
    
    // Execute SQL query
    let result = db.execute_sql("SELECT * FROM users WHERE id > 1").await?;
    println!("Query result: {:?}", result);
    
    Ok(())
}

WASM Environment Usage

import init, { WasmKawaDb, WasmTableConfig } from './pkg/kawa_wasm.js';

async function main() {
    // Initialize WASM module
    await init();
    
    // Create database
    const db = new WasmKawaDb();
    
    // Create table configuration
    const tableConfig = new WasmTableConfig("products");
    tableConfig.add_column("id", "int64", false);
    tableConfig.add_column("name", "string", false);
    tableConfig.add_column("price", "float64", false);
    tableConfig.add_column("category", "string", true);
    
    // Create table
    await db.create_table(tableConfig);
    
    // Insert data
    const data = JSON.stringify([
        { id: 1, name: "Laptop", price: 999.99, category: "Electronics" },
        { id: 2, name: "Book", price: 19.99, category: "Education" },
        { id: 3, name: "Coffee", price: 4.99, category: "Food" },
    ]);
    
    const insertCount = await db.insert_data("products", data);
    console.log(`Inserted ${insertCount} rows`);
    
    // Execute SQL query
    const result = await db.execute_sql("SELECT * FROM products WHERE price > 20");
    console.log("Query result:", JSON.parse(result.data));
    
    // List tables
    const tables = await db.list_tables();
    console.log("Tables:", JSON.parse(tables));
}

main().catch(console.error);

Streaming Analytics

// Real-time stream processing
db.process_stream(
    "user-events",                    // input topic
    "SELECT user_id, COUNT(*) as event_count, AVG(value) as avg_value 
     FROM stream 
     GROUP BY user_id 
     WINDOW TUMBLING (INTERVAL 1 MINUTE)",  // SQL transformation
    "user-analytics"                  // output table
).await?;

// Query analytics results
let analytics = db.execute_sql("
    SELECT user_id, event_count, avg_value 
    FROM user_analytics 
    WHERE event_count > 100
    ORDER BY avg_value DESC
    LIMIT 10
").await?;

๐Ÿ› ๏ธ Management Features

Topic Management

// Create topic
broker.create_topic("my-topic", 3, 1).await?; // 3 partitions, replication 1

// List topics
let topics = broker.list_topics().await?;
for topic in topics {
    println!("Topic: {}", topic);
}

// Delete topic
broker.delete_topic("my-topic").await?;

Offset Management

// Commit offset
broker.commit_offset("my-group", "topic", 0, 100).await?;

// Get latest offset
let latest = broker.get_latest_offset("topic", 0).await?;
println!("Latest offset: {:?}", latest);

// Get group offset
let group_offset = broker.get_group_offset("my-group", "topic", 0).await?;
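
Combining these calls gives the usual resume-after-restart pattern. A sketch, assuming get_group_offset returns Option<i64> (the README does not pin down the return type):

// Resume from the last committed offset, or from 0 on first run,
// then commit progress after processing a batch.
let start = broker
    .get_group_offset("my-group", "topic", 0)
    .await?
    .map(|o| o + 1)
    .unwrap_or(0);

let batch = broker.fetch_messages("topic", 0, start, 10, session_id).await?;
if let Some(last) = batch.last() {
    // ... process the batch here ...
    broker.commit_offset("my-group", "topic", 0, last.offset).await?;
}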

Statistics & Monitoring

// Broker statistics
let stats = broker.get_stats().await?;
println!("Active sessions: {}", stats.active_sessions);
println!("Total topics: {}", stats.total_topics);
println!("Uptime: {}s", stats.uptime_seconds);

// Topic statistics
let topic_stats = broker.get_topic_stats("my-topic").await?;
println!("Messages: {}", topic_stats.total_messages);

โš™๏ธ Configuration

BrokerConfig

use kawa_broker::{BrokerConfig, NetworkConfig, StorageConfig, ClusterConfig};

let config = BrokerConfig {
    network: NetworkConfig {
        bind_host: "0.0.0.0".to_string(),
        bind_port: 9092,
        enable_tls: false,                    // future implementation
        max_connections: 1000,
        connection_timeout_ms: 30000,
    },
    storage: StorageConfig {
        data_dir: "./data".into(),
        segment_size: 1024 * 1024 * 1024,    // 1GB
        sync_interval_ms: 1000,
        enable_compression: false,
    },
    cluster: ClusterConfig {
        node_id: 0,
        enable_replication: false,           // future implementation
        replication_factor: 1,
    },
};

Configuration File (TOML)

[network]
bind_host = "0.0.0.0"
bind_port = 9092
max_connections = 1000

[storage]
data_dir = "./kawa-data"
segment_size = 1073741824  # 1GB
sync_interval_ms = 1000

[cluster]
node_id = 0
enable_replication = false
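
The README does not show how this file is loaded. A plausible sketch using the toml crate, assuming BrokerConfig and its nested configs derive serde::Deserialize and that omitted fields have defaults (both unverified assumptions):

use kawa_broker::BrokerConfig;

// Hypothetical loader: parse the TOML file above into a BrokerConfig.
fn load_config(path: &str) -> Result<BrokerConfig, Box<dyn std::error::Error>> {
    let text = std::fs::read_to_string(path)?;
    let config: BrokerConfig = toml::from_str(&text)?;
    Ok(config)
}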

🧪 Testing

Run Integration Tests

# Run all tests
cargo test

# Producer/Consumer tests
cargo test --test producer_consumer_integration_test

# Performance tests
cargo test test_basic_performance --release -- --nocapture

Test Results

✅ test_basic_producer_functionality
✅ test_basic_consumer_functionality
✅ test_multi_partition_messaging
✅ test_consumer_group_functionality
✅ test_offset_management
✅ test_error_handling
✅ test_basic_performance
✅ test_topic_management
✅ test_broker_statistics

Total: 9/9 tests passed

🚀 Performance Optimization

Asynchronous Processing

// Concurrent session processing
tokio::spawn(async move {
    handle_client_connection(stream, session_id, storage, protocol_handler).await
});

// Concurrent message processing
let futures: Vec<_> = messages.into_iter()
    .map(|msg| broker.produce_message(topic, partition, msg, session_id))
    .collect();
let results = futures::future::join_all(futures).await;
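
join_all drives every produce at once; for large batches it can help to cap in-flight requests. A variant sketch using the futures crate's buffer_unordered, with the same assumed produce_message signature as above:

use futures::stream::{self, StreamExt};

// Limit concurrency to at most 32 in-flight produce calls.
let results: Vec<_> = stream::iter(messages)
    .map(|msg| broker.produce_message(topic, partition, msg, session_id))
    .buffer_unordered(32)
    .collect()
    .await;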

Memory Optimization

// Zero-copy operations
let response = protocol_handler.handle_request(&request_data, &storage, session_id).await?;

// Buffer reuse (future implementation)
let buffer_pool = BufferPool::new(1024, 4096, 8192);
let buffer = buffer_pool.get_buffer(size).await;

🔧 Error Handling

use kawa_broker::{BrokerError, BrokerResult};

match broker.produce_message(topic, partition, message, session_id).await {
    Ok(offset) => println!("Success: {}", offset),
    Err(BrokerError::TopicNotFound { topic }) => {
        eprintln!("Topic '{}' not found", topic);
    }
    Err(BrokerError::InvalidPartition { partition, max }) => {
        eprintln!("Invalid partition {} (max: {})", partition, max);
    }
    Err(BrokerError::Storage { source }) => {
        eprintln!("Storage error: {}", source);
    }
    Err(e) => eprintln!("Other error: {}", e),
}

🚧 Future Features

v0.2.0 - Enhanced Management Features

  • CreateTopics/DeleteTopics API
  • Management REST API
  • Metrics Export
  • Configuration Hot Reload
  • DuckDB-style Local Database
  • WASM Support and Browser Integration

v0.3.0 - Enterprise Features

  • Full Consumer Group Support
  • TLS/SASL Authentication
  • Replication
  • Automatic Failover
  • Index Functionality
  • Query Optimization Engine

v0.4.0 - Advanced Features

  • Transactions
  • Exactly-Once Delivery
  • Schema Registry Integration
  • Kubernetes Operator
  • Machine Learning Integration
  • Geospatial Data Support

v0.5.0 - AI/ML Integration
