flume-overwrite

Crates.ioflume-overwrite
lib.rsflume-overwrite
version0.12.0
created_at2025-07-21 11:11:12.128369+00
updated_at2025-12-10 14:30:30.031263+00
descriptionA library for sending values to a flume bounded channel, overwriting the oldest values if the channel is full.
homepagehttps://github.com/flejz/flume-overwrite
repositoryhttps://github.com/flejz/flume-overwrite
max_upload_size
id1761955
size32,472
flejz (flejz)

documentation

https://docs.rs/flume-overwrite

README

flume-overwrite

Crates.io Documentation License: MIT

A Rust library that provides bounded channels with overwrite capability, built on top of the flume crate. When the channel reaches capacity, new messages will automatically overwrite the oldest unread messages, ensuring that sends never block due to a full channel.

Features

  • Overwrite semantics: Messages sent to a full channel replace the oldest messages
  • Non-blocking sends: Never blocks when sending, even at capacity
  • Async support: Both blocking and async send operations
  • Drain tracking: Returns information about which messages were overwritten
  • Thread-safe: Built on flume's proven concurrency primitives
  • Zero-copy: Minimal overhead over standard flume channels

Installation

Add this to your Cargo.toml:

[dependencies]
flume-overwrite = "0.1.0"

Usage Examples

Basic Overwrite Behavior

use flume_overwrite::bounded;

let (sender, receiver) = bounded(2);

// Fill the channel
assert_eq!(sender.send_overwrite("first").unwrap(), None);
assert_eq!(sender.send_overwrite("second").unwrap(), None);

// This overwrites "first"
let overwritten = sender.send_overwrite("third").unwrap();
assert_eq!(overwritten, Some(vec!["first"]));

// Only "second" and "third" remain
assert_eq!(receiver.recv().unwrap(), "second");
assert_eq!(receiver.recv().unwrap(), "third");

Async Operations

use flume_overwrite::bounded;
use futures::executor::block_on;

let (sender, receiver) = bounded(1);

block_on(async {
    // Send without overwriting
    assert_eq!(sender.send_overwrite_async("hello").await.unwrap(), None);
    
    // This will overwrite "hello"
    let overwritten = sender.send_overwrite_async("world").await.unwrap();
    assert_eq!(overwritten, Some(vec!["hello"]));
    
    assert_eq!(receiver.recv_async().await.unwrap(), "world");
});

Producer-Consumer with Overwrite

Perfect for scenarios where you want to ensure the consumer always gets the latest data without blocking the producer:

use flume_overwrite::bounded;
use std::thread;
use std::time::Duration;

let (sender, receiver) = bounded(3);

// Producer thread - never blocks
let producer = thread::spawn(move || {
    for i in 0..10 {
        if let Ok(overwritten) = sender.send_overwrite(i) {
            if let Some(old_values) = overwritten {
                println!("Overwritten values: {:?}", old_values);
            }
        }
        thread::sleep(Duration::from_millis(10));
    }
});

// Consumer thread - processes at its own pace
let consumer = thread::spawn(move || {
    while let Ok(value) = receiver.recv() {
        println!("Processing: {}", value);
        thread::sleep(Duration::from_millis(50)); // Simulate slow processing
    }
});

producer.join().unwrap();
// Consumer will process the latest available messages

Integration with Standard Flume Operations

The OverwriteSender implements Deref<Target = Sender<T>>, so you can use all standard flume sender methods:

use flume_overwrite::bounded;

let (sender, receiver) = bounded(5);

// Use standard flume methods
sender.send(42).unwrap();
println!("Channel length: {}", sender.len());
println!("Channel capacity: {:?}", sender.capacity());

// Or use overwrite methods
sender.send_overwrite(43).unwrap();

Use Cases

This library is particularly useful for:

  • Real-time data streams: Sensor data, market feeds, telemetry where latest data is most important
  • Event logging: Keeping recent events while discarding old ones automatically
  • Producer-consumer scenarios: Where producers shouldn't be blocked by slow consumers
  • Buffering latest state: GUI updates, game state, configuration changes
  • Rate limiting: Dropping excess messages while preserving recent ones

Requirements

  • Rust 1.75.0 or later (edition 2024)
  • Compatible with no_std environments (through flume)

License

This project is licensed under the MIT License - see the LICENSE file for details.

Commit count: 0

cargo fmt