在订阅发布模式中,订阅者 应当 设置订阅类型。订阅类型:

  • 如果不设置订阅类型,则无法收到任何消息

  • 订阅类型是一个简单字符串,以此字符串开头的消息会发送给订阅者。

  • 订阅者可用多次设置订阅类型。符合任意一个订阅类型的消息都会被发送到订阅者

  • 当订阅类型为空字符串时,订阅者会收到所有消息

当发送的消息类型为多部份消息时,第一部分消息用做匹配:

#include <common/precompile.h>

int main(int argc, char** argv) {
    zmq::context_t ctx;
    zmq::socket_t  pub { ctx, zmq::socket_type::pub };
    zmq::socket_t  sub { ctx, zmq::socket_type::sub };

    pub.bind("tcp://*:9320");
    sub.connect("tcp://localhost:9320");
    sub.set(zmq::sockopt::subscribe, "A");

    std::thread([&]() {
        while(true) {
            std::vector<zmq::message_t> msgvec;
            zmq::recv_multipart(sub, std::back_inserter(msgvec));
            std::cout.clear();
            std::cout << msgvec[1].to_string() << std::endl;
        }
    }).detach();

    std::this_thread::sleep_for(std::chrono::seconds(1));

    for(int i = 0; i < 10; ++i) {
        pub.send(zmq::str_buffer("A boy"), zmq::send_flags::sndmore);
        pub.send(zmq::str_buffer("B is not C"), zmq::send_flags::none);
        pub.send(zmq::str_buffer("B like"), zmq::send_flags::sndmore);
        pub.send(zmq::str_buffer("A boy like apple"), zmq::send_flags::none);
    }

    std::this_thread::sleep_for(std::chrono::seconds(5));

    return 0;
}

输出结果为:

B is not C
B is not C
B is not C
B is not C
B is not C
B is not C
B is not C
B is not C
B is not C
B is not C
  • 如果发布者在订阅者连接之前就发布消息了,则前面的消息会被丢掉

  • 订阅者以轮询的方式向发布者发送消息

  • 如果订阅者太慢,需要设置阈值 HWM 保护发布者

  • 消息的过滤发生在在发布者处

使用 sub 进行 bind, pub 进行 connect 也是可行的

在 pub bind 的情况下,是 pub 端一对多,这是广播模式,扇出模型 在 sub bind 的情况下,是 sub 端一对多,这时信箱模式,扇入模型

和管道模式的区别

pub-sub, push-pull 模式的区别为:

  • pub 将消息发送到所有 sub 上。是广播模型

  • push-pull 将消息发送到任一 pull 上。是任播模型

除此之外,别无二致

Last moify: 2023-12-19 01:44:20
Build time:2025-07-18 09:41:42
Powered By asphinx