# SeaStreamer
🌊 A real-time stream processing toolkit for Rust
[![crate](https://img.shields.io/crates/v/sea-streamer.svg)](https://crates.io/crates/sea-streamer)
[![docs](https://docs.rs/sea-streamer/badge.svg)](https://docs.rs/sea-streamer)
[![build status](https://github.com/SeaQL/sea-streamer/actions/workflows/rust.yml/badge.svg)](https://github.com/SeaQL/sea-streamer/actions/workflows/rust.yml)
SeaStreamer is a toolkit to help you build real-time stream processors in Rust.
## Features
1. Async

SeaStreamer provides an async API, and it supports both `tokio` and `async-std`. In tandem with other async Rust libraries, you can build highly concurrent stream processors.

2. Generic

We provide integrations for Redis & Kafka / Redpanda behind a generic trait interface, so your program can be backend-agnostic.

3. Testable

SeaStreamer also provides a set of tools to work with streams via unix pipes, so your code is testable without setting up a cluster, which is extremely handy when working locally.

4. Micro-service Oriented

Let's build real-time (multi-threaded, no GC), self-contained (aka easy to deploy), low-resource-usage, long-running stream processors in Rust!
## Quick Start
Add the following to your `Cargo.toml`:
```toml
[dependencies]
sea-streamer = { version = "0", features = ["kafka", "redis", "stdio", "socket", "runtime-tokio"] }
```
Here is a basic [stream consumer](https://github.com/SeaQL/sea-streamer/tree/main/examples/src/bin/consumer.rs):
```rust
#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();
    let Args { stream } = Args::parse();
    let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;
    let mut options = SeaConsumerOptions::new(ConsumerMode::RealTime);
    options.set_auto_stream_reset(SeaStreamReset::Earliest);
    let consumer: SeaConsumer = streamer
        .create_consumer(stream.stream_keys(), options)
        .await?;
    loop {
        let mess: SeaMessage = consumer.next().await?;
        println!("[{}] {}", mess.timestamp(), mess.message().as_str()?);
    }
}
```
Here is a basic [stream producer](https://github.com/SeaQL/sea-streamer/tree/main/examples/src/bin/producer.rs):
```rust
#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();
    let Args { stream } = Args::parse();
    let streamer = SeaStreamer::connect(stream.streamer(), Default::default()).await?;
    let producer: SeaProducer = streamer
        .create_producer(stream.stream_key()?, Default::default())
        .await?;
    for tick in 0..100 {
        let message = format!(r#""tick {tick}""#);
        eprintln!("{message}");
        producer.send(message)?;
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
    producer.end().await?; // flush
    Ok(())
}
```
Here is a [basic stream processor](https://github.com/SeaQL/sea-streamer/tree/main/examples/src/bin/processor.rs).
See also other [advanced stream processors](https://github.com/SeaQL/sea-streamer/tree/main/examples/).
```rust
#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();
    let Args { input, output } = Args::parse();
    let streamer = SeaStreamer::connect(input.streamer(), Default::default()).await?;
    let options = SeaConsumerOptions::new(ConsumerMode::RealTime);
    let consumer: SeaConsumer = streamer
        .create_consumer(input.stream_keys(), options)
        .await?;
    let streamer = SeaStreamer::connect(output.streamer(), Default::default()).await?;
    let producer: SeaProducer = streamer
        .create_producer(output.stream_key()?, Default::default())
        .await?;
    loop {
        let message: SeaMessage = consumer.next().await?;
        let message = process(message).await?;
        eprintln!("{message}");
        producer.send(message)?; // send is non-blocking
    }
}
```
Now, let's put them into action.
With Redis / Kafka:
```shell
STREAMER_URI="redis://localhost:6379" # or
STREAMER_URI="kafka://localhost:9092"
# Produce some input
cargo run --bin producer -- --stream $STREAMER_URI/hello1 &
# Start the processor, producing some output
cargo run --bin processor -- --input $STREAMER_URI/hello1 --output $STREAMER_URI/hello2 &
# Replay the output
cargo run --bin consumer -- --stream $STREAMER_URI/hello2
# Remember to stop the processes
kill %1 %2
```
With Stdio:
```shell
# Pipe the producer to the processor
cargo run --bin producer -- --stream stdio:///hello1 | \
cargo run --bin processor -- --input stdio:///hello1 --output stdio:///hello2
```
## Architecture
[`sea-streamer`](https://docs.rs/sea-streamer) is composed of a number of sub-crates:
+ [`sea-streamer-types`](https://docs.rs/sea-streamer-types)
+ [`sea-streamer-socket`](https://docs.rs/sea-streamer-socket)
+ [`sea-streamer-kafka`](https://docs.rs/sea-streamer-kafka)
+ [`sea-streamer-redis`](https://docs.rs/sea-streamer-redis)
+ [`sea-streamer-stdio`](https://docs.rs/sea-streamer-stdio)
+ [`sea-streamer-file`](https://docs.rs/sea-streamer-file)
+ [`sea-streamer-runtime`](https://docs.rs/sea-streamer-runtime)
All crates share the same major version. So `0.1` of `sea-streamer` depends on `0.1` of `sea-streamer-socket`.
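For example, if you depend on the sub-crates directly, keep them on matching major versions (the version numbers below are illustrative):

```toml
[dependencies]
sea-streamer-types = "0.1"
sea-streamer-socket = "0.1" # same major version as sea-streamer-types
```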
### `sea-streamer-types`: Traits & Types
This crate defines all the traits and types for the SeaStreamer API, but does not provide any implementation.
### `sea-streamer-socket`: Backend-agnostic Socket API
Akin to how SeaORM allows you to build applications for different databases, SeaStreamer allows you to build
stream processors for different streaming servers.
While the `sea-streamer-types` crate provides a nice trait-based abstraction, this crate provides a concrete-type API,
so that your program can stream from/to any SeaStreamer backend selected by the user *at runtime*.
This allows you to do neat things, like generating data locally and then streaming it to Redis / Kafka, or, going the
other way, sinking data from the server to work on it locally. All _without recompiling_ the stream processor.
If you only ever work with one backend, feel free to depend on `sea-streamer-redis` / `sea-streamer-kafka` directly.
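To make the runtime selection concrete, here is a minimal, self-contained sketch of the idea (this is *not* SeaStreamer's actual implementation; the enum and function are hypothetical): the backend is dispatched on the URI scheme, so the same binary can talk to any backend.

```rust
// Hypothetical sketch: picking a backend from the URI scheme at runtime.
#[derive(Debug, PartialEq)]
enum Backend {
    Kafka,
    Redis,
    Stdio,
    File,
}

fn backend_of(uri: &str) -> Option<Backend> {
    // Everything before the first "://" is the scheme
    match uri.split("://").next()? {
        "kafka" => Some(Backend::Kafka),
        "redis" => Some(Backend::Redis),
        "stdio" => Some(Backend::Stdio),
        "file" => Some(Backend::File),
        _ => None,
    }
}

fn main() {
    assert_eq!(backend_of("redis://localhost:6379/clock"), Some(Backend::Redis));
    assert_eq!(backend_of("kafka://localhost:9092/clock"), Some(Backend::Kafka));
    assert_eq!(backend_of("stdio:///clock"), Some(Backend::Stdio));
}
```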
A small number of cli programs are provided for demonstration. Let's set them up first:
```shell
# The `clock` program generates messages in the form of `{ "tick": N }`
alias clock='cargo run --package sea-streamer-stdio --features=executables --bin clock'
# The `relay` program redirects messages from `input` to `output`
alias relay='cargo run --package sea-streamer-socket --features=executables,backend-kafka,backend-redis --bin relay'
```
Here is how to stream from Stdio ➡️ Redis / Kafka. We generate messages using `clock` and pipe them to `relay`,
which then streams to Redis / Kafka:
```shell
# Stdio -> Redis
clock -- --stream clock --interval 1s | \
relay -- --input stdio:///clock --output redis://localhost:6379/clock
# Stdio -> Kafka
clock -- --stream clock --interval 1s | \
relay -- --input stdio:///clock --output kafka://localhost:9092/clock
```
Here is how to stream between Redis ↔️ Kafka:
```shell
# Redis -> Kafka
relay -- --input redis://localhost:6379/clock --output kafka://localhost:9092/clock
# Kafka -> Redis
relay -- --input kafka://localhost:9092/clock --output redis://localhost:6379/clock
```
Here is how to *replay* the stream from Kafka / Redis:
```shell
relay -- --input redis://localhost:6379/clock --output stdio:///clock --offset start
relay -- --input kafka://localhost:9092/clock --output stdio:///clock --offset start
```
### `sea-streamer-kafka`: Kafka / Redpanda Backend
This is the Kafka / Redpanda backend implementation for SeaStreamer.
This crate provides a comprehensive type system that makes working with Kafka easier and safer.
First of all, all APIs (many of which are sync) are properly wrapped as async. Methods are also marked `&mut` to eliminate possible race conditions.
`KafkaConsumerOptions` has typed parameters.
`KafkaConsumer` allows you to `seek` to a point in time, `rewind` to a particular offset, and `commit` a message read.
`KafkaProducer` allows you to `await` the `Receipt` of a send, or discard it if you are uninterested. You can also flush the producer.
`KafkaStreamer` allows you to flush all producers on `disconnect`.
See [tests](https://github.com/SeaQL/sea-streamer/blob/main/sea-streamer-kafka/tests/consumer.rs) for an illustration of the stream semantics.
This crate depends on [`rdkafka`](https://docs.rs/rdkafka),
which in turn depends on [librdkafka-sys](https://docs.rs/librdkafka-sys), which itself is a wrapper of
[librdkafka](https://docs.confluent.io/platform/current/clients/librdkafka/html/index.html).
Configuration Reference: