叙述方式:
1.背景介绍 (使用场景)
2.讲结论 (无锁队列实现)
3.讲内存序的使用(通用方式)
一、背景
本文通过一个“单生产者-单消费者”模型的场景,讲解基于C++原子操作和内存序实现的无锁队列
在生产者-消费者模型中,两者通过一个共享的任务(消息)队列实现数据传递,同时这个队列需要保证线程安全
1.多生产者-多消费者(MPMC):
- 多个生产者之间需要保证线程安全(不能同时访问任务队列的同一个位置插入任务)
- 多个消费者之间需要保证线程安全(不能同时访问任务队列的同一个位置取出任务)
- 生产者和消费者之间需要线程安全(不能同时操作同一个队列)
2.多生产者-单消费者(SPMC)
- 多个生产者之间需要保证线程安全(不能同时访问任务队列的同一个位置插入任务)
- 生产者和消费者之间需要线程安全(不能同时操作同一个队列)
3.单生产者-多消费者(MPSC)
- 多个消费者之间需要保证线程安全(不能同时访问任务队列的同一个位置取出任务)
- 生产者和消费者之间需要线程安全(不能同时操作同一个队列)
4.单生产者-单消费者(SPSC)
- 生产者和消费者之间需要线程安全(不能同时操作同一个队列)
以上是所有生产-消费者模型的线程安全问题,本文讲解SPSC无锁队列的实现
首先,为什么不用mutex保护临界区?我们需要更好的性能:
二、结论
先贴源码:
template<typename T, size_t Capacity>
class SPSCQueue {
public:SPSCQueue() : head_(0), tail_(0) {}bool enqueue(const T& item) {size_t next_head = (head_ + 1) % Capacity;if (next_head == tail_) return false; // fullbuffer_[head_] = item;std::atomic_thread_fence(std::memory_order_release); // 保证数据写入在更新 head_ 前完成head_ = next_head;return true;}bool dequeue(T& item) {if (tail_ == head_) return false; // emptystd::atomic_thread_fence(std::memory_order_acquire); // 保证读取数据在读取 head_ 后进行item = buffer_[tail_];tail_ = (tail_ + 1) % Capacity;return true;}private:T buffer_[Capacity];size_t head_; // 仅生产者访问size_t tail_; // 仅消费者访问
};
这样一个队列,可以实现无锁化的线程安全,他是怎么做到的?
1.首先head和tail两个队列指针,得益于单生产者-单消费者,天然没有并发安全问题
size_t head_; // 仅由生产者线程访问
size_t tail_; // 仅由消费者线程访问
但是buffer共享,于是分别分析生产者-消费者:
2.enqueue 分析(生产者线程)
buffer_[head_] = item;
std::atomic_thread_fence(std::memory_order_release); // 此行后的更新写操作不能被向前重排,也就保证2、3行的顺序执行
head_ = next_head;
效果:保证数据写入buffer之后,再更新head指针,保证对 buffer_
的写入先于 head_
的更新对消费者可见
3.dequeue 分析(消费者线程)
if (tail_ == head_) return false; // empty
std::atomic_thread_fence(std::memory_order_acquire); // 此行后的读操作不能向前重排,即保证先1后3行,先判断后取值
item = buffer_[tail_];
tail_ = (tail_ + 1) % Capacity;
从 buffer_
读取数据之前,先执行 acquire 屏障;这样可确保读取到的 buffer_
中的数据,是生产者在对应 head_
更新前写入的,也就是避免了消费者误判队列为空
三、内存序的使用
只讲解常用的两个:release和acquire
1.release
// Thread A
data = 42;
flag.store(1, std::memory_order_release);
含义是:
- 编译器和 CPU 不得把
data = 42
移动到flag.store
之后; - 也就是说,在 store-release 之前的写操作,不能重排到 store-release 之后;
- 但 store-release 之后的读写操作(在当前线程中)可以重排到它之前。
store-release 保证的语义:
- 对写线程本身来说,release 之前的写,先于 release 发生;
- 对其他线程来说,如果它通过 acquire-load 看到这个 release-store 的值,它也就能看到这个 store-release 之前的写。
2.acquire
// Thread B
if (flag.load(std::memory_order_acquire) == 1) {int x = data;
}
含义是:
- 编译器和 CPU 不得把
data
的读取(即 load-acquire 之后的操作)重排到 load-acquire 之前; - 但 load-acquire 之前的操作是可以被重排到之后的。
load-acquire 保证的语义:
- acquire 之后的读写不能被重排到前面;
- 如果 acquire-load 成功观察到了 release-store 的值,那么它也能看到写线程在 release-store 之前的写操作。
最后附上刚刚无锁队列代码更加现代化的写法,刚刚实例代码只为叙述原理比较清晰:
template<typename T, size_t Capacity>
class SPSCQueue {
public:SPSCQueue() : head_(0), tail_(0) {}bool enqueue(const T& item) {size_t head = head_.load(std::memory_order_relaxed);size_t next_head = (head + 1) % Capacity;if (next_head == tail_.load(std::memory_order_acquire)) {return false; // full}buffer_[head] = item;head_.store(next_head, std::memory_order_release);return true;}bool dequeue(T& item) {size_t tail = tail_.load(std::memory_order_relaxed);if (tail == head_.load(std::memory_order_acquire)) {return false; // empty}item = buffer_[tail];tail_.store((tail + 1) % Capacity, std::memory_order_release);return true;}private:T buffer_[Capacity];std::atomic<size_t> head_; // 仅生产者写,消费者读std::atomic<size_t> tail_; // 仅消费者写,生产者读
};