Quality of Service (QoS)
A collection of my personal notes.
QoS profiles can be specified for publishers, subscriptions, service servers, and clients.
You combine QoS policies to make a QoS profile:
- History
  - Keep last: only store up to N samples, configurable via the queue depth option.
  - Keep all: store all samples, subject to the configured resource limits of the underlying middleware.
- Depth
  - Queue size: only honored if the “history” policy is set to “keep last”.
- Reliability
  - Best effort: attempt to deliver samples, but may lose them if the network is not robust.
  - Reliable: guarantee that samples are delivered, may retry multiple times.
- Durability
  - Transient local: the publisher becomes responsible for persisting samples for “late-joining” subscriptions.
  - Volatile: no attempt is made to persist samples.
- Deadline
  - Duration: the expected maximum amount of time between subsequent messages being published to a topic.
- Lifespan
  - Duration: the maximum amount of time between the publishing and the reception of a message without the message being considered stale or expired (expired messages are silently dropped and are effectively never received).
- Liveliness
  - Automatic: the system will consider all of the node’s publishers to be alive for another “lease duration” when any one of its publishers has published a message.
  - Manual by topic: the system will consider the publisher to be alive for another “lease duration” if it manually asserts that it is still alive (via a call to the publisher API).
- Lease Duration
  - Duration: the maximum period of time a publisher has to indicate that it is alive before the system considers it to have lost liveliness (losing liveliness could be an indication of a failure).
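To make the Keep last / Depth pair more concrete, here is a minimal pure-Python sketch that models a depth-limited sample queue with `collections.deque`. It is only a toy model of the behavior described above, not how any middleware implements history; the class name `KeepLastHistory` is made up for this example.

```python
from collections import deque


class KeepLastHistory:
    """Toy model of a "keep last" history that holds at most `depth` samples."""

    def __init__(self, depth: int) -> None:
        if depth < 1:
            raise ValueError("depth must be at least 1")
        # deque(maxlen=N) discards the oldest item once N items are stored.
        self._samples = deque(maxlen=depth)

    def push(self, sample) -> None:
        """Store a new sample, evicting the oldest one if the queue is full."""
        self._samples.append(sample)

    def snapshot(self) -> list:
        """Return the stored samples, oldest first."""
        return list(self._samples)


history = KeepLastHistory(depth=3)
for count in range(5):
    history.push(count)

print(history.snapshot())  # [2, 3, 4]
```

With a depth of 3, the two oldest samples (0 and 1) are gone by the time the fifth sample is stored, which is the trade-off Keep last makes compared to Keep all.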
Pre-defined QoS Profiles
import rclpy
from rclpy.node import Node
from rclpy.qos import (
    QoSProfile,
    QoSHistoryPolicy,
    QoSReliabilityPolicy,
    QoSDurabilityPolicy,
    qos_profile_sensor_data,
)

from demo_interfaces.msg import Counter


class PublisherNode(Node):
    def __init__(self, node_name: str = "publisher") -> None:
        super().__init__(node_name)

        # Option 1: the pre-defined sensor-data profile
        # qos = qos_profile_sensor_data

        # Option 2: a hand-built, sensor-like profile
        qos = QoSProfile(
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=10,
            reliability=QoSReliabilityPolicy.BEST_EFFORT,
            durability=QoSDurabilityPolicy.VOLATILE,
        )

        self.__pub = self.create_publisher(Counter, "/counter", qos)
        # Publish every 0.1 s (10 Hz).
        self.create_timer(0.1, self.pub_callback)

    def pub_callback(self) -> None:
        self.__pub.publish(Counter())


def main(args=None) -> None:
    rclpy.init(args=args)
    node = PublisherNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        # try_shutdown() only shuts down if the context is still active,
        # so no additional rclpy.shutdown() call is needed (calling it on
        # an already shut-down context raises an error).
        rclpy.try_shutdown()


if __name__ == "__main__":
    main()
Subscriber QoS demo:
import rclpy
from rclpy.node import Node
from rclpy.qos import (
    QoSProfile,
    QoSHistoryPolicy,
    QoSReliabilityPolicy,
    QoSDurabilityPolicy,
)

from demo_interfaces.msg import Counter


class SubscriberNode(Node):
    def __init__(self, node_name: str = "subscriber") -> None:
        super().__init__(node_name)

        # Note: a RELIABLE subscription is incompatible with a BEST_EFFORT
        # publisher (see Compatibilities), so no messages arrive from one.
        qos = QoSProfile(
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=10,
            reliability=QoSReliabilityPolicy.RELIABLE,
            durability=QoSDurabilityPolicy.VOLATILE,
        )

        # creating the entity
        self.__sub = self.create_subscription(
            Counter, "/counter", self.counter_callback, qos
        )

    def counter_callback(self, msg: Counter) -> None:
        self.get_logger().info(f"Got: {msg.count}")


def main(args=None) -> None:
    rclpy.init(args=args)
    node = SubscriberNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        # try_shutdown() is safe even if Ctrl-C already shut the context down.
        rclpy.try_shutdown()


if __name__ == "__main__":
    main()
Transient Local
import rclpy
from rclpy.node import Node
from rclpy.qos import (
    QoSProfile,
    QoSHistoryPolicy,
    QoSReliabilityPolicy,
    QoSDurabilityPolicy,
)

from demo_interfaces.msg import Counter


class LatchedPub(Node):
    def __init__(self) -> None:
        super().__init__("latched_pub")

        # depth=1 with TRANSIENT_LOCAL keeps only the latest sample for late joiners.
        qos = QoSProfile(
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=1,
            reliability=QoSReliabilityPolicy.RELIABLE,
            durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
        )
        self.pub = self.create_publisher(Counter, "/counter", qos)

        # Publish once, then keep spinning so late-joiners can get the cached sample.
        msg = Counter()
        msg.count = 0
        self.pub.publish(msg)
        self.get_logger().info(
            "Published counter value once; staying alive for late subscribers."
        )

        # No-op timer; the node stays alive while rclpy.spin() runs.
        self.timer = self.create_timer(1.0, lambda: None)


def main(args=None) -> None:
    rclpy.init(args=args)
    node = LatchedPub()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        # try_shutdown() is safe even if Ctrl-C already shut the context down.
        rclpy.try_shutdown()


if __name__ == "__main__":
    main()
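The late-joiner behavior can be hard to picture without running two nodes, so here is a small pure-Python toy model of it. It does not use rclpy and does not reflect how the middleware works internally; `ToyLatchedTopic` and its methods are hypothetical names invented for this sketch.

```python
from collections import deque
from typing import Callable, List


class ToyLatchedTopic:
    """Toy model of a transient-local publisher with a "keep last" history."""

    def __init__(self, depth: int = 1) -> None:
        # Samples the publisher keeps around for late-joining subscribers.
        self._cache = deque(maxlen=depth)
        self._subscribers: List[Callable[[int], None]] = []

    def publish(self, sample: int) -> None:
        """Cache the sample and deliver it to every current subscriber."""
        self._cache.append(sample)
        for callback in self._subscribers:
            callback(sample)

    def subscribe(self, callback: Callable[[int], None]) -> None:
        """Register a subscriber; it first receives whatever is still cached."""
        for sample in self._cache:
            callback(sample)
        self._subscribers.append(callback)


topic = ToyLatchedTopic(depth=1)
topic.publish(0)                  # published before anyone subscribed

received: List[int] = []
topic.subscribe(received.append)  # late joiner
print(received)  # [0]
```

In ROS 2 itself the subscription also has to request Transient local durability to receive the cached sample; a Volatile subscription only sees messages published after it joined. The toy model ignores that distinction.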
Compatibilities
Compatibility of reliability QoS policies:
| Publisher | Subscription | Compatible |
|---|---|---|
| Best effort | Best effort | Yes |
| Best effort | Reliable | No |
| Reliable | Best effort | Yes |
| Reliable | Reliable | Yes |
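The table boils down to a single rule, which the following plain-Python sketch encodes. It mirrors only the reliability rows above and is not part of rclpy; the `Reliability` enum and the `reliability_compatible` function are hypothetical names used for illustration.

```python
from enum import Enum


class Reliability(Enum):
    BEST_EFFORT = "Best effort"
    RELIABLE = "Reliable"


def reliability_compatible(publisher: Reliability, subscription: Reliability) -> bool:
    """Return True if the pair is listed as compatible in the table above.

    The only incompatible case is a best-effort publisher paired with a
    reliable subscription: the subscription requests more than is offered.
    """
    return not (
        publisher is Reliability.BEST_EFFORT
        and subscription is Reliability.RELIABLE
    )


# Print every publisher/subscription combination with its verdict.
for pub in Reliability:
    for sub in Reliability:
        verdict = "Yes" if reliability_compatible(pub, sub) else "No"
        print(f"{pub.value:<11} | {sub.value:<11} | {verdict}")
```

Applied to the demo nodes in these notes, a BEST_EFFORT publisher and a RELIABLE subscription on `/counter` fall into the incompatible row, so that pair would not be expected to exchange messages.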
QoS Events
Keep this for deadline…