concurrent_queue内存消耗爆炸式增长,然后程序崩溃
concurrent_queue memory consumption exploded, then program crashed
这是一个典型的使用VS 2010并发队列的生产者/消费者模式,问题是当我运行程序时,内存消耗超过1GB,然后程序崩溃,有人能指出这段代码中的问题吗?
#include <iostream>
#include <fstream>
#include <string>
#include <cstdlib>
#include <ctime>
#include <boostshared_ptr.hpp>
#include <boostthread.hpp>
#include <concurrent_queue.h>
void wait2(int milliseconds)
{
boost::this_thread::sleep(boost::posix_time::milliseconds(milliseconds));
}
class CQueue
{
Concurrency::concurrent_queue<int> Q;
boost::mutex m;
boost::condition_variable cv;
public:
CQueue():QValue(-1)
{
}
int QRead()
{
while(Q.empty())
{
boost::unique_lock<boost::mutex> lk(m);
cv.wait(lk);
}
int res;
if(Q.try_pop(res))
{
QValue = res;
return true;
}
return false;
}
void QWrite(int i)
{
Q.push(i);
cv.notify_one();
}
int QValue;
};
CQueue myqueue;
void write()
{
int i = 0;
while(true)
{
myqueue.QWrite(++i);
}
}
void read()
{
while(true)
{
if( myqueue.QRead())
std::cout << myqueue.QValue << std::endl;
else
std::cout << "failed to read" << std::endl;
}
}
void main ()
{
boost::thread w(write);
boost::thread r(read);
w.join();
r.join();
}
我在一个简单的双核上用VS'13和Boost 1.52构建并测试了您的代码。
正如已经说过的,因为当库存(concurrent_queue)达到给定级别时,你的生产者-消费者设计没有定义阻止生产者的阈值,生产者在队列中推送了太多数据,因此内存增加,窗口开始交换、冻结,如果超过最大提交大小,进程可能会崩溃,等等。。。。
请注意,提交大小限制取决于几个因素、编译器、编译器选项、运行程序的操作系统。。。
因此,在下面的文章中,我添加了一种方法,在队列大小达到阈值时阻止生产者,如果队列大小低于阈值,消费者唤醒生产者。
通过这些更改,我们添加了一些同步,这可能会限制并行性,但使用中的内存是可控的。
#include <iostream>
#include <fstream>
#include <string>
#include <cstdlib>
#include <ctime>
#include "......boostboostshared_ptr.hpp"
#include "......boostboostthread.hpp"
#include <concurrent_queue.h>
#define STOCK_THRESHOLD 1000
void wait2(int milliseconds)
{
boost::this_thread::sleep(boost::posix_time::milliseconds(milliseconds));
}
class CQueue
{
Concurrency::concurrent_queue<int> Q;
boost::mutex consumerMutex;
boost::condition_variable consumerCV;
boost::mutex producerMutex;
boost::condition_variable producerCV;
public:
CQueue():QValue(-1)
{
}
int QRead()
{
while(Q.empty())
{
boost::unique_lock<boost::mutex> lk(consumerMutex);
consumerCV.wait(lk);
}
int res;
if(Q.try_pop(res))
{
QValue = res;
if(Q.unsafe_size() <= STOCK_THRESHOLD)
{
producerCV.notify_one();
}
return true;
}
return false;
}
void QWrite(int i)
{
while(Q.unsafe_size() > STOCK_THRESHOLD){
boost::unique_lock<boost::mutex> lk(producerMutex);
producerCV.wait_for(lk, boost::chrono::milliseconds(10));
}
Q.push(i);
consumerCV.notify_one();
}
int QValue;
};
CQueue myqueue;
void write()
{
int i = 0;
while(true)
{
myqueue.QWrite(++i);
}
}
void read()
{
while(true)
{
if( myqueue.QRead())
std::cout << myqueue.QValue << std::endl;
else
std::cout << "failed to read" << std::endl;
}
}
void main ()
{
boost::thread w(write);
boost::thread r(read);
w.join();
r.join();
}
代码会丢失来自条件变量的通知,因此您的使用者线程睡眠时间过长,因此消耗速度不够快。
想象一下可以想象的线程序列:
Producer Consumer
--+-----------------------------+-------------------------------------------------------
1 | | while(Q.empty())
2 | Q.push(i); | boost::unique_lock<boost::mutex> lk(consumerMutex);
3 | consumerCV.notify_one(); |
4 | | consumerCV.wait(lk); // notification from 3 gets lost
要修复此问题,必须在consumerCV.notify_one()
之前的生产者中发出条件信号时以及在Q.empty()
之前的消费者中检查队列状态时保持互斥。
您可以通过注释掉所有互斥和条件变量调用,并将使用者更改为像while(Q.empty()) /* busy-wait */;
一样的繁忙等待,来轻松地检查这一点。
如果concurrent_queue
不提供等待项可用的函数,那么最好使用一个封装在互斥锁中的非线程安全容器。因为它仍然需要一个互斥锁和一个条件变量来正确地通知使用无锁或无等待容器所获得的好处已经丢失。
此外,由于生产者只生产++i
,而消费者通过打印每个值来生产更多的值,因此消费者不可能跟上生产者的步伐,从而导致队列的堆积和最终的内存耗尽。
- boost::asio如何生成多个协同程序,然后加入它们
- 将加密消息从 php 发送到 C++ 应用程序,然后使用 CryptoPP 进行解密
- 在 c++ 中运行命令,然后在该程序中运行命令
- 该程序将.csv文件中的一系列单词放入数组中,然后随机生成句子.但它不起作用
- 编写一个程序,提示用户输入一个整数,然后输出数字的单个数字和数字的总和
- 如何创建一个C++程序来读取字符串数组中的信息,然后将其排序到类中?
- arm_data在第二次运行我的程序时中止失败,然后
- VS2015中内置的程序的发布版本与Qt运行然后关闭没有错误?
- 如何在C++中挂起程序,然后"取消挂起"它
- 我的程序在另一行中读取名字然后是姓氏,而我需要它一起读取名字和姓氏
- 如果用户想要输入 x 数量的数字,我如何设法跟踪它然后结束程序?
- 有没有办法从程序中打开一个新控制台,然后进入其中
- 在计算机级别,应用程序以二进制方式运行,然后当发生运行时错误时,它如何呈现回错误代码
- QT创建者的运行应用程序会出现错误QXCBConnection,然后流产
- 运行在DLL中的工作线程在应用程序关闭时被终止,然后才能正常关闭它们
- 基本C++应用程序具有额外的输出,然后应该
- 当我递增指针然后将其删除时,为什么我的程序会崩溃
- 在我的程序中使用EOF,然后连续循环
- 打开一个 Shell 程序,向其传递参数,然后返回结果
- concurrent_queue内存消耗爆炸式增长,然后程序崩溃