介绍并发的注意事项。

写在前面:

什么是同步函数与异步函数?

  • 同步函数:当一个函数是同步执行时,那么当该函数被调用时不会立即返回,直到该函数所要做的事情全都做完了才返回。
  • 异步函数:如果一个异步函数被调用时,该函数会立即返回尽管该函数规定的操作任务还没有完成。

具体来说,当一个线程调用同步函数时(例如读文件),如果函数没有完成操作,则此线程会被挂起,直到该函数完成了规定的操作,这个线程才会被重新调度。

相反,当一个线程调用异步函数时,该函数会立即返回,即使当前的任务并没有完成,这样线程就会执行此异步函数之后的下一条语句异步函数没做完的工作会通过另一个线程完成(可能是异步函数中新创建的,也可能是系统中已经准备好的)

异步函数执行完毕后,如何通知线程?

调用线程需要使用等待函数来确定异步函数何时完成了任务。因此在线程调用异步函数之后立即调用一个“等待函数”挂起调用线程,一直等到异步函数执行完其所有的操作之后,再执行线程中的下一条指令。

异步调用与多线程的关系?

操作系统把 CPU 处理时间划分成许多短暂时间片,在时间 T1 执行一个线程的指令,到时间 T2 又执行下一线程的指令,各线程轮流执行,结果好象是所有线程在并肩前进。这样,编程时可以创建多个线程,在同一期间执行,各线程可以“并行”完成不同的任务。

有了多线程的支持,可以采用异步调用,调用方和被调方可以属于两个不同的线程,调用方启动被调方线程后,不等对方返回结果就继续执行后续代码。被调方执行完毕后,通过某种手段通知调用方:结果已经出来,请酌情处理。

Item 35:Prefer task-based programming to thread based.

通常来说要执行异步函数有两种选择:

  • 创建一个thread

    int doAsyncWork();
    std::thread t(doAsyncWork);
    
  • 创建一个task

    auto fut = std::async(doAsyncWork); // "fut" for "future"
    

task通常要比thread好,原因如下:

  1. 基于task的代码往往更少。
  2. 基于task更容易得到函数的返回值:调用future的get方法。
  3. future的get方法还能拿到函数抛出的异常,而thread中如果函数抛了异常,进程就挂掉了。
  4. 易于实现调度均衡

通常来说,task好处的前三点比较好理解,第四点怎么理解呢?

首先我们要搞清楚它们之间更本质的差别在于,基于task的方法有着更高的抽象层次,而无需关心底层的线程管理。下面是C++中”线程”的三种不同层次的概念:

  • 硬件线程:真正的运算线程,目前每个CPU核可以提供一个或多个线程。比如,因特尔i7-8550U就是4核8线程架构。
  • 软件线程(OS线程):OS提供的线程,OS会负责管理和调度这些线程。通常OS线程可以远多于硬件线程。
  • std::thread:C++标准库提供的线程类,底层对应一个OS线程。这些情况下std::thread没有对应的OS线程:刚刚构造好时;已经调用过join;已经调用过detach

创建过多的OS线程会导致系统过载,大量资源消耗在线程调度和切换上。避免系统过载是一件困难的事情,我们很难知道OS线程和硬件线程之间的合适比例。如果我们基于task来开发,把这些问题丢给task,丢给C++标准库,这样就能更好解决问题。

当然,std::async没办法解决GUI线程的问题,因为调度器不知道你的哪个线程对响应时间的要求最低。此时你可以指定std::launch::async来确保你的函数运行在另一个线程中

Item 36:Specify std::launch::async if asynchronicity is essential.

std::async有三种模式:

  • std::launch::asyncf必须异步执行,比如在另一个线程。写为std::async(std::launch::async,f)
  • std::launch::deferredf只在对应的future的getwait被调用时才执行,且是同步执行。如果没有人调用对应的getwait
  • std::launch::async|std::launch::defered: 也可以写为std::async(f)。这是默认策略,有调度器自动决定是异步还是同步。

默认策略的问题在于:

  • 无法预测f是否与当前线程并发执行,因为调度器有可能选择std::launch::deferred
  • 无法预测f是否在调用getwait的另一个线程执行。
  • 可能无法预测f是否会执行。

下面的代码可能会出现问题:

using namespace std::literals;

void f() {
    std::this_thread::sleep_for(1s);
}

auto fut = std::async(f);
while (fut.wait_for(100ms) != std::future_status::ready) {
    ...
}

如果f在另一个线程运行,则没有问题。如果f是deferred状态,则fut.wait_for就会一直返回std::future_status::deferred,导致循环永不结束。因为我们无法掌握调度器的状况,导致它有可能把它定为lauch或者deferred

为了解决这个问题,最好检查future是不是deferred,如果是,就不进循环。但我们没办法直接询问future是不是deferred,需要用wait_for来绕一下:

auto fut = std::async(f);
if (fut.wait_for(0s) == std::future_status::deferred) {
    ...
} else {
    while (fut.wait_for(100ms) != std::future_status::ready) {
        ...
    }
    ...
}

上述场景的要点在于,当满足以下条件时,使用std::async的默认策略才是好的:

  • task不需要与调用getwait的线程并发执行。
  • 无所谓访问哪个局部线程变量(TLS)。
  • 要么能确保有人会调用future的getwait,要么f执不执行都可以。
  • 调用了wait_forwait_until的代码要保证能处理deferred。

如果没办法保证以上几点,你需要确保你的task运行在另一个线程中,就指定std::launch::async

auto fut = std::async(std::launch::async, f);

Item 37:Make std::threads unjoin able on all paths

每个std::thread对象都处于两种状态下:可join、不可join。可join的std::thread对应一个可运行或运行中的底层线程,例如被阻塞、未调度或已运行完成的线程都是可join的。我们可以通过线程的成员函数joinable()来判断。这些情况下都是不可以join的:

  • 默认构造状态的std::thread:不对应底层线程。
  • 被移动过的std::thread:底层线程现在由其它std::thread管理。
  • 已调用过joinstd::thread:底层线程已结束。
  • 已调用过detachstd::threaddetach会切断std::thread和底层线程的联系。

下面给一个例子:

constexpr auto tenMillion = 10'000'000;
bool doWork(std::function<bool(int)> filter, int maxVal = tenMillion) {
    std::vector<int> goodVals;
    std::thread t([&filter, maxVal, &goodVals] {
        for (auto i = 0; i <= maxVal; ++i) {
            if (filter(i)) {
                goodVals.push_back(i);
            }
        }
    });
    auto nh = t.native_handle();              // use t's native handle to set t's priority
    ...
        if (conditionsAreSatisfied()) {
            t.join();
            performComputation(goodVals);         // computation was performed
            return true;
        }
    return false;                             // computation was not performed
}

这段代码我们用一个过滤器过滤一下容器,然后送入performComputation计算,由于我们需要取得线程的底层句柄来设置优先级,因此我们用thread而不是async。

上面这段代码,如果最后走到了false分支,或中间抛了异常,就会遇到构造了一个可join的std::thread的问题,程序就会终止。可以改进的一点是在开始设置t为暂停状态

我们可以通过包装一个RAII类来实现所有情况下的std::thread都不可join。

class ThreadRAII {
public:
    enum class DtorAction {join, detach};

    ThreadRAII(std::thread&& t, DtorAction a)
        : action(a), t(std::move(t)) {}

    ~ThreadRAII() {
        if (t.joinable()) {
            if (action == DtorAction::join) {
                t.join();
            } else {
                t.detach();
            }
        }
    }

    std::thread& get() {return t;}
private:
    DtorAction action;
    std::thread t;
};

需要注意:

  • 构造函数只接受std::thread的右值,因为std::thread只能移动不能复制。
  • ThreadRAII的析构函数中,在调用t.join()t.detach()前,需要先调用t.joinable(),因为有可能t已经被移动过了。

应用ThreadRAII到我们前面的代码中:

bool doWork(std::function<bool(int)> filter, int maxVal = tenMillion) {
    std::vector<int> goodVals;
    ThreadRAII t(
        std::thread([&filter, maxVal, &goodVals] {
            for (auto i = 0; i <= maxVals; ++i) {
                if (filter(i)) {
                    goodVals.push_back(i);
                }
            }
        }),
        ThreadRAII::DtorAction::join
    );
    auto nh = t.get().native_handle();
    ...
    if (conditionsAreSatisfied()) {
        t.get().join();
        performComputation(goodVals);
        return true;
    }
    return false;
}