futures-batch

Crates.io: futures-batch
lib.rs: futures-batch
version: 0.7.0
created_at: 2019-12-06 16:14:47.889829+00
updated_at: 2025-07-25 21:13:08.070082+00
description: An adaptor that chunks up elements and flushes them after a timeout or when the buffer is full. (Formerly known as tokio-batch.)
repository: https://github.com/mre/futures-batch
id: 186876
size: 63,169
Matthias Endler (mre)


futures-batch


A stream adaptor that groups items into chunks, with timeout support. A chunk is flushed when either:

  • the buffer reaches capacity, or
  • the timeout elapses.

Based on the Chunks adaptor from futures-util, with added timeout functionality.
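
The flush rule can be modelled without any async machinery. The following is a minimal, synchronous sketch built on a standard-library channel; it is not the crate's implementation. The helper name next_batch is hypothetical, and the sketch assumes that the timer starts when the first item of a batch arrives.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::{Duration, Instant};

/// Hypothetical helper (not part of futures-batch): collects one batch from a
/// channel. It returns when `capacity` items are buffered, when `timeout` has
/// elapsed since the first item of the batch arrived (an assumption of this
/// sketch), or when the sender is dropped. Returns `None` once the channel is
/// closed and there is nothing left to flush.
pub fn next_batch<T>(rx: &Receiver<T>, capacity: usize, timeout: Duration) -> Option<Vec<T>> {
    assert!(capacity > 0, "capacity must be greater than zero");

    // Block until the first item arrives; the timeout clock starts here.
    let first = rx.recv().ok()?;
    let deadline = Instant::now() + timeout;

    let mut batch = Vec::with_capacity(capacity);
    batch.push(first);

    while batch.len() < capacity {
        // Time left before the batch must be flushed (zero if already past).
        let remaining = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(remaining) {
            Ok(item) => batch.push(item),
            // Timeout elapsed or sender dropped: flush what has been buffered.
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    Some(batch)
}
```

Calling next_batch in a loop until it returns None yields full batches while items arrive quickly and partial batches when the producer stalls, which mirrors the two flush conditions listed above.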

Note: The crate was originally called tokio-batch; it was renamed because it does not depend on Tokio.

Usage

Add to your Cargo.toml:

[dependencies]
futures-batch = "0.7"

Use as a stream combinator:

use std::time::Duration;
use futures::{stream, StreamExt};
use futures_batch::ChunksTimeoutStreamExt;

#[tokio::main]
async fn main() {
    let results = stream::iter(0..10)
        .chunks_timeout(5, Duration::from_secs(10))
        .collect::<Vec<_>>()
        .await;

    assert_eq!(vec![vec![0, 1, 2, 3, 4], vec![5, 6, 7, 8, 9]], results);
}

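This creates chunks of up to 5 items with a 10-second timeout. Because the source stream here yields all ten items immediately, both chunks are flushed on reaching capacity and the timeout never fires.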

TryChunksTimeout

For streams that yield Result values, use try_chunks_timeout to batch successful values and immediately propagate errors:

use std::time::Duration;
use futures::{stream, StreamExt};
use futures_batch::TryChunksTimeoutStreamExt;

#[tokio::main]
async fn main() {
    let results = stream::iter((0..10).map(|i| if i == 5 { Err("error") } else { Ok(i) }))
        .try_chunks_timeout(3, Duration::from_secs(10))
        .collect::<Vec<_>>()
        .await;

    // Results in: [Ok([0, 1, 2]), Ok([3, 4]), Err("error"), Ok([6, 7, 8]), Ok([9])]
    println!("{:?}", results);
}

This batches Ok values until the buffer is full or the timeout elapses, while immediately propagating any Err values.
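
The ordering in the example output can be reproduced with a small synchronous model. This is a sketch only, not the crate's code: the function name batch_results is hypothetical, the timeout is ignored entirely, and the rule that a pending partial batch is emitted before the error is inferred from the output shown in the example above.

```rust
/// Hypothetical, synchronous model of the batching rule (timeout ignored):
/// `Ok` values accumulate until `capacity` is reached; an `Err` first flushes
/// any partial batch and is then passed through unchanged.
pub fn batch_results<T, E>(
    items: impl IntoIterator<Item = Result<T, E>>,
    capacity: usize,
) -> Vec<Result<Vec<T>, E>> {
    assert!(capacity > 0, "capacity must be greater than zero");

    let mut out = Vec::new();
    let mut batch = Vec::with_capacity(capacity);

    for item in items {
        match item {
            Ok(value) => {
                batch.push(value);
                // Buffer is full: emit it and start a fresh one.
                if batch.len() == capacity {
                    out.push(Ok(std::mem::replace(&mut batch, Vec::with_capacity(capacity))));
                }
            }
            Err(err) => {
                // Emit whatever was buffered so earlier successes are not lost,
                // then propagate the error as its own element.
                if !batch.is_empty() {
                    out.push(Ok(std::mem::take(&mut batch)));
                }
                out.push(Err(err));
            }
        }
    }

    // End of input: flush the trailing partial batch, if any.
    if !batch.is_empty() {
        out.push(Ok(batch));
    }
    out
}
```

Feeding this model the same input as the example (0..10 with an error at 5, capacity 3) gives the sequence shown in the comment there. The model says nothing about timing; in the real adaptor a partial batch can also be flushed when the timeout elapses.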

Features

sink (optional)

Enable Sink support for bidirectional streams:

[dependencies]
futures-batch = { version = "0.7", features = ["sink"] }

When enabled, both ChunksTimeout and TryChunksTimeout implement Sink and forward sink operations to the underlying stream.

Performance

futures-batch adds little overhead and is suitable for high-performance applications. The project's benchmarks show a consistent cost of roughly 20 ns per operation across different batch sizes.

Credits

Thanks to arielb1, alexcrichton, doyoubi, leshow, spebern, and wngr for their contributions!
