包含了线程创建的基本操作和注意事项,如何向线程传递参数,以及线程管理的高级操作:转移所有权和动态数量。
1. 创建与启动线程
1.1 线程与进程
线程与进程的区别,这个问题真的是老生常谈了,在开始之前最好还是复习一遍:
对于操作系统来说,一个任务就是一个进程(Process),比如打开浏览器,使用word。而一个进程可能不只干一件事(比如word既要打字又要检查拼写),这种进程内的多个子任务就是线程(Thread)。
具体来说:
- 进程是操作系统分配资源的单位,而线程是进程的一个实体,是CPU调度和分派的基本单位。
- 线程不能够独立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。
多进程与多线程:现代操作系统一般都是多进程的,他可以同时运行多个任务,一般来说一个CPU核对应一个进程,如果开启的进程比较多就需要使用时间片轮转进程调度算法。它的思想简单介绍如下:在操作系统的管理下,所有正在运行的进程轮流使用CPU,每个进程允许占用CPU的时间非常短(比如10毫秒),这样用户根本感觉不出来CPU是在轮流为多个进程服务,就好象所有的进程都在不间断地运行一样。
引入线程的好处:
- 在进程内创建、终止线程比创建、终止进程要快;
- 同一进程内的线程间切换比进程间的切换要快。
总结:
- 一个程序至少有一个进程,一个进程至少有一个线程。
- 线程的划分尺度小于进程,使得多线程程序的并发性高(多个计算同时执行)。
- 进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率。
- 虽然线程拥有单独的程序运行入口,出口,但不能独立执行。
- 从逻辑角度来看,多线程的意义在于一个应用程序中,有多个执行部分可以同时执行。但操作系统并没有将多个线程看做多个独立的应用,来实现进程的调度和管理以及资源分配。这就是进程和线程的重要区别。
1.2 启动线程
在C++11的标准库中,将创建线程和创建实例两个动作统一起来,对于人们来说来说,线程就变成了如内存、文件一样的资源,由 C++ 提供统一的接口进行管理。同时,我们也已知晓,创建线程需指定线程函数。那么,根据线程函数的不同,在 C++ 中使用 std::thread
直接创建线程,大致有三种不同的方式。
1)使用函数指针创建
void do_some_work();
std::thread wk_thread{do_some_work};
这就是最基本的方式,当然也可以显示的传入&do_some_work
当做构造参数,或者利用auto
进行构造:
void thread_function() {
for (int i = 0; i < 100; i++)
std::cout << "thread function excuting" << std::endl;
}
auto fun1 = thread_function;
std::thread wk_thread(fun1);
(2)可调用类型创建
class DisplayThread {
public:
void operator ()() {
for (int i = 0; i < 100; i++)
std::cout << "Display Thread Excecuting" << std::endl;
}
};
std::thread wk_thread{DisplayThread{}};
若是在创建线程的时候,传入的是临时构造的实例,需要注意 C++ 的语法解析规则。这种情况下,推荐使用 C++ 的列表初始化。
std::thread wk_thread(ThreadTask()); // 1
std::thread wk_thread{ThreadTask{}}; // 2
1可能会被理解为一个函数声明:参数是函数指针ThreadTask
,返回类型是thread
。所以这种情况下最好用花括号初始化。
(3)Lambda表达式
auto a = 1, b = 5;
auto dosomething = [a=a,b=b]() {cout << a+b << endl; };
std::thread wk_thread(dosomething);
1.3 线程控制
正如申请了内存,必须主动释放一样,对线程的管理也讲究有始有终。当线程启动之后,我们可以在 std::thread
实例销毁之前,显式地说明我们希望如何处理实例对应线程的结束状态。如果上述实例销毁之时,程序员尚未显式说明如何处理对应线程的结束状态,那么在上述实例的析构函数中,会调用 std::terminate()
函数,终止整个程序。
在主线程中,我们可以选择「挂起 (join)」或者「分离 (detach)」产生的子线程。具体来说,就是对 std::thread
实例调用 join()
或者 detach()
成员函数。
void do_something();
std::thread join_me{do_something};
std::thread detach_me{do_something};
if (join_me.joinable()) { // 1
join_me.join();
}
if (detach_me.joinable()) { // 1
detach_me.detach();
}
如果选择挂起子线程,则主线程会被阻塞,直到子线程退出为止。如果选择分离子线程,则主线程对子线程的控制权丢失,将控制权转交给C++库。这会导致几个问题:
- 主线程结束之后,子线程可能仍在运行(因而可以作为守护线程)
- 主线程结束伴随着资源销毁,需要保证子线程没有引用这些资源。
struct func {
size_t& i_ = 0;
func(int& i): i_(i) {} // 1
void operator()() {
for (size_t j{0}; j!= 1000000; ++j) {
do_something(i); // 2
}
}
};
void bad_reference() {
size_t working{42};
func wk_func{working};
std::thread wk_thread{wk_func};
wk_thread.detach(); // 3
return; // 4
}
在这里,我们定义了一个可调用的类。在循环内,我们不断尝试对外部传来的引用 (1) 进行一些操作 (2)。然而,在分离子线程之后 (3),子线程所依赖的外部引用,随着函数的退出而销毁 (4)。这样,子线程后续使用该引用 (2) 的行为就是未定义的了,这是非常危险的。
事实上这段程序根本不会通过编译,因为根据C++11的新特性:向thread传引用必须使用std::ref(params)
或者使用移动语义std::move(params)
,前者相当于复制了一个资源进入线程。
对于可能发生资源泄漏的情况,下面介绍两种应对策略:异常抛出和RAII。
(1)RAII策略(Resource Acquisition Is Initialization)
将资源封装在一个 handle 或者 guard 当中,从而防止资源泄漏。同时,前文也提到,线程也是一种资源。因此,我们可以考虑构造一个 ThreadGuard
来处理这种异常安全的问题。
class ThreadGuard
{
private:
std::thread& t_;
public:
explicit ThreadGuard(std::thread& t) :t_(t) {}
~ThreadGuard()
{
if (this->t_.joinable())
this->t_.join();
}
ThreadGuard(const ThreadGuard&) = delete;
ThreadGuard& operator=(const ThreadGuard&) = delete;
};
void main()
{
std::thread mythread{ myfun };
ThreadGuard g{ mythread };
}
如果不添加线程守护,则主程序结束后,线程资源并没有得到释放,会引起内存出错。加了以后,会一直等待子线程跑完,主线程才会彻底结束。
(2)try-catch策略
通过异常抛出的办法保护子线程的正常工作:
struct func; // 定义在清单2.1中
void f()
{
int some_local_state=0;
func my_func(some_local_state);
std::thread t(my_func);
try
{
do_something_in_current_thread();
}
catch(...)
{
t.join(); // 1
throw;
}
t.join(); // 2
}
try/catch
块确保访问本地状态的线程退出后,函数才结束。当函数正常退出时,会执行到②处;当函数执行过程中抛出异常,程序会执行到①处。try/catch
块能轻易的捕获轻量级错误。
2. 向线程传递参数
2.1 确保预先转换格式
void demo(int, const std::string&);
void bad_buffer(const int param) {
char buffer[2014]; // 1
sprintf(buffer, "%i", param);
std::thread wk_t(demo, 42, buffer); // 2
wk_t.detach();
return; // 3
}
传入thread
的是一个int
,一个char*
,而char*
需要隐式转化为const string&
,如果在转化的过程中发生了函数退出(3),则会产生一个未定义行为,这是非常危险的。
因此,关于线程函数传参的铁律是:必须在参数传递给线程构造之前,就转换好格式。
2.2 确保引用和右值的可行性
对于引用类型,如前面提到的必须使用std::ref()
进行转化转化为std::reference_wrapper
类型。
首先介绍一下ref
和std::reference_wrapper
。简单来说就是让按值传参的模板可以接受一个引用作为参数。如下代码所示,由于模板是按值传递,T会被推导为int
类型,所以引用无效。所以为了使得引用生效,我们必须转化为std::reference_wrapper
template<typename T>
void functest(T a){
++a;
}
int a=1;
int& b=a;
functest(a);
functest(ref(a));
ref()
返回一个reference_wrapper
对象,事实上,ref()
就是用reference wrapper
来包裹对象的一个简化写法。
下面展示了ref()
的一个用例,stl
容器提供的是value语义而不是reference语义,所以容器不支持元素为引用,而用reference_wrapper可以实现。所以vector和list能实现同步更新(不支持添加,删除操作!)
std::list<int> l1(2,1);
std::vector<std::reference_wrapper<int>> v(l1.begin(), l1.end());
for (auto& x : l1)
x = x * 2;
讲完引用,下面来讲下移动语义。由于thread支持普适引用T&&
,所以move
操作可以放心执行。
std::thread wk_t(demo, 42, std::move(params));
2.3 非静态成员函数作为线程函数
类的非静态成员函数也是函数,因而也可以作为线程函数使用。不过,相比一般的函数(包括静态成员函数),将其作为线程函数使用时,有两个特殊之处。
- 必须显式地使用函数指针,作为
std::thread
构造函数的第一个参数(换句话说,必须使用引用的形式); - 非静态成员函数的第一个参数,实际上是类实例的指针,在创建线程时,需要显式地填入这个参数。
class Foo {
public:
void bar(void);
};
void demo() {
Foo baz;
std::thread temp_t{&Foo::bar, &baz};
temp_t.join();
return;
}
此外,必须说明的是,脱离了实例的非静态成员函数是没有意义的。因此,在将非静态成员函数作为线程函数时,必须保证对应的实例可用。
3. 转移线程所有权
线程这种资源可转移但不可复制
void some_function();
void some_other_function();
std::thread t1(some_function); // 1
std::thread t2=std::move(t1); // 2
t1=std::thread(some_other_function); // 3
std::thread t3; // 4
t3=std::move(t2); // 5
t1=std::move(t3); // 6 赋值操作将使程序崩溃
上述代码中,我们先创建了一个线程t1,然后将结果移动到t2,现在t1这个变量依然存在,只是不包含任何内容。
操作3-6则尝试将一个线程赋给一个非空线程,将some_function线程的所有权转移⑥给t1。不过,t1已经有了一个关联的线程(执行some_other_function的线程),所以这里系统直接调用std::terminate()
终止程序继续运行。这样做(不抛出异常,std::terminate()
是noexcept函数,是为了保证与std::thread
的析构函数的行为一致。
std::thread
支持移动,就意味着线程的所有权可以在函数外进行转移,或者作为参数传递进入函数中。
void f(std::thread t);
void g()
{
void some_function();
f(std::thread(some_function));
std::thread t(some_function);
f(std::move(t));
}
移动操作也可能引出一些问题:当某个对象转移了线程的所有权后,它就不能对线程进行加入或分离。为了确保线程程序退出前完成,下面的代码里定义了scoped_thread
类。它的思路和我们之前写的ThreadGuard
比较相似。
class scoped_thread
{
std::thread t;
public:
explicit scoped_thread(std::thread t_): // 1
t(std::move(t_))
{
if(!t.joinable()) // 2
throw std::logic_error(“No thread”);
}
~scoped_thread()
{
t.join(); // 3
}
scoped_thread(scoped_thread const&)=delete;
scoped_thread& operator=(scoped_thread const&)=delete;
};
struct func; // 定义在清单2.1中
void f()
{
int some_local_state;
scoped_thread t(std::thread(func(some_local_state))); // 4
do_something_in_current_thread();
} // 5
与ThreadGuard
相似,不过新线程直接传递到scoped_thread
中④,而非创建一个独立变量。当主线程到达f()
函数末尾时⑤,scoped_thread
对象就会销毁,然后加入③到的构造函数①创建的线程对象中去。在清单2.3中的thread_guard
类,需要在析构中检查线程是否”可加入”。这里把检查放在了构造函数中②,并且当线程不可加入时,抛出异常。
4. 运时决定线程数量
对于那些支持移动操作的容器,我们可以将线程放入这些容器中,产生量产线程。
void do_work(unsigned id);
void f()
{
std::vector<std::thread> threads;
for (unsigned i = 0; i < 20; ++i)
{
threads.emplace_back(do_work,i); // 产生线程
}
for (auto& entry : threads) // 对每个线程调用 join()
entry.join();
}
下面给出一个很棒的实战demo,我们仔细来分析一下:
template<typename Iterator,typename T>
struct accumulate_block
{
void operator()(Iterator first,Iterator last,T& result)
{
result=std::accumulate(first,last,result);
}
};
template<typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
unsigned long const length=std::distance(first,last);
if(!length) // 1
return init;
unsigned long const min_per_thread=25;
unsigned long const max_threads=
(length+min_per_thread-1)/min_per_thread; // 2
unsigned long const hardware_threads=
std::thread::hardware_concurrency();
unsigned long const num_threads= // 3
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
unsigned long const block_size=length/num_threads; // 4
std::vector<T> results(num_threads);
std::vector<std::thread> threads(num_threads-1); // 5
Iterator block_start=first;
for(unsigned long i=0; i < (num_threads-1); ++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size); // 6
threads[i]=std::thread( // 7
accumulate_block<Iterator,T>(),
block_start,block_end,std::ref(results[i]));
block_start=block_end; // #8
}
accumulate_block<Iterator,T>()(
block_start,last,results[num_threads-1]); // 9
std::for_each(threads.begin(),threads.end(),
std::mem_fn(&std::thread::join)); //10
return std::accumulate(results.begin(),results.end(),init); // 11
}
std::accumulate
函数的作用是:累加容器中begin到end的所有数,再加上init。下面我们按标号一一解释:
- 此处避免输入迭代器起始位置都相同,浪费计算
min_per_thread
表示每个线程的最小计算数,如果输入长度较小的情况下,避免产生过多线程。std::thread::hardware_concurrency()
返回当前计算机核心数,如果没有顺利取得,就返回0,此时我们默认设为2核。- 每个线程分到的计算任务量大小
- 创建线程容器
block_end
迭代器指向当前块的末尾- 启动一个新线程为当前块累加结果
- 当迭代器指向当前块的末尾时,启动下一个块
- 启动所有线程后,线程会处理最终块的结果。由于不一定分配均匀,所以需要一个来收尾。
- 将所有线程挂起,
mem_fn
表示每个容器的元素都执行这个函数。 - 返回所有结果的累加和。