sturgeon

Crates.iosturgeon
lib.rssturgeon
version0.2.0
created_at2025-10-13 23:16:09.785395+00
updated_at2025-10-18 19:31:03.828344+00
descriptionRecord async streams with timing, replay deterministically
homepage
repositoryhttps://github.com/synoet/sturgeon
max_upload_size
id1881298
size39,714
teo (synoet)

documentation

README

sturgeon

Record async streams with timing information and replay them deterministically.

What it does

Wraps any Stream, records each item with a timestamp, then lets you replay the recording with the same timing characteristics. Think of it as a DVR for async streams.

use sturgeon::record;
use futures::StreamExt;

let mut recorded = record(your_stream);
while let Some(item) = recorded.next().await {
    // Stream passes through normally
}

// Later: replay with original timing
let replay = recorded.recording().replay();
tokio::pin!(replay);
while let Some(item) = replay.next().await {
    // Items arrive with same delays as original
}

Why use it

Deterministic tests for timing-sensitive code. If you're testing rate limiters, batchers, debouncing, backpressure, or any logic that depends on when events arrive - not just what arrives - you need reproducible timing. Sturgeon records real timing patterns once, then replays them identically in tests.

Without sturgeon: sprinkle tokio::time::sleep throughout tests, hope the timing works out, deal with flaky CI.

With sturgeon: record production traffic or realistic patterns once, replay perfectly every time.

Features

  • Pass-through recording - items flow through unchanged, timing captured alongside
  • Multiple replay modes - original timing, speed-adjusted (2x, 0.5x), or instant
  • Persistence - save/load recordings with serde
  • Bounded recording - record_with_capacity(stream, n) keeps only last n items
  • Partial replay - replay from sequence number, time range, or slice

Writing tests

Record once, replay many times

#[tokio::test]
async fn create_recording() {
    let stream = /* your stream */;
    let mut recorded = record(stream);
    while recorded.next().await.is_some() {}
    
    recorded.recording().save("traffic.bin").await.unwrap();
}

// Test with recorded timing
#[tokio::test]
async fn handles_burst_traffic() {
    let recording: Recording<Event> = Recording::load("traffic.bin").await.unwrap();
    let replay = recording.replay();
    tokio::pin!(replay);
    
    let mut handler = YourHandler::new();
    while let Some(event) = replay.next().await {
        handler.process(event).await;
    }
    
    assert!(handler.within_limits());
}

Fast functional tests

#[tokio::test]
async fn processes_all_events() {
    let recording: Recording<Event> = Recording::load("traffic.bin").await.unwrap();
    
    // Instant replay - no timing delays
    let results: Vec<_> = recording
        .replay_immediate()
        .then(|event| process(event))
        .collect()
        .await;
    
    assert_eq!(results.len(), expected);
}

Speed control

// 2x speed for faster tests
let fast = recording.replay_with_speed(Speed::new(2.0)?);

// Slow motion for debugging
let slow = recording.replay_with_speed(Speed::new(0.5)?);
Commit count: 0

cargo fmt