Low-latency, high-throughput bounded queues ("data flows") for (a)synchronous and event-driven systems, inspired by the [LMAX Disruptor](https://github.com/LMAX-Exchange/disruptor) and built for [`codas`](https://crates.io/crates/codas).

## What's Here

This crate provides the `flow` data structure: A ["ring" buffer](https://en.wikipedia.org/wiki/Circular_buffer) which concurrent, (a)synchronous tasks can publish data to and receive data from.

`flow`s work much like the `broadcast` channels in [Tokio](https://docs.rs/tokio/latest/tokio/sync/broadcast/fn.channel.html), with some key differences:

1. **Zero-Copy Multicast Reads**: Every piece of data published to a flow is immediately available to _each_ subscriber, in parallel, with no copies or cloning.

2. **Lock-Free** by default: No locks or mutexes are used when publishing data to _or_ receiving data from the `flow`.

   > If the `dynamic-sub` feature is enabled, a single lock
   > is used when _publishing_ data to coordinate access to
   > the set of subscribers. This lock may be replaced in a
   > future update by a list supporting atomic updates.

3. **Broad Compatibility**:
   - `no-std` by default.
   - `async` _and_ synchronous versions of all APIs.
   - `async` functionality doesn't depend on a specific runtime or framework (not even `futures`!).

`flow`s work wherever channels or queues would work, but they're built specifically for systems that need the same data processed concurrently (or in parallel) by multiple tasks.

## Examples

Flows are created with `Flow::new`, which returns a tuple of `(flow, subscribers)`:

```rust
use codas_flow::*;

// Create a flow with a capacity of 32 strings,
// and one subscriber.
let (mut flow, [mut sub]) = Flow::<String>::new(32).unwrap();

// Publish "Hello!" to the next data sequence in the flow.
let seq = flow.try_next().unwrap();
seq.publish("Hello!".to_string());

// Receive the next published data sequence from the flow.
let seq = sub.try_next().unwrap();
assert_eq!("Hello!", *seq);
```

Data is published _into_ a `flow` via `Flow::try_next` (or `await Flow::next`), which returns an `UnpublishedData` reference. Once this reference is published (via `UnpublishedData::publish`), or dropped, it becomes receivable by every subscriber.

Data is received _from_ a `flow` via `FlowSubscriber::try_next` (or `await FlowSubscriber::next`), which returns a `PublishedData` reference.

### Adding Subscribers

Using [slice patterns](https://doc.rust-lang.org/reference/patterns.html#slice-patterns), any number of `subscribers` can be returned by `Flow::new`:

```rust
use codas_flow::*;

// Create a flow with a capacity of 32 strings,
// and 2 subscribers.
let (mut flow, [mut sub_a, mut sub_b]) = Flow::<String>::new(32).unwrap();
```

### Adding Subscribers _Dynamically_

When the `dynamic-sub` feature is enabled, new subscribers can be attached to a `flow` even _after_ it's created:

```rust
use codas_flow::*;

// Create a flow with a capacity of 32 strings,
// and no subscribers.
let (mut flow, []) = Flow::<String>::new(32).unwrap();

// Attach a new subscriber.
let mut sub = flow.subscribe();
```

## Optional Features

This crate offers the following optional [features](https://doc.rust-lang.org/cargo/reference/features.html):

- `dynamic-sub`: Enables APIs for adding new subscribers to existing flows. This feature introduces spin-locks to some critical sections in order to safely add subscribers to active flows.

None of these features are enabled by default.

## License

Copyright 2024 Alicorn Systems, Inc.

Licensed under the GNU Affero General Public License version 3, as published by the Free Software Foundation. Refer to [the license file](../LICENSE.txt) for more information.

If you have any questions, please reach out to [`hello@alicorn.systems`](mailto:hello@alicorn.systems).
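
## Appendix: A Simplified Model of a Flow

For readers new to ring buffers, the following is a minimal, single-threaded sketch of the idea behind a bounded multicast queue: one publish cursor, one read cursor per subscriber, and a publisher that refuses to overwrite a slot until the slowest subscriber has read it. It is _not_ this crate's implementation. The `Ring` type and its methods are hypothetical names invented for illustration, and the sketch makes no attempt to model the lock-free, concurrent, or `async` behavior of a real `flow`.

```rust
/// Hypothetical, single-threaded model of a bounded multicast ring.
/// (Illustration only; not part of `codas_flow`.)
struct Ring<T> {
    /// Fixed-size storage; slot `seq % capacity` holds sequence `seq`.
    slots: Vec<Option<T>>,
    /// Number of items published so far (also the next sequence to publish).
    published: u64,
    /// For each subscriber, the next sequence it will read.
    cursors: Vec<u64>,
}

impl<T> Ring<T> {
    fn new(capacity: usize, subscribers: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Ring {
            slots: (0..capacity).map(|_| None).collect(),
            published: 0,
            cursors: vec![0; subscribers],
        }
    }

    /// Publishes `value` and returns its sequence number, or hands the
    /// value back if the slowest subscriber is a full buffer behind.
    fn try_publish(&mut self, value: T) -> Result<u64, T> {
        // With no subscribers, nothing holds the publisher back.
        let slowest = self.cursors.iter().copied().min().unwrap_or(self.published);
        if self.published - slowest >= self.slots.len() as u64 {
            return Err(value);
        }
        let seq = self.published;
        let idx = (seq % self.slots.len() as u64) as usize;
        self.slots[idx] = Some(value);
        self.published += 1;
        Ok(seq)
    }

    /// Returns a reference to the next unread item for subscriber `sub`,
    /// advancing only that subscriber's cursor. The item is not cloned.
    fn try_next(&mut self, sub: usize) -> Option<&T> {
        let seq = self.cursors[sub];
        if seq >= self.published {
            return None;
        }
        self.cursors[sub] = seq + 1;
        let idx = (seq % self.slots.len() as u64) as usize;
        self.slots[idx].as_ref()
    }
}

fn main() {
    // A ring with room for 2 strings and 2 subscribers.
    let mut ring: Ring<String> = Ring::new(2, 2);
    assert_eq!(ring.try_publish("a".to_string()), Ok(0));
    assert_eq!(ring.try_publish("b".to_string()), Ok(1));

    // The ring is full until *every* subscriber has read sequence 0.
    assert!(ring.try_publish("c".to_string()).is_err());
    assert_eq!(ring.try_next(0).map(String::as_str), Some("a"));
    assert!(ring.try_publish("c".to_string()).is_err());
    assert_eq!(ring.try_next(1).map(String::as_str), Some("a"));

    // Both subscribers saw the same item; its slot can now be reused.
    assert_eq!(ring.try_publish("c".to_string()), Ok(2));
}
```

In this model the slowest subscriber applies backpressure to the publisher, which is the usual trade-off of a bounded queue that delivers every item to every subscriber.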