跟进:提升通过 ZeroMQ 拉套接字传递的序列化自定义C++对象

Follow up: Boost serialized custom C++ object passed over ZeroMQ pull socket

本文关键字:序列化 自定义 对象 C++ 套接字 ZeroMQ 跟进      更新时间:2023-10-16
这是我

之前在 Boost 的另一个线程中打开的后续问题:反序列化通过 ZeroMQ 拉套接字传递的自定义C++对象。该线程中的问题已根据提供的答案得到解决。现在我在运行时遇到了另一个问题。请参阅下面的说明。我对这个领域真的很陌生C++所以如果你告诉我除了我在问题陈述下描述的内容之外,还需要改进所提供的代码的任何部分,我将不胜感激。

描述:

我有一个名为 GenericMessage 的C++类,它只保存一个 id 和数据作为其成员(请参阅下面的代码片段 2 - GenericMessage.hxx)。我的目的是序列化此类的实例,并通过实现推送模式的 ZeroMQ 套接字发送它。

序列化和发送任务已在类 ZMQHandler 中实现(请参阅 sendToBE 函数),该类放置在标题文件名 ZMQHandler.hxx 中,如下面的代码片段 3 所示。此类由 TestFE.cxx 实例化,如下面的第 4 个代码片段所示。

GenericMessage 实例的接收和反序列化在下面的第 5 个代码片段中提供的 TestBE.cxx 中实现。我的目的是通过 ZMQ 套接字(即拉套接字)接收 GenericMessage 实例,对其进行反序列化,然后将其成员打印到标准输出。

我验证了通过 ZeroMQ 套接字传输的 seriazalition 和 GenericMessage 对象工作正常。反序列化似乎也可以工作,因为我没有得到任何异常或分段错误。

问题陈述:

TestBE.cxx中的代码(参见代码片段 5)的预期是通过 ZeroMQ 套接字接收 GenericMessage 对象,对其进行反序列化,然后打印其两个成员,即 id 和数据,在本例中是字符串对象。更准确地说,它应该首先打印它获取的字符流的内容,然后打印反序列化对象的成员。相反,它根本不打印这些成员。Al,它将包括问号在内的奇怪符号放入接收的字符流中。请参阅下面的第一个代码片段,您将了解我的观点。

问题:

i) 为什么我无法获得预期的输出?为什么我在输出中看到标记为奇怪符号的问题?为什么我没有看到打印的 id 和数据字段,尽管它们在打印的字符流中可见?

ii) GenericMessage 类中的数据字段是一个模板类型,它设置为 std::string 以进行测试。但是,在实际使用中,我计划以序列化形式传输更复杂的对象。在这方面,你认为使用类 boost::archive::text_iarchive 和 boost::archive::text_oarchive 有用吗?我应该改用二进制吗?如果是这样,您是否认为我应该注意一些陷阱/可能的问题?提前谢谢。

片段 1:程序输出与预期输出

  *******************
  The EXPECTED OUTPUT
  *******************
  Connecting to FE...
  CHAR [22 serialization::archive 9 0 1 0
  0 1 12 Hello there!]
  ID: 1
  Data: Hello there!
  CHAR [22 serialization::archive 9 0 1 0
  0 2 12 Hello there!]
  ID: 2
  Data: Hello there!
  CHAR [22 serialization::archive 9 0 1 0
  0 3 12 Hello there!]
  ID: 3
  Data: Hello there!
  ......

  *************************
  PRINTED OUTPUT IN REALITY
  *************************
  Connecting to FE...
  CHAR [22 serialization::archive 9 0 1 0
  0 1 12 Hello there!]
  ID: 1
  Data: Hello there!
  //continues in increasing order same as above until the 18th message in the following
  CHAR [22 serialization::archive 9 0 1 0
  0 18 12 Hello there!]
  ID: 0
  Data: 
  //!!!!AFTER the 18th message I got question marks in the printed char stream!!!!!
  CHAR [22 serialization::archive 9 0 1 0
  0 19 12 Hello there!���]
  ID: 0
  Data: 
  CHAR [22 serialization::archive 9 0 1 0
  0 20 12 Hello there!���]
  ID: 0
  Data: 

代码片段 2 (GenericMessage.hxx)

  #include <iostream>
  #include <string>
  #include <sstream>
  #include <boost/serialization/serialization.hpp>
  #include <boost/archive/binary_oarchive.hpp>
  #include <boost/archive/binary_iarchive.hpp>
  #include <boost/archive/text_oarchive.hpp>
  #include <boost/archive/text_iarchive.hpp>
  template <class T>
  class GenericMessage {
  public:
    GenericMessage(): 
      beId(-1)
    {}
    GenericMessage(int id, T msg): 
       beId(id), 
       data(msg)
    {}
    ~GenericMessage(){}
    T getData()
    {
      return data;
    }

    std::string toString()
    {
       std::ostringstream ss;
       ss << getBeId();
       std::string ret =  ss.str();
      return ("ID: " + ret + " DATA: " + getData());
    }
    void setBeId(int id)
    {
      beId = id;
    }
    int getBeId()
    {
      return beId;
    }

  private:
    friend class boost::serialization::access;
    int beId;
    T data;

    template <class Archive>
    void serialize(Archive & ar, const unsigned int version)
    {
        ar & beId;
        ar & data;
    }
   };

代码片段 3 (ZmqHandler.hxx)

   #include "zmq.hpp"
   #include "GenericMessage.hxx"
   #include <unistd.h>
   #include <cassert>
   template <class A>
   class ZmqHandler {
      public:
         ZmqHandler():
     mContext(1),
     mOutbHandlerSocket(mContext, ZMQ_PUSH)
         {    
             mOutbHandlerSocket.bind ("tcp://*:5555");       
         }

         ~ZmqHandler() {}
         void sendToBE(GenericMessage<A> *theMsg)
         {
            std::ostringstream archive_stream;
            boost::archive::text_oarchive archive(archive_stream);
            try
            {
                archive << theMsg;
            } catch (boost::archive::archive_exception& ex) {
                std::cout << "Archive Exception during deserializing:" << std::endl;
                std::cout << ex.what() << std::endl;           
            } catch (int e) {
                std::cout << "EXCEPTION " << e << std::endl; 
            }
           std::string outbound_data_ = archive_stream.str();
           // no need to use the c-style string function 'strlen'
           int len = outbound_data_.length();
           zmq::message_t msgToSend(len);
           memcpy( msgToSend.data(), outbound_data_.data(), len );
           mOutbHandlerSocket.send(msgToSend);
           std::cout << "SENT request: [" << theMsg->toString() << "]" << std::endl;
           std::cout << "LENGTH [" << len << "]" << std::endl;
         }   
        private:  
          zmq::context_t mContext;
          zmq::socket_t mOutbHandlerSocket;         
     };

代码片段 4 (TestFE.cxx)

       #include "ZmqHandler.hxx"
       int main ()
       {
            ZmqHandler<std::string> zmqHandler;
            int counter = 1;
            while(1)
            {  
                std::string data = "Hello there!";
                GenericMessage<std::string> msg(counter, data);
                zmqHandler.sendToBE(&msg);
                counter++;
                sleep(1);
             }
             return 0;
        }

代码片段 5 (TestBE.cxx)

       #include "zmq.hpp"
       #include "GenericMessage.hxx"
       #include <fstream>
       int main ()
       {
          //  Prepare our context and socket
          zmq::context_t context (1);
          zmq::socket_t socket (context, ZMQ_PULL);
         std::cout << "Connecting to FE..." << std::endl;
         socket.connect ("tcp://localhost:5555");
         while(1){
              zmq::message_t reply;
              socket.recv (&reply);
              const char *buf = static_cast<const char*>(reply.data());
              std::cout << "CHAR [" << buf << "]" << std::endl;
              std::string input_data_( buf, reply.size() ); 
              std::istringstream archive_stream(input_data_);
              boost::archive::text_iarchive archive(archive_stream);
              GenericMessage<std::string> theMsg;
              try
              {
                 archive >> theMsg;
              } catch (boost::archive::archive_exception& ex) {
                 std::cout << "Archive Exception during deserializing:" << std::endl;
                 std::cout << ex.what() << std::endl;           
              } catch (int e) {
                 std::cout << "EXCEPTION " << e << std::endl; 
              }
              std::cout << "ID: " << theMsg.getBeId() << std::endl;
              std::cout << "Data: " << theMsg.getData() << std::endl;
           }
            return 0;
         }

当我在我的系统上构建和运行您的代码时,TestBE确实会抛出反序列化异常(每次)。 这是我为修复它所做的:

ZmqHandler类中,将方法void sendToBE(GenericMessage<A> *theMsg)更改为void sendToBE(GenericMessage<A> theMsg)。 如果需要,可以使用const&,但您可能不想在此处使用指针。 在相同的方法中,您需要将theMsg->XXX更改为 theMsg.XXX ,因为theMsg不再是指针。

TestFEzmqHandler.sendToBE(&msg);变得zmqHandler.sendToBE(msg);

如果theMsg必须是指针

ZmqHandler中,只需将行archive << theMsg更改为archive << *theMsg即可。 这样,存档的operator<<正在使用对象,而不是指向对象的指针。 代码的其余部分可以保持不变。