ROS 2 Services

Servers and clients

Content Outline

    Services


    services are:

    • Strongly-typed

    Create a Custom Service defintion


    create .srv file

    geometry_msgs/Point origin
    geometry_msgs/Point goal
    ---
    float64 distance
    

    Add dependencies and append .srv to rosidl_generate_interfaces

    ...
    
    find_package(rosidl_default_generators REQUIRED)
    find_package(geometry_msgs REQUIRED)
    
    rosidl_generate_interfaces(${PROJECT_NAME}
      "msg/Counter.msg"
      "srv/EuclideanDistance.srv"
      DEPENDENCIES geometry_msgs
    )
    
    ...

    Add any dependencies to package.xml

        <build_depend>geometry_msgs</build_depend>

    Minimal Server


    import math
    import rclpy
    from rclpy.executors import ExternalShutdownException
    from rclpy.node import Node
    from rclpy.task import sys
    from demo_interfaces.srv import EuclideanDistance
    
    class ServiceNode(Node):
    
        def __init__(self):
            super().__init__("service_node")
    
            self.__service = self.create_service(EuclideanDistance, "EuclideanDistance", self.compute_distance)
    
    
        def compute_distance(self, request, response):
            self.get_logger().info(f"Got a request: {request}")
            # simple calculation euclidean distance
            dx = request.goal.x - request.origin.x
            dy = request.goal.y - request.origin.y
            dz = request.goal.z - request.origin.z
    
            distance = math.sqrt(dx**2 + dy**2 + dz**2)
            response.distance = distance
    
            return response
    
    
    def main(args=None):
    
        rclpy.init(args=args)
    
        node = ServiceNode()
    
        try:
            rclpy.spin(node)
        except KeyboardInterrupt:
            pass
        except ExternalShutdownException:
            sys.exit(1)
        finally:
            node.destroy_node()
            rclpy.try_shutdown()
    
    
    if __name__ == "__main__":
        main()
    

    Minimal Client


    import sys
    import rclpy
    from rclpy.executors import ExternalShutdownException
    from rclpy.node import Node
    from demo_interfaces.srv import EuclideanDistance
    from rclpy.task import Future
    
    class ClientAsync(Node):
    
        def __init__(self):
            super().__init__("service_client")
    
            self.client = self.create_client(EuclideanDistance, 'EuclideanDistance')
            while not self.client.wait_for_service(timeout_sec=2.0):
                self.get_logger().info('service not available, waiting again...')
    
            self.__request = EuclideanDistance.Request()
    
        def send_request(self) -> Future:
            # some testing values
            self.__request.origin.x = 0.0
            self.__request.origin.y = 0.0
            self.__request.origin.z = 0.0
    
            self.__request.goal.x = 2.0
            self.__request.goal.y = 2.0
            self.__request.goal.z = 0.0
    
            return self.client.call_async(self.__request)
    
    
    def main(args=None):
    
        rclpy.init(args=args)
    
        node = ClientAsync()
    
        try:
            future = node.send_request()
    
            rclpy.spin_until_future_complete(node, future)
    
            response = future.result()
    
            node.get_logger().info(f"Euclidean distance: {response.distance:.4f} m")
        except KeyboardInterrupt:
            pass
        except ExternalShutdownException:
            sys.exit(1)
        finally:
            node.destroy_node()
            rclpy.shutdown()
    
    
    
    if __name__ == "__main__":
        main()
    

    External References