写在前面:
什么是同步函数与异步函数?
- 同步函数:当一个函数是同步执行时,那么当该函数被调用时不会立即返回,直到该函数所要做的事情全都做完了才返回。
- 异步函数:如果一个异步函数被调用时,该函数会立即返回尽管该函数规定的操作任务还没有完成。
具体来说,当一个线程调用同步函数时(例如读文件),如果函数没有完成操作,则此线程会被挂起,直到该函数完成了规定的操作,这个线程才会被重新调度。
相反,当一个线程调用异步函数时,该函数会立即返回,即使当前的任务并没有完成,这样线程就会执行此异步函数之后的下一条语句。异步函数没做完的工作会通过另一个线程完成(可能是异步函数中新创建的,也可能是系统中已经准备好的)
异步函数执行完毕后,如何通知线程?
调用线程需要使用等待函数来确定异步函数何时完成了任务。因此在线程调用异步函数之后立即调用一个“等待函数”挂起调用线程,一直等到异步函数执行完其所有的操作之后,再执行线程中的下一条指令。
异步调用与多线程的关系?
操作系统把 CPU 处理时间划分成许多短暂时间片,在时间 T1 执行一个线程的指令,到时间 T2 又执行下一线程的指令,各线程轮流执行,结果好象是所有线程在并肩前进。这样,编程时可以创建多个线程,在同一期间执行,各线程可以“并行”完成不同的任务。
有了多线程的支持,可以采用异步调用,调用方和被调方可以属于两个不同的线程,调用方启动被调方线程后,不等对方返回结果就继续执行后续代码。被调方执行完毕后,通过某种手段通知调用方:结果已经出来,请酌情处理。
Item 35:Prefer task-based programming to thread based.
通常来说要执行异步函数有两种选择:
创建一个thread
int doAsyncWork(); std::thread t(doAsyncWork);
创建一个task
auto fut = std::async(doAsyncWork); // "fut" for "future"
task通常要比thread好,原因如下:
- 基于task的代码往往更少。
- 基于task更容易得到函数的返回值:调用future的get方法。
- future的get方法还能拿到函数抛出的异常,而thread中如果函数抛了异常,进程就挂掉了。
- 易于实现调度均衡
通常来说,task好处的前三点比较好理解,第四点怎么理解呢?
首先我们要搞清楚它们之间更本质的差别在于,基于task的方法有着更高的抽象层次,而无需关心底层的线程管理。下面是C++中”线程”的三种不同层次的概念:
- 硬件线程:真正的运算线程,目前每个CPU核可以提供一个或多个线程。比如,因特尔i7-8550U就是4核8线程架构。
- 软件线程(OS线程):OS提供的线程,OS会负责管理和调度这些线程。通常OS线程可以远多于硬件线程。
std::thread
:C++标准库提供的线程类,底层对应一个OS线程。这些情况下std::thread
没有对应的OS线程:刚刚构造好时;已经调用过join
;已经调用过detach
。
创建过多的OS线程会导致系统过载,大量资源消耗在线程调度和切换上。避免系统过载是一件困难的事情,我们很难知道OS线程和硬件线程之间的合适比例。如果我们基于task来开发,把这些问题丢给task,丢给C++标准库,这样就能更好解决问题。
当然,std::async
没办法解决GUI线程的问题,因为调度器不知道你的哪个线程对响应时间的要求最低。此时你可以指定std::launch::async
来确保你的函数运行在另一个线程中。
Item 36:Specify std::launch::async if asynchronicity is essential.
std::async
有三种模式:
std::launch::async
:f
必须异步执行,比如在另一个线程。写为std::async(std::launch::async,f)
std::launch::deferred
:f
只在对应的future的get
或wait
被调用时才执行,且是同步执行。如果没有人调用对应的get
或wait
。std::launch::async|std::launch::defered
: 也可以写为std::async(f)
。这是默认策略,有调度器自动决定是异步还是同步。
默认策略的问题在于:
- 无法预测
f
是否与当前线程并发执行,因为调度器有可能选择std::launch::deferred
。 - 无法预测
f
是否在调用get
或wait
的另一个线程执行。 - 可能无法预测
f
是否会执行。
下面的代码可能会出现问题:
using namespace std::literals;
void f() {
std::this_thread::sleep_for(1s);
}
auto fut = std::async(f);
while (fut.wait_for(100ms) != std::future_status::ready) {
...
}
如果f在另一个线程运行,则没有问题。如果f是deferred
状态,则fut.wait_for
就会一直返回std::future_status::deferred
,导致循环永不结束。因为我们无法掌握调度器的状况,导致它有可能把它定为lauch
或者deferred
。
为了解决这个问题,最好检查future是不是deferred,如果是,就不进循环。但我们没办法直接询问future是不是deferred,需要用wait_for
来绕一下:
auto fut = std::async(f);
if (fut.wait_for(0s) == std::future_status::deferred) {
...
} else {
while (fut.wait_for(100ms) != std::future_status::ready) {
...
}
...
}
上述场景的要点在于,当满足以下条件时,使用std::async
的默认策略才是好的:
- task不需要与调用
get
或wait
的线程并发执行。 - 无所谓访问哪个局部线程变量(TLS)。
- 要么能确保有人会调用future的
get
或wait
,要么f
执不执行都可以。 - 调用了
wait_for
或wait_until
的代码要保证能处理deferred。
如果没办法保证以上几点,你需要确保你的task运行在另一个线程中,就指定std::launch::async
:
auto fut = std::async(std::launch::async, f);
Item 37:Make std::threads unjoin able on all paths
每个std::thread
对象都处于两种状态下:可join、不可join。可join的std::thread
对应一个可运行或运行中的底层线程,例如被阻塞、未调度或已运行完成的线程都是可join的。我们可以通过线程的成员函数joinable()
来判断。这些情况下都是不可以join的:
- 默认构造状态的
std::thread
:不对应底层线程。 - 被移动过的
std::thread
:底层线程现在由其它std::thread
管理。 - 已调用过
join
的std::thread
:底层线程已结束。 - 已调用过
detach
的std::thread
:detach
会切断std::thread
和底层线程的联系。
下面给一个例子:
constexpr auto tenMillion = 10'000'000;
bool doWork(std::function<bool(int)> filter, int maxVal = tenMillion) {
std::vector<int> goodVals;
std::thread t([&filter, maxVal, &goodVals] {
for (auto i = 0; i <= maxVal; ++i) {
if (filter(i)) {
goodVals.push_back(i);
}
}
});
auto nh = t.native_handle(); // use t's native handle to set t's priority
...
if (conditionsAreSatisfied()) {
t.join();
performComputation(goodVals); // computation was performed
return true;
}
return false; // computation was not performed
}
这段代码我们用一个过滤器过滤一下容器,然后送入performComputation
计算,由于我们需要取得线程的底层句柄来设置优先级,因此我们用thread而不是async。
上面这段代码,如果最后走到了false
分支,或中间抛了异常,就会遇到构造了一个可join的std::thread
的问题,程序就会终止。可以改进的一点是在开始设置t
为暂停状态。
我们可以通过包装一个RAII类来实现所有情况下的std::thread
都不可join。
class ThreadRAII {
public:
enum class DtorAction {join, detach};
ThreadRAII(std::thread&& t, DtorAction a)
: action(a), t(std::move(t)) {}
~ThreadRAII() {
if (t.joinable()) {
if (action == DtorAction::join) {
t.join();
} else {
t.detach();
}
}
}
std::thread& get() {return t;}
private:
DtorAction action;
std::thread t;
};
需要注意:
- 构造函数只接受
std::thread
的右值,因为std::thread
只能移动不能复制。 - 在
ThreadRAII
的析构函数中,在调用t.join()
或t.detach()
前,需要先调用t.joinable()
,因为有可能t
已经被移动过了。
应用ThreadRAII
到我们前面的代码中:
bool doWork(std::function<bool(int)> filter, int maxVal = tenMillion) {
std::vector<int> goodVals;
ThreadRAII t(
std::thread([&filter, maxVal, &goodVals] {
for (auto i = 0; i <= maxVals; ++i) {
if (filter(i)) {
goodVals.push_back(i);
}
}
}),
ThreadRAII::DtorAction::join
);
auto nh = t.get().native_handle();
...
if (conditionsAreSatisfied()) {
t.get().join();
performComputation(goodVals);
return true;
}
return false;
}