intercom-rs

Crates.iointercom-rs
lib.rsintercom-rs
version1.1.1
created_at2026-01-16 13:05:39.825417+00
updated_at2026-01-16 18:08:42.571909+00
descriptionA fully typed async wrapper for NATS with JetStream support
homepage
repositoryhttps://github.com/veronoicc/intercom
max_upload_size
id2048570
size165,860
veronoicc (veronoicc)

documentation

https://docs.rs/intercom-rs

README

Intercom

A fully typed async wrapper for NATS with JetStream support.

Features

  • Fully typed publish/subscribe with turbofish syntax support
  • Pluggable codec support (MessagePack default, JSON optional)
  • JetStream support with builder pattern for all options
  • Push and pull consumers with typed messages
  • Interest-based consumers for complex routing scenarios
  • Work queues with automatic acknowledgment tracking
  • Stream trait for subscribers and consumers
  • Sink trait for publishers

Installation

Add to your Cargo.toml:

[dependencies]
intercom = "0.1"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
futures = "0.3"

Codec Features

By default, MessagePack is used for serialization. You can enable JSON support:

# MessagePack only (default)
intercom = "0.1"

# JSON only
intercom = { version = "0.1", default-features = false, features = ["json"] }

# Both codecs
intercom = { version = "0.1", features = ["json"] }

Quick Start

Basic Publish/Subscribe

use intercom::{Client, MsgPackCodec, Result};
use serde::{Deserialize, Serialize};
use futures::StreamExt;

#[derive(Debug, Serialize, Deserialize)]
struct MyMessage {
    content: String,
}

#[tokio::main]
async fn main() -> Result<()> {
    // Specify the codec type when creating the client
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    
    // Type-safe publish with turbofish syntax
    client.publish::<MyMessage>("subject", &MyMessage { 
        content: "hello".into() 
    }).await?;
    
    // Create a typed subscriber that implements Stream
    let mut subscriber = client.subscribe::<MyMessage>("subject").await?;
    
    while let Some(result) = subscriber.next().await {
        match result {
            Ok(msg) => println!("Received: {:?}", msg.payload),
            Err(e) => eprintln!("Error: {}", e),
        }
    }
    
    Ok(())
}

Using JSON Codec

use intercom::{Client, JsonCodec, Result};  // Requires `json` feature
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct MyMessage { content: String }

async fn example() -> Result<()> {
    // Use JSON for human-readable serialization
    let client = Client::<JsonCodec>::connect("nats://localhost:4222").await?;
    
    client.publish::<MyMessage>("subject", &MyMessage { 
        content: "hello".into() 
    }).await?;
    
    Ok(())
}

Request/Reply

use intercom::{Client, MsgPackCodec};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct Request { query: String }

#[derive(Serialize, Deserialize)]
struct Response { result: String }

async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    
    // Type-safe request/reply
    let response = client.request::<Request, Response>(
        "service",
        &Request { query: "hello".into() }
    ).await?;
    
    println!("Response: {}", response.result);
    Ok(())
}

Publisher with Sink Trait

use intercom::{Client, MsgPackCodec};
use serde::{Deserialize, Serialize};
use futures::SinkExt;

#[derive(Serialize, Deserialize)]
struct MyMessage { content: String }

async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    
    // Create a publisher that implements Sink
    let mut publisher = client.publisher::<MyMessage>("subject");
    
    // Use Sink trait methods
    publisher.send(MyMessage { content: "hello".into() }).await?;
    
    // Or batch with feed + flush
    publisher.feed(MyMessage { content: "msg1".into() }).await?;
    publisher.feed(MyMessage { content: "msg2".into() }).await?;
    publisher.flush().await?;
    
    Ok(())
}

Queue Groups

use intercom::{Client, MsgPackCodec};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct Task { id: u64 }

async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    
    // Subscribe to a queue group - messages are load-balanced
    let subscriber = client.queue_subscribe::<Task>("tasks", "workers").await?;
    
    Ok(())
}

JetStream

Creating Streams with Builder Pattern

use intercom::{Client, MsgPackCodec, RetentionPolicy};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct Event { id: u64, data: String }

async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    let jetstream = client.jetstream();
    
    // Create a stream with builder pattern
    let stream = jetstream
        .stream_builder("events")
        .subjects(vec!["events.>".to_string()])
        .max_messages(1_000_000)
        .max_bytes(1024 * 1024 * 100)  // 100MB
        .max_age(std::time::Duration::from_secs(86400))  // 1 day
        .replicas(3)
        .create()
        .await?;
    
    Ok(())
}

Publishing to JetStream

use intercom::{Client, MsgPackCodec};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct Event { id: u64, data: String }

async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    let jetstream = client.jetstream();
    
    // Publish with acknowledgment
    let ack = jetstream.publish::<Event>("events.user", &Event {
        id: 1,
        data: "user created".to_string(),
    }).await?;
    
    println!("Published to stream: {}, seq: {}", ack.stream, ack.sequence);
    
    // Async publish for batching
    let ack_future = jetstream.publish_async::<Event>("events.user", &Event {
        id: 2,
        data: "user updated".to_string(),
    }).await?;
    
    // Do other work, then await the ack
    let ack = ack_future.await?;
    
    Ok(())
}

Pull Consumers

use intercom::{Client, MsgPackCodec};
use serde::{Deserialize, Serialize};
use futures::StreamExt;

#[derive(Serialize, Deserialize, Debug)]
struct Event { id: u64 }

async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    let jetstream = client.jetstream();
    let stream = jetstream.get_stream("events").await?;
    
    // Create a pull consumer with turbofish syntax
    let consumer = stream
        .pull_consumer_builder::<Event>("my-consumer")
        .durable()
        .filter_subject("events.user.>")
        .ack_wait(std::time::Duration::from_secs(30))
        .max_deliver(3)
        .create()
        .await?;
    
    // Fetch a batch of messages
    let mut messages = consumer.fetch(10).await?;
    while let Some(result) = messages.next().await {
        let msg = result?;
        println!("Got: {:?}", msg.payload);
        msg.ack().await?;
    }
    
    // Or get a continuous stream
    let mut messages = consumer.messages().await?;
    while let Some(result) = messages.next().await {
        let msg = result?;
        // Process message
        msg.ack().await?;
    }
    
    Ok(())
}

Push Consumers

use intercom::{Client, MsgPackCodec};
use serde::{Deserialize, Serialize};
use futures::StreamExt;

#[derive(Serialize, Deserialize, Debug)]
struct Event { id: u64 }

async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    let jetstream = client.jetstream();
    let stream = jetstream.get_stream("events").await?;
    
    // Create a push consumer
    let consumer = stream
        .push_consumer_builder::<Event>("my-push-consumer")
        .deliver_subject("deliver.events")
        .deliver_group("workers")  // Queue group for load balancing
        .durable()
        .create()
        .await?;
    
    // Get messages
    let mut messages = consumer.messages().await?;
    while let Some(result) = messages.next().await {
        let msg = result?;
        println!("Got: {:?}", msg.payload);
        msg.ack().await?;
    }
    
    Ok(())
}

Work Queues

use intercom::{Client, MsgPackCodec, WorkQueue};
use serde::{Deserialize, Serialize};
use futures::StreamExt;

#[derive(Serialize, Deserialize, Debug)]
struct Job { id: u64, payload: String }

async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    let jetstream = client.jetstream();
    
    // Create a work queue
    let queue = WorkQueue::<Job, MsgPackCodec>::builder(&jetstream, "jobs")
        .max_messages(10_000)
        .create()
        .await?;
    
    // Push jobs
    queue.push(&Job { id: 1, payload: "do work".into() }).await?;
    
    // Option 1: Use as a Stream (pulls one job at a time)
    let mut queue = queue.into_stream().await?;
    while let Some(result) = queue.next().await {
        let job = result?;
        println!("Processing: {:?}", job.payload);
        job.ack().await?;  // Message removed from queue after ack
    }
    
    // Option 2: Pull a batch of jobs
    // let mut jobs = queue.pull(10).await?;
    // while let Some(result) = jobs.next().await {
    //     let job = result?;
    //     job.ack().await?;
    // }
    
    Ok(())
}

Interest-Based Consumers

use intercom::{Client, MsgPackCodec, InterestQueue};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct Event { id: u64, data: String }

async fn example() -> intercom::Result<()> {
    let client = Client::<MsgPackCodec>::connect("nats://localhost:4222").await?;
    let jetstream = client.jetstream();
    
    // Create an interest-based queue
    let queue = InterestQueue::<Event, MsgPackCodec>::builder(&jetstream, "events")
        .subject("events.>")
        .create()
        .await?;
    
    // Add multiple consumers - message is removed only when ALL acknowledge
    let consumer1 = queue.add_consumer("service-a").await?;
    let consumer2 = queue.add_consumer("service-b").await?;
    
    // Publish an event
    queue.publish(&Event { id: 1, data: "test".into() }).await?;
    
    Ok(())
}

Message Acknowledgment

JetStream messages support various acknowledgment modes:

// Positive acknowledgment
msg.ack().await?;

// Double acknowledgment (waits for server confirmation)
msg.double_ack().await?;

// Negative acknowledgment (redelivery)
msg.nak().await?;

// Negative acknowledgment with delay
msg.nak_with_delay(std::time::Duration::from_secs(10)).await?;

// Mark as in progress (extend ack deadline)
msg.in_progress().await?;

// Terminate (message won't be redelivered)
msg.term().await?;

Serialization

Intercom supports pluggable codecs:

  • MsgPackCodec (default): Efficient binary serialization via rmp-serde
  • JsonCodec (optional): Human-readable JSON via serde_json

All message types must implement Serialize and/or Deserialize:

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct MyMessage {
    id: u64,
    data: Vec<u8>,
    tags: Vec<String>,
}

License

MIT

Commit count: 0

cargo fmt