| Field | Value |
|---|---|
| Crates.io | pulse-io |
| lib.rs | pulse-io |
| version | 0.1.2 |
| created_at | 2025-10-19 07:13:10.652031+00 |
| updated_at | 2025-10-29 07:52:22.504173+00 |
| description | Input/output connectors for Pulse — integrates with external systems such as Kafka, Arrow, and Parquet. |
| homepage | |
| repository | https://github.com/brbtavares/pulse |
| max_upload_size | |
| id | 1890152 |
| size | 98,319 |
| Crate | crates.io | docs.rs |
|---|---|---|
| pulse-core | | |
| pulse-state | | |
| pulse-ops | | |
| pulse-io | | |
Pulse is a tiny, modular, event-time streaming framework (Flink/Beam-like) written in Rust. It focuses on clarity, testability, and a local-first workflow. It supports watermarks, windowing, pluggable state, Prometheus metrics, and a single-binary CLI that runs pipelines from a TOML file.
Goals: clarity, testability, and a local-first workflow.

Crates:
- pulse-core: core types/traits, Executor, Record, event-time Watermark, timers, metrics, and config loader
- pulse-ops: operators (Map, Filter, KeyBy, Aggregate, WindowedAggregate), event-time window helpers
- pulse-state: state backends
  - InMemoryState (default)
  - RocksDbState (feature rocksdb)
- pulse-io: sources/sinks
  - FileSource (JSONL/CSV) with EOF watermark
  - FileSink (JSONL)
  - ParquetSink (feature parquet) with date partitioning and rotation
  - KafkaSource/KafkaSink (feature kafka) with resume from persisted offsets
- pulse-examples: runnable examples and sample data
- pulse-bin: CLI (pulse) and /metrics HTTP server

Core concepts:
- Record { event_time: chrono::DateTime<Utc>, value: serde_json::Value }
- Watermark(EventTime) is propagated through the pipeline; the Executor calls on_watermark on operators and informs sinks
- Watermark lag (pulse_watermark_lag_ms) = now - watermark
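To make the record model concrete, here is a minimal sketch that builds a record with the fields listed above and computes the lag that pulse_watermark_lag_ms reports. The local Record struct mirrors the field list; the construction style and the example two-second lag are illustrative, not pulse-core's actual definitions.

```rust
use chrono::{DateTime, Duration, Utc};
use serde_json::json;

// Mirrors the shape listed above; pulse-core's own Record may differ in detail.
struct Record {
    event_time: DateTime<Utc>,
    value: serde_json::Value,
}

fn main() {
    // An event that happened two seconds ago, in event time.
    let record = Record {
        event_time: Utc::now() - Duration::seconds(2),
        value: json!({ "word": "pulse", "score": 0.9 }),
    };

    // A watermark tracks how far event time has progressed; here it trails the record.
    let watermark: DateTime<Utc> = record.event_time;

    // pulse_watermark_lag_ms = now - watermark, in milliseconds.
    let lag_ms = (Utc::now() - watermark).num_milliseconds();
    println!("value={} watermark_lag_ms={}", record.value, lag_ms);
}
```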
Semantics overview:
- A window fires when watermark >= window.end.
- WindowedAggregate supports Tumbling, Sliding, and Session windows
- Aggregations: count, sum, avg, distinct
- The EOF watermark from FileSource flushes remaining windows
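The windowing rules above reduce to event-time arithmetic. The sketch below assigns a timestamp to its tumbling and sliding windows and shows the firing condition; it illustrates the semantics only and is not pulse-ops' implementation.

```rust
// Event-time window arithmetic, in milliseconds since the epoch.
// Illustrative only; pulse-ops' WindowedAggregate has its own internals.

/// Start of the tumbling window of length `size_ms` that contains `ts_ms`.
fn tumbling_window_start(ts_ms: i64, size_ms: i64) -> i64 {
    ts_ms - ts_ms.rem_euclid(size_ms)
}

/// All sliding windows of length `size_ms`, advancing by `slide_ms`, that contain `ts_ms`.
fn sliding_windows(ts_ms: i64, size_ms: i64, slide_ms: i64) -> Vec<(i64, i64)> {
    let mut windows = Vec::new();
    // The last slide-aligned start at or before the timestamp.
    let mut start = ts_ms - ts_ms.rem_euclid(slide_ms);
    while start + size_ms > ts_ms {
        windows.push((start, start + size_ms));
        start -= slide_ms;
    }
    windows.reverse();
    windows
}

fn main() {
    let ts = 1_700_000_030_000; // some event time (ms)
    let (size, slide) = (60_000, 15_000);

    let t_start = tumbling_window_start(ts, size);
    println!("tumbling window: [{t_start}, {})", t_start + size);

    for (start, end) in sliding_windows(ts, size, slide) {
        // A window fires once the watermark reaches its end: watermark >= end.
        println!("sliding window: [{start}, {end}) fires when watermark >= {end}");
    }
}
```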
State:
- KvState trait: get/put/delete/iter_prefix/snapshot/restore
- pulse-state::InMemoryState implements the full API (snapshots kept in memory)
- pulse-state::RocksDbState (feature rocksdb): prefix iteration and checkpoint directory creation via RocksDB Checkpoint
- WindowOperator can persist per-window state via a backend and restore it after a restart (optional hook)
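The KvState methods listed above outline the backend contract. Below is a minimal sketch of that shape with a toy in-memory backend; the exact signatures, the associated Snapshot type, and byte-slice keys are assumptions for illustration, not pulse-state's published API.

```rust
use std::collections::BTreeMap;

/// Sketch of the state API named above (get/put/delete/iter_prefix/snapshot/restore).
/// Signatures are illustrative; pulse-state's real trait may differ.
trait KvState {
    /// Opaque snapshot handle (in-memory copy, bytes on disk, a checkpoint dir, ...).
    type Snapshot;

    fn get(&self, key: &[u8]) -> Option<Vec<u8>>;
    fn put(&mut self, key: &[u8], value: &[u8]);
    fn delete(&mut self, key: &[u8]);
    fn iter_prefix(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)>;
    fn snapshot(&self) -> Self::Snapshot;
    fn restore(&mut self, snapshot: Self::Snapshot);
}

/// Toy in-memory backend, standing in for the idea behind InMemoryState.
#[derive(Default)]
struct MemState {
    map: BTreeMap<Vec<u8>, Vec<u8>>,
}

impl KvState for MemState {
    // In-memory snapshots are just a copy of the map, as the overview describes.
    type Snapshot = BTreeMap<Vec<u8>, Vec<u8>>;

    fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        self.map.get(key).cloned()
    }
    fn put(&mut self, key: &[u8], value: &[u8]) {
        self.map.insert(key.to_vec(), value.to_vec());
    }
    fn delete(&mut self, key: &[u8]) {
        self.map.remove(key);
    }
    fn iter_prefix(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
        // BTreeMap keeps keys sorted, so a range scan bounded by the prefix works.
        self.map
            .range(prefix.to_vec()..)
            .take_while(|(k, _)| k.starts_with(prefix))
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect()
    }
    fn snapshot(&self) -> Self::Snapshot {
        self.map.clone()
    }
    fn restore(&mut self, snapshot: Self::Snapshot) {
        self.map = snapshot;
    }
}

fn main() {
    let mut state = MemState::default();
    state.put(b"window:42:count", b"7");
    let snap = state.snapshot();
    state.delete(b"window:42:count");
    state.restore(snap);
    assert_eq!(state.get(b"window:42:count"), Some(b"7".to_vec()));
    println!("{} entries under window:", state.iter_prefix(b"window:").len());
}
```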
I/O:
- FileSource (JSONL/CSV): parses event_time from RFC3339 or epoch ms; emits a final watermark at EOF
- FileSink: writes JSON lines to stdout or a file
- ParquetSink (feature parquet):
  - Schema: event_time: timestamp(ms), payload: utf8 (the full JSON)
  - Date partitioning: out_dir/dt=YYYY-MM-DD/part-*.parquet (format configurable via partition_format)
  - Field partitioning: out_dir/<field>=<value>/part-*.parquet (set partition_field)
  - Size-based file rotation (max_bytes)
  - Compression: snappy (default), zstd, or none
- KafkaSource/KafkaSink (feature kafka): integration with rdkafka (links the native librdkafka), resuming offsets from persisted state

Observability:
- Structured logging via tracing
- Prometheus metrics (pulse-core::metrics):
  - pulse_operator_records_total{operator,stage=receive|emit}
  - pulse_watermark_lag_ms (gauge)
  - pulse_bytes_written_total{sink}
  - pulse_state_size{operator}
  - pulse_operator_process_latency_ms (histogram)
  - pulse_sink_process_latency_ms (histogram)
  - pulse_queue_depth (gauge)
  - pulse_dropped_records_total{reason} (counter)
- /metrics HTTP endpoint served by pulse-bin (axum 0.7)
CLI (binary crate pulse-bin). Subcommands:
- pulse serve --port 9898: serves /metrics in Prometheus format.
- pulse run --config pipeline.toml [--http-port 9898]: runs the pipeline; if --http-port is provided, also starts /metrics on that port.

Backpressure: set PULSE_CHANNEL_BOUND (e.g., PULSE_CHANNEL_BOUND=10000) to drop new records when the in-flight depth reaches the bound. Watermarks are never dropped.
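The PULSE_CHANNEL_BOUND behaviour can be pictured as a bounded channel where records are sent without blocking and watermarks always wait for capacity. The Message enum and the try_send/send split below are a sketch of the idea only, not pulse-bin's actual internals.

```rust
use tokio::sync::mpsc;

// Illustrative message type; Pulse's internal channel items will differ.
#[derive(Debug)]
enum Message {
    Record(serde_json::Value),
    Watermark(i64), // event time in epoch milliseconds
}

#[tokio::main]
async fn main() {
    // PULSE_CHANNEL_BOUND caps the number of in-flight messages.
    let bound: usize = std::env::var("PULSE_CHANNEL_BOUND")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(10_000)
        .max(1); // tokio channels require a positive capacity
    let (tx, mut rx) = mpsc::channel::<Message>(bound);

    // Records are offered with try_send: when the queue is full they are dropped
    // (and would be counted in pulse_dropped_records_total{reason}).
    let record = Message::Record(serde_json::json!({ "word": "pulse" }));
    if tx.try_send(record).is_err() {
        eprintln!("queue full, record dropped");
    }

    // Watermarks use send: they wait for capacity and are never dropped.
    tx.send(Message::Watermark(1_700_000_000_000)).await.unwrap();
    drop(tx);

    while let Some(msg) = rx.recv().await {
        println!("{msg:?}");
    }
}
```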
kind = "file"
path = "pulse-examples/examples/sliding_avg.jsonl"
time_field = "event_time"
[time]
allowed_lateness = "10s"
[window]
# supported: tumbling|sliding|session
type = "sliding"
size = "60s"
slide = "15s" # for sliding; for session, use: gap = "30s"
[ops]
# aggregation over a key; supported: count (default), sum, avg, distinct
count_by = "word"
# agg = "count" # default
# agg_field = "value" # required for sum|avg|distinct
[sink]
kind = "parquet"
out_dir = "outputs"
## Optional Parquet settings
# compression = "snappy" # one of: snappy (default) | zstd | none
# max_bytes = 104857600 # rotate file when ~bytes reached (e.g. 100MB)
# partition_field = "user_id" # partition by a payload field value
# partition_format = "%Y-%m" # date partition format when partitioning by event_time
```

Validation rules:
- source.kind must be file (or kafka)
- sink.kind must be parquet/file (or kafka)
- ops.count_by must be present

```bash
# Build
cargo build
# Run the pipeline and export metrics
cargo run -p pulse-bin -- run --config examples/pipeline.toml --http-port 9898
# Scrape metrics
curl http://127.0.0.1:9898/metrics
```

Expected output: Parquet files under outputs/dt=YYYY-MM-DD/part-*.parquet.
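To inspect the output, any Parquet reader works. A short sketch using the parquet crate's Arrow reader (plus anyhow for error handling); the dt= directory and part file name below are placeholders for whatever partition your run produced.

```rust
use std::fs::File;

use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

fn main() -> anyhow::Result<()> {
    // Placeholder path: substitute the dt=... partition and part file from your run.
    let file = File::open("outputs/dt=2025-01-01/part-0.parquet")?;

    // Stream record batches; the schema should show event_time (timestamp ms)
    // and payload (utf8), as described above.
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
    for batch in reader {
        let batch = batch?;
        println!("{} rows, schema: {:?}", batch.num_rows(), batch.schema());
    }
    Ok(())
}
```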
The CLI supports JSONL and CSV via source.format:

```toml
[source]
kind = "file"
format = "csv" # jsonl | csv
path = "input.csv"
time_field = "event_time" # epoch ms in CSV
[time]
allowed_lateness = "10s"
[window]
type = "tumbling"
size = "60s"
[ops]
count_by = "word"
[sink]
kind = "parquet"
out_dir = "outputs"
```

The equivalent pipeline assembled programmatically, with explicit Parquet partitioning and rotation settings:

```rust
use pulse_core::Executor;
use pulse_io::{FileSource, ParquetSink, ParquetSinkConfig, PartitionSpec};
use pulse_ops::{KeyBy, WindowedAggregate};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // CSV source: the header must include an event_time column (epoch ms).
    let src = FileSource {
        path: "input.csv".into(),
        format: pulse_io::FileFormat::Csv,
        event_time_field: "event_time".into(),
        text_field: None,
    };

    // Key by "word", count per key over 60s tumbling windows, and write Parquet
    // partitioned by event_time date, rotating at 1,000,000 rows or 300 seconds.
    let mut exec = Executor::new();
    exec.source(src)
        .operator(KeyBy::new("word"))
        .operator(WindowedAggregate::tumbling_count("key", 60_000))
        .sink(ParquetSink::new(ParquetSinkConfig {
            out_dir: "outputs".into(),
            partition_by: PartitionSpec::ByDate { field: "event_time".into(), fmt: "%Y-%m-%d".into() },
            max_rows: 1_000_000,
            max_age: std::time::Duration::from_secs(300),
        }));
    exec.run().await?;
    Ok(())
}
```

A pipeline config using the avg aggregation over a payload field:

```toml
[source]
kind = "file"
path = "pulse-examples/examples/sliding_avg.jsonl"
time_field = "event_time"
[time]
allowed_lateness = "10s"
[window]
type = "tumbling"
size = "60s"
[ops]
count_by = "word"
agg = "avg" # or: sum | distinct | count
agg_field = "score" # required for avg/sum/distinct
[sink]
kind = "parquet"
out_dir = "outputs"
```

- pulse-examples/examples/sliding_avg.jsonl (generated earlier) works with the file source
- Runnable examples in pulse-examples use the operators from pulse-ops

Enable per-crate features:

```bash
# Kafka
cargo build -p pulse-io --features kafka
# Arrow/Parquet
cargo build -p pulse-io --features parquet
```

Windows + Kafka notes: enabling kafka builds the native librdkafka by default (via the cmake-build feature) and requires CMake and the MSVC Build Tools.

From the workspace root, you can run tests per crate. Some features are crate-specific:

```bash
# Operators crate
cargo test -p pulse-ops -- --nocapture
# I/O crate with Parquet
cargo test -p pulse-io --features parquet -- --nocapture
# CLI crate (includes end-to-end goldens)
cargo test -p pulse-bin -- --nocapture
# All workspace tests (no extra features)
cargo test -- --nocapture
```

Notes:
- The pulse-io crate defines the parquet feature. Do not pass --features parquet to other crates.
- The kafka feature is also only on pulse-io and requires native dependencies on Windows.

Pulse takes a pragmatic, local-first approach for single-node pipelines. A comparison at a glance:
| System | Install/runtime footprint | Local-first UX | Event-time/windowing | SQL | Cluster | Primary niche |
|---|---|---|---|---|---|---|
| Flink | Heavy (cluster/services) | No (dev spins cluster) | Yes (rich) | Yes | Yes | Large-scale distributed streaming |
| Arroyo | Moderate (service + workers) | Partial | Yes | Yes | Yes | Cloud-native streaming w/ SQL |
| Fluvio | Broker + clients | Partial | Limited | No | Yes | Distributed streaming data plane |
| Materialize | Service + storage | Partial | Incremental views | Yes | Yes | Streaming SQL materialized views |
| Pulse | Single binary | Yes | Yes (MVP focused) | No | No | Local-first pipelines & testing |
If you need distributed scale, multi-tenant scheduling, or a SQL-first experience, those systems are a better fit. Pulse aims to be the simplest way to iterate on event-time pipelines locally and ship small, self-contained jobs.