0


Fast-DDS共享内存(SHM)通信

一、概述

     ROS2是建立在DDS程序的基础上的,在ROS2中DDS程序被用来发现节点,序列化和传递信息。ROS2 支持多种DDS的实现方式;在ROS2中,RMW是实现节点间通信的中间件。RMW负责传输ROS 2消息,并在节点之间建立连接。RMW支持多种不同的通信协议,包括DDS(Data Distribution Service)和ROS 1中使用的TCPROS和UDPROS协议。DDS提供了一种可靠的、分布式的通信机制,允许节点在不同的计算机上运行,而且不需要在节点之间建立直接的连接。

ROS2架构图

    DDS是一个被很多公司实现的工业标准,比如RTI的实现Connext和eProsima的实现Fast-RTPS。Fast-DDS 是一种发布/订阅(pub/sub)通信模型的中间件,主要目标是为分布式系统提供高效、可靠的实时数据交换。发布者(publisher)负责生成数据,而订阅者(subscriber)负责在需要时接收数据。发布者和订阅者之间的通信通过主题(topic)进行,这是一种具有相同数据类型和名称的消息通道。

Fast-DDS层级模型架构图

    DDS中数据的传输通过传输层实现,DDS定义了一个传输API,并且可以运行实现该API的插件;这样DDS就不局限于特定的传输,应用程序可以根据需要选择或实现适合需求的传输层插件。传输层位于Fast-DDS层级模型的最底层,传输层可以使用多种不同的传输协议。Fast-DDS具有五种已实现的传输插件:UDPv4、UDPv6、TCPv4、TCPv6和SHM。

                                                                                          ROS2调度过程

Fast-DDS通信组件

    Fast-RTPS信息交互的单元是change,change代表了一个topic的更新,节点将更新记录到历史消息中,历史消息可以还爨最近的change。通过写节点发布一个change将会发生如下过程:
  1. change被添加到writer的HistoryCache中;
  2. writer通知每一个reader;
  3. 感兴趣的reader接收change;
  4. 更新后,reader将新的change加入HistoryCache中。

通过选择Qos决定HistoryCache的管理方式。

二、Fast-DDS数据传输

2.1 共享内存通信

    共享内存(SHM)传输依靠主机操作系统提供的共享内存机制,实现了在同一处理单元/机器中运行的实体之间的快速通信。共享内存传输提供了一种比网络传输更好的性能,即使网络通信使用回环接口(loopback interface);其主要原因包括以下几个方面:
  1. 支持大消息传输:网络协议需要对数据进行分段,以符合特定的协议和网络堆栈要求,从而增加通信开销;SHM传输允许复制完整消息,其中唯一的大小限制是机器的内存容量。
  2. 减少内存拷贝:当向不同的端点发送相同的消息时,SHM传输可以直接与所有目标端点共享相同的内存缓冲区;而其他协议要求每个端点执行一个消息副本。
  3. 更小的操作系统开销:一旦初始设置完成,共享内存传输所需的系统调用就比其他协议少得多;因此,通过使用SHM,存在性能/时间消耗增益。

共享内存传输的序列图

  • Segment:Segment对象是一块可以被不同进程访问的共享内存块,每一个配置了SHM传输的DomainParticipant都会创建一段共享内存。DomainParticipant向该Segment写入它需要传递给其他DomainParticipants的数据,远程的DomainParticipients能够使用共享内存机制直接读取它。每个段都有一个segmentId,一个16个字符的UUID,用于唯一标识每个共享内存段,这些segmentId用于识别和访问每个DomainParticipant的Segment。
  • Segment Buffer:在共享内存段中分配的缓冲区,它充当放置在Segment中的DDS消息的容器;换句话说,DomainParticipant在Segment上写入的每条消息都将被放置在不同的缓冲区中。
  • Buffer Descriptor:它充当指向特定段中特定段缓冲区的指针,它包含segmentId和Segment Buffer相对于Segment底部的偏移量。当将消息传递给其他DomainParticipants时,共享内存传输仅分发缓冲区描述符,从而避免将消息拷贝;有了这个描述符,接收DomainParticipant可以访问写入缓冲区的消息,因为它唯一地标识Segment(通过segmentId)和Segment buffer(通过其偏移量)。
  • Port:表示用于通信缓冲区描述符的通道。它被实现为共享内存中的环形缓冲区,因此任何DomainParticipant都可以潜在地从上面读取或写入信息。每个端口都有一个唯一的标识符,一个32位的数字,可以用来指代端口。每个配置共享内存传输的DomainParticipant都会创建一个端口来接收缓冲区描述符。此端口的标识符在Discovery过程共享给其他DomainParticipant。

共享内存通信UML

  • SharedMemoryTransport: 共享内存数据传输组件,继承自TransportInterface并实现相关业务逻辑。

  • SharedMemoryManager: 通过实例化SharedMemoryManager对象,应用程序可以访问内部维护的共享内存资源。每个进程至少需要一个内存管理器。SharedMemoryManager为应用程序提供了创建共享内存段、在段中分配数据缓冲区、将缓冲区描述符推送到共享内存端口以及创建与端口相关联的监听器的功能。

  • Port: 由port_id标识的通信信道,通过该通道,缓存区描述符可以被发送到其他进程。

                                            ![](https://i-blog.csdnimg.cn/direct/0e248e63b84044ea918348d36462bdf4.png)
    

数据发布流程图

Fast-DDS共享内存通信数据发布时序图

2.2 关键过程source code

**拷贝消息到共享内存**

src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp

// 拷贝消息到共享内存
std::shared_ptr<SharedMemManager::Buffer> SharedMemTransport::copy_to_shared_buffer(
const octet* send_buffer, uint32_t send_buffer_size, const std::chrono::steady_clock::time_point& max_blocking_time_point)
{
    assert(shared_mem_segment_);
    std::shared_ptr<SharedMemManager::Buffer> shared_buffer =
    shared_mem_segment_->alloc_buffer(send_buffer_size, max_blocking_time_point);
    memcpy(shared_buffer->data(), send_buffer, send_buffer_size);
    return shared_buffer;
}

src/cpp/rtps/transport/shared_mem/SharedMemTransport.cpp

// 将消息发送给订阅端的Port
bool SharedMemTransport::push_discard(const std::shared_ptr<SharedMemManager::Buffer>& buffer, const Locator& remote_locator)
{
    try
    {
        if (!find_port(remote_locator.port)->try_push(buffer))
        {
            logInfo(RTPS_MSG_OUT, "Port " << remote_locator.port << " full. Buffer dropped");
        }
    }

    catch (const std::exception& error)
    {
        logWarning(RTPS_MSG_OUT, error.what());
        return false;
    }

    return true;
}
**将消息发送给订阅端的Port**

src/cpp/rtps/transport/shared_mem/SharedMemManager.hpp

bool try_push(const std::shared_ptr<Buffer>& buffer)
{
    assert(std::dynamic_pointer_cast<SharedMemBuffer>(buffer));
    SharedMemBuffer* shared_mem_buffer = std::static_pointer_cast<SharedMemBuffer>(buffer).get();
    bool ret;
    bool are_listeners_active = false;
    auto validity_id = shared_mem_buffer->validity_id();
    shared_mem_buffer->inc_enqueued_count(validity_id);

    try
    {
        ret = global_port_->try_push( {shared_mem_buffer->segment_id(), shared_mem_buffer->node_offset(), validity_id}, &are_listeners_active);

        if (!are_listeners_active)
        {
            shared_mem_buffer->dec_enqueued_count(validity_id);
        }
    }

    catch (std::exception& e)
    {
        shared_mem_buffer->dec_enqueued_count(validity_id);
        if (!global_port_->is_port_ok())
        {
            logWarning(RTPS_TRANSPORT_SHM, "SHM Port " << global_port_->port_id() << " failure: " << e.what());
            regenerate_port();
            ret = false;
        }
        else
        {
            throw;
        }
    }

    return ret;
}

src/cpp/rtps/transport/shared_mem/SharedMemGlobal.hpp

/**
 * Try to enqueue a buffer descriptor in the port.
 * If the port queue is full returns inmediatelly with false value.
 * @param[in] buffer_descriptor buffer descriptor to be enqueued
 * @param[out] listeners_active false if no active listeners => buffer not enqueued
 * @return false in overflow case, true otherwise.
 */

bool try_push(const BufferDescriptor& buffer_descriptor, bool* listeners_active)
{
    std::unique_lock<SharedMemSegment::mutex> lock_empty(node_->empty_cv_mutex);
    if (!node_->is_port_ok)
    {
        throw std::runtime_error("the port is marked as not ok!");
    }

    try
    {
        bool was_opened_as_unicast_port = node_->is_opened_read_exclusive;
        bool was_buffer_empty_before_push = buffer_->is_buffer_empty();
        bool was_someone_listening = (node_->waiting_count > 0);
        *listeners_active = buffer_->push(buffer_descriptor);
        lock_empty.unlock();
        logInfo(RTPS_TRANSPORT_SHM, THREADID << "port: " << node_->port_id << ", opened as unicast? " << was_opened_as_unicast_port << ", was_buffer_empty_before_push: "<< was_buffer_empty_before_push);

        if (was_someone_listening)
        {
            if (was_opened_as_unicast_port)
            {
                notify_unicast(was_buffer_empty_before_push);
            }
            else
            {
                notify_multicast();
            }
        }

        return true;
    }

    catch (const std::exception&)
    {
        lock_empty.unlock();
        overflows_count_++;
    }

    return false;
}

三、参考链接


本文转载自: https://blog.csdn.net/John_ToStr/article/details/140557093
版权归原作者 ปรัชญา แค้วคำมูล 所有, 如有侵权,请联系我们删除。

“Fast-DDS共享内存(SHM)通信”的评论:

还没有评论