多线程应用程序中的零MQ处理中断
ZeroMQ handling interrupt in multithreaded application
多线程环境中 ZeroMQ 中的优雅退出
规格 : ubuntu 16.04 与 c++11,libzmq : 4.2.3
示例代码
static int s_interrupted = 0;
static void s_signal_handler (int signal_value)
{
s_interrupted = 1;
//some code which will tell main thread to exit
}
static void s_catch_signals (void)
{
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset (&action.sa_mask);
sigaction (SIGINT, &action, NULL);
sigaction (SIGTERM, &action, NULL);
}
static void Thread((zsock_t *pipe, void *)
{
zmq::context_t context(1);
zmq::socket_t requester1(context,ZMQ_DEALER);
zmq::socket_t requester2(context,ZMQ_DEALER);
requester1.connect(address1);
requester2.connect(address2);
zmq_pollitem_t items []=
{{requester1,0,ZMQ_POLLIN,0},
{requester2,0,ZMQ_POLLIN,0}};
while(true)
{
zmq::message_t message;
zmq::poll (items, 2, -1);
if (items [0].revents & ZMQ_POLLIN)
{
requester1.recv(&message);
}
if (items [1].revents & ZMQ_POLLIN)
{
requester2.recv(&message);
}
}
}
int main()
{
.
//some code
.
zactor_t *actor = zactor_new (Threaded, nullptr);
s_catch_signals();
.
//continue
.
//wait till thread finishes to exit
return 0;
}
现在,当中断发生时,它将从主线程调用信号处理程序。我不知何故需要告诉线程(轮询器(退出信号处理程序。任何想法如何实现这一目标?
从 ZMQ 文档中,您有 2 种"惯用"方法来解决这个问题:
- 在管道上
轮询,并在信号处理程序中的管道上写入。
捕获发送信号时在 recv 中引发的异常。
测试后,似乎 zmq::p oll 不会在 SIGINT 上抛出异常。因此,解决方案似乎是使用专用于关闭的套接字。解决方案如下所示:
#include <iostream>
#include <thread>
#include <signal.h>
#include <zmq.hpp>
zmq::context_t* ctx;
static void s_signal_handler (int signal_value)
{
std::cout << "Signal received" << std::endl;
zmq::socket_t stop_socket(*ctx, ZMQ_PAIR);
stop_socket.connect("inproc://stop_address");
zmq::message_t msg("0", 1);
stop_socket.send(msg);
std::cout << "end sighandler" << std::endl;
}
static void s_catch_signals (void)
{
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset (&action.sa_mask);
sigaction (SIGINT, &action, NULL);
sigaction (SIGTERM, &action, NULL);
}
void thread(void)
{
std::cout << "Thread Begin" << std::endl;
zmq::context_t context (1);
ctx = &context;
zmq::socket_t requester1(context,ZMQ_DEALER);
zmq::socket_t requester2(context,ZMQ_DEALER);
zmq::socket_t stop_socket(context, ZMQ_PAIR);
requester1.connect("tcp://127.0.0.1:36483");
requester2.connect("tcp://127.0.0.1:36483");
stop_socket.bind("inproc://stop_address");
zmq_pollitem_t items []=
{
{requester1,0,ZMQ_POLLIN,0},
{requester2,0,ZMQ_POLLIN,0},
{stop_socket,0,ZMQ_POLLIN,0}
};
while ( true )
{
// Blocking read will throw on a signal
int rc = 0;
std::cout << "Polling" << std::endl;
rc = zmq::poll (items, 3, -1);
zmq::message_t message;
if(rc > 0)
{
if (items [0].revents & ZMQ_POLLIN)
{
requester1.recv(&message);
}
if (items [1].revents & ZMQ_POLLIN)
{
requester2.recv(&message);
}
if(items [2].revents & ZMQ_POLLIN)
{
std::cout << "message stop received " << std::endl;
break;
}
}
}
requester1.setsockopt(ZMQ_LINGER, 0);
requester2.setsockopt(ZMQ_LINGER, 0);
stop_socket.setsockopt(ZMQ_LINGER, 0);
requester1.close();
requester2.close();
stop_socket.close();
std::cout << "Thread end" << std::endl;
}
int main(void)
{
std::cout << "Begin" << std::endl;
s_catch_signals ();
zmq::context_t context (1);
zmq::socket_t router(context,ZMQ_ROUTER);
router.bind("tcp://127.0.0.1:36483");
std::thread t(&thread);
t.join();
std::cout << "end join" << std::endl;
}
请注意,如果您不想将上下文共享给信号处理程序,则可以使用"ipc://..."。
如果你想在处理信号时保留ZMQ的Actor模型的感觉,你可以在Linux上使用signalfd接口:signalfdmanpage。这样,您就可以使用 zmq 轮询来等待信号传递,而不是使用信号处理程序。
它的另一个优点是,在处理通过文件描述符传递的信号时,您可以调用任何您喜欢的函数,因为您正在同步处理它,而不是异步处理它。
相关文章:
- 警告处理为错误这里有什么问题
- 在C#中处理C++指针而不使用unsafe的最佳方法
- 处理多个异常集合的C++方法
- 找不到成员对象:没有名为get_event()的成员,也处理多态性和向量
- 使用流处理接收到的数据
- 获取日期异步信号安全吗?如果在信号处理程序中使用,它会导致死锁吗
- 处理小于cpu数据总线的数据类型.(c++转换为机器代码)
- 基于多个条件处理地图中的所有元素
- 如何用数字处理log(0)
- SSL上的`curl_easy_send`和`curl_asy_recv`:如何处理`CURLE_AGAIN`
- 错误处理.将系统错误代码映射到泛型
- 从文本文件中读取时钟时间和事件时间并进行处理
- 在运行时处理类型擦除的数据-如何不重新发明轮子
- 在for循环中使用auto vs decltype(vec.size())来处理字符串的向量
- 用于矢量处理的多个线程
- 对字符串进行排序时,在c++中处理sort()
- 如何处理linux终端中带有负号(-)的C++中的命令行参数
- 处理除以零会导致<csignal>意外行为
- 多线程应用程序中的零MQ处理中断
- 如何设置ZERO-MQ架构来处理不同速度的工人