| Crates.io | jetstream-extra |
| lib.rs | jetstream-extra |
| version | 0.2.1 |
| created_at | 2025-10-03 13:50:13.443923+00 |
| updated_at | 2025-11-04 17:00:22.737094+00 |
| description | Set of utilities and extensions for the JetStream NATS of the async-nats crate |
| homepage | https://github.com/synadia-io/orbit.rs |
| repository | https://github.com/synadia-io/orbit.rs |
| id | 1866674 |
| size | 188,058 |
A set of utilities and extensions for the NATS JetStream client in the async-nats crate.
Atomic batch publishing implementation for JetStream streams, ensuring that either all messages in a batch are stored or none are.
Connect to a NATS server with JetStream enabled, then extend the JetStream context with batch publishing capabilities.
    use async_nats::jetstream;
    // Extend the JetStream context with batch publishing.
    use jetstream_extra::batch_publish::BatchPublishExt;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let client = async_nats::connect("demo.nats.io").await?;
        let jetstream = jetstream::new(client);

        // Create or get a stream with atomic publishing enabled
        let _stream = jetstream
            .get_or_create_stream(jetstream::stream::Config {
                name: "events".to_string(),
                subjects: vec!["events.*".to_string()],
                allow_atomic_publish: true,
                ..Default::default()
            })
            .await?;

        // Build and use a batch publisher
        let mut batch = jetstream.batch_publish().build();

        // Add messages to the batch
        batch.add("events.order", "order-123".into()).await?;
        batch.add("events.payment", "payment-456".into()).await?;
        batch.add("events.inventory", "item-789".into()).await?;

        // Commit the batch atomically
        let ack = batch.commit("events.notification", "notify-complete".into()).await?;
        println!("Batch published with sequence: {}", ack.sequence);

        Ok(())
    }
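The all-or-nothing guarantee described above can be pictured with a small in-memory model. This is only a sketch using the standard library: `ToyStream`, `ToyMessage`, and the prefix check are hypothetical names invented for illustration and are not part of jetstream-extra, async-nats, or the NATS server, whose actual commit logic is not described here.

```rust
// Hypothetical in-memory model of all-or-nothing batch semantics.
// Nothing here comes from jetstream-extra or async-nats.

/// A staged or stored message: (subject, payload).
pub type ToyMessage = (String, String);

/// Toy stream that only accepts subjects starting with a fixed prefix.
pub struct ToyStream {
    prefix: String,
    log: Vec<ToyMessage>,
}

impl ToyStream {
    pub fn new(prefix: &str) -> Self {
        Self {
            prefix: prefix.to_string(),
            log: Vec::new(),
        }
    }

    /// Number of messages stored so far.
    pub fn len(&self) -> usize {
        self.log.len()
    }

    /// Validates every message in the batch before storing anything.
    /// If any subject is rejected, the log is left untouched.
    /// On success, returns the 1-based sequence of the last stored message.
    pub fn commit(&mut self, batch: Vec<ToyMessage>) -> Result<u64, String> {
        if batch.is_empty() {
            return Err("empty batch".to_string());
        }
        let prefix = self.prefix.as_str();
        if let Some((subject, _)) = batch.iter().find(|(s, _)| !s.starts_with(prefix)) {
            return Err(format!("subject {} is not accepted by the stream", subject));
        }
        // All messages passed validation, so append them in one step.
        self.log.extend(batch);
        Ok(self.log.len() as u64)
    }
}
```

In this model, committing a batch that contains one rejected subject returns an error and `len()` stays unchanged, which mirrors the "either all messages are stored or none are" behavior from the reader's point of view.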
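Efficient batch fetching of messages from JetStream streams using the DIRECT.GET API. The two examples that follow fetch a batch of messages from a stream and fetch the last message for each of several subjects.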
    use async_nats::jetstream;
    use jetstream_extra::batch_fetch::BatchFetchExt;
    use futures::StreamExt;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let client = async_nats::connect("demo.nats.io").await?;
        let context = jetstream::new(client);

        // Fetch 100 messages starting from sequence 1
        let mut messages = context
            .get_batch("my_stream", 100)
            .send()
            .await?;

        while let Some(msg) = messages.next().await {
            let msg = msg?;
            println!("Message at seq {}: {:?}", msg.sequence, msg.subject);
        }

        Ok(())
    }
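As a rough mental model of what a batch fetch returns, the sketch below selects up to a fixed number of messages from an in-memory log, starting at a given sequence. It uses only the standard library; `ToyStored`, `toy_log`, and `toy_get_batch` are hypothetical names for illustration, and the starting-sequence parameter is an assumption of this model rather than a documented option of the crate.

```rust
// Hypothetical in-memory model of a batch fetch.
// Nothing here comes from jetstream-extra or async-nats.

/// A stored message with a 1-based stream sequence.
#[derive(Debug, Clone, PartialEq)]
pub struct ToyStored {
    pub sequence: u64,
    pub subject: String,
    pub payload: String,
}

/// Builds a log where the n-th subject gets sequence n (starting at 1).
pub fn toy_log(subjects: &[&str]) -> Vec<ToyStored> {
    subjects
        .iter()
        .enumerate()
        .map(|(i, s)| ToyStored {
            sequence: i as u64 + 1,
            subject: s.to_string(),
            payload: format!("payload-{}", i + 1),
        })
        .collect()
}

/// Returns at most `batch` messages with sequence >= `start_sequence`,
/// in log order. Fewer are returned when the log runs out.
pub fn toy_get_batch(log: &[ToyStored], start_sequence: u64, batch: usize) -> Vec<ToyStored> {
    log.iter()
        .filter(|m| m.sequence >= start_sequence)
        .take(batch)
        .cloned()
        .collect()
}
```

For a five-message log, `toy_get_batch(&log, 2, 3)` yields the messages at sequences 2, 3, and 4, and asking for more messages than remain simply returns the tail of the log.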
To get the last message for each of several subjects:

    use async_nats::jetstream;
    use jetstream_extra::batch_fetch::BatchFetchExt;
    use futures::StreamExt;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let client = async_nats::connect("demo.nats.io").await?;
        let context = jetstream::new(client);

        // Get the last message for each sensor
        let subjects = vec![
            "sensors.temp".to_string(),
            "sensors.humidity".to_string(),
            "sensors.pressure".to_string(),
        ];

        let mut messages = context
            .get_last_messages_for("sensor_stream")
            .subjects(subjects)
            .send()
            .await?;

        while let Some(msg) = messages.next().await {
            let msg = msg?;
            println!("Last value for {}: {:?}", msg.subject, msg.payload);
        }

        Ok(())
    }
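The "last message per subject" idea can be sketched over an in-memory log as follows. This is a standard-library model only: `ToyEntry` and `toy_last_messages_for` are hypothetical names, the log is assumed to be ordered by ascending sequence, and sorting the result by sequence is a choice made for this sketch, not a statement about the order in which the crate delivers messages.

```rust
use std::collections::HashMap;

// Hypothetical in-memory model of "last message per subject".
// Nothing here comes from jetstream-extra or async-nats.

/// A log entry: (sequence, subject, payload).
pub type ToyEntry = (u64, String, String);

/// For each requested subject that appears in the log, returns its most
/// recent entry. Assumes `log` is ordered by ascending sequence.
/// Subjects with no messages are simply absent from the result.
pub fn toy_last_messages_for(log: &[ToyEntry], subjects: &[&str]) -> Vec<ToyEntry> {
    let mut last: HashMap<&str, &ToyEntry> = HashMap::new();
    for entry in log {
        let subject = entry.1.as_str();
        if subjects.contains(&subject) {
            // Later entries overwrite earlier ones for the same subject.
            last.insert(subject, entry);
        }
    }
    let mut result: Vec<ToyEntry> = last.into_values().cloned().collect();
    // Sort by sequence so the output order is deterministic.
    result.sort_by_key(|e| e.0);
    result
}
```

With a log holding two `sensors.temp` readings and one `sensors.humidity` reading, a request for temp, humidity, and pressure returns two entries: the humidity reading and the later of the two temp readings.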