1 并发简介
并发(Concurrent)和并行(Papallel)时现代计算机系统常见的编程概念。并发指的是不同任务之间可以看作“独立”运行,程序的不同部分可以不按顺序运行,并且不允许最后结果;并行指的是两个任务可以同时运行。在本教程中,除非特殊说明,不对这两个概念作明显区分,统称并发。
Rust是无畏并发的,意思是它允许你编写没有明显bug的代码,并在不引入新bug的情况下易于重构(程序块之间的耦合性较低)。
需要注意的是,不要将并发和异步的概念搞混了。异步指的是在一个线程上同时执行任务,或者说是在一个线程上快速在多个任务之间切换,看起来好像是在执行多个任务一样;并发中一般有多个线程在同时工作,并发规定了这些线程完成任务的能力。
在大多数OS中,代码运行在进程(Process)中,每个进程中又可以分为多个线程(Thread),这些线程可以独立运行。这样的做法可以提高程序的性能表现,但是由于复杂性的增加,多线程可能会带来多个问题:
- 竞争:线程以不一致的顺序访问数据或者资源;
- 死锁:两个线程同时等待对方使用完目标资源,或者都在等待对方的完成状态,使得线程无法继续;
- 难以Debug:多线程的有些bug只在特定状态下有概率地触发,很难可靠地复制现象和修复
现代编程语言实现线程有两种方式:
- 通过调用OS的API来创建线程:1:1模型,它需要较小的运行时(环境);
- 语言自己创建多个线程(绿色线程):M:N模型,它需要更大的运行时(环境)。
对于Rust而言,需要权衡运行时环境的支持。除了汇编语言以外,其他语言都有自己的运行时,有些人认为C/C++等语言就不需要运行时环境,其实C/C++的运行时环境只是比较小而已,它们通常和OS深度绑定在一起,很多时候难以察觉。而有些语言则需要更大的运行时来获得更多的特性,例如Python,在运行它们编写的程序时需要安装特定的环境。Rust标准库只支持1:1模型的线程,其他M:N模型需要社区提供的第三方库的支持。
2 创建和使用线程
2.1 spawn join 创建线程并等待其完成
Rust通过thread::spawn
函数来创建线程,这个线程接收一个闭包作为参数:
use std::thread;
use std::time::Duration;
fn main() {
thread::spawn(|| {
for i in 1..10 {
println!("hi number from spawn thread {i}");
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("hi number form main thread {i}");
thread::sleep(Duration::from_millis(1));
}
}
通过执行结果可以看到,两个线程都在执行,但是在主线程运行结束时,子线程也停止运行了。要想使得所有线程都完成执行,那么我们需要为thread::spawn()
函数返回的JoinHandle
类型使用join()
方法。
use std::thread;
use std::time::Duration;
fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("hi number from spawn thread {i}");
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("hi number form main thread {i}");
thread::sleep(Duration::from_millis(1));
}
handle.join().unwrap();
}
JoinHandle
持有值的所有权,调用join()
方法会阻塞当前线程的执行,直到handle所表示的线程结束。
2.2 move 闭包
move
闭包通常和thread::spawn
函数一起使用,它允许你使用其他线程的数据(一般使用主线程的数据),即创建的时候将值的所有权转移到目标线程中。
use std::thread;
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(move || { // v 的所有权转移到线程中
println!("Here's a vector: {:?}", v);
});
// drop(v);
handle.join().unwrap();
}
3 使用消息传递的并发
3.1 Channel
消息传递是一种流行的能够保证安全并发的技术,线程之间通过彼此发送消息来进行通信。Go语言有一句名言:不要用共享内存来通信,要用通信来共享内存。Rust对并发的设计也体现了这种思想。
Rust主要通过Channel
来进行消息传递。一个Channel
至少包含一个发送端和接收端,我么调用发送端的方法发送数据,调用接收端端方法检查和接收数据,如果发送端或者接受端端任意一端被丢弃,那么这个Channel
就被关闭了。
可以使用mpsc::channel
函数来创建一个Channel
(mpsc:multiple producer single consumer,多生产者单消费者,意即可以有多个发送端,但是接受端只有一个),它返回一个元组,里面的元素分别是发送端和接收端。
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || { // 子线程必须要取得tx的所有权
let val = String::from("hi");
tx.send(val).unwrap(); // val 的所有权发生转移,之后无法使用
// println!("{}", val);
});
let result = rx.recv().unwrap(); // recv() 会阻塞当前线程,直到收到消息
println!("Got {}", result);
}
在发送端和接收端的方法中,都返回的是Result枚举,如果另一端关闭,则返回一个错误。对于接受端,还有一个方法是try_recv()
,它不会阻塞当前线程,而是立即返回一个Result。我们通常循环调用try_recv()
方法来获取其他线程的数据。
发送端发送多个值,可以看到接受端在等待:
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || { // 子线程必须要取得tx的所有权
let vals = vec!["hi", "from", "spawn", "thread"];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_millis(100));
}
});
for result in rx { // 直接将rx当做一个迭代器
println!("Got: {}", result);
}
}
3.2 通过克隆创建多个发送端
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = mpsc::Sender::clone(&tx);
thread::spawn(move || {
let vals = vec!["hi", "from", "spawn", "thread"];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_millis(100));
}
});
thread::spawn(move || {
let vals = vec!["1: hi", "1: from", "1: spawn", "1: thread"];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_millis(100));
}
});
for result in rx {
println!("Got: {}", result);
}
}
可以看到,主线程接收数据的顺序是无法保证的。
4 共享状态的并发
使用Channel基本只能实现单所有权的线程间交流,因为发送端(接受端)一旦移入线程内部,外部程序就无法访问了。但是实际上Rust也支持多所有权的并发,即各个线程之间共享同一快内存的状态,并且在这个过程中保证安全。
Rust使用Mutex来使得每次只允许一个线程来访问数据,Mutex是mutual exclusion(互斥锁)的简写,在同一时刻,Mutex只允许一个线程来访问某些数据。想要访问数据,线程必须首先获取互斥锁(lock
方法),Lock数据结构是Mutex的一部分,它能够跟踪在特定时刻哪个线程对数据拥有独占访问权。Mutex通常被描述为:通过锁定系统来保护它所持有的数据。
由此,我么可以得出Mutex有两条规则:
- 在使用数据之前,必须尝试获取锁(lock);
- 使用完mutex所保护的数据,必须对数据进行解锁,以便其他线程可以获取锁。
4.1 Mutex<T>
通过Mutex::new(data)
来创建一个Mutex<T>
,参数data
就是我们要保护的数据。访问数据前,通过lock
方法来获取锁,这个方法会阻塞当前线程,也有可能获取失败,它返回一个MutexGuard
智能指针。
use std::sync::Mutex;
fn main() {
let m = Mutex::new(5);
{
let mut num = m.lock().unwrap();
*num = 6; // 可以直接使用解引用修改数据
}
println!("m = {:?}", m);
}
4.2 多线程共享 Mutex
直接将Mutex<T>
放置在多个线程中是不行的,因为多个线程会引起所有权冲突,为了解决这个冲突,需要引入多线程下的多重所有权分配的专用智能指针Arc<T>
。Arc<T>
和Rc<T>
类似,但是它可以用于并发的场景,A即atomic,即原子的(使用Arc<T>
需要一定的性能代价)。
看下面这个例子,创建了十个线程,每个线程都需要获得counter的所有权,所以我们使用Arc<T>
将counter
包裹起来,使得这10个线程都共享它的所有权。
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
5 Send 和 Sync trait
Send和Sync trait都是标签trait,因为实际上它们没有定义任何方法。
Send trait允许在线程间转移所有权。Rust中几乎所有的类型都实现了Send,Rc<T>
是个例外,所以它只适用于单线程场景。
实现Sync的线程可以安全地被多个线程引用,如果T
实现了Sync
,那么&T
就相当于实现了Send
(可以安全地送往另一个线程)。
手动实现Send和Sync是不安全的,因为其中涉及一些不安全的Rust代码。