Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Qos configurability #635

Merged
merged 29 commits into from
Nov 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
fdad6e3
Add QoSOverridingOptions
ivanpauno Oct 28, 2020
3300d10
Fix Node.get_parameters_by_prefix() type annotation
ivanpauno Oct 28, 2020
67ac124
Add helper classmethod for using default policies
ivanpauno Oct 28, 2020
80781c9
Add test cases
ivanpauno Oct 28, 2020
5fbc30c
Please linters + improve test setup and teardown
ivanpauno Oct 28, 2020
5e6235b
Declare qos overrides parameters in create_publisher/create_subscript…
ivanpauno Oct 28, 2020
b18d0ba
qos -> QoS
ivanpauno Oct 30, 2020
ef1e7bb
qos -> QoS
ivanpauno Oct 30, 2020
fa434f9
qos -> QoS
ivanpauno Oct 30, 2020
2e5297f
qos -> QoS
ivanpauno Oct 30, 2020
9b06ffa
qos -> QoS
ivanpauno Oct 30, 2020
b34a04c
style
ivanpauno Oct 30, 2020
51bddf4
qos -> QoS
ivanpauno Oct 30, 2020
fad4467
qos -> QoS
ivanpauno Oct 30, 2020
85a8cb7
grammar
ivanpauno Oct 30, 2020
589d2a2
qos -> QoS
ivanpauno Oct 30, 2020
0c5e7ce
bug
ivanpauno Oct 30, 2020
17c834a
qos -> QoS
ivanpauno Oct 30, 2020
50700ac
Fix _get_qos_policy_parameter type annotation
ivanpauno Oct 30, 2020
523dd59
qos -> QoS
ivanpauno Oct 30, 2020
812d44f
_declare_qos_parameteres -> _declare_qos_parameters
ivanpauno Oct 30, 2020
c4f109b
Fix after rebasing
ivanpauno Oct 30, 2020
751a601
Handle exception gracefully
ivanpauno Nov 2, 2020
59951a3
Suppress exception context
ivanpauno Nov 3, 2020
9fce94b
flake8
ivanpauno Nov 3, 2020
f8ba331
Add test to CMakeLists
ivanpauno Nov 5, 2020
0446d24
Use declare_or_get logic
ivanpauno Nov 5, 2020
a9fc10f
Improve testing
ivanpauno Nov 5, 2020
6b6f2c2
Add error message in validation callback result
ivanpauno Nov 9, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rclpy/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ if(BUILD_TESTING)
test/test_publisher.py
test/test_qos.py
test/test_qos_event.py
test/test_qos_overriding_options.py
test/test_rate.py
test/test_serialization.py
test/test_subscription.py
Expand Down
60 changes: 50 additions & 10 deletions rclpy/rclpy/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from typing import Iterator
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import TypeVar
from typing import Union
Expand All @@ -43,6 +44,7 @@
from rclpy.constants import S_TO_NS
from rclpy.context import Context
from rclpy.exceptions import InvalidParameterValueException
from rclpy.exceptions import InvalidTopicNameException
from rclpy.exceptions import NotInitializedException
from rclpy.exceptions import ParameterAlreadyDeclaredException
from rclpy.exceptions import ParameterImmutableException
Expand All @@ -62,6 +64,8 @@
from rclpy.qos import QoSProfile
from rclpy.qos_event import PublisherEventCallbacks
from rclpy.qos_event import SubscriptionEventCallbacks
from rclpy.qos_overriding_options import _declare_qos_parameters
from rclpy.qos_overriding_options import QoSOverridingOptions
from rclpy.service import Service
from rclpy.subscription import Subscription
from rclpy.time_source import TimeSource
Expand Down Expand Up @@ -530,7 +534,10 @@ def get_parameter_or(

return self._parameters.get(name, alternative_value)

def get_parameters_by_prefix(self, prefix: str) -> List[Parameter]:
def get_parameters_by_prefix(self, prefix: str) -> Dict[str, Optional[Union[
bool, int, float, str, bytes,
Sequence[bool], Sequence[int], Sequence[float], Sequence[str]
]]]:
"""
Get parameters that have a given prefix in their names as a dictionary.

Expand All @@ -547,16 +554,14 @@ def get_parameters_by_prefix(self, prefix: str) -> List[Parameter]:
:param prefix: The prefix of the parameters to get.
:return: Dict of parameters with the given prefix.
"""
parameters_with_prefix = {}
if prefix:
prefix = prefix + PARAMETER_SEPARATOR_STRING
prefix_len = len(prefix)
for parameter_name in self._parameters:
if parameter_name.startswith(prefix):
parameters_with_prefix.update(
{parameter_name[prefix_len:]: self._parameters.get(parameter_name)})

return parameters_with_prefix
return {
param_name[prefix_len:]: param_value
for param_name, param_value in self._parameters.items()
if param_name.startswith(prefix)
}

def set_parameters(self, parameter_list: List[Parameter]) -> List[SetParametersResult]:
"""
Expand Down Expand Up @@ -1125,6 +1130,7 @@ def create_publisher(
*,
callback_group: Optional[CallbackGroup] = None,
event_callbacks: Optional[PublisherEventCallbacks] = None,
qos_overriding_options: Optional[QoSOverridingOptions] = None,
) -> Publisher:
"""
Create a new publisher.
Expand All @@ -1144,9 +1150,26 @@ def create_publisher(

callback_group = callback_group or self.default_callback_group

failed = False
try:
final_topic = self.resolve_topic_name(topic)
except RuntimeError:
# if it's name validation error, raise a more appropriate exception.
try:
self._validate_topic_or_service_name(topic)
except InvalidTopicNameException as ex:
raise ex from None
# else reraise the previous exception
raise

if qos_overriding_options is None:
qos_overriding_options = QoSOverridingOptions([])
_declare_qos_parameters(
Publisher, self, final_topic, qos_profile, qos_overriding_options)

# this line imports the typesupport for the message module if not already done
check_for_type_support(msg_type)
failed = False
check_for_type_support(msg_type)
try:
with self.handle as node_capsule:
publisher_capsule = _rclpy.rclpy_create_publisher(
Expand Down Expand Up @@ -1182,6 +1205,7 @@ def create_subscription(
*,
callback_group: Optional[CallbackGroup] = None,
event_callbacks: Optional[SubscriptionEventCallbacks] = None,
qos_overriding_options: Optional[QoSOverridingOptions] = None,
raw: bool = False
) -> Subscription:
"""
Expand All @@ -1205,9 +1229,25 @@ def create_subscription(

callback_group = callback_group or self.default_callback_group

try:
final_topic = self.resolve_topic_name(topic)
except RuntimeError:
# if it's name validation error, raise a more appropriate exception.
try:
self._validate_topic_or_service_name(topic)
except InvalidTopicNameException as ex:
raise ex from None
# else reraise the previous exception
raise

if qos_overriding_options is None:
qos_overriding_options = QoSOverridingOptions([])
_declare_qos_parameters(
Subscription, self, final_topic, qos_profile, qos_overriding_options)

# this line imports the typesupport for the message module if not already done
check_for_type_support(msg_type)
failed = False
check_for_type_support(msg_type)
try:
with self.handle as capsule:
subscription_capsule = _rclpy.rclpy_create_subscription(
Expand Down
190 changes: 190 additions & 0 deletions rclpy/rclpy/qos_overriding_options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
# Copyright 2020 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Callable
from typing import Iterable
from typing import Optional
from typing import Text
from typing import Type
from typing import TYPE_CHECKING
from typing import Union

from rcl_interfaces.msg import ParameterDescriptor
from rcl_interfaces.msg import SetParametersResult

import rclpy
from rclpy.duration import Duration
from rclpy.exceptions import ParameterAlreadyDeclaredException
from rclpy.parameter import Parameter
from rclpy.publisher import Publisher
from rclpy.qos import QoSPolicyKind
from rclpy.qos import QoSProfile
from rclpy.subscription import Subscription

if TYPE_CHECKING:
from rclpy.node import Node


class InvalidQosOverridesError(Exception):
pass


# Return type of qos validation callbacks
QosCallbackResult = SetParametersResult
# Qos callback type annotation
QosCallbackType = Callable[[QoSProfile], QosCallbackResult]


class QoSOverridingOptions:
"""Options to customize QoS parameter overrides."""

def __init__(
self,
policy_kinds: Iterable[QoSPolicyKind],
*,
callback: Optional[QosCallbackType] = None,
entity_id: Optional[Text] = None
):
"""
Construct a QoSOverridingOptions object.

:param policy_kinds: QoS kinds that will have a declared parameter.
:param callback: Callback that will be used to validate the QoS profile
after the paramter overrides get applied.
:param entity_id: Optional identifier, to disambiguate in the case that different QoS
policies for the same topic are desired.
"""
self._policy_kinds = policy_kinds
self._callback = callback
self._entity_id = entity_id

@property
def policy_kinds(self) -> Iterable[QoSPolicyKind]:
"""Get QoS policy kinds that will have a parameter override."""
return self._policy_kinds

@property
def callback(self) -> Optional[QosCallbackType]:
"""Get the validation callback."""
return self._callback

@property
def entity_id(self) -> Optional[Text]:
"""Get the optional entity ID."""
return self._entity_id

@classmethod
def with_default_policies(
cls, *,
callback: Optional[QosCallbackType] = None,
entity_id: Optional[Text] = None
) -> 'QoSOverridingOptions':
return cls(
policy_kinds=(QoSPolicyKind.HISTORY, QoSPolicyKind.DEPTH, QoSPolicyKind.RELIABILITY),
callback=callback,
entity_id=entity_id,
)


def _declare_qos_parameters(
entity_type: Union[Type[Publisher], Type[Subscription]],
node: 'Node',
topic_name: Text,
qos: QoSProfile,
options: QoSOverridingOptions
) -> QoSProfile:
"""
Declare QoS parameters for a Publisher or a Subscription.

:param entity_type: Either `rclpy.node.Publisher` or `rclpy.node.Subscription`.
:param node: Node used to declare the parameters.
:param topic_name: Topic name of the entity being created.
:param qos: Default QoS settings of the entity being created, that will be overridden
with the user provided QoS parameter overrides.
:param options: Options that indicates which parameters are going to be declared.
"""
if not issubclass(entity_type, (Publisher, Subscription)):
raise TypeError('Argument `entity_type` should be a subclass of Publisher or Subscription')
entity_type_str = 'publisher' if issubclass(entity_type, Publisher) else 'subscription'
id_suffix = '' if options.entity_id is None else f'_{options.entity_id}'
name = f'qos_overrides.{topic_name}.{entity_type_str}{id_suffix}.' '{}'
description = '{}' f' for {entity_type_str} `{topic_name}` with id `{options.entity_id}`'
allowed_policies = _get_allowed_policies(entity_type)
for policy in options.policy_kinds:
if policy not in allowed_policies:
continue
policy_name = policy.name.lower()
descriptor = ParameterDescriptor()
descriptor.description = description.format(policy_name)
descriptor.read_only = True
try:
param = node.declare_parameter(
name.format(policy_name),
_get_qos_policy_parameter(qos, policy),
descriptor)
except ParameterAlreadyDeclaredException:
param = node.get_parameter(name.format(policy_name))
_override_qos_policy_with_param(qos, policy, param)
if options.callback is not None:
result = options.callback(qos)
if not result.successful:
raise InvalidQosOverridesError(
f"{description.format('Provided QoS overrides')}, are not valid: {result.reason}")


def _get_allowed_policies(entity_type: Union[Type[Publisher], Type[Subscription]]):
allowed_policies = list(QoSPolicyKind.__members__.values())
if issubclass(entity_type, Subscription):
allowed_policies.remove(QoSPolicyKind.LIFESPAN)
return allowed_policies


def _get_qos_policy_parameter(qos: QoSProfile, policy: QoSPolicyKind) -> Union[str, int, bool]:
value = getattr(qos, policy.name.lower())
if policy in (
QoSPolicyKind.LIVELINESS, QoSPolicyKind.RELIABILITY,
QoSPolicyKind.HISTORY, QoSPolicyKind.DURABILITY
ivanpauno marked this conversation as resolved.
Show resolved Hide resolved
):
value = value.name.lower()
if value == 'unknown':
raise ValueError('User provided QoS profile is invalid')
if policy in (
QoSPolicyKind.LIFESPAN, QoSPolicyKind.DEADLINE, QoSPolicyKind.LIVELINESS_LEASE_DURATION
):
value = value.nanoseconds()
return value


def _override_qos_policy_with_param(qos: QoSProfile, policy: QoSPolicyKind, param: Parameter):
value = param.value
policy_name = policy.name.lower()
if policy in (
QoSPolicyKind.LIVELINESS, QoSPolicyKind.RELIABILITY,
QoSPolicyKind.HISTORY, QoSPolicyKind.DURABILITY
):
def capitalize_first_letter(x):
return x[0].upper() + x[1:]
# e.g. `policy=QosPolicyKind.LIVELINESS` -> `policy_enum_class=rclpy.qos.LivelinessPolicy`
policy_enum_class = getattr(
rclpy.qos, f'{capitalize_first_letter(policy_name)}Policy')
try:
value = policy_enum_class[value.upper()]
except KeyError:
raise RuntimeError(
f'Unexpected QoS override for policy `{policy.name.lower()}`: `{value}`')
if policy in (
QoSPolicyKind.LIFESPAN, QoSPolicyKind.DEADLINE, QoSPolicyKind.LIVELINESS_LEASE_DURATION
):
value = Duration(nanoseconds=value)
setattr(qos, policy.name.lower(), value)
Loading