避免多线程应用程序中潜在的死锁/内存泄漏
Avoiding a potential deadlock / memory leak in a multithreaded application
短版本:
如何处理生成一组线程、运行一些自定义(在实现时未指定(回调的非原子性?下面描述了几种可能的解决方案,使用线程池似乎是唯一好的解决方案。有标准的处理方法吗?不需要发布完整的C++解决方案、伪代码或简短的描述就足够了。性能是一个重要方面。
尽管这看起来微不足道,但我相信下面的代码片段出现在许多现有的应用程序中,许多(最初的,可能还有一些高级的(程序员可能会在没有意识到危险的情况下编写类似的结构。pthread/C++11std::thread
/WinAPI和许多其他低级多线程库也存在同样的问题。因此,这是一个重要的问题。
长版本:
我正在设计一些多线程应用程序,并决定制作一个实用程序函数,其中派生了几个线程。这可能是一个非常常见的代码,它出现在我的许多应用程序中(除非他们使用OpenMP(:
void ParallelCall(void (*function)(int, int), int numThreads)
{
Thread *threads = new Thread[numThreads - 1];
for(int i = 1; i < numThreads; ++ i) {
if(threads[i - 1].start(&function, i, numThreads)) // this might fail
abort(); // what now?
}
(*function)(0, numThreads);
// use the calling thread as thread 0
for(int i = 1; i < numThreads; ++ i)
threads[i - 1].join();
delete[] threads;
}
这更像是一个用于说明问题的伪代码。正在创建和派生一堆线程(Thread
对象包装一个pthread线程(。然后他们做了一些事情,最后他们加入了。
现在的问题是:如果由于任何原因,一些线程无法启动(可能是资源耗尽或每个用户的限制(,该怎么办?我知道如何检测它的发生,但我不确定如何处理它
我想我应该等待成功启动的线程完成,然后抛出异常。但是,如果function
中的代码包含一些同步(例如屏障(,这很容易导致死锁,因为其他预期线程将永远不会生成。
或者,我可以立即抛出一个异常,忽略正在运行的线程,但随后我保留了已分配的包装器对象,导致内存泄漏(并且永远不会加入派生的线程(。
像杀死正在运行的线程这样的事情似乎不是一个好主意(坦率地说,我不太确定强行杀死多线程应用程序的线程会产生什么结果——似乎内存会处于未定义的状态,这在很大程度上很难处理——如果回调function
分配内存,它本身可能会导致更多的内存泄漏(。
在让所有线程进入回调function
之前,插入一个等待所有线程启动的过程,这在性能方面似乎令人难以忍受(尽管这可以很容易地解决问题(。另一种选择是有一个带有相关FIFO的派生线程池,等待任务,但线程数量有问题(我会生成与逻辑CPU一样多的线程,但如果numThreads
更大呢?我基本上会在代码中重新实现OS的调度器(。
这通常是如何解决的?有更好的方法吗?如果不是,潜在的(取决于回调function
中的内容(死锁是否比内存泄漏更好?
如何解决此问题:
创建每个线程的方式是,它在允许开始用户的工作函数之前等待一个sentinel(您需要一个调用它的lambda(如果任何线程无法启动,请设置一个标志,指示现有线程应立即全部完成,而不是执行用户的功能。在出现错误的情况下,连接已启动的线程。然后根据需要退出并返回错误代码或异常(异常更好(。
现在,您的函数是线程安全的,不会泄漏内存。
编辑:这里有一些代码可以做你想做的事情,包括一个测试。如果您想强制模拟线程失败,请使用定义为1
的INTRODUCE_FAILURE
重新编译
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <memory>
#include <atomic>
#include <system_error>
#include <condition_variable>
#define INTRODUCE_FAILURE 0
// implementation
void ParallelCall(void (*function)(int, int), int numThreads)
{
std::vector<std::thread> threads;
threads.reserve(numThreads-1);
std::atomic<bool> mustAbort ( false );
std::atomic<bool> mayRun ( false );
std::mutex conditionMutex;
std::condition_variable runCondition;
for(int i = 1; i < numThreads; ++ i) {
try {
#if INTRODUCE_FAILURE == 1
if (i == 3) {
throw std::system_error(99, std::generic_category(), "the test deliberately failed a thread");
}
#endif
threads.emplace_back( std::thread{ [i, numThreads, function
, &mustAbort
, &conditionMutex
, &runCondition
, &mayRun]()->int {
std::unique_lock<std::mutex> myLock(conditionMutex);
runCondition.wait(myLock, [&mayRun]()->bool {
return mayRun;
});
myLock.unlock();
// wait for permission
if (!mustAbort) {
function(i, numThreads);
}
return 0;
}} );
}
catch(std::exception& e) { // will be a std::system_error
mustAbort = true;
std::unique_lock<std::mutex> myLock(conditionMutex);
mayRun = true;
conditionMutex.unlock();
runCondition.notify_all();
for(auto& t : threads) {
t.join();
}
throw;
}
}
std::unique_lock<std::mutex> myLock(conditionMutex);
mayRun = true;
conditionMutex.unlock();
runCondition.notify_all();
function(0, numThreads);
// use the calling thread as thread 0
for(auto& t : threads) {
t.join();
}
}
// test
using namespace std;
void testFunc(int index, int extent) {
static std::mutex outputMutex;
unique_lock<mutex> myLock(outputMutex);
cout << "Executing " << index << " of " << extent << endl;
myLock.unlock();
this_thread::sleep_for( chrono::milliseconds(2000) );
myLock.lock();
cout << "Finishing " << index << " of " << extent << endl;
myLock.unlock();
}
int main()
{
try {
cout << "initiating parallel call" << endl;
ParallelCall(testFunc, 10);
cout << "parallel call complete" << endl;
}
catch(std::exception& e) {
cout << "Parallel call failed because: " << e.what() << endl;
}
return 0;
}
成功输出示例:
Compiling the source code....
$g++ -std=c++11 main.cpp -o demo -lm -pthread -lgmpxx -lgmp -lreadline 2>&1
Executing the program....
$demo
initiating parallel call
Executing 0 of 10
Executing 1 of 10
Executing 4 of 10
Executing 5 of 10
Executing 8 of 10
Executing 2 of 10
Executing 7 of 10
Executing 6 of 10
Executing 9 of 10
Executing 3 of 10
Finishing 1 of 10
Finishing 5 of 10
Finishing 2 of 10
Finishing 9 of 10
Finishing 8 of 10
Finishing 4 of 10
Finishing 3 of 10
Finishing 0 of 10
Finishing 6 of 10
Finishing 7 of 10
parallel call complete
故障输出示例:
Compiling the source code....
$g++ -std=c++11 main.cpp -o demo -lm -pthread -lgmpxx -lgmp -lreadline 2>&1
Executing the program....
$demo
initiating parallel call
Parallel call failed because: the test deliberately failed a thread: Cannot assign requested address
最后是一个恳求——不要把你的图书馆放在世界上。std::线程库非常全面,如果这还不够的话,我们有OpenMP、TBB等。
让已经创建的线程在退出其threadproc之前完成丢失的工作以获得帮助如何?
List _StillBornWork;
void ParallelCall(void (*function)(int, int), int numThreads)
{
Thread *threads = new Thread[numThreads - 1];
for(int i = 1; i < numThreads; ++ i) {
if(threads[i - 1].start(&function, i, numThreads)) {
_StillBornWork.Push(i);
}
}
(*function)(0, numThreads);
// use the calling thread as thread 0
for(int i = 1; i < numThreads; ++ i)
threads[i - 1].join();
delete[] threads;
}
ThreadProc(int i) {
while(1) {
do work
// Here we see if there was any work that didn't get done because its thread
// was stilborn. In your case, the work is indicated by the integer i.
// If we get work, loop again, else break.
if (!_StillBornWork.Pop(&i))
break; // no more work that wasn't done.
}
}
- 获取日期异步信号安全吗?如果在信号处理程序中使用,它会导致死锁吗
- 如何在没有死锁和/或争用的情况下正确使用 std::mutex C++?
- 用C++中的std::condition_variable将线程置于死锁中会有风险吗
- 使用 std::async 时死锁,将来作为成员
- 如何调试读写器锁的死锁?
- 为什么在Visual Studio 2013上的std::this_thread::sleep_for上死锁
- localtime() 函数正在调用 ___lll_lock_wait_private(),这会使线程陷入死锁
- 如何重现 Boost 进程文档提示的死锁?
- 多线程Windows GUI应用程序中的死锁
- 为什么printf会导致与future.get的死锁,而cout则不会?
- C++中具有阻塞队列和障碍的死锁
- 死锁使用 std::mutex 来保护多个线程中的 cout
- 避免并发等待对象中的死锁
- 在VC++中从DLLMAIN内部调用D3D的CREATEDEVICE时,它会创建一个死锁(loaderlock?)。有没有办法克服这个问题?最终目标内
- 当用2个螺纹锁定时,将recursive_mutex死锁
- 程序在 C++11 中使用条件变量进入死锁
- 一个线程提升的死锁
- 单个生产者/多个消费者死锁
- 当被调用方法使用调用方已锁定的同一锁时,如何避免死锁
- 避免多线程应用程序中潜在的死锁/内存泄漏