| Crates.io | fluxmq |
| lib.rs | fluxmq |
| version | 0.1.0 |
| created_at | 2025-09-16 13:10:39.638515+00 |
| updated_at | 2025-09-16 13:10:39.638515+00 |
| description | High-performance message broker and streaming platform inspired by Apache Kafka |
| homepage | https://github.com/gosuda/fluxmq |
| repository | https://github.com/gosuda/fluxmq |
| max_upload_size | |
| id | 1841640 |
| size | 1,340,880 |
A high-performance, Kafka-compatible message broker written in Rust with 100% Java client compatibility and 608k+ msg/sec throughput.
- Built on `bytes::Bytes` for maximum efficiency
- `org.apache.kafka:kafka-clients` v4.1+ (100% compatible)
- `kafka-python` library support

git clone https://github.com/gosuda/fluxmq.git
cd fluxmq
cargo build --release
cargo run -- --host 0.0.0.0 --port 9092
# For core development
cd core
cargo run --release -- --port 9092 --enable-consumer-groups --log-level info
# Or with full features
RUSTFLAGS="-C target-cpu=native" cargo run --release -- \
--port 9092 \
--enable-consumer-groups \
--data-dir ./data
# Terminal 1: Broker 1
cargo run -- --port 9092 --broker-id 1 --enable-replication --data-dir ./broker1
# Terminal 2: Broker 2
cargo run -- --port 9093 --broker-id 2 --enable-replication --data-dir ./broker2
# Terminal 3: Broker 3
cargo run -- --port 9094 --broker-id 3 --enable-replication --data-dir ./broker3
// Producer Example
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

/**
 * Minimal FluxMQ producer using the standard Apache Kafka Java client.
 * Publishes a single string record to "my-topic" and blocks until the
 * broker acknowledges the send.
 */
public class FluxMQProducer {
    // Future.get() throws checked InterruptedException/ExecutionException,
    // so main must declare them — the original example did not compile.
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // High performance settings (MegaBatch configuration)
        props.put("batch.size", "1048576"); // 1MB batch
        props.put("linger.ms", "15");
        props.put("compression.type", "lz4");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            ProducerRecord<String, String> record =
                new ProducerRecord<>("my-topic", "key", "Hello FluxMQ!");
            producer.send(record).get(); // wait for the broker ack
            System.out.println("Message sent successfully!");
        } finally {
            // Always release network resources, even if the send fails.
            producer.close();
        }
    }
}
from kafka import KafkaProducer, KafkaConsumer

# --- Producer: publish one UTF-8 encoded message ---
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: v.encode('utf-8'),
)
producer.send('my-topic', 'Hello FluxMQ!')
producer.flush()  # block until the broker has acknowledged the send

# --- Consumer: read back one message, then stop ---
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: m.decode('utf-8'),
)
for message in consumer:
    print(f"Received: {message.value}")
    break  # a single message is enough for this demo
use fluxmq_client::*;

/// Minimal producer example: connect to a local FluxMQ broker, publish a
/// single keyed record, and report the partition/offset it was written to.
#[tokio::main]
async fn main() -> Result<()> {
    // Connect to the broker on the default Kafka port.
    let producer = ProducerBuilder::new()
        .brokers(vec!["localhost:9092"])
        .build()
        .await?;

    // One keyed record destined for "my-topic".
    let record = ProduceRecord::builder()
        .topic("my-topic")
        .key("user-123")
        .value("Hello FluxMQ!")
        .build();

    // Await the broker's acknowledgement and its placement metadata.
    let metadata = producer.send(record).await?;
    println!("Message sent to partition {} at offset {}",
        metadata.partition, metadata.offset);
    Ok(())
}
use fluxmq_client::*;
use futures::StreamExt;

/// Minimal consumer example: join a consumer group, stream records from
/// "my-topic", print each record, and commit the offset after processing.
#[tokio::main]
async fn main() -> Result<()> {
    let consumer = ConsumerBuilder::new()
        .brokers(vec!["localhost:9092"])
        .group_id("my-consumer-group")
        .topics(vec!["my-topic"])
        .build()
        .await?;

    let mut stream = consumer.stream();
    while let Some(delivery) = stream.next().await {
        match delivery {
            Ok(msg) => {
                println!("Received: key={:?}, value={}",
                    msg.key, String::from_utf8_lossy(&msg.value));
                // Acknowledge so the group's committed offset advances.
                consumer.commit_sync().await?;
            }
            Err(e) => eprintln!("Error receiving message: {}", e),
        }
    }
    Ok(())
}
# Terminal 1: Start FluxMQ broker
cd core
RUSTFLAGS="-C target-cpu=native" cargo run --release -- --port 9092 --enable-consumer-groups
# Terminal 2: Run Java benchmark (601k+ msg/sec)
cd fluxmq-java-tests
mvn exec:java -Dexec.mainClass="com.fluxmq.tests.MegaBatchBenchmark"
# Terminal 3: Run simple Java test
mvn exec:java -Dexec.mainClass="com.fluxmq.tests.MinimalProducerTest"
# Or try Rust examples
cd fluxmq-client
cargo run --example simple_producer
cargo run --example simple_consumer
USAGE:
fluxmq [OPTIONS]
OPTIONS:
--host <HOST> Bind address [default: 0.0.0.0]
-p, --port <PORT> Port to listen on [default: 9092]
-l, --log-level <LOG_LEVEL> Log level [default: info]
--broker-id <BROKER_ID> Unique broker identifier [default: 0]
--enable-replication Enable replication features
--enable-consumer-groups Enable consumer group coordination
--recovery-mode Load existing data from disk on startup
--data-dir <DATA_DIR> Data storage directory [default: ./data]
RUST_LOG=debug # Enable debug logging
FLUXMQ_DATA_DIR=/var/lib/fluxmq # Override data directory
cargo test
cargo test storage # Storage layer tests
cargo test consumer # Consumer group tests
cargo test replication # Replication tests
cargo test protocol # Protocol tests
cargo test --release -- --ignored benchmark
src/
├── main.rs              # Application entry point
├── lib.rs               # Library root
├── broker/              # Broker implementation
│   ├── handler.rs       # Request handlers
│   └── server.rs        # TCP server
├── storage/             # Storage layer
│   ├── log.rs           # Append-only log files
│   ├── segment.rs       # Log segment management
│   └── index.rs         # Offset indexing
├── protocol/            # Network protocol
│   ├── messages.rs      # Protocol messages
│   ├── codec.rs         # Server-side codec
│   └── client_codec.rs  # Client-side codec
├── replication/         # Replication system
│   ├── leader.rs        # Leader state management
│   └── follower.rs      # Follower synchronization
├── consumer/            # Consumer groups
│   └── coordinator.rs   # Group coordinator
└── topic_manager.rs     # Topic management
# Install Rust
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
# Install development dependencies
cargo install cargo-audit cargo-watch
rustup component add clippy rustfmt
# Format code
cargo fmt
# Check for issues
cargo clippy
# Security audit
cargo audit
# Watch for changes
cargo watch -x check -x test
We welcome contributions! Please see CONTRIBUTING.md for guidelines.
This project is licensed under the MIT License - see the LICENSE file for details.
FluxMQ - High-performance message streaming, built with Rust ⚡️