| Crates.io | race |
| lib.rs | race |
| version | 0.1.14 |
| created_at | 2025-12-18 06:57:27.868495+00 |
| updated_at | 2025-12-19 06:00:18.698821+00 |
| description | Staggered async task executor with race semantics / 阶梯式异步任务执行器,具备竞赛语义 |
| homepage | https://github.com/js0-site/rust/tree/main/race |
| repository | https://github.com/js0-site/rust.git |
| max_upload_size | |
| id | 1991841 |
| size | 91,255 |
race is a high-performance Rust library implementing staggered async task execution. Tasks start at fixed intervals and race to completion - fastest task wins regardless of start order.
Note: "Staggered" means tasks are launched sequentially at fixed time intervals, not simultaneously. For example, with a 50ms interval: Task 1 starts at 0ms, Task 2 at 50ms, Task 3 at 100ms, etc. This creates a "staircase" or "ladder" pattern of task launches.
Key difference from Promise.race(): instead of starting all tasks simultaneously, tasks launch sequentially at configurable intervals, enabling:
- futures::Stream for async iteration
- No dyn dispatch; coarsetime timing optimization
- No 'static requirement - lifetimes are tied to the Race instance

Install:
cargo add race
Or add to your Cargo.toml:
[dependencies]
race = "0.1.14"
use futures::StreamExt;
use race::Race;
#[tokio::main]
async fn main() {
let mut race = Race::new(
std::time::Duration::from_millis(50), // Start new task every 50ms
|url: &str| async move {
// Simulate network request with different latencies
let latency = match url {
"server1" => 100,
"server2" => 20, // Fastest
_ => 80,
};
tokio::time::sleep(std::time::Duration::from_millis(latency)).await;
Ok::<String, &'static str>(format!("Response from {url}"))
},
vec!["server1", "server2", "server3"],
);
// Get first completed result (server2 completes first despite starting second)
if let Some((url, Ok(data))) = race.next().await {
println!("First response from {url}: {data}");
}
}
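To see why server2 wins, note that each task's finish time is its staggered start offset plus its own latency. A minimal std-only sketch of that arithmetic (the `fastest` helper is illustrative, not part of the race API):

```rust
// Finish time of task i = i * step + latency(i); the smallest finish wins.
fn fastest(step_ms: u64, tasks: &[(&'static str, u64)]) -> (&'static str, u64) {
    tasks
        .iter()
        .enumerate()
        .map(|(i, &(name, latency))| (name, i as u64 * step_ms + latency))
        .min_by_key(|&(_, finish)| finish)
        .unwrap()
}

fn main() {
    // Latencies from the example above: server1=100ms, server2=20ms, server3=80ms
    let tasks = [("server1", 100u64), ("server2", 20), ("server3", 80)];
    let (winner, at) = fastest(50, &tasks);
    println!("winner: {winner} at {at}ms"); // winner: server2 at 70ms
}
```

server1 finishes at 0+100=100ms, server2 at 50+20=70ms, server3 at 100+80=180ms, so server2 wins despite starting second.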
Query multiple hosts with a 500ms staggered delay and return the first successful result:
use std::net::IpAddr;
use futures::StreamExt;
use race::Race;
use tokio::net::lookup_host;
#[tokio::main]
async fn main() {
let hosts = vec!["google.com:80", "cloudflare.com:80", "github.com:80"];
let mut race = Race::new(
std::time::Duration::from_millis(500),
|host: &str| async move {
let addr = lookup_host(host).await?.next().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::NotFound, "no address")
})?;
Ok::<IpAddr, std::io::Error>(addr.ip())
},
hosts,
);
// Return first successful response
while let Some((host, result)) = race.next().await {
if let Ok(ip) = result {
println!("Resolved {host}: {ip}");
break;
}
}
}
Timeline: the first host starts at 0ms, the second at 500ms, the third at 1000ms.
The first completed response wins; remaining tasks continue in the background until the Race is dropped.
Handle unbounded iterators efficiently - tasks are started on demand:
use futures::StreamExt;
use race::Race;
#[tokio::main]
async fn main() {
// Infinite iterator - only starts tasks as needed
let infinite_numbers = 0u64..;
let mut race = Race::new(
std::time::Duration::from_millis(50),
|n: &u64| {
let n = *n;
async move {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
Ok::<u64, &'static str>(n * n)
}
},
infinite_numbers,
);
// Only consume what you need - no memory explosion
for i in 0..5 {
if let Some((n, Ok(square))) = race.next().await {
println!("Result {i}: {n}² = {square}");
}
}
// Race is dropped here, remaining tasks are cancelled
}
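A useful property behind "no memory explosion": with staggered launches and bounded per-task latency, the number of in-flight tasks stays near latency/step at any instant. A std-only sketch of that bound (the `in_flight` helper is illustrative, not part of the crate):

```rust
// Task i runs over [i*step, i*step + latency). With an infinite iterator,
// the steady-state overlap is about latency / step tasks.
fn in_flight(t_ms: u64, step_ms: u64, latency_ms: u64) -> usize {
    (0..)
        .map(|i: u64| i * step_ms)                  // launch times 0, step, 2*step, ...
        .take_while(|&start| start <= t_ms)         // launched by time t
        .filter(|&start| start + latency_ms > t_ms) // not yet finished at t
        .count()
}

fn main() {
    // Parameters from the example above: 50ms stagger, 100ms per task.
    println!("{}", in_flight(175, 50, 100)); // 2 tasks overlap in steady state
}
```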
Works seamlessly with non-Copy types like String, Vec, and custom structs:
use futures::StreamExt;
use race::Race;
#[derive(Debug, Clone)]
struct Task {
id: u32,
name: String,
data: Vec<i32>,
}
#[tokio::main]
async fn main() {
let tasks = vec![
Task { id: 1, name: "process".to_string(), data: vec![1, 2, 3] },
Task { id: 2, name: "analyze".to_string(), data: vec![4, 5, 6] },
];
let mut race = Race::new(
std::time::Duration::from_millis(100),
|task: &Task| {
let task = task.clone();
async move {
let sum: i32 = task.data.iter().sum();
let result = format!("{}: sum={sum}", task.name);
Ok::<String, &'static str>(result)
}
},
tasks,
);
while let Some((original_task, Ok(result))) = race.next().await {
println!("Task {}: {result}", original_task.id);
}
}
graph TD
A[Race::new] --> B[poll_next called]
B --> C{Time to start new task?}
C -->|Yes| D[Get next arg from iterator]
D --> E[Call run function with &arg]
E --> F[Store arg and Future in ing vector]
F --> G[Update next_run time]
G --> H{More args available?}
H -->|Yes| C
H -->|No| I[Mark is_end = true]
C -->|No| J[Poll all running tasks]
I --> J
J --> K{Any task completed?}
K -->|Yes| L[Remove from ing vector]
L --> M[Return Some Ok arg, result]
K -->|No| N{All tasks done?}
N -->|Yes| O[Return None]
N -->|No| P[Set timer for next start]
P --> Q[Return Pending]
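The flow above reduces to a simple model: tasks launch every `step` ms and the stream yields them in completion order (launch offset + latency), not launch order. A std-only sketch of that ordering (the `completion_order` helper is illustrative, not part of the crate):

```rust
// Yields task indices sorted by finish time = i * step + latency(i).
fn completion_order(step_ms: u64, latencies: &[u64]) -> Vec<usize> {
    let mut order: Vec<(usize, u64)> = latencies
        .iter()
        .enumerate()
        .map(|(i, &lat)| (i, i as u64 * step_ms + lat))
        .collect();
    order.sort_by_key(|&(_, finish)| finish);
    order.into_iter().map(|(i, _)| i).collect()
}

fn main() {
    // Three tasks with latencies 120, 60, 10 and a 40ms stagger:
    // finish times are 120, 100, 90 → yielded as task 2, task 1, task 0.
    println!("{:?}", completion_order(40, &[120, 60, 10])); // [2, 1, 0]
}
```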
Implementation notes:
- Race::new stores the task generator function, step interval, and argument iterator
- On each poll_next, the current time is checked against next_run to decide whether to start a new task
- run(&arg) creates the Future; both the arg and the Future are stored in the ing vector
- A completed task returns its (original_arg, result) tuple immediately
- A tokio::time::Sleep timer ensures proper wakeup for the next task start
- Arguments are passed as &A to avoid unnecessary moves
- Any Send + Unpin type is supported, including non-cloneable types
- coarsetime is used for high-performance interval timing
- The ing vector is pre-allocated to avoid frequent reallocations

Race<'a, A, T, E, G, Fut, I>

Staggered race executor implementing futures::Stream.
Type parameters:
- A - Argument type
- T - Success result type
- E - Error type
- G - Task generator function Fn(&A) -> Fut
- Fut - Future type returning Result<T, E>
- I - Iterator type yielding A

new(step: std::time::Duration, run: G, args_li: impl IntoIterator<Item = A>) -> Self

Create an executor with a step interval, task generator, and argument iterator.
Parameters:
- step: std::time::Duration - Staggered delay between task starts. Tasks launch sequentially at this interval, not all at once; e.g. Duration::from_millis(50) starts each task 50ms after the previous one (converted internally to coarsetime::Duration)
- run: G - Function Fn(&A) -> Fut that creates a Future from an argument reference
- args_li: impl IntoIterator<Item = A> - Iterator of arguments (may be infinite)

Returns: a Race instance implementing futures::Stream<Item = (A, Result<T, E>)>
Constraints:
- A: Send + Unpin + 'a - Argument type must be thread-safe and unpinnable
- T: Send + 'a - Result type must be thread-safe
- E: Send + 'a - Error type must be thread-safe
- G: Fn(&A) -> Fut + Send + Unpin + 'a - Generator function must be thread-safe
- Fut: Future<Output = Result<T, E>> + Send + 'a - Future must be thread-safe
- I: Iterator<Item = A> + Send + Unpin + 'a - Iterator must be thread-safe

Stream::poll_next returns Poll<Option<(A, Result<T, E>)>>:
- Poll::Ready(Some((arg, Ok(result)))) - Task completed successfully, paired with its original argument
- Poll::Ready(Some((arg, Err(e)))) - Task failed; the original argument is still returned
- Poll::Ready(None) - All tasks completed and the iterator is exhausted
- Poll::Pending - No task ready; a waker is registered for future notification

Optimizations implemented:
- No dyn dispatch - fully generic, no virtual calls
- coarsetime - fast time operations, fewer syscalls

Benchmarks show significant performance improvements over channel-based approaches.
race/
├── src/
│ └── lib.rs # Core implementation
├── tests/
│ └── main.rs # Integration tests with logging
├── readme/
│ ├── en.md # English documentation
│ └── zh.md # Chinese documentation
└── Cargo.toml
The "race" pattern in async programming traces back to JavaScript's Promise.race(), introduced in ES6 (2015). Unlike Promise.race(), which starts all tasks simultaneously, this library implements a staggered variant inspired by the "hedged requests" pattern described in Google's 2013 paper "The Tail at Scale" and later adopted by gRPC.
Hedged requests help reduce tail latency by sending redundant requests after a delay, using whichever response arrives first. This technique is widely used in distributed systems at Google, Amazon, and other large-scale services.
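The arithmetic behind hedging, assuming independent request latencies (an idealization real systems only approximate): if a single request is slow with probability p, a hedged pair is slow only when both are, with probability p². A sketch of that illustrative calculation:

```rust
// Assumption: the two requests' latencies are independent. Then the
// probability that a hedged pair both miss the deadline is p * p.
fn both_slow(p: f64) -> f64 {
    p * p
}

fn main() {
    let p = 0.01; // assume 1% of single requests are "slow"
    println!("single: {}, hedged pair: {}", p, both_slow(p));
}
```

At p = 1%, hedging cuts the tail from 1 in 100 to roughly 1 in 10,000, at the cost of extra load.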
This Rust implementation adds modern optimizations like zero-cost abstractions, efficient time handling, and support for infinite streams.
This project is an open-source component of js0.site ⋅ Refactoring the Internet Plan.
We are redefining how the Internet is developed through componentization. Welcome to follow us: