本章着重讲述如何处理线程间的共享数据,内容包括:mutex, lock, unique_lock, call_once,shared_lock

1. 使用互斥量保护共享数据

1.1 以RAII的方式使用互斥量

在 C++ 中,标准库提供的互斥量是 std::mutex,它被定义在 mutex 这个头文件中。互斥量是锁的一种,它也是一种资源,必须保证资源被正确释放(正确使用互斥量的条件之一)。这就像内存需要delete一样,互斥量在lock()以后必须unlock()解除。

实际上并不推荐这么做,C++标准库为互斥量提供了一个RAII语法的模板类std::lock_guard,在构造时就能提供已锁的互斥量,并在析构的时候进行解锁,从而保证了一个已锁互斥量能被正确解锁。

RAII要求,资源的有效期与持有资源的对象的生命期严格绑定,即由对象的构造函数完成资源的分配(获取),同时由析构函数完成资源的释放。在这种要求下,只要对象能正确地析构,就不会出现资源泄露问题。

std::list<int> some_list;    // 1
std::mutex some_mutex;    // 2

void add_to_list(int new_value)
{
  std::lock_guard<std::mutex> guard(some_mutex);    // 3
  some_list.push_back(new_value);
}

bool list_contains(int value_to_find)
{
  std::lock_guard<std::mutex> guard(some_mutex);    // 4
  return std::find(some_list.begin(),some_list.end(),value_to_find) != some_list.end();
}

有一个全局变量①,这个全局变量被一个全局的互斥量保护②。add_to_list()③和list_contains()④函数中使用std::lock_guard,使得这两个函数中对数据的访问是互斥的:list_contains()不可能看到正在被add_to_list()修改的列表

简单理解:被声明guard的地方在析构前都是被保护的

在C++17中新添加了一个特性,称之为模板类参数推导,这样类似std::locak_guard这样简单的模板类型的模板参数列表可以省略。③和④的代码可以简化成:

std::lock_guard guard(some_mutex);

1.2 限制被保护数据的使用范围

这一节讨论正确使用互斥锁的一个重要前提:必须限制被保护数据的使用范围。简单来说,就是不要将被保护数据的指针或引用通过返回值、函数参数的方式,传到无法控制的范围内。

举一个例子:

class some_data
{
  int a;
  std::string b;
public:
  void do_something();
};

class data_wrapper
{
private:
  some_data data;
  std::mutex m;
public:
  template<typename Function>
  void process_data(Function func)
  {
    std::lock_guard<std::mutex> l(m);
    func(data);    // 1 传递“保护”数据给用户函数
  }
};

some_data* unprotected;

void malicious_function(some_data& protected_data)
{
  unprotected=&protected_data;
}

data_wrapper x;
void foo()
{
  x.process_data(malicious_function);    // 2 传递一个恶意函数
  unprotected->do_something();    // 3 在无保护的情况下访问保护数据
}

例子中process_data看起来没有任何问题,std::lock_guard对数据做了很好的保护,但调用用户提供的函数func①,就意味着foo能够绕过保护机制将函数malicious_function传递进去②,在没有锁定互斥量的情况下调用do_something()

这段代码的问题在于根本没有保护,只是将所有可访问的数据结构代码标记为互斥。函数foo()中调用unprotected->do_something()的代码未能被标记为互斥。

所以,不能将被保护数据的指针或引用以函数返回值的形式,返回或传递给外部不可控的调用者

1.3 定位接口间的条件竞争

std::stack为例,讲解接口设计缺陷导致的条件竞争。

if (! s.empty()){    // 1
  int const value = s.top();    // 2
  s.pop();    // 3
  do_something(value);
}

上面的代码是一套很常规的操作,存在两个竞争:

有可能线程A判断不为空后,放心的去执行2,但在步骤1和2之间,线程B插了进来弹出了最后一个元素,此时线程A执行步骤2就会发生异常。即使我们使用了互斥量也不能保证,这就是接口本身的问题。解决问题的最简单办法就是在if (! s.empty())之后,再加一个try/catch抛出异常,但这样empty就成了摆设。

(2)调用top()pop()之间

上图是可能的操作顺序,很有可能两个线程都取了相同的值,这种条件竞争,然其结果依赖于do_something()的结果,但因为看起来没有任何错误,就会让这个Bug很难定位。


那么为什么STL的设计者还要这样设计接口呢?如果我们auto val = stk.pop()既完成取栈顶的工作,又执行弹出的操作,就会发生一个问题:传值的办法是通过拷贝,如果这里的val是一个vector类似的容器,它的拷贝需要时间,这时如果出现了异常抛出的情况,原来的值就被丢掉了——它既不存在于栈,也不存在于其他变量。

下面是解决的办法,也是一段非常经典的代码:

#include <exception>
#include <memory>
#include <mutex>
#include <stack>

struct empty_stack: std::exception
{
  const char* what() const throw() {
    return "empty stack!";
  };
};

template<typename T>
class threadsafe_stack
{
private:
  std::stack<T> data;
  mutable std::mutex m;

public:
  threadsafe_stack()
    : data(std::stack<T>()){}

  threadsafe_stack(const threadsafe_stack& other)
  {
    std::lock_guard<std::mutex> lock(other.m);
    data = other.data; // 1 在构造函数体中的执行拷贝
  }

  threadsafe_stack& operator=(const threadsafe_stack&) = delete;

  void push(T new_value)
  {
    std::lock_guard<std::mutex> lock(m);
    data.push(new_value);
  }

  std::shared_ptr<T> pop()
  {
    std::lock_guard<std::mutex> lock(m);
    if(data.empty()) throw empty_stack(); // 在调用pop前,检查栈是否为空

    std::shared_ptr<T> const res(std::make_shared<T>(data.top())); // 在修改堆栈前,分配出返回值
    data.pop();
    return res;
  }

  void pop(T& value)
  {
    std::lock_guard<std::mutex> lock(m);
    if(data.empty()) throw empty_stack();

    value=data.top();
    data.pop();
  }

  bool empty() const
  {
    std::lock_guard<std::mutex> lock(m);
    return data.empty();
  }
};

在这段程序中,我们重载了两个pop版本,一个是原始版本(返回void),另一个是pop和top结合版本(返回值)。为了避免之前提到的数据丢失问题,我们采用返回指针的办法。

2. 避免死锁

通俗来说,假如有两个线程分别有互斥量AB,两个线程都在等待对方解锁,这样两个锁AB就会形成死锁。死锁往往是由于不同线程之间不当交互所导致的

从原理上来说,避免死锁的办法是两个互斥量总以相同的顺序上锁解锁:先锁住A,锁住B,先解锁A再解锁B。如果反过来,锁住AB,以BA的形式解锁,就会导致死锁。

void a()
{
    mutex1.lock();
    mutex2.lock();
    doSomething();
    mutex2.unlock();
    mutex1.unlock();
}
void b()
{
    mutex2.lock();
    mutex1.lock();
    doSomething();
    mutex1.unlock();
    mutex2.unlock();
}

上面的代码中假设函数a先锁住了1,b同时锁住了2,这样a要去锁2时需要等待2解锁,b要去锁1时要等待1解锁,这样就凉了。

然而即使我们按顺序上锁,在一些交换操作中,交换了两个锁保护的两个实例,这下就又会发生死锁。因此我们研究几个有效避免死锁的办法:

2.1 std::lock函数

std::lock——可以一次性锁住多个(两个以上)的互斥量,并且没有副作用(死锁风险),下面给出了一个很好的例子:

class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:
  some_big_object some_detail;
  std::mutex m;
public:
  X(some_big_object const& sd):some_detail(sd){}

  friend void swap(X& lhs, X& rhs)
  {
    if(&lhs==&rhs) //一定要引用,判断地址是否相同而不是值
      return;
    std::lock(lhs.m,rhs.m); // 1
    std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock); // 2
    std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock); // 3
    swap(lhs.some_detail,rhs.some_detail);
  }
};

着重看一下这个友元函数。首先需要判断两个对象是否相同,避免被同时上锁两次。然后,调用std::lock()①锁住两个互斥量,并且两个std:lock_guard实例已经创建好②③。提供std::adopt_lock参数除了表示std::lock_guard对象可获取锁之外,还将锁交由std::lock_guard对象管理,而不需要std::lock_guard对象再去构建新的锁。在退出时,互斥量能被正确解锁。

2.2 std::scoped_lock函数

这是C++17中的新函数,一种新的RAII类型模板类型,与std::lock_guard<>的功能等价,这个新类型能接受不定数量的互斥量类型作为模板参数,以及相应的互斥量(数量和类型)作为构造参数。互斥量支持构造即上锁,与std::lock的用法相同,其解锁阶段是在析构中进行。(注意请确保你的编译器支持17,如果是使用visual studio,需要手动设置语言版本为17)

void swap(X& lhs, X& rhs)
{
  if(&lhs==&rhs)
    return;
  std::scoped_lock guard(lhs.m,rhs.m); // 1
  swap(lhs.some_detail,rhs.some_detail);
}

这里使用了C++17的另一个特性:自动推导模板参数。C++17可以通过隐式参数模板类型推导机制, 通过传递的对形象类型来构造实例1。其等价于:

std::scoped_lock<std::mutex,std::mutex> guard(lhs.m,rhs.m);

3. std::unique_lock

3.1 独占锁的基本用法

std::unique_lock不是一种锁而是一种锁的超集,是在11中引入的锁工具的复合体。

std::unique_lock的构造函数,支持三种加锁模式:

unique_lock( mutex_type& m, std::defer_lock_t t );   //延迟加锁
unique_lock( mutex_type& m, std::try_to_lock_t t ); //尝试加锁
unique_lock( mutex_type& m, std::adopt_lock_t t );   //马上加锁

有丰富的操作函数:

lock()         //阻塞等待加锁
try_lock()        // 非阻塞等待加锁
try_lock_for()  //在一段时间内尝试加锁
try_lock_until()   //在某个时间点之前尝试加锁
unlock()             //临时解锁

std::lock_guard只有在析构时才会解锁,它自己本身没有加锁解锁的接口。而std::unique_lock可以在任意时间临时加锁解锁,在析构时也会自动解锁。下面举一个例子:

使用std::lock_guard我们需要生成两个锁来保护,当然可以用一个锁同时保护123,但这样锁的粒度太大,效率不行。

void shared_print(string msg, int id) {
    {
        std::lock_guard<std::mutex> guard(_mu);
        //do something 1
    }
    //do something 2
    {
        std::lock_guard<std::mutex> guard(_mu);
        // do something 3
    }
}

当我们改为独占锁,可以通过临时上锁解锁,实现精细化的操作。

void shared_print(string msg, int id) {

    std::unique_lock<std::mutex> guard(_mu);
    //do something 1
    guard.unlock(); //临时解锁

    //do something 2

    guard.lock(); //继续上锁
    // do something 3
}

我们甚至可以使用std::defer_lock设置初始化的时候不进行默认的上锁操作:

void shared_print(string msg, int id) {
    std::unique_lock<std::mutex> guard(_mu, std::defer_lock);
    //do something 1

    guard.lock();
    // do something protected
    guard.unlock(); //临时解锁

    //do something 2

    guard.lock(); //继续上锁
    // do something 3
}

此外,独占锁和unique_ptr一样不能复制,但可以移动!而lock_guard两个都不行:

// unique_lock 可以移动,不能复制
std::unique_lock<std::mutex> guard1(_mu);
std::unique_lock<std::mutex> guard2 = guard1;  // error
std::unique_lock<std::mutex> guard2 = std::move(guard1); // ok

下面的示例展示了所有权的传递应用:

std::unique_lock<std::mutex> get_lock()
{
  extern std::mutex some_mutex;
  std::unique_lock<std::mutex> lk(some_mutex);
  prepare_data();
  return lk;  // 1
}
void process_data()
{
  std::unique_lock<std::mutex> lk(get_lock());  // 2
  do_something();
}

构造时,拷贝构造函数是delete,调用移动构造函数,所以不需要std::move

4. 保护共享数据的其他方式

4.1 保护共享数据的初始化过程

延迟初始化(Lazy initialization)在单线程代码很常见——每一个操作都需要先对源进行检查,为了了解数据是否被初始化,然后在其使用前决定,数据是否需要初始化:

std::shared_ptr<some_resource> resource_ptr;
void foo()
{
  if(!resource_ptr)
  {
    resource_ptr.reset(new some_resource);  // 1
  }
  resource_ptr->do_something();
}

转为多线程代码时,只有①处需要保护,这样共享数据对于并发访问就是安全的,我们尝试使用锁来进行保护时会出现一些问题:由于每次我们调用foo()都会检查是否初始化,这样每次都会创建锁,非常影响代码的正常运行。

std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;

void foo()
{
  std::unique_lock<std::mutex> lk(resource_mutex);  // 所有线程在此序列化 
  if(!resource_ptr)
  {
    resource_ptr.reset(new some_resource);  // 只有初始化过程需要保护 
  }
  lk.unlock();
  resource_ptr->do_something();
}

于是有人针对这种情况提出了双重检查锁模式。由于条件判断1并没有与被锁保护的3同步,这样又会引入新条件的竞争:线程A进入了步骤3,由于C++在new一个对象时,首先会把指针指向分配的那块空间,然后在初始化该空间。此时线程B兴奋得知,resource_ptr不为空了,赶紧执行4,然而事实却是:指针只是指向了某个地方,指向的对象并没有完全初始化完成!

void undefined_behaviour_with_double_checked_locking()
{
  if(!resource_ptr)  // 1
  {
    std::lock_guard<std::mutex> lk(resource_mutex);
    if(!resource_ptr)  // 2
    {
      resource_ptr.reset(new some_resource);  // 3
    }
  }
  resource_ptr->do_something();  // 4
}

为了避免这些情况,C++标准委员会建议我们使用std::call_once函数。这个函数能保证我们在多线程中,某个函数只会被调用一次。

std::once_flag flag1;
void simple_do_once()
{
    std::call_once(flag1, []() { std::cout << "Simple example: called once\n"; });
}
int main()
{
    std::thread st1(simple_do_once);
    std::thread st2(simple_do_once);
    st1.join();
    st2.join();
}

下面展示了使用std::call_once作为类成员的延迟初始化(线程安全)

class X
{
private:
    string connection_handle;
    std::once_flag connection_init_flag;

    void open_connection()
    {
        connection_handle = "4396";
    }
public:
    void send_data()
    {
        for (int i = 0; i < 10; i++)
        {
            std::call_once(connection_init_flag, &X::open_connection, this);
            cout << "Connectiong handle is " << connection_handle << " send data: " <<i<< endl;
        }
    }
    void receive_data()
    {
        for (int i = 0; i < 10; i++)
        {
            std::call_once(connection_init_flag, &X::open_connection, this);
            cout << "Connectiong handle is " << connection_handle << " recieve data: " << i << endl;
        }
    }
};

4.2 保护不常更新的数据结构

有的数据比如DNS,我们很少去改改动他,平常更多的是去读取访问,但偶尔也会对他进行修改。因此我们可以使用读写锁std::shared_lock去管理。所谓「读写锁」,就是同时可以被多个读者拥有,但是只能被一个写者拥有的锁。而所谓「多个读者、单个写者」,并非指程序中只有一个写者(线程),而是说不能有多个写者同时去写。

std::shared_lock往往和shared_mutex搭配使用,下面给一个例子:

class ThreadSafeCounter {
 public:
  ThreadSafeCounter() = default;

  // 多个线程/读者能同时读计数器的值。
  unsigned int get() const {
    std::shared_lock<std::shared_mutex> lock(mutex_);
    return value_;
  }

  // 只有一个线程/写者能增加/写线程的值。
  void increment() {
    std::unique_lock<std::shared_mutex> lock(mutex_);
    value_++;
  }

  // 只有一个线程/写者能重置/写线程的值。
  void reset() {
    std::unique_lock<std::shared_mutex> lock(mutex_);
    value_ = 0;
  }

 private:
  mutable std::shared_mutex mutex_;
  unsigned int value_ = 0;
};