DDS(Data Distribution Service) 是一个多对多的订阅发布模式的数据通信模型。
DDS 是一个 DCPS(Data-Centric Public Subscribe) 模型。在实现中定义了三种重要实体:
实体 | 用途 |
---|---|
发布实体 | 定义了消息生成对象及其属性。 |
订阅实体 | 定义了消息消费对象及其属性。 |
配置实体 | 定义了作为 Topic 传输的信息类型。 |
此外,DDS 使用 QoS 来描述发布实体和订阅实体的行为特征。更简单来讲,QoS 配置了当消息丢失时订阅实体和发布实体的行为。例如使用尽力而为模型亦或是可靠传输。
DCPS 概念模型
DCPS 定义了四种元素:
元素 | 作用 |
---|---|
Publisher | 用于发布消息。 |
Subscriber | 用于接受消息。 |
Topic | 用于定义消息类型。 |
Domain | 用于限制消息传播的范围。 |
这些消息关系如下图:
如上图所示,可以看到:
一个 Publisher 可以拥有多个 DataWriter。
一个 DataWriter 只能写一种 Topic。
多个 Publisher 可以发送同一个 Topic 的消息。
同一个 Domain 中的 Publisher 和 Subscriber 才可以相互通信。
同一个 Topic 的 Publisher 和 Subscriber 必须使用同一种 QoS 策略。
RTPS
RTPS(Real-Time Publish Subscribe) 是基于 UDP 实现的 尽力而为 的订阅发布通信中间件。
它旨在同时支持单播和多播通信。
RTPS 可以简单理解为对 DDS 的一种实现,因此概念也基本相同:
Fast DDS 架构
Fast DDS 分为五层架构:
应用层:应用层使用 Fast DDS API 实现分布式系统的通信。
Fast DDS 层:鲁棒性的 DDS 通信中间件。允许一个或多个 DDS domain 中的 DomainParticipant 通过发布或订阅同一个 topic 进行通信。
RTPS 层:实现了 RTPS 协议以便 DDS 应用互操作。此层是 transport 层的抽象层。
transport 层:Fast DDS 可以运行在多种传输层协议上。例如 UDP、TCP 或者共享内存(SHM)。
DDS 层
Fast DDS 的 DDS 层定义了几个关键元素。用户将会在他们的应用中创建这些应用,以便创建一个数据交流系统。Fast DDS 遵循 DDS 规范,将这些涉及通信的元素称为 实体 。一个 DDS 实体 是任何支持 QoS 配置且实现了 listener 的对象。
QoS。
下面列出了 DDS 实体和它们的描述和作用:
Domain。 一个用于标示 DDS domain 的正数。每个 DomainParticipant 都会被分配一个 DDS domain,因此相同 domain 的 DomainParticipant 可以通信,反之则不然。这个值必须由应用开发者在创建 DomainParticipant 的时候给予。
DomainParticipant。包含了诸如 publisher、subscriber、topic 或 multitopics 等 DDS 实体的对象。其允许创建曾经包含的对象并配置它们的行为。
Publisher。Publisher 允许使用 DataWriter 发布特定 topic 的数据。DataWriter 将数据写道 transport。其允许创建 DataWriter 并配置其包含的 DataWriter。且可以包含多个 DataWriter。
DataWriter。用户在创建 DataWriter 时必须提供一个 topic。此后使用 DataWriter 发布的数据自动变为相关的 topic。
DataWriterHistory。代表了对数据对象的一系列更改。当 DataWriter 在 topic 下发布数据时,实际上创建了对此数据的一个 change。这个 change 将会被注册到 DataWriterHistory,然后被发送到相关 topic 的 DataReader。
Subscriber。Subscriber 使用 DataReader 订阅了一个 topic,DataReader 从 transport 中读取数据。其可以创建 DataReader 并配置其包含的 DataReader,且可以包含多个 DataReader。‘
DataReader。DataReader 创建时必须提供一个 topic。当它的 HistoryDataReader 发生变化时,DataReader 会收到消息。
DataReaderHistory。它包含了 DataReader 读取的数据对象的更改。
Topic。用于绑定 DataWriter 和 DataReader 的实体。
RTPS 层
正如上述所提到的,Fast DDS 中的 RTPS 协议允许从 transport 层抽象 DDS 应用实体。根据上面的图,RTPS 层有四个主要实体:
RTPSDomain。时 DDS domain 在 RTPS 协议的拓展。
RTPSParticipant。包含了其它 RTPS 实体的实体。允许配置和创建其包含的实体。
RTPSWriter。message 的来源。其读取 DataWriterHistory 中写入的更改,并将其传输到所有它们之前匹配的 RTPSReader。
RTPSReader。message 的接收实体。它将 RTPSWriter 的更改写入 DataReaderHistory。
transport 层
Fast DDS 支持通过多种传输层协议实现应用程序。包含 UDPv4, UDPv6, TCPv4, TCPv6 和共享内存(shm)。默认情况下,DomainParticipant 实现了 UDPv4 和 SHM 传输协议。
enable 状态
enable()
操作将一个 Entity 从不可操纵状态转变为可操纵状态。实体对象可以在创建的时候指定 enable 状态。默认情况下,实体创建时总是 enable 的,也就是说,默认情况下一旦一个实体被创建,那么它就能被使用。
但是某些情况下可能需要改变这种行为。例如,默认情况下,一旦创建了一个 DataReader,它就开始接受相关 Topic 下的 DDS samples。但是这时你的应用可能还在初始化过程中,因此没有能力去处理致谢 samples。此时可以先将 DataReader 创建为 disabled 状态,在之后再转换其状态。
要创建一个处于 disabled 状态下的实体,需要修改 Subscriber 的 QoS 状态。
DDS_SubscriberQos subscriber_qos;
subscriber->get_qos(subscriber_qos);
subscriber_qos.entity_factory.autoenable_created_entities = DDS_BOOLEAN_FALSE;
subscriber->set_qos(subscriber_qos);
DDSDataReader* datareader =
subscriber->create_datareader(topic, DDS_DATAREADER_QOS_DEFAULT, listener);
之后再调整其到 enabled 状态:
datareader->enable();
enable 具备以下规则:
现假设 Factory 代指指向 DomainParticipant, Publisher 或 Subscriber 的引用,child 意指它创建的实体。
若 factory disabled,则其创建的 children 默认也是 diabled 的。
若 factory enabled,则其创建的 children 根据 factory 的 EntityFactory QoS 既可以是 enabled 也可以是 disabled。
在 factory disabled 的情况下调用 child 的 enable 将会导致失败,返回值为 DDS_RECODE_RECONDITION_NOT_MET。
若 factory 的 EntityFactoryQoS 的值携带了 DDS_BOOLEAN_TRUE,则调用 factory 的 enable 同时也会将 children enable。若值为 DDS_BOOLEAN_FALSE 则只 enable factory 本身。
在一个已经 enabled 的实体上调用 enable() 没有任何效果。返回值为 DDS_RETCODE_OK。
没有 disable() 函数。实体的 enable 过程是不可逆的。
除非实体是 enabled,否则实体的 Listener 不会被调用。
除非实体被 enable,否则其存在不会被传播到其它 DomainParticipants。
如果 DataWriter 或 DataReader 创建时就被 enable,则其关联的 topic 也必须已经 enabled。反之则 topic 的状态无关紧要。
如果调用 DataWriter 或 DataReader 的 enable,则对其对应的 Publisher 或 Subscriber 和 Topic 也必须已经 enable。否则操作失败并返回 DDS_RETCODE_PRECONDITION_NOT_MET。
实体
实体(Entity)是对所有 DDS 实体的抽象。实体对象意味着同时支持 QoS 政策、Listener 和 Statuses 的对象。
实体类型
DomainParticipant:此实体是 Service 的实体端(Entry-Point),同时作为 Publisher, Subscriber 和 Topic 的工厂。
Publisher:是 DataWriter 的工厂和容器。
Subscriber:是 DataReader 的工厂和容器。
Topic:用于充当 Publisher 和 Subscriber 的通道。
DataWriter:用于分发数据的对象。
DataReader:用于接受数据的对象。
下面的图可以用来表示所有 DDS 实体的继承树:
共有实体属性
一些属性是所有实体都共享的:
实体 ID。每个实体都有一个独一的 ID。这在 DDS 实体和它对应的 RTPS 实体(若存在)中是相同的。此 ID 被储存在声明在实体基类的 实例句柄对象 中。实例句柄对象可以通过
get_instance_handle()
访问。QoS 政策。QoS 是一系列 policies 的容器。通过修改 policies 可以配置 DDS 实体的行为。QoS 可以在对象创建时设置,也可以在之后通过
set_qos()
接口进行更改。Listener。
Listener 是一个异步通知系统。当 DDS 实体的状态发生改变时,Listener 会调用相关的回调函数。
所有的实体类型都有一个抽象的 Listener 接口,其包含了相关的回调函数。用户可以自己实现 Listener 并在自己的程序中实现他们的回调函数,然后调用
set_listener()
函数设置 Listener。Listener 及其接口如下:Figure 5. Listener InheritanceNote: Listener 是无状态的,因而可以在多个实体对象中共享。
Status。每个实体都有一系列状态对象来代表它们的通信状态。当状态改变时,会通知相关的 Listener。
StatusCondition。每个实体都拥有一个 StatusCondition,当实体的 enabled 状态发生更改时,将会通知它。StatusCondition 提供了实体和 Wait-set 的链接。
Enabling 实体。所有的实体都能更改 enabled 状态。
Condition 和 Wait-set
Conditions(同 Wait-sets 一起)为应用提供了类似 IO 多路复用的状态通知机制。
此机制是基于等待的。使用方式一般如下:
应用指出需要等待哪些状态。这通过创建 Condition 对象并通过
attach_condition()
将其添加到 Wait-set 实现。状态对象一般有 GuardCondition, StatusCondition, ReadCondition。然后调用 Wait-set 的
wait()
函数等待有 Condition 对象变为 true。如果
wait()
返回,则可以通过其它步骤获取详细信息。
下面是一个示例:
class ApplicationJob
{
WaitSet wait_set_;
GuardCondition terminate_condition_;
std::thread thread_;
void main_loop()
{
// Main loop is repeated until the terminate condition is triggered
while (false == terminate_condition_.get_trigger_value())
{
// Wait for any of the conditions to be triggered
ReturnCode_t ret_code;
ConditionSeq triggered_conditions;
ret_code = wait_set_.wait(triggered_conditions, eprosima::fastrtps::c_TimeInfinite);
if (ReturnCode_t::RETCODE_OK != ret_code)
{
// ... handle error
continue;
}
// Process triggered conditions
for (Condition* cond : triggered_conditions)
{
StatusCondition* status_cond = dynamic_cast<StatusCondition*>(cond);
if (nullptr != status_cond)
{
Entity* entity = status_cond->get_entity();
StatusMask changed_statuses = entity->get_status_changes();
// Process status. Liveliness changed and data available are depicted as an example
if (changed_statuses.is_active(StatusMask::liveliness_changed()))
{
std::cout << "Liveliness changed reported for entity " << entity->get_instance_handle() <<
std::endl;
}
if (changed_statuses.is_active(StatusMask::data_available()))
{
std::cout << "Data avilable on reader " << entity->get_instance_handle() << std::endl;
FooSeq data_seq;
SampleInfoSeq info_seq;
DataReader* reader = static_cast<DataReader*>(entity);
// Process all the samples until no one is returned
while (ReturnCode_t::RETCODE_OK == reader->take(data_seq, info_seq,
LENGTH_UNLIMITED, ANY_SAMPLE_STATE,
ANY_VIEW_STATE, ANY_INSTANCE_STATE))
{
// Both info_seq.length() and data_seq.length() will have the number of samples returned
for (FooSeq::size_type n = 0; n < info_seq.length(); ++n)
{
// Only samples for which valid_data is true should be accessed
if (info_seq[n].valid_data)
{
// Process sample on data_seq[n]
}
}
// must return the loaned sequences when done processing
reader->return_loan(data_seq, info_seq);
}
}
}
}
}
}
public:
ApplicationJob(
const std::vector<DataReader*>& readers,
const std::vector<DataWriter*>& writers)
{
// Add a GuardCondition, so we can signal the processing thread to stop
wait_set_.attach_condition(terminate_condition_);
// Add the status condition of every reader and writer
for (DataReader* reader : readers)
{
wait_set_.attach_condition(reader->get_statuscondition());
}
for (DataWriter* writer : writers)
{
wait_set_.attach_condition(writer->get_statuscondition());
}
thread_ = std::thread(&ApplicationJob::main_loop, this);
}
~ApplicationJob()
{
// Signal the GuardCondition to force the WaitSet to wake up
terminate_condition_.set_trigger_value(true);
// Wait for the thread to finish
thread_.join();
}
};
// Application initialization
ReturnCode_t ret_code;
std::vector<DataReader*> application_readers;
std::vector<DataWriter*> application_writers;
// Create the participant, topics, readers, and writers.
ret_code = create_dds_application(application_readers, application_writers);
if (ReturnCode_t::RETCODE_OK != ret_code)
{
// ... handle error
return;
}
{
ApplicationJob main_loop_thread(application_readers, application_writers);
// ... wait for application termination signaling (signal handler, user input, etc)
// ... Destructor of ApplicationJob takes care of stopping the processing thread
}
// Destroy readers, writers, topics, and participant
destroy_dds_application();