| Crates.io | tapped |
| lib.rs | tapped |
| version | 0.3.0 |
| created_at | 2026-01-16 10:56:10.421319+00 |
| updated_at | 2026-01-26 01:34:23.728382+00 |
| description | Rust wrapper for the tap ATProto utility |
| repository | https://tangled.org/octet-stream.net/tapped |
| id | 2048433 |
| size | 117,777 |
A Rust wrapper library for the tap ATProto sync utility.
tapped provides an idiomatic async Rust interface for spawning and communicating with a tap subprocess, making it easy to build applications that sync data from the ATProto network.
tapped manages tap subprocesses with graceful shutdown.
Add to your Cargo.toml:
[dependencies]
tapped = "0.3"
You'll also need the tap binary. Build it from the indigo repository:
cd cmd/tap && go build
tapped has been most recently tested against:
tap version v0.0.0-20260120225912-12d69fa4d209-rev-12d69fa
use tapped::{TapProcess, TapConfig, Event};

#[tokio::main]
async fn main() -> tapped::Result<()> {
    let config = TapConfig::builder()
        .database_url("sqlite://tap.db")
        .collection_filter("app.bsky.feed.post")
        .build();

    // Spawn tap - looks in current directory first, then PATH
    let process = TapProcess::spawn_default(config).await?;
    let client = process.client()?;

    // Subscribe to events
    let mut channel = client.channel().await?;
    while let Ok(received) = channel.recv().await {
        match &received.event {
            Event::Record(record) => {
                println!("[{:?}] {}/{}",
                    record.action,
                    record.collection,
                    record.rkey
                );
            }
            Event::Identity(identity) => {
                println!("Identity: {} -> {}", identity.did, identity.handle);
            }
        }
        // Event is auto-acknowledged when `received` is dropped
    }

    Ok(())
}
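The quick-start loop prints each record as `collection/rkey`. If you want a full AT URI instead, the standard `at://<did>/<collection>/<rkey>` form can be assembled by hand. The sketch below is standalone and uses only the standard library; the helper names `at_uri` and `parse_at_uri` are illustrative and are not part of tapped, and it assumes you have the repository DID available as a string alongside the collection and record key.

```rust
/// Builds an AT URI of the form `at://<did>/<collection>/<rkey>`.
/// Illustrative helper; not part of the tapped API.
pub fn at_uri(did: &str, collection: &str, rkey: &str) -> String {
    format!("at://{did}/{collection}/{rkey}")
}

/// Splits a record-level AT URI back into (did, collection, rkey).
/// Returns `None` if the prefix is missing or any segment is absent or empty.
pub fn parse_at_uri(uri: &str) -> Option<(&str, &str, &str)> {
    let rest = uri.strip_prefix("at://")?;
    let mut parts = rest.splitn(3, '/');
    let did = parts.next()?;
    let collection = parts.next()?;
    let rkey = parts.next()?;
    // Reject empty segments and anything trailing after the record key.
    if did.is_empty() || collection.is_empty() || rkey.is_empty() || rkey.contains('/') {
        return None;
    }
    Some((did, collection, rkey))
}
```

This performs no validation of the DID, NSID, or record key syntax; it only handles the URI layout.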
If you have a tap instance already running:
use tapped::TapClient;
let client = TapClient::new("http://localhost:2480")?;
client.health().await?;
use tapped::{TapProcess, TapConfig};
let config = TapConfig::builder()
    .database_url("sqlite://app.db")
    .full_network(false)
    .build();
// If you need a custom binary path, use spawn() instead:
// let process = TapProcess::spawn("/path/to/tap", config).await?;
let process = TapProcess::spawn_default(config).await?;
let client = process.client()?;
// Use the client
client.health().await?;
let count = client.repo_count().await?;
println!("Tracking {} repos", count);
use tapped::{TapConfig, LogLevel};
use std::time::Duration;
let config = TapConfig::builder()
    // Database
    .database_url("sqlite://tap.db")
    .max_db_conns(10)
    // Network
    .bind("127.0.0.1:2480")
    .relay_url("wss://bsky.network".parse().unwrap())
    .plc_url("https://plc.directory".parse().unwrap())
    // Filtering
    .signal_collection("app.bsky.feed.post")
    .collection_filter("app.bsky.feed.post")
    .collection_filter("app.bsky.feed.like")
    .full_network(false)
    // Performance
    .firehose_parallelism(10)
    .resync_parallelism(5)
    .outbox_parallelism(10)
    .outbox_capacity(10000)
    // Timeouts
    .repo_fetch_timeout(Duration::from_secs(30))
    .startup_timeout(Duration::from_secs(60))
    .shutdown_timeout(Duration::from_secs(10))
    // Logging
    .log_level(LogLevel::Info)
    .build();
Events are automatically acknowledged when dropped:
use tapped::{Event, RecordAction};

let mut channel = client.channel().await?;
while let Ok(received) = channel.recv().await {
    match &received.event {
        Event::Record(record) => {
            match record.action {
                RecordAction::Create => {
                    // Access the raw JSON as a string
                    if let Some(json) = record.record_as_str() {
                        println!("Raw JSON: {}", json);
                    }
                    // Or deserialize to a specific type
                    // let post: MyPostType = record.deserialize_as()?;
                }
                RecordAction::Update => { /* ... */ }
                RecordAction::Delete => { /* ... */ }
                _ => {}
            }
        }
        Event::Identity(identity) => {
            println!("{} is now @{}", identity.did, identity.handle);
        }
    }
    // Ack sent automatically here when `received` goes out of scope
}
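To make the acknowledge-on-drop behaviour concrete, here is a minimal standalone sketch of the general pattern: a value whose `Drop` implementation reports its ID on a channel. It is not tapped's actual implementation; `ReceivedSketch`, `event_with_ack_channel`, and the use of `std::sync::mpsc` are all assumptions made for illustration.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for a received event; not tapped's real type.
pub struct ReceivedSketch {
    pub id: u64,
    pub payload: String,
    ack_tx: Sender<u64>,
}

impl Drop for ReceivedSketch {
    fn drop(&mut self) {
        // The ack is sent when the value goes out of scope.
        // Sending fails only if the receiver is gone; ignore that in drop.
        let _ = self.ack_tx.send(self.id);
    }
}

/// Creates an event together with the receiver on which its ack will arrive.
pub fn event_with_ack_channel(id: u64, payload: &str) -> (ReceivedSketch, Receiver<u64>) {
    let (ack_tx, ack_rx) = channel();
    let event = ReceivedSketch {
        id,
        payload: payload.to_string(),
        ack_tx,
    };
    (event, ack_rx)
}
```

One consequence of this pattern is that holding on to an event (for example by pushing it into a `Vec`) delays its acknowledgement until that container is dropped or the event is removed from it.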
// Add repos to track
client.add_repos(&["did:plc:abc123", "did:plc:def456"]).await?;
// Remove repos
client.remove_repos(&["did:plc:abc123"]).await?;
// Get info about a specific repo
let info = client.repo_info("did:plc:def456").await?;
println!("State: {:?}, Records: {}", info.state, info.records);
// Resolve a DID to its document
let doc = client.resolve_did("did:plc:def456").await?;
println!("Handles: {:?}", doc.also_known_as);
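In ATProto DID documents, handles appear in `alsoKnownAs` as `at://<handle>` URIs, so the values printed above usually need the scheme stripped before display. The helper below is a standalone sketch; `handles_from_also_known_as` is a hypothetical name, and it assumes the entries are available as a slice of string-like values, which may not match the exact type of `doc.also_known_as` in tapped.

```rust
/// Extracts handles from `alsoKnownAs` entries of the form `at://<handle>`.
/// Entries with other schemes (for example `https://`) are skipped.
/// Illustrative helper; not part of the tapped API.
pub fn handles_from_also_known_as<S: AsRef<str>>(entries: &[S]) -> Vec<String> {
    entries
        .iter()
        .filter_map(|entry| entry.as_ref().strip_prefix("at://"))
        // Keep only the authority part in case a path follows the handle.
        .map(|rest| rest.split('/').next().unwrap_or(rest))
        .filter(|handle| !handle.is_empty())
        .map(str::to_string)
        .collect()
}
```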
let repos = client.repo_count().await?;
let records = client.record_count().await?;
let outbox = client.outbox_buffer().await?;
let resync = client.resync_buffer().await?;
let cursors = client.cursors().await?;
println!("Tracking {} repos with {} records", repos, records);
println!("Outbox buffer: {}, Resync buffer: {}", outbox, resync);
println!("Firehose cursor: {:?}", cursors.firehose);
The repository includes a complete example demonstrating how to sync and validate ATProto records using tapped together with the jacquard crates.
The jacquard ecosystem provides runtime validation of records against their lexicon constraints, and the ability to generate Rust structs from lexicon JSON files.
tapped/
├── tapped/ # The main tapped library
├── lexicons-example/ # Generated types from lexicon schemas
│ ├── lexicons/ # Source lexicon JSON files
│ │ ├── site.standard.publication.json
│ │ ├── site.standard.document.json
│ │ └── ...
│ └── src/ # Generated Rust code
└── standard-site-sync/ # Example binary using both packages
The Rust code in lexicons-example/src was generated from the lexicon JSON files like so:
# Install the code generator
cargo install jacquard-lexgen
jacquard-codegen -i lexicons-example/lexicons -o lexicons-example/src
This produces strongly-typed structs with built-in validation. For example, the site.standard.publication lexicon becomes:
use lexicons_example::site_standard::publication::Publication;
// Deserialize from JSON
let publication: Publication = serde_json::from_str(json)?;
// Validate against lexicon constraints (max length, grapheme limits, etc.)
publication.validate()?;
// Access typed fields
println!("Name: {}", publication.name.as_str());
println!("URL: {}", publication.url.as_str());
For more detail see process_record_event in main.rs.
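As a rough illustration of what a lexicon string constraint check involves, the sketch below enforces a byte limit and a character limit on a single field. It is not how jacquard implements `validate()`; `StringLimits`, `LimitError`, and `check_string` are hypothetical names. Lexicon `maxLength` counts UTF-8 bytes and `maxGraphemes` counts grapheme clusters; this sketch counts Unicode scalar values as an approximation, since true grapheme segmentation needs a dedicated library.

```rust
/// Hypothetical limits for one string field; names are illustrative only.
pub struct StringLimits {
    pub max_bytes: usize,
    pub max_chars: usize,
}

#[derive(Debug, PartialEq)]
pub enum LimitError {
    TooManyBytes { actual: usize, max: usize },
    TooManyChars { actual: usize, max: usize },
}

/// Checks a string against both limits, reporting the byte limit first.
pub fn check_string(value: &str, limits: &StringLimits) -> Result<(), LimitError> {
    // `str::len` is the length in UTF-8 bytes.
    let bytes = value.len();
    if bytes > limits.max_bytes {
        return Err(LimitError::TooManyBytes { actual: bytes, max: limits.max_bytes });
    }
    // Approximation: counts Unicode scalar values, not grapheme clusters.
    let chars = value.chars().count();
    if chars > limits.max_chars {
        return Err(LimitError::TooManyChars { actual: chars, max: limits.max_chars });
    }
    Ok(())
}
```

With generated types you would call `validate()` as shown earlier; a hand-written check like this is only useful for fields that fall outside a generated struct.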
License: MIT