| Crates.io | repo-stream |
| lib.rs | repo-stream |
| version | 0.4.0 |
| created_at | 2025-10-10 19:09:06.628294+00 |
| updated_at | 2026-01-15 20:06:10.159769+00 |
| description | Fast and robust atproto CAR file processing |
| homepage | |
| repository | https://tangled.org/@microcosm.blue/repo-stream |
| max_upload_size | |
| id | 1877424 |
| size | 5,661,675 |
A robust CAR file -> MST walker for atproto
use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder, Output};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // repo-stream takes any AsyncRead as input, like a tokio::fs::File
    let reader = tokio::fs::File::open("repo.car").await?;
    let reader = tokio::io::BufReader::new(reader);

    // example repo workload is simply counting the total record bytes
    let mut total_size = 0;

    match DriverBuilder::new()
        .with_mem_limit_mb(10)
        // block processing: just extract the raw record size
        .with_block_processor(|rec| rec.len().to_ne_bytes().to_vec())
        .load_car(reader)
        .await?
    {
        // if all blocks fit within memory
        Driver::Memory(_commit, mut driver) => {
            while let Some(chunk) = driver.next_chunk(256).await? {
                for Output { rkey: _, cid: _, data } in chunk {
                    let size = usize::from_ne_bytes(data.try_into().unwrap());
                    total_size += size;
                }
            }
        }
        // if the CAR was too big for in-memory processing
        Driver::Disk(paused) => {
            // set up a disk store we can spill to
            let store = DiskBuilder::new().open("some/path.db".into()).await?;
            // do the spilling, get back a (similar) driver
            let (_commit, mut driver) = paused.finish_loading(store).await?;
            while let Some(chunk) = driver.next_chunk(256).await? {
                for Output { rkey: _, cid: _, data } in chunk {
                    let size = usize::from_ne_bytes(data.try_into().unwrap());
                    total_size += size;
                }
            }
        }
    };

    println!("sum of size of all records: {total_size}");
    Ok(())
}
more recent todo
- #sync handling
some ideas
- Driver and move the thread stuff from the disk one to generic helper functions (might create async footguns though)
- hashbrown also optional vs builtin hashmap?
older stuff (to clean up):
current car processing times (records processed into their length usize, phil's dev machine):
| 1.3s | 350ms | 6.8ms | 160us | 5.1us | 690ns |

it's a little faster with mimalloc:
use mimalloc::MiMalloc;
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
| 1.2s (-8%) | 300ms (-14%) | 6.0ms (-11%) | 150us (-7%) | 4.7us (-8%) | 670ns (-4%) |

processing CARs requires buffering blocks, so it can consume a lot of memory. repo-stream's in-memory driver has minimal memory overhead, but there are two ways to make it work with less mem (you can do either or both!):
- this is a little slower but can greatly reduce the memory used. there's nothing special you need to do for this.
- if you don't need to store the complete records, you can have repo-stream try to optimistically apply a processing function to the raw blocks as they are streamed in (see the sketch below).
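for illustration, here's a minimal sketch of such an inline processor, reusing the DriverBuilder API from the example above; the "keep at most a 64-byte prefix" workload is just a made-up stand-in for whatever summary your application actually needs:

use repo_stream::{Driver, DriverBuilder, Output};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let reader = tokio::io::BufReader::new(tokio::fs::File::open("repo.car").await?);

    // whatever the block processor returns is what gets buffered (in memory
    // or spilled to disk), so returning a small summary instead of the whole
    // record is what saves memory. here: keep at most the first 64 raw bytes.
    let driver = DriverBuilder::new()
        .with_mem_limit_mb(10)
        .with_block_processor(|rec| rec.iter().take(64).copied().collect::<Vec<u8>>())
        .load_car(reader)
        .await?;

    // only handling the in-memory case here; the Driver::Disk arm works the
    // same way as in the full example above.
    if let Driver::Memory(_commit, mut driver) = driver {
        let mut records = 0usize;
        while let Some(chunk) = driver.next_chunk(256).await? {
            for Output { rkey: _, cid: _, data } in chunk {
                // `data` is the processed form: at most 64 bytes per record
                debug_assert!(data.len() <= 64);
                records += 1;
            }
        }
        println!("walked {records} records");
    }
    Ok(())
}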
sketchy benchmark but hey. mimalloc is enabled, and the processing spills to disk. inline processing reduces entire records to 8 bytes (usize of the raw record block size):
| 5.0s (4.5x slowdown for disk) | 1.27s (4.1x slowdown) |

fortunately, most CARs in the ATmosphere are very small, so for e.g. backfill purposes, the vast majority of inputs will not face this slowdown.
to avoid committing the huge benchmark CAR to the repo, you have to pass it in through the env for now:
HUGE_CAR=~/Downloads/did_plc_redacted.car cargo bench -- huge-car
todo
newer ideas
fixing the interleaved MST walk / block load actually does perform ok: the walker just needs to tell the block loader which block it actually needs next, so that the block loader can go ahead and load all blocks until that one without checking back with the walker. so i think we're streaming-order ready!
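to illustrate the idea (this is not repo-stream's internals, just a toy sketch with hypothetical types): the walker exposes the single cid it's blocked on, and the loader reads ahead through the stream, parking blocks it doesn't need yet, until that cid shows up:

use std::collections::HashMap;

// hypothetical stand-ins for the sketch
type Cid = String;
type Block = Vec<u8>;

struct Walker {
    // cids the walk still needs, in walk order
    need: Vec<Cid>,
}

impl Walker {
    // the single cid the walk is currently blocked on, if any
    fn next_needed(&self) -> Option<&Cid> {
        self.need.first()
    }
    // hand the walker the block it asked for
    fn supply(&mut self, _cid: &Cid, _data: &Block) {
        self.need.remove(0);
    }
}

// drive the walk against blocks arriving in stream order: read ahead and
// park blocks we don't need yet, only handing control back to the walker
// when the cid it asked for actually shows up.
fn drive(mut walker: Walker, stream: impl IntoIterator<Item = (Cid, Block)>) {
    let mut parked: HashMap<Cid, Block> = HashMap::new();
    let mut stream = stream.into_iter();

    while let Some(needed) = walker.next_needed().cloned() {
        // maybe it already arrived while we were looking for an earlier block
        if let Some(data) = parked.remove(&needed) {
            walker.supply(&needed, &data);
            continue;
        }
        // otherwise keep consuming the stream until the needed block appears
        loop {
            match stream.next() {
                Some((cid, data)) if cid == needed => {
                    walker.supply(&needed, &data);
                    break;
                }
                Some((cid, data)) => {
                    parked.insert(cid, data);
                }
                None => return, // stream ended before the block showed up
            }
        }
    }
}

fn main() {
    // fake stream: blocks arrive in an arbitrary order
    let stream = vec![
        ("b".to_string(), vec![2u8]),
        ("a".to_string(), vec![1u8]),
        ("c".to_string(), vec![3u8]),
    ];
    let walker = Walker { need: vec!["a".into(), "b".into(), "c".into()] };
    drive(walker, stream);
}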
later ideas
just buffering all the blocks is 2.5x faster than interleaving optimistic walking
the transform function is a little tricky because we can't know whether a block is a record or a node until we actually walk the tree to it (after they're all buffered in memory anyway).
still might as well benchmark a test with optimistic block probing+transform on the way in
original ideas:
future work:
redb has an in-memory backend, so it would be possible to always use it for block caching. user can choose if they want to allow disk or just do memory, and then "spilling" from the cache to disk would be mostly free?
This work is dual-licensed under MIT and Apache 2.0. You can choose either of them if you use this work.
SPDX-License-Identifier: MIT OR Apache-2.0