包含了线程创建的基本操作和注意事项,如何向线程传递参数,以及线程管理的高级操作:转移所有权和动态数量。

1. 创建与启动线程

1.1 线程与进程

线程与进程的区别,这个问题真的是老生常谈了,在开始之前最好还是复习一遍:

对于操作系统来说,一个任务就是一个进程(Process),比如打开浏览器,使用word。而一个进程可能不只干一件事(比如word既要打字又要检查拼写),这种进程内的多个子任务就是线程(Thread)。

具体来说:

  • 进程是操作系统分配资源的单位,而线程是进程的一个实体,是CPU调度和分派的基本单位。
  • 线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。

多进程与多线程:现代操作系统一般都是多进程的,他可以同时运行多个任务,一般来说一个CPU核对应一个进程,如果开启的进程比较多就需要使用时间片轮转进程调度算法。它的思想简单介绍如下:在操作系统的管理下,所有正在运行的进程轮流使用CPU,每个进程允许占用CPU的时间非常短(比如10毫秒),这样用户根本感觉不出来CPU是在轮流为多个进程服务,就好象所有的进程都在不间断地运行一样。

引入线程的好处:

  • 在进程内创建、终止线程比创建、终止进程要快;
  • 同一进程内的线程间切换比进程间的切换要快。

总结:

  • 一个程序至少有一个进程,一个进程至少有一个线程。
  • 线程的划分尺度小于进程,使得多线程程序的并发性高(多个计算同时执行)。
  • 进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率。
  • 虽然线程拥有单独的程序运行入口,出口,但不能独立执行。
  • 从逻辑角度来看,多线程的意义在于一个应用程序中,有多个执行部分可以同时执行。但操作系统并没有将多个线程看做多个独立的应用,来实现进程的调度和管理以及资源分配。这就是进程和线程的重要区别。

1.2 启动线程

在C++11的标准库中,将创建线程和创建实例两个动作统一起来,对于人们来说来说,线程就变成了如内存、文件一样的资源,由 C++ 提供统一的接口进行管理。同时,我们也已知晓,创建线程需指定线程函数。那么,根据线程函数的不同,在 C++ 中使用 std::thread 直接创建线程,大致有三种不同的方式。

1)使用函数指针创建

void do_some_work();
std::thread wk_thread{do_some_work};

这就是最基本的方式,当然也可以显示的传入&do_some_work当做构造参数,或者利用auto进行构造:

void thread_function() {
    for (int i = 0; i < 100; i++)
        std::cout << "thread function excuting" << std::endl;
}

auto fun1 = thread_function;
std::thread wk_thread(fun1);

(2)可调用类型创建

class DisplayThread {
public:
    void operator ()() {
        for (int i = 0; i < 100; i++)
            std::cout << "Display Thread Excecuting" << std::endl;
    }
};

std::thread wk_thread{DisplayThread{}};

若是在创建线程的时候,传入的是临时构造的实例,需要注意 C++ 的语法解析规则。这种情况下,推荐使用 C++ 的列表初始化。

std::thread wk_thread(ThreadTask());    // 1
std::thread wk_thread{ThreadTask{}};    // 2

1可能会被理解为一个函数声明:参数是函数指针ThreadTask,返回类型是thread。所以这种情况下最好用花括号初始化。

(3)Lambda表达式

auto a = 1, b = 5;
auto dosomething = [a=a,b=b]() {cout << a+b << endl; };
std::thread wk_thread(dosomething);

1.3 线程控制

正如申请了内存,必须主动释放一样,对线程的管理也讲究有始有终。当线程启动之后,我们可以在 std::thread 实例销毁之前,显式地说明我们希望如何处理实例对应线程的结束状态。如果上述实例销毁之时,程序员尚未显式说明如何处理对应线程的结束状态,那么在上述实例的析构函数中,会调用 std::terminate() 函数,终止整个程序。

在主线程中,我们可以选择「挂起 (join)」或者「分离 (detach)」产生的子线程。具体来说,就是对 std::thread 实例调用 join() 或者 detach() 成员函数。

void do_something();
std::thread join_me{do_something};
std::thread detach_me{do_something};

if (join_me.joinable()) {       // 1
    join_me.join();
}
if (detach_me.joinable()) {     // 1
    detach_me.detach();
}

如果选择挂起子线程,则主线程会被阻塞,直到子线程退出为止。如果选择分离子线程,则主线程对子线程的控制权丢失,将控制权转交给C++库。这会导致几个问题:

  • 主线程结束之后,子线程可能仍在运行(因而可以作为守护线程)
  • 主线程结束伴随着资源销毁,需要保证子线程没有引用这些资源。
struct func {
    size_t& i_ = 0;
    func(int& i): i_(i) {}      // 1
    void operator()() {
        for (size_t j{0}; j!= 1000000; ++j) {
            do_something(i);    // 2
        }
    }
};

void bad_reference() {
    size_t working{42};
    func wk_func{working};
    std::thread wk_thread{wk_func};
    wk_thread.detach();         // 3
    return;                     // 4
}

在这里,我们定义了一个可调用的类。在循环内,我们不断尝试对外部传来的引用 (1) 进行一些操作 (2)。然而,在分离子线程之后 (3),子线程所依赖的外部引用,随着函数的退出而销毁 (4)。这样,子线程后续使用该引用 (2) 的行为就是未定义的了,这是非常危险的。

事实上这段程序根本不会通过编译,因为根据C++11的新特性:向thread传引用必须使用std::ref(params)或者使用移动语义std::move(params),前者相当于复制了一个资源进入线程。

对于可能发生资源泄漏的情况,下面介绍两种应对策略:异常抛出和RAII

(1)RAII策略(Resource Acquisition Is Initialization)

将资源封装在一个 handle 或者 guard 当中,从而防止资源泄漏。同时,前文也提到,线程也是一种资源。因此,我们可以考虑构造一个 ThreadGuard 来处理这种异常安全的问题。

class ThreadGuard
{
private:
    std::thread& t_;
public:
    explicit ThreadGuard(std::thread& t) :t_(t) {}
    ~ThreadGuard()
    {
        if (this->t_.joinable())
            this->t_.join();
    }
    ThreadGuard(const ThreadGuard&) = delete;
    ThreadGuard& operator=(const ThreadGuard&) = delete;
};

void main()
{
    std::thread mythread{ myfun };
    ThreadGuard g{ mythread };
}

如果不添加线程守护,则主程序结束后,线程资源并没有得到释放,会引起内存出错。加了以后,会一直等待子线程跑完,主线程才会彻底结束。

(2)try-catch策略

通过异常抛出的办法保护子线程的正常工作:

struct func; // 定义在清单2.1中
void f()
{
  int some_local_state=0;
  func my_func(some_local_state);
  std::thread t(my_func);
  try
  {
    do_something_in_current_thread();
  }
  catch(...)
  {
    t.join();  // 1
    throw;
  }
  t.join();  // 2
}

try/catch块确保访问本地状态的线程退出后,函数才结束。当函数正常退出时,会执行到②处;当函数执行过程中抛出异常,程序会执行到①处。try/catch块能轻易的捕获轻量级错误。

2. 向线程传递参数

2.1 确保预先转换格式

void demo(int, const std::string&);

void bad_buffer(const int param) {
    char buffer[2014];                  // 1
    sprintf(buffer, "%i", param);
    std::thread wk_t(demo, 42, buffer); // 2
    wk_t.detach();
    return;                             // 3
}

传入thread的是一个int,一个char*,而char*需要隐式转化为const string&,如果在转化的过程中发生了函数退出(3),则会产生一个未定义行为,这是非常危险的。

因此,关于线程函数传参的铁律是:必须在参数传递给线程构造之前,就转换好格式

2.2 确保引用和右值的可行性

对于引用类型,如前面提到的必须使用std::ref()进行转化转化为std::reference_wrapper类型。

首先介绍一下refstd::reference_wrapper。简单来说就是让按值传参的模板可以接受一个引用作为参数。如下代码所示,由于模板是按值传递,T会被推导为int类型,所以引用无效。所以为了使得引用生效,我们必须转化为std::reference_wrapper

template<typename T>
void functest(T a){
    ++a;
}

int a=1;
int& b=a;
functest(a);
functest(ref(a));

ref()返回一个reference_wrapper对象,事实上,ref()就是用reference wrapper来包裹对象的一个简化写法。

下面展示了ref()的一个用例,stl容器提供的是value语义而不是reference语义,所以容器不支持元素为引用,而用reference_wrapper可以实现。所以vector和list能实现同步更新(不支持添加,删除操作!)

std::list<int> l1(2,1);
std::vector<std::reference_wrapper<int>> v(l1.begin(), l1.end());
for (auto& x : l1)
    x = x * 2;

讲完引用,下面来讲下移动语义。由于thread支持普适引用T&&,所以move操作可以放心执行。

std::thread wk_t(demo, 42, std::move(params));

2.3 非静态成员函数作为线程函数

类的非静态成员函数也是函数,因而也可以作为线程函数使用。不过,相比一般的函数(包括静态成员函数),将其作为线程函数使用时,有两个特殊之处。

  • 必须显式地使用函数指针,作为 std::thread 构造函数的第一个参数(换句话说,必须使用引用的形式);
  • 非静态成员函数的第一个参数,实际上是类实例的指针,在创建线程时,需要显式地填入这个参数。
class Foo {
 public:
    void bar(void);
};

void demo() {
    Foo baz;
    std::thread temp_t{&Foo::bar, &baz};
    temp_t.join();
    return;
}

此外,必须说明的是,脱离了实例的非静态成员函数是没有意义的。因此,在将非静态成员函数作为线程函数时,必须保证对应的实例可用。

3. 转移线程所有权

线程这种资源可转移但不可复制

void some_function();
void some_other_function();
std::thread t1(some_function);            // 1
std::thread t2=std::move(t1);            // 2

t1=std::thread(some_other_function);    // 3
std::thread t3;                            // 4
t3=std::move(t2);                        // 5
t1=std::move(t3);                        // 6 赋值操作将使程序崩溃

上述代码中,我们先创建了一个线程t1,然后将结果移动到t2,现在t1这个变量依然存在,只是不包含任何内容。

操作3-6则尝试将一个线程赋给一个非空线程,将some_function线程的所有权转移⑥给t1。不过,t1已经有了一个关联的线程(执行some_other_function的线程),所以这里系统直接调用std::terminate()终止程序继续运行。这样做(不抛出异常std::terminate()是noexcept函数,是为了保证与std::thread的析构函数的行为一致。

std::thread支持移动,就意味着线程的所有权可以在函数外进行转移,或者作为参数传递进入函数中。

void f(std::thread t);
void g()
{
  void some_function();
  f(std::thread(some_function));
  std::thread t(some_function);
  f(std::move(t));
}

移动操作也可能引出一些问题:当某个对象转移了线程的所有权后,它就不能对线程进行加入或分离。为了确保线程程序退出前完成,下面的代码里定义了scoped_thread类。它的思路和我们之前写的ThreadGuard比较相似。

class scoped_thread
{
  std::thread t;
public:
  explicit scoped_thread(std::thread t_):                 // 1
    t(std::move(t_))
  {
    if(!t.joinable())                                     // 2
      throw std::logic_error(“No thread”);
  }
  ~scoped_thread()
  {
    t.join();                                            // 3
  }
  scoped_thread(scoped_thread const&)=delete;
  scoped_thread& operator=(scoped_thread const&)=delete;
};

struct func; // 定义在清单2.1中

void f()
{
  int some_local_state;
  scoped_thread t(std::thread(func(some_local_state)));    // 4
  do_something_in_current_thread();
}                                                        // 5

ThreadGuard相似,不过新线程直接传递到scoped_thread中④,而非创建一个独立变量。当主线程到达f()函数末尾时⑤,scoped_thread对象就会销毁,然后加入③到的构造函数①创建的线程对象中去。在清单2.3中的thread_guard类,需要在析构中检查线程是否”可加入”。这里把检查放在了构造函数中②,并且当线程不可加入时,抛出异常。

4. 运时决定线程数量

对于那些支持移动操作的容器,我们可以将线程放入这些容器中,产生量产线程

void do_work(unsigned id);

void f()
{
  std::vector<std::thread> threads;
  for (unsigned i = 0; i < 20; ++i)
  {
    threads.emplace_back(do_work,i); // 产生线程
  } 
  for (auto& entry : threads) // 对每个线程调用 join()
    entry.join();       
}

下面给出一个很棒的实战demo,我们仔细来分析一下:

template<typename Iterator,typename T>
struct accumulate_block
{
  void operator()(Iterator first,Iterator last,T& result)
  {
    result=std::accumulate(first,last,result);
  }
};

template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
  unsigned long const length=std::distance(first,last);

  if(!length) // 1
    return init;

  unsigned long const min_per_thread=25;
  unsigned long const max_threads=
      (length+min_per_thread-1)/min_per_thread; // 2

  unsigned long const hardware_threads=
      std::thread::hardware_concurrency();

  unsigned long const num_threads=  // 3
      std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);

  unsigned long const block_size=length/num_threads; // 4

  std::vector<T> results(num_threads);
  std::vector<std::thread> threads(num_threads-1);  // 5

  Iterator block_start=first;
  for(unsigned long i=0; i < (num_threads-1); ++i)
  {
    Iterator block_end=block_start;
    std::advance(block_end,block_size);  // 6
    threads[i]=std::thread(     // 7
        accumulate_block<Iterator,T>(),
        block_start,block_end,std::ref(results[i]));
    block_start=block_end;  // #8
  }
  accumulate_block<Iterator,T>()(
      block_start,last,results[num_threads-1]); // 9

    std::for_each(threads.begin(),threads.end(),
        std::mem_fn(&std::thread::join)); //10

  return std::accumulate(results.begin(),results.end(),init); // 11
}

std::accumulate函数的作用是:累加容器中begin到end的所有数,再加上init。下面我们按标号一一解释:

  1. 此处避免输入迭代器起始位置都相同,浪费计算
  2. min_per_thread表示每个线程的最小计算数,如果输入长度较小的情况下,避免产生过多线程。
  3. std::thread::hardware_concurrency()返回当前计算机核心数,如果没有顺利取得,就返回0,此时我们默认设为2核。
  4. 每个线程分到的计算任务量大小
  5. 创建线程容器
  6. block_end迭代器指向当前块的末尾
  7. 启动一个新线程为当前块累加结果
  8. 当迭代器指向当前块的末尾时,启动下一个块
  9. 启动所有线程后,线程会处理最终块的结果。由于不一定分配均匀,所以需要一个来收尾。
  10. 将所有线程挂起,mem_fn表示每个容器的元素都执行这个函数。
  11. 返回所有结果的累加和。