Quality of Service (QoS)

A collection of my personal notes.

Content Outline

    QoS profiles can be specified for publishers, subscriptions, service servers, and clients.

    You combine QoS policies to make QoS profile

    • History
      • Keep last: only store up to N samples, configurable via the queue depth option.
      • Keep all: store all samples, subject to the configured resource limits of the underlying middleware.
    • Depth
      • Queue size: only honored if the “history” policy was set to “keep last”.
    • Reliability
      • Best effort: attempt to deliver samples, but may lose them if the network is not robust.
      • Reliable: guarantee that samples are delivered, may retry multiple times.
    • Durability
      • Transient local: the publisher becomes responsible for persisting samples for “late-joining” subscriptions.
      • Volatile: no attempt is made to persist samples.
    • Deadline
      • Duration: the expected maximum amount of time between subsequent messages being published to a topic
    • Lifespan
      • Duration: the maximum amount of time between the publishing and the reception of a message without the message being considered stale or expired (expired messages are silently dropped and are effectively never received).
    • Liveliness
      • Automatic: the system will consider all of the node’s publishers to be alive for another “lease duration” when any one of its publishers has published a message.
      • Manual by topic: the system will consider the publisher to be alive for another “lease duration” if it manually asserts that it is still alive (via a call to the publisher API).
    • Lease Duration
      • Duration: the maximum period of time a publisher has to indicate that it is alive before the system considers it to have lost liveliness (losing liveliness could be an indication of a failure).

    Pre-defined QoS Profiles


    import rclpy
    from rclpy.node import Node
    from rclpy.qos import (
        QoSProfile, 
        QoSHistoryPolicy, 
        QoSReliabilityPolicy, 
        QoSDurabilityPolicy,
        qos_profile_sensor_data
    )
    from demo_interfaces.msg import Counter
    
    class PublisherNode(Node):
    
        def __init__(self, node_name: str = "publisher") -> None:
            super().__init__(node_name)
    
            # the default QoS
            # qos = qos_profile_sensor_data
    
            # sensor-like QoS
            qos = QoSProfile(
                history=QoSHistoryPolicy.KEEP_LAST,
                depth=10,
                reliability=QoSReliabilityPolicy.BEST_EFFORT,
                durability=QoSDurabilityPolicy.VOLATILE
            )
    
            self.__pub = self.create_publisher(Counter, "/counter", qos)
    
            self.create_timer(0.1, self.pub_callback)
    
        def pub_callback(self):
            self.__pub.publish(Counter())
    
    
    def main(args=None) -> None:
    
        rclpy.init(args=args)
    
        node = PublisherNode()
    
        try:
            rclpy.spin(node)
        except KeyboardInterrupt:
            pass
        finally:
            rclpy.try_shutdown()
    
        rclpy.shutdown()
    
    
    
    if __name__ == "__main__":
        main()
    

    Subscriber QoS demo:

    import rclpy
    from rclpy.node import Node
    from rclpy.qos import (
        QoSProfile, 
        QoSHistoryPolicy, 
        QoSReliabilityPolicy, 
        QoSDurabilityPolicy,
    )
    from demo_interfaces.msg import Counter
    
    class SubscriberNode(Node):
    
        def __init__(self, node_name: str = "subscriber") -> None:
            super().__init__(node_name)
    
            qos = QoSProfile(
                history=QoSHistoryPolicy.KEEP_LAST,
                depth=10,
                reliability=QoSReliabilityPolicy.RELIABLE,
                durability=QoSDurabilityPolicy.VOLATILE
            )
            # creating the entity
            self.__sub = self.create_subscription(Counter, "/counter", self.counter_callback, qos)
    
        def counter_callback(self, msg):
            self.get_logger().info(f"Got: {msg.count}")
    
    
    def main(args=None) -> None:
    
        rclpy.init(args=args)
    
        node = SubscriberNode()
    
        try:
            rclpy.spin(node)
        except KeyboardInterrupt:
            pass
        finally:
            rclpy.shutdown()
    
    
    if __name__ == "__main__":
        main()
    

    Transient Local


    import rclpy
    from rclpy.node import Node
    from rclpy.qos import (
        QoSProfile,
        QoSHistoryPolicy,
        QoSReliabilityPolicy,
        QoSDurabilityPolicy
    )
    from demo_interfaces.msg import Counter
    
    
    class LatchedPub(Node):
        def __init__(self):
            super().__init__("latched_pub")
    
            qos = QoSProfile(
                history=QoSHistoryPolicy.KEEP_LAST,
                depth=1,
                reliability=QoSReliabilityPolicy.RELIABLE,
                durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
            )
    
            self.pub = self.create_publisher(Counter, "/counter", qos)
    
            # Publish once, then keep spinning so late-joiners can get the cached sample.
            msg = Counter()
            msg.count = 0
            self.pub.publish(msg)
            self.get_logger().info("Published counter value once; staying alive for late subscribers.")
    
            self.timer = self.create_timer(1.0, lambda: None)
    
    
    def main(args=None):
    
        rclpy.init(args=args)
    
        node = LatchedPub()
    
        try:
            rclpy.spin(node)
        finally:
            rclpy.shutdown()
    
    
    if __name__ == "__main__":
        main()
    

    Compatibilities


    Compatibility of reliability QoS policies:

    PublisherSubscriptionCompatible
    Best effortBest effortYes
    Best effortReliableNo
    ReliableBest effortYes
    ReliableReliableYes

    QoS Events


    Keep this for deadline…

    External Resources