本章着重讲述如何处理线程间的共享数据,内容包括:mutex
, lock
, unique_lock
, call_once
,shared_lock
。
1. 使用互斥量保护共享数据
1.1 以RAII的方式使用互斥量
在 C++ 中,标准库提供的互斥量是 std::mutex
,它被定义在 mutex
这个头文件中。互斥量是锁的一种,它也是一种资源,必须保证资源被正确释放(正确使用互斥量的条件之一)。这就像内存需要delete一样,互斥量在lock()
以后必须unlock()
解除。
实际上并不推荐这么做,C++标准库为互斥量提供了一个RAII语法的模板类std::lock_guard
,在构造时就能提供已锁的互斥量,并在析构的时候进行解锁,从而保证了一个已锁互斥量能被正确解锁。
RAII要求,资源的有效期与持有资源的对象的生命期严格绑定,即由对象的构造函数完成资源的分配(获取),同时由析构函数完成资源的释放。在这种要求下,只要对象能正确地析构,就不会出现资源泄露问题。
std::list<int> some_list; // 1
std::mutex some_mutex; // 2
void add_to_list(int new_value)
{
std::lock_guard<std::mutex> guard(some_mutex); // 3
some_list.push_back(new_value);
}
bool list_contains(int value_to_find)
{
std::lock_guard<std::mutex> guard(some_mutex); // 4
return std::find(some_list.begin(),some_list.end(),value_to_find) != some_list.end();
}
有一个全局变量①,这个全局变量被一个全局的互斥量保护②。add_to_list()
③和list_contains()
④函数中使用std::lock_guard
,使得这两个函数中对数据的访问是互斥的:list_contains()
不可能看到正在被add_to_list()
修改的列表
简单理解:被声明guard的地方在析构前都是被保护的。
在C++17中新添加了一个特性,称之为模板类参数推导,这样类似std::locak_guard
这样简单的模板类型的模板参数列表可以省略。③和④的代码可以简化成:
std::lock_guard guard(some_mutex);
1.2 限制被保护数据的使用范围
这一节讨论正确使用互斥锁的一个重要前提:必须限制被保护数据的使用范围。简单来说,就是不要将被保护数据的指针或引用通过返回值、函数参数的方式,传到无法控制的范围内。
举一个例子:
class some_data
{
int a;
std::string b;
public:
void do_something();
};
class data_wrapper
{
private:
some_data data;
std::mutex m;
public:
template<typename Function>
void process_data(Function func)
{
std::lock_guard<std::mutex> l(m);
func(data); // 1 传递“保护”数据给用户函数
}
};
some_data* unprotected;
void malicious_function(some_data& protected_data)
{
unprotected=&protected_data;
}
data_wrapper x;
void foo()
{
x.process_data(malicious_function); // 2 传递一个恶意函数
unprotected->do_something(); // 3 在无保护的情况下访问保护数据
}
例子中process_data
看起来没有任何问题,std::lock_guard
对数据做了很好的保护,但调用用户提供的函数func
①,就意味着foo能够绕过保护机制将函数malicious_function
传递进去②,在没有锁定互斥量的情况下调用do_something()
。
这段代码的问题在于根本没有保护,只是将所有可访问的数据结构代码标记为互斥。函数foo()
中调用unprotected->do_something()
的代码未能被标记为互斥。
所以,不能将被保护数据的指针或引用以函数返回值的形式,返回或传递给外部不可控的调用者。
1.3 定位接口间的条件竞争
以std::stack
为例,讲解接口设计缺陷导致的条件竞争。
if (! s.empty()){ // 1
int const value = s.top(); // 2
s.pop(); // 3
do_something(value);
}
上面的代码是一套很常规的操作,存在两个竞争:
有可能线程A判断不为空后,放心的去执行2,但在步骤1和2之间,线程B插了进来弹出了最后一个元素,此时线程A执行步骤2就会发生异常。即使我们使用了互斥量也不能保证,这就是接口本身的问题。解决问题的最简单办法就是在if (! s.empty())
之后,再加一个try/catch
抛出异常,但这样empty
就成了摆设。
(2)调用top()
和pop()
之间
上图是可能的操作顺序,很有可能两个线程都取了相同的值,这种条件竞争,然其结果依赖于do_something()
的结果,但因为看起来没有任何错误,就会让这个Bug很难定位。
那么为什么STL的设计者还要这样设计接口呢?如果我们auto val = stk.pop()
既完成取栈顶的工作,又执行弹出的操作,就会发生一个问题:传值的办法是通过拷贝,如果这里的val
是一个vector
类似的容器,它的拷贝需要时间,这时如果出现了异常抛出的情况,原来的值就被丢掉了——它既不存在于栈,也不存在于其他变量。
下面是解决的办法,也是一段非常经典的代码:
#include <exception>
#include <memory>
#include <mutex>
#include <stack>
struct empty_stack: std::exception
{
const char* what() const throw() {
return "empty stack!";
};
};
template<typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack()
: data(std::stack<T>()){}
threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
data = other.data; // 1 在构造函数体中的执行拷贝
}
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(new_value);
}
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack(); // 在调用pop前,检查栈是否为空
std::shared_ptr<T> const res(std::make_shared<T>(data.top())); // 在修改堆栈前,分配出返回值
data.pop();
return res;
}
void pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack();
value=data.top();
data.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};
在这段程序中,我们重载了两个pop版本,一个是原始版本(返回void),另一个是pop和top结合版本(返回值)。为了避免之前提到的数据丢失问题,我们采用返回指针的办法。
2. 避免死锁
通俗来说,假如有两个线程分别有互斥量AB,两个线程都在等待对方解锁,这样两个锁AB就会形成死锁。死锁往往是由于不同线程之间不当交互所导致的。
从原理上来说,避免死锁的办法是两个互斥量总以相同的顺序上锁解锁:先锁住A,锁住B,先解锁A再解锁B。如果反过来,锁住AB,以BA的形式解锁,就会导致死锁。
void a()
{
mutex1.lock();
mutex2.lock();
doSomething();
mutex2.unlock();
mutex1.unlock();
}
void b()
{
mutex2.lock();
mutex1.lock();
doSomething();
mutex1.unlock();
mutex2.unlock();
}
上面的代码中假设函数a先锁住了1,b同时锁住了2,这样a要去锁2时需要等待2解锁,b要去锁1时要等待1解锁,这样就凉了。
然而即使我们按顺序上锁,在一些交换操作中,交换了两个锁保护的两个实例,这下就又会发生死锁。因此我们研究几个有效避免死锁的办法:
2.1 std::lock
函数
std::lock
——可以一次性锁住多个(两个以上)的互斥量,并且没有副作用(死锁风险),下面给出了一个很好的例子:
class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd):some_detail(sd){}
friend void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs) //一定要引用,判断地址是否相同而不是值
return;
std::lock(lhs.m,rhs.m); // 1
std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock); // 2
std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock); // 3
swap(lhs.some_detail,rhs.some_detail);
}
};
着重看一下这个友元函数。首先需要判断两个对象是否相同,避免被同时上锁两次。然后,调用std::lock()
①锁住两个互斥量,并且两个std:lock_guard
实例已经创建好②③。提供std::adopt_lock
参数除了表示std::lock_guard
对象可获取锁之外,还将锁交由std::lock_guard
对象管理,而不需要std::lock_guard
对象再去构建新的锁。在退出时,互斥量能被正确解锁。
2.2 std::scoped_lock
函数
这是C++17中的新函数,一种新的RAII类型模板类型,与std::lock_guard<>
的功能等价,这个新类型能接受不定数量的互斥量类型作为模板参数,以及相应的互斥量(数量和类型)作为构造参数。互斥量支持构造即上锁,与std::lock
的用法相同,其解锁阶段是在析构中进行。(注意请确保你的编译器支持17,如果是使用visual studio,需要手动设置语言版本为17)
void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::scoped_lock guard(lhs.m,rhs.m); // 1
swap(lhs.some_detail,rhs.some_detail);
}
这里使用了C++17的另一个特性:自动推导模板参数。C++17可以通过隐式参数模板类型推导机制, 通过传递的对形象类型来构造实例1。其等价于:
std::scoped_lock<std::mutex,std::mutex> guard(lhs.m,rhs.m);
3. std::unique_lock
3.1 独占锁的基本用法
std::unique_lock
不是一种锁而是一种锁的超集,是在11中引入的锁工具的复合体。
std::unique_lock
的构造函数,支持三种加锁模式:
unique_lock( mutex_type& m, std::defer_lock_t t ); //延迟加锁
unique_lock( mutex_type& m, std::try_to_lock_t t ); //尝试加锁
unique_lock( mutex_type& m, std::adopt_lock_t t ); //马上加锁
有丰富的操作函数:
lock() //阻塞等待加锁
try_lock() // 非阻塞等待加锁
try_lock_for() //在一段时间内尝试加锁
try_lock_until() //在某个时间点之前尝试加锁
unlock() //临时解锁
std::lock_guard
只有在析构时才会解锁,它自己本身没有加锁解锁的接口。而std::unique_lock
可以在任意时间临时加锁解锁,在析构时也会自动解锁。下面举一个例子:
使用std::lock_guard
我们需要生成两个锁来保护,当然可以用一个锁同时保护123,但这样锁的粒度太大,效率不行。
void shared_print(string msg, int id) {
{
std::lock_guard<std::mutex> guard(_mu);
//do something 1
}
//do something 2
{
std::lock_guard<std::mutex> guard(_mu);
// do something 3
}
}
当我们改为独占锁,可以通过临时上锁解锁,实现精细化的操作。
void shared_print(string msg, int id) {
std::unique_lock<std::mutex> guard(_mu);
//do something 1
guard.unlock(); //临时解锁
//do something 2
guard.lock(); //继续上锁
// do something 3
}
我们甚至可以使用std::defer_lock
设置初始化的时候不进行默认的上锁操作:
void shared_print(string msg, int id) {
std::unique_lock<std::mutex> guard(_mu, std::defer_lock);
//do something 1
guard.lock();
// do something protected
guard.unlock(); //临时解锁
//do something 2
guard.lock(); //继续上锁
// do something 3
}
此外,独占锁和unique_ptr
一样不能复制,但可以移动!而lock_guard
两个都不行:
// unique_lock 可以移动,不能复制
std::unique_lock<std::mutex> guard1(_mu);
std::unique_lock<std::mutex> guard2 = guard1; // error
std::unique_lock<std::mutex> guard2 = std::move(guard1); // ok
下面的示例展示了所有权的传递应用:
std::unique_lock<std::mutex> get_lock()
{
extern std::mutex some_mutex;
std::unique_lock<std::mutex> lk(some_mutex);
prepare_data();
return lk; // 1
}
void process_data()
{
std::unique_lock<std::mutex> lk(get_lock()); // 2
do_something();
}
构造时,拷贝构造函数是delete
,调用移动构造函数,所以不需要std::move
。
4. 保护共享数据的其他方式
4.1 保护共享数据的初始化过程
延迟初始化(Lazy initialization)在单线程代码很常见——每一个操作都需要先对源进行检查,为了了解数据是否被初始化,然后在其使用前决定,数据是否需要初始化:
std::shared_ptr<some_resource> resource_ptr;
void foo()
{
if(!resource_ptr)
{
resource_ptr.reset(new some_resource); // 1
}
resource_ptr->do_something();
}
转为多线程代码时,只有①处需要保护,这样共享数据对于并发访问就是安全的,我们尝试使用锁来进行保护时会出现一些问题:由于每次我们调用foo()
都会检查是否初始化,这样每次都会创建锁,非常影响代码的正常运行。
std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;
void foo()
{
std::unique_lock<std::mutex> lk(resource_mutex); // 所有线程在此序列化
if(!resource_ptr)
{
resource_ptr.reset(new some_resource); // 只有初始化过程需要保护
}
lk.unlock();
resource_ptr->do_something();
}
于是有人针对这种情况提出了双重检查锁模式。由于条件判断1并没有与被锁保护的3同步,这样又会引入新条件的竞争:线程A进入了步骤3,由于C++在new一个对象时,首先会把指针指向分配的那块空间,然后在初始化该空间。此时线程B兴奋得知,resource_ptr
不为空了,赶紧执行4,然而事实却是:指针只是指向了某个地方,指向的对象并没有完全初始化完成!
void undefined_behaviour_with_double_checked_locking()
{
if(!resource_ptr) // 1
{
std::lock_guard<std::mutex> lk(resource_mutex);
if(!resource_ptr) // 2
{
resource_ptr.reset(new some_resource); // 3
}
}
resource_ptr->do_something(); // 4
}
为了避免这些情况,C++标准委员会建议我们使用std::call_once
函数。这个函数能保证我们在多线程中,某个函数只会被调用一次。
std::once_flag flag1;
void simple_do_once()
{
std::call_once(flag1, []() { std::cout << "Simple example: called once\n"; });
}
int main()
{
std::thread st1(simple_do_once);
std::thread st2(simple_do_once);
st1.join();
st2.join();
}
下面展示了使用std::call_once
作为类成员的延迟初始化(线程安全)
class X
{
private:
string connection_handle;
std::once_flag connection_init_flag;
void open_connection()
{
connection_handle = "4396";
}
public:
void send_data()
{
for (int i = 0; i < 10; i++)
{
std::call_once(connection_init_flag, &X::open_connection, this);
cout << "Connectiong handle is " << connection_handle << " send data: " <<i<< endl;
}
}
void receive_data()
{
for (int i = 0; i < 10; i++)
{
std::call_once(connection_init_flag, &X::open_connection, this);
cout << "Connectiong handle is " << connection_handle << " recieve data: " << i << endl;
}
}
};
4.2 保护不常更新的数据结构
有的数据比如DNS,我们很少去改改动他,平常更多的是去读取访问,但偶尔也会对他进行修改。因此我们可以使用读写锁std::shared_lock
去管理。所谓「读写锁」,就是同时可以被多个读者拥有,但是只能被一个写者拥有的锁。而所谓「多个读者、单个写者」,并非指程序中只有一个写者(线程),而是说不能有多个写者同时去写。
std::shared_lock
往往和shared_mutex
搭配使用,下面给一个例子:
class ThreadSafeCounter {
public:
ThreadSafeCounter() = default;
// 多个线程/读者能同时读计数器的值。
unsigned int get() const {
std::shared_lock<std::shared_mutex> lock(mutex_);
return value_;
}
// 只有一个线程/写者能增加/写线程的值。
void increment() {
std::unique_lock<std::shared_mutex> lock(mutex_);
value_++;
}
// 只有一个线程/写者能重置/写线程的值。
void reset() {
std::unique_lock<std::shared_mutex> lock(mutex_);
value_ = 0;
}
private:
mutable std::shared_mutex mutex_;
unsigned int value_ = 0;
};