ROS2 Tutorials – Writing a simple publisher and subscriber (Python)

Introduction

The tutorials have finally reached hands-on territory.

This time I implement nodes that do Pub/Sub in Python.

Writing a simple publisher and subscriber (Python) — ROS 2 Documentation: Iron documentation

Trying out Pub/Sub

Creating the pub node

First, as usual, create a workspace.

$ source ros2_iron/install/setup.bash
$ mkdir -p ros-ws/src
$ cd ros-ws/src

Create the package skeleton.

$ ros2 pkg create --build-type ament_python --license Apache-2.0 py_pubsub

Fetch the source code.

$ cd py_pubsub/py_pubsub/
$ wget https://raw.githubusercontent.com/ros2/examples/iron/rclpy/topics/minimal_publisher/examples_rclpy_minimal_publisher/publisher_member_function.py

The source looks like this.

# Copyright 2016 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import rclpy
from rclpy.node import Node

from std_msgs.msg import String


class MinimalPublisher(Node):

    def __init__(self):
        super().__init__('minimal_publisher')                         # set the node name via the parent constructor
        self.publisher_ = self.create_publisher(String, 'topic', 10)  # message type: String, topic name: topic, queue size: 10
        timer_period = 0.5  # seconds
        self.timer = self.create_timer(timer_period, self.timer_callback)
        self.i = 0

    def timer_callback(self):  # publish and log every 0.5 seconds
        msg = String()
        msg.data = 'Hello World: %d' % self.i
        self.publisher_.publish(msg)
        self.get_logger().info('Publishing: "%s"' % msg.data)
        self.i += 1


def main(args=None):
    rclpy.init(args=args)

    minimal_publisher = MinimalPublisher()

    rclpy.spin(minimal_publisher)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    minimal_publisher.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
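
The 10 passed to create_publisher is the QoS history depth: when an integer is given, rclpy uses a keep-last history of that size. As a rough mental model only, the sketch below mimics such a bounded buffer in plain Python. KeepLastQueue is a hypothetical name for illustration and is not part of rclpy.

```python
from collections import deque


class KeepLastQueue:
    """Toy model of a keep-last history buffer (hypothetical, not part of rclpy)."""

    def __init__(self, depth):
        # A deque with maxlen silently drops the oldest item once it is full
        self._buffer = deque(maxlen=depth)

    def push(self, message):
        self._buffer.append(message)

    def pending(self):
        return list(self._buffer)


queue = KeepLastQueue(depth=10)
for i in range(13):
    queue.push('Hello World: %d' % i)

print(len(queue.pending()))   # 10
print(queue.pending()[0])     # Hello World: 3
```

With 13 messages pushed into a depth of 10, the three oldest are discarded, so the buffer starts at "Hello World: 3".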

The publisher source uses rclpy and std_msgs, so both must be declared as dependencies in package.xml as shown below. Change description, maintainer, and license to taste.

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>py_pubsub</name>
  <version>0.0.0</version>
  <description>TODO: Package description</description>
  <maintainer email="*****@todo.todo">*****</maintainer>
  <license>Apache-2.0</license>

  <exec_depend>rclpy</exec_depend>     <!-- declare the dependency -->
  <exec_depend>std_msgs</exec_depend>  <!-- declare the dependency -->

  <export>
    <build_type>ament_python</build_type>
  </export>
</package>
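
To double-check which run-time dependencies a manifest declares, the exec_depend tags can be read back with the standard library. This is only a sketch: the XML is a trimmed copy of the manifest embedded as a string, and list_exec_depends is a hypothetical helper, not a ROS 2 tool.

```python
import xml.etree.ElementTree as ET

# Trimmed-down copy of the manifest, embedded so the sketch is self-contained
PACKAGE_XML = """<?xml version="1.0"?>
<package format="3">
  <name>py_pubsub</name>
  <version>0.0.0</version>
  <exec_depend>rclpy</exec_depend>
  <exec_depend>std_msgs</exec_depend>
  <export>
    <build_type>ament_python</build_type>
  </export>
</package>
"""


def list_exec_depends(xml_text):
    """Return the text of every <exec_depend> element, in document order."""
    root = ET.fromstring(xml_text)
    return [element.text.strip() for element in root.findall('exec_depend')]


print(list_exec_depends(PACKAGE_XML))  # ['rclpy', 'std_msgs']
```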

Next, specify in setup.py the entry point used when the code is invoked as a node. Change maintainer, maintainer_email, and description to taste.

from setuptools import find_packages, setup

package_name = 'py_pubsub'

setup(
    name=package_name,
    version='0.0.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='*****',
    maintainer_email='*****@todo.todo',
    description='TODO: Package description',
    license='Apache-2.0',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'talker = py_pubsub.publisher_member_function:main',  # add the entry point
        ],
    },
)
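
Each console_scripts entry has the form "command = module.path:function". Roughly speaking, the generated command imports that module and calls the named function. The sketch below imitates the lookup in plain Python. resolve_entry_point is a hypothetical helper, not how setuptools actually implements it, and json:dumps is a stand-in target so the code runs without ROS 2.

```python
import importlib


def resolve_entry_point(spec):
    """Split 'name = package.module:function' and import the target callable."""
    name, target = (part.strip() for part in spec.split('=', 1))
    module_path, function_name = target.split(':', 1)
    module = importlib.import_module(module_path)
    return name, getattr(module, function_name)


# 'json:dumps' is a stand-in from the standard library; the real entry would be
# 'talker = py_pubsub.publisher_member_function:main'.
name, func = resolve_entry_point('dump = json:dumps')
print(name)            # dump
print(func([1, 2]))    # [1, 2]
```

Seen this way, a typo in either the module path or the function name only surfaces when the command is run, so it is worth checking both against the file name and the main function.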

Creating the sub node

Fetch the source code.

$ cd src/py_pubsub/py_pubsub/
$ wget https://raw.githubusercontent.com/ros2/examples/iron/rclpy/topics/minimal_subscriber/examples_rclpy_minimal_subscriber/subscriber_member_function.py

The source looks like this. The structure is almost the same as the pub node.

$ cat subscriber_member_function.py 
# Copyright 2016 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import rclpy
from rclpy.node import Node

from std_msgs.msg import String


class MinimalSubscriber(Node):

    def __init__(self):
        super().__init__('minimal_subscriber')         # set the node name via the parent constructor
        self.subscription = self.create_subscription(
            String,                                    # same as the publisher
            'topic',                                   # same as the publisher
            self.listener_callback,                    # call listener_callback whenever a message arrives
            10)                                        # same as the publisher
        self.subscription  # prevent unused variable warning

    def listener_callback(self, msg):
        self.get_logger().info('I heard: "%s"' % msg.data)


def main(args=None):
    rclpy.init(args=args)

    minimal_subscriber = MinimalSubscriber()

    rclpy.spin(minimal_subscriber)

    # Destroy the node explicitly
    # (optional - otherwise it will be done automatically
    # when the garbage collector destroys the node object)
    minimal_subscriber.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
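
The callback-on-receive pattern can be tried without ROS 2 using a toy in-process model. This is a sketch under heavy simplification: ToyBus is a hypothetical class, and there is no networking, no QoS, and no executor, which the real nodes rely on.

```python
from collections import defaultdict


class ToyBus:
    """In-process stand-in for topic-based delivery (hypothetical, not rclpy)."""

    def __init__(self):
        self._callbacks = defaultdict(list)

    def subscribe(self, topic, callback):
        self._callbacks[topic].append(callback)

    def publish(self, topic, message):
        # Deliver to every callback registered on this topic at publish time
        for callback in self._callbacks[topic]:
            callback(message)


heard = []
bus = ToyBus()
bus.publish('topic', 'Hello World: 0')    # nobody is subscribed yet, so this is lost
bus.subscribe('topic', heard.append)
bus.publish('topic', 'Hello World: 1')
bus.publish('other', 'ignored')           # different topic name, not delivered

print(heard)  # ['Hello World: 1']
```

In this model a subscriber sees only messages published after it registered, and only on the matching topic name, which is why the topic name has to be identical on both sides.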

With the source in place, add an entry point to setup.py just as for the pub node.

from setuptools import find_packages, setup

package_name = 'py_pubsub'

setup(
    name=package_name,
    version='0.0.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='*****',
    maintainer_email='*****@todo.todo',
    description='TODO: Package description',
    license='Apache-2.0',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'talker = py_pubsub.publisher_member_function:main',
            'listener = py_pubsub.subscriber_member_function:main',  # add this entry point
        ],
    },
)

At this point the workspace looks like this.

$ tree
.
└── src
    └── py_pubsub
        ├── LICENSE
        ├── package.xml
        ├── py_pubsub
        │   ├── __init__.py
        │   ├── publisher_member_function.py
        │   └── subscriber_member_function.py
        ├── resource
        │   └── py_pubsub
        ├── setup.cfg
        ├── setup.py
        └── test
            ├── test_copyright.py
            ├── test_flake8.py
            └── test_pep257.py

Building and running

The rest follows the earlier tutorials: resolve the dependencies, then build with colcon and run.

$ rosdep install -i --from-path src --rosdistro iron -y
#All required rosdeps installed successfully
$ colcon build --packages-select py_pubsub
Starting >>> py_pubsub
--- stderr: py_pubsub                   
/usr/lib/python3.12/site-packages/setuptools/_distutils/cmd.py:66: SetuptoolsDeprecationWarning: setup.py install is deprecated.
!!

        ********************************************************************************
        Please avoid running ``setup.py`` directly.
        Instead, use pypa/build, pypa/installer, pypa/build or
        other standards-based tools.

        See https://blog.ganssle.io/articles/2021/10/setup-py-deprecated.html for details.
        ********************************************************************************

!!
  self.initialize_options()
---
Finished <<< py_pubsub [3.73s]

Summary: 1 package finished [6.33s]
  1 package had stderr output: py_pubsub

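Something that looks like an error showed up on stderr, but it is a SetuptoolsDeprecationWarning and the build finished, so go ahead and source the overlay.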

$ source install/setup.bash 

Start the pub node.

$ ros2 run py_pubsub talker
[INFO] [minimal_publisher]: Publishing: "Hello World: 0"
[INFO] [minimal_publisher]: Publishing: "Hello World: 1"
[INFO] [minimal_publisher]: Publishing: "Hello World: 2"
[INFO] [minimal_publisher]: Publishing: "Hello World: 3"

Start the sub node as well, in a separate terminal with the overlay sourced there too.

$ ros2 run py_pubsub listener
[INFO] [minimal_subscriber]: I heard: "Hello World: 14"
[INFO] [minimal_subscriber]: I heard: "Hello World: 15"
[INFO] [minimal_subscriber]: I heard: "Hello World: 16"

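Despite the warning during the build, pub/sub worked properly at 0.5-second intervals. The listener output starts at 14 because it only receives messages published after it came up.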

Conclusion

This time I finally implemented pub/sub. It looks useful for plenty of things, such as streaming sensor data.
