concurrent_queue内存消耗爆炸式增长,然后程序崩溃

concurrent_queue memory consumption exploded, then program crashed

本文关键字:然后 程序 崩溃 爆炸式 queue 内存 concurrent      更新时间:2023-10-16

这是一个典型的使用VS 2010并发队列的生产者/消费者模式,问题是当我运行程序时,内存消耗超过1GB,然后程序崩溃,有人能指出这段代码中的问题吗?

#include <iostream>
#include <fstream>
#include <string>
#include <cstdlib> 
#include <ctime> 
#include <boostshared_ptr.hpp>
#include <boostthread.hpp>
#include <concurrent_queue.h>

void wait2(int milliseconds)
{
    boost::this_thread::sleep(boost::posix_time::milliseconds(milliseconds)); 
}
class CQueue
{
    Concurrency::concurrent_queue<int>  Q;
    boost::mutex                m;
    boost::condition_variable   cv;
public:
    CQueue():QValue(-1)
    {
    }
    int QRead()
    {
        while(Q.empty())
        {
            boost::unique_lock<boost::mutex> lk(m);
            cv.wait(lk);
        }
        int res;
        if(Q.try_pop(res))
        {
            QValue = res;
            return true;
        }
        return false;
    }
    void QWrite(int i)
    {
        Q.push(i);
        cv.notify_one();
    }
    int QValue;
};
CQueue myqueue;
void write()
{
    int i = 0;
    while(true)
    {
        myqueue.QWrite(++i);
    }
}

void read()
{
    while(true)
    {
        if( myqueue.QRead())
            std::cout << myqueue.QValue << std::endl;
        else
            std::cout << "failed to read" << std::endl;
    }
}
void main ()
{
    boost::thread w(write);
    boost::thread r(read);
    w.join();
    r.join();
}

我在一个简单的双核上用VS'13和Boost 1.52构建并测试了您的代码。

正如已经说过的,因为当库存(concurrent_queue)达到给定级别时,你的生产者-消费者设计没有定义阻止生产者的阈值,生产者在队列中推送了太多数据,因此内存增加,窗口开始交换、冻结,如果超过最大提交大小,进程可能会崩溃,等等。。。。

请注意,提交大小限制取决于几个因素、编译器、编译器选项、运行程序的操作系统。。。

因此,在下面的文章中,我添加了一种方法,在队列大小达到阈值时阻止生产者,如果队列大小低于阈值,消费者唤醒生产者。

通过这些更改,我们添加了一些同步,这可能会限制并行性,但使用中的内存是可控的。

#include <iostream>
#include <fstream>
#include <string>
#include <cstdlib> 
#include <ctime> 
#include "......boostboostshared_ptr.hpp"
#include "......boostboostthread.hpp"
#include <concurrent_queue.h>
#define STOCK_THRESHOLD 1000
void wait2(int milliseconds)
{
    boost::this_thread::sleep(boost::posix_time::milliseconds(milliseconds)); 
}
class CQueue
{
    Concurrency::concurrent_queue<int>  Q;
    boost::mutex                consumerMutex;
    boost::condition_variable   consumerCV;
    boost::mutex                producerMutex;
    boost::condition_variable   producerCV;
public:
    CQueue():QValue(-1)
    {
    }
    int QRead()
    {
        while(Q.empty())
        {
            boost::unique_lock<boost::mutex> lk(consumerMutex);
            consumerCV.wait(lk);
        }
        int res;
        if(Q.try_pop(res))
        {
            QValue = res;
            if(Q.unsafe_size() <= STOCK_THRESHOLD)
            {
                producerCV.notify_one();
            }
            return true;
        }
        return false;
    }
    void QWrite(int i)
    {
        while(Q.unsafe_size() > STOCK_THRESHOLD){
            boost::unique_lock<boost::mutex> lk(producerMutex);
            producerCV.wait_for(lk, boost::chrono::milliseconds(10));
        }
        Q.push(i);
        consumerCV.notify_one();
    }
    int QValue;
};
CQueue myqueue;
void write()
{
    int i = 0;
    while(true)
    {
        myqueue.QWrite(++i);
    }
}

void read()
{
    while(true)
    {
        if( myqueue.QRead())
            std::cout << myqueue.QValue << std::endl;
        else
            std::cout << "failed to read" << std::endl;
    }
}
void main ()
{
    boost::thread w(write);
    boost::thread r(read);
    w.join();
    r.join();
}

代码会丢失来自条件变量的通知,因此您的使用者线程睡眠时间过长,因此消耗速度不够快。

想象一下可以想象的线程序列:

    Producer                       Consumer
--+-----------------------------+-------------------------------------------------------
1 |                             |  while(Q.empty())
2 |   Q.push(i);                |  boost::unique_lock<boost::mutex> lk(consumerMutex);
3 |   consumerCV.notify_one();  |
4 |                             |  consumerCV.wait(lk); // notification from 3 gets lost

要修复此问题,必须在consumerCV.notify_one()之前的生产者中发出条件信号时以及在Q.empty()之前的消费者中检查队列状态时保持互斥。

您可以通过注释掉所有互斥和条件变量调用,并将使用者更改为像while(Q.empty()) /* busy-wait */;一样的繁忙等待,来轻松地检查这一点。

如果concurrent_queue不提供等待项可用的函数,那么最好使用一个封装在互斥锁中的非线程安全容器。因为它仍然需要一个互斥锁和一个条件变量来正确地通知使用无锁或无等待容器所获得的好处已经丢失。

此外,由于生产者只生产++i,而消费者通过打印每个值来生产更多的值,因此消费者不可能跟上生产者的步伐,从而导致队列的堆积和最终的内存耗尽。

相关文章: