创建项目
最简单的是使用 git submodule:
# 在项目根路径下
cd extern
git submodule add https://github.com/zeromq/libzmq
git submodule add https://github.com/zeromq/cppzmq
这两个子模块的功能分别为:
libzmq 提供了库 libzmq,这是 cppzmq 的依赖
cpzmq 提供了 libzmq 对 C++ 的绑定
然后在 CMakeLists 中包含它们:
include_directories("${CMAKE_CURRENT_SOURCE_DIR}/extern/cppzmq")
set(BUILD_TESTS
OFF
CACHE BOOL "是否构建测试用例")
set(BUILD_SHARED
OFF
CACHE BOOL "是否构建共享库")
set(CPPZMQ_BUILD_TESTS
OFF
CACHE BOOL "是否构建 cppzmq 的测试用例")
add_subdirectory("${CMAKE_CURRENT_SOURCE_DIR}/extern/libzmq")
add_subdirectory("${CMAKE_CURRENT_SOURCE_DIR}/extern/cppzmq")
然后,直接将 cppzmq 或者 cppzmq-static 链接即可:
target_link_libraries(main PRIVATE cppzmq-static)
zmq 是协议无关的,其通信方式是使用封装好的通信模型,而与底层协议无关。
zmq 提供了三种模式:
请求-回复模式
发布和订阅模式
Push - Pull 模式
底层协议可为 udp, tcp 等
zmq 的基本设计
zmq 的消息是一次性的
zmq 的消息中不包含
\0
字符zmq 的发送和接收消息时二进制序列是不变的
zmq 会尽快 抛出异常
不要跨线程使用套接字
context_t
context_t 对象是 zmq 套接字的容器,一个正确的 zmq 程序总是以 context_t 的创建开始,以 content_t 的销毁为终。context_t 具备以下特点:
context_t 是线程安全的,一个进程可以创建任意个 context_t 对象
content_t 是进程独占的。子进程无法共享父进程的 content_t 对象
如果一个 context_t 对象提前被销毁,则所有使用此 context_t 的套接字返回错误
由于 content_t 的进程独占性,zmq 的程序一般在主进程中进行流程管理,在子进程中管理数据 |
套接字
和 Unix 中的套接字不同,zmq 中的套接字是 独立于协议 的。在程序完成后,可以根据需求任意地更改程序的连接协议。当前 zmq 支持的协议为:
协议 | 作用 |
---|---|
inproc | 线程间通信 |
ipc | 进程间通信 |
tcp | tcp 协议 |
pgm | rfc 定义的可靠多播协议 |
epgm | 自行实现的可靠多协议 |
另外,套接字始终是空指针,消息总是结构体。在编写 zmq 代码时,请注意:所有的套接字都属于 libzmq,只有消息属于自己!
另外,zmq 套接字还是:
套接字和连接是一对多的关系
套接字连接到端点后自动接受连接
网络中断后自动恢复
客户端可以先于服务器启动
套接字是有类型的,不同的套接字行为有所不同
套接字携带的是消息,而不是字节流
套接字在后台线程工作。不会阻塞前台进程。zmq_send 实际上是对消息进行排队,而不是发送
当客户端先于服务器启动时,客户端可以写入消息,这些无法发往服务端的消息可能会导致客户端阻塞,抑或是丢失一些消息 |
zmq 仍然有一些限制,例如:
windows 端的 ipc 不可用
v4.0 之前的 inproc 服务器必须先于客户端绑定
zmq 在后台线程中进行 IO,一般来说,一个线程就够了。一个套接字就足以处理数十个甚至数千个连接。当 zmq 用于线程间通信的时候,可以将 IO 线程设置为零
和操作系统的套接字不同,zmq 的套接字根据不同的模式具有不同的功能。在使用套接字模式的时候需要注意。zmq 提供的核心模式为:
请求 - 回复模式:远程过程调用抑或是任务分配
发布 - 订阅模式:数据分布模式
管道模式:并行任务分配和收集模式
独占对模式:用于连接统一进程中两个线程的模式
消息
zmq 的消息是从零开始的任意大小的 blob,消息是带有长度信息的二进制数据。如果使用 zmq_recv 接收消息,而缓冲区尺寸又不够,则消息会被截断。另一种更好的方式是使用 message_t 对象接收消息。
当发送消息后,消息的长度被设置为零。也就是说消息只能被发送一次(更确切地说是 message_t)
当删除一个 zmq 的消息时,实际上只是删除了对消息的引用。zmq 会在适当的时候自己删除它
如果需要多次发送相同的消息,而且它相当大,那么可以创建第二个消息,初始化后使用 zmq_msg_copy 创建第一个条消息的副本。当然,这个副本实际上是对真实数据的引用
zmq 可以通过多次调用 send 来发送一个数据包。这样的方式被称为 多部分消息 ,多部分消息在接受端会被拼接成一个完整的消息
请求-回复模式
请求回复模式适用于 一问一答 的情况,软件在发出消息后,下一个动作 必为 接收消息,反之依然。否则程序将会进入异常状态:
服务端代码:
void server() {
zmq::context_t context(1);
zmq::socket_t socket(context, zmq::socket_type::rep);
socket.bind("tcp://*:5555");
while(true) {
zmq::message_t req;
socket.recv(req, zmq::recv_flags::none);
std::cout << req.str() << std::endl;
sleep(1);
zmq::message_t replay(req.size());
memcpy(replay.data(), req.data(), req.size());
socket.send(replay, zmq::send_flags::none);
}
}
客户端代码:
void client() {
zmq::context_t context(1);
zmq::socket_t socket(context, zmq::socket_type::req);
std::string msg = "hello, world";
socket.connect("tcp://localhost:5555");
while(true) {
zmq::message_t replay(msg.length());
memcpy(replay.data(), msg.data(), msg.length());
socket.send(replay, zmq::send_flags::none);
zmq::message_t req;
socket.recv(req, zmq::recv_flags::none);
std::cout << req.str() << std::endl;
}
}
如果重新启动服务器,客户端将无法正常恢复。请求-回复模式映射到 RPC 和经典的客户端/服务器模型上
发布-订阅模式
pub-sub 是一个单向数据流,信息从 pub 流向任意多 sub,且 sub 可能在任意时刻连入 pub。
void server() {
zmq::context_t context(1);
zmq::socket_t socket(context, zmq::socket_type::pub);
socket.bind("tcp://*:5555");
while(true) {
socket.send(zmq::str_buffer("A"), zmq::send_flags::sndmore);
socket.send(zmq::str_buffer("hello, world"));
sleep(1);
}
}
void client() {
zmq::context_t context(1);
zmq::socket_t socket(context, zmq::socket_type::sub);
socket.connect("tcp://127.0.0.1:5555");
socket.set(zmq::sockopt::subscribe, "A");
while(true) {
std::vector<zmq::message_t> recv_msgs;
auto res = zmq::recv_multipart(socket, std::back_inserter(recv_msgs));
if(!res) {
std::cout << "error" << std::endl;
continue;
}
assert(*res == 2);
std::cout << recv_msgs[1].to_string() << std::endl;
}
}
在此模式下,pub 尝试接收信息或者 sub 尝试发送信息都会导致异常。
另外,由于 tcp 连接的建立需要时间,因此订阅者总是会错过这段时间发布的消息
发布-订阅有以下几种特点:
sub 可以连接多个 pub,且这些 pub 公平排队
如果 pub 没有订阅者,则直接丢弃所有消息
如果 sub 很慢,就会导致消息在 pub 上排队
zmq v3.0x 后,tcp/ipc 协议中消息过滤发生 pub 端,epgm 协议中消息过滤发生在 sub 端。而 zmq v2.x 中,过滤都发生在 sub 端
IO 多路复用
zmq 也提供了 poll 函数来执行 IO 多路复用:
zmq::socket socket_(...);
int timerfd_;
std::vector<zmq::pollitem_t> poitems= {
{socket_, 0, ZMQ_POLLIN, 0 },
{nullptr, timerfd_, ZMQ_POLLIN, 0}
};
while(true) {
zmq::poll(poitems);
if(poitems[1].revents & ZMQ_POLLIN) {
// 进行一次 read 防止一直触发定时器事件
spdlog::trace("Timerout event happend");
TimerEvent();
uint64_t exp;
read(timerfd_, &exp, sizeof(exp));
}
if(poitems[0].revents & ZMQ_POLLIN) {
spdlog::trace("MessageHandle event happend");
MessageHandle();
}
}
动态发现
在一个动态网络中,一个经常碰到的问题是机器之间怎么相互了解。尤其是当网络中的机器经常变动的时候。这个时候最好的方式是创建一个代理。这个问题被称作 动态发现 问题
代理一方面接收来自发布者的消息,另一方面将消息发送给订阅者。这样,新的发布者加入时只需要修改代理,新的订阅者出现时只需要订阅代理
消息代理
zmq 提供了非常便捷的方式来构建消息代理:
#include "zhelpers.hpp"
int main (int argc, char *argv[]){
zmq::context_t context(1);
// Socket facing clients
zmq::socket_t frontend (context, ZMQ_ROUTER);
frontend.bind("tcp://*:5559");
// Socket facing services
zmq::socket_t backend (context, ZMQ_DEALER);
backend.bind("tcp://*:5560");
// Start the proxy
zmq::proxy(static_cast<void*>(frontend),
static_cast<void*>(backend),
nullptr);
return 0;
}
错误处理
zmq 的错误处理分为两个方向:
内部的错误直接调用断言
外部的错误返回错误码
内部的错误例如错误地使用了套接字模式。外部的错误则为网络等问题。一个理想的 zmq 代码应该对每个 zmq 调用进行错误处理
对于 c 语言,错误返回有以下方式:
创建对象的方法失败则返回 NULL
处理数据的方法失败返回 -1
其它方法成功时返回 0,失败时返回 -1
错误代码在 errno 或 zmq_errno 中提供
zmq_strerror 提供了错误的文本
多线程
zmq 中的多线程通信的原则是:通过通信来共享数据,而不是通过共享数据来通信。基于这一点,一个正确的 zmq 多线程程序应当总是通过 inporc 套接字通信
除了 content_t 不要在多线程中共享任何数据
不要使用传统的互斥锁、临界区、信号量等
在进程开始时差u能构建一个 content_t 对象,然后传递给所有通过 inproc 连接的线程
线程之间的所有交互都通过 zmq 消息的形式发生
不要再线程之间共享 zmq 套接字
pair 套接字
pair 套接字类似于一个信号量。用来协调一对进程的进度。一个进程调用 read 来阻塞自己,另一个进程调用 send 来通知对方:
#include <iostream>
#include <thread>
#include <zmq.hpp>
#include <zmq_addon.hpp>
int main(int argc, char *argv[]) {
zmq::context_t context;
std::thread t([&context]() {
zmq::socket_t socket(context, zmq::socket_type::pair);
socket.bind("inproc://step1");
std::cout << "hello, ";
zmq::message_t msg;
socket.send(msg, zmq::send_flags::none);
});
zmq::socket_t socket(context, zmq::socket_type::pair);
socket.connect("inproc://step1");
zmq::message_t msg;
socket.recv(msg, zmq::recv_flags::none);
std::cout << "world" << std::endl;
if(t.joinable()) t.join();
return 0;
}