经过节点、话题、服务、动作、参数的基本学习,我们可以得到如下总结:
ros2是一个控制机器人的系统,每个机器人,或者复杂机器人的每个部件都可以看作是一个节点,节点这里被我们定义为能够执行特定功能的最小单位。而ros2就是负责统筹、从整体层面去控制它们的。
进行控制的本质就是将信息传递给每个节点,从而告知每个节点现在应该进行什么动作。而告知这个行为的实现,以话题、服务、动作为典型。
实操案例
我们现在需要实际使用这几个功能的实现,从而进一步理解它们的功能。
我们设计如下的功能系统:
A和B节点发送消息给C。其中B节点是由机器自动发送,为了确定C节点收到消息,需要收到C节点的返回消息。A节点中有参数,当我们通过命令行手动改变其参数时,A节点发送消息给C。
C节点在收到A和B的消息后,告知D进行一个累加动作,累加次数由用户在A节点中发送的消息决定。最终由D节点完成累加动作,期间周期给C节点反馈feedback消息,告知累加进度。
在这个系统中,A节点只需要传递消息即可,我们使用话题。B节点因为需要收到返回消息,我们使用服务。C节点对D节点的传输因为需要反馈消息,我们使用动作。
需要注意的是,这只是个帮助温习前面内容的实操,很多地方考虑欠妥,不具备实际可行性
需要注意的是,这只是个帮助温习前面内容的实操,很多地方考虑欠妥,不具备实际可行性
需要注意的是,这只是个帮助温习前面内容的实操,很多地方考虑欠妥,不具备实际可行性
本人不是老手,程序仍有不足,嘴下留情!
本人不是老手,程序仍有不足,嘴下留情!
本人不是老手,程序仍有不足,嘴下留情!
准备工作
首先我们来创建一个工作空间。建立一个文件夹,并在其中创建src文件夹,并$ colcon build
在home中.bashrc文件最末尾加入source ~/#{这里是工作区名}/install/local_setup.sh
在命令行中进入src目录,输入$ ros2 pkg create --build-type ament_python abcd,以创建功能包。在功能包的abcd文件夹(abcd是我瞎起的)中创建四个节点文件,nodeAnodeBnodeCnodeD
接口准备
命令行进入src目录,输入如下命令创建
$ ros2 pkg create --build-type ament_cmake interfaces
在得到的功能包中创建目录msg、srv、action
节点A编写
下面我们编写节点A。这部分的要求很简单:设定一个参数,默认值为0,一旦手动修改其值至非0就将这个数通过话题的形式传递出去。
节点A的接口编写
接口功能包msg存放消息,在msg目录中创建AtoC.msg,在其中添加
int64 x
在package.xml中添加依赖:
我们需要把创建接口时用到的依赖写在其中,例如:
<depend>#{此处为写代码时用到的其他依赖,没有可以不填}</depend>
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
在CMakeLists.txt设置编译信息:
在find dependencies下方加入如下代码
find_package(rosidl_default_generators REQUIRED)
然后在其下方添加如下代码以生成接口:
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/AtoC.msg"
)
此处额外进行说明!:
若接口中包含引用项,如
geometry_msgs/Point center
则在依赖中要加入
find_package(geometry_msgs REQUIRED)
节点A的代码编写
import rclpy
from rclpy.node import Node
from std_msgs.msg import Int64
import time
from interfaces.msg import AtoC
class Anode(Node):
def __init__(self,name):
super().__init__(name)
self.publisher=self.create_publisher(AtoC,'AtoC',10)
self.timer=self.create_timer(1,self.callback)
self.declare_parameter('x',0)
def callback(self):
num=self.get_parameter('x').get_parameter_value().integer_value
pub=AtoC()
pub.x=num
if(num!=0):
self.publisher.publish(pub)
self.get_logger().info('published %d!now wait for 5s'%num)
self.set_parameters([rclpy.Parameter('x',rclpy.Parameter.Type.INTEGER,0)])
time.sleep(5)
else:
self.get_logger().info('x is 0! Waiting for changes...')
def main(args=None):
rclpy.init(args=args)
node=Anode('Anode')
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
在这段代码中,我们创建了一个Anode节点类,在构造函数中创建了发布者对象、计时器对象、参数x。
在计时器对象的回调函数中,我们每隔1秒查询一次参数x的值,若x不为0,创建一个接口对象,将其中的x赋值,最终发送给话题AtoC。
节点B编写
节点B中,我们需要将消息定时发送给节点C,通过服务的方式。在这里,B为客户端,C为服务端。
节点B的接口编写
在这个数据传输中,我们需要传入一个bool值,返回一个bool值。
与A节点的接口编写类似,我们在interfaces/srv中创建一个BtoC.srv文件,其中写入如下代码:
bool go
---
bool received
依赖此前已经加入,我们只需要添加编译信息即可。在CMakeLists.txt中相应部分修改成如下代码:
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/AtoC.msg"
"srv/BtoC.srv"
)
节点B的代码编写
import rclpy
from rclpy.node import Node
import time
from interfaces.srv import BtoC
class Bnode(Node):
def __init__(self,name):
super().__init__(name)
self.client=self.create_client(BtoC,'BtoC')
while(not self.client.wait_for_service(1)):
self.get_logger().info("not connected! still waiting...")
self.get_logger().info("successfully connected!")
self.timer=self.create_timer(1,self.timer_callback)
def timer_callback(self):
self.request=BtoC.Request()
self.request.go=True
self.get_logger().info("send: 1")
self.future=self.client.call_async(self.request)
if(self.future.done()):
try:
response=self.future.result()
except Exception as err:
self.get_logger().info("error with response: %s" % err)
else:
if(response.recieved==True):
self.get_logger().info("message successfully delivered!")
time.sleep(5)
else:
self.get_logger().info("ERROR! error with message feedback, maybe is because CtoD is running")
def main(args=None):
rclpy.init(args=args)
node=Bnode("Bnode")
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
在这段代码中,我们还是建立了一个节点B的类Bnode,在其构造函数中我们建立了客户端对象并等待其连接,建立计时器对象,调用回调函数以实现每隔一秒发送一次请求。
在回调函数timer_callback中,我们创建了一个BtoC的接口实现,并赋值为真。将其用异步的方式发送,发送后用done()函数判断是否完成,并分不同情况进行输出。
节点C编写
节点C需要做的工作相对复杂:节点C需要订阅来自A发布者的话题、作为BC服务的服务端、在收到以上两个节点的数据后作为客户端激活C到D的动作。
节点C到D的接口编写
在interfaces/action中创建文件CtoD.action,在其中写入代码:
int64 num
---
bool finish
---
int64 x
修改CMakeLists.txt中对应部分至如下:
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/AtoC.msg"
"srv/BtoC.srv"
"action/CtoD.action"
)
节点C的代码编写
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from interfaces.action import CtoD
from interfaces.msg import AtoC
from interfaces.srv import BtoC
class Cnode(Node):
def __init__(self,name):
super().__init__(name)
self.topic_sub=self.create_subscription(AtoC,"AtoC",self.topic_callback,10)
self.service_server=self.create_service(BtoC,"BtoC",self.service_callback)
self.action_client=ActionClient(self,CtoD,"CtoD")
self.a=0
self.b=0
self.begin=0
self.timer_CtoD=self.create_timer(1,self.CtoD_callback)
def topic_callback(self,msg):
if(self.begin==0):
if(not msg.x==0):
self.a=msg.x
self.get_logger().info("successfully got message from nodeA! msg is %d"%self.a)
def service_callback(self,request,response):
response.received=False
if(self.begin==0):
if(request.go==True):
self.b=1
self.get_logger().info("successfully got message from nodeB!")
response.received=True
return response
def CtoD_callback(self):
if((not self.a==0)and(self.b==1)):
self.begin=1
self.get_logger().info("successfully activated! Now begin CtoD!")
msg=CtoD.Goal()
msg.num=self.a
self.action_client.wait_for_server()
self.future=self.action_client.send_goal_async(msg,self.feedback_callback)
self.future.add_done_callback(self.action_link_callback)
def feedback_callback(self,msg):
self.get_logger().info("successfully get callback! msg is %d"%msg.feedback.x)
def action_link_callback(self,Result):
doGoal=Result.result()
if not doGoal.accepted:
self.get_logger().info("Goal rejected!")
return
self.get_logger().info("Goal begin!")
self.getResult=doGoal.get_result_async()
self.getResult.add_done_callback(self.done_callback)
def done_callback(self,msg):
if(not msg.result().result.finish==0):
self.get_logger().info("Goal finished! result is %d"%msg.result().result.finish)
self.begin=0
self.a=0
self.b=0
def main(args=None):
rclpy.init(args=args)
node=Cnode("Cnode")
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
在这段代码中,节点类Cnode包含三大部分:
由topic_sub、topic_callback组成的接收A节点话题消息的部分。此部分在收到非0数值后会将类成员变量a修改为相应值。
由service_server、service_callback组成的接受B节点消息的部分。此部分在收到1数值后需要返回一个response,内容为1,以告知对方接收成功。而若C至D在运行,此部分需要返回0
由action_client、timer_CtoD、CtoD_callback、feedback_callback、action_link_callback、done_callback构成的部分,用于向D节点发送消息。在这部分中,通过一个计时器timer_CtoD循环判定a和b是否都不为0,若不为0则将self.begin修改为1,以阻止前两个部分继续运行。而后将A发送来的值作为变量发送给节点D,执行动作。动作执行完毕后,重新将self.begin、self.a、self.b值设为0,以便继续接受数据。
节点D编写
因为上文已经完成了C到D的接口,这里无需创建接口
在节点D中,我们的目的是将得到的值累加,每次累加返回给C反馈,最终完成时将结果返回。
代码实现:
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from interfaces.action import CtoD
import time
class Dnode(Node):
def __init__(self,name):
super().__init__(name)
self.server=ActionServer(self,CtoD,'CtoD',self.server_callback)
self.x=0
def server_callback(self,goal):
self.get_logger().info("get goal")
feedback=CtoD.Feedback()
for i in range(goal.request.num):
self.x=self.x+i
feedback.x=self.x
goal.publish_feedback(feedback)
time.sleep(1)
goal.succeed()
result=CtoD.Result()
result.finish=self.x
self.x=0
return result
def main(args=None):
rclpy.init(args=args)
node=Dnode("Dnode")
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
这段程序中,我们创捷节点类Dnode,在这里我们创建了服务端实例server,以及累加结果x。
在回调函数中,我们重复对x进行累加,每次累加后将值打包成反馈回给节点C。最终结束时将结果给到节点C。
最终不要忘记在setup.py中相应部分改成如下:
entry_points={
'console_scripts': [
'nodeA = abcd.nodeA:main',
'nodeB = abcd.nodeB:main',
'nodeC = abcd.nodeC:main',
'nodeD=abcd.nodeD:main'
],
},
实操测试
一切步骤完成后,我们开始进行以上工程的运行测试,
control+alt+T开启五个命令行,其中一个命令行进入我们的工作空间,输入$ colcon build进行编译
剩余四个命令行分别输入
$ ros2 run abcd nodeA
$ ros2 run abcd nodeB
$ ros2 run abcd nodeC
$ ros2 run abcd nodeD
以启动节点
接下来,在第一个命令行,即空余的命令行中输入
$ ros2 param set Anode x 5
将x参数改为5,激活AtoB数据传输,从而激活C节点对D节点动作的控制
即可开始运行项目!
版权归原作者 tr! 所有, 如有侵权,请联系我们删除。