| Crates.io | rabbitmq_streamer |
| lib.rs | rabbitmq_streamer |
| version | 1.1.2 |
| created_at | 2025-04-25 16:35:48.45722+00 |
| updated_at | 2025-11-05 14:54:03.914418+00 |
| description | A library to consume RabbitMQ streams |
| homepage | https://github.com/neurono-ml/rabbitmq_streamer |
| repository | https://github.com/neurono-ml/rabbitmq_streamer |
| max_upload_size | |
| id | 1649209 |
| size | 151,075 |
(https://crates.io/crates/rabbitmq_streamer)
An easy-to-use library for publishing to RabbitMQ and consuming from it using Rust Streams. It provides a high-level API for connecting to RabbitMQ, publishing messages, consuming messages from queues, and acking them in batches.
RabbitPublisher: A high-level publisher for sending messages to RabbitMQ exchanges
RabbitConsumer: A stream-based consumer for receiving messages from RabbitMQ queues
load_messages() to consume messages that are automatically acknowledged upon receipt
load_ackable_messages() to receive AckableMessage<T> instances for manual message acknowledgment control
AckableMessage: Wrapper for messages requiring manual acknowledgment
ack(): Acknowledge successful message processing
nack(): Negative acknowledge with requeue option
reject(): Reject message without requeue
Access to the original message payload through message()
This library includes comprehensive tests using testcontainers to ensure reliability with real RabbitMQ instances. The test suite covers:
To run the tests:
# Run all tests (requires Docker)
cargo test
# Run specific test suites
cargo test --test publisher_tests
cargo test --test consumer_tests
cargo test --test integration_tests
For detailed testing information, see TESTING.md.
This project uses GitHub Actions for automated testing and publishing:
For complete CI/CD documentation, see CI_CD.md.
use rabbitmq_streamer::RabbitPublisher;
use serde::Serialize;
/// Example payload for the publisher example: `main` below builds one value
/// and passes it to `publisher.publish`.
///
/// `Serialize` is derived, presumably because `publish` requires it; the wire
/// format is chosen by the library and is not visible here.
#[derive(Serialize)]
struct OrderEvent {
order_id: u32,
customer_id: u32,
amount: f64,
/// Set to "created" in the example below.
status: String,
}
/// Publisher example: connects to a local broker, then publishes a single
/// `OrderEvent` with the routing key "orders.created".
///
/// Any connection or publish error is propagated to the caller through `?`.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Broker address and the exchange this example publishes to.
    let (uri, exchange_name) = ("amqp://guest:guest@localhost:5672", "orders");

    // Open the connection and create the publisher for `exchange_name`.
    let order_publisher = RabbitPublisher::connect(uri, exchange_name).await?;

    // The event to send.
    let event = OrderEvent {
        order_id: 12345,
        customer_id: 67890,
        amount: 99.99,
        status: String::from("created"),
    };

    // Send the event together with its routing key.
    order_publisher.publish(&event, "orders.created").await?;
    println!("Order event published successfully!");

    Ok(())
}
use rabbitmq_streamer::RabbitConsumer;
use serde::Deserialize;
use tokio::time::{timeout, Duration};
/// Message type requested from `load_messages::<OrderEvent, _>` in the
/// consumer example's `main` below.
///
/// `Debug` is derived because the example prints each received value with `{:?}`.
#[derive(Deserialize, Debug)]
struct OrderEvent {
order_id: u32,
customer_id: u32,
amount: f64,
status: String,
}
/// Auto-acknowledging consumer example: connects to the "order_events" queue
/// and prints every `OrderEvent` it receives.
///
/// The loop ends when no message arrives within 30 seconds or when the
/// receiver yields `None`; the program then exits with `Ok(())`.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let uri = "amqp://guest:guest@localhost:5672";
    let queue_name = "order_events";

    // Connect to RabbitMQ and create the consumer.
    let consumer = RabbitConsumer::connect(uri, queue_name).await?;

    // Start consuming (auto-acknowledged).
    // NOTE(review): `10` and `Some("order-processor")` look like a buffer/prefetch
    // size and a consumer tag; confirm against the crate documentation.
    let mut orders = consumer
        .load_messages::<OrderEvent, _>(10, Some("order-processor"))
        .await?;

    println!("Waiting for messages...");

    // Longest wait for a single message before giving up.
    let idle_limit = Duration::from_secs(30);
    loop {
        match timeout(idle_limit, orders.recv()).await {
            Ok(Some(order)) => {
                println!("Received order: {:?}", order);
                // Process the order here; no explicit acknowledgment is needed
                // in this mode.
            }
            // Timed out (`Err`) or the receiver returned `None`: stop consuming.
            _ => break,
        }
    }

    Ok(())
}
use rabbitmq_streamer::RabbitConsumer;
use serde::Deserialize;
use tokio::time::{timeout, Duration};
/// Message type requested from `load_ackable_messages::<PaymentEvent, _>` in
/// the manual-acknowledgment example below.
///
/// `Debug` is derived because the example prints each received value with `{:?}`.
#[derive(Deserialize, Debug)]
struct PaymentEvent {
/// Logged by `process_payment`.
payment_id: u32,
/// Logged by `process_payment`.
order_id: u32,
/// `process_payment` returns an error when this is below 0.0.
amount: f64,
status: String,
}
async fn process_payment(payment: &PaymentEvent) -> anyhow::Result<()> {
// Simulate payment processing
println!("Processing payment {} for order {}", payment.payment_id, payment.order_id);
// Simulate some processing logic that might fail
if payment.amount < 0.0 {
return Err(anyhow::anyhow!("Invalid payment amount"));
}
// Process payment...
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(())
}
/// Manual-acknowledgment consumer example: reads `PaymentEvent`s from the
/// "payment_events" queue, runs `process_payment` on each, and then acks the
/// message on success or nacks it on failure.
///
/// The loop ends when no message arrives within 30 seconds or when the
/// receiver yields `None`. Failures to ack/nack are logged to stderr and do
/// not stop the loop; connection and subscription errors are propagated.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let uri = "amqp://guest:guest@localhost:5672";
    let queue_name = "payment_events";

    // Connect to RabbitMQ and create consumer
    let consumer = RabbitConsumer::connect(uri, queue_name).await?;

    // Start consuming messages with manual acknowledgment
    let mut message_receiver = consumer
        .load_ackable_messages::<PaymentEvent, _>(10, Some("payment-processor"))
        .await?;

    println!("Waiting for payment events...");

    // Process messages with manual acknowledgment
    while let Ok(Some(ackable_message)) =
        timeout(Duration::from_secs(30), message_receiver.recv()).await
    {
        let payment = ackable_message.message();
        println!("Received payment: {:?}", payment);

        match process_payment(&payment).await {
            Ok(()) => {
                // Success: acknowledge the message. The success line is only
                // printed when the ack itself went through, so the log never
                // claims an acknowledgment that failed.
                if let Err(e) = ackable_message.ack().await {
                    eprintln!("Failed to acknowledge message: {}", e);
                } else {
                    println!("Payment processed and acknowledged");
                }
            }
            Err(e) => {
                eprintln!("Failed to process payment: {}", e);
                // Negatively acknowledge the message.
                // NOTE(review): whether `nack()` requeues by default is not
                // visible here; confirm against the crate documentation.
                if let Err(e) = ackable_message.nack().await {
                    eprintln!("Failed to nack message: {}", e);
                }
            }
        }
    }

    Ok(())
}
use rabbitmq_streamer::{RabbitPublisher, RabbitConsumer};
use serde::{Serialize, Deserialize};
use tokio::time::{sleep, timeout, Duration};
/// Task payload used by the combined publisher/consumer example: `main` below
/// both publishes values of this type and requests them from
/// `load_ackable_messages::<TaskMessage, _>`, hence both serde derives.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct TaskMessage {
task_id: u32,
/// Also used to build the routing key (`"tasks.{task_type}"`) when publishing.
task_type: String,
payload: String,
/// Whole seconds since the UNIX epoch, filled in by `TaskMessage::new`.
created_at: u64,
}
impl TaskMessage {
    /// Builds a task stamped with the current wall-clock time.
    ///
    /// `task_type` and `payload` are copied into owned `String`s;
    /// `created_at` is set to the number of whole seconds since the UNIX epoch.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time earlier than the UNIX epoch.
    fn new(task_id: u32, task_type: &str, payload: &str) -> Self {
        use std::time::{SystemTime, UNIX_EPOCH};

        let created_at = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();

        Self {
            task_id,
            task_type: task_type.to_owned(),
            payload: payload.to_owned(),
            created_at,
        }
    }
}
/// Combined example: spawns a worker task that consumes and acknowledges
/// `TaskMessage`s from "task_queue", then publishes three tasks to the "tasks"
/// exchange and waits for either the worker to finish or 10 seconds to pass.
///
/// Connection, subscription and publish errors are propagated; ack failures
/// inside the worker are only logged to stderr.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let uri = "amqp://guest:guest@localhost:5672";
    let exchange_name = "tasks";
    let queue_name = "task_queue";

    // Publisher and consumer are connected separately.
    let publisher = RabbitPublisher::connect(uri, exchange_name).await?;
    let consumer = RabbitConsumer::connect(uri, queue_name).await?;

    let mut task_rx = consumer
        .load_ackable_messages::<TaskMessage, _>(20, Some("task-worker"))
        .await?;

    // Worker: runs until no message arrives for 60 seconds or the receiver
    // yields `None`.
    let worker = tokio::spawn(async move {
        println!("Task worker started...");

        loop {
            let ackable_message = match timeout(Duration::from_secs(60), task_rx.recv()).await {
                Ok(Some(received)) => received,
                _ => break,
            };

            let task = ackable_message.message();
            println!("Processing task {}: {}", task.task_id, task.task_type);

            // Simulated work.
            sleep(Duration::from_millis(500)).await;

            // Acknowledge and report the outcome.
            match ackable_message.ack().await {
                Err(e) => eprintln!("Failed to acknowledge task {}: {}", task.task_id, e),
                Ok(_) => println!("Task {} completed successfully", task.task_id),
            }
        }
    });

    // Give the worker a moment to start before publishing.
    sleep(Duration::from_secs(1)).await;

    let tasks = vec![
        TaskMessage::new(1, "email", "Send welcome email"),
        TaskMessage::new(2, "report", "Generate daily report"),
        TaskMessage::new(3, "cleanup", "Clean up temp files"),
    ];

    for task in tasks {
        // The routing key is derived from the task type, e.g. "tasks.email".
        let routing_key = format!("tasks.{}", task.task_type);
        publisher.publish(&task, routing_key).await?;
        println!("Published task: {}", task.task_id);

        // Short pause between publications.
        sleep(Duration::from_millis(100)).await;
    }

    // Finish on whichever comes first: the worker ending or a 10-second wait.
    tokio::select! {
        _ = worker => {
            println!("Consumer finished");
        }
        _ = sleep(Duration::from_secs(10)) => {
            println!("Example completed");
        }
    }

    Ok(())
}
[dependencies]
rabbitmq_streamer = "1.1.2"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
anyhow = "1.0"