# hirun 提供rust异步并发框架,底层基于非阻塞的IO操作和事件驱动的机制来实现. A runtime for writing asynchronous applications with the Rust programming language, based on event-driven, non-blocking I/O mechanism. 曾经在工作中深入对比过已有的C/Rust并发框架的并发性能,C版本的私有实现在各个场景下的测试数据都比tokio的好,但Rust版本在编码效率和难度上都胜于C版本. 此外C版本本身在嵌入式环境上使用,内存和磁盘资源都非常受限,因此Rust并发框架支持`no_std`是非常有必要的. We have compared the concurrent performance of the existing C/Rust concurrent framework in our work. The test data of the proprietary implementation of the C version is better than that of the Tokio version in all scenarios, but the Rust version is better than that of the C version in terms of coding efficiency and difficulty. In addition, the C version itself is used in embedded environments, and memory and disk resources are very limited. Therefor, it is necessary for the Rust concurrent framework to support `no_std`. ## 版本更新说明 1. 0.1.21 版本更新情况 > - 新增runtime::local::block_on和runtime::local::block_on_with两个接口, 提供在当前线程调用异步函数的功能. > - thread::Mutex接口全部更新为异步接口(接口无法兼容老版本,大版本号未升级) > - 其他Bug的修复. 1. 0.1.20 版本更新情况 > - 新增Task缓存机制,可以在通过runtime::Builder::max_cache指定每个线程的最大缓存数量来开启,缺省无缓存. > - 新增runtime::task::spawn系列异步接口,对cpu有利. > - 新增runtime::task::JoinSet提供创建异步子任务的异步接口, 对cpu有利. > - 基于新的runtime::task的新接口修改了examples下的示例代码. > - 包括tokioserver在内的全部example完善命令行参数支持,都可以通过-h | --help获取使用方式. > - 补充部分接口的功能注释. 1. 0.1.19 版本更新情况 > - 新增examples/tcpproxy > - 新增Fd::tcp_server_then/tcp_connect/tcp_connect_then, 后续废弃tcp_client > - 新增AioFd::new_with/wait_cookie/copy_bidirectional > - 新增Attr::fdset > - SocketAddr::inet_from支持仅指定端口号 1. 0.1.18 版本更新情况 > - 底层Scheduler等接口不再对外发布,因为直接使用它的很少,未升级大版本号,未完全遵循Rust的版本号规则. > - 性能提升: 针对短连接小数据tcp proxy场景下的热点函数进行优化,cpu占用率有`1-2%`的降低, 吞吐率有所提升. > - 新增接口: AioFd::wait_readable/wait_writable/wait_none > - 即将删除接口: AioFd::wait/Future::wait_entry > - 其他已知Bug修复 1. 0.1.17 版本更新情况 > - 性能优化. tcp_proxy场景,短连接小数据场景下,cpu占用率有所下降,吞吐量有所增加 > - 新增网络数据收发接口,对应libc的recv/send/recvfrom/sendto/recvmsg/sendmsg > - 新增task_exit/task_aborted/task_set_aborted异步接口. > - 其他一些内部实现优化 1. 0.1.16 版本更新情况 > - 修复#14优先级功能Bug > - 集成hipool v0.3.0 > - 每个Worker独立内存池,为后续优化打基础. 1. 0.1.15 版本更新情况 > - Task支持priority,0代表普通,大于0代表高优先级. 网络应用,负责监听的Task可设置为高优先级,对于新建连接,响应时延有利. 参考`example/httpserver` > - 修复channel::send_slice可能出现的panic > - 内部其他的优化. 1. 0.1.14 版本更新情况 > - 兼容rustc 1.76.0, v0.1.13版本修改不全 > - issues: #1 #2 #5 1. 0.1.13 版本更新情况 > - 解决0.1.12版本在`-C opt-level >= 1`时无法正常工作的bug. > - 兼容rustc 1.76.0版本 1. 0.1.12 版本更新情况 > - 适配hipool 0.2版本 > - 新增runtime::TaskContext为Future提供处理abort响应机制(类似异步析构的能力),基于此机制完善Future的deadline/or以及JoinHandle::abort的功能. > - runtime::Extensions新增wait_entry操作,利用POLLET机制提升io效率 > - 完全消除了AioFd所有权转移可能带来的隐患. > - 其他为后续功能扩展的内部变化 1. 0.1.11 Fd新增setsockopt_i32/getsockopt_i32/get_sock_error/copy_bidirectional等新接口 1. 0.1.10: 修复AioFd在所有权转移后存在的Bug,修复SocketAddr的Display、Debug trait实现的bug 1. 0.1.9: future增加`Or/And/`在task内部并发调度future的功能 1. 0.1.8: 匹配hierr 0.2版本, 和hipool保持一致 ## 后续计划 1. 支持windows 2. 支持http proxy 3. 支持socket proxy ## `no_std` `no_std`环境也需要一个异步并发框架,现在广泛使用的tokio等并不支持. 本crate基于linux libc的能力构建. 因为内部使用了linux的eventfd/epoll,当前还仅支持linux. ## 同步异步混合编程 spawn系列接口基本上同tokio的定义,只要在运行时创建之后可以在同步和异步环境的任何时候调用它, 没有调用上下文的约束. 此外其返回的JoinHandle提供join接口,供同步环境中等待异步任务的结束. 注意异步函数中,不能调用join,否则会阻塞当前工作线程. 阻塞等待异步任务结束,一般用于业务层的异步任务的入口函数的调用. 这类似标准库thread::scope的使用方式. ### 当前线程调度异步函数 ```rust use hirun::runtime; fn main() { let val = local::block_on(async_main(100)).unwrap(); println!("async_main return {val}"); } async fn async_main(val: i32) -> i32 { task::spawn(foo()).unwrap() + val } async fn foo() -> i32 { 50 } ``` runtime::local::block_on阻塞当前线程,创建一个在当前线程调度异步任务的临时运行时实例,完成调度后返回. ### 独立线程调度异步函数 ```rust use hirun::runtime::{Builder, block_on}; fn main() { Builder::new().build().unwrap(); let val = block_on(async_main(100)).unwrap(); println!("async_main return {val}"); } async fn async_main(val: i32) -> i32 { val + 100 } ``` 现实创建一个运行时实例,利用Builder接口可以定制运行时实例的多种属性. ### 异步环境创建异步子任务 在异步环境中可以使用runtime::task::spawn创建异步任务,性能会更好一些. 注意: task::spawn和task::JoinSet创建的异步任务都在当前运行时实例中调度,如果必须在其他异步运行时中调度,必须使用runtime::spawn和runtime::JoinSet. ```rust use hirun::runtime::{Builder, spawn, task}; fn main() { Builder::new().build().unwrap(); let val = spawn(foo(100)).join().unwrap(); println!("async foo return: {val}"); } async fn foo(val: i32) -> i32 { val + task::spawn(bar()).await.unwrap() } async fn bar() -> i32 { 100 } ``` ## 支持运行时多实例 包含阻塞操作的异步任务最好在单独的运行时实例中调度,避免对其他异步任务的影响. 可在`spawn_with`接口中按需指定运行时实例. ``` rust use hirun::runtime::{Builder, spawn, spawn_with, Attr}; const BLOCK_RUNTIME_ID: u8 = 1; fn main() { Builder::new().build().unwrap(); Builder::new().id(BLOCK_RUNTIME_ID).build().unwrap(); let h1 = spawn(foo(100)); let h2 = spawn_with(bar(200), Attr::new().id(BLOCK_RUNTIME_ID)); println!("default runtime: foo return {}", h1.join().unwrap()); println!("runtime_1: bar return {}", h2.join().unwrap()); } async fn foo(val: i32) -> i32 { val + 100 } async fn bar(val: i32) -> i32 { val + 1000 } ``` ## 支持基于hash的调度策略 业务上有需要约束某些任务必须在一个线程或者不同线程中调度,业务层可以为任务指定hash值实现这个功能. 使用这个功能应该要了解运行时的工作线程的数量, 才能利用hash值达到自身的控制目标. 以下代码强制异步任务一定在同一个工作线程运行. ```rust use hirun::runtime::{Builder, spawn_with, Attr}; use libc::pthread_self; fn main() { Builder::new().nth(2).build().unwrap(); let h1 = spawn_with(foo(200), Attr::new().hash(1)); let h2 = spawn_with(bar(200), Attr::new().hash(1)); println!("foo return {}", h1.join().unwrap()); println!("bar return {}", h2.join().unwrap()); } async fn foo(val: i32) -> i32 { println!("pthread_id: {}", unsafe { pthread_self() }); val + 100 } async fn bar(val: i32) -> i32 { println!("pthread_id: {}", unsafe { pthread_self() }); val + 1000 } ``` ## 支持Task的缓存 缺省并未开启缓存,如果开启需要指定每个线程可以缓存Task的最大数量. ```rust use hirun::runtime; fn main() { let _ = runtime::Builder::new().max_cache(100).build().unwrap(); //... } ``` `max_cache`指每个线程的最大缓存的task数量,缓存的每个task会占用4k的内存,每次新建task时会优先从缓存中获取. ## JoinSet 批量分发异步任务后,可能有需要等待所有任务执行完毕后返回,也可能等待最先完成的任务返回,可利用JoinSet实现. 最新版本有两个实现, 分别是runtime::JoinSet和runtime::task::JoinSet,前者可以在同步环境下创建异步任务,后者只能在异步环境下创建. 相对而言,后者对cpu更为有利. 以下等待所有任务完成后再返回. ```rust use hirun::runtime::{Builder, block_on, spawn, task}; fn main() { Builder::new().nth(2).build().unwrap(); block_on(async { let mut set = task::JoinSet::new(); let _ = set.spawn(foo(100)).await; let _ = set.spawn(bar(200)).await; for (seqno, val) in set.wait_all().await { println!("{seqno}, return {}", val.unwrap()); } }); } async fn foo(val: i32) -> i32 { val + 100 } async fn bar(val: i32) -> i32 { val + 1000 } ``` 也可以基于任务完成的先后顺序进行处理. ```rust use hirun::runtime::{Builder, block_on, spawn, task, sleep}; use core::time::Duration; fn main() { Builder::new().nth(2).build().unwrap(); block_on(async { let mut set = task::JoinSet::new(); let _ = set.spawn(foo(100)).await; let _ = set.spawn(bar(200)).await; while let Some((seqno, val)) = set.wait_any().await { println!("{seqno}, return {}", val.unwrap()); } }); } async fn foo(val: i32) -> i32 { sleep(Duration::new(1, 0)).await; val + 100 } async fn bar(val: i32) -> i32 { val + 1000 } ``` ## Future的调度策略 ### TaskContext提供Future的abort机制 `JoinHandle::abort`可以强制终止一个异步任务,但是异步任务可能注册了一些只能在异步上下文中释放的资源,有些类似异步析构的应用场景. hirun提供了这种能力. ```rust use hirun::runtime::TaskContext; // trait TaskContext为Context提供了abort接口 impl Future for MyFuture { type Output = i32; fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { if ctx.aborted() { self.abort(ctx); // 在异步上下文中清理全局资源 return Poll::Pending; // abort场景下,外部已经不关心返回值,简单返回Pending即可. } // ... } } ``` TaskContext还提供了`exit`接口,可以从异步函数内部强制退出当前Task,退出之后,已经启动但还未结束的Future都会同上述样例一样,获取到abort通知. ### 异步函数内部 支持`sleep/yield`操作. ```rust use hirun::runtime; async fn foo() -> i32 { runtime::sleep(core::time::Duration(1, 0)).await; runtime::yield().await; 1 } ``` ### 异步函数外部 1. delay: 可以指定延时执行一个异步函数 ```rust use hirun::runtime::Extentions; async fn foo() -> i32 { 1 } async fn bar() -> i32 { foo().delay(core::time::Duration(1, 0)).await } ``` 2. deadline: 可以指定一个异步函数完成的最迟时间,超过则会被取消 ```rust use hirun::runtime::{Extentions, sleep}; async fn foo() -> i32 { sleep(core::time::Duration(2, 0).await; 1 } async fn bar() { assert!(foo().deadline(core::time::Duration(1, 0)).await.is_none()); } ``` 3. or: 并发执行多个异步函数,直到其中一个返回 ``` use hirun::runtime::Extentions; async fn foo() { 1 } async fn bar() { 2 } async fn baz() { let (foo_ret, bar_ret) = foo().or(bar()).await; assert_eq!(foo_ret, Some(1)); assert!(bar_ret.is_none()); } ``` 4. and: 并发执行多个异步函数,直到全部返回 ``` use hirun::runtime::Extentions; async fn foo() { 1 } async fn bar() { 2 } async fn baz() { let (foo_ret, bar_ret) = foo().and(bar()).await; assert_eq!(foo_ret, 1); assert_eq!(bar_ret, 2); } ``` 5. ready: 类似Poll::map接口,只是适用于返回值impl Future的函数 ``` use hirun::runtime::Extentions; async fn foo() -> i32 { 1 } async fn test() { let val = foo().ready(|val| { if val > 0 { "positive", } else { "negative" } }).await; assert_eq!(val, "positive"); } ``` ## Fd和AioFd 需要支持自定义的IPC通信机制,这些机制都是基于Linux的文件系统来实现的, 使用方式相同: 创建文件句柄,利用poll机制获取异步IO事件,调用read/write读写数据.Linux新的`io_uring`也可基于poll机制获取提交任务的完成情况. 本Crate简单封装fd,同时提供AioFd,支持异步读写和获取异步IO事件通知的功能,具有最大的普适性. 只封装了最基础的功能,更多的功能需要业务层基于libc crate的api来完成. ```rust use hirun::runtime::{Builder, block_on}; use hirun::net::{Fd, AioFd, SocketAddr}; use hirun::event::POLLIN; fn main() { Builder::new().nth(2).build().unwrap(); let _ = block_on(async { let server_addr = SocketAddr::inet("127.0.0.1", 2000).unwrap(); let fd = Fd::tcp_client(libc::AF_INET, None).unwrap(); let mut aiofd = AioFd::new(&fd); aiofd.connect(&server_addr).await.unwrap(); aiofd.wait_readable().await.unwrap(); let mut buf = [0_u8; 100]; if let Ok(size) = aiofd.try_read(&mut buf) { println!("recv {size} bytes from server"); } }).unwrap(); } ``` 也可以直接使用异步读取接口: ```rust use hirun::runtime::{Builder, block_on}; use hirun::net::{Fd, AioFd, SocketAddr}; fn main() { Builder::new().nth(2).build().unwrap(); let _ = block_on(async { let server_addr = SocketAddr::inet("127.0.0.1", 2000).unwrap(); let fd = Fd::tcp_client(libc::AF_INET, None).unwrap(); let mut aiofd = AioFd::new(&fd); aiofd.connect(&server_addr).await.unwrap(); let mut buf = [0_u8; 100]; if let Ok(size) = aiofd.read(&mut buf).await { println!("recv {size} bytes from server"); } }).unwrap(); } ``` 异步函数的参数一定会是异步任务的内置数据成员,而并发框架创建的异步任务都会占用堆内存空间. 如果大量任务使用异步读取接口,因为缓冲器在堆上分配, 可能导致占用的堆内存空间比较大. 如果内存资源有限,推荐使用`async wait + try_read`这种组合使用方式, **一定是try_xxx系列接口返回EAGAIN才调用async wait, 因为内部使用POLLET策略, 如果在可读或者可写情况下调用async wait,将不会返回**. **注意**: 对于Fd,一个工作线程中如果有多个AioFd并发请求异步Io事件,只有一个会得到响应,因此要求一个AioFd实现读写功能,如果必须分离,有两种方式 1. 利用hash调度策略,强制他们在不同的工作线程中被调度,这要求Fd满足'static生命周期要求 2. 将Fd::clone复制后创建不同的AioFd分别实现读写功能 3. 一个异步任务内部同时可用的fd数量有上限值,如果超过上限值,则AioFd::wait系列接口会返回失败. 此上限值缺省为32,如果需要更改,可以通过runtime::Attr::fdset接口设置并通过spawn_with系列接口使用. 同时注意AioFd::wait_none接口的调用时机. ## 宏`#[future]` 现有Rust自动判断异步函数是否支持Send的规则存在一定局限性. 一个异步函数内部仅仅是直接调用异步子函数,不会通过`spawn`类接口创建并发的异步任务,那么这个异步函数内部实际上是可以安全的使用Rc这些类型. 以下代码如果`async fn foo`不使用`#[future]`修饰,则会报告因为Future不支持Send无法通过编译. 注意: `#[future]`生成unsafe代码,将异步函数的函数体转换为支持Send,如果是异步函数的入参不支持Send,则这类异步函数只能使用`spawn_local`在当前线程调度. ```rust use hirun::runtime::{Builder, spawn}; use hirun::future; use std::rc::Rc; fn main() { Builder::new().nth(2).build().unwrap(); let h = spawn(foo(100)); println!("async foo return {}", h.join().unwrap()); } #[future] async fn foo(val: i32) -> i32 { let rc = Rc::new(100); val + bar(*rc).await } async fn bar(val: i32) -> i32 { val + 1000 } ``` ## 性能 examples/httpserver和examples/tokioserver是本crate和tokio实现的完全相同的一个测试用http server,可以利用httperf测试其性能. 启动httpserver, 服务监听端口2000. 支持`-h | --help`输出命令行参数说明. ```shell # cargo run --release --example httpserver -- --addr 2000 --size 102400 --threads 2 ``` 启动tokioserver, 服务监听端口2001. 支持`-h | --help`输出命令行参数说明. ```shell # export http_body_size=102400 # cargo run --release --example tokioserver -- --addr 2001 --size 102400 --threads 2 ``` 启动httperf测试, 具体测试参数参见httperf的帮助说明. ```shell # httperf --num-calls 10 --num-conns 1000 --port 2000 # 测试httpserver的能力 # httperf --num-calls 10 --num-conns 1000 --port 2001 # 测试httpserver的能力 ``` 目前已有的数据看,不弱于tokio,不少场景下(变化因素: `http_body_size`, `--num-calls`, `--num-conns`)比tokio更优. 在用户的使用环境上进行对比验证获取的数据最真实.