ROS2 2일차(4) topic 프로그래밍

2024. 9. 20. 16:21ROS2/기초

 

https://puzzling-cashew-c4c.notion.site/Topic-python-abf265c0af7b45c38264914d164d7349

 

Topic 프로그래밍 - python | Notion

즐거운 프로그래밍 첫번째 시간입니다. 지난 시간에 실행시켰던 cmd_vel_pub_node를 뜯어봅시다.

puzzling-cashew-c4c.notion.site

참고 문서

 

 

Publisher Node 작성

코드 분석(cmd_vel_pub.py)

# !/usr/bin/env/ python3
from geometry_msgs.msg import Twist
import rclpy
from rclpy.node import Node


class CmdVelPublisher(Node):

    def __init__(self):
        super().__init__('cmd_vel_pub_node')
        self.publisher = self.create_publisher(
            Twist, 'skidbot/cmd_vel', 10
        )  # queue size
        timer_period = 0.5  # seconds
        self.timer = self.create_timer(timer_period, self.publish_callback)
        self.get_logger().info(
            'DriveForward node Started, move forward during 5 seconds \n'
        )

    def publish_callback(self):
        twist_msg = Twist()
        twist_msg.linear.x = 0.5
        twist_msg.angular.z = 1.0
        self.publisher.publish(twist_msg)

    def stop_robot(self):
        stop_msg = Twist()
        stop_msg.linear.x = 0.0
        stop_msg.angular.z = 0.0
        self.publisher.publish(stop_msg)


def main(args=None):
    rclpy.init(args=args)

    cmd_vel_publisher = CmdVelPublisher()
    start_time = cmd_vel_publisher.get_clock().now().to_msg().sec
    clock_now = start_time
    time_delta = 0

    while (clock_now - start_time) < 5:
        rclpy.spin_once(cmd_vel_publisher)
        clock_now = cmd_vel_publisher.get_clock().now().to_msg().sec

        time_delta = clock_now - start_time
        cmd_vel_publisher.get_logger().info(f'{time_delta} seconds passed')

    cmd_vel_publisher.stop_robot()

    cmd_vel_publisher.get_logger().info('\n==== Stop Publishing ====')
    cmd_vel_publisher.destroy_node()

    rclpy.shutdown()


if __name__ == '__main__':
    main()

 

#!/usr/bin/env python3

쉐뱅(shebang)이라고 불리며, 스크립트 파일을 실행할 때 어떤 인터프리터를 사용할지 지정하는 역할

 

import rclpy
from rclpy.node import Node

from geometry_msgs.msg import Twist

python ros2 프로그래밍을 위한 rclpy, 이전 시간에 살펴봤던 message type인 Twist를 import

geometry_msgs/msg/Twist  >> from geometry_msgs.msg import Twist

 

def main(args=None):
    rclpy.init(args=args)

    cmd_vel_publisher = CmdVelPublisher()
		
		# ROS2에서 시간을 다루기 위해 아래와 같은 방식을 사용합니다.
		# 기본적으로 CPU clock을 사용하기 때문에 절대 시간과 정확히 일치한다는 보장은 없습니다.
    start_time = cmd_vel_publisher.get_clock().now().to_msg().sec
    clock_now  = start_time
		time_delta = 0

		# 5초 동안만 실행하기 위한 로직이 추가되었습니다.
    while (clock_now - start_time) < 5:
				# 단 한 번만 spin시키기 위해 spin_once가 사용되었습니다.
        rclpy.spin_once(cmd_vel_publisher)
        clock_now = cmd_vel_publisher.get_clock().now().to_msg().sec

        time_delta = clock_now - start_time
        print(f'{time_delta} seconds passed')

		# 나머지는 이전 예제와 동일합니다.
    cmd_vel_publisher.stop_robot()

    cmd_vel_publisher.get_logger().info('\n==== Stop Publishing ====')
    cmd_vel_publisher.destroy_node()

    rclpy.shutdown()

 

★중요★

ROS2의 모든 개발은 Class 형태로 개발을 추천하고 있다. 더불어 모든 Class는 Node를 기본적으로 상속받아야 함.

이 Node안에 node 동작에 필수적인 기능들이 모두 구현되어 있음.

class CmdVelPublisher(Node):

    def __init__(self):
				# 이 Class(=Node)가 가질 이름을 정해줍니다.
        super().__init__("cmd_vel_pub_node") # 부모 노드의 생성자 호출
				...
class CmdVelPublisher(Node):

    def __init__(self):
        super().__init__("cmd_vel_pub_node")
				# publisher를 생성하는 부분입니다. 하단에 추가 설명이 있습니다.
        self.publisher = self.create_publisher(Twist, "skidbot/cmd_vel", 10)

				# 어느정도의 주기로 publish 할 것인지를 선택합니다.
				# 지금의 경우 0.5초의 간격으로 publish_callback 함수를 반복 실행합니다.
        timer_period = 0.5  # seconds
        self.timer = self.create_timer(timer_period, self.publish_callback)

				# 시작한다는 log를 출력합니다.
        self.get_logger().info(" DriveForward node Started, move forward during 5 seconds \n")
    
    def publish_callback(self):
				...

    def stop_robot(self):
        ...

 

create_publisher

  • Twist : Topic 통신에 사용될 Message Type
  • "skidbot/cmd_vel" : 이 Topic의 이름을 지정
  • 10 : "대기열의 크기"라고 이해하면 됨.

0.5초 간격을 갖고 반복적으로 실행되는 publish_callback 함수 살펴보기

def publish_callback(self):
	# 상단 형식에 맞추어 Twist Message를 채워줍니다.

	# 전방 속도 0.5 / 각속도 1.0 => 원 운동
    twist_msg = Twist()
    twist_msg.linear.x  = 0.5
    twist_msg.angular.z = 1.0
    self.publisher.publish(twist_msg)

def stop_robot(self):
    stop_msg = Twist()

    # 전방 속도 0.0 / 각속도 0.0 => 정지
    stop_msg.linear.x  = 0.0
    stop_msg.angular.z = 0.0
    self.publisher.publish(stop_msg)

 

Subscriber Node 작성

Laser Scan Subscriber 예제

$ rosfoxy
$ ros2 run py_topic_pkg laser_raw_node
...
7.2921977043151855, 7.315267562866211, 7.348639965057373, 7.372188568115234, 
7.422288417816162, 7.451962471008301, 7.485859394073486, 7.517815589904785, 
7.548501968383789, 7.598430156707764, 7.646932125091553, 7.666992664337158, 
7.723824501037598, 7.737565517425537, 7.807446479797363, 7.838301181793213, 
7.885664463043213, 7.923019886016846, 7.958601474761963, 8.01160717010498, 
6.647082328796387])

이것들은 2D Lidar의 raw 데이터인데 다루기 위해서는 Rviz 사용

 

# Copyright 2021 Seoul Business Agency Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# !/usr/bin/env/ python3

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan


class LaserSubscriber(Node):

    def __init__(self):
        super().__init__('laser_sub_node')
        queue_size = 10
        self.subscriber = self.create_subscription(
            LaserScan, 'skidbot/scan', self.sub_callback, queue_size
        )
        self.subscriber  # prevent unused variable warning

    def sub_callback(self, msg):
        self.get_logger().info(f'Raw Laser Data : {msg.ranges}')


def main(args=None):
    rclpy.init(args=args)

    laser_subscriber = LaserSubscriber()

    rclpy.spin(laser_subscriber)

    laser_subscriber.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

main은 이전과 같으므로 차이점을 살펴보자

# 2D Lidar는 sensor_msgs/msg/LaserScan 이라는 형식을 사용합니다.
from sensor_msgs.msg import LaserScan

class LaserSubscriber(Node):

    def __init__(self):
        super().__init__("laser_sub_node")
        queue_size = 10	    
        self.subscriber = self.create_subscription(
            LaserScan, 'skidbot/scan', self.sub_callback, queue_size
        )
        self.subscriber # prevent unused variable warning

        # Topic을 통해 subscribe 할 때마다 이 함수가 실행됩니다. 
        # 그리고 두번째 매개변수인 msg에는 전달받은 Message가 담겨 있습니다.
    	def sub_callback(self, msg):
            self.get_logger().info(f'Raw Laser Data : {msg.ranges}')

create_subscription

  • LaserScan : Topic 통신에 사용될 Message Type
  • "skidbot/scan" : 데이터를 Subscribe 받을 Topic의 이름
  • self.sub_callback : callback 함수
  • queue_size : publisher때와 마찬가지로, "대기열의 크기" 라고 이해하면 됨(데이터를 몇 개나 보관할지를 결정하는 QoS 옵션

LaserScan Message

LaserScan Type

실질적으로 lidar data를 담고 있는 부분은 ranges이며, ranges는 길이 720의 list로, 다음과 같이 0.25도의 분해능으로

scan된 물체와의 거리를 저장

 

전방 물체와의 거리만 관심있으면

laser_sub.py 중

def sub_callback(self, msg):
    self.get_logger().info(f'Raw Laser Data : {msg.ranges[360]}')

 

laser_sub_node 실행

$ ros2 run py_topic_pkg laser_sub_node 
Distance from Front Object : 4.441709995269775
Distance from Front Object : 4.453824043273926
Distance from Front Object : 4.432742595672607
Distance from Front Object : 4.467106342315674
...

 

Example(전진하다가 적외선 거리가 짧아지면 정지)

# Copyright 2021 Seoul Business Agency Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# !/usr/bin/env/ python3

import sys

from geometry_msgs.msg import Twist
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan


class ParkingNode(Node):

    def __init__(self):
        super().__init__('parking_node')

        self.publisher = self.create_publisher(Twist, 'skidbot/cmd_vel', 10)

        self.subscriber = self.create_subscription(
            LaserScan, 'skidbot/scan', self.sub_callback, 10
        )
        self.subscriber  # prevent unused variable warning
        self.publisher  # prevent unused variable warning

        self.get_logger().info('==== Parking Node Started ====\n')

    def sub_callback(self, msg):
        twist_msg = Twist()
        distance_forward = msg.ranges[360]

        if distance_forward > 0.5:
            self.get_logger().info(f'Distance from Front Object : {distance_forward}')
            twist_msg.linear.x = 0.5
            self.publisher.publish(twist_msg)
        else:
            self.get_logger().info('==== Parking Done!!! ====\n')
            twist_msg.linear.x = 0.0
            self.publisher.publish(twist_msg)


def main(args=None):
    rclpy.init(args=args)

    parking_node = ParkingNode()

    try:
        rclpy.spin(parking_node)
    except KeyboardInterrupt:
        parking_node.get_logger().info('==== Server stopped cleanly ====')
    except BaseException:
        parking_node.get_logger().info('!! Exception in server:', file=sys.stderr)
        raise
    finally:
        # (optional - Done automatically when node is garbage collected)
        rclpy.shutdown()


if __name__ == '__main__':
    main()

'ROS2 > 기초' 카테고리의 다른 글

ROS2 3일차(1) Service  (0) 2024.09.23
ROS2 2일차(5) tf2  (0) 2024.09.20
ROS2 2일차(3) topic  (0) 2024.09.20
ROS2 2일차(2) launch  (0) 2024.09.20
ROS2 2일차(1) node  (0) 2024.09.20