并发编程(二)

《现代C++并发编程教程》 —— C++并发编程学习笔记(二)

等待事件或条件

假设你正在一辆夜间运行的地铁上,那么你要如何在正确的站点下车呢?

1.一直不休息,每一站都能知道,这样就不会错过你要下车的站点,但是这会很疲惫。

这种方法被称为“忙等待(busy waiting)”也称 “自旋“

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
bool flag = false;
std::mutex m;

void wait_for_flag() {
    std::unique_lock<std::mutex> lk{ m };
    while (!flag){
        lk.unlock();    // 1 解锁互斥量
        lk.lock();      // 2 上锁互斥量
    }
}

2.可以看一下时间,估算一下地铁到达目的地的时间,然后设置一个稍早的闹钟,就休息。这个方法听起来还行,但是你可能被过早的叫醒,甚至估算错误导致坐过站,又或者闹钟没电了睡过站。

第二种方法就是加个延时,这种实现进步了很多,减少浪费的执行时间,但很难确定正确的休眠时间。这会影响到程序的行为,在需要快速响应的程序中就意味着丢帧或错过了一个时间片。循环中,休眠 ② 前函数对互斥量解锁 ,再休眠结束后再对互斥量上锁,让另外的线程有机会获取锁并设置标识(因为修改函数和等待函数共用一个互斥量)。

1
2
3
4
5
6
7
8
void wait_for_flag() {
    std::unique_lock<std::mutex> lk{ m };
    while (!flag){
        lk.unlock();    // 1 解锁互斥量
        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 2 休眠
        lk.lock();      // 3 上锁互斥量
    }
}

3.事实上最简单的方式是,到站的时候有人或者其它东西能将你叫醒(比如手机的地图,到达设置的位置就提醒)。

第三种方式(也是最好的)实际上就是使用条件变量了。通过另一线程触发等待事件的机制是最基本的唤醒方式,这种机制就称为“条件变量”。

C++ 标准库对条件变量有两套实现:std::condition_variablestd::condition_variable_any,这两个实现都包含在 <condition_variable> 这个头文件中。

condition_variable_any 类是 std::condition_variable 的泛化。相对于只在 std::unique_lock<std::mutex> 上工作的 std::condition_variablecondition_variable_any 能在任何满足 可基本锁定(BasicLockable) 要求的锁上工作,所以增加了 _any 后缀。显而易见,这种区分必然是 any 版更加通用但是却有更多的性能开销。所以通常首选 std::condition_variable。有特殊需求,才会考虑 std::condition_variable_any

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
std::mutex mtx;  // 创建了一个互斥量,用于保护共享数据的访问,确保在多线程环境下的数据同步。
std::condition_variable cv;  // 创建了一个条件变量,用于线程间的同步,当条件不满足时,线程可以等待,直到条件满足时被唤醒。
bool arrived = false;  // 设置了一个标志位,表示是否到达目的地。

void wait_for_arrival() {
    std::unique_lock<std::mutex> lck(mtx);  // 使用互斥量创建了一个独占锁。
    cv.wait(lck, []{ return arrived; }); // 阻塞当前线程,释放(unlock)锁,直到条件被满足。
    std::cout << "到达目的地,可以下车了!" << std::endl;
}

void simulate_arrival() {
    std::this_thread::sleep_for(std::chrono::seconds(5)); // 模拟地铁到站,假设5秒后到达目的地
    {
        std::lock_guard<std::mutex> lck(mtx);
        arrived = true; // 设置条件变量为 true,表示到达目的地
    }
    cv.notify_one(); // 通知等待的线程
}

这样,当 simulate_arrival 函数执行后,arrived 被设置为 true,并且通过 cv.notify_one() 唤醒了等待在条件变量上的线程,从而使得 wait_for_arrival 函数中的等待结束,可以执行后续的操作,即输出提示信息。

条件变量的 wait 成员函数有两个版本,以上代码使用的就是第二个版本,传入了一个谓词。

1
2
3
4
void wait(std::unique_lock<std::mutex>& lock);                 // 1

template<class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred); // 2

②等价于:

1
2
while (!pred())
    wait(lock);

第二个版本只是对第一个版本的包装,等待并判断谓词,会调用第一个版本的重载。这可以避免 虚假唤醒

条件变量虚假唤醒是指在使用条件变量进行线程同步时,有时候线程可能会在没有收到通知的情况下被唤醒。问题取决于程序和系统的具体实现。解决方法很简单,在循环中等待并判断条件可一并解决。使用 C++ 标准库则没有这个烦恼了。

线程安全的队列

这里介绍一个更为复杂的示例,用于巩固条件变量的学习。在实现一个线程安全的队列过程中,需要注意两点内容:

  1. 当执行 push 操作时,需要确保没有其他线程正在执行 pushpop 操作;同样,在执行 pop 操作时,也需要确保没有其他线程正在执行 pushpop 操作。
  1. 当队列为空时,不应该执行 pop 操作。因此,我们需要使用条件变量来传递一个谓词,以确保在执行 pop 操作时队列不为空。

以下是一个线程安全的模版类 threadsafe_queue

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
template<typename T>
class threadsafe_queue {
    mutable std::mutex m;              // 互斥量,用于保护队列操作的独占访问
    std::condition_variable data_cond; // 条件变量,用于在队列为空时等待
    std::queue<T> data_queue;          // 实际存储数据的队列
public:
    threadsafe_queue() {}              // 无参构造
    void push(T new_value) {
        {
            std::lock_guard<std::mutex> lk { m };
            data_queue.push(new_value);
        }
        data_cond.notify_one();
    }
    // 从队列中弹出元素(阻塞直到队列不为空)
    void pop(T& value) {
        std::unique_lock<std::mutex> lk{ m };
        data_cond.wait(lk, [this] {return !data_queue.empty(); });  // 这里的 this 表示按值传递 this,见 lambda 表达式用法
        value = data_queue.front();
        data_queue.pop();
    }
    // 从队列中弹出元素(阻塞直到队列不为空),并返回一个指向弹出元素的 shared_ptr
    std::shared_ptr<T> pop() {
        std::unique_lock<std::mutex> lk{ m };
        data_cond.wait(lk, [this] {return !data_queue.empty(); });
        std::shared_ptr<T> res { std::make_shared<T>(data_queue.front()) };
        data_queue.pop();
        return res;
    }
    bool empty()const {
        std::lock_guard<std::mutex> lk (m);
        return data_queue.empty();
    }
};

使用 future

举个例子,我们在车站等车,你可能会做一些别的事情打发时间,比如学习现代C++并发编程教程、玩手机等,但始终在等待一件事情:车到站。

C++ 标准库将这种事件称为 future。它用于处理线程中需要等待某个事件的情况,线程知道预期结果。等待的同时也可以执行其它的任务。

C++ 标准库有两种 future,都声明在 <future> 头文件中:独占的 std::future 、共享的 std::shared_future。它们的区别与 std::unique_ptrstd::shared_ptr 类似。std::future 只能与单个指定事件关联,而 std::shared_future 能关联多个事件。它们都是模板,它们的模板类型参数,就是其关联的事件(函数)的返回类型。当多个线程需要访问一个独立 future 对象时, 必须使用互斥量或类似同步机制进行保护。而多个线程访问同一共享状态,若每个线程都是通过其自身的 shared_future 对象副本进行访问,则是安全的。

最简单有效的使用是,我们先前讲的 std::thread 在线程中执行任务是没有返回值的,这个问题就能使用 future 解决。

创建异步任务获取返回值

假设需要执行一个耗时任务并获取其返回值,但是并不急切的需要它。那么就可以启动新线程计算,然而 std::thread 没提供直接从线程获取返回值的机制。所以我们可以使用 std::async 函数模板。

使用 std::async 启动一个异步任务,它会返回一个 std::future 对象,这个对象和任务关联,将持有最终计算出来的结果。当需要任务执行完的结果的时候,只需要调用 get() 成员函数,就会阻塞直到 future 为就绪为止(即任务执行完毕),返回执行结果。valid() 成员函数检查 future 当前是否关联共享状态,即是否当前关联任务。还未关联,或者任务已经执行完(调用了 get()set()),都会返回 false

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
#include <iostream>
#include <thread>
#include <future> // 引入 future 头文件

int task(int n) {
    std::cout << "异步任务 ID: " << std::this_thread::get_id() << '\n';
    return n * n;
}

int main() {
    std::future<int> future = std::async(task, 10);
    std::cout << "main: " << std::this_thread::get_id() << '\n';
    std::cout << std::boolalpha << future.valid() << '\n'; // true
    std::cout << future.get() << '\n';
    std::cout << std::boolalpha << future.valid() << '\n'; // false
}

关于 std::async 的参数传递,这里不再展开记录,用时再查。

信号量

信号量是一个非常轻量简单的同步设施(在 C++ 20中被引入),它维护一个计数,这个计数不能小于 0。信号量提供两种基本操作:释放(增加计数)和等待(减少计数)。如果当前信号量的计数值为 0,那么执行“等待”操作的线程将会一直阻塞,直到计数大于 0,也就是其它线程执行了 “释放” 操作。

C++ 提供了两个信号量类型:std::counting_semaphorestd::binary_semaphore,定义在 <semaphore> 中。其中 binary_semaphore 只是 counting_semaphore 的一个特化别名(其 LeastMaxValue 为1,LeastMaxValue 意思是信号量维护的计数最大值。):

1
using binary_semaphore = counting_semaphore<1>;

举个具体使用信号量的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 全局二元信号量对象
// 设置对象初始计数为 0
std::binary_semaphore smph_signal_main_to_thread{ 0 };
std::binary_semaphore smph_signal_thread_to_main{ 0 };

void thread_proc() {
    smph_signal_main_to_thread.acquire();
    std::cout << "[线程] 获得信号" << std::endl;

    std::this_thread::sleep_for(3s);

    std::cout << "[线程] 发送信号\n";
    smph_signal_thread_to_main.release();
}

int main() {
    std::jthread thr_worker{ thread_proc };

    std::cout << "[主] 发送信号\n";
    smph_signal_main_to_thread.release();

    smph_signal_thread_to_main.acquire();
    std::cout << "[主] 获得信号\n";
}

结果:

1
2
3
4
[主] 发送信号
[线程] 获得信号
[线程] 发送信号
[主] 获得信号

acquire 函数就是我们先前说的“等待”(原子地减少计数),release 函数就是"释放"(原子地增加计数)。

提示

信号量常用于 发信/提醒 而非互斥,通过初始化该信号量为 0 从而阻塞尝试 acquire() 的接收者,直至提醒者通过调用 release(n) “发信”。在此方面可把信号量当作条件变量的替代品,通常它有更好的性能。

假设我们有一个 Web 服务器,它只能处理有限数量的并发请求。为了防止服务器过载,我们可以使用信号量来限制并发请求的数量。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 定义一个信号量,最大并发数为 3
std::counting_semaphore<3> semaphore{ 3 };  // counting_semaphore 轻量同步原语,允许同一资源进行多个并发的访问,至少允许 LeastMaxValue 个同时访问者

void handle_request(int request_id) {
    // 请求到达,尝试获取信号量
    std::cout << "进入 handle_request 尝试获取信号量\n";

    semaphore.acquire();

    std::cout << "成功获取信号量\n";

    // 此处延时三秒可以方便测试,会看到先输出 3 个“成功获取信号量”,因为只有三个线程能成功调用 acquire,剩余的会被阻塞
    std::this_thread::sleep_for(3s);

    // 模拟处理时间
    std::random_device rd;
    std::mt19937 gen{ rd() };
    std::uniform_int_distribution<> dis(1, 5);
    int processing_time = dis(gen);
    std::this_thread::sleep_for(std::chrono::seconds(processing_time));

    std::cout << std::format("请求 {} 已被处理\n", request_id);

    semaphore.release();
}

int main() {
    // 模拟 10 个并发请求
    std::vector<std::jthread> threads;
    for (int i = 0; i < 10; ++i) {
        threads.emplace_back(handle_request, i);
    }
}

牢记信号量的基本的概念不变,计数的值不能小于 0,如果当前信号量的计数值为 0,那么执行 “等待”(acquire) 操作的线程将会一直阻塞。明白这点,那么就都不存在问题。

yitao 支付宝支付宝
yitao 微信微信
0%