DDS(Data Distribution Service) 是一个多对多的订阅发布模式的数据通信模型。

DDS 是一个 DCPS(Data-Centric Public Subscribe) 模型。在实现中定义了三种重要实体:

实体用途

发布实体

定义了消息生成对象及其属性。

订阅实体

定义了消息消费对象及其属性。

配置实体

定义了作为 Topic 传输的信息类型。

此外,DDS 使用 QoS 来描述发布实体和订阅实体的行为特征。更简单来讲,QoS 配置了当消息丢失时订阅实体和发布实体的行为。例如使用尽力而为模型亦或是可靠传输。

DCPS 概念模型

DCPS 定义了四种元素:

元素作用

Publisher

用于发布消息。

Subscriber

用于接受消息。

Topic

用于定义消息类型。

Domain

用于限制消息传播的范围。

这些消息关系如下图:

dds domain
Figure 1. DDS Domain

如上图所示,可以看到:

  • 一个 Publisher 可以拥有多个 DataWriter。

  • 一个 DataWriter 只能写一种 Topic。

  • 多个 Publisher 可以发送同一个 Topic 的消息。

  • 同一个 Domain 中的 Publisher 和 Subscriber 才可以相互通信。

  • 同一个 Topic 的 Publisher 和 Subscriber 必须使用同一种 QoS 策略。

RTPS

RTPS(Real-Time Publish Subscribe) 是基于 UDP 实现的 尽力而为 的订阅发布通信中间件。

它旨在同时支持单播和多播通信。

RTPS 可以简单理解为对 DDS 的一种实现,因此概念也基本相同:

rtps domain
Figure 2. RTPS Domain

Fast DDS 架构

Fast DDS 分为五层架构:

  • 应用层:应用层使用 Fast DDS API 实现分布式系统的通信。

  • Fast DDS 层:鲁棒性的 DDS 通信中间件。允许一个或多个 DDS domain 中的 DomainParticipant 通过发布或订阅同一个 topic 进行通信。

  • RTPS 层:实现了 RTPS 协议以便 DDS 应用互操作。此层是 transport 层的抽象层。

  • transport 层:Fast DDS 可以运行在多种传输层协议上。例如 UDP、TCP 或者共享内存(SHM)。

library overview
Figure 3. Fast DDS layer model architecture

DDS 层

Fast DDS 的 DDS 层定义了几个关键元素。用户将会在他们的应用中创建这些应用,以便创建一个数据交流系统。Fast DDS 遵循 DDS 规范,将这些涉及通信的元素称为 实体 。一个 DDS 实体 是任何支持 QoS 配置且实现了 listener 的对象。

  • QoS。

下面列出了 DDS 实体和它们的描述和作用:

  • Domain。 一个用于标示 DDS domain 的正数。每个 DomainParticipant 都会被分配一个 DDS domain,因此相同 domain 的 DomainParticipant 可以通信,反之则不然。这个值必须由应用开发者在创建 DomainParticipant 的时候给予。

  • DomainParticipant。包含了诸如 publisher、subscriber、topic 或 multitopics 等 DDS 实体的对象。其允许创建曾经包含的对象并配置它们的行为。

  • Publisher。Publisher 允许使用 DataWriter 发布特定 topic 的数据。DataWriter 将数据写道 transport。其允许创建 DataWriter 并配置其包含的 DataWriter。且可以包含多个 DataWriter。

  • DataWriter。用户在创建 DataWriter 时必须提供一个 topic。此后使用 DataWriter 发布的数据自动变为相关的 topic。

  • DataWriterHistory。代表了对数据对象的一系列更改。当 DataWriter 在 topic 下发布数据时,实际上创建了对此数据的一个 change。这个 change 将会被注册到 DataWriterHistory,然后被发送到相关 topic 的 DataReader。

  • Subscriber。Subscriber 使用 DataReader 订阅了一个 topic,DataReader 从 transport 中读取数据。其可以创建 DataReader 并配置其包含的 DataReader,且可以包含多个 DataReader。‘

  • DataReader。DataReader 创建时必须提供一个 topic。当它的 HistoryDataReader 发生变化时,DataReader 会收到消息。

  • DataReaderHistory。它包含了 DataReader 读取的数据对象的更改。

  • Topic。用于绑定 DataWriter 和 DataReader 的实体。

RTPS 层

正如上述所提到的,Fast DDS 中的 RTPS 协议允许从 transport 层抽象 DDS 应用实体。根据上面的图,RTPS 层有四个主要实体:

  • RTPSDomain。时 DDS domain 在 RTPS 协议的拓展。

  • RTPSParticipant。包含了其它 RTPS 实体的实体。允许配置和创建其包含的实体。

  • RTPSWriter。message 的来源。其读取 DataWriterHistory 中写入的更改,并将其传输到所有它们之前匹配的 RTPSReader。

  • RTPSReader。message 的接收实体。它将 RTPSWriter 的更改写入 DataReaderHistory。

transport 层

Fast DDS 支持通过多种传输层协议实现应用程序。包含 UDPv4, UDPv6, TCPv4, TCPv6 和共享内存(shm)。默认情况下,DomainParticipant 实现了 UDPv4 和 SHM 传输协议。

enable 状态

enable() 操作将一个 Entity 从不可操纵状态转变为可操纵状态。实体对象可以在创建的时候指定 enable 状态。默认情况下,实体创建时总是 enable 的,也就是说,默认情况下一旦一个实体被创建,那么它就能被使用。

但是某些情况下可能需要改变这种行为。例如,默认情况下,一旦创建了一个 DataReader,它就开始接受相关 Topic 下的 DDS samples。但是这时你的应用可能还在初始化过程中,因此没有能力去处理致谢 samples。此时可以先将 DataReader 创建为 disabled 状态,在之后再转换其状态。

要创建一个处于 disabled 状态下的实体,需要修改 Subscriber 的 QoS 状态。

DDS_SubscriberQos subscriber_qos;
subscriber->get_qos(subscriber_qos);
subscriber_qos.entity_factory.autoenable_created_entities = DDS_BOOLEAN_FALSE;
subscriber->set_qos(subscriber_qos);
DDSDataReader* datareader =
    subscriber->create_datareader(topic, DDS_DATAREADER_QOS_DEFAULT, listener);

之后再调整其到 enabled 状态:

datareader->enable();

enable 具备以下规则:

现假设 Factory 代指指向 DomainParticipant, Publisher 或 Subscriber 的引用,child 意指它创建的实体。

  • 若 factory disabled,则其创建的 children 默认也是 diabled 的。

  • 若 factory enabled,则其创建的 children 根据 factory 的 EntityFactory QoS 既可以是 enabled 也可以是 disabled。

  • 在 factory disabled 的情况下调用 child 的 enable 将会导致失败,返回值为 DDS_RECODE_RECONDITION_NOT_MET。

  • 若 factory 的 EntityFactoryQoS 的值携带了 DDS_BOOLEAN_TRUE,则调用 factory 的 enable 同时也会将 children enable。若值为 DDS_BOOLEAN_FALSE 则只 enable factory 本身。

  • 在一个已经 enabled 的实体上调用 enable() 没有任何效果。返回值为 DDS_RETCODE_OK。

  • 没有 disable() 函数。实体的 enable 过程是不可逆的。

  • 除非实体是 enabled,否则实体的 Listener 不会被调用。

  • 除非实体被 enable,否则其存在不会被传播到其它 DomainParticipants。

  • 如果 DataWriter 或 DataReader 创建时就被 enable,则其关联的 topic 也必须已经 enabled。反之则 topic 的状态无关紧要。

  • 如果调用 DataWriter 或 DataReader 的 enable,则对其对应的 Publisher 或 Subscriber 和 Topic 也必须已经 enable。否则操作失败并返回 DDS_RETCODE_PRECONDITION_NOT_MET。

实体

实体(Entity)是对所有 DDS 实体的抽象。实体对象意味着同时支持 QoS 政策、Listener 和 Statuses 的对象。

实体类型

  • DomainParticipant:此实体是 Service 的实体端(Entry-Point),同时作为 Publisher, Subscriber 和 Topic 的工厂。

  • Publisher:是 DataWriter 的工厂和容器。

  • Subscriber:是 DataReader 的工厂和容器。

  • Topic:用于充当 Publisher 和 Subscriber 的通道。

  • DataWriter:用于分发数据的对象。

  • DataReader:用于接受数据的对象。

下面的图可以用来表示所有 DDS 实体的继承树:

entity diagram
Figure 4. Entity Diagram

共有实体属性

一些属性是所有实体都共享的:

  • 实体 ID。每个实体都有一个独一的 ID。这在 DDS 实体和它对应的 RTPS 实体(若存在)中是相同的。此 ID 被储存在声明在实体基类的 实例句柄对象 中。实例句柄对象可以通过 get_instance_handle() 访问。

  • QoS 政策。QoS 是一系列 policies 的容器。通过修改 policies 可以配置 DDS 实体的行为。QoS 可以在对象创建时设置,也可以在之后通过 set_qos() 接口进行更改。

  • Listener。

    Listener 是一个异步通知系统。当 DDS 实体的状态发生改变时,Listener 会调用相关的回调函数。

    所有的实体类型都有一个抽象的 Listener 接口,其包含了相关的回调函数。用户可以自己实现 Listener 并在自己的程序中实现他们的回调函数,然后调用 set_listener() 函数设置 Listener。Listener 及其接口如下:

    listeners inheritance diagram
    Figure 5. Listener Inheritance

    Note: Listener 是无状态的,因而可以在多个实体对象中共享。

  • Status。每个实体都有一系列状态对象来代表它们的通信状态。当状态改变时,会通知相关的 Listener。

  • StatusCondition。每个实体都拥有一个 StatusCondition,当实体的 enabled 状态发生更改时,将会通知它。StatusCondition 提供了实体和 Wait-set 的链接。

  • Enabling 实体。所有的实体都能更改 enabled 状态。

Condition 和 Wait-set

Conditions(同 Wait-sets 一起)为应用提供了类似 IO 多路复用的状态通知机制。

此机制是基于等待的。使用方式一般如下:

  1. 应用指出需要等待哪些状态。这通过创建 Condition 对象并通过 attach_condition() 将其添加到 Wait-set 实现。状态对象一般有 GuardCondition, StatusCondition, ReadCondition。

  2. 然后调用 Wait-set 的 wait() 函数等待有 Condition 对象变为 true。

  3. 如果 wait() 返回,则可以通过其它步骤获取详细信息。

下面是一个示例:

class ApplicationJob
{
    WaitSet wait_set_;
    GuardCondition terminate_condition_;
    std::thread thread_;

    void main_loop()
    {
        // Main loop is repeated until the terminate condition is triggered
        while (false == terminate_condition_.get_trigger_value())
        {
            // Wait for any of the conditions to be triggered
            ReturnCode_t ret_code;
            ConditionSeq triggered_conditions;
            ret_code = wait_set_.wait(triggered_conditions, eprosima::fastrtps::c_TimeInfinite);
            if (ReturnCode_t::RETCODE_OK != ret_code)
            {
                // ... handle error
                continue;
            }

            // Process triggered conditions
            for (Condition* cond : triggered_conditions)
            {
                StatusCondition* status_cond = dynamic_cast<StatusCondition*>(cond);
                if (nullptr != status_cond)
                {
                    Entity* entity = status_cond->get_entity();
                    StatusMask changed_statuses = entity->get_status_changes();

                    // Process status. Liveliness changed and data available are depicted as an example
                    if (changed_statuses.is_active(StatusMask::liveliness_changed()))
                    {
                        std::cout << "Liveliness changed reported for entity " << entity->get_instance_handle() <<
                            std::endl;
                    }

                    if (changed_statuses.is_active(StatusMask::data_available()))
                    {
                        std::cout << "Data avilable on reader " << entity->get_instance_handle() << std::endl;

                        FooSeq data_seq;
                        SampleInfoSeq info_seq;
                        DataReader* reader = static_cast<DataReader*>(entity);

                        // Process all the samples until no one is returned
                        while (ReturnCode_t::RETCODE_OK == reader->take(data_seq, info_seq,
                                LENGTH_UNLIMITED, ANY_SAMPLE_STATE,
                                ANY_VIEW_STATE, ANY_INSTANCE_STATE))
                        {
                            // Both info_seq.length() and data_seq.length() will have the number of samples returned
                            for (FooSeq::size_type n = 0; n < info_seq.length(); ++n)
                            {
                                // Only samples for which valid_data is true should be accessed
                                if (info_seq[n].valid_data)
                                {
                                    // Process sample on data_seq[n]
                                }
                            }

                            // must return the loaned sequences when done processing
                            reader->return_loan(data_seq, info_seq);
                        }
                    }
                }
            }
        }
    }

public:

    ApplicationJob(
            const std::vector<DataReader*>& readers,
            const std::vector<DataWriter*>& writers)
    {
        // Add a GuardCondition, so we can signal the processing thread to stop
        wait_set_.attach_condition(terminate_condition_);

        // Add the status condition of every reader and writer
        for (DataReader* reader : readers)
        {
            wait_set_.attach_condition(reader->get_statuscondition());
        }
        for (DataWriter* writer : writers)
        {
            wait_set_.attach_condition(writer->get_statuscondition());
        }

        thread_ = std::thread(&ApplicationJob::main_loop, this);
    }

    ~ApplicationJob()
    {
        // Signal the GuardCondition to force the WaitSet to wake up
        terminate_condition_.set_trigger_value(true);
        // Wait for the thread to finish
        thread_.join();
    }

};

// Application initialization
ReturnCode_t ret_code;
std::vector<DataReader*> application_readers;
std::vector<DataWriter*> application_writers;

// Create the participant, topics, readers, and writers.
ret_code = create_dds_application(application_readers, application_writers);
if (ReturnCode_t::RETCODE_OK != ret_code)
{
    // ... handle error
    return;
}

{
    ApplicationJob main_loop_thread(application_readers, application_writers);

    // ... wait for application termination signaling (signal handler, user input, etc)

    // ... Destructor of ApplicationJob takes care of stopping the processing thread
}

// Destroy readers, writers, topics, and participant
destroy_dds_application();
Last moify: 2025-01-27 01:58:23
Build time:2025-07-18 09:41:42
Powered By asphinx