tapped

Crates.iotapped
lib.rstapped
version0.3.0
created_at2026-01-16 10:56:10.421319+00
updated_at2026-01-26 01:34:23.728382+00
descriptionRust wrapper for the tap ATProto utility
homepage
repositoryhttps://tangled.org/octet-stream.net/tapped
max_upload_size
id2048433
size117,777
Dev (github:0xmozak:dev)

documentation

README

tapped

Crates.io Version docs.rs

A Rust wrapper library for the tap ATProto sync utility.

tapped provides an idiomatic async Rust interface for spawning and communicating with a tap subprocess, making it easy to build applications that sync data from the ATProto network.

Features

  • Spawn and manage tap subprocesses with graceful shutdown
  • Strongly-typed configuration for all tap envvars
  • Strongly-typed async Rust functions covering all of tap's HTTP API endpoints
  • WebSocket-based event channel with automatic acknowledgment

Installation

Add to your Cargo.toml:

[dependencies]
tapped = "0.3"

You'll also need the tap binary. Build it from the indigo repository:

cd cmd/tap && go build

tapped has been most recently tested against:

tap version v0.0.0-20260120225912-12d69fa4d209-rev-12d69fa

Quick Start

use tapped::{TapProcess, TapConfig, Event};

#[tokio::main]
async fn main() -> tapped::Result<()> {
    let config = TapConfig::builder()
        .database_url("sqlite://tap.db")
        .collection_filter("app.bsky.feed.post")
        .build();

    // Spawn tap - looks in current directory first, then PATH
    let process = TapProcess::spawn_default(config).await?;
    let client = process.client()?;

    // Subscribe to events
    let mut channel = client.channel().await?;
    
    while let Ok(received) = channel.recv().await {
        match &received.event {
            Event::Record(record) => {
                println!("[{:?}] {}/{}",
                    record.action,
                    record.collection,
                    record.rkey
                );
            }
            Event::Identity(identity) => {
                println!("Identity: {} -> {}", identity.did, identity.handle);
            }
        }
        // Event is auto-acknowledged when `received` is dropped
    }

    Ok(())
}

Usage Patterns

Connect to Existing Instance

If you have a tap instance already running:

use tapped::TapClient;

let client = TapClient::new("http://localhost:2480")?;
client.health().await?;

Spawn an Instance

use tapped::{TapProcess, TapConfig};

let config = TapConfig::builder()
    .database_url("sqlite://app.db")
    .full_network(false)
    .build();

// If you need a custom binary path, use spawn() instead:
// let process = TapProcess::spawn("/path/to/tap", config).await?;
let process = TapProcess::spawn_default(config).await?;
let client = process.client()?;

// Use the client
client.health().await?;
let count = client.repo_count().await?;
println!("Tracking {} repos", count);

Configuration Options

use tapped::{TapConfig, LogLevel};
use std::time::Duration;

let config = TapConfig::builder()
    // Database
    .database_url("sqlite://tap.db")
    .max_db_conns(10)
    
    // Network
    .bind("127.0.0.1:2480")
    .relay_url("wss://bsky.network".parse().unwrap())
    .plc_url("https://plc.directory".parse().unwrap())
    
    // Filtering
    .signal_collection("app.bsky.feed.post")
    .collection_filter("app.bsky.feed.post")
    .collection_filter("app.bsky.feed.like")
    .full_network(false)
    
    // Performance
    .firehose_parallelism(10)
    .resync_parallelism(5)
    .outbox_parallelism(10)
    .outbox_capacity(10000)
    
    // Timeouts
    .repo_fetch_timeout(Duration::from_secs(30))
    .startup_timeout(Duration::from_secs(60))
    .shutdown_timeout(Duration::from_secs(10))
    
    // Logging
    .log_level(LogLevel::Info)
    
    .build();

Working with Events

Events are automatically acknowledged when dropped:

use tapped::{Event, RecordAction};

let mut channel = client.channel().await?;

while let Ok(received) = channel.recv().await {
    match &received.event {
        Event::Record(record) => {
            match record.action {
                RecordAction::Create => {
                    // Access the raw JSON as a string
                    if let Some(json) = record.record_as_str() {
                        println!("Raw JSON: {}", json);
                    }

                    // Or deserialize to a specific type
                    // let post: MyPostType = record.deserialize_as()?;
                }
                RecordAction::Update => { /* ... */ }
                RecordAction::Delete => { /* ... */ }
                _ => {}
            }
        }
        Event::Identity(identity) => {
            println!("{} is now @{}", identity.did, identity.handle);
        }
    }
    // Ack sent automatically here when `received` goes out of scope
}

Managing Repositories

// Add repos to track
client.add_repos(&["did:plc:abc123", "did:plc:def456"]).await?;

// Remove repos
client.remove_repos(&["did:plc:abc123"]).await?;

// Get info about a specific repo
let info = client.repo_info("did:plc:def456").await?;
println!("State: {:?}, Records: {}", info.state, info.records);

// Resolve a DID to its document
let doc = client.resolve_did("did:plc:def456").await?;
println!("Handles: {:?}", doc.also_known_as);

Checking Stats

let repos = client.repo_count().await?;
let records = client.record_count().await?;
let outbox = client.outbox_buffer().await?;
let resync = client.resync_buffer().await?;
let cursors = client.cursors().await?;

println!("Tracking {} repos with {} records", repos, records);
println!("Outbox buffer: {}, Resync buffer: {}", outbox, resync);
println!("Firehose cursor: {:?}", cursors.firehose);

Example: Syncing standard.site Records with Schema Generation and Validation

The repository includes a complete example demonstrating how to sync and validate ATProto records using tapped together with the jacquard crates.

The jacquard ecosystem provides runtime validation of records against their lexicon constraints, and the ability to generate Rust structs from lexicon JSON files.

tapped/
├── tapped/                 # The main tapped library
├── lexicons-example/       # Generated types from lexicon schemas
│   ├── lexicons/           # Source lexicon JSON files
│   │   ├── site.standard.publication.json
│   │   ├── site.standard.document.json
│   │   └── ...
│   └── src/                # Generated Rust code
└── standard-site-sync/     # Example binary using both packages

These files were generated like so:

# Install the code generator
cargo install jacquard-lexgen

jacquard-codegen -i lexicons-example/lexicons -o lexicons-example/src

This produces strongly-typed structs with built-in validation. For example, the site.standard.publication lexicon becomes:

use lexicons_example::site_standard::publication::Publication;

// Deserialize from JSON
let publication: Publication = serde_json::from_str(json)?;

// Validate against lexicon constraints (max length, grapheme limits, etc.)
publication.validate()?;

// Access typed fields
println!("Name: {}", publication.name.as_str());
println!("URL: {}", publication.url.as_str());

For more detail see process_record_event in main.rs.

License

MIT

Commit count: 0

cargo fmt