#async-io #non-blocking #async #future #net

no-std hirun

A concurrent framework for asynchronous programming based on event-driven, non-blocking I/O mechanism

16 releases

new 0.1.15 Nov 19, 2024
0.1.14 Nov 15, 2024
0.1.11 Oct 27, 2024
0.1.8 Aug 12, 2024
0.1.4 Aug 25, 2023

#83 in Asynchronous

Download history 120/week @ 2024-08-09 17/week @ 2024-08-16 11/week @ 2024-09-13 7/week @ 2024-09-20 6/week @ 2024-09-27 1/week @ 2024-10-04 99/week @ 2024-10-18 256/week @ 2024-10-25 32/week @ 2024-11-01 173/week @ 2024-11-08 320/week @ 2024-11-15

805 downloads per month

MIT/Apache

275KB
7.5K SLoC

hirun

提供rust异步并发框架,底层基于非阻塞的IO操作和事件驱动的机制来实现.

A runtime for writing asynchronous applications with the Rust programming language, based on event-driven, non-blocking I/O mechanism.

曾经在工作中深入对比过已有的C/Rust并发框架的并发性能,C版本的私有实现在各个场景下的测试数据都比tokio的好,但Rust版本在编码效率和难度上都胜于C版本. 此外C版本本身在嵌入式环境上使用,内存和磁盘资源都非常受限,因此Rust并发框架支持no_std是非常有必要的.

We have compared the concurrent performance of the existing C/Rust concurrent framework in our work. The test data of the proprietary implementation of the C version is better than that of the Tokio version in all scenarios, but the Rust version is better than that of the C version in terms of coding efficiency and difficulty. In addition, the C version itself is used in embedded environments, and memory and disk resources are very limited. Therefor, it is necessary for the Rust concurrent framework to support no_std.

版本更新说明

  1. 0.1.15 版本更新情况
  • Task支持priority,0代表普通,大于0代表高优先级. 网络应用,负责监听的Task可设置为高优先级,对于新建连接,响应时延有利. 参考example/httpserver
  • 修复channel::send_slice可能出现的panic
  • 内部其他的优化.
  1. 0.1.14 版本更新情况
  • 兼容rustc 1.76.0, v0.1.13版本修改不全
  • issues: #1 #2 #5
  1. 0.1.13 版本更新情况
  • 解决0.1.12版本在-C opt-level >= 1时无法正常工作的bug.
  • 兼容rustc 1.76.0版本
  1. 0.1.12 版本更新情况
  • 适配hipool 0.2版本
  • 新增runtime::TaskContext为Future提供处理abort响应机制(类似异步析构的能力),基于此机制完善Future的deadline/or以及JoinHandle::abort的功能.
  • runtime::Extensions新增wait_entry操作,利用POLLET机制提升io效率
  • 完全消除了AioFd所有权转移可能带来的隐患.
  • 其他为后续功能扩展的内部变化
  1. 0.1.11 Fd新增setsockopt_i32/getsockopt_i32/get_sock_error/copy_bidirectional等新接口
  2. 0.1.10: 修复AioFd在所有权转移后存在的Bug,修复SocketAddr的Display、Debug trait实现的bug
  3. 0.1.9: future增加Or/And/在task内部并发调度future的功能
  4. 0.1.8: 匹配hierr 0.2版本, 和hipool保持一致

后续计划

  1. 支持windows
  2. 支持http proxy
  3. 支持socket proxy

no_std

no_std环境也需要一个异步并发框架,现在广泛使用的tokio等并不支持. 本crate基于linux libc的能力构建. 因为内部使用了linux的eventfd/epoll,当前还仅支持linux.

同步异步混合编程

spawn系列接口基本上同tokio的定义,只要在运行时创建之后可以在同步和异步环境的任何时候调用它, 没有调用上下文的约束. 此外其返回的JoinHandle提供join接口,供同步环境中等待异步任务的结束. 注意异步函数中,不能调用join,否则会阻塞当前工作线程.

阻塞等待异步任务结束,一般用于业务层的异步任务的入口函数的调用. 这类似标准库thread::scope的使用方式.

use hirun::runtime::{Builder, block_on};

fn main() {
    Builder::new().build().unwrap();
    let val = block_on(async_main(100)).unwrap();
    println!("async_main return {val}");
}

async fn async_main(val: i32) -> i32 {
    val + 100
}

在没有统一的异步任务入口的时候,只是在同步流程的某些环节利用异步并发机制提升并发度,那么可以利用spawn接口,在需要的时候调用join等待异步任务返回,这样异步任务和同步环境可并发执行. 这类似标准库thread::spawn的使用方式.

use hirun::runtime::{Builder, spawn};

fn main() {
    Builder::new().build().unwrap();
    let val = spawn(foo(100)).join().unwrap();
    println!("async foo return: {val}");
}

async fn foo(val: i32) -> i32 {
    val + 100
}

支持运行时多实例

包含阻塞操作的异步任务最好在单独的运行时实例中调度,避免对其他异步任务的影响. 可在spawn_with接口中按需指定运行时实例.

use hirun::runtime::{Builder, spawn, spawn_with, Attr};

const BLOCK_RUNTIME_ID: u8 = 1;

fn main() {
    Builder::new().build().unwrap();
    Builder::new().id(BLOCK_RUNTIME_ID).build().unwrap();
    let h1 = spawn(foo(100));
    let h2 = spawn_with(bar(200), Attr::new().id(BLOCK_RUNTIME_ID));
    println!("default runtime: foo return {}", h1.join().unwrap());
    println!("runtime_1: bar return {}", h2.join().unwrap());
}

async fn foo(val: i32) -> i32 {
    val + 100
}

async fn bar(val: i32) -> i32 {
    val + 1000
}

支持基于hash的调度策略

业务上有需要约束某些任务必须在一个线程或者不同线程中调度,业务层可以为任务指定hash值实现这个功能. 使用这个功能应该要了解运行时的工作线程的数量, 才能利用hash值达到自身的控制目标.

以下代码强制异步任务一定在同一个工作线程运行.

use hirun::runtime::{Builder, spawn_with, Attr};
use libc::pthread_self;

fn main() {
    Builder::new().nth(2).build().unwrap();
    let h1 = spawn_with(foo(200), Attr::new().hash(1));
    let h2 = spawn_with(bar(200), Attr::new().hash(1));
    println!("foo return {}", h1.join().unwrap());
    println!("bar return {}", h2.join().unwrap());
}

async fn foo(val: i32) -> i32 {
    println!("pthread_id: {}", unsafe { pthread_self() });
    val + 100
}

async fn bar(val: i32) -> i32 {
    println!("pthread_id: {}", unsafe { pthread_self() });
    val + 1000
}

JoinSet

批量分发异步任务后,可能有需要等待所有任务执行完毕后返回,也可能等待最先完成的任务返回,可利用JoinSet实现.

以下等待所有任务完成后再返回.

use hirun::runtime::{Builder, block_on, spawn, JoinSet};

fn main() {
    Builder::new().nth(2).build().unwrap();
    block_on(async {
        let mut set = JoinSet::new();
        let _ = set.spawn(foo(100));
        let _ = set.spawn(bar(200));
        for (seqno, val) in set.wait_all().await {
            println!("{seqno}, return {}", val.unwrap());
        }
    });
}

async fn foo(val: i32) -> i32 {
    val + 100
}

async fn bar(val: i32) -> i32 {
    val + 1000
}

也可以基于任务完成的先后顺序进行处理.

use hirun::runtime::{Builder, block_on, spawn, JoinSet, sleep};
use core::time::Duration;

fn main() {
    Builder::new().nth(2).build().unwrap();
    block_on(async {
        let mut set = JoinSet::new();
        let _ = set.spawn(foo(100));
        let _ = set.spawn(bar(200));
        while let Some((seqno, val)) = set.wait_any().await {
            println!("{seqno}, return {}", val.unwrap());
        }
    });
}

async fn foo(val: i32) -> i32 {
    sleep(Duration::new(1, 0)).await;
    val + 100
}

async fn bar(val: i32) -> i32 {
    val + 1000
}

Future的调度策略

TaskContext提供Future的abort机制

JoinHandle::abort可以强制终止一个异步任务,但是异步任务可能注册了一些只能在异步上下文中释放的资源,有些类似异步析构的应用场景. hirun提供了这种能力.

use hirun::runtime::TaskContext; // trait TaskContext为Context提供了abort接口

impl Future for MyFuture {
    type Output = i32;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        if ctx.aborted() {
            self.abort(ctx); // 在异步上下文中清理全局资源
            return Poll::Pending; // abort场景下,外部已经不关心返回值,简单返回Pending即可.
        }
        // ...
    }
}

TaskContext还提供了exit接口,可以从异步函数内部强制退出当前Task,退出之后,已经启动但还未结束的Future都会同上述样例一样,获取到abort通知.

异步函数内部

支持sleep/yield操作.

use hirun::runtime;
async fn foo() -> i32 {
    runtime::sleep(core::time::Duration(1, 0)).await;
    runtime::yield().await;
    1
}

异步函数外部

  1. delay: 可以指定延时执行一个异步函数
use hirun::runtime::Extentions;

async fn foo() -> i32 { 1 }
async fn bar() -> i32 {
    foo().delay(core::time::Duration(1, 0)).await
}
  1. deadline: 可以指定一个异步函数完成的最迟时间,超过则会被取消
use hirun::runtime::{Extentions, sleep};

async fn foo() -> i32 { 
    sleep(core::time::Duration(2, 0).await;
    1
}

async fn bar() {
    assert!(foo().deadline(core::time::Duration(1, 0)).await.is_none());
}
  1. or: 并发执行多个异步函数,直到其中一个返回
use hirun::runtime::Extentions;

async fn foo()  { 1 }
async fn bar()  { 2 }
async fn baz() {
    let (foo_ret, bar_ret) = foo().or(bar()).await;
    assert_eq!(foo_ret, Some(1));
    assert!(bar_ret.is_none());
}
  1. and: 并发执行多个异步函数,直到全部返回
use hirun::runtime::Extentions;

async fn foo()  { 1 }
async fn bar()  { 2 }
async fn baz() {
    let (foo_ret, bar_ret) = foo().and(bar()).await;
    assert_eq!(foo_ret, 1);
    assert_eq!(bar_ret, 2);
}
  1. ready: 类似Poll::map接口,只是适用于返回值impl Future的函数
use hirun::runtime::Extentions;

async fn foo() -> i32 { 1 }
async fn test() {
    let val = foo().ready(|val| {
        if val > 0 {
            "positive",
        } else {
            "negative"
        }
    }).await;
    assert_eq!(val, "positive");
}

Fd和AioFd

需要支持自定义的IPC通信机制,这些机制都是基于Linux的文件系统来实现的, 使用方式相同: 创建文件句柄,利用poll机制获取异步IO事件,调用read/write读写数据.Linux新的io_uring也可基于poll机制获取提交任务的完成情况.

本crate未提供TcpLisenter/TcpStream这类高级封装,仅封装fd,即Fd,同时提供AioFd,支持异步读写和获取异步IO事件通知的功能,具有最大的普适性. 只封装了最基础的功能,更多的功能需要业务层基于libc crate的api来完成.

use hirun::runtime::{Builder, block_on};
use hirun::net::{Fd, AioFd, SocketAddr};
use hirun::event::POLLIN;

fn main() {
    Builder::new().nth(2).build().unwrap();
    let _ = block_on(async {
        let server_addr = SocketAddr::inet("127.0.0.1", 2000).unwrap();

        let fd = Fd::tcp_client(libc::AF_INET, None).unwrap();
        let mut aiofd = AioFd::new(&fd);

        aiofd.connect(&server_addr).await.unwrap();
        aiofd.wait(POLLIN).await.unwrap();

        let mut buf = [0_u8; 100];
        if let Ok(size) = aiofd.try_read(&mut buf) {
            println!("recv {size} bytes from server");
        }
    }).unwrap();
}

也可以直接使用异步读取接口:

use hirun::runtime::{Builder, block_on};
use hirun::net::{Fd, AioFd, SocketAddr};

fn main() {
    Builder::new().nth(2).build().unwrap();
    let _ = block_on(async {
        let server_addr = SocketAddr::inet("127.0.0.1", 2000).unwrap();

        let fd = Fd::tcp_client(libc::AF_INET, None).unwrap();
        let mut aiofd = AioFd::new(&fd);

        aiofd.connect(&server_addr).await.unwrap();

        let mut buf = [0_u8; 100];
        if let Ok(size) = aiofd.read(&mut buf).await {
            println!("recv {size} bytes from server");
        }
    }).unwrap();
}

异步函数的参数一定会是异步任务的内置数据成员,而并发框架创建的异步任务都会占用堆内存空间. 如果大量任务使用异步读取接口,因为缓冲器在堆上分配, 可能导致占用的堆内存空间比较大. 如果内存资源有限,推荐使用async wait + try_read这种组合使用方式, 一定是try_xxx系列接口返回EAGAIN才调用async wait, 因为内部使用POLLET策略, 如果在可读或者可写情况下调用async wait,将不会返回.

注意: 对于Fd,一个工作线程中如果有多个AioFd并发请求异步Io事件,只有一个会得到响应,因此要求一个AioFd实现读写功能,如果必须分离,有两种方式

  1. 利用hash调度策略,强制他们在不同的工作线程中被调度,这要求Fd满足'static生命周期要求
  2. 将Fd::clone复制后创建不同的AioFd分别实现读写功能

#[future]

现有Rust自动判断异步函数是否支持Send的规则存在一定局限性. 一个异步函数内部仅仅是直接调用异步子函数,不会通过spawn类接口创建并发的异步任务,那么这个异步函数内部实际上是可以安全的使用Rc这些类型.

以下代码如果async fn foo不使用#[future]修饰,则会报告因为Future不支持Send无法通过编译.

注意: #[future]生成unsafe代码,将异步函数的函数体转换为支持Send,如果是异步函数的入参不支持Send,则这类异步函数只能使用spawn_local在当前线程调度.

use hirun::runtime::{Builder, spawn};
use hirun::future;
use std::rc::Rc;

fn main() {
    Builder::new().nth(2).build().unwrap();
    let h = spawn(foo(100));
    println!("async foo return {}", h.join().unwrap());
}

#[future]
async fn foo(val: i32) -> i32 {
    let rc = Rc::new(100);
    val + bar(*rc).await
}

async fn bar(val: i32) -> i32 {
    val + 1000
}

性能

examples/httpserver和examples/tokioserver是本crate和tokio实现的完全相同的一个测试用http server,可以利用httperf测试其性能.

启动httpserver, 服务监听端口2000:

# export http_body_size=102400
# cargo run --release --example httpserver

启动tokioserver, 服务监听端口2001:

# export http_body_size=102400
# cargo run --release --example tokioserver

启动httperf测试, 具体测试参数参见httperf的帮助说明.

# httperf --num-calls 10 --num-conns 1000 --port 2000 # 测试httpserver的能力
# httperf --num-calls 10 --num-conns 1000 --port 2001 # 测试httpserver的能力

目前已有的数据看,不弱于tokio,不少场景下(变化因素: http_body_size, --num-calls, --num-conns)比tokio更优.

在用户的使用环境上进行对比验证获取的数据最真实.

Dependencies

~2MB
~45K SLoC