| Crates.io | codas-flow |
| lib.rs | codas-flow |
| version | 0.5.1 |
| created_at | 2024-11-06 21:47:02.868858+00 |
| updated_at | 2025-05-15 04:18:04.327148+00 |
| description | Low-latency, high-throughput bounded queues ("data flows") for (a)synchronous and event-driven systems. |
| homepage | https://www.codas.dev |
| repository | https://github.com/with-caer/codas |
| max_upload_size | |
| id | 1439089 |
| size | 75,806 |
Low-latency, high-throughput bounded queues ("data flows")
for (a)synchronous and event-driven systems, inspired by the
LMAX Disruptor
and built for codas.
This crate provides the flow data structure: A
"ring" buffer
which concurrent, (a)synchronous tasks can publish data to
and receive data from.
flows work kind of like the broadcast channels in
Tokio,
with some key differences:
Zero-Copy Multicast Reads: Every data published to a flow is immediately available to each subscriber, in parallel, with no copies or cloning.
Lock-Free by default: No locks or mutexes are used
when publishing or receiving data from the flow on
supported targets.
Broad Compatibility:
no-std by default.async and synchronous APIs.async functionality doesn't depend on a specific
runtime or framework (not even futures!).flows work wherever channels or queues would work, but they're built
specifically for systems that need the same data processed concurrently
(or in parallel) by multiple tasks.
Flows are created with Flow::new, which returns a
tuple of (flow, subscribers):
use codas_flow::*;
// Create a flow with a capacity of 32 strings,
// and one subscriber.
let (mut flow, [mut sub]) = Flow::<String>::new(32);
// Publish "Hello!" to the next data sequence in the flow.
let seq = flow.try_next().unwrap();
seq.publish("Hello!".to_string());
// Receive the next published data sequence from the flow.
let seq = sub.try_next().unwrap();
assert_eq!("Hello!", *seq);
Data is published into a flow via Flow::try_next
(or await Flow::next), which returns an UnpublishedData
reference. Once this reference is published (via UnpublishedData::publish),
or dropped, it becomes receivable by every subscriber.
Data is received from a flow via FlowSubscriber::try_next
(or await FlowSubscriber::next), which returns a PublishedData
reference.
Using
slice patterns,
any number of subscribers can be returned by Flow::new:
use codas_flow::*;
// Create a flow with a capacity of 32 strings,
// and 2 subscribers.
let (mut flow, [mut sub_a, mut sub_b]) = Flow::<String>::new(32);
New subscribers cannot be added to an active flow. To overcome this challenge, any subscriber can be wrapped in a Stage.
A stage is a dynamic group of data processors that share a single subscriber:
# use core::sync::atomic::Ordering;
# use portable_atomic::AtomicU64;
# use portable_atomic_util::Arc;
use codas_flow::{*, stage::*};
// Create a flow.
let (mut flow, [mut sub]) = Flow::<String>::new(32);
// Wrap the subscriber in a processing stage.
let mut stage = Stage::from(sub);
// Add a data processor to the stage; an indefinite
// number of processors can be added to a stage, even
// while the flow is active.
let calls = Arc::new(AtomicU64::new(0));
let closure_calls = calls.clone();
stage.add_proc(move |proc: &mut Proc, data: &String| {
assert_eq!("Hello!", *data);
closure_calls.add(1, Ordering::SeqCst);
});
// Publish "Hello!" to the next data sequence in the flow.
let seq = flow.try_next().unwrap();
seq.publish("Hello!".to_string());
// Run processors for a set of data in the flow.
stage.proc();
assert_eq!(1, calls.load(Ordering::SeqCst));
Stages only receive data from the flow when one of the
Stage::proc* functions is invoked; refer to the Stage
docs for more information.
This crate uses AtomicU64 to coordinate flow access
without locks. This type is lock-free where possible, but may use locks on some platforms or compile targets.
This section contains a list of the primary targets supported by this crate, along with their support for lock-free behavior.
| Target | Lock-Free? |
|---|
aarch64-unknown-linux-gnu (64-Bit Linux ARM) | Yes
aarch64-apple-darwin (64-Bit MacOS ARM) | Yes
x86_64-unknown-linux-gnu (64-Bit Linux) | Yes
x86_64-apple-darwin (64-Bit MacOS) | Yes
x86_64-pc-windows-gnu (64-Bit Windows) | Yes
wasm32-unknown-unknown (WASM) | Yes1
armv7-unknown-linux-gnueabihf (ARM Cortex A7 and A8) | No2
riscv32i-unknown-none-elf (ESP 32) | No
1 WASM targets don't technically support atomic instructions. However, because WASM code is executed in a single-thread, regular variables are simply substituted for their atomic counterparts, enabling full lock-free support.
2 Confirmation required; a safe assumption is that 32-bit targets don't support atomic operations on 64-bit values.
| Operation | codas (flow) |
codas (stage) |
codas (stage w/ tokio-yield) |
tokio (mpsc) |
tokio (broadcast) |
|---|
Move -> Read | 55ns (18M/s) | 26ns (38M/s) | 19ns (53M/s) | - | - |
Move -> Take | - | - | - | 70ns (14M/s) | - |
Move -> Clone | - | - | - | - | 33ns (30M/s) |
Comparitive performance of different scenarios we've written benchmarks for. Exact numbers will vary between platforms.
Copyright © 2024—2025 With Caer, LLC and Alicorn Systems, LLC.
Licensed under the MIT license. Refer to the license file for more info.