| Crates.io | crossfire |
| lib.rs | crossfire |
| version | 3.0.2 |
| created_at | 2020-06-29 20:41:56.291711+00 |
| updated_at | 2026-01-23 13:13:31.617208+00 |
| description | channels for async and threads |
| homepage | https://github.com/frostyplanet/crossfire-rs |
| repository | https://github.com/frostyplanet/crossfire-rs |
| max_upload_size | |
| id | 259527 |
| size | 468,398 |
High-performance lockless spsc/mpsc/mpmc channels. The algorithm derives from crossbeam, with improvements.
It supports async contexts and bridges the gap between async and blocking contexts.
For the concept, please refer to the wiki.
v1.0: Used in production since 2022.12.
v2.0: [2025.6] Refactored the codebase and API by removing generic types from the ChannelShared type, which made it easier to code with.
v2.1: [2025.9] Removed the dependency on crossbeam-channel and re-implemented on a modified version of crossbeam-queue, bringing a 2x performance improvement for both async and blocking contexts.
v3.0: [2026.1] Refactored the API back to a generic flavor interface and added select. Dedicated optimizations: bounded SPSC +70%, MPSC +30%, size-one channels +20%. Eliminating the enum dispatch cost improved async performance by another 33%. Check out compat for migration from v2.x.
Being a lockless channel, crossfire outperforms other async-capable channels. Thanks to a lighter notification mechanism, in a blocking context it is even better than the original crossbeam-channel in most cases.
More benchmark data is posted on the wiki.
Also, being lockless, the algorithm relies on spinning and yielding. Spinning works well on multi-core systems, but is unfriendly to single-core systems (such as virtual machines).
For that case we provide the function detect_backoff_cfg() to detect the running platform; calling it in the initialization section of your code can bring a 2x performance boost on a VPS.
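A minimal sketch of where to call it (assuming the function is exported at the crate root under the name given above):

```rust
fn main() {
    // Tune the spin/yield backoff for the platform this process actually runs on
    // (particularly useful on single-core VPS instances).
    crossfire::detect_backoff_cfg();

    // ... create channels and spawn workers afterwards ...
}
```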
The benchmarks are written with the criterion framework. You can run them with:
make bench crossfire
make bench crossfire_select
spsc, mpsc, mpmc: each has a different underlying implementation optimized for its concurrency model. The SP or SC interface is only for non-concurrent operation; it is more memory-efficient in waker registration and reduces the atomic-op cost in the lockless algorithm.
oneshot has its own dedicated sender/receiver types, because using Tx / Rx would be too heavy.
The following lockless queues are exposed in the flavor module, and each one has a type alias in spsc/mpsc/mpmc:

- List: uses crossbeam's SegQueue.
- Array: an enum that wraps crossbeam's ArrayQueue, or a One when initialized with size <= 1.
- One: derived from the ArrayQueue algorithm, but performs better in the size = 1 scenario because it has two slots, allowing the reader and writer to work concurrently.
- Null: see the null doc; a channel for cancellation purposes that only wakes up on closing.

NOTE: Although the names Array and List are shared between the spsc/mpsc/mpmc modules, they are different type aliases local to their parent modules. We suggest distinguishing them by namespace when importing, as in the sketch below.
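A minimal sketch of the suggested namespacing (assuming the Array alias takes the element type as its parameter, as in the mixed example further below):

```rust
use crossfire::{mpmc, spsc};

// Same alias name, but each is local to its parent module:
type SpscFlavor = spsc::Array<usize>;
type MpmcFlavor = mpmc::Array<usize>;
```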
Aside from the bounded_* and unbounded_* functions, which fix the sender/receiver types, each module has build() and new() functions that can be applied to any channel flavor and any async/blocking combination.
| Context | Single Sender (Producer) | Multiple Senders (Producer) | Single Receiver (Consumer) | Multiple Receivers (Consumer) |
|---|---|---|---|---|
| Blocking (BlockingTxTrait / BlockingRxTrait) | Tx | MTx | Rx | MRx |
| Async (AsyncTxTrait / AsyncRxTrait) | AsyncTx | MAsyncTx | AsyncRx | MAsyncRx |
Safety: for the SP / SC version, AsyncTx, AsyncRx, Tx, and Rx are neither Clone nor Sync.
Although they can be moved to other threads, send/recv must not be used while they are inside an Arc (refer to the compile_fail examples in the type documentation).
The benefit of the SP / SC API is completely lockless waker registration, which brings a performance boost.
The sender/receiver can use the From trait to convert between their blocking and async counterparts (refer to the example below).
Error types are the same as crossbeam-channel:
TrySendError, SendError, SendTimeoutError, TryRecvError, RecvError, RecvTimeoutError
Tested on tokio-1.x and async-std-1.x, crossfire is runtime-agnostic.
The following scenarios are considered:
The AsyncTx::send() and AsyncRx::recv() operations are cancellation-safe in an async context.
You can safely use the select! macro and timeout() function in tokio/futures in combination with recv().
On cancellation, SendFuture and RecvFuture trigger drop(), which cleans up the waker state, making sure there is no memory leak or deadlock.
However, you cannot know the true result of a SendFuture that is dropped upon cancellation; thus, we suggest using AsyncTx::send_timeout() instead.
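A minimal sketch of this pattern (assuming the mpmc::bounded_async() constructor used in the example further below, running under tokio):

```rust
use std::time::Duration;
use crossfire::*;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpmc::bounded_async::<usize>(10);
    tx.send(1).await.expect("send ok");

    // recv() is cancellation-safe: if the sleep branch wins, the dropped
    // RecvFuture cleans up its waker registration, so no leak or deadlock.
    tokio::select! {
        res = rx.recv() => println!("received: {:?}", res),
        _ = tokio::time::sleep(Duration::from_millis(100)) => println!("timed out"),
    }
}
```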
When the "tokio" or "async_std" feature is enabled, we also provide two additional functions:
AsyncTx::send_timeout(), which will return the message that failed to be sent in
SendTimeoutError. We guarantee the result is atomic. Alternatively, you can use
AsyncTx::send_with_timer().
AsyncRx::recv_timeout(), we guarantee the result is atomic.
Alternatively, you can use AsyncRx::recv_with_timer().
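A hedged sketch of how these might be called (assuming the "tokio" feature is enabled and that send_timeout() / recv_timeout() take the message and/or a Duration; check the API docs for the exact signatures):

```rust
use std::time::Duration;
use crossfire::*;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpmc::bounded_async::<usize>(1);

    // On timeout, SendTimeoutError hands back the message that was not sent.
    if let Err(e) = tx.send_timeout(42, Duration::from_millis(50)).await {
        println!("send timed out, message returned in error: {:?}", e);
    }

    // recv_timeout() resolves to either a message or a RecvTimeoutError.
    match rx.recv_timeout(Duration::from_millis(50)).await {
        Ok(v) => println!("got {}", v),
        Err(e) => println!("recv timed out: {:?}", e),
    }
}
```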
In a multi-producer / multi-consumer scenario, there is a small memory overhead for passing along Weak references to wakers.
Because we aim to be lockless, when sending/receiving futures are canceled (e.g. by tokio::time::timeout()), cleanup happens immediately if the try-lock succeeds; otherwise it relies on lazy cleanup.
(This is not an issue, because stale weak wakers are consumed by actual message send and recv operations.)
In an idle-select scenario, such as waiting for a close notification, the waker is reused as much as possible when poll() returns Pending.
The future objects created by AsyncTx::send(), AsyncTx::send_timeout(), AsyncRx::recv(), and AsyncRx::recv_timeout() are Sized; you don't need to put them in a Box.
If you would like to use a poll function directly for more complex behavior, you can call AsyncSink::poll_send() or AsyncStream::poll_item() with a Context.
Cargo.toml:
[dependencies]
crossfire = "3.0"
- compat: enables the compat module, which keeps the same API namespace structure as v2.x.
- tokio: enables send_timeout() / recv_timeout() backed by tokio's sleep function (conflicts with the async_std feature); see the snippet below.
- async_std: enables send_timeout() / recv_timeout() backed by async-std's sleep function (conflicts with the tokio feature).
- trace_log: development mode; enables internal logging during tests or benchmarks to debug deadlock issues.
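For example, to enable the timeout helpers on top of tokio (recall that the tokio and async_std features are mutually exclusive), the dependency entry might look like this sketch; adjust the version as needed:

```toml
[dependencies]
crossfire = { version = "3.0", features = ["tokio"] }
```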
Example: blocking and async senders / receivers mixed together
use crossfire::*;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpmc::bounded_async::<usize>(100);
    let mut recv_counter = 0;
    let mut co_tx = Vec::new();
    let mut co_rx = Vec::new();
    const ROUND: usize = 1000;

    // Convert a cloned async sender into its blocking counterpart and
    // use it from a blocking thread.
    let _tx: MTx<mpmc::Array<usize>> = tx.clone().into_blocking();
    co_tx.push(tokio::task::spawn_blocking(move || {
        for i in 0..ROUND {
            _tx.send(i).expect("send ok");
        }
    }));
    // The original async sender stays in an async task.
    co_tx.push(tokio::spawn(async move {
        for i in 0..ROUND {
            tx.send(i).await.expect("send ok");
        }
    }));

    // Same on the receiver side: one blocking consumer, one async consumer.
    let _rx: MRx<mpmc::Array<usize>> = rx.clone().into_blocking();
    co_rx.push(tokio::task::spawn_blocking(move || {
        let mut count: usize = 0;
        loop {
            match _rx.recv() {
                Ok(_i) => count += 1,
                Err(_) => break, // all senders dropped, channel closed
            }
        }
        count
    }));
    co_rx.push(tokio::spawn(async move {
        let mut count: usize = 0;
        loop {
            match rx.recv().await {
                Ok(_i) => count += 1,
                Err(_) => break,
            }
        }
        count
    }));

    for th in co_tx {
        let _ = th.await.unwrap();
    }
    for th in co_rx {
        recv_counter += th.await.unwrap();
    }
    assert_eq!(recv_counter, ROUND * 2);
}
NOTE: Because we have pushed the speed to a level no one has reached before, crossfire can put considerable pressure on the async runtime. Hidden bugs (especially around atomic ops on weakly ordered platforms) might surface:
The tests are placed in the test-suite directory; run them with:
make test
| arch | runtime | workflow | status |
|---|---|---|---|
| x86_64 | threaded | cron_master_threaded_x86 | STABLE |
| x86_64 | tokio 1.47.1 | cron_master_tokio_x86 | STABLE |
| x86_64 | async-std | cron_master_async_std_x86 | STABLE |
| x86_64 | smol | cron_master_smol-x86 | STABLE |
| x86_64 | compio | cron_master_compio-x86 | STABLE |
| arm | threaded | cron_master_threaded_arm | STABLE |
| arm | tokio >= 1.48 (tokio PR #7622) | cron_master_tokio_arm | STABLE (should upgrade tokio to 1.48) |
| arm | async-std | cron_master_async_std_arm | STABLE |
| arm | smol | cron_master_smol_arm | STABLE |
| arm | compio | cron_master_compio_arm | STABLE |
| miri (emulation) | threaded | miri_tokio, miri_tokio_cur | STABLE |
| miri (emulation) | tokio | | still verifying |
| miri (emulation) | async-std | - | not supported by miri (timerfd_create) |
| miri (emulation) | smol | - | not supported by miri (timerfd_create) |
Debug locally:
Use --features trace_log to run the bench or test until it hangs, then press Ctrl+C or send SIGINT; the latest log will be dumped to /tmp/crossfire_ring.log (refer to _setup_log() in tests/common.rs).
Debug with the GitHub workflow: https://github.com/frostyplanet/crossfire-rs/issues/37