bluesky-firehose-stream

Crates.iobluesky-firehose-stream
lib.rsbluesky-firehose-stream
version0.4.1
created_at2025-01-13 08:41:38.887329+00
updated_at2025-09-08 13:12:25.675286+00
descriptionDecode bluesky firehose messages
homepage
repository
max_upload_size
id1514411
size106,396
Philippe GASSMANN (zenria)

documentation

README

bluesky-firehose-stream

Stream and parse bluesky firehose messages.

This crate is heavily inspired from skystreamer. Some very small amount of code has been partially copied and adapted from the original project (types.rs and subscription.rs).

Contrary to skystreamer, firehose message frames are fully decoded into atrium types, without further conversion.

Messages with unknown kind are also decoded as an Ipld enum allowing this crate to handle even the more exotic events.

I'm currently running bluesky-prometheus-exporter for a month or so without any crash and only a handful of errors per days (invalid frames).

#[tokio::main]
async fn main() {
    loop {
        let mut subscription = match RepoSubscription::new("bsky.network").await {
            Ok(sub) => sub,
            Err(e) => {
                error!("Connecting to the bluesky firehose, {e}");
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }
        };
        info!("Connected to firehose, let's stream");
        let timeout_duration = tokio::time::Duration::from_secs(30);
        while let Ok(Some(frame)) =
            tokio::time::timeout(timeout_duration, subscription.next()).await
        {
            let frame = match frame {
                Ok(f) => f,
                Err(e) => {
                    warn!("Unable to decode frame, skipping it! {e}");
                    continue;
                }
            };
            if let Err(e) = handle_frame(frame) {
                error!("Unable to handle frame: {e}");
            }
        }
        error!("Timeout occurred, reconnecting...");
    }
}
fn handle_frame(frame: Frame) -> Result<(), bluesky_firehose_stream::Error> {
    let message = FirehoseMessage::try_from(frame)?;
    // do something with the decoded message here 🚀
}

Run examples

cargo run --example bluesky-prometheus-exporter --features examples

TLS backend

By default, this crate depends on native-tls for handling TLS when connecting to the fireshose. To switch to rustls backend, use rustls-tls-native-roots or rustls-tls-webpki-roots features.

eg.

bluesky-firehose-stream = { version="*", features = [
    "websocket",
    "rustls-tls-native-roots",
], default-features = false }

License

Licensed under MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, shall be MIT licensed as above, without any additional terms or conditions.

Commit count: 0

cargo fmt