Crates.io | codas-flow |
lib.rs | codas-flow |
version | 0.1.0 |
source | src |
created_at | 2024-11-06 21:47:02.868858 |
updated_at | 2024-11-06 21:47:02.868858 |
description | Low-latency, high-throughput bounded queues ("data flows") for (a)synchronous and event-driven systems. |
homepage | https://gitlab.com/alicorn/pub/alicorn |
repository | https://gitlab.com/alicorn/pub/alicorn |
id | 1439089 |
size | 25,747 |
Low-latency, high-throughput bounded queues ("data flows") for (a)synchronous and event-driven systems, inspired by the LMAX Disruptor and built for codas.
This crate provides the flow data structure: a "ring" buffer that concurrent, (a)synchronous tasks can publish data to and receive data from.
Flows work much like the broadcast channels in Tokio, with some key differences:
Zero-Copy Multicast Reads: Every item published to a flow is immediately available to each subscriber, in parallel, with no copies or cloning.
Lock-Free by Default: No locks or mutexes are used when publishing data to or receiving data from the flow. If the dynamic-sub feature is enabled, a single lock is used when publishing data to coordinate access to the set of subscribers. This lock may be replaced in a future update by a list supporting atomic updates.
Broad Compatibility: no-std by default. async and synchronous versions of all APIs. async functionality doesn't depend on a specific runtime or framework (not even futures!).
Flows work wherever channels or queues would work, but they're built specifically for systems that need the same data processed concurrently (or in parallel) by multiple tasks.
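To make the "bounded, multicast ring buffer" idea above concrete, here is a minimal single-threaded sketch. It is not the crate's implementation: the `Ring` type and its methods are hypothetical names invented for illustration, and it uses plain fields where a real lock-free structure would use atomics. It only shows how monotonically increasing sequence numbers map onto slots, and how the slowest reader bounds the writer.

```rust
/// Toy model of a bounded multicast ring buffer.
/// Hypothetical names; NOT the codas-flow implementation.
struct Ring<T> {
    slots: Vec<Option<T>>,
    /// Next sequence number the writer will publish.
    head: u64,
    /// Next sequence number each reader will receive.
    cursors: Vec<u64>,
}

impl<T> Ring<T> {
    fn new(capacity: usize, readers: usize) -> Self {
        assert!(capacity > 0);
        Ring {
            slots: (0..capacity).map(|_| None).collect(),
            head: 0,
            cursors: vec![0; readers],
        }
    }

    /// Publishes `value`, or hands it back if the slowest reader
    /// has not yet moved past the slot that would be overwritten.
    fn try_publish(&mut self, value: T) -> Result<u64, T> {
        let slowest = self.cursors.iter().copied().min().unwrap_or(self.head);
        if self.head - slowest >= self.slots.len() as u64 {
            return Err(value);
        }
        let seq = self.head;
        let index = (seq % self.slots.len() as u64) as usize;
        self.slots[index] = Some(value);
        self.head += 1;
        Ok(seq)
    }

    /// Returns a shared reference to the next item for `reader`,
    /// without copying or cloning it.
    fn try_receive(&mut self, reader: usize) -> Option<&T> {
        let seq = self.cursors[reader];
        if seq == self.head {
            return None;
        }
        self.cursors[reader] += 1;
        let index = (seq % self.slots.len() as u64) as usize;
        self.slots[index].as_ref()
    }
}

fn main() {
    let mut ring = Ring::new(2, 2);
    assert_eq!(ring.try_publish("a"), Ok(0));
    assert_eq!(ring.try_publish("b"), Ok(1));
    // Full: neither reader has consumed sequence 0 yet.
    assert_eq!(ring.try_publish("c"), Err("c"));
    assert_eq!(ring.try_receive(0), Some(&"a"));
    // Still full: reader 1 is now the slowest.
    assert_eq!(ring.try_publish("c"), Err("c"));
    assert_eq!(ring.try_receive(1), Some(&"a"));
    assert_eq!(ring.try_publish("c"), Ok(2));
}
```

Every reader sees every item, and no item is cloned per reader; the cost is that one slow reader can stall the writer once the buffer is full.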
Flows are created with Flow::new, which returns a tuple of (flow, subscribers):
use codas_flow::*;
// Create a flow with a capacity of 32 strings,
// and one subscriber.
let (mut flow, [mut sub]) = Flow::<String>::new(32).unwrap();
// Publish "Hello!" to the next data sequence in the flow.
let seq = flow.try_next().unwrap();
seq.publish("Hello!".to_string());
// Receive the next published data sequence from the flow.
let seq = sub.try_next().unwrap();
assert_eq!("Hello!", *seq);
Data is published into a flow via Flow::try_next (or await Flow::next), which returns an UnpublishedData reference. Once this reference is published (via UnpublishedData::publish) or dropped, it becomes receivable by every subscriber.
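The "published or dropped" behaviour can be modelled with an ordinary Rust drop guard. The sketch below is an assumption about the general pattern, not the crate's code: the `Unpublished` struct, its fields, and the `Cell` counter are all hypothetical, and it assumes that a guard dropped without an explicit publish releases whatever the slot already held.

```rust
use std::cell::Cell;

/// Hypothetical stand-in for a slot that is claimed but not yet visible.
struct Unpublished<'a> {
    slot: &'a mut String,
    /// Sequences below this value are visible to readers.
    published_up_to: &'a Cell<u64>,
    seq: u64,
}

impl<'a> Unpublished<'a> {
    /// Writes `value` into the slot, then releases the guard.
    fn publish(self, value: String) {
        *self.slot = value;
        // `self` is dropped here, which makes the slot visible.
    }
}

impl Drop for Unpublished<'_> {
    fn drop(&mut self) {
        // Marks this sequence (and all before it) as receivable.
        self.published_up_to.set(self.seq + 1);
    }
}

fn main() {
    let mut slot = String::new();
    let visible = Cell::new(0);

    // Explicit publish.
    Unpublished { slot: &mut slot, published_up_to: &visible, seq: 0 }
        .publish("Hello!".to_string());
    assert_eq!(visible.get(), 1);
    assert_eq!(slot, "Hello!");

    // Dropping without publishing still releases the sequence
    // (assumption: the slot keeps its previous contents).
    drop(Unpublished { slot: &mut slot, published_up_to: &visible, seq: 1 });
    assert_eq!(visible.get(), 2);
    assert_eq!(slot, "Hello!");
}
```

Tying visibility to `Drop` means a claimed slot cannot be leaked by an early return, so later sequences are never blocked behind a forgotten guard.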
Data is received from a flow via FlowSubscriber::try_next (or await FlowSubscriber::next), which returns a PublishedData reference.
Using slice patterns, any number of subscribers can be returned by Flow::new:
use codas_flow::*;
// Create a flow with a capacity of 32 strings,
// and 2 subscribers.
let (mut flow, [mut sub_a, mut sub_b]) = Flow::<String>::new(32).unwrap();
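One way an API like this can be typed in Rust is with a const generic array length that the compiler infers from the caller's pattern. Whether Flow::new is declared exactly this way is an assumption; `Queue`, `Subscriber`, and `new_queue` below are hypothetical stand-ins that only demonstrate the language mechanism.

```rust
/// Hypothetical subscriber handle.
struct Subscriber {
    id: usize,
}

/// Hypothetical queue handle.
struct Queue;

/// Returns a queue plus `N` subscribers; `N` is inferred from
/// the array pattern the caller destructures into.
fn new_queue<const N: usize>() -> (Queue, [Subscriber; N]) {
    (Queue, std::array::from_fn(|id| Subscriber { id }))
}

fn main() {
    // Two bindings in the pattern, so N = 2.
    let (_queue, [a, b]) = new_queue();
    assert_eq!((a.id, b.id), (0, 1));

    // An empty pattern gives N = 0.
    let (_queue, []) = new_queue();

    // N can also be stated explicitly.
    let (_queue, subs) = new_queue::<3>();
    assert_eq!(subs.len(), 3);
}
```

Because the subscriber count is part of the type, a mismatch between the pattern and an explicit count is a compile-time error rather than a runtime one.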
When the dynamic-sub feature is enabled, new subscribers can be attached to a flow even after it's created:
use codas_flow::*;
// Create a flow with a capacity of 32 strings,
// and no subscribers.
let (mut flow, []) = Flow::<String>::new(32).unwrap();
// Attach a new subscriber.
let mut sub = flow.subscribe();
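The sketch below illustrates why attaching subscribers at runtime calls for some coordination with the publisher. It is not the crate's code: `Shared` and its methods are hypothetical, it swaps in a `std::sync::Mutex` for the spin-lock the crate describes, and it assumes a new subscriber starts at the current head and so sees only data published afterwards.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical shared state: one cursor per subscriber, behind a lock.
struct Shared {
    /// Next sequence number the publisher will write.
    head: AtomicU64,
    cursors: Mutex<Vec<Arc<AtomicU64>>>,
}

impl Shared {
    /// Attaches a subscriber positioned at the current head.
    /// The head is read while the lock is held, so a publisher
    /// consulting `slowest` cannot miss the new cursor.
    fn subscribe(&self) -> Arc<AtomicU64> {
        let mut cursors = self.cursors.lock().unwrap();
        let cursor = Arc::new(AtomicU64::new(self.head.load(Ordering::Acquire)));
        cursors.push(Arc::clone(&cursor));
        cursor
    }

    /// Sequence of the slowest subscriber, which bounds how far
    /// a publisher may advance before overwriting unread data.
    fn slowest(&self) -> u64 {
        let cursors = self.cursors.lock().unwrap();
        let min = cursors.iter().map(|c| c.load(Ordering::Acquire)).min();
        min.unwrap_or_else(|| self.head.load(Ordering::Acquire))
    }
}

fn main() {
    let shared = Shared {
        head: AtomicU64::new(5),
        cursors: Mutex::new(Vec::new()),
    };
    // With no subscribers, nothing holds the publisher back.
    assert_eq!(shared.slowest(), 5);

    let sub = shared.subscribe();
    assert_eq!(sub.load(Ordering::Acquire), 5);

    // The publisher moves ahead; the subscriber has not caught up.
    shared.head.store(8, Ordering::Release);
    assert_eq!(shared.slowest(), 5);

    sub.store(8, Ordering::Release);
    assert_eq!(shared.slowest(), 8);
}
```

With a fixed subscriber set the cursor list never changes, so the lock in this sketch would be unnecessary; that is consistent with the lock appearing only when dynamic-sub is enabled.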
This crate offers the following optional features:
dynamic-sub: Enables APIs for adding new subscribers to existing Flows. This feature introduces spin-locks to some critical sections in order to safely add subscribers to active flows.
None of these features are enabled by default.
Copyright 2024 Alicorn Systems, Inc.
Licensed under the GNU Affero General Public License version 3, as published by the Free Software Foundation. Refer to the license file for more information.
If you have any questions, please reach out to hello@alicorn.systems.