创建项目

最简单的是使用 git submodule:

# 在项目根路径下
cd extern
git submodule add https://github.com/zeromq/libzmq
git submodule add https://github.com/zeromq/cppzmq

这两个子模块的功能分别为:

  • libzmq 提供了库 libzmq,这是 cppzmq 的依赖

  • cpzmq 提供了 libzmq 对 C++ 的绑定

然后在 CMakeLists 中包含它们:

include_directories("${CMAKE_CURRENT_SOURCE_DIR}/extern/cppzmq")

set(BUILD_TESTS
    OFF
    CACHE BOOL "是否构建测试用例")
 set(BUILD_SHARED
    OFF
    CACHE BOOL "是否构建共享库")
set(CPPZMQ_BUILD_TESTS
    OFF
    CACHE BOOL "是否构建 cppzmq 的测试用例")
add_subdirectory("${CMAKE_CURRENT_SOURCE_DIR}/extern/libzmq")
add_subdirectory("${CMAKE_CURRENT_SOURCE_DIR}/extern/cppzmq")

然后,直接将 cppzmq 或者 cppzmq-static 链接即可:

target_link_libraries(main PRIVATE cppzmq-static)

zmq 是协议无关的,其通信方式是使用封装好的通信模型,而与底层协议无关。

zmq 提供了三种模式:

  • 请求-回复模式

  • 发布和订阅模式

  • Push - Pull 模式

底层协议可为 udp, tcp 等

zmq 的基本设计

  • zmq 的消息是一次性的

  • zmq 的消息中不包含 \0 字符

  • zmq 的发送和接收消息时二进制序列是不变的

  • zmq 会尽快 抛出异常

  • 不要跨线程使用套接字

context_t

context_t 对象是 zmq 套接字的容器,一个正确的 zmq 程序总是以 context_t 的创建开始,以 content_t 的销毁为终。context_t 具备以下特点:

  • context_t 是线程安全的,一个进程可以创建任意个 context_t 对象

  • content_t 是进程独占的。子进程无法共享父进程的 content_t 对象

如果一个 context_t 对象提前被销毁,则所有使用此 context_t 的套接字返回错误

由于 content_t 的进程独占性,zmq 的程序一般在主进程中进行流程管理,在子进程中管理数据

套接字

和 Unix 中的套接字不同,zmq 中的套接字是 独立于协议 的。在程序完成后,可以根据需求任意地更改程序的连接协议。当前 zmq 支持的协议为:

协议作用

inproc

线程间通信

ipc

进程间通信

tcp

tcp 协议

pgm

rfc 定义的可靠多播协议

epgm

自行实现的可靠多协议

另外,套接字始终是空指针,消息总是结构体。在编写 zmq 代码时,请注意:所有的套接字都属于 libzmq,只有消息属于自己!

另外,zmq 套接字还是:

  • 套接字和连接是一对多的关系

  • 套接字连接到端点后自动接受连接

  • 网络中断后自动恢复

  • 客户端可以先于服务器启动

  • 套接字是有类型的,不同的套接字行为有所不同

  • 套接字携带的是消息,而不是字节流

  • 套接字在后台线程工作。不会阻塞前台进程。zmq_send 实际上是对消息进行排队,而不是发送

当客户端先于服务器启动时,客户端可以写入消息,这些无法发往服务端的消息可能会导致客户端阻塞,抑或是丢失一些消息

zmq 仍然有一些限制,例如:

  • windows 端的 ipc 不可用

  • v4.0 之前的 inproc 服务器必须先于客户端绑定

zmq 在后台线程中进行 IO,一般来说,一个线程就够了。一个套接字就足以处理数十个甚至数千个连接。当 zmq 用于线程间通信的时候,可以将 IO 线程设置为零

和操作系统的套接字不同,zmq 的套接字根据不同的模式具有不同的功能。在使用套接字模式的时候需要注意。zmq 提供的核心模式为:

  • 请求 - 回复模式:远程过程调用抑或是任务分配

  • 发布 - 订阅模式:数据分布模式

  • 管道模式:并行任务分配和收集模式

  • 独占对模式:用于连接统一进程中两个线程的模式

消息

zmq 的消息是从零开始的任意大小的 blob,消息是带有长度信息的二进制数据。如果使用 zmq_recv 接收消息,而缓冲区尺寸又不够,则消息会被截断。另一种更好的方式是使用 message_t 对象接收消息。

  • 当发送消息后,消息的长度被设置为零。也就是说消息只能被发送一次(更确切地说是 message_t)

  • 当删除一个 zmq 的消息时,实际上只是删除了对消息的引用。zmq 会在适当的时候自己删除它

如果需要多次发送相同的消息,而且它相当大,那么可以创建第二个消息,初始化后使用 zmq_msg_copy 创建第一个条消息的副本。当然,这个副本实际上是对真实数据的引用

zmq 可以通过多次调用 send 来发送一个数据包。这样的方式被称为 多部分消息 ,多部分消息在接受端会被拼接成一个完整的消息

请求-回复模式

请求回复模式适用于 一问一答 的情况,软件在发出消息后,下一个动作 必为 接收消息,反之依然。否则程序将会进入异常状态:

服务端代码:

void server() {
   zmq::context_t context(1);
   zmq::socket_t  socket(context, zmq::socket_type::rep);

   socket.bind("tcp://*:5555");

   while(true) {
      zmq::message_t req;

      socket.recv(req, zmq::recv_flags::none);
      std::cout << req.str() << std::endl;

      sleep(1);
      zmq::message_t replay(req.size());
      memcpy(replay.data(), req.data(), req.size());
      socket.send(replay, zmq::send_flags::none);
   }
}

客户端代码:

void client() {
   zmq::context_t context(1);
   zmq::socket_t  socket(context, zmq::socket_type::req);
   std::string    msg = "hello, world";

   socket.connect("tcp://localhost:5555");
   while(true) {
      zmq::message_t replay(msg.length());
      memcpy(replay.data(), msg.data(), msg.length());
      socket.send(replay, zmq::send_flags::none);

      zmq::message_t req;
      socket.recv(req, zmq::recv_flags::none);
      std::cout << req.str() << std::endl;
   }
}

如果重新启动服务器,客户端将无法正常恢复。请求-回复模式映射到 RPC 和经典的客户端/服务器模型上

发布-订阅模式

pub-sub 是一个单向数据流,信息从 pub 流向任意多 sub,且 sub 可能在任意时刻连入 pub。

void server() {
    zmq::context_t context(1);
    zmq::socket_t  socket(context, zmq::socket_type::pub);

    socket.bind("tcp://*:5555");

    while(true) {
        socket.send(zmq::str_buffer("A"), zmq::send_flags::sndmore);
        socket.send(zmq::str_buffer("hello, world"));
        sleep(1);
    }
}

void client() {
    zmq::context_t context(1);
    zmq::socket_t  socket(context, zmq::socket_type::sub);

    socket.connect("tcp://127.0.0.1:5555");

    socket.set(zmq::sockopt::subscribe, "A");

    while(true) {
        std::vector<zmq::message_t> recv_msgs;

        auto res = zmq::recv_multipart(socket, std::back_inserter(recv_msgs));

        if(!res) {
            std::cout << "error" << std::endl;
            continue;
        }
        assert(*res == 2);

        std::cout << recv_msgs[1].to_string() << std::endl;
    }
}

在此模式下,pub 尝试接收信息或者 sub 尝试发送信息都会导致异常。

另外,由于 tcp 连接的建立需要时间,因此订阅者总是会错过这段时间发布的消息

发布-订阅有以下几种特点:

  • sub 可以连接多个 pub,且这些 pub 公平排队

  • 如果 pub 没有订阅者,则直接丢弃所有消息

  • 如果 sub 很慢,就会导致消息在 pub 上排队

  • zmq v3.0x 后,tcp/ipc 协议中消息过滤发生 pub 端,epgm 协议中消息过滤发生在 sub 端。而 zmq v2.x 中,过滤都发生在 sub 端

IO 多路复用

zmq 也提供了 poll 函数来执行 IO 多路复用:

zmq::socket socket_(...);
int timerfd_;

std::vector<zmq::pollitem_t> poitems= {
    {socket_, 0, ZMQ_POLLIN, 0 },
    {nullptr, timerfd_, ZMQ_POLLIN, 0}
};

while(true) {
    zmq::poll(poitems);

    if(poitems[1].revents & ZMQ_POLLIN) {
        // 进行一次 read 防止一直触发定时器事件
        spdlog::trace("Timerout event happend");
        TimerEvent();
        uint64_t exp;
        read(timerfd_, &exp, sizeof(exp));
    }
    if(poitems[0].revents & ZMQ_POLLIN) {
        spdlog::trace("MessageHandle event happend");
        MessageHandle();
    }
}

动态发现

在一个动态网络中,一个经常碰到的问题是机器之间怎么相互了解。尤其是当网络中的机器经常变动的时候。这个时候最好的方式是创建一个代理。这个问题被称作 动态发现 问题

代理一方面接收来自发布者的消息,另一方面将消息发送给订阅者。这样,新的发布者加入时只需要修改代理,新的订阅者出现时只需要订阅代理

Diagram

消息代理

zmq 提供了非常便捷的方式来构建消息代理:

#include "zhelpers.hpp"

int main (int argc, char *argv[]){
   zmq::context_t context(1);

   //  Socket facing clients
   zmq::socket_t frontend (context, ZMQ_ROUTER);
   frontend.bind("tcp://*:5559");

   //  Socket facing services
   zmq::socket_t backend (context, ZMQ_DEALER);
   backend.bind("tcp://*:5560");

   //  Start the proxy
   zmq::proxy(static_cast<void*>(frontend),
              static_cast<void*>(backend),
              nullptr);
   return 0;
}

错误处理

zmq 的错误处理分为两个方向:

  • 内部的错误直接调用断言

  • 外部的错误返回错误码

内部的错误例如错误地使用了套接字模式。外部的错误则为网络等问题。一个理想的 zmq 代码应该对每个 zmq 调用进行错误处理

对于 c 语言,错误返回有以下方式:

  • 创建对象的方法失败则返回 NULL

  • 处理数据的方法失败返回 -1

  • 其它方法成功时返回 0,失败时返回 -1

  • 错误代码在 errno 或 zmq_errno 中提供

  • zmq_strerror 提供了错误的文本

多线程

zmq 中的多线程通信的原则是:通过通信来共享数据,而不是通过共享数据来通信。基于这一点,一个正确的 zmq 多线程程序应当总是通过 inporc 套接字通信

  • 除了 content_t 不要在多线程中共享任何数据

  • 不要使用传统的互斥锁、临界区、信号量等

  • 在进程开始时差u能构建一个 content_t 对象,然后传递给所有通过 inproc 连接的线程

  • 线程之间的所有交互都通过 zmq 消息的形式发生

  • 不要再线程之间共享 zmq 套接字

pair 套接字

pair 套接字类似于一个信号量。用来协调一对进程的进度。一个进程调用 read 来阻塞自己,另一个进程调用 send 来通知对方:

#include <iostream>
#include <thread>

#include <zmq.hpp>
#include <zmq_addon.hpp>

int main(int argc, char *argv[]) {
    zmq::context_t context;

    std::thread t([&context]() {
        zmq::socket_t socket(context, zmq::socket_type::pair);
        socket.bind("inproc://step1");

        std::cout << "hello, ";

        zmq::message_t msg;
        socket.send(msg, zmq::send_flags::none);
    });

    zmq::socket_t socket(context, zmq::socket_type::pair);
    socket.connect("inproc://step1");

    zmq::message_t msg;
    socket.recv(msg, zmq::recv_flags::none);

    std::cout << "world" << std::endl;

    if(t.joinable()) t.join();
    return 0;
}
Last moify: 2022-12-04 15:11:33
Build time:2025-07-18 09:41:42
Powered By asphinx