1 Rust 异步
所谓异步(Async),是指在少量线程环境下运行多个独立任务。嵌入式中的的RTOS(Real Time OS,实时操作系统)就可以看作一种异步编程。异步编程适用于IO密集型任务。
所谓IO密集型任务,是指CPU在运行过程中需要多次和数据以及地址总线交互,并且任务的运行效率主要局限于输入输出端口的读写时间的任务,例如读写内存,读写大量文件,执行Web请求等。与之相比的是CPU密集型任务,它指的是程序的运行效率主要局限于CPU的运算效率,例如执行大量运算。
对于IO密集型任务,程序中的大量时间实际上都处于“等待”状态,所谓异步就是通过某种算法(任务调度器)使得CPU把这些“等待”的时间都利用起来的编程方式。一般来说,IO受限(IO Bound)的程序(程序执行的速度依赖于IO子系统的速度)比起CPU受限(CPU Bound)的任务(程序执行的速度依赖于CPU的运算速度)可能更适合于异步任务的执行(也不是绝对的,存在例外)。
举一个例子,当CPU等待外部事件或者动作的时候,异步运行时会安排其他可继续执行的任务在CPU上执行,当从磁盘或者IO子系统的系统中断到达的时候,异步运行时就会知道这件事,并安排原来的任务继续执行。
Rust标准库本身并没有提供异步的运行时(环境),而是仅仅提供了用于异步编程的关键字async
和.await
(相当于提供了语法糖),他们是Rust标准库里用于异步编程的内置核心原语的代表。Rust的异步运行时由Rust社区提供和维护。
博主学习异步编程主要是为了为后续嵌入式编程打基础。实际上,如果你的项目没有必要应用异步技术,那么从功利的角度说,就不需要花费大量时间研究异步编程。
下面来看一个Web服务器的例子,使用多线程(Multi-Threading)技术对服务器执行增删改查操作。使用多线程虽然可以提高效率,但是却提高了复杂性:
- 各个线程之间的执行顺序无法预测;
- 线程之间可能发生死锁;
- 线程之间可能发生竞争;
Rust标准库仅实现了1:1的多线程模型,即一个语言线程对应一个系统线程(通过语言提供的API创建线程);而对于M:N模型(M个准线程,或者说M个绿色线程对应N个系统线程)由社区提供的第三方标准库实现。
对上面的例子使用异步的方式进行改进:使用一个异步运行时管理多个任务,每个HTTP请求被异步Web Server接受,Web Server会生成一个任务来处理它,并由异步运行时安排各个任务在可用的CPU上执行。
2 tokio 异步运行时
tokio
是适用于Rust的一个异步运行时(库),使用时需要在Cargo.toml
文件中的dependencies
模块中对其进行声明:
[dependencies]
tokio = {version = "1", features = ["full"]}
看下面这个例子:从两个文件中读取数据,第一个文件读取需要4s,第二个文件读取需要2s。使用异步的方式创建两个异步任务(函数),并且通过await
关键字来使得异步任务得以执行:
use std::thread::sleep;
use std::time::Duration;
#[tokio::main]
async fn main() {
println!("Hello before reading file!");
let h1 = tokio::spawn(async {
let _file1_contents = read_from_file1().await; // 调用异步任务函数使用.await关键字
});
let h2 = tokio::spawn(async {
let _file2_contents = read_from_file2().await;
});
let _ = tokio::join!(h1, h2);
}
async fn read_from_file1() -> String { // 异步函数,惰性的,只有遇到.await关键字才会执行
sleep(Duration::new(4, 0));
println!("{:?}", "Processing file 1");
String::from("Hello there from file 1")
}
async fn read_from_file2() -> String { // 异步函数,惰性的,只有遇到.await关键字才会执行
sleep(Duration::new(2, 0));
println!("{:?}", "Processing file 2");
String::from("Hello there from file 2")
}
上面两个任务是并发执行的,但是和多线程不同的是,这两个任务可以执行在同一个线程上,也可以执行在单独的线程上,这就依赖于tokio运行时的配置了。
3 tokio 的组成和运行过程
tokio运行时是一个管理异步任务并安排他们在CPU上执行的组件。一个程序可能生成多个任务,每个任务可能包含一个或者多个Future。
3.1 Future
3.1.1 Future 简介
Future
是Rust异步的核心。Future是由异步计算或者函数产生的单一最终值,Rust的异步函数都会返回Future,基本可以说它代表着延迟的计算。有时候,在创建一个异步任务时,我们说这个异步任务拥有一个Future。
上面例子中并没有体现Future的使用,但是实际上相关的代码被async
关键字隐藏了。上面例子中的异步函数相当于下面的代码:
use std::future::Future;
fn read_from_file1() -> impl Future<Output = String> {
async {
sleep(Duration::new(4, 0));
println!("{:?}", "Processing file 1");
String::from("Hello there from file 1")
}
}
fn read_from_file2() -> impl Future<Output = String> {
async {
sleep(Duration::new(2, 0));
println!("{:?}", "Processing file 2");
String::from("Hello there from file 2")
}
}
Future
这个trait中只有一个poll
方法(这里的poll可以理解为“查询”),这个方法是异步运行时运行的重要方法。它被异步运行时调用,用于检查异步任务是否已经完成,这个方法返回一个枚举,这个枚举只有两个变体:挂起Poll::Pending
(任务没有完成)和就绪Poll::Ready(val)
(任务完成了)。
同时,poll
方法的self
的类型是Pin
,这也是一块更加高级的内容。简单来说,Pin(固定)代表了一种编程思想,Future可能会被异步运行时反复地poll
,即将Future
固定在内存的特定位置,这对于异步代码中内存的安全性是必要的。
pub trait Future {
/// The type of value produced on completion.
#[stable(feature = "futures_api", since = "1.36.0")]
#[lang = "future_output"]
type Output;
// some comments ...
#[lang = "poll"]
#[stable(feature = "futures_api", since = "1.36.0")]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
3.1.2 例子:自定义一个Future
use std::future::Future;
use std::thread::sleep;
use std::time::Duration;
use std::pin::Pin;
use std::task::{Context, Poll}; // Context 包含异步任务的上下文,可以用来唤醒当前的任务
struct ReadFileFuture {}
impl Future for ReadFileFuture {
type Output = String;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
println!("Tokio! Stop polling me!");
Poll::Pending
}
}
#[tokio::main]
async fn main() {
println!("Hello before reading file!");
let h1 = tokio::spawn(async {
let future1 = ReadFileFuture {};
future1.await
});
let h2 = tokio::spawn(async {
let _file2_contents = read_from_file2().await;
});
let _ = tokio::join!(h1, h2);
}
//
async fn read_from_file2() -> String {
sleep(Duration::new(2, 0));
println!("{:?}", "Processing file 2");
String::from("Hello there from file 2")
}
在上面的例子中,为ReadFileFuture
这个结构体实现了Future
这个trait,在poll
函数里使这个函数打印一句话,并且总是返回Poll::Pending
。运行这个程序,可以看到任务2(read_from_file2
)被执行,任务1(ReadFileFuture
)在打印一句话之后就再无输出:
3.2 异步执行器
Rust的Future需要不断地被跟进和驱动,就像一个事无巨细的项目经理。poll方法究竟是谁在调用呢?答案是异步执行器,它是异步运行时的一部分,tokio
中就有一个异步执行器。异步执行器会管理一个Future
的集合,并通过调用Future
上的poll
方法来驱动他们完成。函数或者代码块前面加上async
关键字的时候,就相当于告诉异步执行器这个代码块会返回Future
,这个Future
需要被驱动知道完成。
知道了谁在调用poll
方法,那么接下来的问题是异步执行器是如何调用它的呢?他是怎么知道异步任务已经准备好可以取得进展了呢?它会不断地调用poll
方法吗?如果没有其他的部件,可能就只能轮询了。但是那样效率过低,我们还需要其他的组件来使其可以运行。我们如果不要轮询,那么就在每个任务上装一个“闹钟”,当满足一定的条件的时候,就让那个“闹钟”叫醒任务,告诉它你可以接着执行了。这里的这个闹钟就是Waker组件。
3.3 Waker
Waker在这里是唤醒的意思,或者说通知的意思。当tokio调用poll
方法,如果得到了Poll::Pending
,那么这个任务就会被注册到一个Waker(唤醒器)组件。Waker会有一个处理程序(handle),它会被储存再任务关联的Context对象中。其他部件要通过这个Waker组件来告诉任务(或者说告诉tokio运行时):“你的任务可以取得进展了,你可以再次调用poll
方法”。
Waker有一个wake()
方法,可以用来告诉异步执行器关联的任务需要被唤醒了。当wake()
方法被调用的时候,tokio执行器就会被通知是时候再poll
一下这个任务了。如果poll
之后返回的是Poll::Ready(val)
,那么就可以获取到任务需要返回的值。
3.4 Tokio反应器以及整体组成
tokio运行时需要知晓操作系统(内核)的方法来开启IO操作(读取网络数据,读写文件等)。tokio运行时会注册异步的处理程序,以便在事件发生时作为IO操作的一部分进行调用。在tokio中从内核监听这些事件并与其他部分通信的组件就是反应器。
例如当一个任务需要读取文件,文件读取结束后,文件子系统会触发一个系统中断,tokio反应器就会发出反应,将这个中断翻译成tokio反应器可以识别的一个事件;之后反应器通知任务:“文件操作的数据已经准备好了!”,任务再通知他的Waker组件:”我可以产生一个值了!“,Waker组件再通知执行器:”来poll
我!“,执行器通过这个poll
获取到从文件中读取到的值。
4 一个定时器的例子
创建一个自定义的Future,它是一个定时器,功能如下:
- 可设定超时时间;
- 当异步任务被
poll
的时候,检查:- 如果当前已经超时,则返回
Poll::Ready(val)
,里面带一个String
值; - 如果没有超时,就睡眠到超时为止。然后触发Waker上的
wake()
方法,这就会通知异步运行时的执行器再次安排执行这个任务(接着poll
)。
- 如果当前已经超时,则返回
use std::future::Future;
use std::thread::sleep;
use std::time::{Duration, Instant};
use std::pin::Pin;
use std::task::{Context, Poll};
struct AsyncTimer {
expiration_time: Instant,
}
impl Future for AsyncTimer {
type Output = String;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if Instant::now() >= self.expiration_time {
println!("Time is over!");
Poll::Ready(String::from("Future 1 is completed"))
} else {
println!("It's not yet time 1 for Future 1. Going to sleep");
let waker = cx.waker().clone();
let expiration_time = self.expiration_time;
std::thread::spawn(move || {
let current_time = Instant::now();
if current_time < expiration_time {
sleep(expiration_time - current_time);
}
waker.wake();
});
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
let h1 = tokio::spawn(async {
let future1 = AsyncTimer {
expiration_time: Instant::now() + Duration::from_millis(4000),
};
println!("{:?}", future1.await);
});
let h2 = tokio::spawn(async {
let file2_content = read_from_file2().await;
println!("{}", file2_content);
});
let _ = tokio::join!(h1, h2);
}
async fn read_from_file2() -> String {
sleep(Duration::new(2, 0));
String::from("Future 2 has completed!")
}
上面的例子中,AsyncTimer
的poll
方法只会被调用两次。