| Crates.io | anng |
| lib.rs | anng |
| version | 0.1.3 |
| created_at | 2025-10-22 14:14:22.654805+00 |
| updated_at | 2025-11-19 14:20:59.327518+00 |
| description | Safe, async bindings to nanomsg next-generation (NNG) |
| homepage | |
| repository | https://github.com/nanomsg/nng-rs |
| max_upload_size | |
| id | 1895729 |
| size | 399,079 |
Safe, async Rust bindings for NNG (nanomsg next-generation).
From the NNG GitHub repository:
NNG, like its predecessors nanomsg (and to some extent ZeroMQ), is a lightweight, broker-less library, offering a simple API to solve common recurring messaging problems, such as publish/subscribe, RPC-style request/reply, or service discovery. The API frees the programmer from worrying about details like connection management, retries, and other common considerations, so that they can focus on the application instead of the plumbing.
All NNG protocols are supported: REQ/REP, PUB/SUB, PUSH/PULL, SURVEYOR/RESPONDENT, BUS, and PAIR.
Add to your Cargo.toml:
[dependencies]
anng = "0.1"
tokio = { version = "1", features = ["full"] }
use anng::{protocols::reqrep0, Message};
use std::io::Write;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Server
tokio::spawn(async {
let socket = reqrep0::Rep0::listen(c"tcp://127.0.0.1:8080").await?;
let mut ctx = socket.context();
let (request, responder) = ctx.receive().await?;
println!("Request: {:?}", request.as_slice());
let mut reply = Message::with_capacity(100);
write!(&mut reply, "Hello back!")?;
responder.reply(reply).await?;
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
});
// Client
let socket = reqrep0::Req0::dial(c"tcp://127.0.0.1:8080").await?;
let mut ctx = socket.context();
let mut request = Message::with_capacity(100);
write!(&mut request, "Hello server!")?;
let reply = ctx.request(request).await?.await?;
println!("Reply: {:?}", reply.as_slice());
Ok(())
}
The library uses phantom types to enforce protocol correctness at compile time:
// ✅ This compiles - correct protocol usage
let req_socket = reqrep0::Req0::dial(url).await?;
let reply = req_socket.context().request(msg).await?;
// ❌ This won't compile - protocol violation
let req_socket = reqrep0::Req0::dial(url).await?;
req_socket.publish(msg).await?; // Error: method `publish` not found
Socket types like Socket<reqrep0::Req0> only expose methods valid for
that protocol, making it impossible to accidentally call publish() on
a request socket or reply() before receiving a request.
For concurrent operations, create contexts from sockets:
use std::sync::Arc;
let socket = Arc::new(reqrep0::Rep0::listen(url).await?);
// Spawn multiple workers for concurrent request handling
for worker_id in 0..4 {
let socket = Arc::clone(&socket);
tokio::spawn(async move {
let mut ctx = socket.context();
loop {
let (request, responder) = ctx.receive().await?;
// Handle request concurrently...
responder.reply(process_request(request)).await?;
}
});
}
Each context maintains independent protocol state, enabling safe concurrent operations on the same socket.
All async operations are cancellation safe - futures can be dropped at any time without corrupting the NNG state. Cancelling a send may drop the message if dropped before NNG effects the send. Cancelling a receive will not drop an already-received message (in the case of a race). The library automatically handles NNG cleanup and message recovery when operations are cancelled.
Licensed under either of
at your option.
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.