ROS2 4일차(2) Maze World

2024. 9. 24. 17:05ROS2/기초

$ ros2 launch gcamp_gazebo maze_world.launch.py

# Terminal 1
$ ros2 run py_action_pkg maze_action_server

# Terminal 2
$ ros2 run py_action_pkg maze_action_client

 

Maze Action Client

maze_action_server.py

#!/usr/bin/env/ python3
#
# Copyright 2021 Seoul Business Agency Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import math
import time

from custom_interfaces.action import Maze
from geometry_msgs.msg import Twist
from nav_msgs.msg import Odometry

from py_action_pkg.image_sub import ImageSubscriber
from py_action_pkg.robot_controller import euler_from_quaternion

import rclpy
from rclpy.action import ActionServer
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node

from sensor_msgs.msg import LaserScan

direction_dict = {0: (-1 * math.pi / 2), 1: math.pi, 2: math.pi / 2, 3: 0.0}
direction_str_dict = {0: 'Up', 1: 'Right', 2: 'Down', 3: 'Left'}

# Maze.action structure

#     int32[] turning_sequence
#     ---
#     bool success
#     ---
#     string feedback_msg


class MazeActionServer(Node):

    def __init__(self):
        super().__init__('maze_action_server')

        self.yaw = 0.0
        self.forward_distance = 0.0

        self.twist_msg = Twist()
        self.loop_rate = self.create_rate(5, self.get_clock())

        self.laser_sub = self.create_subscription(
            LaserScan, 'scan', self.laser_sub_cb, 10
        )

        self.odom_sub = self.create_subscription(
            Odometry, 'odom', self.odom_sub_cb, 10
        )

        self.cmd_vel_pub = self.create_publisher(Twist, 'cmd_vel', 10)

        timer_period = 0.1  # seconds
        self.timer = self.create_timer(timer_period, self.publish_callback)

        self._action_server = ActionServer(
            self, Maze, 'maze_action', self.execute_callback
        )
        self.get_logger().info('=== Maze Action Server Started ====')

    def laser_sub_cb(self, data):
        self.forward_distance = data.ranges[360]
        # self.get_logger().info(self.forward_distance)

    def odom_sub_cb(self, data):
        orientation = data.pose.pose.orientation
        _, _, self.yaw = euler_from_quaternion(orientation)
        # self.get_logger().info(f'yaw : {self.yaw}')

    def publish_callback(self):
        self.cmd_vel_pub.publish(self.twist_msg)

    def turn_robot(self, euler_angle):
        self.get_logger().info(f'Robot Turns to {euler_angle}')

        turn_offset = 100

        while abs(turn_offset) > 0.087:
            turn_offset = 0.7 * (euler_angle - self.yaw)
            self.twist_msg.linear.x = 0.0
            self.twist_msg.angular.z = turn_offset
            self.cmd_vel_pub.publish(self.twist_msg)

        self.stop_robot()

    def parking_robot(self):

        while self.forward_distance > 1.0:
            self.twist_msg.linear.x = 0.5
            self.twist_msg.angular.z = 0.0

            self.cmd_vel_pub.publish(self.twist_msg)

        self.stop_robot()

    def stop_robot(self):
        self.twist_msg.linear.x = 0.0
        self.twist_msg.angular.z = 0.0
        self.cmd_vel_pub.publish(self.twist_msg)

        time.sleep(1)

    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')

        feedback = Maze.Feedback()
        feedback.feedback_msg = ''

        for _, val in enumerate(goal_handle.request.turning_sequence):
            self.get_logger().info(f'Current Cmd: {val}')

            feedback.feedback_msg = f'Turning {direction_str_dict[val]}'

            self.turn_robot(direction_dict[val])
            self.parking_robot()

            goal_handle.publish_feedback(feedback)

        image_sub_node = ImageSubscriber()
        rclpy.spin_once(image_sub_node)
        center_pixel = image_sub_node.center_pixel

        if sum(center_pixel) < 300 and center_pixel[1] > 100:
            goal_handle.succeed()
            self.get_logger().warn('==== Succeed ====')
            result = Maze.Result()
            result.success = True
        else:
            goal_handle.abort()
            self.get_logger().error('==== Fail ====')
            result = Maze.Result()
            result.success = False

        return result


def main(args=None):
    rclpy.init(args=args)

    # Referenced from robotpilot/ros2-seminar-examples
    # https://github.com/robotpilot/ros2-seminar-examples/blob/main/topic_service_action_rclpy_example/topic_service_action_rclpy_example/calculator/main.py
    try:
        maze_action_server = MazeActionServer()
        executor = MultiThreadedExecutor()
        executor.add_node(maze_action_server)
        try:
            executor.spin()
        except KeyboardInterrupt:
            maze_action_server.get_logger().info('Keyboard Interrupt (SIGINT)')
        finally:
            executor.shutdown()
            maze_action_server._action_server.destroy()
            maze_action_server.destroy_node()
    finally:
        rclpy.shutdown()


if __name__ == '__main__':
    main()

 

from rclpy.action import ActionServer

from custom_interfaces.action import Maze

direction_str_dict = {0: "Up", 1: "Right", 2: "Down", 3: "Left"}

        self._action_server = ActionServer(
            self, Maze, "maze_action", self.execute_callback
        )

		...

    def execute_callback(self, goal_handle):
        self.get_logger().info("Executing goal...")

        feedback = Maze.Feedback()
        feedback.feedback_msg = ""
				
        # 일전 피보나치 예제와 유사하게 동작합니다.
        for _, val in enumerate(goal_handle.request.turning_sequence):
            self.get_logger().info(f'Current Cmd: {val}')

            feedback.feedback_msg = f"Turning {direction_str_dict[val]}"

            self.turn_robot(direction_dict[val])
            self.parking_robot()

            goal_handle.publish_feedback(feedback)

        return result

def main(args=None):
    rclpy.init(args=args)

    # Referenced from robotpilot/ros2-seminar-examples
    # https://github.com/robotpilot/ros2-seminar-examples/blob/main/topic_service_action_rclpy_example/topic_service_action_rclpy_example/calculator/main.py
    try:
        maze_action_server = MazeActionServer()
        # MultiThreadedExecutor에 대해서는 Action 시간에 살펴보았습니다.
        # 혹 해당 강의를 건너뛰신 분은, 여러 publish, subscribe, action들을 
        # 효율적으로 동시에 작업하기 위한 것이라 생각하시면 됩니다.
        executor = MultiThreadedExecutor()
        executor.add_node(maze_action_server)
        try:
            executor.spin()
        except KeyboardInterrupt:
            maze_action_server.get_logger().info("Keyboard Interrupt (SIGINT)")
        finally:
            executor.shutdown()
            maze_action_server._action_server.destroy()
            maze_action_server.destroy_node()
    finally:
        rclpy.shutdown()

if __name__ == "__main__":
    main()

 

특정 방향으로 로봇을 회전시키기

from py_action_pkg.robot_controller import euler_from_quaternion
from nav_msgs.msg import Odometry
...

        self.odom_sub = self.create_subscription(
            Odometry, "/diffbot/odom", self.odom_sub_cb, 10
        )

...

def turn_robot(self, euler_angle):
    self.get_logger().info(f'Robot Turns to {euler_angle}')

    turn_offset = 100
		
    # 각도 한계를 너무 적게 주면, 0 -> -3.14로 갑자기 변해버리는 오류가 발생할 수 있습니다.
    while abs(turn_offset) > 0.087:
        # 로봇은 가속도를 갖기에 갑작스러운 변화를 줄 수 없음을 고려합니다.
        turn_offset = 0.7 * (euler_angle - self.yaw)
        self.twist_msg.linear.x = 0.0
        self.twist_msg.angular.z = turn_offset
        self.cmd_vel_pub.publish(self.twist_msg)
			
    # 회전 이후, 로봇을 정지시킵니다.
    self.stop_robot()

 

충돌 전까지 로봇을 직진시키기

from sensor_msgs.msg import LaserScan
  ...

        self.laser_sub = self.create_subscription(
            LaserScan, "/diffbot/scan", self.laser_sub_cb, 10
        )

		...

def laser_sub_cb(self, data):
        self.forward_distance = data.ranges[360]
		...

def parking_robot(self):

    while self.forward_distance > 1.0:
        self.twist_msg.linear.x = 0.5
        self.twist_msg.angular.z = 0.0

        self.cmd_vel_pub.publish(self.twist_msg)

    self.stop_robot()

 

초록 박스 인식하기

from py_action_pkg.image_sub import ImageSubscriber

...
				
				
        for _, val in enumerate(goal_handle.request.turning_sequence):
            self.get_logger().info(f'Current Cmd: {val}')

            feedback.feedback_msg = f"Turning {direction_str_dict[val]}"

            self.turn_robot(direction_dict[val])
            self.parking_robot()

            goal_handle.publish_feedback(feedback)

        # 모든 execution을 마친 시점에서, ImageSubscriber를 1회 실행합니다.
        # 이를 통해 전방 이미지의 중간 pixel값을 얻을 수 있습니다.
        image_sub_node = ImageSubscriber()
        rclpy.spin_once(image_sub_node)
        center_pixel = image_sub_node.center_pixel

        # 전방 pixel값을 사용하여 초록 박스 앞인지의 유무를 판단합니다.
        # 이는 곧 미로의 탈출지점에 도착했는지의 여부와 동일합니다.
        if sum(center_pixel) < 300 and center_pixel[1] > 100:
            goal_handle.succeed()
            self.get_logger().warn("==== Succeed ====")
            result = Maze.Result()
            result.success = True
        else:
            goal_handle.abort()
            self.get_logger().error("==== Fail ====")
            result = Maze.Result()
            result.success = False

        return result

 

Maze Action Client

maze_action_client.py

#!/usr/bin/env/ python3
#
# Copyright 2021 Seoul Business Agency Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from custom_interfaces.action import Maze
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

"""
Maze.action structure

int32[] turning_sequence
---
bool success
---
string feedback_msg
"""


class MazeActionClient(Node):

    def __init__(self):
        super().__init__('maze_action_client')
        self.action_client = ActionClient(self, Maze, 'diffbot/maze_action')
        self.get_logger().info('=== Maze Action Client Started ====')

    def send_goal(self, turning_list):
        goal_msg = Maze.Goal()
        goal_msg.turning_sequence = turning_list

        if self.action_client.wait_for_server(10) is False:
            self.get_logger().error('Server Not exists')

        self._send_goal_future = self.action_client.send_goal_async(
            goal_msg, feedback_callback=self.feedback_callback
        )

        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def feedback_callback(self, feedback_message):
        feedback = feedback_message.feedback
        self.get_logger().info(f'Received feedback: {feedback.feedback_msg}')

    def goal_response_callback(self, future):
        goal_handle = future.result()

        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected')
            return

        self.get_logger().info('Goal accepted')

        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().warn(f'Action Done !! Result: {result.success}')
        rclpy.shutdown()


def main(args=None):
    rclpy.init(args=args)

    maze_action_client = MazeActionClient()
    user_inputs = []

    # Input Logic
    try:
        maze_action_client.get_logger().info('Enter numbers [or stop] : ')

        while True:
            user_inputs.append(int(input()))
    # if the input is not-integer, just print the list
    except Exception:
        maze_action_client.get_logger().info(f'Your sequence list : {user_inputs}')
    maze_action_client.get_logger().info('==== Sending Goal ====')
    maze_action_client.send_goal(user_inputs)

    # You can get Future for additional functoins
    # future = maze_action_client.send_goal(user_inputs)

    rclpy.spin(maze_action_client)


if __name__ == '__main__':
    main()

 

main

def main(args=None):
    rclpy.init(args=args)

    maze_action_client = MazeActionClient()
    user_inputs = []
    # Input Logic
    try:
        maze_action_client.get_logger().info('Enter numbers [or stop] : ')

        while True:
            user_inputs.append(int(input()))
    # if the input is not-integer, just print the list
    except:
        maze_action_client.get_logger().info(f'Your sequence list : {user_inputs}')
    maze_action_client.get_logger().info('==== Sending Goal ====')
    future = maze_action_client.send_goal(user_inputs)

    rclpy.spin(maze_action_client)

 

여러 로봇을 사용하고 싶을 때

    maze_action_srv_node = Node(
        package='py_action_pkg',
        namespace='diffbot',
        executable='maze_action_server',
        name='maze_action_server',
        output='screen'
    )

 

launch file에서 이 옵션을 줄 수 있고, 위와 같이 작성된 launch 파일을 실행시키면, 해당 Node안의 모든 통신은 /diffbot으로 시작하게 된다.

 

1. 기존 /diffbot이 붙던 토픽 이름을 모두 변경해 준다.

        self.laser_sub = self.create_subscription(
            LaserScan, "/diffbot/scan", self.laser_sub_cb, 10
        )

        self.odom_sub = self.create_subscription(
            Odometry, "/diffbot/odom", self.odom_sub_cb, 10
        )

        self.cmd_vel_pub = self.create_publisher(Twist, "/diffbot/cmd_vel", 10)

...

        self.laser_sub = self.create_subscription(
            LaserScan, 'scan', self.laser_sub_cb, 10
        )

        self.odom_sub = self.create_subscription(
            Odometry, 'odom', self.odom_sub_cb, 10
        )

        self.cmd_vel_pub = self.create_publisher(Twist, 'cmd_vel', 10)

 

2. launch file을 작성하며, 이때 namespace option을 지정한다.

from launch import LaunchDescription
from launch.substitutions import LaunchConfiguration

from launch_ros.actions import Node


def generate_launch_description():

    maze_action_srv_node = Node(
        package='py_action_pkg',
        namespace='diffbot',
        executable='maze_action_server',
        name='maze_action_server',
        output='screen'
    )

    return LaunchDescription(
        [
            maze_action_srv_node,
        ]
    )

 

3. setup.py에도 launch file을 추가하는 라인이 필요하다.

from glob import glob

import os

from setuptools import setup

package_name = 'py_action_pkg'

setup(
    name=package_name,
    version='0.0.0',
    packages=[package_name],
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        (os.path.join('share', package_name, 'launch'), glob('launch/*.launch.py')),
    ],

'ROS2 > 기초' 카테고리의 다른 글

ubuntu 24.04 ssh 설정 + docker 설정  (0) 2024.12.06
ROS2 - topic(c++)  (0) 2024.10.21
ROS2 4일차(1) Maze World  (0) 2024.09.24
ROS2 3일차(4) Action 프로그래밍  (0) 2024.09.23
ROS2 3일차(3) Action  (0) 2024.09.23