diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 4932522a44..ff034f4019 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -86,9 +86,32 @@ def add_arguments(self, parser, cli_name): # noqa: D102 '-d', '--delay', type=positive_float, default=0.0, help='Sleep duration before play (each loop), in seconds. Negative durations invalid.') parser.add_argument( - '--playback_duration', type=float, default=-1.0, + '--playback-duration', type=float, default=-1.0, help='Playback duration, in seconds. Negative durations mark an infinite playback. ' - 'Default is -1.0.') + 'Default is -1.0. When positive, the maximum of `playback-until-*` and the one ' + 'that this attribute yields will be used to determine which one stops playback ' + 'execution.') + + playback_until_arg_group = parser.add_mutually_exclusive_group() + playback_until_arg_group.add_argument( + '--playback-until-sec', type=float, default=-1., + help='Playback until timestamp, expressed in seconds since epoch. ' + 'Mutually exclusive argument with `--playback-until-nsec`. ' + 'Use this argument when floating point to integer conversion error is not a ' + 'problem for your application. Negative stamps disable this feature. Default is ' + '-1.0. When positive, the maximum of the effective time that ' + '`--playback-duration` yields and this attribute will be used to determine which ' + 'one stops playback execution.') + playback_until_arg_group.add_argument( + '--playback-until-nsec', type=int, default=-1, + help='Playback until timestamp, expressed in nanoseconds since epoch. ' + 'Mutually exclusive argument with `--playback-until-sec`. ' + 'Use this argument when floating point to integer conversion error matters for ' + 'your application. Negative stamps disable this feature. Default is -1. When ' + 'positive, the maximum of the effective time that `--playback-duration` yields ' + 'and this attribute will be used to determine which one stops playback ' + 'execution.') + parser.add_argument( '--disable-keyboard-controls', action='store_true', help='disables keyboard controls for playback') @@ -120,6 +143,14 @@ def add_arguments(self, parser, cli_name): # noqa: D102 help='Play for SEC seconds. Default is None, meaning that playback will continue ' 'until the end of the bag. Valid range > 0.0') + def get_playback_until_from_arg_group(self, playback_until_sec, playback_until_nsec) -> int: + nano_scale = 1000 * 1000 * 1000 + if playback_until_sec and playback_until_sec >= 0.0: + return int(playback_until_sec * nano_scale) + if playback_until_nsec and playback_until_nsec >= 0: + return playback_until_nsec + return -1 + def main(self, *, args): # noqa: D102 qos_profile_overrides = {} # Specify a valid default if args.qos_profile_overrides_path: @@ -155,6 +186,8 @@ def main(self, *, args): # noqa: D102 play_options.clock_publish_frequency = args.clock play_options.delay = args.delay play_options.playback_duration = args.playback_duration + play_options.playback_until_timestamp = self.get_playback_until_from_arg_group( + args.playback_until_sec, args.playback_until_nsec) play_options.disable_keyboard_controls = args.disable_keyboard_controls play_options.start_paused = args.start_paused play_options.start_offset = args.start_offset diff --git a/rosbag2_interfaces/srv/Play.srv b/rosbag2_interfaces/srv/Play.srv index 54edf7dd45..b2a371b21d 100644 --- a/rosbag2_interfaces/srv/Play.srv +++ b/rosbag2_interfaces/srv/Play.srv @@ -2,6 +2,8 @@ builtin_interfaces/Time start_offset # See rosbag2_transport::PlayOptions::playback_duration builtin_interfaces/Duration playback_duration +# See rosbag2_transport::PlayOptions::playback_until_timestamp +builtin_interfaces/Time playback_until_timestamp --- # returns false when another playback execution is running, otherwise true bool success diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 00fb965834..53134242dd 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -95,6 +95,17 @@ struct OptionsWrapper : public T return RCUTILS_NS_TO_S(static_cast(this->start_offset)); } + void setPlaybackUntilTimestamp(int64_t playback_until_timestamp) + { + this->playback_until_timestamp = + static_cast(playback_until_timestamp); + } + + int64_t getPlaybackUntilTimestamp() const + { + return this->playback_until_timestamp; + } + void setTopicQoSProfileOverrides(const py::dict & overrides) { py_dict = overrides; @@ -288,6 +299,10 @@ PYBIND11_MODULE(_transport, m) { "start_offset", &PlayOptions::getStartOffset, &PlayOptions::setStartOffset) + .def_property( + "playback_until_timestamp", + &PlayOptions::getPlaybackUntilTimestamp, + &PlayOptions::setPlaybackUntilTimestamp) .def_readwrite("wait_acked_timeout", &PlayOptions::wait_acked_timeout) .def_readwrite("disable_loan_message", &PlayOptions::disable_loan_message) ; diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index 3e1bbac729..5d2e7605ac 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -160,7 +160,13 @@ function(create_tests_for_rmw_implementation) LINK_LIBS rosbag2_transport AMENT_DEPS test_msgs rosbag2_test_common) - rosbag2_transport_add_gmock(test_burst + rosbag2_transport_add_gmock(test_play_until + test/rosbag2_transport/test_play_until.cpp + INCLUDE_DIRS $ + LINK_LIBS rosbag2_transport + AMENT_DEPS test_msgs rosbag2_test_common) + + rosbag2_transport_add_gmock(test_burst test/rosbag2_transport/test_burst.cpp INCLUDE_DIRS $ LINK_LIBS rosbag2_transport diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index 92225373cc..8c58779def 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -22,6 +22,7 @@ #include "keyboard_handler/keyboard_handler.hpp" #include "rclcpp/duration.hpp" +#include "rclcpp/time.hpp" #include "rclcpp/qos.hpp" namespace rosbag2_transport @@ -52,8 +53,16 @@ struct PlayOptions // Determines the maximum duration of the playback. Negative durations will make the playback to // not stop. Default configuration makes the player to not stop execution. + // When positive, the maximum of `play_until_timestamp` and the one that this attribute yields + // will be used to determine which one stops playback execution. rclcpp::Duration playback_duration = rclcpp::Duration(-1, 0); + // Determines the timestamp at which the playback will stop. Negative timestamps will make the + // playback to not stop. Default configuration makes the player to not stop execution. + // When positive, the maximum of the effective time that `playback_duration` yields and this + // attribute will be used to determine which one stops playback execution. + rcutils_time_point_value_t playback_until_timestamp = -1; + // Start paused. bool start_paused = false; diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index 5c8ef3aaab..58dc87d8df 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -20,7 +20,6 @@ #include #include #include -#include #include #include #include @@ -268,9 +267,11 @@ class Player : public rclcpp::Node void create_control_services(); + void configure_play_until_timestamp(); + rosbag2_storage::StorageOptions storage_options_; rosbag2_transport::PlayOptions play_options_; - rcutils_time_point_value_t play_until_time_ = -1; + rcutils_time_point_value_t play_until_timestamp_ = -1; moodycamel::ReaderWriterQueue message_queue_; mutable std::future storage_loading_future_; std::atomic_bool load_storage_content_{true}; diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 3c986b92c5..8334932f42 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -160,10 +160,7 @@ Player::Player( set_rate(play_options_.rate); topic_qos_profile_overrides_ = play_options_.topic_qos_profile_overrides; prepare_publishers(); - - if (play_options_.playback_duration >= rclcpp::Duration(0, 0)) { - play_until_time_ = starting_time_ + play_options_.playback_duration.nanoseconds(); - } + configure_play_until_timestamp(); } create_control_services(); add_keyboard_callbacks(); @@ -212,7 +209,7 @@ bool Player::play() "Invalid delay value: " << play_options_.delay.nanoseconds() << ". Delay is disabled."); } - RCLCPP_INFO_STREAM(get_logger(), "Playback duration value: " << play_until_time_); + RCLCPP_INFO_STREAM(get_logger(), "Playback until timestamp: " << play_until_timestamp_); try { do { @@ -367,7 +364,9 @@ bool Player::play_next() bool next_message_published = false; while (message_ptr != nullptr && !next_message_published) { - if (play_until_time_ >= starting_time_ && message_ptr->time_stamp > play_until_time_) { + if (play_until_timestamp_ >= starting_time_ && + message_ptr->time_stamp > play_until_timestamp_) + { break; } { @@ -531,7 +530,9 @@ void Player::play_messages_from_queue() ready_to_play_from_queue_cv_.notify_all(); } while (message_ptr != nullptr && rclcpp::ok()) { - if (play_until_time_ >= starting_time_ && message_ptr->time_stamp > play_until_time_) { + if (play_until_timestamp_ >= starting_time_ && + message_ptr->time_stamp > play_until_timestamp_) + { break; } // Do not move on until sleep_until returns true @@ -781,6 +782,9 @@ void Player::create_control_services() { play_options_.start_offset = rclcpp::Time(request->start_offset).nanoseconds(); play_options_.playback_duration = rclcpp::Duration(request->playback_duration); + play_options_.playback_until_timestamp = + rclcpp::Time(request->playback_until_timestamp).nanoseconds(); + configure_play_until_timestamp(); response->success = play(); }); srv_play_next_ = create_service( @@ -810,4 +814,17 @@ void Player::create_control_services() }); } +void Player::configure_play_until_timestamp() +{ + if (play_options_.playback_duration >= rclcpp::Duration(0, 0) || + play_options_.playback_until_timestamp >= rcutils_time_point_value_t{0}) + { + play_until_timestamp_ = std::max( + starting_time_ + play_options_.playback_duration.nanoseconds(), + play_options_.playback_until_timestamp); + } else { + play_until_timestamp_ = -1; + } +} + } // namespace rosbag2_transport diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp new file mode 100644 index 0000000000..0a0bf9944b --- /dev/null +++ b/rosbag2_transport/test/rosbag2_transport/test_play_until.cpp @@ -0,0 +1,397 @@ +// Copyright 2022, Open Source Robotics Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.Bosch Software Innovations GmbH. + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" + +#include "rosbag2_test_common/subscription_manager.hpp" + +#include "rosbag2_transport/player.hpp" +#include "rosbag2_transport/qos.hpp" + +#include "test_msgs/msg/arrays.hpp" +#include "test_msgs/msg/basic_types.hpp" +#include "test_msgs/message_fixtures.hpp" + +#include "rosbag2_play_test_fixture.hpp" +#include "rosbag2_transport_test_fixture.hpp" + +#include "mock_player.hpp" + +using namespace ::testing; // NOLINT +using namespace rosbag2_transport; // NOLINT +using namespace std::chrono_literals; // NOLINT +using namespace rosbag2_test_common; // NOLINT + +constexpr int kIntValue{32}; + +constexpr float kFloat1Value{40.}; +constexpr float kFloat2Value{2.}; +constexpr float kFloat3Value{0.}; + +constexpr bool kBool1Value{false}; +constexpr bool kBool2Value{true}; +constexpr bool kBool3Value{false}; + +#define EVAL_REPLAYED_PRIMITIVES(replayed_primitives) \ + EXPECT_THAT( \ + replayed_primitives, \ + Each(Pointee(Field(&test_msgs::msg::BasicTypes::int32_value, kIntValue)))) + +#define EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_float_array_primitive) \ + EXPECT_THAT( \ + replayed_float_array_primitive, \ + Each( \ + Pointee( \ + Field( \ + &test_msgs::msg::Arrays::float32_values, \ + ElementsAre(kFloat1Value, kFloat2Value, kFloat3Value))))) + +#define EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_bool_array_primitive) \ + EXPECT_THAT( \ + replayed_bool_array_primitive, \ + Each( \ + Pointee( \ + Field( \ + &test_msgs::msg::Arrays::bool_values, \ + ElementsAre(kBool1Value, kBool2Value, kBool3Value))))) + +class RosBag2PlayUntilTestFixture : public RosBag2PlayTestFixture +{ +public: + static constexpr const char * kTopic1Name_{"topic1"}; + static constexpr const char * kTopic2Name_{"topic2"}; + static constexpr const char * kTopic1_{"/topic1"}; + static constexpr const char * kTopic2_{"/topic2"}; + + std::vector get_topic_types() + { + return {{kTopic1Name_, "test_msgs/BasicTypes", "", ""}, + {kTopic2Name_, "test_msgs/Arrays", "", ""}}; + } + + std::vector> + get_serialized_messages() + { + auto primitive_message1 = get_messages_basic_types()[0]; + primitive_message1->int32_value = kIntValue; + + auto complex_message1 = get_messages_arrays()[0]; + complex_message1->float32_values = {{kFloat1Value, kFloat2Value, kFloat3Value}}; + complex_message1->bool_values = {{kBool1Value, kBool2Value, kBool3Value}}; + + // @{ Ordering matters. The mock reader implementation moves messages + // around without any knowledge about message chronology. It just picks + // the next one Make sure to keep the list in order or sort it! + std::vector> messages = + {serialize_test_message(kTopic1Name_, 100, primitive_message1), + serialize_test_message(kTopic2Name_, 120, complex_message1), + serialize_test_message(kTopic1Name_, 200, primitive_message1), + serialize_test_message(kTopic2Name_, 220, complex_message1), + serialize_test_message(kTopic1Name_, 300, primitive_message1), + serialize_test_message(kTopic2Name_, 320, complex_message1)}; + // @} + return messages; + } + + void InitPlayerWithPlaybackUntilAndPlay( + int64_t playback_until_timestamp_millis, + size_t expected_number_of_messages_on_topic1 = 3, + size_t expected_number_of_messages_on_topic2 = 3, + int64_t playback_duration_millis = -1) + { + auto topic_types = get_topic_types(); + auto messages = get_serialized_messages(); + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + sub_->add_subscription( + kTopic1_, expected_number_of_messages_on_topic1); + sub_->add_subscription(kTopic2_, expected_number_of_messages_on_topic2); + + play_options_.playback_until_timestamp = RCL_MS_TO_NS(playback_until_timestamp_millis); + play_options_.playback_duration = + rclcpp::Duration(std::chrono::milliseconds(playback_duration_millis)); + player_ = std::make_shared(std::move(reader), storage_options_, play_options_); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); + + auto await_received_messages = sub_->spin_subscriptions(); + ASSERT_TRUE(player_->play()); + await_received_messages.get(); + } + + std::shared_ptr player_; +}; + + +TEST_F(RosBag2PlayUntilTestFixture, play_until_all_are_played_due_to_timestamp) +{ + InitPlayerWithPlaybackUntilAndPlay(350); + + auto replayed_test_primitives = sub_->get_received_messages( + kTopic1_); + EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(3u))); + EVAL_REPLAYED_PRIMITIVES(replayed_test_primitives); + + auto replayed_test_arrays = sub_->get_received_messages( + kTopic2_); + EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(3u))); + EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); + EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); +} + +TEST_F(RosBag2PlayUntilTestFixture, play_until_none_are_played_due_to_timestamp) +{ + auto primitive_message1 = get_messages_basic_types()[0]; + auto primitive_message2 = get_messages_basic_types()[0]; + primitive_message1->int32_value = 1; + primitive_message2->int32_value = 2; + + auto topic_types = std::vector{ + {kTopic1Name_, "test_msgs/BasicTypes", "", ""}}; + + std::vector> messages = + { + serialize_test_message(kTopic1Name_, 50, primitive_message1), + serialize_test_message(kTopic1Name_, 100, primitive_message2), + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + // Expect to receive no messages. + sub_->add_subscription(kTopic1_, 0u); + play_options_.playback_until_timestamp = RCL_MS_TO_NS(50) - 1; + + std::shared_ptr player_ = std::make_shared( + std::move(reader), storage_options_, play_options_); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); + + auto await_received_messages = sub_->spin_subscriptions(); + ASSERT_TRUE(player_->play()); + + // Playing one more time with play_next() to save time and count messages. Note + // that none of the following play() and play_next() functions will make any of + // the messages to be played. + player_->pause(); + auto player_future = std::async(std::launch::async, [&player_]() -> void {player_->play();}); + + EXPECT_FALSE(player_->play_next()); + player_->resume(); + player_future.get(); + await_received_messages.get(); + auto replayed_topic1 = sub_->get_received_messages(kTopic1_); + EXPECT_THAT(replayed_topic1, SizeIs(0)); +} + +TEST_F(RosBag2PlayUntilTestFixture, play_until_less_than_the_total_duration) +{ + auto primitive_message1 = get_messages_basic_types()[0]; + auto primitive_message2 = get_messages_basic_types()[0]; + primitive_message1->int32_value = 1; + primitive_message2->int32_value = 2; + + auto topic_types = std::vector{ + {kTopic1Name_, "test_msgs/BasicTypes", "", ""}}; + + std::vector> messages = + { + serialize_test_message(kTopic1Name_, 10, primitive_message1), + serialize_test_message(kTopic1Name_, 50, primitive_message2), + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + // Expect to receive 1 message from play() and 1 message from play_next in second round. + sub_->add_subscription(kTopic1_, 2u); + play_options_.playback_until_timestamp = RCL_MS_TO_NS(50) - 1; + + std::shared_ptr player_ = std::make_shared( + std::move(reader), storage_options_, play_options_); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); + + auto await_received_messages = sub_->spin_subscriptions(); + ASSERT_TRUE(player_->play()); + + // Playing one more time with play_next() to save time and count messages. + player_->pause(); + auto player_future = std::async(std::launch::async, [&player_]() -> void {player_->play();}); + + ASSERT_TRUE(player_->play_next()); + ASSERT_FALSE(player_->play_next()); + player_->resume(); + player_future.get(); + await_received_messages.get(); + auto replayed_topic1 = sub_->get_received_messages(kTopic1_); + EXPECT_THAT(replayed_topic1, SizeIs(2)); + EXPECT_EQ(replayed_topic1[0]->int32_value, 1); + EXPECT_EQ(replayed_topic1[1]->int32_value, 1); +} + +TEST_F( + RosBag2PlayUntilTestFixture, + play_until_intermediate_duration_recorded_messages_with_filtered_topics) +{ + // Filter allows /topic2, blocks /topic1 + play_options_.topics_to_filter = {"topic2"}; + InitPlayerWithPlaybackUntilAndPlay(270, 0, 2); + + auto replayed_test_primitives = + sub_->get_received_messages("/topic1"); + // No messages are allowed to have arrived + EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(0u))); + + auto replayed_test_arrays = sub_->get_received_messages("/topic2"); + // Some messages should have arrived. + EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(2u))); + EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); + EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); +} + +TEST_F(RosBag2PlayUntilTestFixture, play_should_return_false_when_interrupted) +{ + auto topic_types = std::vector{ + {kTopic1Name_, "test_msgs/BasicTypes", "", ""}}; + + auto primitive_message = get_messages_basic_types()[0]; + primitive_message->int32_value = kIntValue; + std::vector> messages = + { + serialize_test_message(kTopic1Name_, 50, primitive_message), + serialize_test_message(kTopic1Name_, 100, primitive_message), + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + // Let the player only reproduce one message. + sub_->add_subscription(kTopic1_, 1); + play_options_.playback_until_timestamp = RCL_MS_TO_NS(75); + + std::shared_ptr player_ = std::make_shared( + std::move(reader), storage_options_, play_options_); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); + + auto await_received_messages = sub_->spin_subscriptions(); + player_->pause(); + auto player_future = std::async(std::launch::async, [player_]() {return player_->play();}); + player_->wait_for_playback_to_start(); + ASSERT_TRUE(player_->is_paused()); + ASSERT_FALSE(player_->play()); + + player_->resume(); + player_future.get(); + await_received_messages.get(); + auto replayed_topic1 = sub_->get_received_messages(kTopic1_); + EXPECT_THAT(replayed_topic1, SizeIs(1)); +} + +TEST_F(RosBag2PlayUntilTestFixture, play_until_overrides_playback_duration) +{ + InitPlayerWithPlaybackUntilAndPlay( + 150 /* playback_until_timestamp_millis */, 1 /* num messages topic 1 */, + 1 /* num messages topic 2 */, 50 /* playback_duration_millis */); + + auto replayed_test_primitives = sub_->get_received_messages( + kTopic1_); + EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(1u))); + EVAL_REPLAYED_PRIMITIVES(replayed_test_primitives); + + auto replayed_test_arrays = sub_->get_received_messages( + kTopic2_); + EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(1u))); + EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); + EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); +} + +TEST_F(RosBag2PlayUntilTestFixture, playback_duration_overrides_play_until) +{ + InitPlayerWithPlaybackUntilAndPlay( + 50 /* playback_until_timestamp_millis */, 1 /* num messages topic 1 */, + 1 /* num messages topic 2 */, 150 /* playback_duration_millis */); + + auto replayed_test_primitives = sub_->get_received_messages( + kTopic1_); + EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(1u))); + EVAL_REPLAYED_PRIMITIVES(replayed_test_primitives); + + auto replayed_test_arrays = sub_->get_received_messages( + kTopic2_); + EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(1u))); + EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); + EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); +} + +TEST_F(RosBag2PlayUntilTestFixture, play_until_is_equal_to_the_total_duration) +{ + auto primitive_message1 = get_messages_basic_types()[0]; + auto primitive_message2 = get_messages_basic_types()[0]; + primitive_message1->int32_value = 1; + primitive_message2->int32_value = 2; + + auto topic_types = std::vector{ + {kTopic1Name_, "test_msgs/BasicTypes", "", ""}}; + + std::vector> messages = + { + serialize_test_message(kTopic1Name_, 10, primitive_message1), + serialize_test_message(kTopic1Name_, 50, primitive_message2), + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + // Expect to receive 1 message from play() and 2 messages from play_next in second round + sub_->add_subscription(kTopic1_, messages.size()); + play_options_.playback_until_timestamp = RCL_MS_TO_NS(50); + + std::shared_ptr player_ = std::make_shared( + std::move(reader), storage_options_, play_options_); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); + + auto await_received_messages = sub_->spin_subscriptions(); + ASSERT_TRUE(player_->play()); + + await_received_messages.get(); + auto replayed_topic1 = sub_->get_received_messages(kTopic1_); + EXPECT_THAT(replayed_topic1, SizeIs(messages.size())); + EXPECT_EQ(replayed_topic1[0]->int32_value, 1); + EXPECT_EQ(replayed_topic1[1]->int32_value, 2); +}