concurrent_vector vs 矢量与互斥锁,线程问题与push_back

concurrent_vector vs vector with mutex, thread issues with push_back

本文关键字:线程 push back 问题 vs vector concurrent      更新时间:2023-10-16

我有一个对象正在由多个任务处理。我多次复制此对象并存储在向量中,以便任务检索其自己的副本以在parallel_for循环中进行处理。下面是带有标准向量的代码。

这个想法是我从一个大小为 0 的向量开始,然后根据并行启动并需要自己的副本的任务数量来增加它。我使用原子"_poolIndex"来跟踪每次运行的全局索引。

    Object& GetObject()
    {
        if (_poolIndex >= _objectPool.size())
        {
            lock_guard<mutex> lock(_mutex);
            Object copy(_original);
            _objectPool.push_back(move(copy));
        }
        int taskIndex = _poolIndex.fetch_add(1);
        return _objectPool[taskIndex];
    }

我在向量类的以下代码中获取索引越界,即使调试器中断时位置<大小:>

reference operator[](size_type _Pos)
{   // subscript mutable sequence
        #if _ITERATOR_DEBUG_LEVEL == 2
if (size() <= _Pos)
    {   // report error
    _DEBUG_ERROR("vector subscript out of range");
    _SCL_SECURE_OUT_OF_RANGE;
    }

所以很明显,检索 size() <= _Pos 的部分评估了一些不同的东西......我很困惑,因为我有一个推动矢量的锁。

然后我尝试concurrent_vector,push_back给了我编译问题,这是Visual Studio 2013错误:

错误

35 错误 C2059:语法错误:"&"c:\程序文件 (x86)\Microsoft Visual Studio 12.0\vc\include\concurrent_vector.h 1492 1 单元测试

错误

36 错误 C2143:语法错误:在")"之前缺少";" c:\program 文件 (x86)\Microsoft Visual Studio 12.0\vc\include\concurrent_vector.h 1492 1 单元测试

在concurrent_vector类中,这是我从 vector 切换concurrent_vector_objectPool of 时出现问题的代码:

    void _Init(const void *_Src)
    {
        for(; _I < _N; ++_I)
            new( &_My_array[_I] ) _Ty(*static_cast<const _Ty*>(_Src));
    }

如果有人能就上述两个问题提供指导,那就太好了。

我还试图尽量减少关键部分以提高效率。这个想法是在启动算法并多次运行它之后,_objectPool将拥有大部分(如果不是全部)副本已经推送到向量上。

问题

首先,存在数据争用,因为从_poolIndex读取的两个值(iftaskIndex )不同步。交换它们并在条件中使用taskIndex,而不是再次读取共享状态。

Object& GetObject()
{
    int taskIndex = _poolIndex.fetch_add(1);
    if (taskIndex >= _objectPool.size())    // issue #2: size() is not thread-safe
    {
        lock_guard<mutex> lock(_mutex);
        //This: Object copy(_original);
        //      _objectPool.push_back(move(copy));
        // can be simplified to:
        _objectPool.push_back(_original); // issue #3: it can push at different index
    }
    return _objectPool[taskIndex];
}

在某些情况下,第二个问题可能不适用于std::vector。但它肯定会破坏concurrent_vector的使用(请参阅原因)。

第三个问题是taskIndex与锁定顺序不同步,因此它可以构造一个对象,但返回尚未构造或分配的对象(超出范围)。

正确的方式

如果我正确理解您的意图,您希望在第二次传递中重用在第一次传递中创建的对象,并在需要时创建更多对象。我将尝试修复以下代码中的问题:

Object& GetObject()
{
    int taskIndex = _poolIndex.fetch_add(1);                // get current index in the pool
    if (taskIndex >= _objectPoolSize.load(memory_order_acquire)) // atomic<size_t>
    {
        lock_guard<mutex> lock(_mutex);
        size_t sz = _objectPoolSize.load(memory_order_relaxed);
        if (taskIndex >= sz) {                              // double-check under the lock
            sz *= 2;                  // or any other factor, get a bunch of new objects at once
            _objectPool.resize(sz, _original);              // construct new copies of _original
            _objectPoolSize.store(sz, memory_order_release);// protect from reorder with resize
        }
    }
    return _objectPool[taskIndex];
}

concurrent_vector

至于concurrent_vector(在 ppl 和 tbb 中都可用),您可能希望使用 grow_to_at_least 来消除锁定。 但是:

Object& GetObject()
{
    int taskIndex = _poolIndex.fetch_add(1); // get current index in the pool
    // construct new copies of _original if free objects is about to ran out
    _objectPool.grow_to_at_least(taskIndex+10/*or other*/, _original );
    return _objectPool[taskIndex];  // ISSUE: it might not be constructed yet in other thread
}

它遭受与size()相同的问题。因此,它要么需要与_objectPoolSize进行一些同步,要么需要与基于零填充分配的构造函数进行每项同步,如同一博客中所述。