存储和检索大量小型非结构化消息的最快方法

Fastest way to store and retrieve a large stream of small unstructured messages

本文关键字:消息 方法 结构化 检索 小型 存储      更新时间:2024-05-10

我正在开发一个IOT应用程序,该应用程序要求我处理许多小型非结构化消息(这意味着它们的字段可能会随着时间的推移而变化——有些字段可能出现,有些字段可能消失)。这些消息通常有2到15个字段,这些字段的值属于基本数据类型(int/long、字符串、布尔值)。这些消息非常适合JSON数据格式(或msgpack)。

按照消息到达的顺序处理消息是至关重要的(要明白:它们需要由单个线程处理——没有办法并行化这一部分)。我有自己的实时处理这些消息的逻辑(吞吐量相对较小,每秒最多几十万条消息),但引擎越来越需要能够通过回放消息历史来模拟/回放以前的时段。虽然最初并不是为了这个目的而编写的,但如果我能够以足够的速度向我的事件处理引擎提供历史数据,它(用Go编写)每秒可以很好地处理几十条(可能只有数亿条)消息。

这正是问题所在。我已经在很长一段时间(几年)内存储了许多(数千亿)这样的消息,现在是以分隔的消息包格式存储的(https://github.com/msgpack/msgpack-python#streaming-拆包)。在这个设置和其他设置中(见下文),我能够基准测试大约2M条消息/秒的峰值解析速度(在2019款Macbook Pro上,仅用于解析),这远未使磁盘IO饱和。

即使不讨论IO,也要执行以下操作:

import json
message = {
'meta1': "measurement",
'location': "NYC",
'time': "20200101",
'value1': 1.0,
'value2': 2.0,
'value3': 3.0,
'value4': 4.0
}
json_message = json.dumps(message)
%%timeit
json.loads(json_message)

给了我3微秒/条消息的解析时间,略高于每秒300k条消息。与ujson、rapidjson和orjson相比,而不是标准库的json模块,我能够获得1微秒/条消息的峰值速度(使用ujson),即大约1M条消息/秒。

Msgpack稍微好一点:

import msgpack
message = {
'meta1': "measurement",
'location': "NYC",
'time': "20200101",
'value1': 1.0,
'value2': 2.0,
'value3': 3.0,
'value4': 4.0
}
msgpack_message = msgpack.packb(message)
%%timeit
msgpack.unpackb(msgpack_message)

给我大约750ns/条消息(大约100ns/字段)的处理时间,即大约130万条消息/秒。我最初认为C++可以更快。下面是一个使用nlohmann/json的示例,尽管这与msgpack:无法直接比较

#include <iostream>
#include "json.hpp"
using json = nlohmann::json;
const std::string message = "{"value": "hello"}";
int main() {
auto jsonMessage = json::parse(message);
for(size_t i=0; i<1000000; ++i) {
jsonMessage = json::parse(message);
}
std::cout << jsonMessage["value"] << std::endl; // To avoid having the compiler optimize the loop away. 
};

使用clang 11.0.3(std=c++17,-O3)编译,在同一台Macbook上运行约1.4s,也就是说,解析速度为约700k条消息/秒,消息甚至比Python示例更小。我知道nlohmann/json可能非常慢,并且能够使用simdjson的DOMneneneba API获得大约2M消息/秒的解析速度。

对于我的用例来说,这仍然太慢了。我愿意接受所有关于提高Python、C++、Java(或任何JVM语言)或Go中潜在应用程序的消息解析速度的建议。

注:

  • 我不一定关心磁盘上消息的大小(如果您建议的存储方法是内存高效的,则认为这是一个优点)
  • 我所需要的只是一个基本数据类型的键值模型——我不需要嵌套的字典或列表
  • 转换现有数据根本不是问题。我只是在寻找一些阅读优化的东西
  • 我不一定需要将整个东西解析成一个结构或自定义对象,只需要在需要时访问其中的一些字段(我通常需要每条消息的一小部分字段)——如果这会带来惩罚,那也没关系,只要惩罚不会破坏整个应用程序的吞吐量
  • 我对自定义/稍微不安全的解决方案持开放态度
  • 我选择使用的任何格式都需要自然分隔,因为消息将串行写入一个文件(我目前每天使用一个文件,这对于我的用例来说已经足够了)。过去,我曾遇到过分隔消息不正确的问题(请参阅JavaProtobuf API中的writeDelimitedTo-丢失一个字节,整个文件就会被破坏)

我已经探索过的东西:

  • JSON:用rapidjson、simdjson、nlohmann/JSON等进行了实验。)
  • 带有分隔的msgpack的平面文件(请参阅此API:https://github.com/msgpack/msgpack-python#streaming-开箱):我目前用来存储消息的东西
  • 协议缓冲区:速度稍快,但并不真正适应数据的非结构化性质

谢谢!!

我假设消息只包含一些基本类型的命名属性(在运行时定义),这些基本类型例如是字符串、整数和浮点数。

为了实现快速,最好是:

  • 避免文本解析(速度较慢,因为它是连续的并且充满了条件句)
  • 避免检查消息是否格式错误(此处不需要,因为它们都应该格式正确)
  • 尽可能避免拨款
  • 处理消息块

因此,我们首先需要设计一个简单快速的二进制消息协议:

二进制消息包含其属性的数量(以1字节编码),后面跟着属性列表。每个属性都包含一个以大小为前缀的字符串(以1字节编码),后面是属性的类型(std::变体中类型的索引,以1个字节编码)以及属性值(以大小为后缀的字符串、64位整数或64位浮点数)。

每个编码的消息都是一个字节流,可以放入一个大的缓冲区(分配一次,并对多个传入消息重复使用)。

以下是一个代码,用于解码来自原始二进制缓冲区的消息:

#include <unordered_map>
#include <variant>
#include <climits>
// Define the possible types here
using AttrType = std::variant<std::string_view, int64_t, double>;
// Decode the `msgData` buffer and write the decoded message into `result`.
// Assume the message is not ill-formed!
// msgData must not be freed or modified while the resulting map is being used.
void decode(const char* msgData, std::unordered_map<std::string_view, AttrType>& result)
{
static_assert(CHAR_BIT == 8);
const size_t attrCount = msgData[0];
size_t cur = 1;
result.clear();
for(size_t i=0 ; i<attrCount ; ++i)
{
const size_t keyLen = msgData[cur];
std::string_view key(msgData+cur+1, keyLen);
cur += 1 + keyLen;
const size_t attrType = msgData[cur];
cur++;
// A switch could be better if there is more types
if(attrType == 0) // std::string_view
{
const size_t valueLen = msgData[cur];
std::string_view value(msgData+cur+1, valueLen);
cur += 1 + valueLen;
result[key] = std::move(AttrType(value));
}
else if(attrType == 1) // Native-endian 64-bit integer
{
int64_t value;
// Required to not break the strict aliasing rule
std::memcpy(&value, msgData+cur, sizeof(int64_t));
cur += sizeof(int64_t);
result[key] = std::move(AttrType(value));
}
else // IEEE-754 double
{
double value;
// Required to not break the strict aliasing rule
std::memcpy(&value, msgData+cur, sizeof(double));
cur += sizeof(double);
result[key] = std::move(AttrType(value));
}
}
}

您可能也需要编写编码函数(基于相同的想法)。

下面是一个用法示例(基于json相关代码):

const char* message = "x01x05valuex00x05hello";
void bench()
{
std::unordered_map<std::string_view, AttrType> decodedMsg;
decodedMsg.reserve(16);
decode(message, decodedMsg);
for(size_t i=0; i<1000*1000; ++i)
{
decode(message, decodedMsg);
}
visit([](const auto& v) { cout << "Result: " << v << endl; }, decodedMsg["value"]);
}

在我的机器上(使用Intel i7-9700KF处理器),基于您的基准测试,使用nlohmann json库的代码,我获得了2.7M条消息/秒,使用新代码获得了35.4M条消息/s。

请注意,此代码的速度可能要快得多。事实上,大部分时间都花在了高效的散列和分配上。您可以通过使用更快的哈希映射实现(例如boost::container::flat_map或ska::bytell_hash_map)和/或使用自定义分配器来缓解此问题。另一种选择是构建您自己精心调优的哈希图实现。另一种选择是使用键值对的向量,并使用线性搜索来执行查找(这应该很快,因为您的消息不应该有很多属性,而且您说过每个消息需要一小部分属性)。然而,消息越大,解码就越慢。因此,您可能需要利用并行性来更快地解码消息块。有了所有这些,这就有可能达到超过1亿条消息/秒。