| Crates.io | amqp-bridge |
| lib.rs | amqp-bridge |
| version | 0.2.4 |
| created_at | 2025-12-18 09:55:27.079847+00 |
| updated_at | 2026-01-12 13:26:16.60423+00 |
| description | AMQP bridge |
| homepage | |
| repository | |
| max_upload_size | |
| id | 1992090 |
| size | 171,533 |
A service that bridges messages between two AMQP instances with automatic reconnection and health checks. Also available as an extendable library (crate) so other codebases can plug in a custom message transformer and run the bridge.
git clone https://github.com/bixority/amqp-bridge
cd amqp-bridge
cargo build --release
Alternatively, build a static Linux binary via Makefile (musl target):
make release
# output: target/amqp-bridge
Create a .env file:
# Required
SOURCE_DSN=amqp://user:password@source-host:5672/%2f
TARGET_DSN=amqp://user:password@target-host:5672/%2f
# Optional (with defaults)
SOURCE_QUEUE=old
TARGET_EXCHANGE=new_xchg
TARGET_ROUTING_KEY=update
HEALTH_PORT=8080
LOG_FORMAT=json # or 'pretty'
RUST_LOG=info # trace, debug, info, warn, error
cargo run --release
# or
./target/release/amqp-bridge
GET /healthz - Liveness probeGET /ready - Readiness probeGET /startup - Startup probeHealth states: Starting → Healthy / Unhealthy
LOG_FORMAT=json cargo run
Python-compatible structured logs for log aggregation systems.
LOG_FORMAT=pretty cargo run
Human-readable colored output.
Source Queue → Message Bridge → Target Exchange
↓
Consume → Publish → Confirm → Acknowledge
Guarantee: Messages are acknowledged only after successful publish confirmation. Failed messages are nack'd and requeued.
connection_refused - Service not running or firewall blockingaccess_refused - Authentication failurestimeout - No response from serverdns_resolution - Hostname resolution failed# Build
podman build -t amqp-bridge .
# Run
podman run -d \
-e SOURCE_DSN="amqp://user:pass@source:5672/%2f" \
-e TARGET_DSN="amqp://user:pass@target:5672/%2f" \
-p 8080:8080 \
amqp-bridge
podman compose --env-file .env up --build --remove-orphans
src/
├── main.rs # Entry point; delegates to recovery runner
├── bridge.rs # Message bridging logic
├── conf.rs # Configuration
├── health.rs # Health endpoints
├── logging.rs # Logging setup
└── transform.rs # Transformer trait and helper types
## Using as a Library (Extendable Crate)
You can depend on this crate and provide your own message transformation logic.
Implement a transformer and run the bridge:
```rust
use std::sync::Arc;
use anyhow::Result;
use amqp_bridge::{
Config,
HealthState,
MessageBridge,
MessageTransformer,
Message,
};
use async_trait::async_trait;
use tokio::sync::RwLock;
struct MyTransformer;
#[async_trait]
impl MessageTransformer for MyTransformer {
async fn transform(&self, input: Message) -> Result<Message> {
// Example: uppercase the body if it's UTF-8
let data = match String::from_utf8(input.data) {
Ok(s) => s.to_uppercase().into_bytes(),
Err(e) => e.into_bytes(),
};
Ok(Message { data, properties: input.properties })
}
}
#[tokio::main]
async fn main() -> Result<()> {
// Load config from env the same way the binary does
let config = Config::from_env()?;
let health_state = Arc::new(RwLock::new(HealthState::default()));
// Create bridge with optional transformer and run
let transformer: Arc<dyn MessageTransformer> = Arc::new(MyTransformer);
let bridge = MessageBridge::new(
config.clone(),
health_state.clone(),
Some(transformer),
).await?;
bridge.run().await?;
Ok(())
}
Alternatively, use the convenience runners with auto-recovery and Ctrl+C handling:
use std::sync::Arc;
use amqp_bridge::{
Config,
HealthState,
run_with_ctrl_c,
MessageTransformer,
};
use tokio::sync::RwLock;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let config = Config::from_env()?;
let health_state = Arc::new(RwLock::new(HealthState::default()));
let transformer: Arc<dyn MessageTransformer> = Arc::new(MyTransformer);
// Pass Some(transformer) to enable transformation
run_with_ctrl_c(config, health_state, Some(transformer)).await
}
Notes:
Add this crate to your project (if not using a local path), for example via Git:
[dependencies]
amqp-bridge = { git = "https://github.com/bixority/amqp-bridge" }
To run with pass-through behavior (no transform), just don't pass a transformer:
use amqp_bridge::{Config, HealthState, run_with_ctrl_c};
use tokio::sync::RwLock;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let config = Config::from_env()?;
let health_state = Arc::new(RwLock::new(HealthState::default()));
// None means: do not transform; forward as-is
run_with_ctrl_c(config, health_state, None).await
}
# Build
cargo build --release
# Test
cargo test
# Debug logging
RUST_LOG=debug cargo run
This makes it suitable for container/Kubernetes probes out of the box.
Check logs for error categories and hints:
Messages not appearing? Check:
event="message_received"