Skip to content

Commit

Permalink
Add option in ros2 topic pub to wait for N matching subscriptions, us…
Browse files Browse the repository at this point in the history
…e N=1 by default when combined with --times (#642)

Signed-off-by: Ivan Santiago Paunovic <[email protected]>
  • Loading branch information
ivanpauno authored Jun 4, 2021
1 parent 72cd9fb commit 7a6f3b8
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 6 deletions.
19 changes: 17 additions & 2 deletions ros2topic/ros2topic/verb/pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ def add_arguments(self, parser, cli_name):
group.add_argument(
'-t', '--times', type=nonnegative_int, default=0,
help='Publish this number of times and then exit')
parser.add_argument(
'-w', '--wait-matching-subscriptions', type=nonnegative_int, default=None,
help=(
'Wait until finding the specified number of matching subscriptions. '
'Defaults to 1 when using "-1"/"--once"/"--times", otherwise defaults to 0.'))
parser.add_argument(
'--keep-alive', metavar='N', type=positive_float, default=0.1,
help='Keep publishing node alive for N seconds after the last msg '
Expand Down Expand Up @@ -146,6 +151,8 @@ def main(args):
1. / args.rate,
args.print,
times,
args.wait_matching_subscriptions
if args.wait_matching_subscriptions is not None else int(times != 0),
qos_profile,
args.keep_alive)

Expand All @@ -158,6 +165,7 @@ def publisher(
period: float,
print_nth: int,
times: int,
wait_matching_subscriptions: int,
qos_profile: QoSProfile,
keep_alive: float,
) -> Optional[str]:
Expand All @@ -172,6 +180,15 @@ def publisher(

pub = node.create_publisher(msg_module, topic_name, qos_profile)

times_since_last_log = 0
while pub.get_subscription_count() < wait_matching_subscriptions:
# Print a message reporting we're waiting each 1s, check condition each 100ms.
if not times_since_last_log:
print(
f'Waiting for at least {wait_matching_subscriptions} matching subscription(s)...')
times_since_last_log = (times_since_last_log + 1) % 10
time.sleep(0.1)

msg = msg_module()
try:
set_message_fields(msg, values_dictionary)
Expand All @@ -188,8 +205,6 @@ def timer_callback():
print('publishing #%d: %r\n' % (count, msg))
pub.publish(msg)

# give some time for discovery process
time.sleep(0.1)
timer_callback()
if times != 1:
timer = node.create_timer(period, timer_callback)
Expand Down
51 changes: 48 additions & 3 deletions ros2topic/test/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ def test_topic_pub(self):
'publisher: beginning loop',
"publishing #1: std_msgs.msg.String(data='foo')",
''
], strict=True
]
), timeout=10)
assert self.listener_node.wait_for_output(functools.partial(
launch_testing.tools.expect_output, expected_lines=[
Expand All @@ -686,7 +686,7 @@ def test_topic_pub_once(self):
'publisher: beginning loop',
"publishing #1: std_msgs.msg.String(data='bar')",
''
], strict=True
]
), timeout=10)
assert topic_command.wait_for_shutdown(timeout=10)
assert self.listener_node.wait_for_output(functools.partial(
Expand All @@ -696,6 +696,51 @@ def test_topic_pub_once(self):
), timeout=10)
assert topic_command.exit_code == launch_testing.asserts.EXIT_OK

def test_topic_pub_once_matching_two_listeners(
self, launch_service, proc_info, proc_output, path_to_listener_node_script, additional_env
):
second_listener_node_action = Node(
executable=sys.executable,
arguments=[path_to_listener_node_script],
remappings=[('chatter', 'chit_chatter')],
additional_env=additional_env,
name='second_listener',
)
with launch_testing.tools.launch_process(
launch_service, second_listener_node_action, proc_info, proc_output
) as second_listener_node, \
self.launch_topic_command(
arguments=[
'pub', '--once',
'--keep-alive', '3', # seconds
'-w', '2',
'--qos-durability', 'transient_local',
'--qos-reliability', 'reliable',
'/chit_chatter',
'std_msgs/msg/String',
'{data: bar}'
]
) as topic_command:
assert topic_command.wait_for_output(functools.partial(
launch_testing.tools.expect_output, expected_lines=[
'publisher: beginning loop',
"publishing #1: std_msgs.msg.String(data='bar')",
''
]
), timeout=10)
assert topic_command.wait_for_shutdown(timeout=10)
assert self.listener_node.wait_for_output(functools.partial(
launch_testing.tools.expect_output, expected_lines=[
re.compile(r'\[INFO\] \[\d+\.\d*\] \[listener\]: I heard: \[bar\]')
]
), timeout=10)
assert second_listener_node.wait_for_output(functools.partial(
launch_testing.tools.expect_output, expected_lines=[
re.compile(r'\[INFO\] \[\d+\.\d*\] \[second_listener\]: I heard: \[bar\]')
]
), timeout=10)
assert topic_command.exit_code == launch_testing.asserts.EXIT_OK

def test_topic_pub_print_every_two(self):
with self.launch_topic_command(
arguments=[
Expand All @@ -716,7 +761,7 @@ def test_topic_pub_print_every_two(self):
'',
"publishing #4: std_msgs.msg.String(data='fizz')",
''
], strict=True
]
), timeout=10), 'Output does not match: ' + topic_command.output
assert self.listener_node.wait_for_output(functools.partial(
launch_testing.tools.expect_output, expected_lines=[
Expand Down
2 changes: 1 addition & 1 deletion ros2topic/test/test_echo_pub.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def message_callback(msg):
@launch_testing.markers.retry_on_failure(times=5)
def test_pub_times(self, launch_service, proc_info, proc_output):
command_action = ExecuteProcess(
cmd=(['ros2', 'topic', 'pub', '-t', '5', '/clitest/topic/pub_times',
cmd=(['ros2', 'topic', 'pub', '-t', '5', '-w', '0', '/clitest/topic/pub_times',
'std_msgs/String', 'data: hello']),
additional_env={
'PYTHONUNBUFFERED': '1'
Expand Down

0 comments on commit 7a6f3b8

Please sign in to comment.