From b2720b3eaeeef748a265091fb28df0a71fc5ac44 Mon Sep 17 00:00:00 2001 From: Agustin Alba Chicar Date: Mon, 25 Apr 2022 16:00:45 +0200 Subject: [PATCH 01/10] Implements play until on top of play for. Signed-off-by: Agustin Alba Chicar --- rosbag2_interfaces/srv/Play.srv | 2 ++ .../include/rosbag2_transport/play_options.hpp | 9 +++++++++ rosbag2_transport/src/rosbag2_transport/player.cpp | 11 ++++++++++- 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/rosbag2_interfaces/srv/Play.srv b/rosbag2_interfaces/srv/Play.srv index 54edf7dd45..7675fd2ae4 100644 --- a/rosbag2_interfaces/srv/Play.srv +++ b/rosbag2_interfaces/srv/Play.srv @@ -2,6 +2,8 @@ builtin_interfaces/Time start_offset # See rosbag2_transport::PlayOptions::playback_duration builtin_interfaces/Duration playback_duration +# See rosbag2_transport::PlayOptions::playback_until +builtin_interfaces/Time playback_until --- # returns false when another playback execution is running, otherwise true bool success diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index 92225373cc..dfd22fd281 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -22,6 +22,7 @@ #include "keyboard_handler/keyboard_handler.hpp" #include "rclcpp/duration.hpp" +#include "rclcpp/time.hpp" #include "rclcpp/qos.hpp" namespace rosbag2_transport @@ -52,8 +53,16 @@ struct PlayOptions // Determines the maximum duration of the playback. Negative durations will make the playback to // not stop. Default configuration makes the player to not stop execution. + // When positive, the maximum of `playback_until` and the one that this attribute yields will be + // used to determine which one stops playback execution. rclcpp::Duration playback_duration = rclcpp::Duration(-1, 0); + // Determines the timestamp at which the playback will stop. Negative timestamps will make the + // playback to not stop. Default configuration makes the player to not stop execution. + // When positive, the maximum of the effective time that `playback_duration` yields and this + // attribute will be used to determine which one stops playback execution. + rcutils_time_point_value_t playback_until = -1; + // Start paused. bool start_paused = false; diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 3c986b92c5..3a7559f858 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -212,7 +212,15 @@ bool Player::play() "Invalid delay value: " << play_options_.delay.nanoseconds() << ". Delay is disabled."); } - RCLCPP_INFO_STREAM(get_logger(), "Playback duration value: " << play_until_time_); + rcutils_time_point_value_t play_until_time = -1; + if (play_options_.playback_duration >= rclcpp::Duration(0, 0) || + play_options_.playback_until >= rcutils_time_point_value_t{0}) + { + play_until_time = std::max( + starting_time_ + play_options_.playback_duration.nanoseconds(), + play_options_.playback_until); + } + RCLCPP_INFO_STREAM(get_logger(), "Playback duration value: " << play_until_time); try { do { @@ -781,6 +789,7 @@ void Player::create_control_services() { play_options_.start_offset = rclcpp::Time(request->start_offset).nanoseconds(); play_options_.playback_duration = rclcpp::Duration(request->playback_duration); + play_options_.playback_until = rclcpp::Time(request->playback_until).nanoseconds(); response->success = play(); }); srv_play_next_ = create_service( From d0c57d69beea48426824f69777b35e12bac84caf Mon Sep 17 00:00:00 2001 From: Agustin Alba Chicar Date: Mon, 25 Apr 2022 16:46:31 +0200 Subject: [PATCH 02/10] Adds python bindings Signed-off-by: Agustin Alba Chicar --- ros2bag/ros2bag/verb/play.py | 5 +++++ rosbag2_py/src/rosbag2_py/_transport.cpp | 14 ++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 4932522a44..a5fbcbfa96 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -89,6 +89,10 @@ def add_arguments(self, parser, cli_name): # noqa: D102 '--playback_duration', type=float, default=-1.0, help='Playback duration, in seconds. Negative durations mark an infinite playback. ' 'Default is -1.0.') + parser.add_argument( + '--playback_until', type=check_positive_float, default=-1.0, + help='Playback until timestamp, in seconds. Negative stamps disable this feature. ' + 'Default is -1.0.') parser.add_argument( '--disable-keyboard-controls', action='store_true', help='disables keyboard controls for playback') @@ -155,6 +159,7 @@ def main(self, *, args): # noqa: D102 play_options.clock_publish_frequency = args.clock play_options.delay = args.delay play_options.playback_duration = args.playback_duration + play_options.playback_until = args.playback_until play_options.disable_keyboard_controls = args.disable_keyboard_controls play_options.start_paused = args.start_paused play_options.start_offset = args.start_offset diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 00fb965834..77a8548239 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -95,6 +95,16 @@ struct OptionsWrapper : public T return RCUTILS_NS_TO_S(static_cast(this->start_offset)); } + void setPlaybackUntil(double playback_until) + { + this->playback_until = static_cast(RCUTILS_S_TO_NS(playback_until)); + } + + double getPlaybackUntil() const + { + return RCUTILS_NS_TO_S(static_cast(this->playback_until)); + } + void setTopicQoSProfileOverrides(const py::dict & overrides) { py_dict = overrides; @@ -288,6 +298,10 @@ PYBIND11_MODULE(_transport, m) { "start_offset", &PlayOptions::getStartOffset, &PlayOptions::setStartOffset) + .def_property( + "start_offset", + &PlayOptions::getPlaybackUntil, + &PlayOptions::setPlaybackUntil) .def_readwrite("wait_acked_timeout", &PlayOptions::wait_acked_timeout) .def_readwrite("disable_loan_message", &PlayOptions::disable_loan_message) ; From a5dc861b2c21417e07c5324b44919f81edc2b47e Mon Sep 17 00:00:00 2001 From: Agustin Alba Chicar Date: Mon, 25 Apr 2022 16:46:54 +0200 Subject: [PATCH 03/10] Adds test for play until. Signed-off-by: Agustin Alba Chicar --- rosbag2_transport/CMakeLists.txt | 8 +- .../rosbag2_transport/test_play_until.cpp | 362 ++++++++++++++++++ 2 files changed, 369 insertions(+), 1 deletion(-) create mode 100644 rosbag2_transport/test/rosbag2_transport/test_play_until.cpp diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index 3e1bbac729..5d2e7605ac 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -160,7 +160,13 @@ function(create_tests_for_rmw_implementation) LINK_LIBS rosbag2_transport AMENT_DEPS test_msgs rosbag2_test_common) - rosbag2_transport_add_gmock(test_burst + rosbag2_transport_add_gmock(test_play_until + test/rosbag2_transport/test_play_until.cpp + INCLUDE_DIRS $ + LINK_LIBS rosbag2_transport + AMENT_DEPS test_msgs rosbag2_test_common) + + rosbag2_transport_add_gmock(test_burst test/rosbag2_transport/test_burst.cpp INCLUDE_DIRS $ LINK_LIBS rosbag2_transport diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp new file mode 100644 index 0000000000..c1d5391920 --- /dev/null +++ b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp @@ -0,0 +1,362 @@ +// Copyright 2022, Open Source Robotics Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.Bosch Software Innovations GmbH. + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" + +#include "rosbag2_test_common/subscription_manager.hpp" + +#include "rosbag2_transport/player.hpp" +#include "rosbag2_transport/qos.hpp" + +#include "test_msgs/msg/arrays.hpp" +#include "test_msgs/msg/basic_types.hpp" +#include "test_msgs/message_fixtures.hpp" + +#include "rosbag2_play_test_fixture.hpp" +#include "rosbag2_transport_test_fixture.hpp" + +#include "mock_player.hpp" + +using namespace ::testing; // NOLINT +using namespace rosbag2_transport; // NOLINT +using namespace std::chrono_literals; // NOLINT +using namespace rosbag2_test_common; // NOLINT + +constexpr int kIntValue{32}; + +constexpr float kFloat1Value{40.}; +constexpr float kFloat2Value{2.}; +constexpr float kFloat3Value{0.}; + +constexpr bool kBool1Value{false}; +constexpr bool kBool2Value{true}; +constexpr bool kBool3Value{false}; + +#define EVAL_REPLAYED_PRIMITIVES(replayed_primitives) \ + EXPECT_THAT( \ + replayed_primitives, \ + Each(Pointee(Field(&test_msgs::msg::BasicTypes::int32_value, kIntValue)))) + +#define EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_float_array_primitive) \ + EXPECT_THAT( \ + replayed_float_array_primitive, \ + Each( \ + Pointee( \ + Field( \ + &test_msgs::msg::Arrays::float32_values, \ + ElementsAre(kFloat1Value, kFloat2Value, kFloat3Value))))) + +#define EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_bool_array_primitive) \ + EXPECT_THAT( \ + replayed_bool_array_primitive, \ + Each( \ + Pointee( \ + Field( \ + &test_msgs::msg::Arrays::bool_values, \ + ElementsAre(kBool1Value, kBool2Value, kBool3Value))))) + +class RosBag2PlayUntilTestFixture : public RosBag2PlayTestFixture +{ +public: + static constexpr const char * kTopic1Name_{"topic1"}; + static constexpr const char * kTopic2Name_{"topic2"}; + static constexpr const char * kTopic1_{"/topic1"}; + static constexpr const char * kTopic2_{"/topic2"}; + + std::vector get_topic_types() + { + return {{kTopic1Name_, "test_msgs/BasicTypes", "", ""}, + {kTopic2Name_, "test_msgs/Arrays", "", ""}}; + } + + std::vector> + get_serialized_messages() + { + auto primitive_message1 = get_messages_basic_types()[0]; + primitive_message1->int32_value = kIntValue; + + auto complex_message1 = get_messages_arrays()[0]; + complex_message1->float32_values = {{kFloat1Value, kFloat2Value, kFloat3Value}}; + complex_message1->bool_values = {{kBool1Value, kBool2Value, kBool3Value}}; + + // @{ Ordering matters. The mock reader implementation moves messages + // around without any knowledge about message chronology. It just picks + // the next one Make sure to keep the list in order or sort it! + std::vector> messages = + {serialize_test_message(kTopic1Name_, 100, primitive_message1), + serialize_test_message(kTopic2Name_, 120, complex_message1), + serialize_test_message(kTopic1Name_, 200, primitive_message1), + serialize_test_message(kTopic2Name_, 220, complex_message1), + serialize_test_message(kTopic1Name_, 300, primitive_message1), + serialize_test_message(kTopic2Name_, 320, complex_message1)}; + // @} + return messages; + } + + void InitPlayerWithPlaybackUntilAndPlay( + int64_t playback_until_millis, + size_t expected_number_of_messages_on_topic1 = 3, + size_t expected_number_of_messages_on_topic2 = 3, + int64_t playback_for_millis = -1) + { + auto topic_types = get_topic_types(); + auto messages = get_serialized_messages(); + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + sub_->add_subscription( + kTopic1_, expected_number_of_messages_on_topic1); + sub_->add_subscription(kTopic2_, expected_number_of_messages_on_topic2); + + play_options_.playback_until = RCL_MS_TO_NS(playback_until_millis); + play_options_.playback_duration = rclcpp::Duration( + std::chrono::milliseconds( + playback_for_millis)); + player_ = std::make_shared(std::move(reader), storage_options_, play_options_); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); + + auto await_received_messages = sub_->spin_subscriptions(); + ASSERT_TRUE(player_->play()); + await_received_messages.get(); + } + + std::shared_ptr player_; +}; + + +TEST_F(RosBag2PlayUntilTestFixture, play_until_all_are_played_due_to_duration) +{ + InitPlayerWithPlaybackUntilAndPlay(350); + + auto replayed_test_primitives = sub_->get_received_messages( + kTopic1_); + EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(3u))); + EVAL_REPLAYED_PRIMITIVES(replayed_test_primitives); + + auto replayed_test_arrays = sub_->get_received_messages( + kTopic2_); + EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(3u))); + EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); + EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); +} + +TEST_F(RosBag2PlayUntilTestFixture, play_until_none_are_played_due_to_timestamp) +{ + auto primitive_message1 = get_messages_basic_types()[0]; + auto primitive_message2 = get_messages_basic_types()[0]; + primitive_message1->int32_value = 1; + primitive_message2->int32_value = 2; + + auto topic_types = std::vector{ + {kTopic1Name_, "test_msgs/BasicTypes", "", ""}}; + + std::vector> messages = + { + serialize_test_message(kTopic1Name_, 50, primitive_message1), + serialize_test_message(kTopic1Name_, 100, primitive_message2), + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + // Expect to receive messages only from play_next in second round + sub_->add_subscription(kTopic1_, messages.size()); + play_options_.playback_until = RCL_MS_TO_NS(49); + + std::shared_ptr player_ = std::make_shared( + std::move(reader), storage_options_, play_options_); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); + + auto await_received_messages = sub_->spin_subscriptions(); + ASSERT_TRUE(player_->play()); + + // Playing one more time with play_next to save time and count messages + player_->pause(); + auto player_future = std::async(std::launch::async, [&player_]() -> void {player_->play();}); + + EXPECT_TRUE(player_->play_next()); + EXPECT_TRUE(player_->play_next()); + EXPECT_FALSE(player_->play_next()); + player_->resume(); + player_future.get(); + await_received_messages.get(); + auto replayed_topic1 = sub_->get_received_messages(kTopic1_); + EXPECT_THAT(replayed_topic1, SizeIs(2)); + EXPECT_EQ(replayed_topic1[0]->int32_value, 1); + EXPECT_EQ(replayed_topic1[1]->int32_value, 2); +} + +TEST_F(RosBag2PlayUntilTestFixture, play_until_less_than_the_total_duration) +{ + auto primitive_message1 = get_messages_basic_types()[0]; + auto primitive_message2 = get_messages_basic_types()[0]; + primitive_message1->int32_value = 1; + primitive_message2->int32_value = 2; + + auto topic_types = std::vector{ + {kTopic1Name_, "test_msgs/BasicTypes", "", ""}}; + + std::vector> messages = + { + serialize_test_message(kTopic1Name_, 10, primitive_message1), + serialize_test_message(kTopic1Name_, 50, primitive_message2), + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + // Expect to receive 1 message from play() and 2 messages from play_next in second round + sub_->add_subscription(kTopic1_, messages.size() + 1); + play_options_.playback_until = RCL_MS_TO_NS(49); + + std::shared_ptr player_ = std::make_shared( + std::move(reader), storage_options_, play_options_); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); + + auto await_received_messages = sub_->spin_subscriptions(); + ASSERT_TRUE(player_->play()); + + // Playing one more time with play_next to save time and count messages + player_->pause(); + auto player_future = std::async(std::launch::async, [&player_]() -> void {player_->play();}); + + ASSERT_TRUE(player_->play_next()); + ASSERT_TRUE(player_->play_next()); + ASSERT_FALSE(player_->play_next()); + player_->resume(); + player_future.get(); + await_received_messages.get(); + auto replayed_topic1 = sub_->get_received_messages(kTopic1_); + EXPECT_THAT(replayed_topic1, SizeIs(3)); + EXPECT_EQ(replayed_topic1[0]->int32_value, 1); + EXPECT_EQ(replayed_topic1[1]->int32_value, 1); + EXPECT_EQ(replayed_topic1[2]->int32_value, 2); +} + +TEST_F( + RosBag2PlayUntilTestFixture, + play_until_intermediate_duration_recorded_messages_with_filtered_topics) +{ + // Filter allows /topic2, blocks /topic1 + play_options_.topics_to_filter = {"topic2"}; + InitPlayerWithPlaybackUntilAndPlay(270, 0, 2); + + auto replayed_test_primitives = + sub_->get_received_messages("/topic1"); + // No messages are allowed to have arrived + EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(0u))); + + auto replayed_test_arrays = sub_->get_received_messages("/topic2"); + // Some messages should have arrived. + EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(2u))); + EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); + EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); +} + +TEST_F(RosBag2PlayUntilTestFixture, play_should_return_false_when_interrupted) +{ + auto topic_types = std::vector{ + {kTopic1Name_, "test_msgs/BasicTypes", "", ""}}; + + auto primitive_message = get_messages_basic_types()[0]; + primitive_message->int32_value = kIntValue; + std::vector> messages = + { + serialize_test_message(kTopic1Name_, 50, primitive_message), + serialize_test_message(kTopic1Name_, 100, primitive_message), + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + // Let the player only reproduce one message. + sub_->add_subscription(kTopic1_, 1); + play_options_.playback_until = RCL_MS_TO_NS(75); + + std::shared_ptr player_ = std::make_shared( + std::move(reader), storage_options_, play_options_); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); + + auto await_received_messages = sub_->spin_subscriptions(); + player_->pause(); + auto player_future = std::async(std::launch::async, [player_]() {return player_->play();}); + player_->wait_for_playback_to_start(); + ASSERT_TRUE(player_->is_paused()); + ASSERT_FALSE(player_->play()); + + player_->resume(); + player_future.get(); + await_received_messages.get(); + auto replayed_topic1 = sub_->get_received_messages(kTopic1_); + EXPECT_THAT(replayed_topic1, SizeIs(1)); +} + +TEST_F(RosBag2PlayUntilTestFixture, play_until_all_are_played_until_is_bigger_than_for) +{ + InitPlayerWithPlaybackUntilAndPlay( + 350 /* playback_until */, 3 /* num messages topic 1 */, + 3 /* num messages topic 2 */, 50 /* playback_for */); + + auto replayed_test_primitives = sub_->get_received_messages( + kTopic1_); + EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(3u))); + EVAL_REPLAYED_PRIMITIVES(replayed_test_primitives); + + auto replayed_test_arrays = sub_->get_received_messages( + kTopic2_); + EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(3u))); + EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); + EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); +} + +TEST_F(RosBag2PlayUntilTestFixture, play_until_all_are_played_until_is_smaller_than_for) +{ + InitPlayerWithPlaybackUntilAndPlay( + 50 /* playback_until */, 1 /* num messages topic 1 */, + 1 /* num messages topic 2 */, 150 /* playback_for */); + + auto replayed_test_primitives = sub_->get_received_messages( + kTopic1_); + EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(1u))); + EVAL_REPLAYED_PRIMITIVES(replayed_test_primitives); + + auto replayed_test_arrays = sub_->get_received_messages( + kTopic2_); + EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(1u))); + EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); + EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); +} From 7a07771542f3732154ac34679eef58f6ee49d2a7 Mon Sep 17 00:00:00 2001 From: Agustin Alba Chicar Date: Wed, 11 May 2022 15:22:46 +0200 Subject: [PATCH 04/10] Fixes variable name. Signed-off-by: Agustin Alba Chicar --- rosbag2_py/src/rosbag2_py/_transport.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 77a8548239..cddb291e39 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -299,7 +299,7 @@ PYBIND11_MODULE(_transport, m) { &PlayOptions::getStartOffset, &PlayOptions::setStartOffset) .def_property( - "start_offset", + "playback_until", &PlayOptions::getPlaybackUntil, &PlayOptions::setPlaybackUntil) .def_readwrite("wait_acked_timeout", &PlayOptions::wait_acked_timeout) From 5f52cd732b89fe4bdbbf968b0ce0e8a8589a3ecd Mon Sep 17 00:00:00 2001 From: Agustin Alba Chicar Date: Mon, 16 May 2022 09:18:40 +0200 Subject: [PATCH 05/10] Adds the same docstring for play_until and play_duration options than in PlayOptions to clarify usage. Signed-off-by: Agustin Alba Chicar --- ros2bag/ros2bag/verb/play.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index a5fbcbfa96..168233e102 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -88,11 +88,15 @@ def add_arguments(self, parser, cli_name): # noqa: D102 parser.add_argument( '--playback_duration', type=float, default=-1.0, help='Playback duration, in seconds. Negative durations mark an infinite playback. ' - 'Default is -1.0.') + 'Default is -1.0. When positive, the maximum of `playback_until` and the one ' + 'that this attribute yields will be used to determine which one stops playback ' + 'execution.') parser.add_argument( '--playback_until', type=check_positive_float, default=-1.0, help='Playback until timestamp, in seconds. Negative stamps disable this feature. ' - 'Default is -1.0.') + 'Default is -1.0. When positive, the maximum of the effective time that ' + '`playback_duration` yields and this attribute will be used to determine which ' + 'one stops playback execution.') parser.add_argument( '--disable-keyboard-controls', action='store_true', help='disables keyboard controls for playback') From 5db3c5c05ea7b388359fe753606bccb3d79e2053 Mon Sep 17 00:00:00 2001 From: Agustin Alba Chicar Date: Mon, 23 May 2022 11:37:49 +0200 Subject: [PATCH 06/10] Updates the CLI to combine the seconds and nanoseconds part of the playback_until stamp Signed-off-by: Agustin Alba Chicar --- ros2bag/ros2bag/verb/play.py | 27 ++++++++++++++++++++---- rosbag2_py/src/rosbag2_py/_transport.cpp | 8 +++---- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 168233e102..a8dfda483a 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -92,9 +92,17 @@ def add_arguments(self, parser, cli_name): # noqa: D102 'that this attribute yields will be used to determine which one stops playback ' 'execution.') parser.add_argument( - '--playback_until', type=check_positive_float, default=-1.0, - help='Playback until timestamp, in seconds. Negative stamps disable this feature. ' - 'Default is -1.0. When positive, the maximum of the effective time that ' + '--playback_until_sec', type=int, default=-1, + help='Playback until timestamp, the seconds part. See \'--playback_until_nsec\' to ' + 'complete the stamp information. Negative stamps disable this feature. ' + 'Default is -1. When positive, the maximum of the effective time that ' + '`playback_duration` yields and this attribute will be used to determine which ' + 'one stops playback execution.') + parser.add_argument( + '--playback_until_nsec', type=int, default=-1, + help='Playback until timestamp, the nanoseconds part. See \'--playback_until_sec\' to ' + 'complete the stamp information. Negative stamps disable this feature. ' + 'Default is -1. When positive, the maximum of the effective time that ' '`playback_duration` yields and this attribute will be used to determine which ' 'one stops playback execution.') parser.add_argument( @@ -128,6 +136,16 @@ def add_arguments(self, parser, cli_name): # noqa: D102 help='Play for SEC seconds. Default is None, meaning that playback will continue ' 'until the end of the bag. Valid range > 0.0') + def get_playback_until_from(self, playback_until_sec, playback_until_nsec) -> int: + nano_scale = 1000 * 1000 * 1000 + if playback_until_sec >= 0 and playback_until_nsec >= 0: + return playback_until_sec * nano_scale + playback_until_nsec + elif playback_until_sec >= 0 and playback_until_nsec < 0: + return playback_until_sec * nano_scale + elif playback_until_sec < 0 and playback_until_nsec >= 0: + return playback_until_nsec + return -1 + def main(self, *, args): # noqa: D102 qos_profile_overrides = {} # Specify a valid default if args.qos_profile_overrides_path: @@ -163,7 +181,8 @@ def main(self, *, args): # noqa: D102 play_options.clock_publish_frequency = args.clock play_options.delay = args.delay play_options.playback_duration = args.playback_duration - play_options.playback_until = args.playback_until + play_options.playback_until = self.get_playback_until_from(args.playback_until_sec, + args.playback_until_nsec) play_options.disable_keyboard_controls = args.disable_keyboard_controls play_options.start_paused = args.start_paused play_options.start_offset = args.start_offset diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index cddb291e39..4b833330b7 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -95,14 +95,14 @@ struct OptionsWrapper : public T return RCUTILS_NS_TO_S(static_cast(this->start_offset)); } - void setPlaybackUntil(double playback_until) + void setPlaybackUntil(int64_t playback_until) { - this->playback_until = static_cast(RCUTILS_S_TO_NS(playback_until)); + this->playback_until = static_cast(playback_until); } - double getPlaybackUntil() const + int64_t getPlaybackUntil() const { - return RCUTILS_NS_TO_S(static_cast(this->playback_until)); + return this->playback_until; } void setTopicQoSProfileOverrides(const py::dict & overrides) From 86079874afa79d2abf6f0628417f7fccd6e23962 Mon Sep 17 00:00:00 2001 From: Agustin Alba Chicar Date: Wed, 25 May 2022 15:48:22 +0200 Subject: [PATCH 07/10] Changes to play verb parameters: - Unifies the use of '-' instead of '_' to split words in arguments. - Uses mutually exclusive arguments: --playback-until-sec and --playback-until-nsec to pass the timestamp. Signed-off-by: Agustin Alba Chicar --- ros2bag/ros2bag/verb/play.py | 45 ++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index a8dfda483a..35467f773e 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -86,25 +86,32 @@ def add_arguments(self, parser, cli_name): # noqa: D102 '-d', '--delay', type=positive_float, default=0.0, help='Sleep duration before play (each loop), in seconds. Negative durations invalid.') parser.add_argument( - '--playback_duration', type=float, default=-1.0, + '--playback-duration', type=float, default=-1.0, help='Playback duration, in seconds. Negative durations mark an infinite playback. ' - 'Default is -1.0. When positive, the maximum of `playback_until` and the one ' + 'Default is -1.0. When positive, the maximum of `playback-until` and the one ' 'that this attribute yields will be used to determine which one stops playback ' 'execution.') - parser.add_argument( - '--playback_until_sec', type=int, default=-1, - help='Playback until timestamp, the seconds part. See \'--playback_until_nsec\' to ' - 'complete the stamp information. Negative stamps disable this feature. ' - 'Default is -1. When positive, the maximum of the effective time that ' - '`playback_duration` yields and this attribute will be used to determine which ' - 'one stops playback execution.') - parser.add_argument( - '--playback_until_nsec', type=int, default=-1, - help='Playback until timestamp, the nanoseconds part. See \'--playback_until_sec\' to ' - 'complete the stamp information. Negative stamps disable this feature. ' - 'Default is -1. When positive, the maximum of the effective time that ' - '`playback_duration` yields and this attribute will be used to determine which ' + + playback_until_arg_group = parser.add_mutually_exclusive_group() + playback_until_arg_group.add_argument( + '--playback-until-sec', type=float, default=-1., + help='Playback until timestamp, expressed in seconds since epoch. ' + 'Mutually exclusive argument with \'--playback-until-nsec\'. ' + 'Use this argument when floating point to integer conversion error is not a ' + 'problem for your application. Negative stamps disable this feature. Default is ' + '-1.0. When positive, the maximum of the effective time that ' + '`--playback-duration` yields and this attribute will be used to determine which ' 'one stops playback execution.') + playback_until_arg_group.add_argument( + '--playback-until-nsec', type=int, default=-1, + help='Playback until timestamp, expressed in nanoseconds since epoch. ' + 'Mutually exclusive argument with \'--playback-until-sec\'. ' + 'Use this argument when floating point to integer conversion error matters for ' + 'your application. Negative stamps disable this feature. Default is -1. When ' + 'positive, the maximum of the effective time that `--playback-duration` yields ' + 'and this attribute will be used to determine which one stops playback ' + 'execution.') + parser.add_argument( '--disable-keyboard-controls', action='store_true', help='disables keyboard controls for playback') @@ -138,11 +145,9 @@ def add_arguments(self, parser, cli_name): # noqa: D102 def get_playback_until_from(self, playback_until_sec, playback_until_nsec) -> int: nano_scale = 1000 * 1000 * 1000 - if playback_until_sec >= 0 and playback_until_nsec >= 0: - return playback_until_sec * nano_scale + playback_until_nsec - elif playback_until_sec >= 0 and playback_until_nsec < 0: - return playback_until_sec * nano_scale - elif playback_until_sec < 0 and playback_until_nsec >= 0: + if playback_until_sec and playback_until_sec >= 0.0: + return int(playback_until_sec * nano_scale) + if playback_until_nsec and playback_until_nsec >= 0: return playback_until_nsec return -1 From c48d21117a227bdb05a66bfe0c37117eb3702dcd Mon Sep 17 00:00:00 2001 From: Agustin Alba Chicar Date: Fri, 3 Jun 2022 15:54:14 +0200 Subject: [PATCH 08/10] Solves review comments. - Adds test cases to the player. - Renames play_until and similar occurrences to contain timestamp. - Renames play_for and similar occurrences to not contain for and include duration. Signed-off-by: Agustin Alba Chicar --- ros2bag/ros2bag/verb/play.py | 12 ++--- rosbag2_interfaces/srv/Play.srv | 4 +- rosbag2_py/src/rosbag2_py/_transport.cpp | 15 +++--- .../rosbag2_transport/play_options.hpp | 6 +-- .../include/rosbag2_transport/player.hpp | 5 +- .../src/rosbag2_transport/player.cpp | 25 +++++----- .../rosbag2_transport/test_play_until.cpp | 49 +++++++++++++------ 7 files changed, 68 insertions(+), 48 deletions(-) diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 35467f773e..ff034f4019 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -88,7 +88,7 @@ def add_arguments(self, parser, cli_name): # noqa: D102 parser.add_argument( '--playback-duration', type=float, default=-1.0, help='Playback duration, in seconds. Negative durations mark an infinite playback. ' - 'Default is -1.0. When positive, the maximum of `playback-until` and the one ' + 'Default is -1.0. When positive, the maximum of `playback-until-*` and the one ' 'that this attribute yields will be used to determine which one stops playback ' 'execution.') @@ -96,7 +96,7 @@ def add_arguments(self, parser, cli_name): # noqa: D102 playback_until_arg_group.add_argument( '--playback-until-sec', type=float, default=-1., help='Playback until timestamp, expressed in seconds since epoch. ' - 'Mutually exclusive argument with \'--playback-until-nsec\'. ' + 'Mutually exclusive argument with `--playback-until-nsec`. ' 'Use this argument when floating point to integer conversion error is not a ' 'problem for your application. Negative stamps disable this feature. Default is ' '-1.0. When positive, the maximum of the effective time that ' @@ -105,7 +105,7 @@ def add_arguments(self, parser, cli_name): # noqa: D102 playback_until_arg_group.add_argument( '--playback-until-nsec', type=int, default=-1, help='Playback until timestamp, expressed in nanoseconds since epoch. ' - 'Mutually exclusive argument with \'--playback-until-sec\'. ' + 'Mutually exclusive argument with `--playback-until-sec`. ' 'Use this argument when floating point to integer conversion error matters for ' 'your application. Negative stamps disable this feature. Default is -1. When ' 'positive, the maximum of the effective time that `--playback-duration` yields ' @@ -143,7 +143,7 @@ def add_arguments(self, parser, cli_name): # noqa: D102 help='Play for SEC seconds. Default is None, meaning that playback will continue ' 'until the end of the bag. Valid range > 0.0') - def get_playback_until_from(self, playback_until_sec, playback_until_nsec) -> int: + def get_playback_until_from_arg_group(self, playback_until_sec, playback_until_nsec) -> int: nano_scale = 1000 * 1000 * 1000 if playback_until_sec and playback_until_sec >= 0.0: return int(playback_until_sec * nano_scale) @@ -186,8 +186,8 @@ def main(self, *, args): # noqa: D102 play_options.clock_publish_frequency = args.clock play_options.delay = args.delay play_options.playback_duration = args.playback_duration - play_options.playback_until = self.get_playback_until_from(args.playback_until_sec, - args.playback_until_nsec) + play_options.playback_until_timestamp = self.get_playback_until_from_arg_group( + args.playback_until_sec, args.playback_until_nsec) play_options.disable_keyboard_controls = args.disable_keyboard_controls play_options.start_paused = args.start_paused play_options.start_offset = args.start_offset diff --git a/rosbag2_interfaces/srv/Play.srv b/rosbag2_interfaces/srv/Play.srv index 7675fd2ae4..b2a371b21d 100644 --- a/rosbag2_interfaces/srv/Play.srv +++ b/rosbag2_interfaces/srv/Play.srv @@ -2,8 +2,8 @@ builtin_interfaces/Time start_offset # See rosbag2_transport::PlayOptions::playback_duration builtin_interfaces/Duration playback_duration -# See rosbag2_transport::PlayOptions::playback_until -builtin_interfaces/Time playback_until +# See rosbag2_transport::PlayOptions::playback_until_timestamp +builtin_interfaces/Time playback_until_timestamp --- # returns false when another playback execution is running, otherwise true bool success diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 4b833330b7..53134242dd 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -95,14 +95,15 @@ struct OptionsWrapper : public T return RCUTILS_NS_TO_S(static_cast(this->start_offset)); } - void setPlaybackUntil(int64_t playback_until) + void setPlaybackUntilTimestamp(int64_t playback_until_timestamp) { - this->playback_until = static_cast(playback_until); + this->playback_until_timestamp = + static_cast(playback_until_timestamp); } - int64_t getPlaybackUntil() const + int64_t getPlaybackUntilTimestamp() const { - return this->playback_until; + return this->playback_until_timestamp; } void setTopicQoSProfileOverrides(const py::dict & overrides) @@ -299,9 +300,9 @@ PYBIND11_MODULE(_transport, m) { &PlayOptions::getStartOffset, &PlayOptions::setStartOffset) .def_property( - "playback_until", - &PlayOptions::getPlaybackUntil, - &PlayOptions::setPlaybackUntil) + "playback_until_timestamp", + &PlayOptions::getPlaybackUntilTimestamp, + &PlayOptions::setPlaybackUntilTimestamp) .def_readwrite("wait_acked_timeout", &PlayOptions::wait_acked_timeout) .def_readwrite("disable_loan_message", &PlayOptions::disable_loan_message) ; diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index dfd22fd281..8c58779def 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -53,15 +53,15 @@ struct PlayOptions // Determines the maximum duration of the playback. Negative durations will make the playback to // not stop. Default configuration makes the player to not stop execution. - // When positive, the maximum of `playback_until` and the one that this attribute yields will be - // used to determine which one stops playback execution. + // When positive, the maximum of `play_until_timestamp` and the one that this attribute yields + // will be used to determine which one stops playback execution. rclcpp::Duration playback_duration = rclcpp::Duration(-1, 0); // Determines the timestamp at which the playback will stop. Negative timestamps will make the // playback to not stop. Default configuration makes the player to not stop execution. // When positive, the maximum of the effective time that `playback_duration` yields and this // attribute will be used to determine which one stops playback execution. - rcutils_time_point_value_t playback_until = -1; + rcutils_time_point_value_t playback_until_timestamp = -1; // Start paused. bool start_paused = false; diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index 5c8ef3aaab..92b0513270 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include @@ -249,7 +248,7 @@ class Player : public rclcpp::Node bool is_storage_completely_loaded() const; void enqueue_up_to_boundary(size_t boundary) RCPPUTILS_TSA_REQUIRES(reader_mutex_); void wait_for_filled_queue() const; - void play_messages_from_queue(); + void play_messages_from_queue(const rcutils_time_point_value_t & play_until_timestamp); void prepare_publishers(); bool publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message); static callback_handle_t get_new_on_play_msg_callback_handle(); @@ -270,7 +269,7 @@ class Player : public rclcpp::Node rosbag2_storage::StorageOptions storage_options_; rosbag2_transport::PlayOptions play_options_; - rcutils_time_point_value_t play_until_time_ = -1; + rcutils_time_point_value_t play_until_timestamp_ = -1; moodycamel::ReaderWriterQueue message_queue_; mutable std::future storage_loading_future_; std::atomic_bool load_storage_content_{true}; diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 3a7559f858..21bb213b87 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -162,7 +162,7 @@ Player::Player( prepare_publishers(); if (play_options_.playback_duration >= rclcpp::Duration(0, 0)) { - play_until_time_ = starting_time_ + play_options_.playback_duration.nanoseconds(); + play_until_timestamp_ = starting_time_ + play_options_.playback_duration.nanoseconds(); } } create_control_services(); @@ -212,15 +212,15 @@ bool Player::play() "Invalid delay value: " << play_options_.delay.nanoseconds() << ". Delay is disabled."); } - rcutils_time_point_value_t play_until_time = -1; + rcutils_time_point_value_t play_until_timestamp = -1; if (play_options_.playback_duration >= rclcpp::Duration(0, 0) || - play_options_.playback_until >= rcutils_time_point_value_t{0}) + play_options_.playback_until_timestamp >= rcutils_time_point_value_t{0}) { - play_until_time = std::max( + play_until_timestamp = std::max( starting_time_ + play_options_.playback_duration.nanoseconds(), - play_options_.playback_until); + play_options_.playback_until_timestamp); } - RCLCPP_INFO_STREAM(get_logger(), "Playback duration value: " << play_until_time); + RCLCPP_INFO_STREAM(get_logger(), "Playback until timestamp: " << play_until_timestamp); try { do { @@ -237,7 +237,7 @@ bool Player::play() load_storage_content_ = true; storage_loading_future_ = std::async(std::launch::async, [this]() {load_storage_content();}); wait_for_filled_queue(); - play_messages_from_queue(); + play_messages_from_queue(play_until_timestamp); load_storage_content_ = false; if (storage_loading_future_.valid()) {storage_loading_future_.get();} @@ -375,7 +375,9 @@ bool Player::play_next() bool next_message_published = false; while (message_ptr != nullptr && !next_message_published) { - if (play_until_time_ >= starting_time_ && message_ptr->time_stamp > play_until_time_) { + if (play_until_timestamp_ >= starting_time_ && + message_ptr->time_stamp > play_until_timestamp_) + { break; } { @@ -525,7 +527,7 @@ void Player::enqueue_up_to_boundary(size_t boundary) } } -void Player::play_messages_from_queue() +void Player::play_messages_from_queue(const rcutils_time_point_value_t & play_until_timestamp) { // Note: We need to use message_queue_.peek() instead of message_queue_.try_dequeue(message) // to support play_next() API logic. @@ -539,7 +541,7 @@ void Player::play_messages_from_queue() ready_to_play_from_queue_cv_.notify_all(); } while (message_ptr != nullptr && rclcpp::ok()) { - if (play_until_time_ >= starting_time_ && message_ptr->time_stamp > play_until_time_) { + if (play_until_timestamp >= starting_time_ && message_ptr->time_stamp > play_until_timestamp) { break; } // Do not move on until sleep_until returns true @@ -789,7 +791,8 @@ void Player::create_control_services() { play_options_.start_offset = rclcpp::Time(request->start_offset).nanoseconds(); play_options_.playback_duration = rclcpp::Duration(request->playback_duration); - play_options_.playback_until = rclcpp::Time(request->playback_until).nanoseconds(); + play_options_.playback_until_timestamp = + rclcpp::Time(request->playback_until_timestamp).nanoseconds(); response->success = play(); }); srv_play_next_ = create_service( diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp index c1d5391920..fa580e808b 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp @@ -115,10 +115,10 @@ class RosBag2PlayUntilTestFixture : public RosBag2PlayTestFixture } void InitPlayerWithPlaybackUntilAndPlay( - int64_t playback_until_millis, + int64_t playback_until_timestamp_millis, size_t expected_number_of_messages_on_topic1 = 3, size_t expected_number_of_messages_on_topic2 = 3, - int64_t playback_for_millis = -1) + int64_t playbac_duration_millis = -1) { auto topic_types = get_topic_types(); auto messages = get_serialized_messages(); @@ -131,10 +131,9 @@ class RosBag2PlayUntilTestFixture : public RosBag2PlayTestFixture kTopic1_, expected_number_of_messages_on_topic1); sub_->add_subscription(kTopic2_, expected_number_of_messages_on_topic2); - play_options_.playback_until = RCL_MS_TO_NS(playback_until_millis); - play_options_.playback_duration = rclcpp::Duration( - std::chrono::milliseconds( - playback_for_millis)); + play_options_.playback_until_timestamp = RCL_MS_TO_NS(playback_until_timestamp_millis); + play_options_.playback_duration = + rclcpp::Duration(std::chrono::milliseconds(playbac_duration_millis)); player_ = std::make_shared(std::move(reader), storage_options_, play_options_); // Wait for discovery to match publishers with subscribers @@ -149,7 +148,7 @@ class RosBag2PlayUntilTestFixture : public RosBag2PlayTestFixture }; -TEST_F(RosBag2PlayUntilTestFixture, play_until_all_are_played_due_to_duration) +TEST_F(RosBag2PlayUntilTestFixture, play_until_all_are_played_due_to_timestamp) { InitPlayerWithPlaybackUntilAndPlay(350); @@ -187,7 +186,7 @@ TEST_F(RosBag2PlayUntilTestFixture, play_until_none_are_played_due_to_timestamp) // Expect to receive messages only from play_next in second round sub_->add_subscription(kTopic1_, messages.size()); - play_options_.playback_until = RCL_MS_TO_NS(49); + play_options_.playback_until_timestamp = RCL_MS_TO_NS(50) - 1; std::shared_ptr player_ = std::make_shared( std::move(reader), storage_options_, play_options_); @@ -236,7 +235,7 @@ TEST_F(RosBag2PlayUntilTestFixture, play_until_less_than_the_total_duration) // Expect to receive 1 message from play() and 2 messages from play_next in second round sub_->add_subscription(kTopic1_, messages.size() + 1); - play_options_.playback_until = RCL_MS_TO_NS(49); + play_options_.playback_until_timestamp = RCL_MS_TO_NS(50) - 1; std::shared_ptr player_ = std::make_shared( std::move(reader), storage_options_, play_options_); @@ -303,7 +302,7 @@ TEST_F(RosBag2PlayUntilTestFixture, play_should_return_false_when_interrupted) // Let the player only reproduce one message. sub_->add_subscription(kTopic1_, 1); - play_options_.playback_until = RCL_MS_TO_NS(75); + play_options_.playback_until_timestamp = RCL_MS_TO_NS(75); std::shared_ptr player_ = std::make_shared( std::move(reader), storage_options_, play_options_); @@ -325,11 +324,11 @@ TEST_F(RosBag2PlayUntilTestFixture, play_should_return_false_when_interrupted) EXPECT_THAT(replayed_topic1, SizeIs(1)); } -TEST_F(RosBag2PlayUntilTestFixture, play_until_all_are_played_until_is_bigger_than_for) +TEST_F(RosBag2PlayUntilTestFixture, play_until_overrides_playback_duration) { InitPlayerWithPlaybackUntilAndPlay( - 350 /* playback_until */, 3 /* num messages topic 1 */, - 3 /* num messages topic 2 */, 50 /* playback_for */); + 350 /* playback_until_timestamp_millis */, 3 /* num messages topic 1 */, + 3 /* num messages topic 2 */, 50 /* playback_duration_millis */); auto replayed_test_primitives = sub_->get_received_messages( kTopic1_); @@ -343,11 +342,29 @@ TEST_F(RosBag2PlayUntilTestFixture, play_until_all_are_played_until_is_bigger_th EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); } -TEST_F(RosBag2PlayUntilTestFixture, play_until_all_are_played_until_is_smaller_than_for) +TEST_F(RosBag2PlayUntilTestFixture, playback_duration_overrides_play_until) { InitPlayerWithPlaybackUntilAndPlay( - 50 /* playback_until */, 1 /* num messages topic 1 */, - 1 /* num messages topic 2 */, 150 /* playback_for */); + 50 /* playback_until_timestamp_millis */, 1 /* num messages topic 1 */, + 1 /* num messages topic 2 */, 150 /* playback_duration_millis */); + + auto replayed_test_primitives = sub_->get_received_messages( + kTopic1_); + EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(1u))); + EVAL_REPLAYED_PRIMITIVES(replayed_test_primitives); + + auto replayed_test_arrays = sub_->get_received_messages( + kTopic2_); + EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(1u))); + EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); + EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); +} + +TEST_F(RosBag2PlayUntilTestFixture, play_until_is_equal_to_the_total_duration) +{ + InitPlayerWithPlaybackUntilAndPlay( + 150 /* playback_until_timestamp_millis */, 1 /* num messages topic 1 */, + 1 /* num messages topic 2 */, 150 /* playback_duration_millis */); auto replayed_test_primitives = sub_->get_received_messages( kTopic1_); From 9faf6a9a8861bd4977332c936d3d0970d27cb1eb Mon Sep 17 00:00:00 2001 From: Agustin Alba Chicar Date: Fri, 17 Jun 2022 12:02:13 +0200 Subject: [PATCH 09/10] Solves review comments to tests. Signed-off-by: Agustin Alba Chicar --- .../include/rosbag2_transport/player.hpp | 4 +- .../src/rosbag2_transport/player.cpp | 37 +++++++++++-------- .../rosbag2_transport/test_play_until.cpp | 36 ++++++++---------- 3 files changed, 40 insertions(+), 37 deletions(-) diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index 92b0513270..58dc87d8df 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -248,7 +248,7 @@ class Player : public rclcpp::Node bool is_storage_completely_loaded() const; void enqueue_up_to_boundary(size_t boundary) RCPPUTILS_TSA_REQUIRES(reader_mutex_); void wait_for_filled_queue() const; - void play_messages_from_queue(const rcutils_time_point_value_t & play_until_timestamp); + void play_messages_from_queue(); void prepare_publishers(); bool publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message); static callback_handle_t get_new_on_play_msg_callback_handle(); @@ -267,6 +267,8 @@ class Player : public rclcpp::Node void create_control_services(); + void configure_play_until_timestamp(); + rosbag2_storage::StorageOptions storage_options_; rosbag2_transport::PlayOptions play_options_; rcutils_time_point_value_t play_until_timestamp_ = -1; diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 21bb213b87..8334932f42 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -160,10 +160,7 @@ Player::Player( set_rate(play_options_.rate); topic_qos_profile_overrides_ = play_options_.topic_qos_profile_overrides; prepare_publishers(); - - if (play_options_.playback_duration >= rclcpp::Duration(0, 0)) { - play_until_timestamp_ = starting_time_ + play_options_.playback_duration.nanoseconds(); - } + configure_play_until_timestamp(); } create_control_services(); add_keyboard_callbacks(); @@ -212,15 +209,7 @@ bool Player::play() "Invalid delay value: " << play_options_.delay.nanoseconds() << ". Delay is disabled."); } - rcutils_time_point_value_t play_until_timestamp = -1; - if (play_options_.playback_duration >= rclcpp::Duration(0, 0) || - play_options_.playback_until_timestamp >= rcutils_time_point_value_t{0}) - { - play_until_timestamp = std::max( - starting_time_ + play_options_.playback_duration.nanoseconds(), - play_options_.playback_until_timestamp); - } - RCLCPP_INFO_STREAM(get_logger(), "Playback until timestamp: " << play_until_timestamp); + RCLCPP_INFO_STREAM(get_logger(), "Playback until timestamp: " << play_until_timestamp_); try { do { @@ -237,7 +226,7 @@ bool Player::play() load_storage_content_ = true; storage_loading_future_ = std::async(std::launch::async, [this]() {load_storage_content();}); wait_for_filled_queue(); - play_messages_from_queue(play_until_timestamp); + play_messages_from_queue(); load_storage_content_ = false; if (storage_loading_future_.valid()) {storage_loading_future_.get();} @@ -527,7 +516,7 @@ void Player::enqueue_up_to_boundary(size_t boundary) } } -void Player::play_messages_from_queue(const rcutils_time_point_value_t & play_until_timestamp) +void Player::play_messages_from_queue() { // Note: We need to use message_queue_.peek() instead of message_queue_.try_dequeue(message) // to support play_next() API logic. @@ -541,7 +530,9 @@ void Player::play_messages_from_queue(const rcutils_time_point_value_t & play_un ready_to_play_from_queue_cv_.notify_all(); } while (message_ptr != nullptr && rclcpp::ok()) { - if (play_until_timestamp >= starting_time_ && message_ptr->time_stamp > play_until_timestamp) { + if (play_until_timestamp_ >= starting_time_ && + message_ptr->time_stamp > play_until_timestamp_) + { break; } // Do not move on until sleep_until returns true @@ -793,6 +784,7 @@ void Player::create_control_services() play_options_.playback_duration = rclcpp::Duration(request->playback_duration); play_options_.playback_until_timestamp = rclcpp::Time(request->playback_until_timestamp).nanoseconds(); + configure_play_until_timestamp(); response->success = play(); }); srv_play_next_ = create_service( @@ -822,4 +814,17 @@ void Player::create_control_services() }); } +void Player::configure_play_until_timestamp() +{ + if (play_options_.playback_duration >= rclcpp::Duration(0, 0) || + play_options_.playback_until_timestamp >= rcutils_time_point_value_t{0}) + { + play_until_timestamp_ = std::max( + starting_time_ + play_options_.playback_duration.nanoseconds(), + play_options_.playback_until_timestamp); + } else { + play_until_timestamp_ = -1; + } +} + } // namespace rosbag2_transport diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp index fa580e808b..7f01dc1dbe 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp @@ -118,7 +118,7 @@ class RosBag2PlayUntilTestFixture : public RosBag2PlayTestFixture int64_t playback_until_timestamp_millis, size_t expected_number_of_messages_on_topic1 = 3, size_t expected_number_of_messages_on_topic2 = 3, - int64_t playbac_duration_millis = -1) + int64_t playback_duration_millis = -1) { auto topic_types = get_topic_types(); auto messages = get_serialized_messages(); @@ -133,7 +133,7 @@ class RosBag2PlayUntilTestFixture : public RosBag2PlayTestFixture play_options_.playback_until_timestamp = RCL_MS_TO_NS(playback_until_timestamp_millis); play_options_.playback_duration = - rclcpp::Duration(std::chrono::milliseconds(playbac_duration_millis)); + rclcpp::Duration(std::chrono::milliseconds(playback_duration_millis)); player_ = std::make_shared(std::move(reader), storage_options_, play_options_); // Wait for discovery to match publishers with subscribers @@ -184,8 +184,8 @@ TEST_F(RosBag2PlayUntilTestFixture, play_until_none_are_played_due_to_timestamp) prepared_mock_reader->prepare(messages, topic_types); auto reader = std::make_unique(std::move(prepared_mock_reader)); - // Expect to receive messages only from play_next in second round - sub_->add_subscription(kTopic1_, messages.size()); + // Expect to receive no messages. + sub_->add_subscription(kTopic1_, 0u); play_options_.playback_until_timestamp = RCL_MS_TO_NS(50) - 1; std::shared_ptr player_ = std::make_shared( @@ -197,20 +197,18 @@ TEST_F(RosBag2PlayUntilTestFixture, play_until_none_are_played_due_to_timestamp) auto await_received_messages = sub_->spin_subscriptions(); ASSERT_TRUE(player_->play()); - // Playing one more time with play_next to save time and count messages + // Playing one more time with play_next() to save time and count messages. Note + // that none of the following play() and play_next() functions will make any of + // the messages to be played. player_->pause(); auto player_future = std::async(std::launch::async, [&player_]() -> void {player_->play();}); - EXPECT_TRUE(player_->play_next()); - EXPECT_TRUE(player_->play_next()); EXPECT_FALSE(player_->play_next()); player_->resume(); player_future.get(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages(kTopic1_); - EXPECT_THAT(replayed_topic1, SizeIs(2)); - EXPECT_EQ(replayed_topic1[0]->int32_value, 1); - EXPECT_EQ(replayed_topic1[1]->int32_value, 2); + EXPECT_THAT(replayed_topic1, SizeIs(0)); } TEST_F(RosBag2PlayUntilTestFixture, play_until_less_than_the_total_duration) @@ -233,8 +231,8 @@ TEST_F(RosBag2PlayUntilTestFixture, play_until_less_than_the_total_duration) prepared_mock_reader->prepare(messages, topic_types); auto reader = std::make_unique(std::move(prepared_mock_reader)); - // Expect to receive 1 message from play() and 2 messages from play_next in second round - sub_->add_subscription(kTopic1_, messages.size() + 1); + // Expect to receive 1 message from play() and 1 message from play_next in second round. + sub_->add_subscription(kTopic1_, 2u); play_options_.playback_until_timestamp = RCL_MS_TO_NS(50) - 1; std::shared_ptr player_ = std::make_shared( @@ -246,21 +244,19 @@ TEST_F(RosBag2PlayUntilTestFixture, play_until_less_than_the_total_duration) auto await_received_messages = sub_->spin_subscriptions(); ASSERT_TRUE(player_->play()); - // Playing one more time with play_next to save time and count messages + // Playing one more time with play_next() to save time and count messages. player_->pause(); auto player_future = std::async(std::launch::async, [&player_]() -> void {player_->play();}); - ASSERT_TRUE(player_->play_next()); ASSERT_TRUE(player_->play_next()); ASSERT_FALSE(player_->play_next()); player_->resume(); player_future.get(); await_received_messages.get(); auto replayed_topic1 = sub_->get_received_messages(kTopic1_); - EXPECT_THAT(replayed_topic1, SizeIs(3)); + EXPECT_THAT(replayed_topic1, SizeIs(2)); EXPECT_EQ(replayed_topic1[0]->int32_value, 1); EXPECT_EQ(replayed_topic1[1]->int32_value, 1); - EXPECT_EQ(replayed_topic1[2]->int32_value, 2); } TEST_F( @@ -327,17 +323,17 @@ TEST_F(RosBag2PlayUntilTestFixture, play_should_return_false_when_interrupted) TEST_F(RosBag2PlayUntilTestFixture, play_until_overrides_playback_duration) { InitPlayerWithPlaybackUntilAndPlay( - 350 /* playback_until_timestamp_millis */, 3 /* num messages topic 1 */, - 3 /* num messages topic 2 */, 50 /* playback_duration_millis */); + 150 /* playback_until_timestamp_millis */, 1 /* num messages topic 1 */, + 1 /* num messages topic 2 */, 50 /* playback_duration_millis */); auto replayed_test_primitives = sub_->get_received_messages( kTopic1_); - EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(3u))); + EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(1u))); EVAL_REPLAYED_PRIMITIVES(replayed_test_primitives); auto replayed_test_arrays = sub_->get_received_messages( kTopic2_); - EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(3u))); + EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(1u))); EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); } From fa3654fc1a6f59eb85b52aae4584c7816b530d85 Mon Sep 17 00:00:00 2001 From: Agustin Alba Chicar Date: Tue, 21 Jun 2022 14:36:37 +0200 Subject: [PATCH 10/10] Modifies play_until_is_equal_to_the_total_duration The test body now evaluates a bag whose duration is exactly the same as given timestamp in PlayOptions. Signed-off-by: Agustin Alba Chicar --- .../rosbag2_transport/test_play_until.cpp | 46 ++++++++++++++----- 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp index 7f01dc1dbe..0a0bf9944b 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp @@ -358,18 +358,40 @@ TEST_F(RosBag2PlayUntilTestFixture, playback_duration_overrides_play_until) TEST_F(RosBag2PlayUntilTestFixture, play_until_is_equal_to_the_total_duration) { - InitPlayerWithPlaybackUntilAndPlay( - 150 /* playback_until_timestamp_millis */, 1 /* num messages topic 1 */, - 1 /* num messages topic 2 */, 150 /* playback_duration_millis */); + auto primitive_message1 = get_messages_basic_types()[0]; + auto primitive_message2 = get_messages_basic_types()[0]; + primitive_message1->int32_value = 1; + primitive_message2->int32_value = 2; - auto replayed_test_primitives = sub_->get_received_messages( - kTopic1_); - EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(1u))); - EVAL_REPLAYED_PRIMITIVES(replayed_test_primitives); + auto topic_types = std::vector{ + {kTopic1Name_, "test_msgs/BasicTypes", "", ""}}; - auto replayed_test_arrays = sub_->get_received_messages( - kTopic2_); - EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(1u))); - EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); - EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); + std::vector> messages = + { + serialize_test_message(kTopic1Name_, 10, primitive_message1), + serialize_test_message(kTopic1Name_, 50, primitive_message2), + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + // Expect to receive 1 message from play() and 2 messages from play_next in second round + sub_->add_subscription(kTopic1_, messages.size()); + play_options_.playback_until_timestamp = RCL_MS_TO_NS(50); + + std::shared_ptr player_ = std::make_shared( + std::move(reader), storage_options_, play_options_); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); + + auto await_received_messages = sub_->spin_subscriptions(); + ASSERT_TRUE(player_->play()); + + await_received_messages.get(); + auto replayed_topic1 = sub_->get_received_messages(kTopic1_); + EXPECT_THAT(replayed_topic1, SizeIs(messages.size())); + EXPECT_EQ(replayed_topic1[0]->int32_value, 1); + EXPECT_EQ(replayed_topic1[1]->int32_value, 2); }