ROS2 3일차(2) Service 프로그래밍
2024. 9. 23. 11:21ㆍROS2/기초
Service는 Server와 Client로 나뉨
Client를 프로그래밍 하는 방법
지난 시간에 실행시켰던, 빨간 로봇을 등장시키는 client
py_service_pkg => spawn_model.py
# !/usr/bin/env/ python3
#
# Copyright 2021 Seoul Business Agency Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Referenced from Below Link
# https://discourse.ros.org/t/spawning-a-robot-entity-using-a-node-with-gazebo-and-ros-2/9985
import os
from ament_index_python.packages import get_package_share_directory
from gazebo_msgs.srv import SpawnEntity
import rclpy
from rclpy.node import Node
class SpawnRobot(Node):
def __init__(self):
super().__init__('gazebo_model_spawner')
self.client = self.create_client(SpawnEntity, '/spawn_entity')
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().error('service not available, waiting again...')
# Get urdf path
self.urdf_file_path = os.path.join(
get_package_share_directory('gcamp_gazebo'),
'urdf',
'skidbot2.urdf',
)
self.req = SpawnEntity.Request()
def send_req(self):
self.req.name = 'skidbot2'
self.req.xml = open(self.urdf_file_path, 'r').read()
self.req.robot_namespace = 'skidbot2'
self.req.initial_pose.position.x = 1.0
self.req.initial_pose.position.y = 1.0
self.req.initial_pose.position.z = 0.3
self.get_logger().debug('==== Sending service request to `/spawn_entity` ====')
self.future = self.client.call_async(self.req)
return self.future
def main(args=None):
rclpy.init(args=args)
robot_spawn_node = SpawnRobot()
future = robot_spawn_node.send_req()
rclpy.spin_until_future_complete(robot_spawn_node, future)
if future.done():
try:
response = future.result()
except Exception:
raise RuntimeError(
'exception while calling service: %r' % future.exception()
)
else:
robot_spawn_node.get_logger().info('==== Service Call Done ====')
robot_spawn_node.get_logger().info(f'Status_message : {response.status_message}')
finally:
robot_spawn_node.get_logger().warn('==== Shutting down node. ====')
robot_spawn_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
import 부분
#!/usr/bin/env/ python3
import os
# 패키지 폴더 내부에 위치한 파일에 접근하기 위해 사용됩니다.
from ament_index_python.packages import get_package_share_directory
# 이 client는 SpawnEntity라는 srv 타입을 사용한다고 살펴본 바 있지요?
from gazebo_msgs.srv import SpawnEntity
import rclpy
from rclpy.node import Node
class 내부
class SpawnRobot(Node):
def __init__(self):
super().__init__("gazebo_model_spawner")
# Service Client는 다음과 같이 생성합니다. 하단에 보충 설명을 더하겠습니다.
self.client = self.create_client(SpawnEntity, "/spawn_entity")
# Server가 없으면 아무리 request를 해도 대답이 없을 것입니다.
# 이를 방지하기 위해 최대로 Server를 기다릴 시간을 지정합니다.
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().error("service not available, waiting again...")
# 등장시킬 로봇의 외형을 담은 파일에 접근합니다.
# Get urdf path
self.urdf_file_path = os.path.join(
get_package_share_directory("gcamp_gazebo"),
"urdf",
"skidbot2.urdf",
)
# request를 선언합니다.
self.req = SpawnEntity.Request()
get_package_share_directory를 통해 패키지의 파일시스템에 접근할 수 있게 됩니다.
위 코드의 경우 gcamp_gazebo/urdf 폴더에 접근하고 있음
create_client
- SpawnEntity : 사용할 srv 타입
- /spawn_entity : request를 보낼 Service 이름. 이 이름이 server가 지정한 service 이름과 일치해야만 제대로 request를 보낼 수 있다.
실질적으로 request를 보내는 부분
def send_req(self):
# request의 각 매개변수를 올바르게 채워줍니다.
# 앞서 선언한 urdf 파일도 전달하고, 로봇을 등장시킬 위치도 지정합니다.
self.req.name = "skidbot2"
self.req.xml = open(self.urdf_file_path, "r").read()
self.req.robot_namespace = "skidbot2"
self.req.initial_pose.position.x = 1.0
self.req.initial_pose.position.y = 1.0
self.req.initial_pose.position.z = 0.3
# request call 이후 상태를 반환하는 future를 예약받습니다.
print("==== Sending service request to `/spawn_entity` ====")
# future란, 해당 작업이 완료될 것임의 약속입니다.
self.future = self.client.call_async(self.req)
return self.future
future
특정 작업에 대한 약속을 받아내는 것
future = robot_spawn_node.send_req()
rclpy.spin_until_future_complete(robot_spawn_node, future)
- robot_spawn_node.send_req()는 반드시 끝날 것을 보장(=약속=promise)하니, 그동안 robot_spawn_node를 spin 시키고 있을게 라는 뜻
main 분석
def main(args=None):
rclpy.init(args=args)
robot_spawn_node = SpawnRobot()
# robot_spawn_node으로부터 request call이 완료될 것임을 약속받았습니다.
future = robot_spawn_node.send_req()
# 이 약속을 지킬때까지 node는 spin 상태에 접어듭니다.
rclpy.spin_until_future_complete(robot_spawn_node, future)
# request call, response가 끝이 났습니다.
# 이때, future는 response를 담고 있습니다.
if future.done():
# result가 제대로 전달되는지의 여부에 따라 예외 처리를 하고 있습니다.
try:
response = future.result()
except Exception as e:
raise RuntimeError(
"exception while calling service: %r" % future.exception()
)
else:
# 정상적으로 response가 도착했다는 로그를 출력합니다.
robot_spawn_node.get_logger().info('==== Service Call Done ====')
robot_spawn_node.get_logger().info(f'Status_message : {response.status_message}')
finally:
robot_spawn_node.get_logger().warn("==== Shutting down node. ====")
robot_spawn_node.destroy_node()
rclpy.shutdown()
- client는 server에게 request를 보내고 response를 기다린다
Turning Server Client
# Terminal 1
ros2 launch gcamp_gazebo gcamp_world.launch.py
# Terminal 2
ros2 run py_service_pkg robot_turning_server
# Terminal 3 Service Client Call
ros2 run py_service_pkg robot_turning_client
> Type turning time duration: 5
> Type turning linear velocity: 0.5
> Type turning angular velocity: 1.0
[INFO] [1727056966.078971775] [robot_turn_client]: linear_x : 0.5 / angular_z : 1.0
[INFO] [1727056966.079202082] [robot_turn_client]: Request Sended
[INFO] [1727056971.001124718] [robot_turn_client]: ==== Service Call Done ====
[INFO] [1727056971.001350904] [robot_turn_client]: Result Message : Success
[WARN] [1727056971.001599853] [robot_turn_client]: ==== Shutting down node ====
- 지정한 시간 동안
- 정방향 선속도와
- 정방향 각속도(z축 기준 반시계방향)를 갖고 로봇을 움직이게 된다.
Service Server 작성
robot_turning_srv.py
# !/usr/bin/env/ python3
#
# Copyright 2021 Seoul Business Agency Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from custom_interfaces.srv import TurningControl
from geometry_msgs.msg import Twist
import rclpy
from rclpy.node import Node
# uint32 time_duration
# float64 angular_vel_z
# float64 linear_vel_x
# ---
# bool success
class RobotTurnServer(Node):
def __init__(self):
super().__init__('robot_turn_server')
self.publisher = self.create_publisher(Twist, 'skidbot/cmd_vel', 10)
self.srv = self.create_service(
TurningControl, 'turn_robot', self.robot_turn_callback
)
self.twist_msg = Twist()
self.start_time = self.get_clock().now().to_msg().sec
self.get_logger().info('==== Robot Turning Server Started, Waiting for Request ====')
def move_robot(self, seconds=1, linear_x=0.0, angular_z=0.0):
self.twist_msg.linear.x = linear_x
self.twist_msg.angular.z = angular_z
clock_now = self.get_clock().now().to_msg().sec
self.get_logger().info('Robot Moves')
self.get_logger().info(f'Move Commands = linear_x : {linear_x} / angular_z : {angular_z}')
while (clock_now - self.start_time) < seconds:
clock_now = self.get_clock().now().to_msg().sec
self.publisher.publish(self.twist_msg)
def stop_robot(self):
self.twist_msg.linear.x = 0.0
self.twist_msg.angular.z = 0.0
self.publisher.publish(self.twist_msg)
self.get_logger().info('Robot Stop')
def robot_turn_callback(self, request, response):
self.start_time = self.get_clock().now().to_msg().sec
self.move_robot(
request.time_duration, request.linear_vel_x, request.angular_vel_z
)
self.stop_robot()
response.success = True
self.get_logger().info('Servie Process Done...')
return response
def main(args=None):
rclpy.init(args=args)
robot_turn_server = RobotTurnServer()
rclpy.spin(robot_turn_server)
robot_turn_server.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
내부 동작은 다음과 같다.
- Service를 생성하고
- request를 받아
- 그에 대응하는 작업을 수행한 뒤
- response를 되돌려준다.
service 생성
self.srv = self.create_service(
TurningControl, "turn_robot", self.robot_turn_callback
)
- 사용되는 srv 타입과
- service 이름,
- request가 들어올 때마다 실행될 callback을 매개변수로 받습니다.
robot_turn_callback
def robot_turn_callback(self, request, response):
# 현재 시간을 저장합니다. 이 시점으로부터 특정 시간만큼 로봇이 회전할 것입니다.
self.start_time = self.get_clock().now().to_msg().sec
# move_robot은 topic 시간에 살펴본 바와 동일합니다.
self.move_robot(
request.time_duration, request.linear_vel_x, request.angular_vel_z
)
self.stop_robot()
# response를 채워줍니다.
response.success = True
self.get_logger().info("Servie Process Done...")
return response
...
main
def main(args=None):
rclpy.init(args=args)
robot_turn_server = RobotTurnServer()
rclpy.spin(robot_turn_server)
robot_turn_server.destroy_node()
rclpy.shutdown()
main을 보면, spin으로 되어있기에 위 과정들은 무한해서 반복된다.
항시 대기하다가, 새로운 request를 처리하고, 기다리고, 처리하고 이렇게 반복하는 것
Topic을 통해 로봇을 움직인다.
...
self.publisher = self.create_publisher(Twist, "/skidbot/cmd_vel", 10)
...
def move_robot(self, seconds=1, linear_x=0.0, angular_z=0.0):
# request로 부터 받은 제어 신호를 topic msg에 채워넣습니다.
self.twist_msg.linear.x = linear_x
self.twist_msg.angular.z = angular_z
# 시간의 기준이 될 현 시간을 리셋합니다.
clock_now = self.get_clock().now().to_msg().sec
self.get_logger().info("Robot Moves")
print(f"Move Commands = linear_x : {linear_x} / angular_z : {angular_z}")
# 기준 시간으로부터 request받은 특정 시간 동안 publish를 진행합니다.
while (clock_now - self.start_time) < seconds:
clock_now = self.get_clock().now().to_msg().sec
self.publisher.publish(self.twist_msg)
def stop_robot(self):
# 로봇의 정지는 일전의 제어 신호를 모두 0으로 만들면 됩니다.
self.twist_msg.linear.x = 0.0
self.twist_msg.angular.z = 0.0
self.publisher.publish(self.twist_msg)
self.get_logger().info("Robot Stop")
주의해야 할 점
move_robot 을 처리하는 동안에는 다른 request를 포함하여 어떠한 개입도 받을 수 없다.
=> 이에 따라, move_robot 내에 검증되지 않은 동작이 위치하게 되면, 실제 로봇이 움직인다고 상상했을 때, 큰 사고로 이어질 수 있다.
'ROS2 > 기초' 카테고리의 다른 글
ROS2 3일차(4) Action 프로그래밍 (0) | 2024.09.23 |
---|---|
ROS2 3일차(3) Action (0) | 2024.09.23 |
ROS2 3일차(1) Service (0) | 2024.09.23 |
ROS2 2일차(5) tf2 (0) | 2024.09.20 |
ROS2 2일차(4) topic 프로그래밍 (0) | 2024.09.20 |