如何将元素添加到数组的线程安全函数?

How to thread-safe function that adds elements to array?

本文关键字:线程 安全 函数 数组 元素 添加      更新时间:2023-10-16

我编写了以下函数,它接受一个对象(element),在现有向量(elements)中为其腾出空间,并通过添加新对象来更新向量:

void addElement(const ElementType& element) {
if (numElements == elements.size()) {
elements.resize(boost::extents[numElements+1]);
}
elements[numElements] = element;
numElements++;
}

如何使 MPI 的线程安全?据我了解,每个线程都知道elements的大小,因此我不明白为什么这个函数不是线程安全的。numElements在此函数外部初始化为零,并且是elements向量的大小。

编辑:我正在使用上面写的函数和 mtx 锁定和解锁如下,但最终的elements向量仍然只包含来自第一等级的数据。

#pragma omp parallel for collapse(3) schedule(static)
for (long n0 = mgr->startN0; n0 < mgr->startN0 + mgr->localN0; n0++) {
for (int n1 = 0; n1 < Ν; n1++) {
for (int n2 = 0; n2 < Ν; n2++) {
ElementType element;
std::mutex mtx;
for(int i=0;i<g_field[n0][n1][n2];i++){
... do stuff with element ...
mtx.lock();
#pragma omp critical
addElement(element);
mtx.unlock();}
}
}
}

编辑:出于速度原因,我不得不稍微更改功能及其用途:

void addElements_MPI(std::vector<ElementType> new_batch,std::mutex& mtx) {
std::lock_guard<std::mutex> lock(mtx); 
elements.resize(boost::extents[elements.num_elements()+new_batch.size()]); 
std::copy(new_batch.begin(), new_batch.end(), elements.begin()+numElements); 
numElements += new_batch.size();
}
std::mutex mtx;
std::vector<ElementType> all_elements;
#pragma omp parallel for collapse(3) schedule(static)
for (long n0 = mgr->startN0; n0 < mgr->startN0 + mgr->localN0; n0++) {
for (int n1 = 0; n1 < Ν; n1++) {
for (int n2 = 0; n2 < Ν; n2++) {
ElementType element;
for(int i=0;i<g_field[n0][n1][n2];i++){
... do stuff with element ...
mtx.lock();
#pragma omp critical
all_elements.push_back(element);
mtx.unlock();}
}
}
}
mtx.lock();
#pragma omp critical
addElement_MPI(all_elements,mtx);
mtx.unlock();

首先,让我们看看数据竞争何时发生:

  • 两个或多个线程同时访问同一内存位置
  • 至少有一个访问用于写入
  • 线程未使用任何独占锁来控制其对该内存的访问

在您的情况下,一个线程可以添加元素,而另一个线程正在读取elements。此外,多个线程可以同时添加新元素。要控制他们的访问,您应该锁定共享资源,即elements.

如何在C++中锁定共享资源?

在C++中,您可以使用std::mutex来保护共享数据不被多个线程同时访问。当std::mutex被一个线程锁定时,其他线程无法访问共享资源。解锁std::mutex后,其他线程可以访问该资源。您可以使用std::mutex::lockstd::mutex::unlock,例如:

std::mutex mtx;
ElementsType elements;
...
mtx.lock();
// only one thread accesses this part at a time
// work with elements
mtx.unlock();

开发人员经常忘记一些东西...

由于开发人员经常忘记一些东西...就像解锁被锁定的std::mutex一样...C++提供std::lock_guard(如果是C++11)和std::scoped_lock(如果是C++17)。您可以将std::lock_guardstd::scoped_lock视为某种包装器,它将实例作为构造函数的参数std::mutex并将该std::mutex实例锁定在构造函数中。当std::lock_guardstd::scoped_lock实例被破坏时,std::mutex实例会自动解锁。

void addElement(const ElementType& element) {
std::lock_guard<std::mutex> lock(mtx); // similar to mtx.lock()
if (numElements == elements.size()) {
elements.resize(boost::extents[numElements+1]);
}
elements[numElements] = element;
numElements++;
// no need for mtx.unlock() since lock instance is now destructed and mutex is automatically unlocked
}

编辑

由于std::mutex是所有线程之间的某种通信通道,因此所有线程应共享相同的std::mutex实例,即所有线程都应看到相同的std::mutex实例。

在下面的情况下(我从更新的问题中获取):

for (int n2 = 0; n2 < Ν; n2++) {
ElementType element;
std::mutex mtx;
for(int i=0;i<g_field[n0][n1][n2];i++){
// ... do stuff with element ...
mtx.lock();
#pragma omp critical
addElement(element);
mtx.unlock();}
}
}

std::mutex被创建 N 次,每个线程创建自己的std::mutex实例没有任何意义,因为它们无法相互通信。相反,一个std::mutex实例应该对所有线程可见,就像elements对所有线程可见一样。