| Crates.io | tokio-memq |
| lib.rs | tokio-memq |
| version | 1.0.0 |
| created_at | 2025-12-13 04:56:55.861267+00 |
| updated_at | 2025-12-18 13:57:44.495327+00 |
| description | Simple, high-performance in-memory async message queue |
| repository | https://github.com/weiwangfds/tokio-memq |
| id | 1982573 |
| size | 219,073 |
High-performance, feature-rich in-memory async message queue powered by Tokio. Designed for high-throughput local messaging with advanced features like backpressure, TTL, consumer groups, and pluggable serialization.
- Stream trait for idiomatic async consumption.
- Batch publish and receive with publish_batch and recv_batch.
- Earliest, Latest, and Offset seeking.
- Filtered receive with recv_filter.

Add to your Cargo.toml:
[dependencies]
tokio-memq = "1.0"
The simplest way to use the queue with default settings.
use tokio_memq::mq::MessageQueue;
use tokio_memq::MessageSubscriber; // Import trait for recv()
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let mq = MessageQueue::new();
let topic = "demo_topic";
// Publisher
let pub1 = mq.publisher(topic.to_string());
pub1.publish(&"Hello World").await?;
// Subscriber
let sub = mq.subscriber(topic.to_string()).await?;
let msg = sub.recv().await?;
let payload: String = msg.deserialize()?;
println!("Received: {}", payload);
Ok(())
}
Consume messages as an async stream, ideal for continuous processing loops.
use tokio_stream::StreamExt;
use tokio::pin;
// Create a stream from the subscriber
let stream = sub.stream();
pin!(stream);
while let Some(msg_res) = stream.next().await {
match msg_res {
Ok(msg) => println!("Received: {:?}", msg),
Err(e) => eprintln!("Error: {}", e),
}
}
Improve throughput by processing messages in batches.
// Batch Publish
let messages = vec![1, 2, 3, 4, 5];
publisher.publish_batch(messages).await?;
// Batch Receive
// Returns a vector of up to 10 messages
let batch = sub.recv_batch(10).await?;
for msg in batch {
println!("Batch msg: {:?}", msg);
}
use std::time::Duration;
// Receive with Timeout
match sub.recv_timeout(Duration::from_millis(500)).await? {
Some(msg) => println!("Got msg: {:?}", msg),
None => println!("Timed out"),
}
// Receive with Filter (Server-side filtering)
// Only receive messages where payload size > 100 bytes
let large_msg = sub.recv_filter(|msg| msg.payload.len() > 100).await?;
// Metadata-only Mode (Avoids full payload clone/deserialization)
let msg = sub.recv().await?;
let meta = msg.metadata();
println!("Offset: {}, Timestamp: {:?}", meta.offset, meta.created_at);
Manage offsets manually or use consumer groups for persistent state.
use tokio_memq::mq::{ConsumptionMode, TopicOptions};
// Configure topic with retention limits
let options = TopicOptions {
max_messages: Some(1000),
message_ttl: None, // Some(Duration::from_secs(3600))
lru_enabled: true,
..Default::default()
};
// Subscribe as part of a Consumer Group
// Modes: Earliest, Latest, Offset(n), LastOffset
let sub_group = mq.subscriber_group_with_options(
"topic_name".to_string(),
options,
"group_id_1".to_string(),
ConsumptionMode::LastOffset
).await?;
// Manual Commit
let msg = sub_group.recv().await?;
// Process message...
sub_group.commit(msg.offset); // Save progress
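To make the consumption modes more concrete, here is a minimal, std-only sketch of how a starting offset might be resolved for a consumer group and how a commit might be recorded. It is not tokio-memq's implementation: `Mode`, `GroupOffsets`, and `resolve_start` are hypothetical names, and the semantics assumed for each mode (in particular, that `LastOffset` resumes from the group's last commit and otherwise falls back to the earliest retained offset) are assumptions made for illustration.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Hypothetical mirror of the consumption modes listed above.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Mode {
    Earliest,
    Latest,
    Offset(usize),
    LastOffset,
}

/// Committed offsets per consumer group, guarded by a read-write lock.
#[derive(Default)]
pub struct GroupOffsets {
    committed: RwLock<HashMap<String, usize>>,
}

impl GroupOffsets {
    /// Record the offset a group has committed.
    pub fn commit(&self, group: &str, offset: usize) {
        self.committed
            .write()
            .unwrap()
            .insert(group.to_string(), offset);
    }

    /// Pick the offset a new subscriber starts from.
    /// `earliest` is the oldest retained offset and `next` is the offset the
    /// next published message will receive; callers must pass earliest <= next.
    pub fn resolve_start(&self, group: &str, mode: Mode, earliest: usize, next: usize) -> usize {
        match mode {
            Mode::Earliest => earliest,
            Mode::Latest => next,
            // Clamp so a request cannot point before retained data or past the end.
            Mode::Offset(n) => n.clamp(earliest, next),
            // Assumed behavior: resume from the last commit, else start at the earliest.
            Mode::LastOffset => self
                .committed
                .read()
                .unwrap()
                .get(group)
                .copied()
                .unwrap_or(earliest)
                .clamp(earliest, next),
        }
    }
}
```

Clamping keeps a stale committed offset from pointing at messages that retention has already evicted. Whether the real crate clamps, errors, or skips ahead in that case is not stated in this README.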
Flexible serialization with per-topic and per-publisher overrides.
Global/Topic Defaults:
use tokio_memq::mq::{
SerializationFactory, SerializationFormat, SerializationConfig,
JsonConfig, PipelineConfig, CompressionConfig
};
let topic = "compressed_logs";
// Configure compression pipeline
let pipeline = PipelineConfig {
compression: CompressionConfig::Gzip { level: Some(6) },
pre: None,
post: None,
use_magic_header: true, // Auto-detect format on receive
};
// Register defaults for a topic
SerializationFactory::register_topic_defaults(
topic,
SerializationFormat::Json,
SerializationConfig::Json(JsonConfig { pretty: false }),
Some(pipeline),
);
Per-Publisher Overrides (Independent Keys):
// Register specific settings for a publisher key
SerializationFactory::register_publisher_defaults(
"legacy_system",
SerializationFormat::MessagePack,
SerializationConfig::Default,
None
);
// Create a publisher using that key
let pub_legacy = mq.publisher_with_key(topic.to_string(), "legacy_system".to_string());
// Messages from this publisher will use MessagePack, regardless of topic defaults
pub_legacy.publish(&payload).await?;
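The `use_magic_header` option suggests that a short prefix on each payload tells the receiver how to decode it. A rough, std-only sketch of that idea follows. The two-byte header, the tag values, and the `frame` and `detect` helpers are all invented for illustration; the crate's real wire layout is not described in this README.

```rust
/// Formats a receiver might need to tell apart (names follow the README).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Format {
    Json,
    MessagePack,
}

/// Hypothetical marker byte; not tokio-memq's actual header.
const MAGIC: u8 = 0xA7;

/// Prefix the serialized bytes with [MAGIC, format tag].
pub fn frame(format: Format, body: &[u8]) -> Vec<u8> {
    let tag = match format {
        Format::Json => 1u8,
        Format::MessagePack => 2u8,
    };
    let mut out = Vec::with_capacity(body.len() + 2);
    out.push(MAGIC);
    out.push(tag);
    out.extend_from_slice(body);
    out
}

/// Return the detected format and the remaining body,
/// or None if the bytes do not start with a recognized header.
pub fn detect(bytes: &[u8]) -> Option<(Format, &[u8])> {
    match bytes {
        [MAGIC, 1, body @ ..] => Some((Format::Json, body)),
        [MAGIC, 2, body @ ..] => Some((Format::MessagePack, body)),
        _ => None,
    }
}
```

With a header like this, a subscriber can decode messages from publishers that use different formats on the same topic, which is the situation the per-publisher override above creates.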
use tokio_memq::mq::TopicOptions;
use std::time::Duration;
// Create Topic with Options (Pre-provisioning)
// Useful for setting TTL or max size before usage
let options = TopicOptions {
max_messages: Some(5000),
message_ttl: Some(Duration::from_secs(3600)),
lru_enabled: true,
..Default::default()
};
mq.create_topic("my_topic".to_string(), options).await?;
// Get Topic Statistics
if let Some(stats) = mq.get_topic_stats("my_topic".to_string()).await {
println!(
"Depth: {}, Subscribers: {}, Lag: {:?}",
stats.message_count,
stats.subscriber_count,
stats.consumer_lags
);
}
// Delete Topic
let deleted = mq.delete_topic("my_topic").await;
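The `message_ttl` option implies that messages older than the TTL are pruned from the topic buffer. A minimal sketch of such a pruning step, using only the standard library, might look like the following. `Stamped` and `expire_front` are hypothetical names, and passing `now` explicitly is a choice made here so the behavior is easy to test; it is not a claim about how the crate's background cleaner works.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// A buffered message together with the time it was enqueued.
pub struct Stamped<T> {
    pub created_at: Instant,
    pub value: T,
}

/// Drop messages whose age is at least `ttl` from the front of the buffer
/// and return how many were removed. Assumes the buffer is in arrival order,
/// so the scan can stop at the first message that is still fresh.
pub fn expire_front<T>(buf: &mut VecDeque<Stamped<T>>, ttl: Duration, now: Instant) -> usize {
    let mut removed = 0;
    while let Some(front) = buf.front() {
        if now.saturating_duration_since(front.created_at) >= ttl {
            buf.pop_front();
            removed += 1;
        } else {
            break;
        }
    }
    removed
}
```

Because messages arrive in time order, expiry only ever touches the front of the queue, so each cleanup pass costs time proportional to the number of expired messages rather than the buffer size.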
tokio-memq is designed for high throughput and low latency. On a standard developer machine (Apple Silicon), it achieves:
| Metric | Result | Note |
|---|---|---|
| Throughput | ~2,265,000 msgs/sec | Using batching (100 msgs/batch) |
| Latency | < 0.5 µs | Amortized per message |
| Bandwidth | ~6.5 GB/s | With 1 MB payloads |
See PERFORMANCE_REPORT.md for detailed benchmarks.
To run the benchmark suite yourself:
cargo run --release --example perf_runner
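As a sanity check on how the figures in the table relate to each other, the amortized per-message latency is the reciprocal of throughput, and bandwidth divided by payload size gives a message rate. The helper names below are illustrative only, and treating 1 MB as 10^6 bytes and 1 GB as 10^9 bytes is an assumption.

```rust
/// Amortized time per message, in microseconds, for a given throughput.
pub fn amortized_latency_us(msgs_per_sec: f64) -> f64 {
    1_000_000.0 / msgs_per_sec
}

/// Messages per second implied by a bandwidth and a fixed payload size.
pub fn msgs_per_sec_from_bandwidth(bytes_per_sec: f64, payload_bytes: f64) -> f64 {
    bytes_per_sec / payload_bytes
}

/// Batches per second when messages are grouped into fixed-size batches.
pub fn batches_per_sec(msgs_per_sec: f64, batch_size: f64) -> f64 {
    msgs_per_sec / batch_size
}
```

With the table's numbers, 2,265,000 msgs/sec works out to roughly 0.44 µs per message, which is consistent with the < 0.5 µs row. At 100 messages per batch that is about 22,650 batch calls per second, and 6.5 GB/s at 1 MB per payload is on the order of 6,500 messages per second.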
flowchart LR
subgraph App
PUB[Publisher]
SUB[Subscriber]
end
MQ[MessageQueue]
TM[TopicManager]
TC[TopicChannel]
BUF[Buffer - VecDeque]
NO[Notify Sender - watch]
RX[Notify Receiver - watch]
OFF[consumer_offsets - RwLock]
NEXT[next_offset - AtomicUsize]
DROP[dropped_count - AtomicUsize]
TTL[TTL Cleaner - background task]
SER[SerializationFactory & Pipeline]
PUB -- publish --> MQ
MQ --> TM
TM -- get_or_create --> TC
TC -- add_to_buffer --> BUF
TC -- add_to_buffer_batch --> BUF
TC --> NO
NO --> RX
SUB -- recv or stream --> RX
SUB -- fetch_from_buffer --> BUF
SUB -- advance_offset --> OFF
TM -- subscribe --> SUB
TC --> OFF
TC --> NEXT
TC --> DROP
TTL --> TC
PUB -- serialize --> SER
SUB -- deserialize --> SER
sequenceDiagram
autonumber
participant App
participant PUB as Publisher
participant SER as Serialization
participant MQ as MessageQueue
participant TM as TopicManager
participant TC as TopicChannel
participant NOTIF as Notify
participant SUB as Subscriber
participant OFFS as ConsumerOffsets
App->>TM: create topic with options (optional)
TM->>TC: provision channel
App->>MQ: subscriber or subscriber group with options
MQ->>TM: subscribe
TM->>TC: add subscriber (mode, consumer id)
TC->>OFFS: init offset for group or local
TC-->>SUB: provide notify receiver
App->>PUB: publish data
PUB->>SER: serialize with defaults
SER-->>PUB: bytes and format
PUB->>MQ: publish message
MQ->>TM: get or create topic
TM->>TC: add to buffer
TC->>TC: ttl cleanup
TC->>TC: lru eviction
TC->>TC: assign offset
TC->>TC: push to buffer
TC-->>NOTIF: send next offset
SUB->>NOTIF: wait changed
SUB->>TC: fetch by target offset
TC-->>SUB: message
SUB->>SUB: advance next offset
SUB->>SER: deserialize
SER-->>SUB: data
App->>SUB: commit offset (optional)
SUB->>OFFS: persist next offset
App->>SUB: seek offset (optional)
SUB->>OFFS: set target offset
App->>PUB: publish batch (optional)
PUB->>MQ: publish batch
MQ->>TM: group by topic
TM->>TC: add batch to buffer
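The publish path in the diagrams above (evict if full, assign an offset, push to the buffer, then fetch by offset on the subscriber side) can be sketched with the standard library alone. This is a simplified stand-in under stated assumptions, not the crate's code: `Channel` and its methods are hypothetical names, the watch-based notification and TTL steps are left out, and the offset is allocated under the buffer's write lock so that buffered offsets stay contiguous.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::RwLock;

/// Simplified stand-in for a topic channel: a bounded buffer plus counters.
pub struct Channel<T> {
    buffer: RwLock<VecDeque<(usize, T)>>, // (offset, message)
    next_offset: AtomicUsize,
    dropped_count: AtomicUsize,
    max_messages: usize,
}

impl<T: Clone> Channel<T> {
    pub fn new(max_messages: usize) -> Self {
        Channel {
            buffer: RwLock::new(VecDeque::new()),
            next_offset: AtomicUsize::new(0),
            dropped_count: AtomicUsize::new(0),
            // A capacity of zero would make the eviction loop spin forever.
            max_messages: max_messages.max(1),
        }
    }

    /// Evict from the front while full, assign an offset, push, and return the offset.
    pub fn publish(&self, msg: T) -> usize {
        let mut buf = self.buffer.write().unwrap();
        while buf.len() >= self.max_messages {
            buf.pop_front();
            self.dropped_count.fetch_add(1, Ordering::Relaxed);
        }
        let offset = self.next_offset.fetch_add(1, Ordering::SeqCst);
        buf.push_back((offset, msg));
        offset
    }

    /// Fetch the message at `offset`, or None if it was evicted or not yet published.
    pub fn fetch(&self, offset: usize) -> Option<T> {
        let buf = self.buffer.read().unwrap();
        let first = buf.front()?.0;
        // Offsets are contiguous, so the index is the distance from the front.
        buf.get(offset.checked_sub(first)?).map(|(_, m)| m.clone())
    }

    /// Number of messages evicted so far.
    pub fn dropped(&self) -> usize {
        self.dropped_count.load(Ordering::Relaxed)
    }
}
```

Storing the offset next to each message lets a subscriber detect that its target offset has fallen behind the front of the buffer, which is the case a lagging consumer hits after eviction.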
- Publisher -> MessageQueue -> TopicManager -> TopicChannel -> Buffer -> Subscriber
- watch::Sender/Receiver used for notification; max_messages and TTL enforce retention; LRU via front eviction
- next_offset allocates offsets atomically; consumer group offsets stored in RwLock<HashMap>
- SerializationFactory and PipelineConfig

License: Apache-2.0