atproto-jetstream

Crates.ioatproto-jetstream
lib.rsatproto-jetstream
version0.11.2
created_at2025-06-08 20:15:37.218766+00
updated_at2025-08-20 18:54:27.609704+00
descriptionAT Protocol Jetstream event consumer library with WebSocket streaming and compression support
homepagehttps://tangled.sh/@smokesignal.events/atproto-identity-rs
repositoryhttps://tangled.sh/@smokesignal.events/atproto-identity-rs
max_upload_size
id1705230
size110,348
Nick Gerakines (ngerakines)

documentation

https://docs.rs/atproto-jetstream

README

atproto-jetstream

WebSocket consumer for AT Protocol Jetstream events.

Overview

Real-time event streaming with Zstandard compression, automatic reconnection, and configurable event filtering for AT Protocol repository changes.

Features

  • WebSocket streaming: High-performance event consumption with automatic reconnection handling
  • Event filtering: Configurable filtering by collections and DIDs for targeted event processing
  • Zstandard compression: Built-in support for compressed event streams with custom dictionaries
  • Event handlers: Flexible handler system supporting multiple custom event processors
  • Background processing: Asynchronous event processing with graceful shutdown via cancellation tokens
  • Structured errors: Comprehensive error handling with detailed error codes

CLI Tools

The following command-line tool is available when built with the clap feature:

  • atproto-jetstream-consumer: Real-time event stream consumer with filtering, compression, and background processing support

Usage

Basic Event Consumer

use atproto_jetstream::{Consumer, ConsumerTaskConfig, EventHandler, JetstreamEvent, CancellationToken};
use async_trait::async_trait;

struct MyEventHandler;

#[async_trait]
impl EventHandler for MyEventHandler {
    async fn handle_event(&self, event: JetstreamEvent) -> anyhow::Result<()> {
        println!("Received event: {:?}", event);
        Ok(())
    }

    fn handler_id(&self) -> String {
        "my-handler".to_string()
    }
}

let config = ConsumerTaskConfig {
    user_agent: "my-app/1.0".to_string(),
    compression: false,
    zstd_dictionary_location: String::new(),
    jetstream_hostname: "jetstream1.us-east.bsky.network".to_string(),
    collections: vec!["app.bsky.feed.post".to_string()],
};

let consumer = Consumer::new(config);
consumer.register_handler(std::sync::Arc::new(MyEventHandler)).await?;

let cancellation_token = CancellationToken::new();
consumer.run_background(cancellation_token).await?;

With Compression

let config = ConsumerTaskConfig {
    compression: true,
    zstd_dictionary_location: "./data/zstd_dictionary".to_string(),
    // ... other config
};

// Download dictionary first:
// curl -o data/zstd_dictionary https://github.com/bluesky-social/jetstream/raw/refs/heads/main/pkg/models/zstd_dictionary

Command Line Examples

# Basic streaming
cargo run --bin atproto-jetstream-consumer \
    --hostname jetstream1.us-east.bsky.network \
    --collections app.bsky.feed.post \
    --user-agent "my-consumer/1.0"

# With compression
cargo run --bin atproto-jetstream-consumer \
    --hostname jetstream1.us-east.bsky.network \
    --collections app.bsky.feed.post \
    --compression \
    --zstd-dictionary ./data/zstd_dictionary

# With filtering
cargo run --bin atproto-jetstream-consumer \
    --hostname jetstream1.us-east.bsky.network \
    --collections "app.bsky.feed.post,app.bsky.actor.profile" \
    --dids "did:plc:user123,did:plc:user456"

License

MIT License

Commit count: 0

cargo fmt