| Crates.io | threadsafe_zmq |
| lib.rs | threadsafe_zmq |
| version | 2.0.0 |
| created_at | 2024-10-06 15:47:31.718327+00 |
| updated_at | 2025-12-20 11:05:32.914544+00 |
| description | Thread-safe ZeroMQ wrapper with sync and async support |
| homepage | |
| repository | https://github.com/Elvis339/threadsafe_zmq |
| max_upload_size | |
| id | 1399162 |
| size | 107,963 |
Thread-safe wrapper for ZeroMQ sockets.
ZeroMQ sockets are not thread-safe. From the ZMQ Guide:
"Do not use or close sockets except in the thread that created them."
If you have a multi-threaded server where workers need to send through a shared socket, you'll get SIGSEGV crashes. Common workarounds like mutexes or proxies have significant overhead.
This library wraps ZMQ sockets in a dedicated thread and exposes channel handles that are Send + Sync.
Thread-safe ZeroMQ wrapper for multi-threaded servers.
Trade-off: ~9x latency overhead per message, but enables parallel sending from multiple threads - which can result in higher total throughput than single-threaded raw ZMQ.
[dependencies]
threadsafe_zmq = "2.0"
# For async support
threadsafe_zmq = { version = "2.0", features = ["async"] }
Requires ZeroMQ on your system:
# macOS
brew install zeromq pkg-config
# Ubuntu/Debian
apt-get install libzmq3-dev pkg-config
Or use Nix: just install-nix && just shell
Server with worker threads:
use threadsafe_zmq::ChannelPairBuilder;
use crossbeam_channel::bounded;
use std::sync::Arc;
use std::thread;
let ctx = zmq::Context::new();
let socket = ctx.socket(zmq::ROUTER)?;
socket.bind("tcp://*:5555")?;
let channel = ChannelPairBuilder::new(&ctx, socket)
.with_bounded_queue(1000)
.build()?;
// Work queue for distributing messages to workers
let (work_tx, work_rx) = bounded(1000);
// Spawn workers - each can send responses through the shared channel
for _ in 0..4 {
let ch = Arc::clone(&channel);
let rx = work_rx.clone();
thread::spawn(move || {
while let Ok(msg) = rx.recv() {
// Process and respond - safe from any thread
ch.send(vec![b"response".to_vec()]).unwrap();
}
});
}
// Main loop receives and dispatches to workers
loop {
let msg = channel.recv()?;
work_tx.send(msg)?;
}
use threadsafe_zmq::AsyncChannelPair;
let channel = AsyncChannelPair::new(&ctx, socket)?;
channel.send(vec![b"hello".to_vec()]).await?;
let response = channel.recv().await?;
channel.shutdown().await;
See the example/ directory for a complete fibonacci server/client:
just example # sync version
just example-async # async version
just bench
Compares ChannelPair against raw ZMQ with mutex and proxy patterns.
Apache-2.0