避免多线程应用程序中潜在的死锁/内存泄漏

Avoiding a potential deadlock / memory leak in a multithreaded application

本文关键字:死锁 内存 泄漏 多线程 应用程序      更新时间:2023-10-16

短版本

如何处理生成一组线程、运行一些自定义(在实现时未指定(回调的非原子性?下面描述了几种可能的解决方案,使用线程池似乎是唯一好的解决方案。有标准的处理方法吗?不需要发布完整的C++解决方案、伪代码或简短的描述就足够了。性能是一个重要方面。

尽管这看起来微不足道,但我相信下面的代码片段出现在许多现有的应用程序中,许多(最初的,可能还有一些高级的(程序员可能会在没有意识到危险的情况下编写类似的结构。pthread/C++11std::thread/WinAPI和许多其他低级多线程库也存在同样的问题。因此,这是一个重要的问题。

长版本

我正在设计一些多线程应用程序,并决定制作一个实用程序函数,其中派生了几个线程。这可能是一个非常常见的代码,它出现在我的许多应用程序中(除非他们使用OpenMP(:

void ParallelCall(void (*function)(int, int), int numThreads)
{
    Thread *threads = new Thread[numThreads - 1];
    for(int i = 1; i < numThreads; ++ i) {
        if(threads[i - 1].start(&function, i, numThreads)) // this might fail
            abort(); // what now?
    }
    (*function)(0, numThreads);
    // use the calling thread as thread 0
    for(int i = 1; i < numThreads; ++ i)
        threads[i - 1].join();
    delete[] threads;
}

这更像是一个用于说明问题的伪代码。正在创建和派生一堆线程(Thread对象包装一个pthread线程(。然后他们做了一些事情,最后他们加入了。

现在的问题是:如果由于任何原因,一些线程无法启动(可能是资源耗尽或每个用户的限制(,该怎么办?我知道如何检测它的发生,但我不确定如何处理它

我想我应该等待成功启动的线程完成,然后抛出异常。但是,如果function中的代码包含一些同步(例如屏障(,这很容易导致死锁,因为其他预期线程将永远不会生成。

或者,我可以立即抛出一个异常,忽略正在运行的线程,但随后我保留了已分配的包装器对象,导致内存泄漏(并且永远不会加入派生的线程(。

像杀死正在运行的线程这样的事情似乎不是一个好主意(坦率地说,我不太确定强行杀死多线程应用程序的线程会产生什么结果——似乎内存会处于未定义的状态,这在很大程度上很难处理——如果回调function分配内存,它本身可能会导致更多的内存泄漏(。

在让所有线程进入回调function之前,插入一个等待所有线程启动的过程,这在性能方面似乎令人难以忍受(尽管这可以很容易地解决问题(。另一种选择是有一个带有相关FIFO的派生线程池,等待任务,但线程数量有问题(我会生成与逻辑CPU一样多的线程,但如果numThreads更大呢?我基本上会在代码中重新实现OS的调度器(。

这通常是如何解决的?有更好的方法吗?如果不是,潜在的(取决于回调function中的内容(死锁是否比内存泄漏更好?

如何解决此问题:

创建每个线程的方式是,它在允许开始用户的工作函数之前等待一个sentinel(您需要一个调用它的lambda(如果任何线程无法启动,请设置一个标志,指示现有线程应立即全部完成,而不是执行用户的功能。在出现错误的情况下,连接已启动的线程。然后根据需要退出并返回错误代码或异常(异常更好(。

现在,您的函数是线程安全的,不会泄漏内存。

编辑:这里有一些代码可以做你想做的事情,包括一个测试。如果您想强制模拟线程失败,请使用定义为1INTRODUCE_FAILURE重新编译

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <memory>
#include <atomic>
#include <system_error>
#include <condition_variable>
#define INTRODUCE_FAILURE 0
// implementation
void ParallelCall(void (*function)(int, int), int numThreads)
{
    std::vector<std::thread> threads;
    threads.reserve(numThreads-1);
    std::atomic<bool> mustAbort ( false );
    std::atomic<bool> mayRun ( false );
    std::mutex conditionMutex;
    std::condition_variable runCondition;
    for(int i = 1; i < numThreads; ++ i) {
        try {
            #if INTRODUCE_FAILURE == 1
            if (i == 3) {
                throw std::system_error(99, std::generic_category(),  "the test deliberately failed a thread");
            }
            #endif
            threads.emplace_back( std::thread{ [i, numThreads, function
                                , &mustAbort
                                , &conditionMutex
                                , &runCondition
                                , &mayRun]()->int {
                std::unique_lock<std::mutex> myLock(conditionMutex);
                runCondition.wait(myLock, [&mayRun]()->bool { 
                    return mayRun;
                });
                myLock.unlock();
                // wait for permission
                if (!mustAbort) {
                    function(i, numThreads);
                }
                return 0;
            }} );
        }
        catch(std::exception& e) { // will be a std::system_error
            mustAbort = true;
            std::unique_lock<std::mutex> myLock(conditionMutex);
            mayRun = true;
            conditionMutex.unlock();
            runCondition.notify_all();
            for(auto& t : threads) {
                t.join();
            }
            throw;
        }
    }
    std::unique_lock<std::mutex> myLock(conditionMutex);
    mayRun = true;
    conditionMutex.unlock();
    runCondition.notify_all();
    function(0, numThreads);
    // use the calling thread as thread 0
    for(auto& t : threads) {
        t.join();
    }
}
// test
using namespace std;
void testFunc(int index, int extent) {
    static std::mutex outputMutex;
    unique_lock<mutex> myLock(outputMutex);
    cout << "Executing " << index << " of " << extent << endl;
    myLock.unlock();
    this_thread::sleep_for( chrono::milliseconds(2000) );
    myLock.lock();
    cout << "Finishing " << index << " of " << extent << endl;
    myLock.unlock();
}
int main()
{
    try {
        cout << "initiating parallel call" << endl;
        ParallelCall(testFunc, 10);
        cout << "parallel call complete" << endl;
    }
    catch(std::exception& e) {
        cout << "Parallel call failed because: " << e.what() << endl;
    }
   return 0;
}

成功输出示例:

Compiling the source code....
$g++ -std=c++11 main.cpp -o demo -lm -pthread -lgmpxx -lgmp -lreadline 2>&1
Executing the program....
$demo 
initiating parallel call
Executing 0 of 10
Executing 1 of 10
Executing 4 of 10
Executing 5 of 10
Executing 8 of 10
Executing 2 of 10
Executing 7 of 10
Executing 6 of 10
Executing 9 of 10
Executing 3 of 10
Finishing 1 of 10
Finishing 5 of 10
Finishing 2 of 10
Finishing 9 of 10
Finishing 8 of 10
Finishing 4 of 10
Finishing 3 of 10
Finishing 0 of 10
Finishing 6 of 10
Finishing 7 of 10
parallel call complete

故障输出示例:

Compiling the source code....
$g++ -std=c++11 main.cpp -o demo -lm -pthread -lgmpxx -lgmp -lreadline 2>&1
Executing the program....
$demo 
initiating parallel call
Parallel call failed because: the test deliberately failed a thread: Cannot assign requested address

最后是一个恳求——不要把你的图书馆放在世界上。std::线程库非常全面,如果这还不够的话,我们有OpenMP、TBB等。

让已经创建的线程在退出其threadproc之前完成丢失的工作以获得帮助如何?

List _StillBornWork;
void ParallelCall(void (*function)(int, int), int numThreads)
{
    Thread *threads = new Thread[numThreads - 1];
    for(int i = 1; i < numThreads; ++ i) {
        if(threads[i - 1].start(&function, i, numThreads)) {
            _StillBornWork.Push(i);
        }
    }
    (*function)(0, numThreads);
    // use the calling thread as thread 0
    for(int i = 1; i < numThreads; ++ i)
        threads[i - 1].join();
    delete[] threads;
}
ThreadProc(int i) {
  while(1) {
    do work
    // Here we see if there was any work that didn't get done because its thread
    // was stilborn.  In your case, the work is indicated by the integer i.
    // If we get work, loop again, else break.
    if (!_StillBornWork.Pop(&i))
      break;  // no more work that wasn't done.
  }
}