0


ROS2-简单的节点之间交互实操案例

经过节点、话题、服务、动作、参数的基本学习,我们可以得到如下总结:

ros2是一个控制机器人的系统,每个机器人,或者复杂机器人的每个部件都可以看作是一个节点,节点这里被我们定义为能够执行特定功能的最小单位。而ros2就是负责统筹、从整体层面去控制它们的。

进行控制的本质就是将信息传递给每个节点,从而告知每个节点现在应该进行什么动作。而告知这个行为的实现,以话题、服务、动作为典型。

实操案例

我们现在需要实际使用这几个功能的实现,从而进一步理解它们的功能。

我们设计如下的功能系统:

A和B节点发送消息给C。其中B节点是由机器自动发送,为了确定C节点收到消息,需要收到C节点的返回消息。A节点中有参数,当我们通过命令行手动改变其参数时,A节点发送消息给C。
C节点在收到A和B的消息后,告知D进行一个累加动作,累加次数由用户在A节点中发送的消息决定。最终由D节点完成累加动作,期间周期给C节点反馈feedback消息,告知累加进度。

在这个系统中,A节点只需要传递消息即可,我们使用话题。B节点因为需要收到返回消息,我们使用服务。C节点对D节点的传输因为需要反馈消息,我们使用动作。

需要注意的是,这只是个帮助温习前面内容的实操,很多地方考虑欠妥,不具备实际可行性

需要注意的是,这只是个帮助温习前面内容的实操,很多地方考虑欠妥,不具备实际可行性

需要注意的是,这只是个帮助温习前面内容的实操,很多地方考虑欠妥,不具备实际可行性

本人不是老手,程序仍有不足,嘴下留情!

本人不是老手,程序仍有不足,嘴下留情!

本人不是老手,程序仍有不足,嘴下留情!

准备工作

首先我们来创建一个工作空间。建立一个文件夹,并在其中创建src文件夹,并$ colcon build

在home中.bashrc文件最末尾加入source ~/#{这里是工作区名}/install/local_setup.sh

在命令行中进入src目录,输入$ ros2 pkg create --build-type ament_python abcd,以创建功能包。在功能包的abcd文件夹(abcd是我瞎起的)中创建四个节点文件,nodeAnodeBnodeCnodeD

接口准备

命令行进入src目录,输入如下命令创建

$ ros2 pkg create --build-type ament_cmake interfaces

在得到的功能包中创建目录msg、srv、action

节点A编写

下面我们编写节点A。这部分的要求很简单:设定一个参数,默认值为0,一旦手动修改其值至非0就将这个数通过话题的形式传递出去。

节点A的接口编写

接口功能包msg存放消息,在msg目录中创建AtoC.msg,在其中添加

int64 x
在package.xml中添加依赖:

我们需要把创建接口时用到的依赖写在其中,例如:

<depend>#{此处为写代码时用到的其他依赖,没有可以不填}</depend>
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
在CMakeLists.txt设置编译信息:

在find dependencies下方加入如下代码

find_package(rosidl_default_generators REQUIRED)

然后在其下方添加如下代码以生成接口:

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/AtoC.msg"
 )
此处额外进行说明!:

若接口中包含引用项,如

geometry_msgs/Point center

则在依赖中要加入

find_package(geometry_msgs REQUIRED)

节点A的代码编写

import rclpy
from rclpy.node import Node
from std_msgs.msg import Int64
import time
from interfaces.msg import AtoC
class Anode(Node):
    def __init__(self,name):
        super().__init__(name)
        self.publisher=self.create_publisher(AtoC,'AtoC',10)
        self.timer=self.create_timer(1,self.callback)
        self.declare_parameter('x',0)
    def callback(self):
        num=self.get_parameter('x').get_parameter_value().integer_value
        pub=AtoC()
        pub.x=num
        if(num!=0):
            self.publisher.publish(pub)
            self.get_logger().info('published %d!now wait for 5s'%num)
            self.set_parameters([rclpy.Parameter('x',rclpy.Parameter.Type.INTEGER,0)])
            time.sleep(5)
        else:
            self.get_logger().info('x is 0! Waiting for changes...')
def main(args=None):
    rclpy.init(args=args)
    node=Anode('Anode')
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

在这段代码中,我们创建了一个Anode节点类,在构造函数中创建了发布者对象、计时器对象、参数x。

在计时器对象的回调函数中,我们每隔1秒查询一次参数x的值,若x不为0,创建一个接口对象,将其中的x赋值,最终发送给话题AtoC。

节点B编写

节点B中,我们需要将消息定时发送给节点C,通过服务的方式。在这里,B为客户端,C为服务端。

节点B的接口编写

在这个数据传输中,我们需要传入一个bool值,返回一个bool值。

与A节点的接口编写类似,我们在interfaces/srv中创建一个BtoC.srv文件,其中写入如下代码:

bool go
---
bool received

依赖此前已经加入,我们只需要添加编译信息即可。在CMakeLists.txt中相应部分修改成如下代码:

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/AtoC.msg"
  "srv/BtoC.srv"
 )

节点B的代码编写

import rclpy
from rclpy.node import Node
import time
from interfaces.srv import BtoC
class Bnode(Node):
    def __init__(self,name):
        super().__init__(name)
        self.client=self.create_client(BtoC,'BtoC')
        while(not self.client.wait_for_service(1)):
            self.get_logger().info("not connected! still waiting...")
        self.get_logger().info("successfully connected!")
        self.timer=self.create_timer(1,self.timer_callback)
    def timer_callback(self):
        self.request=BtoC.Request()
        self.request.go=True
        self.get_logger().info("send: 1")
        self.future=self.client.call_async(self.request)
        if(self.future.done()):
            try:
                response=self.future.result()
            except Exception as err:
                self.get_logger().info("error with response: %s" % err)
            else:
                if(response.recieved==True):
                    self.get_logger().info("message successfully delivered!")
                    time.sleep(5)
                else:
                    self.get_logger().info("ERROR! error with message feedback, maybe is because CtoD is running")
def main(args=None):
    rclpy.init(args=args)
    node=Bnode("Bnode")
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

在这段代码中,我们还是建立了一个节点B的类Bnode,在其构造函数中我们建立了客户端对象并等待其连接,建立计时器对象,调用回调函数以实现每隔一秒发送一次请求。

在回调函数timer_callback中,我们创建了一个BtoC的接口实现,并赋值为真。将其用异步的方式发送,发送后用done()函数判断是否完成,并分不同情况进行输出。

节点C编写

节点C需要做的工作相对复杂:节点C需要订阅来自A发布者的话题、作为BC服务的服务端、在收到以上两个节点的数据后作为客户端激活C到D的动作。

节点C到D的接口编写

在interfaces/action中创建文件CtoD.action,在其中写入代码:

int64 num
---
bool finish
---
int64 x

修改CMakeLists.txt中对应部分至如下:


find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/AtoC.msg"
  "srv/BtoC.srv"
  "action/CtoD.action"
 )

节点C的代码编写

import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from interfaces.action import CtoD
from interfaces.msg import AtoC
from interfaces.srv import BtoC
class Cnode(Node):
    def __init__(self,name):
        super().__init__(name)
        self.topic_sub=self.create_subscription(AtoC,"AtoC",self.topic_callback,10)
        self.service_server=self.create_service(BtoC,"BtoC",self.service_callback)
        self.action_client=ActionClient(self,CtoD,"CtoD")
        self.a=0
        self.b=0
        self.begin=0
        self.timer_CtoD=self.create_timer(1,self.CtoD_callback)
    def topic_callback(self,msg):
        if(self.begin==0):
            if(not msg.x==0):
                self.a=msg.x
                self.get_logger().info("successfully got message from nodeA! msg is %d"%self.a)
    def service_callback(self,request,response):
        response.received=False
        if(self.begin==0):
            if(request.go==True):
                self.b=1
                self.get_logger().info("successfully got message from nodeB!")
                response.received=True
        return response
    def CtoD_callback(self):
        if((not self.a==0)and(self.b==1)):
            self.begin=1
            self.get_logger().info("successfully activated! Now begin CtoD!")
            msg=CtoD.Goal()
            msg.num=self.a
            self.action_client.wait_for_server()
            self.future=self.action_client.send_goal_async(msg,self.feedback_callback)
            self.future.add_done_callback(self.action_link_callback)
    def feedback_callback(self,msg):
        self.get_logger().info("successfully get callback! msg is %d"%msg.feedback.x)
    def action_link_callback(self,Result):
        doGoal=Result.result()
        if not doGoal.accepted:
            self.get_logger().info("Goal rejected!")
            return
        self.get_logger().info("Goal begin!")
        self.getResult=doGoal.get_result_async()
        self.getResult.add_done_callback(self.done_callback)
    def done_callback(self,msg):
        if(not msg.result().result.finish==0):
            self.get_logger().info("Goal finished! result is %d"%msg.result().result.finish)
            self.begin=0
            self.a=0
            self.b=0
def main(args=None):
    rclpy.init(args=args)
    node=Cnode("Cnode")
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

在这段代码中,节点类Cnode包含三大部分:

由topic_sub、topic_callback组成的接收A节点话题消息的部分。此部分在收到非0数值后会将类成员变量a修改为相应值。

由service_server、service_callback组成的接受B节点消息的部分。此部分在收到1数值后需要返回一个response,内容为1,以告知对方接收成功。而若C至D在运行,此部分需要返回0

由action_client、timer_CtoD、CtoD_callback、feedback_callback、action_link_callback、done_callback构成的部分,用于向D节点发送消息。在这部分中,通过一个计时器timer_CtoD循环判定a和b是否都不为0,若不为0则将self.begin修改为1,以阻止前两个部分继续运行。而后将A发送来的值作为变量发送给节点D,执行动作。动作执行完毕后,重新将self.begin、self.a、self.b值设为0,以便继续接受数据。

节点D编写

因为上文已经完成了C到D的接口,这里无需创建接口

在节点D中,我们的目的是将得到的值累加,每次累加返回给C反馈,最终完成时将结果返回。

代码实现:

import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from interfaces.action import CtoD
import time
class Dnode(Node):
    def __init__(self,name):
        super().__init__(name)
        self.server=ActionServer(self,CtoD,'CtoD',self.server_callback)
        self.x=0
    def server_callback(self,goal):
        self.get_logger().info("get goal")
        feedback=CtoD.Feedback()
        for i in range(goal.request.num):
            self.x=self.x+i
            feedback.x=self.x
            goal.publish_feedback(feedback)
            time.sleep(1)
        goal.succeed()
        result=CtoD.Result()
        result.finish=self.x
        self.x=0
        return result
def main(args=None):
    rclpy.init(args=args)
    node=Dnode("Dnode")
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

这段程序中,我们创捷节点类Dnode,在这里我们创建了服务端实例server,以及累加结果x。

在回调函数中,我们重复对x进行累加,每次累加后将值打包成反馈回给节点C。最终结束时将结果给到节点C。

最终不要忘记在setup.py中相应部分改成如下:

entry_points={
        'console_scripts': [
         'nodeA  = abcd.nodeA:main',
         'nodeB = abcd.nodeB:main',
         'nodeC = abcd.nodeC:main',
         'nodeD=abcd.nodeD:main'
        ],
    },

实操测试

一切步骤完成后,我们开始进行以上工程的运行测试,

control+alt+T开启五个命令行,其中一个命令行进入我们的工作空间,输入$ colcon build进行编译

剩余四个命令行分别输入

$ ros2 run abcd nodeA

$ ros2 run abcd nodeB

$ ros2 run abcd nodeC

$ ros2 run abcd nodeD

以启动节点

接下来,在第一个命令行,即空余的命令行中输入

$ ros2 param set Anode x 5

将x参数改为5,激活AtoB数据传输,从而激活C节点对D节点动作的控制

即可开始运行项目!

标签: python linux 机器人

本文转载自: https://blog.csdn.net/weixin_44334877/article/details/139868457
版权归原作者 tr! 所有, 如有侵权,请联系我们删除。

“ROS2-简单的节点之间交互实操案例”的评论:

还没有评论