concurrent_vector vs 矢量与互斥锁,线程问题与push_back
concurrent_vector vs vector with mutex, thread issues with push_back
我有一个对象正在由多个任务处理。我多次复制此对象并存储在向量中,以便任务检索其自己的副本以在parallel_for循环中进行处理。下面是带有标准向量的代码。
这个想法是我从一个大小为 0 的向量开始,然后根据并行启动并需要自己的副本的任务数量来增加它。我使用原子"_poolIndex"来跟踪每次运行的全局索引。
Object& GetObject()
{
if (_poolIndex >= _objectPool.size())
{
lock_guard<mutex> lock(_mutex);
Object copy(_original);
_objectPool.push_back(move(copy));
}
int taskIndex = _poolIndex.fetch_add(1);
return _objectPool[taskIndex];
}
我在向量类的以下代码中获取索引越界,即使调试器中断时位置<大小:>
reference operator[](size_type _Pos)
{ // subscript mutable sequence
#if _ITERATOR_DEBUG_LEVEL == 2
if (size() <= _Pos)
{ // report error
_DEBUG_ERROR("vector subscript out of range");
_SCL_SECURE_OUT_OF_RANGE;
}
所以很明显,检索 size() <= _Pos 的部分评估了一些不同的东西......我很困惑,因为我有一个推动矢量的锁。
然后我尝试concurrent_vector,push_back给了我编译问题,这是Visual Studio 2013错误:
错误35 错误 C2059:语法错误:"&"c:\程序文件 (x86)\Microsoft Visual Studio 12.0\vc\include\concurrent_vector.h 1492 1 单元测试
错误36 错误 C2143:语法错误:在")"之前缺少";" c:\program 文件 (x86)\Microsoft Visual Studio 12.0\vc\include\concurrent_vector.h 1492 1 单元测试
在concurrent_vector类中,这是我从 vector 切换concurrent_vector_objectPool of 时出现问题的代码:
void _Init(const void *_Src)
{
for(; _I < _N; ++_I)
new( &_My_array[_I] ) _Ty(*static_cast<const _Ty*>(_Src));
}
如果有人能就上述两个问题提供指导,那就太好了。
我还试图尽量减少关键部分以提高效率。这个想法是在启动算法并多次运行它之后,_objectPool将拥有大部分(如果不是全部)副本已经推送到向量上。
问题
首先,存在数据争用,因为从_poolIndex
读取的两个值(if
和 taskIndex
)不同步。交换它们并在条件中使用taskIndex
,而不是再次读取共享状态。
Object& GetObject()
{
int taskIndex = _poolIndex.fetch_add(1);
if (taskIndex >= _objectPool.size()) // issue #2: size() is not thread-safe
{
lock_guard<mutex> lock(_mutex);
//This: Object copy(_original);
// _objectPool.push_back(move(copy));
// can be simplified to:
_objectPool.push_back(_original); // issue #3: it can push at different index
}
return _objectPool[taskIndex];
}
在某些情况下,第二个问题可能不适用于std::vector
。但它肯定会破坏concurrent_vector的使用(请参阅原因)。
第三个问题是taskIndex
与锁定顺序不同步,因此它可以构造一个对象,但返回尚未构造或分配的对象(超出范围)。
正确的方式
如果我正确理解您的意图,您希望在第二次传递中重用在第一次传递中创建的对象,并在需要时创建更多对象。我将尝试修复以下代码中的问题:
Object& GetObject()
{
int taskIndex = _poolIndex.fetch_add(1); // get current index in the pool
if (taskIndex >= _objectPoolSize.load(memory_order_acquire)) // atomic<size_t>
{
lock_guard<mutex> lock(_mutex);
size_t sz = _objectPoolSize.load(memory_order_relaxed);
if (taskIndex >= sz) { // double-check under the lock
sz *= 2; // or any other factor, get a bunch of new objects at once
_objectPool.resize(sz, _original); // construct new copies of _original
_objectPoolSize.store(sz, memory_order_release);// protect from reorder with resize
}
}
return _objectPool[taskIndex];
}
concurrent_vector
至于concurrent_vector(在 ppl 和 tbb 中都可用),您可能希望使用 grow_to_at_least 来消除锁定。 但是:
Object& GetObject()
{
int taskIndex = _poolIndex.fetch_add(1); // get current index in the pool
// construct new copies of _original if free objects is about to ran out
_objectPool.grow_to_at_least(taskIndex+10/*or other*/, _original );
return _objectPool[taskIndex]; // ISSUE: it might not be constructed yet in other thread
}
它遭受与size()
相同的问题。因此,它要么需要与_objectPoolSize进行一些同步,要么需要与基于零填充分配的构造函数进行每项同步,如同一博客中所述。
- 从不同线程使用int64的不同字节安全吗
- 删除一个线程上有数百万个字符串的大型哈希映射会影响另一个线程的性能
- 在C++中使用cURL和多线程
- 为什么我的C#代码在调用回C++COM直到Task时会暂停.等待/线程.加入
- 在cuda线程之间共享大量常量数据
- 如何将元素添加到数组的线程安全函数?
- 线程,如果else语句,都是错误的上下文切换后,会发生什么
- C++Boost Asio Pool线程,带有lambda函数和传递引用变量
- Qt C++静态thread_local QNetworkAccessManager是线程应用程序的好选择吗
- 异常属于C++中的线程还是进程
- C++中的线程安全删除
- C++使用params创建线程函数会导致转换错误
- 类与私有变量的其他类之间的线程安全性
- CoInitialize()在单独的线程上崩溃而不返回
- c++中的线程池
- 线程之间的布尔停止信号
- 为什么std::async使用同一个线程运行函数
- 用于矢量处理的多个线程
- C++为线程工作动态地分割例程
- std::stack::push和std::stack:pop是线程安全的