本篇介绍一下线程池。也可以说这一部分是介绍异步任务的。
主要介绍了 rayon 和 tokio 的一小部分。
tokio 非常强大,还在学习中。
1:1 线程模型
由于操作系统提供了创建线程的 API,因此部分语言会直接调用该 API 来创建线程,因此最终程序内的线程数和该程序占用的操作系统线程数相等,一般称之为 1:1 线程模型,例如 Rust。
M:N 线程模型
有些语言在内部实现了自己的线程模型(绿色线程、协程),程序 内部的 M 个线程最后会以某种映射方式使用 N 个操作系统线程去运行,因此称之为 M:N 线程模型,典型代表就是 Go 语言。
Rust 原生的线程是 1:1 模型,有些三方库实现了 M:N 模型,比如大名鼎鼎的 tokio。
Rust 原生线程
使用 thread::spawn 创建线程
use std::thread;
use std::time::Duration;
fn main() {
thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
// 等待线程执行完成
handle.join().unwrap();
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1));
}
}
demo
线程内部的代码使用闭包来执行
main 线程一旦结束,程序就立刻结束。
#[get("/hello1")]
async fn hello1() -> HttpResponse {
info!("hello1 start");
thread::spawn(||{
handle();
});
info!("hello1 end");
success(Option::from(format!("Hello1")))
}
fn handle() {
info!("do something for hello1");
thread::sleep(time::Duration::from_secs(3));
}
修改了原来示例中的 hello1 接口,假设其内部逻辑处理需要3秒钟,那么客户端请求过来需要等待3秒钟才有响应。
2023-11-22 16:24:00.497 INFO ThreadId(15) rust_web: 16: hello1 start
2023-11-22 16:24:00.497 INFO ThreadId(15) rust_web: 25: do something for hello1
2023-11-22 16:24:03.510 INFO ThreadId(15) rust_web: 20: hello1 end
Request to /hello1 took 3.014134917s
将耗时的逻辑处理在子线程中执行,使接口响应更加迅速。
2023-11-22 16:24:25.425 INFO ThreadId(15) rust_web: 16: hello1 start
2023-11-22 16:24:25.426 INFO ThreadId(15) rust_web: 20: hello1 end
Request to /hello1 took 878.583µs
2023-11-22 16:24:25.426 INFO ThreadId(26) rust_web: 25: do something for hello1
线程池
当后端应用创建过多的线程时,可能会出现以下问题:
-
内存消耗增加:每个线程都需要一定的内存来存储线程栈、上下文和其他相关数据。如果创建了大量线程,将会消耗大量的内存资源,可能导致内存不足或增加了内存压力。
-
上下文切换开销增加:线程之间的切换会导致上下文切换开销。当线程数量增加时,操作系统需要频繁地进行线程调度和切换,这会增加CPU的负载和上下文切换的开销,可能导致系统性能下降。
-
系统资源竞争:创建过多的线程可能导致系统资源的竞争。例如,如果每个线程都需要访问共享的数据库连接、文件句柄或其他资源,那么线程之间可能会发生竞争条件,导致性能下降、数据不一致或死锁等问题。
-
调试和维护困难:线程数量过多会增加代码的复杂性和调试的难度。线程之间的交互和同步可能变得复杂,容易引入并发相关的bug。同时,维护大量线程的代码也会增加开发和维护的复杂性。
所以不能无限制的创建线程,可以使用线程池,以有限数量的线程来完成复杂业务逻辑。
Rust 中常用的线程池库
- rayon: rayon 是一个简单易用的并行计算库。它可以自动将任务分配给线程池中的线程,并根据需要动态调整线程数量。rayon 的设计目标是简化并行计算的编程,并提供高性能的并行执行。它适用于数据并行的任务,如迭代、映射、过滤等。适用于 CPU 密集型任务。
- tokio: tokio 是一个异步运行时库,它提供了一个基于事件驱动的线程池。它使用了非阻塞的 I/O 模型和异步任务调度器,适用于构建高性能的异步应用程序。tokio 的线程池可以自动调整线程数量,并提供了丰富的异步操作和工具。它是构建网络服务器、处理并发请求和执行异步任务的常用选择。适用于 IO 密集型任务。
rayon
rayon 常用来处理并行计算,类似 java 中 parallelStream。
demo
添加依赖
rayon = "1.8.0"
并行迭代器可以很容易地将顺序迭代器转换为并行执行。
ParallelIterator trait 定义了所有并行迭代器的通用方法。
IndexedPalallelIterator trait 为支持随机访问的迭代器添加了方法。
最重要的 par_iter 方法,提供了许多迭代函数的并行实现,如 map、for_each、filter、fold 等。
使用并行非常简单,替换原生的迭代器即可
iter() -> par_iter()
iter_mut() -> par_iter_mut()
into_iter() -> into_par_iter()
use rayon::prelude::*;
fn handle() {
info!("sum start");
let start = Instant::now();
let vec: Vec<i32> = (0..=1000).collect();
let sum:i32 = vec.iter().map(|x| {
thread::sleep(Duration::from_millis(5));
x
}).sum();
let elapsed_time = start.elapsed();
info!("0..=1000 sum = {},elapsed time {}", sum, elapsed_time);
}
计算 0 到 1000 的累加值,每次计算线程 sleep 5毫秒。
单线程情况下共耗时 6s。
2023-11-22 22:11:40.345 INFO ThreadId(26) rust_web: 44: 0..=1000 sum = 500500,elapsed time 6s285ms561µs791ns
接下来使用 rayon 并行计算
只需要将 iter() 替换为 par_iter() 方法。
let sum:i32 = vec.par_iter().map(|x| {
thread::sleep(Duration::from_millis(5));
x
}).sum();
耗时为 683ms。
2023-11-22 22:13:56.602 INFO ThreadId(26) rust_web: 39: 0..=1000 sum = 500500,elapsed time 683ms454µs750ns
线程池
创建一个线程池
let pool = rayon::ThreadPoolBuilder::new().num_threads(8).build().unwrap();
使用 install 在线程池中执行逻辑操作。
fn handle() {
info!("sum start");
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(1)
.build()
.unwrap();
pool.install(|| {
let start = Instant::now();
let vec: Vec<i32> = (0..=1000).collect();
let sum: i32 = vec
.par_iter()
.map(|x| {
thread::sleep(Duration::from_millis(5));
x
})
.sum();
let elapsed_time = start.elapsed();
info!("0..=1000 sum = {},elapsed time {}", sum, elapsed_time);
});
}
这段代码,创建了有 1 个线程的线程池,尽管使用了 par_iter() 开启了并行计算,但是使用了 pool.install 方法,意味着这一段操作将在这个只有 1 个线程的线程池中运行。其结果也是 6s。
不指定线程数的话,并行计算的线程数与 CPU 核数相同。
rayon 示例代码在 github。
tokio
tokio 是一个事件驱动的非阻塞 IO 平台,用于编写异步应用。
A reactor backed by the operating system's event queue (epoll, kqueue, IOCP, etc...).
demo
#[get("/hello1")]
async fn hello1() -> HttpResponse {
info!("hello1 start");
let x = tokio::spawn(async move {
handle(1)
});
let x = tokio::spawn(async move {
handle(11)
});
let x = tokio::spawn({
handle_async(2)
});
let x = tokio::spawn({
handle_async(22)
});
let x = tokio::task::spawn_blocking(||{
handle(3);
});
let x = tokio::task::spawn_blocking(||{
handle(33);
});
let x = tokio::task::spawn_blocking(||{
handle_async(4)
});
let x = tokio::task::spawn_blocking(||{
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(handle_async(44))
});
info!("hello1 end");
success(Option::from(format!("Hello1")))
}
async fn handle_async(x: i32) {
info!("handle start x = {} ...", x);
thread::sleep(Duration::from_secs(3));
}
fn handle(x: i32) {
info!("handle start x = {} ...", x);
thread::sleep(Duration::from_secs(3));
}
2023-11-24 11:30:47.469 INFO ThreadId(25) rust_web: 22: hello1 start
2023-11-24 11:30:47.469 INFO ThreadId(25) rust_web: 58: hello1 end
Request to /hello1 took 295.333µs
2023-11-24 11:30:47.469 INFO ThreadId(36) rust_web: 68: handle start x = 3 ...
2023-11-24 11:30:47.469 INFO ThreadId(37) rust_web: 68: handle start x = 33 ...
2023-11-24 11:30:47.469 INFO ThreadId(25) rust_web: 68: handle start x = 1 ...
2023-11-24 11:30:47.469 INFO ThreadId(39) rust_web: 63: handle start x = 44 ...
2023-11-24 11:30:50.474 INFO ThreadId(25) rust_web: 68: handle start x = 11 ...
2023-11-24 11:30:53.480 INFO ThreadId(25) rust_web: 63: handle start x = 2 ...
2023-11-24 11:30:56.485 INFO ThreadId(25) rust_web: 63: handle start x = 22 ...
日志输出这样的,说明了什么?
- tokio::spawn 接收的是一个异步方法,如果不是异步方法,需要使用 async move 将其变为异步执行
- Rust 中 async 方法若要同步调用需要使用
.await,而 tokio::spawn 中不用 await,会自动调用 - tokio::spawn 或者 spawn_blocking 都不会阻塞当前线程
- tokio::spawn 会以当前线程执行,多个 spawn 会排队依次执行,而 spawn_blocking 会在新线程中执行,每个 spawn_blocking 都会有一个新线程
- spawn_blocking 中接收闭包,可以执行同步方法,异步方法不会执行,若要执行异步方法,需要 runtime block_on
小结
rayon 适用于计算密集型,CPU 密集型,
tokio 适用于 IO 密集型
tokio::spawn 并不是创建一个新的线程,而是创建一个异步任务(Future),将其交给 tokio runtime 执行,以当前线程去执行。
spawn_blocking 会创建一个新的线程执行。
tokio 文档中并没有介绍线程池的部分。
若是在子线程中使用 tokio 需要先获取到 runtime。
