并发编程(一)

《现代C++并发编程教程》 —— C++并发编程学习笔记(一)

启动线程

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
#include <iostream>
#include <thread>

void hello() {
    printf("hello world!\n");
}

int main() {
    std::thread my_thread(hello);

}

可以传入函数对象,如上例所示。也可以传入类或者其他重载了 () (callable)运算符的对象,例如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
class task {
public:
    void operator()() const {
        do_something();
        do_something_else();
    }
};

task f;
std::thread my_thread(f);

但这里需要注意一个问题,由于 C++ 的语法问题,有时会造成歧义,例如:

1
std::thread my_thread(task());  // 这会被认为是声明了一个返回值为 thread 的,名为 my_thread 的函数

这里最好使用 {} 运算符来创建一个 thread 对象,如: std::thread my_thread{task()}。同时也可以用匿名函数(lambda表达式)来创建线程:

1
2
3
4
5
6
7
#include <iostream>
#include <thread>

int main() {
    std::thread thread{ [] {std::cout << "Hello World!\n"; } };
    thread.join();
}
当一个线程对象创建时(即 std::thread 对象构造时)就开始执行传入的函数 f 了。

线程管理

启动线程后(构造 std::thread 对象),我们必须在线程的生命周期结束之前,即 std::thread::~thread 调用之前,决定它的执行策略,包括 join()detach()

join()

其中 join() 表示将阻塞关联的线程,直至执行完毕。内部实现会让 std::thread::joinable() 返回 false。否则会返回 true,执行 std::terminate()

detach()

执行了 detach() 后,表示线程对象放弃了对线程资源的所有权,允许此线程的独立运行,在线程退出时释放所有分配的资源。通常不建议使用 detach(),可以用 join() 替代。

可以提供一个类,RAII(Resource Acquisition Initilization)地确保线程执行完成,线程对象正常析构释放资源:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
class thread_guard {
    std::thread& m_t;
public:
    explicit thread_guard(std::thread& t) : m_t{ t } {}
    ~thread_guard() {
        std::puts("析构");     // 打印日志 不用在乎
        if (m_t.joinable()) { // 线程对象当前关联了活跃线程
            m_t.join();
        }
    }
    thread_guard(const thread_guard&) = delete;
    thread_guard& operator=(const thread_guard&) = delete;
};
void f() {
    int n = 0;
    std::thread t{ func{n},10 };
    thread_guard g(t);
    f2(); // 可能抛出异常
}

传递参数

向可调用对象传递参数,只需要将这些参数作为 std::thread 的构造参数即可。

需要注意的是,这些参数会复制到新线程的内存空间中,即使函数中的参数是引用,依然实际是复制。
1
2
3
4
void f(int, const int& a);

int n = 1;
std::thread t{ f, 3, n };

线程对象 t 的构造没有问题,可以通过编译,但是这个 n 实际上并没有按引用传递,而是按值复制的。如果我们的 f 的形参类型不是 const 的引用,则会产生一个编译错误。可以用标准库的 std::refstd::cref 函数模版。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
void f(int, int& a) {
    std::cout << &a << '\n';
}

int main() {
    int n = 1;
    std::cout << &n << '\n';
    std::thread t { f, 3, std::ref(n) };
    t.join();
}

共享数据

我们都知道线程通信的方式有临界区、互斥量、信号量、条件变量、读写锁

  1. 临界区:每个线程中访问临界资源的那段代码称为临界区(Critical Section)(临界资源是一次仅允许一个线程使用的共享资源)。每次只准许一个线程进入临界区,进入后不允许其他线程进入。不论是硬件临界资源,还是软件临界资源,多个线程必须互斥地对它进行访问。在临界区中,通常会使用同步机制,比如我们要讲的互斥量(Mutex)
  2. 互斥量:采用互斥对象机制,只有拥有互斥对象的线程才可以访问。因为互斥对象只有一个,所以可以保证公共资源不会被多个线程同时访问。
  3. 信号量:计数器,允许多个线程同时访问同一个资源。
  4. 条件变量:通过条件变量通知操作的方式来保持多线程同步。
  5. 读写锁:读写锁与互斥量类似。但互斥量要么是锁住状态,要么就是不加锁状态。读写锁一次只允许一个线程写,但允许一次多个线程读,这样效率就比互斥锁要高。

如果有以下情况,出现数据竞争情况。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
std::vector<int> v;

void f() { v.emplace_back(1); }
void f2() { v.erase(v.begin()); }

int main() {
    std::thread t{ f };
    std::thread t2{ f2 };
    t.join();
    t2.join();
    std::cout << v.size() << '\n';  // 有时出现段错误,有时输出0,不稳定的输出结果
}

这里我们可以用互斥量来解决这一问题。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

std::mutex m;
std::vector<int> v;

void f() {
    m.lock();
    v.emplace_back(1);
    m.unlock();
}

void f2() {
    m.lock();
    v.erase(v.begin());
    m.unlock();
}

int main() {
    std::thread t{ f };
    std::thread t2{ f2 };
    t.join();
    t2.join();
    std::cout << v.size() << '\n';  // 稳定输出0
}

另外一个例子,使用 mutex 互斥量前:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
void f() {
    // this_thread::get_id() 表示获取当前线程的唯一标识符,以便在多线程程序中区分不同的线程。
    std::cout << std::this_thread::get_id() << '\n';  
}

int main() {
    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < 10; ++i)
        threads.emplace_back(f);

    for (auto& thread : threads)
        thread.join();
}
这里有一个点,正好说明一下 push_back()emplace_back() 的区别
  1. 如果要用 push_back,则需要先构造一个 thread 临时对象:threads.push_back(std::thread(f));
  2. 而如果用 emplace_back,该方法允许在 vector 末尾直接构造对象,而无需创建临时对象。它接受构造函数的参数,并在适当的位置直接调用构造函数。这样可以减少不必要的对象创建和移动操作,提高性能:threads.emplace_back(f);
std::mutexstd::shared_mutex 区别

std::mutex: 独占锁,同一时刻只能有一个线程访问线程资源
std::shared_mutex:读写锁,允许多个线程同时读取,但写入时会独占(C++ 17)引入

使用后:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
#include <mutex> // 必要标头
std::mutex m;

void f() {
    m.lock();
    std::cout << std::this_thread::get_id() << '\n';
    m.unlock();
}

int main() {
    std::vector<std::thread>threads;
    for (std::size_t i = 0; i < 10; ++i)
        threads.emplace_back(f);

    for (auto& thread : threads)
        thread.join();
}

当多个线程执行函数 f 的时候,只有一个线程能成功调用 lock() 给互斥量上锁,其他所有的线程 lock() 的调用将阻塞执行,直至获得锁。第一个调用 lock() 的线程得以继续往下执行,执行我们的 std::cout 输出语句,不会有任何其他的线程打断这个操作。直到线程执行 unlock(),就解锁了互斥量。那么其他线程此时也就能再有一个成功调用 lock。至于是哪个线程才会成功调用,这个是由操作系统调度决定的

这里我理解的是,“锁” 是一种广泛的概念,可以有多种实现方式,c++ 中的互斥量 mutex 可以用来实现锁

std::lock_guard

一般来说,不建议直接使用互斥量 mutex 显式地进行 lock()unlock()。可以用 C++11 标准引入的管理类 std::lock_guard:

1
2
3
4
void f() {
    std::lock_guard<std::mutex> lc{ m };  // 等价于 m.lock(),超出作用域调用析构来 unlock
    std::cout << std::this_thread::get_id() << '\n';
}

std::lock_guard 实现比较简单,可以看它在 MSVC STL 中的实现。

我们要尽可能的让互斥量上锁的粒度小,只用来确保必须的共享资源的线程安全。

“粒度”通常用于描述锁定的范围大小,较小的粒度意味着锁定的范围更小,因此有更好的性能和更少的竞争。

比如有的时候可以看到这样的写法:

1
2
3
4
5
6
7
8
void f() {
    //code..
    {
        std::lock_guard<std::mutex> lc{ m };
        // 涉及共享资源的修改的代码...
    }
    //code..
}

使用 {} 创建了一个块作用域,限制了对象 lc 的生存期,进入作用域构造 lock_guard 的时候上锁(lock),离开作用域析构的时候解锁(unlock)。

举一个具体的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
std::mutex m;

void add_to_list(int n, std::list<int>& list) {
    std::vector<int> numbers(n + 1);
    std::iota(numbers.begin(), numbers.end(), 0);  // iota是对vector进行递增(默认递增1)赋值的方法,0是起始值
    int sum = std::accumulate(numbers.begin(), numbers.end(), 0);  // 0是起始值

    {
        std::lock_guard<std::mutex> lc{ m };
        list.push_back(sum);
    }
}
void print_list(const std::list<int>& list){
    std::lock_guard<std::mutex> lc{ m };
    for(const auto& i : list){
        std::cout << i << ' ';
    }
    std::cout << '\n';
}

// ...... //

std::list<int> list;
std::thread t1{ add_to_list,i,std::ref(list) };  // 上面提到过,传参即使是引用,也会被复制,需要用 std::ref
std::thread t2{ add_to_list,i,std::ref(list) };
std::thread t3{ print_list,std::cref(list) };  // const 引用需要用 std::cref
std::thread t4{ print_list,std::cref(list) };
t1.join();
t2.join();
t3.join();
t4.join();

这里的共享数据只有 list, 先看 add_to_list,只有 list.push_back(sum) 涉及到了对共享数据的修改,需要进行保护,因此我们用 {} 包裹。

函数 print_list() 打印 list,给整个函数上锁,同一时刻只能有一个线程执行。我们代码是多个线程执行这两个函数,两个函数共享了一个锁,这样确保了当执行函数 print_list() 打印的时候,list 的状态是确定的。打印函数 print_listadd_to_list 函数的修改操作同一时间只能有一个线程在执行。print_list() 不可能看到正在被 add_to_list() 修改的 list

至于到底哪个函数哪个线程会先执行,执行多少次,这些都由操作系统调度决定,也完全有可能连续 4 次都是执行函数 print_list 的线程成功调用 lock,会打印出了一样的值,这都很正常。

try_lock

try_lock 是互斥量中的一种尝试上锁的方式。与常规的 lock 不同,try_lock 会尝试上锁,但如果锁已经被其他线程占用,则不会阻塞当前线程,而是立即返回。

它的返回类型是 bool ,如果上锁成功就返回 true,失败就返回 false

这种方法在多线程编程中很有用,特别是在需要保护临界区的同时,又不想线程因为等待锁而阻塞的情况下。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
std::mutex mtx;

void thread_function(int id) {
    // 尝试加锁
    if (mtx.try_lock()) {
        std::cout << "线程:" << id << " 获得锁" << std::endl;
        // 临界区代码
        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟临界区操作
        mtx.unlock(); // 解锁
        std::cout << "线程:" << id << " 释放锁" << std::endl;
    } else {
        std::cout << "线程:" << id << " 获取锁失败 处理步骤" << std::endl;
    }
}

如果有两个线程运行这段代码,必然有一个线程无法成功上锁,要走 else 的分支。

1
2
3
4
5
std::thread t1(thread_function, 1);
std::thread t2(thread_function, 2);

t1.join();
t2.join();

可能的运行结果:

1
2
3
线程:1 获得锁
线程:2 获取锁失败 处理步骤
线程:1 释放锁

小心

切勿将受保护数据的指针或引用传递到互斥量作用域之外,不然保护将形同虚设。下面是一个具体例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class Data {
    int a{};
    std::string b{};
public:
    void do_something() {
        // 修改数据成员等...
    }
};

class Data_wrapper {
    Data data;
    std::mutex m;
public:
    template<class Func>
    void process_data(Func func) {
        std::lock_guard<std::mutex> lc{m};
        func(data);  // 受保护数据传递给函数
    }
};

Data* p = nullptr;

void malicious_function(Data& protected_data) {
    p = &protected_data; // 受保护的数据被传递到外部
}

Data_wrapper d;

void foo() {
    d.process_data(malicious_function);  // 传递了一个恶意的函数
    p->do_something();                   // 在无保护的情况下访问保护数据
}

成员函数模板 process_data 看起来一点问题也没有,使用 std::lock_guard 对数据做了保护,但是调用方传递了 malicious_function 这样一个恶意的函数,使受保护数据传递给外部,可以在没有被互斥量保护的情况下调用 do_something()

死锁:问题与解决

两个线程需要对它们所有的互斥量做一些操作,其中每个线程都有一个互斥量,且等待另一个线程的互斥量解锁。因为它们都在等待对方释放互斥量,没有线程工作。 这种情况就是死锁。一般只有多个互斥量才会遇到死锁问题

死锁发生的四个必要条件
  1. 互斥(资源一次只能被一个线程占用)
  2. 持有且等待(线程已持有资源,并等待其他资源)
  3. 不可抢占(已持有资源不能被强制释放)
  4. 循环等待(多个线程互相等待对方释放资源)

避免死锁的一般建议是让两个互斥量以相同的顺序上锁,总在互斥量 B 之前锁住互斥量 A,就通常不会死锁。反面示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
std::mutex m1,m2;
std::size_t n{};

void f() {
    std::lock_guard<std::mutex> lc1{ m1 };
    std::lock_guard<std::mutex> lc2{ m2 };
    ++n;
}
void f2() {
    std::lock_guard<std::mutex> lc1{ m2 };
    std::lock_guard<std::mutex> lc2{ m1 };
    ++n;
}

ff2 因为互斥量上锁顺序不同,就有死锁风险。函数 f 先锁定 m1,然后再尝试锁定 m2,而函数 f2 先锁定 m2 再锁定 m1 。如果两个线程同时运行,它们就可能(具体获得锁的顺序由操作系统调度决定,上面阐述过)会彼此等待对方释放其所需的锁,从而造成死锁。

但有时候即使固定了锁的顺序,依旧会产生问题。当有多个互斥量保护同一个类的对象时,对于相同类型的两个不同对象进行数据的交换操作,为了保证数据交换的正确性,就要避免其它线程修改,确保每个对象的互斥量都锁住自己要保护的区域。如果按照前面的的选择一个固定的顺序上锁解锁,则毫无意义,比如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
struct X {
    X(const std::string& str) :object{ str } {}

    friend void swap(X& lhs, X& rhs);
private:
    std::string object;
    std::mutex m;
};

void swap(X& lhs, X& rhs) {
    if (&lhs == &rhs) return;
    std::lock_guard<std::mutex> lock1{ lhs.m };
    std::lock_guard<std::mutex> lock2{ rhs.m };
    swap(lhs.object, rhs.object);
}

考虑用户调用的时候将参数交换,就会产生死锁:

1
2
3
X a{ "🤣" }, b{ "😅" };
std::thread t{ [&] {swap(a, b); } };  // 1
std::thread t2{ [&] {swap(b, a); } }; // 2

1 执行的时候,先上锁 a 的互斥量,再上锁 b 的互斥量。

2 执行的时候,先上锁 b 的互斥量,再上锁 a 的互斥量。

完全可能线程 A 执行 1 的时候上锁了 a 的互斥量,线程 B 执行 2 上锁了 b 的互斥量。线程 A 往下执行需要上锁 b 的互斥量,线程 B 则要上锁 a 的互斥量执行完毕才能解锁,哪个都没办法往下执行,死锁。

如何解决?可以使用 C++ 标准库中的 std::lock,它能一次性锁住多个互斥量,并且没有死锁风险。修改后 swap 代码如下:

1
2
3
4
5
6
7
void swap(X& lhs, X& rhs) {
    if (&lhs == &rhs) return;
    std::lock(lhs.m, rhs.m);    // 给两个互斥量上锁
    std::lock_guard<std::mutex> lock1{ lhs.m,std::adopt_lock };
    std::lock_guard<std::mutex> lock2{ rhs.m,std::adopt_lock };
    swap(lhs.object, rhs.object);
}

因为前面已经使用了 std::lock 上锁,所以后面的 std::lock_guard 构造都额外传递了一个 std::adopt_lock 参数,让其选择到不上锁的构造函数。函数退出也能正常解锁。

std::locklhs.mrhs.m 上锁时若抛出异常,则在重抛前对任何已锁的对象调用 unlock() 解锁,也就是 std::lock 要么将互斥量都上锁,要么一个都不锁。

C++17 新增了 std::scoped_lock ,提供此函数的 RAII 包装,通常它比裸调用 std::lock 更好。

所以我们前面的代码可以改写为:

1
2
3
4
5
void swap(X& lhs, X& rhs) {
    if (&lhs == &rhs) return;
    std::scoped_lock guard{ lhs.m,rhs.m };
    swap(lhs.object, rhs.object);
}

使用 std::scoped_lock 可以将所有 std::lock 替换掉,减少错误发生。也可以用 std::unique_lock(可以简单理解为 std::lock_guard 的升级版,具有额外的功能,为更复杂的锁做准备),详情见标准库文档。

总结,避免死锁要注意:

  1. 避免嵌套锁:线程获取一个锁时,就别再获取第二个锁。每个线程只持有一个锁,自然不会产生死锁。如果必须要获取多个锁,使用 std::lock
  2. 避免在持有锁时调用外部代码
  3. 使用固定顺序获取锁

读写锁

如果需要多线程读取写(多线程读不存在数据竞争;而写和读共存时存在竞争),使用 std::mutex 开销较大。这时可以用专门的读写锁,即 std::shared_timed_mutex (C++ 14),std::shared_mutex (C++ 17)。示例代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class Settings {
private:
    std::map<std::string, std::string> data_;
    mutable std::shared_mutex mutex_; // “M&M 规则”:mutable 与 mutex 一起出现

public:
    void set(const std::string& key, const std::string& value) {
        std::lock_guard<std::shared_mutex> lock{ mutex_ };
        data_[key] = value;
    }

    std::string get(const std::string& key) const {
        std::shared_lock<std::shared_mutex> lock(mutex_);
        auto it = data_.find(key);
        return (it != data_.end()) ? it->second : ""; // 如果没有找到键返回空字符串
    }
};

使用互斥量实现并发读写锁(字节 Data AML 一面手撕题)

我们知道,写操作是独占的;而读操作是非独占的,即多个线程可以同时读。如果多线程访问某个共享变量时,每次访问时都加上一个互斥锁,这样开销会非常大。

所以我们期望在多个线程试图读取共享变量的时候,它们可以立刻获取因为读而加的锁,而不是需要等待前一个线程释放。当然,如果一个线程用写锁锁住了临界区,那么其他线程无论是读还是写都会发生阻塞。

Image

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#include <iostream>
#include <mutex>
#include <thread>

class ReadWriteLock {
private:
    std::mutex readMutex;     // 用于保护读操作计数器的互斥量
    std::mutex writeMutex;    // 用于保护写操作的互斥量
    int readCount = 0;        // 记录当前正在进行读操作的线程数量
    bool writing = false;     // 表示当前是否有线程正在进行写操作

public:
    void readLock() {
        // 锁定读操作互斥量
        std::unique_lock<std::mutex> readLock(readMutex);

        // 增加读操作计数器
        ++ readCount;

        // 等待没有写操作时进行读操作
        while (writing) {
            readLock.unlock();
            std::this_thread::yield();  // 让出CPU,避免忙等待
            readLock.lock();
        }
    }

    void readUnlock() {
        // 锁定读操作互斥量并减少读操作计数器
        std::lock_guard<std::mutex> readLock(readMutex);
        -- readCount;
    }

    void writeLock() {
        // 锁定写操作互斥量
        std::unique_lock<std::mutex> writeLock(writeMutex);

        // 等待没有读操作和写操作时进行写操作
        while (readCount > 0 || writing) {
            writeLock.unlock();
            std::this_thread::yield();  // 让出CPU,避免忙等待
            writeLock.lock();
        }

        // 标记正在进行写操作
        writing = true;
    }

    void writeUnlock() {
        // 锁定写操作互斥量并标记写操作完成
        std::lock_guard<std::mutex> writeLock(writeMutex);
        writing = false;
    }
};

// 示例使用
int sharedData = 0;
ReadWriteLock lock;

void reader() {
    lock.readLock();
    std::cout << "Reading: " << sharedData << std::endl;
    lock.readUnlock();
}

void writer() {
    lock.writeLock();
    ++ sharedData;
    std::cout << "Writing: " << sharedData << std::endl;
    lock.writeUnlock();
}

int main() {
    std::thread readers[5];
    std::thread writers[3];

    // 创建多个读线程和写线程
    for (int i = 0; i < 5; ++i) {
        readers[i] = std::thread(reader);
    }
    for (int i = 0; i < 3; ++i) {
        writers[i] = std::thread(writer);
    }

    // 等待所有线程执行完毕
    for (auto& reader : readers) {
        reader.join();
    }
    for (auto& writer : writers) {
        writer.join();
    }

    return 0;
}

这段代码中,锁定操作用的是 std::unique_lock,而解锁操作用的是 std::lock_guard。这两者都是 C++ 11 引入的 RAII 包装。正如上面所述,unique_locklock_guard 的升级版(更灵活,常与条件变量的 wait()notify_one()notify_all()配合使用),这里锁定时需要解锁和重新锁定互斥量。这对于需要在等待条件满足时解锁互斥量并让出 CPU 的场景非常有用。

yitao 支付宝支付宝
yitao 微信微信
0%