如何读取STD ::队列与另一个线程共享
How to read std::queue shared with another thread?
我的代码获取图像并处理它们。性能对于我的代码至关重要,因此我尝试了多线程。目前,我只是将获取部分作为单独的线程。我正在使用存储获得的图像的std::queue
实现一个简单的FIFO缓冲区。采集功能AcquireImages
将原始图像数据无限期地写入此缓冲区,直到用户中断为止。处理功能,ProcessImages
读取缓冲区并处理图像数据(当前在主线程中,但一旦我解决了问题,我也打算将其作为单独的线程)。这是我的代码(修改以形成MCV示例):
#include <iostream>
#include <vector>
#include <queue>
#include <atomic>
#include <thread>
#define NUM_CAMERAS 2
void AcquireImages(std::queue<unsigned char*> &rawImageQueue, std::atomic<bool> &quit)
{
unsigned char* rawImage{};
while (!quit)
{
for (int camera = 0; camera < NUM_CAMERAS; camera++)
{
switch (camera)
{
case 0:
rawImage = (unsigned char*)"Cam0Image";
break;
case 1:
rawImage = (unsigned char*)"Cam1Image";
break;
default:
break;
}
rawImageQueue.push(std::move(rawImage));
}
}
}
int ProcessImages(const std::vector<unsigned char*> &rawImageVec, const int count)
{
// Do something to the raw image vector
if (count > 10)
{
return 1;
}
else
{
return 0;
} // In my application, this function only returns non-zero upon user interception.
}
int main()
{
// Preparation
std::vector<unsigned char*> rawImageVec;
rawImageVec.reserve(NUM_CAMERAS);
std::queue<unsigned char*> rawImageQueue;
int count{};
const unsigned int nThreads = 1; // this might grow later
std::atomic<bool> loopFlags[nThreads];
std::thread threads[nThreads];
// Start threads
for (int i = 0; i < nThreads; i++) {
loopFlags[i] = false;
threads[i] = std::thread(AcquireImages, rawImageQueue, ref(loopFlags[i]));
}
// Process images
while (true)
{
// Process the images
for (int cam{}; cam < NUM_CAMERAS; ++cam)
{
rawImageVec.push_back(rawImageQueue.front());
rawImageQueue.pop();
}
int processResult = ProcessImages(move(rawImageVec), count);
if (processResult)
{
std::cout << "Leaving while loop.n"; // In my application this is triggered by the user
break;
}
rawImageVec.clear();
++count;
}
// Shutdown other threads
for (auto & flag : loopFlags) {
flag = true;
}
// Wait for threads to actually finish.
for (auto& thread : threads) {
thread.join();
}
return 0;
}
你们中有些人可能已经注意到我的错误。我知道的是,该程序在rawImageVec.push_back(rawImageQueue.front());
上引发了例外。
投掷异常后的输出如下:
Debug Assertion Failed!
Program: C:WINDOWSSYSTEM32MSVCP140D.dll
File: c:program files (x86)microsoft visual studio 14.0vcincludedeque
Line: 329
Expression: deque iterator not dereferencable
我了解问题的原因可能是我正在阅读与另一个线程共享的东西(我是正确的吗?)。我如何解决这个问题?
我在评论中遵循了Praetorian的建议,在检查rawImageQueue
是否为空之后,我看到它总是空的。我不确定是什么原因引起的。
这是共享队列中生产者/消费者的广义示例。这个想法是,如果您正在从数据结构中撰写和阅读,则需要围绕访问的某种保护。
为此,下面的示例使用条件变量和静音。
#include <thread>
#include <iostream>
#include <chrono>
#include <queue>
#include <mutex>
#include <vector>
#include <condition_variable>
using namespace std::chrono_literals;
using std::vector;
using std::thread;
using std::unique_lock;
using std::mutex;
using std::condition_variable;
using std::queue;
class WorkQueue
{
condition_variable work_available;
mutex work_mutex;
queue<int> work;
public:
void push_work(int item)
{
unique_lock<mutex> lock(work_mutex);
bool was_empty = work.empty();
work.push(item);
lock.unlock();
if (was_empty)
{
work_available.notify_one();
}
}
int wait_and_pop()
{
unique_lock<mutex> lock(work_mutex);
while (work.empty())
{
work_available.wait(lock);
}
int tmp = work.front();
work.pop();
return tmp;
}
};
int main() {
WorkQueue work_queue;
auto producer = [&]() {
while (true) {
work_queue.push_work(10);
std::this_thread::sleep_for(2ms);
}
};
vector<thread> producers;
producers.push_back(std::thread(producer));
producers.push_back(std::thread(producer));
producers.push_back(std::thread(producer));
producers.push_back(std::thread(producer));
std::thread consumer([&]() {
while (true)
{
int work_to_do = work_queue.wait_and_pop();
std::cout << "Got some work: " << work_to_do << std::endl;
}
});
std::for_each(producers.begin(), producers.end(), [](thread &p) {
p.join();
});
consumer.join();
}
您的情况相对简单,因为您只有一个生产者和一个消费者。另外,图像处理听起来很慢(足够慢,不必担心线程争夺),并且您正在从单线程版本中切换,因此可能无需打扰高效的无锁实现。
我建议您研究此伪代码:https://en.wikipedia.org/wiki/wiki/producer–Consumer_problemer#problemer#monitors_monitors,然后学习条件变量,如果需要:.cppreference.com/w/cpp/thread/procenty_variable。
- 为什么我的多线程作业队列崩溃
- 在c++队列中使用pop和visit实现线程安全
- 共享队列的线程安全
- 线程安全队列 c++
- 将项目添加到队列时运行线程
- 线程安全的引用计数队列C++
- 对C++中的队列进行多线程访问
- asio::io_service 具有多个线程的优先级队列处理
- C++11如何在1个线程中使用条件变量处理2个线程安全队列
- QtThread:I/O 队列的工作线程
- 如何做 gtkmm 线程安全队列绘制?
- 线程安全队列出现分段错误
- 一个好的线程池队列大小
- 从调度队列块 [ swift / c++ ] 创建线程
- Qt的事件循环线程是安全的还是原子的?处理"队列连接"时如何同步?
- IPC Unix 消息队列线程安全吗?
- boost消息队列线程安全和进程安全吗?
- 用管道在C++中创建调度队列/线程处理程序:FIFO溢出
- 转储关键节队列(线程正在等待)
- 是否有一种方法来队列线程