はじめに
漸く実践的なチュートリアルに入った.
今回は Python で Pub/Sub するノードを実装する.
Writing a simple publisher and subscriber (Python) — ROS 2 Documentation: Iron documentation
Pub/Sub してみる
pub node を作る
まずはいつもの通り,ワークスペースを作る.
$ source ros2_iron/install/setup.bash
$ mkdir -p ros-ws/src
$ cd ros-ws/src
パッケージのもとを作る.
$ ros2 pkg create --build-type ament_python --license Apache-2.0 py_pubsub
ソースコードを拾ってくる.
$ cd py_pubsub/py_pubsub/
$ wget https://raw.githubusercontent.com/ros2/examples/iron/rclpy/topics/minimal_publisher/examples_rclpy_minimal_publisher/publisher_member_function.py
ソースの中身はこんな感じ.
# Copyright 2016 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher') # コンストラクタでノード名を設定
self.publisher_ = self.create_publisher(String, 'topic', 10) # メッセージ型:String,トピック名:topic,キューサイズ:10
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
def timer_callback(self): # 0.5秒毎にpubしながらログ出力する
msg = String()
msg.data = 'Hello World: %d' % self.i
self.publisher_.publish(msg)
self.get_logger().info('Publishing: "%s"' % msg.data)
self.i += 1
def main(args=None):
rclpy.init(args=args)
minimal_publisher = MinimalPublisher()
rclpy.spin(minimal_publisher)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
minimal_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
このソースでは rclpy と sdt_msgs を利用しているので,package.xml に依存関係として次の通りに記載する必要がある.description,maintainer,license の変更はお好みでどうぞ.
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>py_pubsub</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="*****@todo.todo">*****</maintainer>
<license>Apache-2.0</license>
<exec_depend>rclpy</exec_depend> # 依存関係を記載
<exec_depend>std_msgs</exec_depend> # 依存関係を記載
<export>
<build_type>ament_python</build_type>
</export>
</package>
続いてノードとして呼び出す際のエントリーポイントを指定する.maintainer,maintainer_email,description はお好みでどうぞ.
from setuptools import find_packages, setup
package_name = 'py_pubsub'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='*****',
maintainer_email='*****@todo.todo',
description='TODO: Package description',
license='Apache-2.0',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'talker = py_pubsub.publisher_member_function:main', # エントリーポイントを記載
],
},
)
sub node を作る
ソースコードを拾ってくる.
$ cd src/py_pubsub/py_pubsub/
$ wget https://raw.githubusercontent.com/ros2/examples/iron/rclpy/topics/minimal_subscriber/examples_rclpy_minimal_subscriber/subscriber_member_function.py
ソースの中身はこんな感じ.構造は pub とほぼ同じ.
$ cat subscriber_member_function.py
# Copyright 2016 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class MinimalSubscriber(Node):
def __init__(self):
super().__init__('minimal_subscriber') # コンストラクタでノード名を設定
self.subscription = self.create_subscription(
String, # pub と同じ
'topic', # pub と同じ
self.listener_callback, # メッセージを受信したら listener_callback を呼び出す
10) # pub と同じ
self.subscription # prevent unused variable warning
def listener_callback(self, msg):
self.get_logger().info('I heard: "%s"' % msg.data)
def main(args=None):
rclpy.init(args=args)
minimal_subscriber = MinimalSubscriber()
rclpy.spin(minimal_subscriber)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
minimal_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
ソースが用意できたので,pub と同様に setup.py にエントリーポイントを追記する.
from setuptools import find_packages, setup
package_name = 'py_pubsub'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='*****',
maintainer_email='*****@todo.todo',
description='TODO: Package description',
license='Apache-2.0',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'talker = py_pubsub.publisher_member_function:main',
'listener = py_pubsub.subscriber_member_function:main', # エントリーポイントを追記
],
},
)
この時点でワークスペースはこんな状態.
$ tree
.
└── src
└── py_pubsub
├── LICENSE
├── package.xml
├── py_pubsub
│ ├── __init__.py
│ ├── publisher_member_function.py
│ └── subscriber_member_function.py
├── resource
│ └── py_pubsub
├── setup.cfg
├── setup.py
└── test
├── test_copyright.py
├── test_flake8.py
└── test_pep257.py
ビルドして実行してみる
あとはこれまでのチュートリアルの通り,依存関係の解消後に colcon でビルドして実行まで進める.
$ rosdep install -i --from-path src --rosdistro iron -y
#All required rosdeps installed successfully
$ colcon build --packages-select py_pubsub
Starting >>> py_pubsub
--- stderr: py_pubsub
/usr/lib/python3.12/site-packages/setuptools/_distutils/cmd.py:66: SetuptoolsDeprecationWarning: setup.py install is deprecated.
!!
********************************************************************************
Please avoid running ``setup.py`` directly.
Instead, use pypa/build, pypa/installer, pypa/build or
other standards-based tools.
See https://blog.ganssle.io/articles/2021/10/setup-py-deprecated.html for details.
********************************************************************************
!!
self.initialize_options()
---
Finished <<< py_pubsub [3.73s]
Summary: 1 package finished [6.33s]
1 package had stderr output: py_pubsub
なんかエラー出たけど,オーバーレイをソースする.
$ source install/setup.bash
pub node を起動してみる.
$ ros2 run py_pubsub talker
[INFO] [minimal_publisher]: Publishing: "Hello World: 0"
[INFO] [minimal_publisher]: Publishing: "Hello World: 1"
[INFO] [minimal_publisher]: Publishing: "Hello World: 2"
[INFO] [minimal_publisher]: Publishing: "Hello World: 3"
sub node も起動してみる.
$ ros2 run py_pubsub listener
[INFO] [minimal_subscriber]: I heard: "Hello World: 14"
[INFO] [minimal_subscriber]: I heard: "Hello World: 15"
[INFO] [minimal_subscriber]: I heard: "Hello World: 16"
なんかエラー出てたけど,0.5秒間隔でちゃんと pub/sub できた.
おわりに
今回は漸く pub/sub 処理を実装した.センサーデータ垂れ流したり,用途はたくさんありそう.
コメント