From 6a5ab9213465fbae76004ea79520829686fb903b Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Sun, 27 Jun 2021 17:41:52 +0800 Subject: [PATCH] Add spin_and_wait_for_matched to PublicationManager and update test codes Signed-off-by: Barry Xu --- .../publication_manager.hpp | 61 +++++++++++- .../test_rosbag2_record_end_to_end.cpp | 93 +++++++++++++++++++ 2 files changed, 150 insertions(+), 4 deletions(-) diff --git a/rosbag2_test_common/include/rosbag2_test_common/publication_manager.hpp b/rosbag2_test_common/include/rosbag2_test_common/publication_manager.hpp index f9e91c02ab..ad40b12144 100644 --- a/rosbag2_test_common/include/rosbag2_test_common/publication_manager.hpp +++ b/rosbag2_test_common/include/rosbag2_test_common/publication_manager.hpp @@ -15,6 +15,7 @@ #ifndef ROSBAG2_TEST_COMMON__PUBLICATION_MANAGER_HPP_ #define ROSBAG2_TEST_COMMON__PUBLICATION_MANAGER_HPP_ +#include #include #include #include @@ -58,7 +59,9 @@ class PublicationManager { auto publisher = pub_node_->create_publisher(topic_name, qos); - publishers_.push_back( + publishers_.push_back(publisher); + + publishers_fcn_.push_back( [publisher, message, repetition, interval](bool verbose = false) { for (auto i = 0u; i < repetition; ++i) { if (rclcpp::ok()) { @@ -76,15 +79,18 @@ class PublicationManager void run_publishers(bool verbose = false) const { - for (const auto & pub_fcn : publishers_) { + for (const auto & pub_fcn : publishers_fcn_) { pub_fcn(verbose); } + rclcpp::executors::SingleThreadedExecutor exec; + exec.spin_node_some(pub_node_); + std::this_thread::sleep_for(100ms); } void run_publishers_async(bool verbose = false) const { std::vector> futures; - for (const auto & pub_fcn : publishers_) { + for (const auto & pub_fcn : publishers_fcn_) { futures.push_back(std::async(std::launch::async, pub_fcn, verbose)); } for (auto & publisher_future : futures) { @@ -92,10 +98,57 @@ class PublicationManager } } + /// \brief Wait until publisher with specified topic will be connected to the subscribers or + /// timeout occur. + /// \tparam Timeout Data type for timeout duration from std::chrono:: namespace + /// \param topic_name topic name to find corresponding publisher + /// \param timeout Maximum time duration during which discovery should happen. + /// \param n_subscribers_to_match Number of subscribers publisher should have for match. + /// \return true if not find publisher by topic name or publisher has specified number of + /// subscribers, otherwise false. + template + bool spin_and_wait_for_matched( + const char * topic_name, + Timeout timeout, size_t n_subscribers_to_match = 1) + { + rclcpp::PublisherBase::SharedPtr publisher = nullptr; + for (const auto pub : publishers_) { + if (! std::strcmp(pub->get_topic_name(), topic_name)) { + publisher = pub; + break; + } + } + + if (publisher == nullptr) { + return false; + } + + if (publisher->get_subscription_count() + + publisher->get_intra_process_subscription_count() >= n_subscribers_to_match ) { + return true; + } + + using clock = std::chrono::steady_clock; + auto start = clock::now(); + + rclcpp::executors::SingleThreadedExecutor exec; + do { + exec.spin_node_some(pub_node_); + + if (publisher->get_subscription_count() + + publisher->get_intra_process_subscription_count() >= n_subscribers_to_match ) { + return true; + } + } while((clock::now() - start) < timeout); + + return false; + } + private: std::shared_ptr pub_node_; using PublicationF = std::function; - std::vector publishers_; + std::vector publishers_fcn_; + std::vector publishers_; }; } // namespace rosbag2_test_common diff --git a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp index 194e1ebaa1..c263f5dc8c 100644 --- a/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp +++ b/rosbag2_tests/test/rosbag2_tests/test_rosbag2_record_end_to_end.cpp @@ -20,6 +20,7 @@ #include "rclcpp/rclcpp.hpp" #include "rcpputils/filesystem_helper.hpp" +#include "rcpputils/scope_exit.hpp" #include "rcutils/filesystem.h" #include "rosbag2_compression_zstd/zstd_decompressor.hpp" #include "rosbag2_storage/metadata_io.hpp" @@ -77,11 +78,19 @@ TEST_F(RecordFixture, record_end_to_end_test_with_zstd_file_compression) { " " << topic_name; auto process_handle = start_execution(cmd.str()); + auto cleanup_process_handle = rcpputils::make_scope_exit( + [process_handle]() { + stop_execution(process_handle); + }); + + ASSERT_TRUE(pub_manager.spin_and_wait_for_matched(topic_name, 30s, 1)) << + "Expected find rosbag subscription"; wait_for_db(); pub_manager.run_publishers(); stop_execution(process_handle); + cleanup_process_handle.cancel(); wait_for_metadata(); @@ -124,11 +133,20 @@ TEST_F(RecordFixture, record_end_to_end_test) { auto process_handle = start_execution( "ros2 bag record --max-cache-size 0 --output " + root_bag_path_.string() + " /test_topic"); + auto cleanup_process_handle = rcpputils::make_scope_exit( + [process_handle]() { + stop_execution(process_handle); + }); + + ASSERT_TRUE(pub_manager.spin_and_wait_for_matched("/test_topic", 30s, 1)) << + "Expected find rosbag subscription"; + wait_for_db(); pub_manager.run_publishers(); stop_execution(process_handle); + cleanup_process_handle.cancel(); // TODO(Martin-Idel-SI): Find out how to correctly send a Ctrl-C signal on Windows // This is necessary as the process is killed hard on Windows and doesn't write a metadata file @@ -187,11 +205,22 @@ TEST_F(RecordFixture, record_end_to_end_with_splitting_metadata_contains_all_top " --max-bag-size " << bagfile_split_size << " -a"; auto process_handle = start_execution(command.str()); + auto cleanup_process_handle = rcpputils::make_scope_exit( + [process_handle]() { + stop_execution(process_handle); + }); + + ASSERT_TRUE(pub_manager.spin_and_wait_for_matched("/test_topic0", 30s, 1)) << + "Expected find rosbag subscription"; + ASSERT_TRUE(pub_manager.spin_and_wait_for_matched("/test_topic1", 30s, 1)) << + "Expected find rosbag subscription"; + wait_for_db(); pub_manager.run_publishers(); stop_execution(process_handle); + cleanup_process_handle.cancel(); wait_for_metadata(); rosbag2_storage::MetadataIo metadataIo; @@ -230,9 +259,18 @@ TEST_F(RecordFixture, record_end_to_end_with_splitting_bagsize_split_is_at_least " --max-bag-size " << bagfile_split_size << " " << topic_name; auto process_handle = start_execution(command.str()); + auto cleanup_process_handle = rcpputils::make_scope_exit( + [process_handle]() { + stop_execution(process_handle); + }); + + ASSERT_TRUE(pub_manager.spin_and_wait_for_matched(topic_name, 30s, 1)) << + "Expected find rosbag subscription"; + wait_for_db(); stop_execution(process_handle); + cleanup_process_handle.cancel(); pub_manager.run_publishers(); @@ -295,11 +333,20 @@ TEST_F(RecordFixture, record_end_to_end_with_splitting_max_size_not_reached) { " --max-bag-size " << bagfile_split_size << " " << topic_name; auto process_handle = start_execution(command.str()); + auto cleanup_process_handle = rcpputils::make_scope_exit( + [process_handle]() { + stop_execution(process_handle); + }); + + ASSERT_TRUE(pub_manager.spin_and_wait_for_matched(topic_name, 30s, 1)) << + "Expected find rosbag subscription"; + wait_for_db(); pub_manager.run_publishers(); stop_execution(process_handle); + cleanup_process_handle.cancel(); rosbag2_storage::MetadataIo metadata_io; @@ -350,11 +397,20 @@ TEST_F(RecordFixture, record_end_to_end_with_splitting_splits_bagfile) { " --max-bag-size " << bagfile_split_size << " " << topic_name; auto process_handle = start_execution(command.str()); + auto cleanup_process_handle = rcpputils::make_scope_exit( + [process_handle]() { + stop_execution(process_handle); + }); + + ASSERT_TRUE(pub_manager.spin_and_wait_for_matched(topic_name, 30s, 1)) << + "Expected find rosbag subscription"; + wait_for_db(); pub_manager.run_publishers(); stop_execution(process_handle); + cleanup_process_handle.cancel(); rosbag2_storage::MetadataIo metadata_io; @@ -412,11 +468,20 @@ TEST_F(RecordFixture, record_end_to_end_with_duration_splitting_splits_bagfile) " -d " << bagfile_split_duration << " " << topic_name; auto process_handle = start_execution(command.str()); + auto cleanup_process_handle = rcpputils::make_scope_exit( + [process_handle]() { + stop_execution(process_handle); + }); + + ASSERT_TRUE(pub_manager.spin_and_wait_for_matched(topic_name, 30s, 1)) << + "Expected find rosbag subscription"; + wait_for_db(); pub_manager.run_publishers(); stop_execution(process_handle); + cleanup_process_handle.cancel(); rosbag2_storage::MetadataIo metadata_io; @@ -469,11 +534,20 @@ TEST_F(RecordFixture, record_end_to_end_test_with_zstd_file_compression_compress " " << topic_name; auto process_handle = start_execution(command.str()); + auto cleanup_process_handle = rcpputils::make_scope_exit( + [process_handle]() { + stop_execution(process_handle); + }); + + ASSERT_TRUE(pub_manager.spin_and_wait_for_matched(topic_name, 30s, 1)) << + "Expected find rosbag subscription"; + wait_for_db(); pub_manager.run_publishers(); stop_execution(process_handle); + cleanup_process_handle.cancel(); rosbag2_storage::MetadataIo metadata_io; @@ -574,11 +648,20 @@ TEST_F(RecordFixture, record_end_to_end_test_with_cache) { auto process_handle = start_execution( "ros2 bag record --output " + root_bag_path_.string() + " " + topic_name + " " + "--max-cache-size " + std::to_string(max_cache_size)); + auto cleanup_process_handle = rcpputils::make_scope_exit( + [process_handle]() { + stop_execution(process_handle); + }); + + ASSERT_TRUE(pub_manager.spin_and_wait_for_matched(topic_name, 30s, 1)) << + "Expected find rosbag subscription"; + wait_for_db(); pub_manager.run_publishers(); stop_execution(process_handle); + cleanup_process_handle.cancel(); // TODO(Martin-Idel-SI): Find out how to correctly send a Ctrl-C signal on Windows // This is necessary as the process is killed hard on Windows and doesn't write a metadata file @@ -621,11 +704,21 @@ TEST_F(RecordFixture, rosbag2_record_and_play_multiple_topics_with_filter) { " --max-bag-size " << bagfile_split_size << " -a"; auto process_handle = start_execution(command_record.str()); + auto cleanup_process_handle = rcpputils::make_scope_exit( + [process_handle]() { + stop_execution(process_handle); + }); + + ASSERT_TRUE(pub_manager.spin_and_wait_for_matched(first_topic_name, 30s, 1)) << + "Expected find rosbag subscription"; + ASSERT_TRUE(pub_manager.spin_and_wait_for_matched(second_topic_name, 30s, 1)) << + "Expected find rosbag subscription"; wait_for_db(); pub_manager.run_publishers(); stop_execution(process_handle); + cleanup_process_handle.cancel(); wait_for_metadata();