线程过程中的线程同步问题

Thread synchronization problem in thread's procedure

本文关键字:线程 问题 同步 过程中      更新时间:2023-10-16

我有一个问题。我将对象添加到映射中,并在线程中为映射中的所有元素调用 run(( 过程。 我正确理解在此代码中,进程过程中存在同步问题。我可以添加互斥锁吗?鉴于此过程是在线程中调用的?

class Network {
public:
Network() {
std::cout << "Network constructor" << std::endl;
}
void NetworkInit(const std::string& par1) {
this->par1 = par1;
}
~Network() {
std::cout << "Network destructor" << std::endl;
my_map.clear();
}
void addLogic(uint32_t Id, std::shared_ptr<Logic> lgc) {
std::lock_guard<std::mutex> lk(mutex);
my_map.insert(std::pair<uint32_t, std::shared_ptr<Logic>>(Id, lgc));
cv.notify_one();
}
void removeLogic(uint32_t Id) {
std::unique_lock<std::mutex> lk(mutex);
cv.wait(lk, [this]{return !my_map.empty(); });
auto p = this->my_map.find(roomId);
if (p != end(this->my_map)) {
this->my_map.erase(roomId);
}
lk.unlock();
}
/**
* Start thread
*/
void StartThread(int id = 1) {
running = true;
first = std::thread([this, id] { process(id); });
first.detach();
}
/**
* Stop thread
*/
void StopThread() {
running = false;
}
private:
std::thread first;
std::atomic<bool> running = ATOMIC_VAR_INIT(true);
void process(int id) {
while (running) {
for (const auto& it:my_map) {
it.second->run();
}
std::this_thread::sleep_for(10ms);
}
}    
private:
std::mutex mutex;
std::condition_variable cv;
using MyMapType = std::map<uint32_t, std::shared_ptr<Logic> >; 
MyMapType my_map;
std::string par1;
};

第一个想法是使用在run期间释放的互斥锁来保护整个map。 这适用于addLogic,因为插入map不会使迭代器失效,但不适用于可能使process使用的迭代器值无效的deleteLogic

更高效、无锁的方法(如危险指针(可能适用于此处,但基本思想是使用延迟删除列表。 假设并发删除的目的是取消任务(而不仅仅是在所有工作完成后进行清理(,那么在执行之前立即检查使用者线程是明智的。 使用set(对应于您的map(将使删除列表是动态的,并且这些检查是有效的。

因此,请再mutex保护删除列表,并在每次迭代开始时将其用于process

void addLogic(uint32_t Id, std::shared_ptr<Logic> lgc) {
std::lock_guard<std::mutex> lk(mutex);
my_map.insert(std::pair<uint32_t, std::shared_ptr<Logic>>(Id, lgc));
}
void removeLogic(uint32_t Id) {
std::lock_guard<std::mutex> kg(kill_mutex);
kill.insert(Id);
}
private:
std::set<uint32_t> kill;
std::mutex mutex,kill_mutex;
void process(int id) {
for(;running;std::this_thread::sleep_for(10ms)) {
std::unique_lock<std::mutex> lg(mutex);
for(auto i=my_map.begin(),e=my_map.end();i!=e;) {
if(std::lock_guard<std::mutex>(kill_mutex),kill.erase(i->first)) {
i=my_map.erase(i);
continue;    // test i!=e again
}
lg.unlock();
i->second->run();
lg.lock();
++i;
}
}
}

此代码省略了您的condition_variable用法:在排队删除某些内容之前无需等待。

具有低级并发基元的解决方案通常无法扩展且不易于维护。

更好的选择是使用映射更新或工作线程终止指令的线程安全"控制"队列。

像这样:

enum Op {
ADD,
DROP,
STOP
};
struct Request {
Op op;
uint32_t id;
std::function<void()> action;
};
...
// the map which required protection in your code
std::map<uint32_t, std::function<void()>> subs;
// requests queue and its mutex (not very optimal, just to demonstrate the idea)
std::vector<Request> requests;
std::mutex mutex;
// the worker thread
std::thread worker([&](){
// the temporary buffer where requests are drained to from the queue before processing
decltype(requests) buffer;
// the main loop
while (true) {
// requests collection (requires synchronization)
{
std::lock_guard<decltype(mutex)> const guard {mutex};
buffer.swap(requests);
}
// requests processing
for(auto&& request: buffer) {
switch (request.op) {
case ADD:
subs[request.id] = std::move(request.action);
break;
case DROP:
subs.erase(request.id);
break;
case STOP: goto endloop;
}
}
// map iteration
for (auto&& entry: subs) {
entry.second();
}
}
endloop:;
});