如何在 C++ 中正确使用 std::mutex,避免死锁和/或竞争条件?

How do I correctly use std::mutex in C++ without deadlocks and/or races?

本文关键字:std C++ mutex 情况下 死锁 争用      更新时间:2023-10-16

我正在调试一个需要并行运行的程序。我不明白为什么在 C++ 中编译并运行这段代码时,会同时遇到死锁和竞争条件。以下是我目前编写的全部相关代码。

// define job struct here
// define mutex, condition variable, deque, and atomic here
// Shared work-queue state used by addJobs() and work() below (question code, as posted).
std::deque<job> jobList;
std::mutex jobMutex;
std::condition_variable jobCondition;
// Incremented when a worker starts or wakes from wait(), decremented before it waits
// (and after its loop); only read by the final print in work().
std::atomic<int> numberThreadsRunning;
// Pushes one job onto jobList and wakes one waiting worker (question code, as posted).
// NOTE(review): the mutex is locked/unlocked by hand. If anything between lock() and
// unlock() throws (e.g. push_back), jobMutex stays locked and every other thread blocks
// forever. A std::unique_lock / std::lock_guard releases it automatically.
void addJobs(...insert parameters here...)
{
job current = {...insert parameters here...};
jobMutex.lock();
std::cout << "We have successfully acquired the mutex." << std::endl;
jobList.push_back(current);
jobCondition.notify_one();
jobMutex.unlock();
// NOTE(review): printed outside any lock, so this line can interleave with other threads' output.
std::cout << "We have successfully unlocked the mutex." << std::endl;
}
// Worker loop (question code, as posted): takes jobs from jobList and runs them.
// NOTE(review): the bugs behind the reported hangs and crashes are marked inline.
void work(void) {
job* current;
numberThreadsRunning++;
// NOTE(review): there is no exit condition, so this loop never ends, the code after
// it is unreachable, and main()'s join() calls wait forever.
while (true) {
std::unique_lock<std::mutex> lock(jobMutex);
// NOTE(review): `if` + wait() without a predicate. A spurious wakeup, or another worker
// taking the job first, returns here with jobList still empty; jobList.at(0) below then
// throws std::out_of_range, which nothing catches. Use `while (jobList.empty())` or the
// predicate overload of wait().
if (jobList.empty()) {
numberThreadsRunning--;
jobCondition.wait(lock);
numberThreadsRunning++;
}
// NOTE(review): `current` points into the deque, and pop_front() destroys that very
// element, so `current` dangles from the next line on. Copy the job instead.
current = &jobList.at(0);
jobList.pop_front();
// NOTE(review): unlocks the mutex behind `lock`'s back. `lock` still believes it owns
// jobMutex and unlocks it again in its destructor: undefined behavior. Use lock.unlock().
jobMutex.unlock();
std::cout << "We are now going to start a job." << std::endl;
////Call an expensive function for the current job that we want to run in parallel.
////This could either complete the job, or spawn more jobs, by calling addJobs. 
////This recursive behavior typically results in there being thousands of jobs.
std::cout << "We have successfully completed a job." << std::endl;
}
numberThreadsRunning--;
std::cout << "There are now " << numberThreadsRunning << " threads running." << std::endl;
}
// Starts n worker threads and waits for them to finish (question code, as posted).
// NOTE(review): `n` is not defined in this excerpt and the array declaration below is
// missing its ';', so the excerpt does not compile as shown.
// NOTE(review): work() never leaves its loop, so the join() calls below block forever.
int main( int argc, char *argv[] ) {
//Initialize everything and add first job to the deque.
std::thread jobThreads[n]
for (int i = 0; i < n; i++) {
jobThreads[i] = std::thread(work);
}
for (int i = 0; i < n; i++) {
jobThreads[i].join();
}
}

代码可以编译,但取决于随机因素,它要么在结束时死锁,要么在队列仍然很大时于中途出现分段错误。有人知道这是为什么吗?

编辑:我已经编辑了这个问题,补充了额外的信息和一个更完整的示例。我当然不想用我实际拥有的数千行代码(一个图像渲染包)来烦扰大家,但我相信这个例子更好地代表了我所面临的问题类型。Alan Birtles 在回答中给出的例子只适用于功能非常简单的简单作业结构。在实际的作业结构中,有多个指向不同向量和矩阵的指针,因此我们需要指向作业结构的指针,否则编译器将无法编译,因为构造函数被“隐式删除”。

我相信我面临的错误与我锁定和解锁线程的方式有关。我知道指针也会导致一些问题,但它们可能必须保留。函数thisFunction()表示需要并行运行的函数。

#include <queue>
#include <deque>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <iostream>
#include <cmath>
// A unit of work (edited question code, as posted).
// NOTE(review): `matrix` is a reference member. That is what implicitly deletes job's
// default constructor and copy assignment, and the job never owns its matrix: in
// addJobs() the reference is bound to a local vector that is destroyed when addJobs()
// returns, so every queued job refers to a dead object -- a likely cause of the
// segfaults. Storing the matrix by value (or through a std::shared_ptr) avoids this.
// NOTE(review): <vector> is not in the include list above; this only compiles if some
// other standard header happens to pull it in.
struct job {
std::vector<std::vector<int>>   &matrix;
int num;
};
// Set by main() once the initial jobs are queued; read and written only under jobMutex.
bool closed = false;
std::deque<job> jobList;
std::mutex jobMutex;
std::condition_variable jobCondition;
// Only incremented/decremented by work(); never read in this snippet.
std::atomic<int> numberThreadsRunning;
// Running job counter; addJobs() uses it to number new jobs.
std::atomic<int> numJobs;
/// Thread-safe wrapper around std::cout for a single output statement.
///
/// Each temporary `tcout()` locks tcout::mutex in its constructor and releases it
/// when the temporary is destroyed at the end of the full expression, so a chain
/// such as `tcout() << "job " << n << '\n';` cannot interleave with other tcout
/// statements. Text written to std::cout directly is not covered by this lock.
struct tcout
{
    tcout() : lock(mutex) {}

    /// Writes any streamable value to std::cout.
    template <typename T>
    tcout& operator<<(T&& t)
    {
        std::cout << t;
        return *this;
    }

    /// Stream manipulators such as std::endl and std::flush are function templates,
    /// so the template above cannot deduce T for them; this overload accepts them.
    tcout& operator<<(std::ostream& (*manip)(std::ostream&))
    {
        std::cout << manip;
        return *this;
    }

    static std::mutex mutex;
    std::unique_lock<std::mutex> lock;
};
std::mutex tcout::mutex;

/// Returns the 4x4 matrix product C = A * B.
///
/// Only the top-left 4x4 entries of each operand are used; std::out_of_range is
/// thrown (via .at()) if either operand has fewer than 4 rows or columns.
/// Operands are taken by const reference: they are only read, and this also lets
/// callers pass const matrices or temporaries (e.g. a nested multiply4x4 call).
std::vector<std::vector<int>> multiply4x4(
    const std::vector<std::vector<int>> &A,
    const std::vector<std::vector<int>> &B) {
    constexpr int N = 4;  // Only deals with 4x4 matrices
    std::vector<std::vector<int>> C(N, std::vector<int>(N, 0));
    for (int i = 0; i < N; i++) {
        for (int j = 0; j < N; j++) {
            int sum = 0;  // dot product of row i of A with column j of B
            for (int k = 0; k < N; k++) {
                sum += A.at(i).at(k) * B.at(k).at(j);
            }
            C.at(i).at(j) = sum;
        }
    }
    return C;
}
// Builds a random 4x4 matrix, prints it, and queues it as a new job (edited question code).
// Called from main() and, for even-numbered jobs, from worker threads via thisFunction().
void addJobs()
{
// NOTE(review): this increment and the later read of numJobs in `current`'s initializer
// are two separate atomic operations, so two threads can end up with the same job number.
// `int num = ++numJobs;` takes the incremented value in one step.
numJobs++;
std::vector<std::vector<int>> matrix(4, std::vector<int>(4, -1)); //Create random 4x4 matrix
// NOTE(review): the standard allows rand() to introduce a data race when several threads call it.
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 4; j++) {
matrix.at(i).at(j) = rand() % 10 + 1;
}
}
// NOTE(review): job::matrix is a reference, so `current` refers to the local `matrix`
// above, which is destroyed when addJobs() returns; the queued job is left dangling.
job current = { matrix, numJobs };
std::unique_lock<std::mutex> lock(jobMutex);
// NOTE(review): "n" and "t" look like "\n" and "\t" whose backslashes were lost.
std::cout << "The matrix for job " << current.num << " is: n";
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 4; j++) {
std::cout << matrix.at(i).at(j) << "t";
}
std::cout << "n";
}
jobList.push_back(current);
jobCondition.notify_one();
lock.unlock();
}
// Simulated expensive job: sleeps a random time, prints the job's matrix, then raises it
// to the 3rd power (odd num) or the 2nd power (even num, which also queues a new job).
// NOTE(review): the prints here are serialized by jobMutex, while work() prints through
// tcout's separate mutex, so the two kinds of output can still interleave.
void thisFunction(std::vector<std::vector<int>> &matrix, int num)
{
// NOTE(review): rand() * 500 overflows int (undefined behavior) wherever RAND_MAX is
// 2^31-1, e.g. glibc; a <random> engine or rand() % 501 avoids it. <chrono> and
// <cstdlib> are also not in the include list above.
std::this_thread::sleep_for(std::chrono::milliseconds(rand() * 500 / RAND_MAX));
// NOTE(review): `matrix` is the job's reference member, which refers to a local of
// addJobs() that no longer exists; copying it reads a destroyed object.
std::vector<std::vector<int>> product = matrix;
std::unique_lock<std::mutex> lk(jobMutex);
std::cout << "The imported matrix for job " << num << " is: n";
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 4; j++) {
std::cout << product.at(i).at(j) << "t";
}
std::cout << "n";
}
lk.unlock();
// NOTE(review): if neither branch below matched (a negative odd num gives num % 2 == -1),
// `power` would be read uninitialized; a plain `else` removes that possibility.
int power;
if (num % 2 == 1) {
power = 3;
} else if (num % 2 == 0) {
power = 2;
// lk was released above, so addJobs() can take jobMutex without deadlocking this thread.
addJobs();
}
for (int k = 1; k < power; k++) {
product = multiply4x4(product, matrix);
}
std::unique_lock<std::mutex> lock(jobMutex);
std::cout << "The matrix for job " << num << " to the power of " << power << " is: n";
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 4; j++) {
std::cout << product.at(i).at(j) << "t";
}
std::cout << "n";
}
lock.unlock();
}
// Worker loop (edited question code): takes jobs off jobList, running thisFunction() on
// each outside the lock, and returns once the queue is empty and `closed` is set.
void work(void) {
job *current;
numberThreadsRunning++;
while (true) {
std::unique_lock<std::mutex> lock(jobMutex);
if (jobList.empty()) {
numberThreadsRunning--;
jobCondition.wait(lock, [] {return !jobList.empty() || closed; });
numberThreadsRunning++;
}
// Still empty after the predicate wait means `closed` is set and there is no more work.
if (jobList.empty())
{
break;
}
current = &jobList.front();
// Copies the job before pop_front() destroys the element.
// NOTE(review): the copy shares the reference member, so newcurrent.matrix still refers
// to the same matrix, which was a local in addJobs() and is already destroyed.
job newcurrent = {current->matrix, current->num};
current = &newcurrent;
jobList.pop_front();
lock.unlock();
thisFunction(current->matrix, current->num);
// NOTE(review): "completen" looks like "complete\n" with the backslash lost.
tcout() << "job " << current->num << " completen";
}
numberThreadsRunning--;
}

// Starts n worker threads, queues 100 initial jobs, sets `closed`, and joins the workers.
// A worker only returns once it finds the queue empty after `closed` is set, so jobs
// spawned by thisFunction() are still processed.
int main(int argc, char *argv[]) {
const size_t n = 1;
numJobs = 0;
std::thread jobThreads[n];
// NOTE(review): unused.
std::vector<int> buffer;
// NOTE(review): compares a signed int with the size_t n; a range-for over jobThreads avoids it.
for (int i = 0; i < n; i++) {
jobThreads[i] = std::thread(work);
}
for (int i = 0; i < 100; i++)
{
addJobs();
}
// Publish `closed` under jobMutex so waiting workers see it when their predicate is re-checked.
{
std::unique_lock<std::mutex> lock(jobMutex);
closed = true;
jobCondition.notify_all();
}
for (int i = 0; i < n; i++) {
jobThreads[i].join();
}
}

这是一个可以完整运行的示例:

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <iostream>
#include <mutex>
#include <queue>
#include <random>
#include <thread>
/// A unit of work handed from the producer to the worker threads.
struct job
{
    int num;  // job id, printed when the job completes
};

// ---- Shared work-queue state ----
// `closed` and `jobList` are only accessed while jobMutex is held.
bool closed{false};                        // set once no further jobs will be queued
std::deque<job> jobList;                   // pending jobs, consumed from the front
std::mutex jobMutex;                       // guards closed and jobList
std::condition_variable jobCondition;      // signalled when a job is queued or closed is set
std::atomic<int> numberThreadsRunning{0};  // workers currently not blocked in wait()
/// Thread-safe wrapper around std::cout for a single output statement.
///
/// Each temporary `tcout()` locks tcout::mutex in its constructor and releases it
/// when the temporary is destroyed at the end of the full expression, so a chain
/// such as `tcout() << "job " << n << '\n';` cannot interleave with other tcout
/// statements. Text written to std::cout directly is not covered by this lock.
struct tcout
{
    tcout() : lock(mutex) {}

    /// Writes any streamable value to std::cout.
    template <typename T>
    tcout& operator<<(T&& t)
    {
        std::cout << t;
        return *this;
    }

    /// Stream manipulators such as std::endl and std::flush are function templates,
    /// so the template above cannot deduce T for them; this overload accepts them.
    tcout& operator<<(std::ostream& (*manip)(std::ostream&))
    {
        std::cout << manip;
        return *this;
    }

    static std::mutex mutex;
    std::unique_lock<std::mutex> lock;
};
std::mutex tcout::mutex;
/// Queues a new job with the next sequential id and wakes one idle worker.
///
/// Safe to call from any thread, including from inside a worker (in the question,
/// running a job may spawn further jobs): the id counter is only read and
/// incremented while jobMutex is held, so concurrent callers never share an id.
void addJobs()
{
    {
        std::lock_guard<std::mutex> lock(jobMutex);
        static int num = 0;  // next job id; guarded by jobMutex
        jobList.push_back(job{ num++ });
    }
    // Notify after releasing the mutex so the woken worker can acquire it right away;
    // waiters use a predicate, so nothing is lost by notifying outside the lock.
    jobCondition.notify_one();
}
/// Worker loop: repeatedly removes the front job from jobList and processes it
/// outside the lock. Returns once the queue is empty and `closed` has been set.
void work(void) {
    // Per-thread random engine for the simulated work time. rand() may race when called
    // from several threads, and rand() * 500 overflows int where RAND_MAX is 2^31-1.
    thread_local std::mt19937 rng{ std::random_device{}() };
    std::uniform_int_distribution<int> delayMs(0, 500);

    numberThreadsRunning++;
    while (true) {
        job current;
        {
            std::unique_lock<std::mutex> lock(jobMutex);
            if (jobList.empty()) {
                numberThreadsRunning--;
                // The predicate guards against spurious wakeups and stolen notifications.
                jobCondition.wait(lock, [] { return !jobList.empty() || closed; });
                numberThreadsRunning++;
            }
            if (jobList.empty()) {
                break;  // closed and drained: no more work will arrive
            }
            // Copy before pop_front(): the element (and any reference to it) is destroyed by the pop.
            current = jobList.front();
            jobList.pop_front();
        }  // lock released here; the job runs without holding jobMutex
        std::this_thread::sleep_for(std::chrono::milliseconds(delayMs(rng)));
        tcout() << "job " << current.num << " complete\n";
    }
    numberThreadsRunning--;
}
int main(int argc, char *argv[]) {
const size_t n = 4;
std::thread jobThreads[n];
for (int i = 0; i < n; i++) {
jobThreads[i] = std::thread(work);
}
for (int i = 0; i < 100; i++)
{
addJobs();
}
{
std::unique_lock<std::mutex> lock(jobMutex);
closed = true;
jobCondition.notify_all();
}
for (int i = 0; i < n; i++) {
jobThreads[i].join();
}
}

我进行了以下更改:

  1. 切勿直接在 std::mutex 上调用 lock() 或 unlock(),应始终使用 std::unique_lock(或类似的类)。您的 work() 对一个已由 std::unique_lock 锁定的互斥锁调用了 jobMutex.unlock(),随后 std::unique_lock 在析构时会再次解锁,导致未定义行为。如果 addJobs 抛出异常,由于您根本没有使用 std::unique_lock,互斥锁将一直保持锁定状态。
  2. 您需要为 jobCondition.wait 提供谓词,否则虚假唤醒可能导致 wait 在 jobList 仍为空时返回。
  3. 我添加了一个closed变量,以便在没有更多工作要做时使程序退出
  4. 我添加了job的定义
  5. work中,您将指针指向队列中的某个项目,然后将其从队列中弹出,因为该项目不再存在,指针将悬空。您需要在弹出队列之前复制该项目。如果要避免复制,请使job结构可移动,或者更改队列以存储std::unique_ptr<job>std::shared_ptr<job>
  6. 我还添加了一个线程安全版本的 std::cout。这不是绝对必要的,但可以防止输出行相互交错。理想情况下,您应该使用合适的线程安全日志库,因为每次打印都要锁定互斥锁,开销很大;如果打印足够多,您的程序将几乎变成单线程运行。

job* current;替换为job current;,然后current = jobList.at(0);。否则,您最终会得到一个指向jobList元素的指针,该元素在jobList.pop_front()之后不存在。

if (jobList.empty())替换为while(jobList.empty())以处理虚假唤醒。