2024. 9. 20. 16:21ㆍROS2/기초
https://puzzling-cashew-c4c.notion.site/Topic-python-abf265c0af7b45c38264914d164d7349
참고 문서
Publisher Node 작성
코드 분석(cmd_vel_pub.py)
# !/usr/bin/env/ python3
from geometry_msgs.msg import Twist
import rclpy
from rclpy.node import Node
class CmdVelPublisher(Node):
def __init__(self):
super().__init__('cmd_vel_pub_node')
self.publisher = self.create_publisher(
Twist, 'skidbot/cmd_vel', 10
) # queue size
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.publish_callback)
self.get_logger().info(
'DriveForward node Started, move forward during 5 seconds \n'
)
def publish_callback(self):
twist_msg = Twist()
twist_msg.linear.x = 0.5
twist_msg.angular.z = 1.0
self.publisher.publish(twist_msg)
def stop_robot(self):
stop_msg = Twist()
stop_msg.linear.x = 0.0
stop_msg.angular.z = 0.0
self.publisher.publish(stop_msg)
def main(args=None):
rclpy.init(args=args)
cmd_vel_publisher = CmdVelPublisher()
start_time = cmd_vel_publisher.get_clock().now().to_msg().sec
clock_now = start_time
time_delta = 0
while (clock_now - start_time) < 5:
rclpy.spin_once(cmd_vel_publisher)
clock_now = cmd_vel_publisher.get_clock().now().to_msg().sec
time_delta = clock_now - start_time
cmd_vel_publisher.get_logger().info(f'{time_delta} seconds passed')
cmd_vel_publisher.stop_robot()
cmd_vel_publisher.get_logger().info('\n==== Stop Publishing ====')
cmd_vel_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
#!/usr/bin/env python3
쉐뱅(shebang)이라고 불리며, 스크립트 파일을 실행할 때 어떤 인터프리터를 사용할지 지정하는 역할
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
python ros2 프로그래밍을 위한 rclpy, 이전 시간에 살펴봤던 message type인 Twist를 import
geometry_msgs/msg/Twist >> from geometry_msgs.msg import Twist
def main(args=None):
rclpy.init(args=args)
cmd_vel_publisher = CmdVelPublisher()
# ROS2에서 시간을 다루기 위해 아래와 같은 방식을 사용합니다.
# 기본적으로 CPU clock을 사용하기 때문에 절대 시간과 정확히 일치한다는 보장은 없습니다.
start_time = cmd_vel_publisher.get_clock().now().to_msg().sec
clock_now = start_time
time_delta = 0
# 5초 동안만 실행하기 위한 로직이 추가되었습니다.
while (clock_now - start_time) < 5:
# 단 한 번만 spin시키기 위해 spin_once가 사용되었습니다.
rclpy.spin_once(cmd_vel_publisher)
clock_now = cmd_vel_publisher.get_clock().now().to_msg().sec
time_delta = clock_now - start_time
print(f'{time_delta} seconds passed')
# 나머지는 이전 예제와 동일합니다.
cmd_vel_publisher.stop_robot()
cmd_vel_publisher.get_logger().info('\n==== Stop Publishing ====')
cmd_vel_publisher.destroy_node()
rclpy.shutdown()
★중요★
ROS2의 모든 개발은 Class 형태로 개발을 추천하고 있다. 더불어 모든 Class는 Node를 기본적으로 상속받아야 함.
이 Node안에 node 동작에 필수적인 기능들이 모두 구현되어 있음.
class CmdVelPublisher(Node):
def __init__(self):
# 이 Class(=Node)가 가질 이름을 정해줍니다.
super().__init__("cmd_vel_pub_node") # 부모 노드의 생성자 호출
...
class CmdVelPublisher(Node):
def __init__(self):
super().__init__("cmd_vel_pub_node")
# publisher를 생성하는 부분입니다. 하단에 추가 설명이 있습니다.
self.publisher = self.create_publisher(Twist, "skidbot/cmd_vel", 10)
# 어느정도의 주기로 publish 할 것인지를 선택합니다.
# 지금의 경우 0.5초의 간격으로 publish_callback 함수를 반복 실행합니다.
timer_period = 0.5 # seconds
self.timer = self.create_timer(timer_period, self.publish_callback)
# 시작한다는 log를 출력합니다.
self.get_logger().info(" DriveForward node Started, move forward during 5 seconds \n")
def publish_callback(self):
...
def stop_robot(self):
...
create_publisher
- Twist : Topic 통신에 사용될 Message Type
- "skidbot/cmd_vel" : 이 Topic의 이름을 지정
- 10 : "대기열의 크기"라고 이해하면 됨.
0.5초 간격을 갖고 반복적으로 실행되는 publish_callback 함수 살펴보기
def publish_callback(self):
# 상단 형식에 맞추어 Twist Message를 채워줍니다.
# 전방 속도 0.5 / 각속도 1.0 => 원 운동
twist_msg = Twist()
twist_msg.linear.x = 0.5
twist_msg.angular.z = 1.0
self.publisher.publish(twist_msg)
def stop_robot(self):
stop_msg = Twist()
# 전방 속도 0.0 / 각속도 0.0 => 정지
stop_msg.linear.x = 0.0
stop_msg.angular.z = 0.0
self.publisher.publish(stop_msg)
Subscriber Node 작성
Laser Scan Subscriber 예제
$ rosfoxy
$ ros2 run py_topic_pkg laser_raw_node
...
7.2921977043151855, 7.315267562866211, 7.348639965057373, 7.372188568115234,
7.422288417816162, 7.451962471008301, 7.485859394073486, 7.517815589904785,
7.548501968383789, 7.598430156707764, 7.646932125091553, 7.666992664337158,
7.723824501037598, 7.737565517425537, 7.807446479797363, 7.838301181793213,
7.885664463043213, 7.923019886016846, 7.958601474761963, 8.01160717010498,
6.647082328796387])
이것들은 2D Lidar의 raw 데이터인데 다루기 위해서는 Rviz 사용
# Copyright 2021 Seoul Business Agency Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# !/usr/bin/env/ python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
class LaserSubscriber(Node):
def __init__(self):
super().__init__('laser_sub_node')
queue_size = 10
self.subscriber = self.create_subscription(
LaserScan, 'skidbot/scan', self.sub_callback, queue_size
)
self.subscriber # prevent unused variable warning
def sub_callback(self, msg):
self.get_logger().info(f'Raw Laser Data : {msg.ranges}')
def main(args=None):
rclpy.init(args=args)
laser_subscriber = LaserSubscriber()
rclpy.spin(laser_subscriber)
laser_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
main은 이전과 같으므로 차이점을 살펴보자
# 2D Lidar는 sensor_msgs/msg/LaserScan 이라는 형식을 사용합니다.
from sensor_msgs.msg import LaserScan
class LaserSubscriber(Node):
def __init__(self):
super().__init__("laser_sub_node")
queue_size = 10
self.subscriber = self.create_subscription(
LaserScan, 'skidbot/scan', self.sub_callback, queue_size
)
self.subscriber # prevent unused variable warning
# Topic을 통해 subscribe 할 때마다 이 함수가 실행됩니다.
# 그리고 두번째 매개변수인 msg에는 전달받은 Message가 담겨 있습니다.
def sub_callback(self, msg):
self.get_logger().info(f'Raw Laser Data : {msg.ranges}')
create_subscription
- LaserScan : Topic 통신에 사용될 Message Type
- "skidbot/scan" : 데이터를 Subscribe 받을 Topic의 이름
- self.sub_callback : callback 함수
- queue_size : publisher때와 마찬가지로, "대기열의 크기" 라고 이해하면 됨(데이터를 몇 개나 보관할지를 결정하는 QoS 옵션
LaserScan Message
LaserScan Type
실질적으로 lidar data를 담고 있는 부분은 ranges이며, ranges는 길이 720의 list로, 다음과 같이 0.25도의 분해능으로
scan된 물체와의 거리를 저장
전방 물체와의 거리만 관심있으면
laser_sub.py 중
def sub_callback(self, msg):
self.get_logger().info(f'Raw Laser Data : {msg.ranges[360]}')
laser_sub_node 실행
$ ros2 run py_topic_pkg laser_sub_node
Distance from Front Object : 4.441709995269775
Distance from Front Object : 4.453824043273926
Distance from Front Object : 4.432742595672607
Distance from Front Object : 4.467106342315674
...
Example(전진하다가 적외선 거리가 짧아지면 정지)
# Copyright 2021 Seoul Business Agency Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# !/usr/bin/env/ python3
import sys
from geometry_msgs.msg import Twist
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
class ParkingNode(Node):
def __init__(self):
super().__init__('parking_node')
self.publisher = self.create_publisher(Twist, 'skidbot/cmd_vel', 10)
self.subscriber = self.create_subscription(
LaserScan, 'skidbot/scan', self.sub_callback, 10
)
self.subscriber # prevent unused variable warning
self.publisher # prevent unused variable warning
self.get_logger().info('==== Parking Node Started ====\n')
def sub_callback(self, msg):
twist_msg = Twist()
distance_forward = msg.ranges[360]
if distance_forward > 0.5:
self.get_logger().info(f'Distance from Front Object : {distance_forward}')
twist_msg.linear.x = 0.5
self.publisher.publish(twist_msg)
else:
self.get_logger().info('==== Parking Done!!! ====\n')
twist_msg.linear.x = 0.0
self.publisher.publish(twist_msg)
def main(args=None):
rclpy.init(args=args)
parking_node = ParkingNode()
try:
rclpy.spin(parking_node)
except KeyboardInterrupt:
parking_node.get_logger().info('==== Server stopped cleanly ====')
except BaseException:
parking_node.get_logger().info('!! Exception in server:', file=sys.stderr)
raise
finally:
# (optional - Done automatically when node is garbage collected)
rclpy.shutdown()
if __name__ == '__main__':
main()
'ROS2 > 기초' 카테고리의 다른 글
ROS2 3일차(1) Service (0) | 2024.09.23 |
---|---|
ROS2 2일차(5) tf2 (0) | 2024.09.20 |
ROS2 2일차(3) topic (0) | 2024.09.20 |
ROS2 2일차(2) launch (0) | 2024.09.20 |
ROS2 2일차(1) node (0) | 2024.09.20 |