ROS2 4일차(2) Maze World
2024. 9. 24. 17:05ㆍROS2/기초
$ ros2 launch gcamp_gazebo maze_world.launch.py
# Terminal 1
$ ros2 run py_action_pkg maze_action_server
# Terminal 2
$ ros2 run py_action_pkg maze_action_client
Maze Action Client
maze_action_server.py
#!/usr/bin/env/ python3
#
# Copyright 2021 Seoul Business Agency Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
import time
from custom_interfaces.action import Maze
from geometry_msgs.msg import Twist
from nav_msgs.msg import Odometry
from py_action_pkg.image_sub import ImageSubscriber
from py_action_pkg.robot_controller import euler_from_quaternion
import rclpy
from rclpy.action import ActionServer
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
direction_dict = {0: (-1 * math.pi / 2), 1: math.pi, 2: math.pi / 2, 3: 0.0}
direction_str_dict = {0: 'Up', 1: 'Right', 2: 'Down', 3: 'Left'}
# Maze.action structure
# int32[] turning_sequence
# ---
# bool success
# ---
# string feedback_msg
class MazeActionServer(Node):
def __init__(self):
super().__init__('maze_action_server')
self.yaw = 0.0
self.forward_distance = 0.0
self.twist_msg = Twist()
self.loop_rate = self.create_rate(5, self.get_clock())
self.laser_sub = self.create_subscription(
LaserScan, 'scan', self.laser_sub_cb, 10
)
self.odom_sub = self.create_subscription(
Odometry, 'odom', self.odom_sub_cb, 10
)
self.cmd_vel_pub = self.create_publisher(Twist, 'cmd_vel', 10)
timer_period = 0.1 # seconds
self.timer = self.create_timer(timer_period, self.publish_callback)
self._action_server = ActionServer(
self, Maze, 'maze_action', self.execute_callback
)
self.get_logger().info('=== Maze Action Server Started ====')
def laser_sub_cb(self, data):
self.forward_distance = data.ranges[360]
# self.get_logger().info(self.forward_distance)
def odom_sub_cb(self, data):
orientation = data.pose.pose.orientation
_, _, self.yaw = euler_from_quaternion(orientation)
# self.get_logger().info(f'yaw : {self.yaw}')
def publish_callback(self):
self.cmd_vel_pub.publish(self.twist_msg)
def turn_robot(self, euler_angle):
self.get_logger().info(f'Robot Turns to {euler_angle}')
turn_offset = 100
while abs(turn_offset) > 0.087:
turn_offset = 0.7 * (euler_angle - self.yaw)
self.twist_msg.linear.x = 0.0
self.twist_msg.angular.z = turn_offset
self.cmd_vel_pub.publish(self.twist_msg)
self.stop_robot()
def parking_robot(self):
while self.forward_distance > 1.0:
self.twist_msg.linear.x = 0.5
self.twist_msg.angular.z = 0.0
self.cmd_vel_pub.publish(self.twist_msg)
self.stop_robot()
def stop_robot(self):
self.twist_msg.linear.x = 0.0
self.twist_msg.angular.z = 0.0
self.cmd_vel_pub.publish(self.twist_msg)
time.sleep(1)
def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
feedback = Maze.Feedback()
feedback.feedback_msg = ''
for _, val in enumerate(goal_handle.request.turning_sequence):
self.get_logger().info(f'Current Cmd: {val}')
feedback.feedback_msg = f'Turning {direction_str_dict[val]}'
self.turn_robot(direction_dict[val])
self.parking_robot()
goal_handle.publish_feedback(feedback)
image_sub_node = ImageSubscriber()
rclpy.spin_once(image_sub_node)
center_pixel = image_sub_node.center_pixel
if sum(center_pixel) < 300 and center_pixel[1] > 100:
goal_handle.succeed()
self.get_logger().warn('==== Succeed ====')
result = Maze.Result()
result.success = True
else:
goal_handle.abort()
self.get_logger().error('==== Fail ====')
result = Maze.Result()
result.success = False
return result
def main(args=None):
rclpy.init(args=args)
# Referenced from robotpilot/ros2-seminar-examples
# https://github.com/robotpilot/ros2-seminar-examples/blob/main/topic_service_action_rclpy_example/topic_service_action_rclpy_example/calculator/main.py
try:
maze_action_server = MazeActionServer()
executor = MultiThreadedExecutor()
executor.add_node(maze_action_server)
try:
executor.spin()
except KeyboardInterrupt:
maze_action_server.get_logger().info('Keyboard Interrupt (SIGINT)')
finally:
executor.shutdown()
maze_action_server._action_server.destroy()
maze_action_server.destroy_node()
finally:
rclpy.shutdown()
if __name__ == '__main__':
main()
from rclpy.action import ActionServer
from custom_interfaces.action import Maze
direction_str_dict = {0: "Up", 1: "Right", 2: "Down", 3: "Left"}
self._action_server = ActionServer(
self, Maze, "maze_action", self.execute_callback
)
...
def execute_callback(self, goal_handle):
self.get_logger().info("Executing goal...")
feedback = Maze.Feedback()
feedback.feedback_msg = ""
# 일전 피보나치 예제와 유사하게 동작합니다.
for _, val in enumerate(goal_handle.request.turning_sequence):
self.get_logger().info(f'Current Cmd: {val}')
feedback.feedback_msg = f"Turning {direction_str_dict[val]}"
self.turn_robot(direction_dict[val])
self.parking_robot()
goal_handle.publish_feedback(feedback)
return result
def main(args=None):
rclpy.init(args=args)
# Referenced from robotpilot/ros2-seminar-examples
# https://github.com/robotpilot/ros2-seminar-examples/blob/main/topic_service_action_rclpy_example/topic_service_action_rclpy_example/calculator/main.py
try:
maze_action_server = MazeActionServer()
# MultiThreadedExecutor에 대해서는 Action 시간에 살펴보았습니다.
# 혹 해당 강의를 건너뛰신 분은, 여러 publish, subscribe, action들을
# 효율적으로 동시에 작업하기 위한 것이라 생각하시면 됩니다.
executor = MultiThreadedExecutor()
executor.add_node(maze_action_server)
try:
executor.spin()
except KeyboardInterrupt:
maze_action_server.get_logger().info("Keyboard Interrupt (SIGINT)")
finally:
executor.shutdown()
maze_action_server._action_server.destroy()
maze_action_server.destroy_node()
finally:
rclpy.shutdown()
if __name__ == "__main__":
main()
특정 방향으로 로봇을 회전시키기
from py_action_pkg.robot_controller import euler_from_quaternion
from nav_msgs.msg import Odometry
...
self.odom_sub = self.create_subscription(
Odometry, "/diffbot/odom", self.odom_sub_cb, 10
)
...
def turn_robot(self, euler_angle):
self.get_logger().info(f'Robot Turns to {euler_angle}')
turn_offset = 100
# 각도 한계를 너무 적게 주면, 0 -> -3.14로 갑자기 변해버리는 오류가 발생할 수 있습니다.
while abs(turn_offset) > 0.087:
# 로봇은 가속도를 갖기에 갑작스러운 변화를 줄 수 없음을 고려합니다.
turn_offset = 0.7 * (euler_angle - self.yaw)
self.twist_msg.linear.x = 0.0
self.twist_msg.angular.z = turn_offset
self.cmd_vel_pub.publish(self.twist_msg)
# 회전 이후, 로봇을 정지시킵니다.
self.stop_robot()
충돌 전까지 로봇을 직진시키기
from sensor_msgs.msg import LaserScan
...
self.laser_sub = self.create_subscription(
LaserScan, "/diffbot/scan", self.laser_sub_cb, 10
)
...
def laser_sub_cb(self, data):
self.forward_distance = data.ranges[360]
...
def parking_robot(self):
while self.forward_distance > 1.0:
self.twist_msg.linear.x = 0.5
self.twist_msg.angular.z = 0.0
self.cmd_vel_pub.publish(self.twist_msg)
self.stop_robot()
초록 박스 인식하기
from py_action_pkg.image_sub import ImageSubscriber
...
for _, val in enumerate(goal_handle.request.turning_sequence):
self.get_logger().info(f'Current Cmd: {val}')
feedback.feedback_msg = f"Turning {direction_str_dict[val]}"
self.turn_robot(direction_dict[val])
self.parking_robot()
goal_handle.publish_feedback(feedback)
# 모든 execution을 마친 시점에서, ImageSubscriber를 1회 실행합니다.
# 이를 통해 전방 이미지의 중간 pixel값을 얻을 수 있습니다.
image_sub_node = ImageSubscriber()
rclpy.spin_once(image_sub_node)
center_pixel = image_sub_node.center_pixel
# 전방 pixel값을 사용하여 초록 박스 앞인지의 유무를 판단합니다.
# 이는 곧 미로의 탈출지점에 도착했는지의 여부와 동일합니다.
if sum(center_pixel) < 300 and center_pixel[1] > 100:
goal_handle.succeed()
self.get_logger().warn("==== Succeed ====")
result = Maze.Result()
result.success = True
else:
goal_handle.abort()
self.get_logger().error("==== Fail ====")
result = Maze.Result()
result.success = False
return result
Maze Action Client
maze_action_client.py
#!/usr/bin/env/ python3
#
# Copyright 2021 Seoul Business Agency Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from custom_interfaces.action import Maze
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
"""
Maze.action structure
int32[] turning_sequence
---
bool success
---
string feedback_msg
"""
class MazeActionClient(Node):
def __init__(self):
super().__init__('maze_action_client')
self.action_client = ActionClient(self, Maze, 'diffbot/maze_action')
self.get_logger().info('=== Maze Action Client Started ====')
def send_goal(self, turning_list):
goal_msg = Maze.Goal()
goal_msg.turning_sequence = turning_list
if self.action_client.wait_for_server(10) is False:
self.get_logger().error('Server Not exists')
self._send_goal_future = self.action_client.send_goal_async(
goal_msg, feedback_callback=self.feedback_callback
)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def feedback_callback(self, feedback_message):
feedback = feedback_message.feedback
self.get_logger().info(f'Received feedback: {feedback.feedback_msg}')
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected')
return
self.get_logger().info('Goal accepted')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
self.get_logger().warn(f'Action Done !! Result: {result.success}')
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
maze_action_client = MazeActionClient()
user_inputs = []
# Input Logic
try:
maze_action_client.get_logger().info('Enter numbers [or stop] : ')
while True:
user_inputs.append(int(input()))
# if the input is not-integer, just print the list
except Exception:
maze_action_client.get_logger().info(f'Your sequence list : {user_inputs}')
maze_action_client.get_logger().info('==== Sending Goal ====')
maze_action_client.send_goal(user_inputs)
# You can get Future for additional functoins
# future = maze_action_client.send_goal(user_inputs)
rclpy.spin(maze_action_client)
if __name__ == '__main__':
main()
main
def main(args=None):
rclpy.init(args=args)
maze_action_client = MazeActionClient()
user_inputs = []
# Input Logic
try:
maze_action_client.get_logger().info('Enter numbers [or stop] : ')
while True:
user_inputs.append(int(input()))
# if the input is not-integer, just print the list
except:
maze_action_client.get_logger().info(f'Your sequence list : {user_inputs}')
maze_action_client.get_logger().info('==== Sending Goal ====')
future = maze_action_client.send_goal(user_inputs)
rclpy.spin(maze_action_client)
여러 로봇을 사용하고 싶을 때
maze_action_srv_node = Node(
package='py_action_pkg',
namespace='diffbot',
executable='maze_action_server',
name='maze_action_server',
output='screen'
)
launch file에서 이 옵션을 줄 수 있고, 위와 같이 작성된 launch 파일을 실행시키면, 해당 Node안의 모든 통신은 /diffbot으로 시작하게 된다.
1. 기존 /diffbot이 붙던 토픽 이름을 모두 변경해 준다.
self.laser_sub = self.create_subscription(
LaserScan, "/diffbot/scan", self.laser_sub_cb, 10
)
self.odom_sub = self.create_subscription(
Odometry, "/diffbot/odom", self.odom_sub_cb, 10
)
self.cmd_vel_pub = self.create_publisher(Twist, "/diffbot/cmd_vel", 10)
...
self.laser_sub = self.create_subscription(
LaserScan, 'scan', self.laser_sub_cb, 10
)
self.odom_sub = self.create_subscription(
Odometry, 'odom', self.odom_sub_cb, 10
)
self.cmd_vel_pub = self.create_publisher(Twist, 'cmd_vel', 10)
2. launch file을 작성하며, 이때 namespace option을 지정한다.
from launch import LaunchDescription
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
maze_action_srv_node = Node(
package='py_action_pkg',
namespace='diffbot',
executable='maze_action_server',
name='maze_action_server',
output='screen'
)
return LaunchDescription(
[
maze_action_srv_node,
]
)
3. setup.py에도 launch file을 추가하는 라인이 필요하다.
from glob import glob
import os
from setuptools import setup
package_name = 'py_action_pkg'
setup(
name=package_name,
version='0.0.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'), glob('launch/*.launch.py')),
],
'ROS2 > 기초' 카테고리의 다른 글
ubuntu 24.04 ssh 설정 + docker 설정 (0) | 2024.12.06 |
---|---|
ROS2 - topic(c++) (0) | 2024.10.21 |
ROS2 4일차(1) Maze World (0) | 2024.09.24 |
ROS2 3일차(4) Action 프로그래밍 (0) | 2024.09.23 |
ROS2 3일차(3) Action (0) | 2024.09.23 |