参考:
automaticaddison.com/getting-started-with-opencv-in-ros-2-galactic-python/
automaticaddison.com/getting-started-with-opencv-in-ros-2-foxy-fitzroy-python/
推荐使用:YOLOX + ROS2 object detection package
也可以参考:github.com/jeffreyttc/opencv_ros2
vision_opencv
ros2 vision_opencv 包含将 ROS 2 与 OpenCV 接口的包,OpenCV 是一个专为计算效率和实时计算机视觉应用程序而设计的库。 该存储库包含:
- cv_bridge:ROS 2 图像消息和 OpenCV 图像表示之间的桥梁
- image_geometry:处理图像和像素几何的方法集合
- opencv_tests:集成测试以使用带有 opencv 的包的功能
- vision_opencv:安装 cv_bridge 和 image_geometry 的元包
为了将 ROS 2 与 OpenCV 一起使用,请参阅 cv_bridge 包中的详细信息。
程序适用于foxy/galactic/humble,windows/linux系统通用
在本教程中,将学习如何将 ROS 2 与流行的计算机视觉库 OpenCV 接口的基础知识。 这些基础知识将提供在机器人应用程序中添加视觉的基础。
我们将创建一个图像发布者节点以将网络摄像头数据(即视频帧)发布到一个主题,我们将创建一个订阅该主题的图像订阅者节点。
pub发布:
foxygalactic
# Basic ROS 2 program to publish real-time streaming
# video from your built-in webcam
# Author:
# - Addison Sears-Collins
# Import the necessary libraries
import
rclpy
# Python Client Library for ROS 2
from
rclpy.node
import
Node
# Handles the creation of nodes
from
sensor_msgs.msg
import
Image
# Image is the message type
from
cv_bridge
import
CvBridge
# Package to convert between ROS and OpenCV Images
import
cv2
# OpenCV library
class
ImagePublisher(Node):
"""
Create an ImagePublisher class, which is a subclass of the Node class.
"""
def
__init__(
self
):
"""
Class constructor to set up the node
"""
# Initiate the Node class's constructor and give it a name
super
().__init__(
'image_publisher'
)
# Create the publisher. This publisher will publish an Image
# to the video_frames topic. The queue size is 10 messages.
self
.publisher_
=
self
.create_publisher(Image,
'video_frames'
,
10
)
# We will publish a message every 0.1 seconds
timer_period
=
0.1
# seconds
# Create the timer
self
.timer
=
self
.create_timer(timer_period,
self
.timer_callback)
# Create a VideoCapture object
# The argument '0' gets the default webcam.
self
.cap
=
cv2.VideoCapture(
0
)
# Used to convert between ROS and OpenCV images
self
.br
=
CvBridge()
def
timer_callback(
self
):
"""
Callback function.
This function gets called every 0.1 seconds.
"""
# Capture frame-by-frame
# This method returns True/False as well
# as the video frame.
ret, frame
=
self
.cap.read()
if
ret
=
=
True
:
# Publish the image.
# The 'cv2_to_imgmsg' method converts an OpenCV
# image to a ROS 2 image message
self
.publisher_.publish(
self
.br.cv2_to_imgmsg(frame))
# Display the message on the console
self
.get_logger().info(
'Publishing video frame'
)
def
main(args
=
None
):
# Initialize the rclpy library
rclpy.init(args
=
args)
# Create the node
image_publisher
=
ImagePublisher()
# Spin the node so the callback function is called.
rclpy.spin(image_publisher)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
image_publisher.destroy_node()
# Shutdown the ROS client library for Python
rclpy.shutdown()
if
__name__
=
=
'__main__'
:
main()
# Basic ROS 2 program to publish real-time streaming
# video from your built-in webcam
# Author:
# - Addison Sears-Collins
# Import the necessary libraries
import
rclpy
# Python Client Library for ROS 2
from
rclpy.node
import
Node
# Handles the creation of nodes
from
sensor_msgs.msg
import
Image
# Image is the message type
from
cv_bridge
import
CvBridge
# Package to convert between ROS and OpenCV Images
import
cv2
# OpenCV library
class
ImagePublisher(Node):
"""
Create an ImagePublisher class, which is a subclass of the Node class.
"""
def
__init__(
self
):
"""
Class constructor to set up the node
"""
# Initiate the Node class's constructor and give it a name
super
().__init__(
'image_publisher'
)
# Create the publisher. This publisher will publish an Image
# to the video_frames topic. The queue size is 10 messages.
self
.publisher_
=
self
.create_publisher(Image,
'video_frames'
,
10
)
# We will publish a message every 0.1 seconds
timer_period
=
0.1
# seconds
# Create the timer
self
.timer
=
self
.create_timer(timer_period,
self
.timer_callback)
# Create a VideoCapture object
# The argument '0' gets the default webcam.
self
.cap
=
cv2.VideoCapture(
0
)
# Used to convert between ROS and OpenCV images
self
.br
=
CvBridge()
def
timer_callback(
self
):
"""
Callback function.
This function gets called every 0.1 seconds.
"""
# Capture frame-by-frame
# This method returns True/False as well
# as the video frame.
ret, frame
=
self
.cap.read()
if
ret
=
=
True
:
# Publish the image.
# The 'cv2_to_imgmsg' method converts an OpenCV
# image to a ROS 2 image message
self
.publisher_.publish(
self
.br.cv2_to_imgmsg(frame))
# Display the message on the console
self
.get_logger().info(
'Publishing video frame'
)
def
main(args
=
None
):
# Initialize the rclpy library
rclpy.init(args
=
args)
# Create the node
image_publisher
=
ImagePublisher()
# Spin the node so the callback function is called.
rclpy.spin(image_publisher)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
image_publisher.destroy_node()
# Shutdown the ROS client library for Python
rclpy.shutdown()
if
__name__
=
=
'__main__'
:
main()
sub订阅:
foxygalactic
# Basic ROS 2 program to subscribe to real-time streaming
# video from your built-in webcam
# Author:
# - Addison Sears-Collins
# Import the necessary libraries
import
rclpy
# Python library for ROS 2
from
rclpy.node
import
Node
# Handles the creation of nodes
from
sensor_msgs.msg
import
Image
# Image is the message type
from
cv_bridge
import
CvBridge
# Package to convert between ROS and OpenCV Images
import
cv2
# OpenCV library
class
ImageSubscriber(Node):
"""
Create an ImageSubscriber class, which is a subclass of the Node class.
"""
def
__init__(
self
):
"""
Class constructor to set up the node
"""
# Initiate the Node class's constructor and give it a name
super
().__init__(
'image_subscriber'
)
# Create the subscriber. This subscriber will receive an Image
# from the video_frames topic. The queue size is 10 messages.
self
.subscription
=
self
.create_subscription(
Image,
'video_frames'
,
self
.listener_callback,
10
)
self
.subscription
# prevent unused variable warning
# Used to convert between ROS and OpenCV images
self
.br
=
CvBridge()
def
listener_callback(
self
, data):
"""
Callback function.
"""
# Display the message on the console
self
.get_logger().info(
'Receiving video frame'
)
# Convert ROS Image message to OpenCV image
current_frame
=
self
.br.imgmsg_to_cv2(data)
# Display image
cv2.imshow(
"camera"
, current_frame)
cv2.waitKey(
1
)
def
main(args
=
None
):
# Initialize the rclpy library
rclpy.init(args
=
args)
# Create the node
image_subscriber
=
ImageSubscriber()
# Spin the node so the callback function is called.
rclpy.spin(image_subscriber)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
image_subscriber.destroy_node()
# Shutdown the ROS client library for Python
rclpy.shutdown()
if
__name__
=
=
'__main__'
:
main()
# Basic ROS 2 program to subscribe to real-time streaming
# video from your built-in webcam
# Author:
# - Addison Sears-Collins
# Import the necessary libraries
import
rclpy
# Python library for ROS 2
from
rclpy.node
import
Node
# Handles the creation of nodes
from
sensor_msgs.msg
import
Image
# Image is the message type
from
cv_bridge
import
CvBridge
# Package to convert between ROS and OpenCV Images
import
cv2
# OpenCV library
class
ImageSubscriber(Node):
"""
Create an ImageSubscriber class, which is a subclass of the Node class.
"""
def
__init__(
self
):
"""
Class constructor to set up the node
"""
# Initiate the Node class's constructor and give it a name
super
().__init__(
'image_subscriber'
)
# Create the subscriber. This subscriber will receive an Image
# from the video_frames topic. The queue size is 10 messages.
self
.subscription
=
self
.create_subscription(
Image,
'video_frames'
,
self
.listener_callback,
10
)
self
.subscription
# prevent unused variable warning
# Used to convert between ROS and OpenCV images
self
.br
=
CvBridge()
def
listener_callback(
self
, data):
"""
Callback function.
"""
# Display the message on the console
self
.get_logger().info(
'Receiving video frame'
)
# Convert ROS Image message to OpenCV image
current_frame
=
self
.br.imgmsg_to_cv2(data)
# Display image
cv2.imshow(
"camera"
, current_frame)
cv2.waitKey(
1
)
def
main(args
=
None
):
# Initialize the rclpy library
rclpy.init(args
=
args)
# Create the node
image_subscriber
=
ImageSubscriber()
# Spin the node so the callback function is called.
rclpy.spin(image_subscriber)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
image_subscriber.destroy_node()
# Shutdown the ROS client library for Python
rclpy.shutdown()
if
__name__
=
=
'__main__'
:
main()
humble:
pub
# Basic ROS 2 program to publish real-time streaming
# video from your built-in webcam
# Author:
# - Addison Sears-Collins
# Import the necessary libraries
import rclpy # Python Client Library for ROS 2
from rclpy.node import Node # Handles the creation of nodes
from sensor_msgs.msg import Image # Image is the message type
from cv_bridge import CvBridge # Package to convert between ROS and OpenCV Images
import cv2 # OpenCV library
class ImagePublisher(Node):
"""
Create an ImagePublisher class, which is a subclass of the Node class.
"""
def __init__(self):
"""
Class constructor to set up the node
"""
# Initiate the Node class's constructor and give it a name
super().__init__('image_publisher')
# Create the publisher. This publisher will publish an Image
# to the video_frames topic. The queue size is 10 messages.
self.publisher_ = self.create_publisher(Image, 'video_frames', 10)
# We will publish a message every 0.1 seconds
timer_period = 0.1 # seconds
# Create the timer
self.timer = self.create_timer(timer_period, self.timer_callback)
# Create a VideoCapture object
# The argument '0' gets the default webcam.
self.cap = cv2.VideoCapture(0)
# Used to convert between ROS and OpenCV images
self.br = CvBridge()
def timer_callback(self):
"""
Callback function.
This function gets called every 0.1 seconds.
"""
# Capture frame-by-frame
# This method returns True/False as well
# as the video frame.
ret, frame = self.cap.read()
if ret == True:
# Publish the image.
# The 'cv2_to_imgmsg' method converts an OpenCV
# image to a ROS 2 image message
self.publisher_.publish(self.br.cv2_to_imgmsg(frame))
# Display the message on the console
self.get_logger().info('Publishing video frame')
def main(args=None):
# Initialize the rclpy library
rclpy.init(args=args)
# Create the node
image_publisher = ImagePublisher()
# Spin the node so the callback function is called.
rclpy.spin(image_publisher)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
image_publisher.destroy_node()
# Shutdown the ROS client library for Python
rclpy.shutdown()
if __name__ == '__main__':
main()
sub
# Basic ROS 2 program to subscribe to real-time streaming
# video from your built-in webcam
# Author:
# - Addison Sears-Collins
# Import the necessary libraries
import rclpy # Python library for ROS 2
from rclpy.node import Node # Handles the creation of nodes
from sensor_msgs.msg import Image # Image is the message type
from cv_bridge import CvBridge # Package to convert between ROS and OpenCV Images
import cv2 # OpenCV library
class ImageSubscriber(Node):
"""
Create an ImageSubscriber class, which is a subclass of the Node class.
"""
def __init__(self):
"""
Class constructor to set up the node
"""
# Initiate the Node class's constructor and give it a name
super().__init__('image_subscriber')
# Create the subscriber. This subscriber will receive an Image
# from the video_frames topic. The queue size is 10 messages.
self.subscription = self.create_subscription(
Image,
'video_frames',
self.listener_callback,
10)
self.subscription # prevent unused variable warning
# Used to convert between ROS and OpenCV images
self.br = CvBridge()
def listener_callback(self, data):
"""
Callback function.
"""
# Display the message on the console
self.get_logger().info('Receiving video frame')
# Convert ROS Image message to OpenCV image
current_frame = self.br.imgmsg_to_cv2(data)
# Display image
cv2.imshow("camera", current_frame)
cv2.waitKey(1)
def main(args=None):
# Initialize the rclpy library
rclpy.init(args=args)
# Create the node
image_subscriber = ImageSubscriber()
# Spin the node so the callback function is called.
rclpy.spin(image_subscriber)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
image_subscriber.destroy_node()
# Shutdown the ROS client library for Python
rclpy.shutdown()
if __name__ == '__main__':
main()
版权归原作者 zhangrelay 所有, 如有侵权,请联系我们删除。