使用 std::atomic 的无锁队列

Lockless queue using std::atomic

本文关键字:队列 atomic std 使用      更新时间:2024-05-09

我希望使用std::atomic创建一个无锁队列。
这是我尝试这样做的可能不太好的第一次尝试:

template <typename T>
class atomic_queue
{
public:
using value_type = T;
private:
struct node
{
value_type m_value;
node* m_next;
node* m_prev;
node(const value_type& value) :
m_value(value),
m_next(nullptr),
m_prev(nullptr) {}
};
private:
std::atomic<node*> m_head = nullptr;
std::atomic<node*> m_tail = nullptr;
public:
void push(const value_type& value)
{
auto new_node = new node(value);
node* tmp = nullptr;
if (m_tail.compare_exchange_strong(tmp, new_node))
{
m_head.store(new_node, std::memory_order_relaxed);
return;
}
node* old_tail;
do {
old_tail = m_tail;
new_node->m_prev = old_tail;
} while (!m_tail.compare_exchange_strong(old_tail, new_node));
new_node->m_prev->m_next = new_node;
}
void pop()
{
if (m_head.load(std::memory_order_relaxed) == nullptr)
{
return;
}
node* tmp = nullptr;
node* head = m_head;
if (m_tail.compare_exchange_strong(head, tmp))
{
m_head.store(tmp, std::memory_order_relaxed);
return;
}
node* old_head;
do {
old_head = m_head;
} while (m_head && !m_head.compare_exchange_strong(old_head, old_head->m_next));
if (old_head)
{
delete old_head;
}
}
bool empty()
{
return m_head.load(std::memory_order_relaxed) == nullptr;
}
value_type& front()
{
node* head = m_head.load(std::memory_order_acquire);
return head->m_value;
}
};

这里需要注意的是,我将m_prev存储在node上,以便我可以在成功push后更新m_tailm_next,而无需通过m_tail实际执行此操作,以防它已经被另一个线程更改。因此,即使另一个线程已经push一个新值,当前线程仍然会将它认为m_tailm_next链接到新节点。

现在,据我所知,有一些事情并不是真正线程安全的,我真的想不出解决这些问题的好方法:

让我们假设队列中的thread1pop是唯一的项目,然后我们进入以下 if 语句:

node* tmp = nullptr;
node* head = m_head;
if (m_tail.compare_exchange_strong(head, tmp))
{
// Now thread2 kicks in
m_head.store(tmp, std::memory_order_relaxed);
return;
}

让我们假设thread2在标记的位置启动以向队列push一个新值,则将执行以下语句:

node* tmp = nullptr;
if (m_tail.compare_exchange_strong(tmp, new_node))
{
m_head.store(new_node, std::memory_order_relaxed);
return;
}

让我们假设它完成了它push而不thread1继续,然后thread1继续,然后thread1将执行:

m_head.store(tmp, std::memory_order_relaxed);
return;

并且基本上会通过将m_head设置为nullptr来撤消thread2push。 据我所知,在这种情况下,内存顺序对我无济于事,所以我不确定我的选择是什么?

另一个有问题的场景是,假设我们有 2 个读取器线程thread3thread4做同样的工作:

while (true)
{
if (!q.empty())
{
int v = q.front();
q.pop();
std::stringstream stream;
stream << "thread_3/4: " << v << 'n';
std::cout << stream.str();
}
}

让我们假设队列的大小为 1,因此他们都可以看到队列不为空并获取对前端数据的引用,然后弹出元素并打印相同的结果。
在我看来,锁定在这种情况下会有所帮助,但我不希望使用锁定,我也不希望读取线程关心同步问题,因为接口本身应该是负责的,但由于frontpop是独立的,我看不到处理这个问题的好方法。
还有一个问题front可能会访问nullptr,所以即使在这里我也不确定如何处理这个问题。我可以让界面返回一个原始指针,或者std::optional但在我看来,这两种解决方案似乎都不正确,所以很想听听关于这里应该做什么的意见。

另外,我不确定我是否可以使用 CAS 以外的更便宜的方法,我知道我可以使用独特的插槽方法,其中每个线程通过在类型std::atomic<int> slot的原子上使用fetch_add将索引获取到固定数组中,因此每个线程推送到队列到唯一索引, 但我不喜欢这种方法,因为它限制了固定大小的队列。另一方面,使用newdelete可能也不是最快的事情,我可以使用某种池分配器,但随后我必须确保它也同步,这是一个新的痛苦水平。

我什至不确定这些都是问题,这些是我在实现中可以发现的问题,我确定我没有想到所有事情(或者也许我考虑了?),无论如何很想听听你对所描述问题的想法,也许还有克服它们的方法。

您的实现存在几个问题,其中一些您已经正确识别。

  1. 中科院之后m_head.store行动m_tail
  2. 此循环可能会遇到 ABA 问题:
do {
old_head = m_head;
} while (m_head && !m_head.compare_exchange_strong(old_head, old_head->m_next));
  1. pop中删除节点后,您立即delete它,但此时另一个线程可能仍然引用它并访问它(例如 pop 中的另一个线程),导致释放后使用。(这也称为内存回收问题。>说明:假设两个线程当前在pop内部,并且已将相同的值读取到old_head中。第一个线程继续,m_head上执行 CAS,并在下一步中立即删除old_head。直到现在,第二个线程才继续尝试更新m_head,使用old_head->m_next作为新值。这意味着线程 2取消指向刚刚删除的节点的指针
  2. 您的设计需要两个单独的函数调用才能从队列中弹出项目并获取其值。

设计无锁甚至无锁算法本质上是困难的。问题 2.和 3.两者都可以通过使用内存回收方案来解决。问题 4.通常通过不使用front操作来避免,而是pop返回项目(直接通过std::optional返回,或通过try_pop版本,该版本通过引用获取 out-参数并返回指示操作是否成功的布尔值)。

无论哪种方式,我都建议使用一种已建立的无锁算法,例如迈克尔-斯科特队列。不幸的是,如果您决定实现该算法,您仍然必须处理内存回收问题。

我可以向您推荐我的 Xenium,它不仅提供了 Michael-Scott-Queue 的实现,还提供了几种内存回收方案,以防您仍然想自己进行一些实验,但又想避免安全内存回收的麻烦。


内存回收方案是一种解决内存回收问题的算法。有很多建议的解决方案来解决安全内存回收的问题,如危险指针或基于纪元的回收,但每种方案都有其缺点。这就是为什么内存回收问题仍然被视为共享内存并发中当前最困难的开放问题。有关更多详细信息,我可以参考我的硕士论文 C++中无锁数据结构的有效内存回收。它不仅解释了内存回收问题和大量提出的回收方案,还讨论了我基于通用接口实现其中一些方案。Xenium建立在这项工作的基础上。