Crates.io | fe2o3-amqp |
lib.rs | fe2o3-amqp |
version | 0.13.1 |
source | src |
created_at | 2022-02-23 09:50:06.592381 |
updated_at | 2024-10-03 02:55:15.458595 |
description | An implementation of AMQP1.0 protocol based on serde and tokio |
homepage | https://github.com/minghuaw/fe2o3-amqp |
repository | https://github.com/minghuaw/fe2o3-amqp |
max_upload_size | |
id | 537721 |
size | 949,755 |
A rust implementation of AMQP 1.0 protocol based on serde and tokio.
default = []
Feature | Description |
---|---|
"rustls" |
enables TLS integration with tokio-rustls and rustls |
"native-tls" |
enables TLS integration with tokio-native-tls and native-tls |
"acceptor" |
enables ConnectionAcceptor , SessionAcceptor , and LinkAcceptor |
"transaction" |
enables Controller , Transaction , OwnedTransaction and control_link_acceptor |
"scram" |
enables SCRAM auth |
"tracing" |
enables logging with tracing |
"log" |
enables logging with log |
More examples including one showing how to use it with Azure Service Bus can be found on the GitHub repo.
Below is an example with a local broker
(TestAmqpBroker
)
listening on the localhost. The broker is executed with the following command
./TestAmqpBroker.exe amqp://localhost:5672 /creds:guest:guest /queues:q1
The following code requires the [tokio
] async runtime added to the dependencies.
use fe2o3_amqp::{Connection, Session, Sender, Receiver};
use fe2o3_amqp::types::messaging::Outcome;
#[tokio::main]
async fn main() {
let mut connection = Connection::open(
"connection-1", // container id
"amqp://guest:guest@localhost:5672" // url
).await.unwrap();
let mut session = Session::begin(&mut connection).await.unwrap();
// Create a sender
let mut sender = Sender::attach(
&mut session, // Session
"rust-sender-link-1", // link name
"q1" // target address
).await.unwrap();
// Create a receiver
let mut receiver = Receiver::attach(
&mut session,
"rust-receiver-link-1", // link name
"q1" // source address
).await.unwrap();
// Send a message to the broker and wait for outcome (Disposition)
let outcome: Outcome = sender.send("hello AMQP").await.unwrap();
outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome
// Send a message with batchable field set to true
let fut = sender.send_batchable("hello batchable AMQP").await.unwrap();
let outcome: Outcome = fut.await.unwrap(); // Wait for outcome (Disposition)
outcome.accepted_or_else(|state| state).unwrap(); // Handle delivery outcome
// Receive the message from the broker
let delivery = receiver.recv::<String>().await.unwrap();
receiver.accept(&delivery).await.unwrap();
sender.close().await.unwrap(); // Detach sender with closing Detach performatives
receiver.close().await.unwrap(); // Detach receiver with closing Detach performatives
session.end().await.unwrap(); // End the session
connection.close().await.unwrap(); // Close the connection
}
use tokio::net::TcpListener;
use fe2o3_amqp::acceptor::{ConnectionAcceptor, SessionAcceptor, LinkAcceptor, LinkEndpoint};
#[tokio::main]
async fn main() {
let tcp_listener = TcpListener::bind("localhost:5672").await.unwrap();
let connection_acceptor = ConnectionAcceptor::new("example-listener");
while let Ok((stream, addr)) = tcp_listener.accept().await {
let mut connection = connection_acceptor.accept(stream).await.unwrap();
let handle = tokio::spawn(async move {
let session_acceptor = SessionAcceptor::new();
while let Ok(mut session) = session_acceptor.accept(&mut connection).await{
let handle = tokio::spawn(async move {
let link_acceptor = LinkAcceptor::new();
match link_acceptor.accept(&mut session).await.unwrap() {
LinkEndpoint::Sender(sender) => { },
LinkEndpoint::Receiver(recver) => { },
}
});
}
});
}
}
fe2o3-amqp-ws
is needed for WebSocket binding
use fe2o3_amqp::{
types::{messaging::Outcome, primitives::Value},
Connection, Delivery, Receiver, Sender, Session,
};
use fe2o3_amqp_ws::WebSocketStream;
#[tokio::main]
async fn main() {
let (ws_stream, _response) = WebSocketStream::connect("ws://localhost:5673")
.await
.unwrap();
let mut connection = Connection::builder()
.container_id("connection-1")
.open_with_stream(ws_stream)
.await
.unwrap();
connection.close().await.unwrap();
}
More examples of sending and receiving can be found on the GitHub repo. Please note that most examples requires a local broker running. One broker that can be used on Windows is TestAmqpBroker.
Experimental support for wasm32-unknown-unknown
target is added since "0.8.11" and requires use of
fe2o3-amqp-ws
to establish WebSocket connection to the broker. An example of sending and
receiving message in a browser tab can be found
examples/wasm32-in-browser.
Name | Description |
---|---|
serde_amqp_derive |
Custom derive macro for described types as defined in AMQP1.0 protocol |
serde_amqp |
AMQP1.0 serializer and deserializer as well as primitive types |
fe2o3-amqp-types |
AMQP1.0 data types |
fe2o3-amqp |
Implementation of AMQP1.0 Connection , Session , and Link |
fe2o3-amqp-ext |
Extension types and implementations |
fe2o3-amqp-ws |
WebSocket binding for fe2o3-amqp transport |
fe2o3-amqp-management |
Experimental implementation of AMQP1.0 management |
fe2o3-amqp-cbs |
Experimental implementation of AMQP1.0 CBS |
1.75.0
License: MIT/Apache-2.0