ROS 2 Services
Servers and clients
Services
services are:
- Strongly-typed
Create a Custom Service defintion
create .srv file
geometry_msgs/Point origin
geometry_msgs/Point goal
---
float64 distance
Add dependencies and append .srv to rosidl_generate_interfaces
...
find_package(rosidl_default_generators REQUIRED)
find_package(geometry_msgs REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/Counter.msg"
"srv/EuclideanDistance.srv"
DEPENDENCIES geometry_msgs
)
... Add any dependencies to package.xml
<build_depend>geometry_msgs</build_depend> Minimal Server
import math
import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from rclpy.task import sys
from demo_interfaces.srv import EuclideanDistance
class ServiceNode(Node):
def __init__(self):
super().__init__("service_node")
self.__service = self.create_service(EuclideanDistance, "EuclideanDistance", self.compute_distance)
def compute_distance(self, request, response):
self.get_logger().info(f"Got a request: {request}")
# simple calculation euclidean distance
dx = request.goal.x - request.origin.x
dy = request.goal.y - request.origin.y
dz = request.goal.z - request.origin.z
distance = math.sqrt(dx**2 + dy**2 + dz**2)
response.distance = distance
return response
def main(args=None):
rclpy.init(args=args)
node = ServiceNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
except ExternalShutdownException:
sys.exit(1)
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()
Minimal Client
import sys
import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from demo_interfaces.srv import EuclideanDistance
from rclpy.task import Future
class ClientAsync(Node):
def __init__(self):
super().__init__("service_client")
self.client = self.create_client(EuclideanDistance, 'EuclideanDistance')
while not self.client.wait_for_service(timeout_sec=2.0):
self.get_logger().info('service not available, waiting again...')
self.__request = EuclideanDistance.Request()
def send_request(self) -> Future:
# some testing values
self.__request.origin.x = 0.0
self.__request.origin.y = 0.0
self.__request.origin.z = 0.0
self.__request.goal.x = 2.0
self.__request.goal.y = 2.0
self.__request.goal.z = 0.0
return self.client.call_async(self.__request)
def main(args=None):
rclpy.init(args=args)
node = ClientAsync()
try:
future = node.send_request()
rclpy.spin_until_future_complete(node, future)
response = future.result()
node.get_logger().info(f"Euclidean distance: {response.distance:.4f} m")
except KeyboardInterrupt:
pass
except ExternalShutdownException:
sys.exit(1)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()