| Crates.io | nsq-async-rs |
| lib.rs | nsq-async-rs |
| version | 0.1.8 |
| created_at | 2025-04-28 02:04:53.867902+00 |
| updated_at | 2025-10-11 03:12:24.98874+00 |
| description | An asynchronous Rust NSQ client library with support for concurrent message processing |
| homepage | https://github.com/liudiandesilv/nsq-async-rs |
| repository | https://github.com/liudiandesilv/nsq-async-rs |
| max_upload_size | |
| id | 1651660 |
| size | 232,655 |
Other language versions: English, 简体中文
nsq-async-rs is a high-performance, reliable NSQ client library written in Rust. The project is inspired by the official go-nsq implementation and provides similar functionality and interfaces in the Rust ecosystem.
Add the following to your Cargo.toml:
[dependencies]
nsq-async-rs = "0.1.8"
use nsq_async_rs::consumer::{Consumer, ConsumerConfig, Handler};
use nsq_async_rs::error::Result;
use nsq_async_rs::protocol::Message;
/// Minimal handler used by the basic consumer example.
#[derive(Default)]
struct MessageHandler;

#[async_trait::async_trait]
impl Handler for MessageHandler {
    /// Prints the message body (decoded lossily as UTF-8) and reports success.
    async fn handle_message(&self, message: Message) -> Result<()> {
        let body = String::from_utf8_lossy(&message.body);
        println!("Received message: {:?}", body);
        Ok(())
    }
}
/// Basic consumer example: consume `test_topic` / `test_channel` until Ctrl-C.
#[tokio::main]
async fn main() -> Result<()> {
    let settings = ConsumerConfig::default();
    let topic = "test_topic".to_string();
    let channel = "test_channel".to_string();
    let lookupd_url = "http://127.0.0.1:4161".to_string();

    // Build the consumer around the printing handler
    let consumer = Consumer::new(topic, channel, settings, MessageHandler::default())?;

    consumer.connect_to_nsqlookupd(lookupd_url).await?;
    consumer.start().await?;

    // Wait for Ctrl-C before stopping the consumer
    tokio::signal::ctrl_c().await?;
    consumer.stop().await?;
    Ok(())
}
use async_trait::async_trait;
use log::{error, info};
use nsq_async_rs::consumer::{Consumer, ConsumerConfig, Handler};
use nsq_async_rs::error::Result;
use nsq_async_rs::protocol::Message;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, Mutex};
/// Concurrent message handler.
///
/// Every message handed over by the consumer is pushed onto a bounded channel
/// that is drained by the worker tasks spawned in
/// [`ConcurrentMessageHandler::new`].
struct ConcurrentMessageHandler {
    /// Number of worker tasks spawned for this handler (always at least 1).
    worker_count: usize,
    /// Sending half of the worker channel. `send` and `try_send` take `&self`,
    /// so the sender is used directly and needs no lock around it.
    sender: mpsc::Sender<Message>,
}

impl ConcurrentMessageHandler {
    /// Creates the handler and spawns its worker tasks.
    ///
    /// A `worker_count` of 0 is treated as 1: a zero-capacity `mpsc::channel`
    /// panics, and without any worker the channel would never be drained.
    ///
    /// Calls `tokio::spawn`, so it must run inside a Tokio runtime.
    pub fn new(worker_count: usize) -> Self {
        let worker_count = worker_count.max(1);
        // Create message channel with buffer size 10x worker count
        let (tx, rx) = mpsc::channel(worker_count * 10);
        // The receiving half cannot be cloned, so the workers share it behind a mutex
        let receiver = Arc::new(Mutex::new(rx));
        let handler = Self {
            worker_count,
            sender: tx,
        };
        // Start worker tasks
        handler.start_workers(receiver);
        handler
    }

    /// Spawns `self.worker_count` tasks that take messages from `receiver`
    /// until the channel is closed.
    fn start_workers(&self, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) {
        for i in 0..self.worker_count {
            let worker_id = i + 1;
            let rx = receiver.clone();
            tokio::spawn(async move {
                info!("Worker {} started", worker_id);
                loop {
                    // Get message from channel; the lock is released at the end
                    // of this block, before the message is processed
                    let msg = {
                        let mut rx_guard = rx.lock().await;
                        match rx_guard.recv().await {
                            Some(msg) => msg,
                            None => break,
                        }
                    };
                    // Process message
                    let msg_id = String::from_utf8_lossy(&msg.id).to_string();
                    info!("Worker {} processing message: {}", worker_id, msg_id);
                    // Add your message processing logic here
                    info!("Worker {} finished processing message: {}", worker_id, msg_id);
                }
            });
        }
    }
}

#[async_trait]
impl Handler for ConcurrentMessageHandler {
    /// Forwards `message` to the worker channel.
    ///
    /// Tries a non-blocking send first and falls back to an awaited send when
    /// the channel is full.
    ///
    /// # Errors
    ///
    /// Returns `Error::Other` when the worker channel is closed.
    async fn handle_message(&self, message: Message) -> Result<()> {
        let msg_id = String::from_utf8_lossy(&message.id).to_string();
        // Try non-blocking send first; the message is moved, not cloned, and is
        // handed back inside the error if the send does not go through
        match self.sender.try_send(message) {
            Ok(()) => {
                info!("Message sent to worker channel: ID={}", msg_id);
            }
            Err(mpsc::error::TrySendError::Full(msg)) => {
                // Channel full, wait for capacity
                if let Err(e) = self.sender.send(msg).await {
                    error!("Failed to send message to worker channel: {}", e);
                    return Err(nsq_async_rs::error::Error::Other(e.to_string()));
                }
            }
            Err(mpsc::error::TrySendError::Closed(_)) => {
                error!("Worker channel closed: ID={}", msg_id);
                return Err(nsq_async_rs::error::Error::Other("Worker channel closed".into()));
            }
        }
        Ok(())
    }
}
/// Concurrent consumer example: consume with a pool of 20 workers until Ctrl-C.
#[tokio::main]
async fn main() -> Result<()> {
    // Consumer settings; every field not listed keeps its default
    let settings = ConsumerConfig {
        max_in_flight: 100, // Increase for higher throughput
        max_attempts: 5,
        ..Default::default()
    };

    // Concurrent handler backed by 20 workers
    let worker_pool = ConcurrentMessageHandler::new(20);

    let topic = "test_topic".to_string();
    let channel = "test_channel".to_string();
    let consumer = Consumer::new(topic, channel, settings, worker_pool)?;

    let lookupd_url = "http://127.0.0.1:4161".to_string();
    consumer.connect_to_nsqlookupd(lookupd_url).await?;
    consumer.start().await?;

    // Wait for Ctrl-C before stopping the consumer
    tokio::signal::ctrl_c().await?;
    consumer.stop().await?;
    Ok(())
}
use nsq_async_rs::producer::Producer;
use nsq_async_rs::error::Result;
/// Simple producer example: connect to one nsqd and publish a single message.
#[tokio::main]
async fn main() -> Result<()> {
    let nsqd_addr = "127.0.0.1:4150";
    let payload = "Hello, NSQ!".as_bytes();

    let producer = Producer::connect(nsqd_addr).await?;
    producer.publish("test_topic", payload).await?;
    Ok(())
}
use log::{error, info};
use nsq_async_rs::{
producer::{new_producer, NsqProducer},
Producer, ProducerConfig,
};
use std::error::Error;
use std::time::Duration;
use nsq_async_rs::pool::{Pool, PoolConfig, PoolError};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Configure the connection pool
let pool_config = PoolConfig {
initial_cap: 3, // Initial connections
max_cap: 10, // Maximum connections
max_idle: 5, // Maximum idle connections
idle_timeout: Duration::from_secs(30), // Idle timeout
max_lifetime: Duration::from_secs(300), // Connection lifetime (5 minutes)
};
// Create NSQ producer connection pool
let pool = Pool::new(
pool_config,
// Factory function to create new connections
|| {
let p_cfg = ProducerConfig {
nsqd_addresses: vec!["127.0.0.1:4150".to_string()],
..Default::default()
};
Ok(new_producer(p_cfg))
},
// Close function (NSQ producers don't need explicit closing)
|_producer| Ok(()),
// Optional ping function to check connection health
Some(|_producer| Ok(())),
).await?;
// Get a connection from the pool
let topic = "test_topic";
let pooled_conn = pool.get().await?;
// Use the connection
match pooled_conn.conn.publish(topic, "Hello from connection pool!").await {
Ok(_) => info!("Message published successfully"),
Err(e) => error!("Failed to publish message: {}", e),
}
// Return the connection to the pool
pool.put(pooled_conn).await?;
// Close the pool when done
pool.release().await?;
Ok(())
}
use chrono::Local;
use nsq_async_rs::producer::{new_producer, ProducerConfig};
use std::error::Error;
use std::time::Instant;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Create producer config
let mut config = ProducerConfig::default();
config.nsqd_addresses = vec!["127.0.0.1:4150".to_string()];
// Create NSQ producer
let producer = new_producer(config);
let topic = "test_topic";
// Prepare multiple messages
let mut messages = vec![];
for i in 0..100 {
messages.push(format!(
"Message #{}, Time:{}",
i + 1,
Local::now().to_string()
));
}
// Measure batch publishing performance
let start = Instant::now();
producer.publish_multi(topic, messages).await?;
let elapsed = start.elapsed();
println!("Batch publishing 100 messages took: {:?}", elapsed);
println!("Average time per message: {:?}", elapsed / 100);
Ok(())
}
// Reference listing of `ConsumerConfig` fields with example values.
// NOTE(review): the manual-acknowledgement examples in this document also set
// a `disable_auto_response` field that is not listed here — confirm whether
// this listing is meant to be exhaustive.
ConsumerConfig {
max_in_flight: 100, // Maximum number of messages to process simultaneously
max_attempts: 5, // Maximum number of retries
dial_timeout: Duration::from_secs(1), // Connection timeout
read_timeout: Duration::from_secs(60), // Read timeout
write_timeout: Duration::from_secs(1), // Write timeout
lookup_poll_interval: Duration::from_secs(60), // nsqlookupd polling interval
lookup_poll_jitter: 0.3, // Polling jitter coefficient
max_requeue_delay: Duration::from_secs(15 * 60), // Maximum requeue delay
default_requeue_delay: Duration::from_secs(90), // Default requeue delay
shutdown_timeout: Duration::from_secs(30), // Shutdown timeout
backoff_strategy: true, // Enable exponential backoff reconnection strategy
}
// Reference listing of `PoolConfig` fields with example values.
PoolConfig {
initial_cap: 5, // Initial connections to create
max_cap: 20, // Maximum connections allowed
max_idle: 10, // Maximum idle connections to keep
idle_timeout: Duration::from_secs(30), // How long connections can remain idle
max_lifetime: Duration::from_secs(300), // Maximum connection lifetime (5 minutes)
}
// Ping with default timeout (5 seconds)
let default_ping = producer.ping(None, None).await;

// Ping with custom address and timeout
let custom_timeout = Duration::from_millis(500);
let custom_ping = producer
    .ping(Some("127.0.0.1:4150"), Some(custom_timeout))
    .await;

// Check the result of the second ping before proceeding
match custom_ping {
    Ok(_) => {}
    Err(err) => {
        println!("NSQ server connection error: {}", err);
        // Handle connection error...
    }
}
producer.publish_with_delay("test_topic", "Delayed message".as_bytes(), Duration::from_secs(60)).await?;
let messages = vec![
"Message 1".as_bytes().to_vec(),
"Message 2".as_bytes().to_vec(),
"Message 3".as_bytes().to_vec(),
];
producer.publish_multiple("test_topic", messages).await?;
nsq-async-rs supports manual message acknowledgement, giving you full control over when messages are acknowledged. This is particularly useful for concurrent processing, batch processing, or complex error handling scenarios.
Manual acknowledgement is ideal for:
- Using `TOUCH` to extend the timeout for messages that take longer to process
let config = ConsumerConfig {
max_in_flight: 100,
disable_auto_response: true, // Enable manual acknowledgement
..Default::default()
};
#[async_trait]
impl Handler for MyHandler {
    /// Hands the message to the worker channel and reports success.
    ///
    /// With manual acknowledgement enabled, returning `Ok(())` here does not
    /// auto-FIN the message.
    async fn handle_message(&self, message: Message) -> Result<()> {
        // Send to worker thread for async processing
        let handed_off = self.worker_tx.send(message).await;
        handed_off?;
        // Return Ok, but won't auto-FIN (because manual ack is enabled)
        Ok(())
    }
}
// In worker thread
/// Processes one message and acknowledges it manually: FIN when processing
/// succeeds, REQ with a 5000 ms delay when it fails.
///
/// A failed FIN/REQ is reported on stderr instead of panicking, so one bad
/// acknowledgement does not take the worker down.
async fn worker(message: Message) {
    match process_message(&message.body).await {
        Ok(_) => {
            // Manually send FIN
            if let Err(err) = message.finish().await {
                eprintln!("Failed to send FIN: {:?}", err);
            }
        }
        Err(_) => {
            // Manually requeue with 5 second delay
            if let Err(err) = message.requeue(5000).await {
                eprintln!("Failed to send REQ: {:?}", err);
            }
        }
    }
}
- `message.finish()` - Send FIN command to mark message as successfully processed
- `message.requeue(delay)` - Send REQ command to requeue the message with a delay (in milliseconds)
- `message.touch()` - Send TOUCH command to reset message timeout
- `message.disable_auto_response()` - Disable auto-response for a specific message

Here's a complete example showing manual acknowledgement with concurrent message processing:
use std::sync::Arc;
use async_trait::async_trait;
use nsq_async_rs::consumer::{Consumer, ConsumerConfig, Handler};
use nsq_async_rs::error::Result;
use nsq_async_rs::protocol::Message;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
/// Handler for the manual-acknowledgement example; it passes each incoming
/// message on through `worker_tx`.
struct ConcurrentHandler {
// Sending half of the channel created in `ConcurrentHandler::new`
worker_tx: mpsc::Sender<Message>,
}
impl ConcurrentHandler {
fn new(worker_count: usize) -> Self {
let (tx, mut rx) = mpsc::channel(100);
// Start worker threads
for worker_id in 0..worker_count {
let mut worker_rx = rx.clone();
tokio::spawn(async move {
while let Some(msg) = worker_rx.recv().await {
// Process message
match process_message(&msg.body).await {
Ok(_) => {
msg.finish().await.ok();
}
Err(_) => {
if msg.attempts < 3 {
msg.requeue(5000).await.ok();
} else {
msg.finish().await.ok(); // Give up retrying
}
}
}
}
});
}
Self { worker_tx: tx }
}
}
#[async_trait]
impl Handler for ConcurrentHandler {
    /// Forwards the message over `worker_tx`; a send failure is propagated to
    /// the caller.
    async fn handle_message(&self, message: Message) -> Result<()> {
        let handed_off = self.worker_tx.send(message).await;
        handed_off?;
        Ok(())
    }
}
/// Manual-acknowledgement example: consume with 20 workers until Ctrl-C.
#[tokio::main]
async fn main() -> Result<()> {
    let config = ConsumerConfig {
        max_in_flight: 100,
        disable_auto_response: true, // Enable manual acknowledgement
        ..Default::default()
    };
    let handler = ConcurrentHandler::new(20);
    // Pass owned `String`s, as the other consumer examples in this document do
    let consumer = Consumer::new(
        "topic".to_string(),
        "channel".to_string(),
        config,
        handler,
    )?;
    consumer
        .connect_to_nsqlookupd("http://127.0.0.1:4161".to_string())
        .await?;
    consumer.start().await?;
    // Wait for Ctrl-C before stopping the consumer
    tokio::signal::ctrl_c().await?;
    consumer.stop().await?;
    Ok(())
}
# Run manual acknowledgement example
cargo run --example manual_ack_consume
# Run automatic acknowledgement example (for comparison)
cargo run --example simple_consume
- `max_in_flight` - Can be set higher (e.g., 100) with manual ack, but ensure it doesn't exceed worker capacity
- Check `message.attempts` to avoid infinite retries and use exponential backoff
- Use `try_send` or timeout mechanisms
- Use `consumer.stats()` to monitor `messages_received`, `messages_finished`, and `messages_requeued`

Message Timeout Issues
- Symptom: `attempts` keeps increasing
- Fix: call `message.touch()` to reset the timeout, or increase `IdentifyConfig.msg_timeout`

Message Loss
Memory Leaks
Contributions are welcome! Please feel free to submit issues and pull requests.
MIT License