| Crates.io | intercom-rs |
| lib.rs | intercom-rs |
| version | 1.1.1 |
| created_at | 2026-01-16 13:05:39.825417+00 |
| updated_at | 2026-01-16 18:08:42.571909+00 |
| description | A fully typed async wrapper for NATS with JetStream support |
| homepage | |
| repository | https://github.com/veronoicc/intercom |
| max_upload_size | |
| id | 2048570 |
| size | 165,860 |
A fully typed async wrapper for NATS with JetStream support.
Add to your Cargo.toml:
[dependencies]
intercom = "1.1"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
futures = "0.3"
By default, MessagePack is used for serialization. You can enable JSON support:
# MessagePack only (default)
intercom = "1.1"
# JSON only
intercom = { version = "1.1", default-features = false, features = ["json"] }
# Both codecs
intercom = { version = "1.1", features = ["json"] }
use intercom::{Client, MsgPackCodec, Result};
use serde::{Deserialize, Serialize};
use futures::StreamExt;

/// Payload type carried over the wire; any `Serialize + Deserialize` type works.
#[derive(Debug, Serialize, Deserialize)]
struct MyMessage {
    content: String,
}

#[tokio::main]
async fn main() -> Result<()> {
    // The codec is part of the client's type — MessagePack here.
    let conn = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;

    // Publish a strongly-typed message (payload type given via turbofish).
    let greeting = MyMessage { content: "hello".into() };
    conn.publish::<MyMessage>("subject", &greeting).await?;

    // Subscribers implement `Stream`, yielding decode results.
    let mut incoming = conn.subscribe::<MyMessage>("subject").await?;
    while let Some(item) = incoming.next().await {
        match item {
            Ok(msg) => println!("Received: {:?}", msg.payload),
            Err(e) => eprintln!("Error: {}", e),
        }
    }
    Ok(())
}
use intercom::{Client, JsonCodec, Result}; // Requires `json` feature
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct MyMessage { content: String }

/// Publishes one JSON-encoded message; needs the `json` cargo feature.
async fn example() -> Result<()> {
    // Swapping the codec type parameter is the only change versus MessagePack.
    let conn = Client::<JsonCodec>::connect("nats://localhost:4222").await?;
    let msg = MyMessage { content: "hello".into() };
    conn.publish::<MyMessage>("subject", &msg).await?;
    Ok(())
}
use intercom::{Client, MsgPackCodec};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct Request { query: String }
#[derive(Serialize, Deserialize)]
struct Response { result: String }

/// Typed request/reply: request and reply payloads are checked at compile time.
async fn example() -> intercom::Result<()> {
    let conn = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    // Both payload types are supplied via turbofish.
    let req = Request { query: "hello".into() };
    let reply = conn.request::<Request, Response>("service", &req).await?;
    println!("Response: {}", reply.result);
    Ok(())
}
use intercom::{Client, MsgPackCodec};
use serde::{Deserialize, Serialize};
use futures::SinkExt;

#[derive(Serialize, Deserialize)]
struct MyMessage { content: String }

/// Demonstrates the `Sink`-based publisher: an immediate send, then a
/// buffered batch pushed out with a single flush.
async fn example() -> intercom::Result<()> {
    let conn = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    // `publisher` returns a typed handle implementing `futures::Sink`.
    let mut sink = conn.publisher::<MyMessage>("subject");

    // `send` is feed + flush in one call.
    sink.send(MyMessage { content: "hello".into() }).await?;

    // `feed` only buffers; `flush` pushes the whole batch out at once.
    sink.feed(MyMessage { content: "msg1".into() }).await?;
    sink.feed(MyMessage { content: "msg2".into() }).await?;
    sink.flush().await?;
    Ok(())
}
use intercom::{Client, MsgPackCodec};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct Task { id: u64 }

/// Joins the "workers" queue group so tasks are load-balanced across members.
async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    // Subscribe to a queue group - messages are load-balanced.
    // Leading underscore: the subscriber is not consumed in this snippet,
    // which would otherwise produce an `unused_variables` warning.
    let _subscriber = client.queue_subscribe::<Task>("tasks", "workers").await?;
    Ok(())
}
use intercom::{Client, MsgPackCodec, RetentionPolicy};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct Event { id: u64, data: String }

/// Creates a JetStream stream with retention limits via the builder API.
async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    let jetstream = client.jetstream();
    // Create a stream with the builder pattern. The underscore on `_stream`
    // silences the unused-variable warning, since the handle is not used
    // further in this snippet.
    let _stream = jetstream
        .stream_builder("events")
        .subjects(vec!["events.>".to_string()])
        .max_messages(1_000_000)
        .max_bytes(1024 * 1024 * 100) // 100MB
        .max_age(std::time::Duration::from_secs(86400)) // 1 day
        .replicas(3)
        .create()
        .await?;
    Ok(())
}
use intercom::{Client, MsgPackCodec};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct Event { id: u64, data: String }

/// Publishes to JetStream: first synchronously (ack awaited inline), then
/// asynchronously (ack awaited later, allowing batching in between).
async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    let jetstream = client.jetstream();

    // Publish with acknowledgment.
    let ack = jetstream.publish::<Event>("events.user", &Event {
        id: 1,
        data: "user created".to_string(),
    }).await?;
    println!("Published to stream: {}, seq: {}", ack.stream, ack.sequence);

    // Async publish for batching.
    let ack_future = jetstream.publish_async::<Event>("events.user", &Event {
        id: 2,
        data: "user updated".to_string(),
    }).await?;
    // Do other work, then await the ack. Inspecting the returned ack avoids
    // an unused-variable warning and shows the async publish also landed.
    let ack = ack_future.await?;
    println!("Published to stream: {}, seq: {}", ack.stream, ack.sequence);
    Ok(())
}
use intercom::{Client, MsgPackCodec};
use serde::{Deserialize, Serialize};
use futures::StreamExt;

#[derive(Serialize, Deserialize, Debug)]
struct Event { id: u64 }

/// Builds a durable pull consumer, then consumes one fetched batch followed
/// by a continuous message stream.
async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    let jetstream = client.jetstream();
    let stream = jetstream.get_stream("events").await?;

    // Pull consumers are built with turbofish syntax for the payload type.
    let consumer = stream
        .pull_consumer_builder::<Event>("my-consumer")
        .durable()
        .filter_subject("events.user.>")
        .ack_wait(std::time::Duration::from_secs(30))
        .max_deliver(3)
        .create()
        .await?;

    // One-shot: fetch a bounded batch of messages.
    let mut batch = consumer.fetch(10).await?;
    while let Some(next) = batch.next().await {
        let msg = next?;
        println!("Got: {:?}", msg.payload);
        msg.ack().await?;
    }

    // Continuous: an ongoing stream of messages.
    let mut live = consumer.messages().await?;
    while let Some(next) = live.next().await {
        let msg = next?;
        // Process message
        msg.ack().await?;
    }
    Ok(())
}
use intercom::{Client, MsgPackCodec};
use serde::{Deserialize, Serialize};
use futures::StreamExt;

#[derive(Serialize, Deserialize, Debug)]
struct Event { id: u64 }

/// Creates a durable push consumer that delivers into a queue group.
async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    let jetstream = client.jetstream();
    let stream = jetstream.get_stream("events").await?;

    // Push consumers deliver to a subject; the group load-balances receivers.
    let consumer = stream
        .push_consumer_builder::<Event>("my-push-consumer")
        .deliver_subject("deliver.events")
        .deliver_group("workers") // Queue group for load balancing
        .durable()
        .create()
        .await?;

    // Drain delivered messages, acknowledging each one.
    let mut delivery = consumer.messages().await?;
    while let Some(next) = delivery.next().await {
        let msg = next?;
        println!("Got: {:?}", msg.payload);
        msg.ack().await?;
    }
    Ok(())
}
use intercom::{Client, MsgPackCodec, WorkQueue};
use serde::{Deserialize, Serialize};
use futures::StreamExt;
#[derive(Serialize, Deserialize, Debug)]
struct Job { id: u64, payload: String }
/// Work-queue example: each job is delivered to one consumer and removed
/// from the queue once acknowledged.
async fn example() -> intercom::Result<()> {
let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
let jetstream = client.jetstream();
// Create a work queue (backed by JetStream)
let queue = WorkQueue::<Job, MsgPackCodec>::builder(&jetstream, "jobs")
.max_messages(10_000)
.create()
.await?;
// Push jobs onto the queue
queue.push(&Job { id: 1, payload: "do work".into() }).await?;
// Option 1: Use as a Stream (pulls one job at a time).
// NOTE: `into_stream` consumes the queue handle; the shadowed `queue` below
// is the stream, not the original `WorkQueue`.
let mut queue = queue.into_stream().await?;
while let Some(result) = queue.next().await {
let job = result?;
println!("Processing: {:?}", job.payload);
job.ack().await?; // Message removed from queue after ack
}
// Option 2: Pull a batch of jobs instead. This uses the original
// `WorkQueue` handle, so skip the `into_stream` call above:
// let mut jobs = queue.pull(10).await?;
// while let Some(result) = jobs.next().await {
// let job = result?;
// job.ack().await?;
// }
Ok(())
}
use intercom::{Client, MsgPackCodec, InterestQueue};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct Event { id: u64, data: String }

/// Interest-based queue: a message is retained until every registered
/// consumer has acknowledged it.
async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    let jetstream = client.jetstream();
    // Create an interest-based queue.
    let queue = InterestQueue::<Event, MsgPackCodec>::builder(&jetstream, "events")
        .subject("events.>")
        .create()
        .await?;
    // Add multiple consumers - message is removed only when ALL acknowledge.
    // Underscore-prefixed: the handles are not read in this snippet, which
    // avoids `unused_variables` warnings.
    let _consumer1 = queue.add_consumer("service-a").await?;
    let _consumer2 = queue.add_consumer("service-b").await?;
    // Publish an event.
    queue.publish(&Event { id: 1, data: "test".into() }).await?;
    Ok(())
}
JetStream messages support various acknowledgment modes:
// Positive acknowledgment: message was handled successfully
msg.ack().await?;
// Double acknowledgment (waits for the server to confirm the ack)
msg.double_ack().await?;
// Negative acknowledgment (asks the server to redeliver)
msg.nak().await?;
// Negative acknowledgment with a delay before redelivery
msg.nak_with_delay(std::time::Duration::from_secs(10)).await?;
// Mark as in progress (extends the ack deadline)
msg.in_progress().await?;
// Terminate (message won't be redelivered)
msg.term().await?;
Intercom supports pluggable codecs:
All message types must implement Serialize and/or Deserialize:
use serde::{Deserialize, Serialize};
// Any type deriving serde's Serialize/Deserialize can be sent or received.
#[derive(Serialize, Deserialize)]
struct MyMessage {
id: u64,
data: Vec<u8>,
tags: Vec<String>,
}
MIT