ROS2 3일차(2) Service 프로그래밍

2024. 9. 23. 11:21ROS2/기초

Service는 Server와 Client로 나뉨

Client를 프로그래밍 하는 방법

지난 시간에 실행시켰던, 빨간 로봇을 등장시키는 client

py_service_pkg  => spawn_model.py

# !/usr/bin/env/ python3
#
# Copyright 2021 Seoul Business Agency Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Referenced from Below Link
# https://discourse.ros.org/t/spawning-a-robot-entity-using-a-node-with-gazebo-and-ros-2/9985


import os

from ament_index_python.packages import get_package_share_directory
from gazebo_msgs.srv import SpawnEntity
import rclpy
from rclpy.node import Node


class SpawnRobot(Node):

    def __init__(self):
        super().__init__('gazebo_model_spawner')
        self.client = self.create_client(SpawnEntity, '/spawn_entity')

        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().error('service not available, waiting again...')

        # Get urdf path
        self.urdf_file_path = os.path.join(
            get_package_share_directory('gcamp_gazebo'),
            'urdf',
            'skidbot2.urdf',
        )

        self.req = SpawnEntity.Request()

    def send_req(self):
        self.req.name = 'skidbot2'
        self.req.xml = open(self.urdf_file_path, 'r').read()
        self.req.robot_namespace = 'skidbot2'
        self.req.initial_pose.position.x = 1.0
        self.req.initial_pose.position.y = 1.0
        self.req.initial_pose.position.z = 0.3

        self.get_logger().debug('==== Sending service request to `/spawn_entity` ====')
        self.future = self.client.call_async(self.req)

        return self.future


def main(args=None):

    rclpy.init(args=args)

    robot_spawn_node = SpawnRobot()
    future = robot_spawn_node.send_req()

    rclpy.spin_until_future_complete(robot_spawn_node, future)

    if future.done():
        try:
            response = future.result()
        except Exception:
            raise RuntimeError(
                'exception while calling service: %r' % future.exception()
            )
        else:
            robot_spawn_node.get_logger().info('==== Service Call Done ====')
            robot_spawn_node.get_logger().info(f'Status_message : {response.status_message}')
        finally:
            robot_spawn_node.get_logger().warn('==== Shutting down node. ====')
            robot_spawn_node.destroy_node()
            rclpy.shutdown()


if __name__ == '__main__':
    main()


import 부분

#!/usr/bin/env/ python3

import os

# 패키지 폴더 내부에 위치한  파일에 접근하기 위해 사용됩니다.
from ament_index_python.packages import get_package_share_directory
# 이 client는 SpawnEntity라는 srv 타입을 사용한다고 살펴본 바 있지요?
from gazebo_msgs.srv import SpawnEntity
import rclpy
from rclpy.node import Node

 

class 내부

class SpawnRobot(Node):

    def __init__(self):
        super().__init__("gazebo_model_spawner")
				# Service Client는 다음과 같이 생성합니다. 하단에 보충 설명을 더하겠습니다. 
        self.client = self.create_client(SpawnEntity, "/spawn_entity")

				# Server가 없으면 아무리 request를 해도 대답이 없을 것입니다.
				# 이를 방지하기 위해 최대로 Server를 기다릴 시간을 지정합니다.
        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().error("service not available, waiting again...")

				# 등장시킬 로봇의 외형을 담은 파일에 접근합니다.
        # Get urdf path
        self.urdf_file_path = os.path.join(
            get_package_share_directory("gcamp_gazebo"),
            "urdf",
            "skidbot2.urdf",
        )

				# request를 선언합니다.
        self.req = SpawnEntity.Request()

get_package_share_directory를 통해 패키지의 파일시스템에 접근할 수 있게 됩니다.

위 코드의 경우 gcamp_gazebo/urdf 폴더에 접근하고 있음

 

create_client

  • SpawnEntity : 사용할 srv 타입
  • /spawn_entity : request를 보낼 Service 이름. 이 이름이 server가 지정한 service 이름과 일치해야만 제대로 request를 보낼 수 있다.

 

실질적으로 request를 보내는 부분

    def send_req(self):
				# request의 각 매개변수를 올바르게 채워줍니다.
				# 앞서 선언한 urdf 파일도 전달하고, 로봇을 등장시킬 위치도 지정합니다.
        self.req.name = "skidbot2"
        self.req.xml = open(self.urdf_file_path, "r").read()
        self.req.robot_namespace = "skidbot2"
        self.req.initial_pose.position.x = 1.0
        self.req.initial_pose.position.y = 1.0
        self.req.initial_pose.position.z = 0.3

				# request call 이후 상태를 반환하는 future를 예약받습니다.
        print("==== Sending service request to `/spawn_entity` ====")
				# future란, 해당 작업이 완료될 것임의 약속입니다.
        self.future = self.client.call_async(self.req)

        return self.future

 

future

특정 작업에 대한 약속을 받아내는 것

    future = robot_spawn_node.send_req()

    rclpy.spin_until_future_complete(robot_spawn_node, future)
  • robot_spawn_node.send_req()는 반드시 끝날 것을 보장(=약속=promise)하니, 그동안 robot_spawn_node를 spin 시키고 있을게 라는 뜻

main 분석

def main(args=None):

    rclpy.init(args=args)

    robot_spawn_node = SpawnRobot()
    # robot_spawn_node으로부터 request call이 완료될 것임을 약속받았습니다.
    future = robot_spawn_node.send_req()

    # 이 약속을 지킬때까지 node는 spin 상태에 접어듭니다.
    rclpy.spin_until_future_complete(robot_spawn_node, future)

    # request call, response가 끝이 났습니다.
    # 이때, future는 response를 담고 있습니다.
    if future.done():
        # result가 제대로 전달되는지의 여부에 따라 예외 처리를 하고 있습니다.
        try:
            response = future.result()
        except Exception as e:
            raise RuntimeError(
                "exception while calling service: %r" % future.exception()
            )
        else:
            # 정상적으로 response가 도착했다는 로그를 출력합니다.
            robot_spawn_node.get_logger().info('==== Service Call Done ====')
            robot_spawn_node.get_logger().info(f'Status_message : {response.status_message}')
        finally:
            robot_spawn_node.get_logger().warn("==== Shutting down node. ====")
            robot_spawn_node.destroy_node()
            rclpy.shutdown()
  • client는 server에게 request를 보내고 response를 기다린다

Turning Server Client

# Terminal 1
ros2 launch gcamp_gazebo gcamp_world.launch.py

# Terminal 2
ros2 run py_service_pkg robot_turning_server

# Terminal 3 Service Client Call
ros2 run py_service_pkg robot_turning_client
> Type turning time duration: 5
> Type turning linear velocity: 0.5
> Type turning angular velocity: 1.0
[INFO] [1727056966.078971775] [robot_turn_client]: linear_x : 0.5 / angular_z : 1.0
[INFO] [1727056966.079202082] [robot_turn_client]:  Request Sended 
[INFO] [1727056971.001124718] [robot_turn_client]: ==== Service Call Done ====
[INFO] [1727056971.001350904] [robot_turn_client]: Result Message : Success
[WARN] [1727056971.001599853] [robot_turn_client]: ==== Shutting down node ====
  • 지정한 시간 동안
  • 정방향 선속도와
  • 정방향 각속도(z축 기준 반시계방향)를 갖고 로봇을 움직이게 된다.

Service Server 작성

robot_turning_srv.py

# !/usr/bin/env/ python3
#
# Copyright 2021 Seoul Business Agency Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from custom_interfaces.srv import TurningControl
from geometry_msgs.msg import Twist
import rclpy
from rclpy.node import Node


# uint32 time_duration
# float64 angular_vel_z
# float64 linear_vel_x
# ---
# bool success


class RobotTurnServer(Node):

    def __init__(self):
        super().__init__('robot_turn_server')
        self.publisher = self.create_publisher(Twist, 'skidbot/cmd_vel', 10)
        self.srv = self.create_service(
            TurningControl, 'turn_robot', self.robot_turn_callback
        )
        self.twist_msg = Twist()
        self.start_time = self.get_clock().now().to_msg().sec

        self.get_logger().info('==== Robot Turning Server Started, Waiting for Request ====')

    def move_robot(self, seconds=1, linear_x=0.0, angular_z=0.0):
        self.twist_msg.linear.x = linear_x
        self.twist_msg.angular.z = angular_z

        clock_now = self.get_clock().now().to_msg().sec
        self.get_logger().info('Robot Moves')
        self.get_logger().info(f'Move Commands = linear_x : {linear_x} / angular_z : {angular_z}')

        while (clock_now - self.start_time) < seconds:
            clock_now = self.get_clock().now().to_msg().sec
            self.publisher.publish(self.twist_msg)

    def stop_robot(self):
        self.twist_msg.linear.x = 0.0
        self.twist_msg.angular.z = 0.0

        self.publisher.publish(self.twist_msg)
        self.get_logger().info('Robot Stop')

    def robot_turn_callback(self, request, response):
        self.start_time = self.get_clock().now().to_msg().sec

        self.move_robot(
            request.time_duration, request.linear_vel_x, request.angular_vel_z
        )
        self.stop_robot()

        response.success = True
        self.get_logger().info('Servie Process Done...')

        return response


def main(args=None):
    rclpy.init(args=args)

    robot_turn_server = RobotTurnServer()

    rclpy.spin(robot_turn_server)

    robot_turn_server.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

 

내부 동작은 다음과 같다.

  1. Service를 생성하고
  2. request를 받아
  3. 그에 대응하는 작업을 수행한 뒤
  4. response를 되돌려준다.

service 생성

        self.srv = self.create_service(
            TurningControl, "turn_robot", self.robot_turn_callback
        )
  1. 사용되는 srv 타입과
  2. service 이름,
  3. request가 들어올 때마다 실행될 callback을 매개변수로 받습니다.

robot_turn_callback

    def robot_turn_callback(self, request, response):
        # 현재 시간을 저장합니다. 이 시점으로부터 특정 시간만큼 로봇이 회전할 것입니다.
        self.start_time = self.get_clock().now().to_msg().sec

        # move_robot은 topic 시간에 살펴본 바와 동일합니다.
        self.move_robot(
            request.time_duration, request.linear_vel_x, request.angular_vel_z
        )
        self.stop_robot()
			
        # response를 채워줍니다.
        response.success = True
        self.get_logger().info("Servie Process Done...")

        return response

...

 

main

def main(args=None):
    rclpy.init(args=args)

    robot_turn_server = RobotTurnServer()

    rclpy.spin(robot_turn_server)

    robot_turn_server.destroy_node()
    rclpy.shutdown()

main을 보면, spin으로 되어있기에 위 과정들은 무한해서 반복된다.

항시 대기하다가, 새로운 request를 처리하고, 기다리고, 처리하고 이렇게 반복하는 것

 


Topic을 통해 로봇을 움직인다.

    ...		
    self.publisher = self.create_publisher(Twist, "/skidbot/cmd_vel", 10)
    ...

    def move_robot(self, seconds=1, linear_x=0.0, angular_z=0.0):
        # request로 부터 받은 제어 신호를 topic msg에 채워넣습니다.
        self.twist_msg.linear.x = linear_x
        self.twist_msg.angular.z = angular_z
	
       # 시간의 기준이 될 현 시간을 리셋합니다.
        clock_now = self.get_clock().now().to_msg().sec
        self.get_logger().info("Robot Moves")
        print(f"Move Commands = linear_x : {linear_x} / angular_z : {angular_z}")

        #  기준 시간으로부터 request받은 특정 시간 동안 publish를 진행합니다.
        while (clock_now - self.start_time) < seconds:
            clock_now = self.get_clock().now().to_msg().sec
            self.publisher.publish(self.twist_msg)

    def stop_robot(self):
        # 로봇의 정지는 일전의 제어 신호를 모두 0으로 만들면 됩니다.
        self.twist_msg.linear.x = 0.0
        self.twist_msg.angular.z = 0.0

        self.publisher.publish(self.twist_msg)
        self.get_logger().info("Robot Stop")

 

주의해야 할 점

move_robot 을 처리하는 동안에는 다른 request를 포함하여 어떠한 개입도 받을 수 없다.

=> 이에 따라, move_robot 내에 검증되지 않은 동작이 위치하게 되면, 실제 로봇이 움직인다고 상상했을 때, 큰 사고로 이어질 수 있다.

 

'ROS2 > 기초' 카테고리의 다른 글

ROS2 3일차(4) Action 프로그래밍  (0) 2024.09.23
ROS2 3일차(3) Action  (0) 2024.09.23
ROS2 3일차(1) Service  (0) 2024.09.23
ROS2 2일차(5) tf2  (0) 2024.09.20
ROS2 2일차(4) topic 프로그래밍  (0) 2024.09.20