Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add spin_and_wait_for_matched to PublicationManager and update test c… #796

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef ROSBAG2_TEST_COMMON__PUBLICATION_MANAGER_HPP_
#define ROSBAG2_TEST_COMMON__PUBLICATION_MANAGER_HPP_

#include <cstring>
#include <functional>
#include <future>
#include <memory>
Expand Down Expand Up @@ -58,7 +59,9 @@ class PublicationManager
{
auto publisher = pub_node_->create_publisher<MsgT>(topic_name, qos);

publishers_.push_back(
publishers_.push_back(publisher);

publishers_fcn_.push_back(
[publisher, message, repetition, interval](bool verbose = false) {
for (auto i = 0u; i < repetition; ++i) {
if (rclcpp::ok()) {
Expand All @@ -76,26 +79,76 @@ class PublicationManager

void run_publishers(bool verbose = false) const
{
for (const auto & pub_fcn : publishers_) {
for (const auto & pub_fcn : publishers_fcn_) {
pub_fcn(verbose);
}
rclcpp::executors::SingleThreadedExecutor exec;
exec.spin_node_some(pub_node_);
std::this_thread::sleep_for(100ms);
}

void run_publishers_async(bool verbose = false) const
{
std::vector<std::future<void>> futures;
for (const auto & pub_fcn : publishers_) {
for (const auto & pub_fcn : publishers_fcn_) {
futures.push_back(std::async(std::launch::async, pub_fcn, verbose));
}
for (auto & publisher_future : futures) {
publisher_future.get();
}
}

/// \brief Wait until publisher with specified topic will be connected to the subscribers or
/// timeout occur.
/// \tparam Timeout Data type for timeout duration from std::chrono:: namespace
/// \param topic_name topic name to find corresponding publisher
/// \param timeout Maximum time duration during which discovery should happen.
/// \param n_subscribers_to_match Number of subscribers publisher should have for match.
/// \return true if not find publisher by topic name or publisher has specified number of
/// subscribers, otherwise false.
template<typename Timeout>
bool spin_and_wait_for_matched(
const char * topic_name,
Timeout timeout, size_t n_subscribers_to_match = 1)
{
rclcpp::PublisherBase::SharedPtr publisher = nullptr;
for (const auto pub : publishers_) {
if (! std::strcmp(pub->get_topic_name(), topic_name)) {
publisher = pub;
break;
}
}

if (publisher == nullptr) {
return false;
}

if (publisher->get_subscription_count() +
publisher->get_intra_process_subscription_count() >= n_subscribers_to_match ) {
return true;
}

using clock = std::chrono::steady_clock;
auto start = clock::now();

rclcpp::executors::SingleThreadedExecutor exec;
do {
exec.spin_node_some(pub_node_);

if (publisher->get_subscription_count() +
publisher->get_intra_process_subscription_count() >= n_subscribers_to_match ) {
return true;
}
} while((clock::now() - start) < timeout);

return false;
}

private:
std::shared_ptr<rclcpp::Node> pub_node_;
using PublicationF = std::function<void (bool)>;
std::vector<PublicationF> publishers_;
std::vector<PublicationF> publishers_fcn_;
std::vector<rclcpp::PublisherBase::SharedPtr> publishers_;
};

} // namespace rosbag2_test_common
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "rclcpp/rclcpp.hpp"
#include "rcpputils/filesystem_helper.hpp"
#include "rcpputils/scope_exit.hpp"
#include "rcutils/filesystem.h"
#include "rosbag2_compression_zstd/zstd_decompressor.hpp"
#include "rosbag2_storage/metadata_io.hpp"
Expand Down Expand Up @@ -77,11 +78,19 @@ TEST_F(RecordFixture, record_end_to_end_test_with_zstd_file_compression) {
" " << topic_name;

auto process_handle = start_execution(cmd.str());
auto cleanup_process_handle = rcpputils::make_scope_exit(
[process_handle]() {
stop_execution(process_handle);
});

ASSERT_TRUE(pub_manager.spin_and_wait_for_matched(topic_name, 30s, 1)) <<
"Expected find rosbag subscription";
wait_for_db();

pub_manager.run_publishers();

stop_execution(process_handle);
cleanup_process_handle.cancel();

wait_for_metadata();

Expand Down Expand Up @@ -124,11 +133,20 @@ TEST_F(RecordFixture, record_end_to_end_test) {

auto process_handle = start_execution(
"ros2 bag record --max-cache-size 0 --output " + root_bag_path_.string() + " /test_topic");
auto cleanup_process_handle = rcpputils::make_scope_exit(
[process_handle]() {
stop_execution(process_handle);
});

ASSERT_TRUE(pub_manager.spin_and_wait_for_matched("/test_topic", 30s, 1)) <<
"Expected find rosbag subscription";

wait_for_db();

pub_manager.run_publishers();

stop_execution(process_handle);
cleanup_process_handle.cancel();

// TODO(Martin-Idel-SI): Find out how to correctly send a Ctrl-C signal on Windows
// This is necessary as the process is killed hard on Windows and doesn't write a metadata file
Expand Down Expand Up @@ -187,11 +205,22 @@ TEST_F(RecordFixture, record_end_to_end_with_splitting_metadata_contains_all_top
" --max-bag-size " << bagfile_split_size <<
" -a";
auto process_handle = start_execution(command.str());
auto cleanup_process_handle = rcpputils::make_scope_exit(
[process_handle]() {
stop_execution(process_handle);
});

ASSERT_TRUE(pub_manager.spin_and_wait_for_matched("/test_topic0", 30s, 1)) <<
"Expected find rosbag subscription";
ASSERT_TRUE(pub_manager.spin_and_wait_for_matched("/test_topic1", 30s, 1)) <<
"Expected find rosbag subscription";

wait_for_db();

pub_manager.run_publishers();

stop_execution(process_handle);
cleanup_process_handle.cancel();

wait_for_metadata();
rosbag2_storage::MetadataIo metadataIo;
Expand Down Expand Up @@ -230,9 +259,18 @@ TEST_F(RecordFixture, record_end_to_end_with_splitting_bagsize_split_is_at_least
" --max-bag-size " << bagfile_split_size <<
" " << topic_name;
auto process_handle = start_execution(command.str());
auto cleanup_process_handle = rcpputils::make_scope_exit(
[process_handle]() {
stop_execution(process_handle);
});

ASSERT_TRUE(pub_manager.spin_and_wait_for_matched(topic_name, 30s, 1)) <<
"Expected find rosbag subscription";

wait_for_db();

stop_execution(process_handle);
cleanup_process_handle.cancel();

pub_manager.run_publishers();

Expand Down Expand Up @@ -295,11 +333,20 @@ TEST_F(RecordFixture, record_end_to_end_with_splitting_max_size_not_reached) {
" --max-bag-size " << bagfile_split_size <<
" " << topic_name;
auto process_handle = start_execution(command.str());
auto cleanup_process_handle = rcpputils::make_scope_exit(
[process_handle]() {
stop_execution(process_handle);
});

ASSERT_TRUE(pub_manager.spin_and_wait_for_matched(topic_name, 30s, 1)) <<
"Expected find rosbag subscription";

wait_for_db();

pub_manager.run_publishers();

stop_execution(process_handle);
cleanup_process_handle.cancel();

rosbag2_storage::MetadataIo metadata_io;

Expand Down Expand Up @@ -350,11 +397,20 @@ TEST_F(RecordFixture, record_end_to_end_with_splitting_splits_bagfile) {
" --max-bag-size " << bagfile_split_size <<
" " << topic_name;
auto process_handle = start_execution(command.str());
auto cleanup_process_handle = rcpputils::make_scope_exit(
[process_handle]() {
stop_execution(process_handle);
});

ASSERT_TRUE(pub_manager.spin_and_wait_for_matched(topic_name, 30s, 1)) <<
"Expected find rosbag subscription";

wait_for_db();

pub_manager.run_publishers();

stop_execution(process_handle);
cleanup_process_handle.cancel();

rosbag2_storage::MetadataIo metadata_io;

Expand Down Expand Up @@ -412,11 +468,20 @@ TEST_F(RecordFixture, record_end_to_end_with_duration_splitting_splits_bagfile)
" -d " << bagfile_split_duration <<
" " << topic_name;
auto process_handle = start_execution(command.str());
auto cleanup_process_handle = rcpputils::make_scope_exit(
[process_handle]() {
stop_execution(process_handle);
});

ASSERT_TRUE(pub_manager.spin_and_wait_for_matched(topic_name, 30s, 1)) <<
"Expected find rosbag subscription";

wait_for_db();

pub_manager.run_publishers();

stop_execution(process_handle);
cleanup_process_handle.cancel();

rosbag2_storage::MetadataIo metadata_io;

Expand Down Expand Up @@ -469,11 +534,20 @@ TEST_F(RecordFixture, record_end_to_end_test_with_zstd_file_compression_compress
" " << topic_name;

auto process_handle = start_execution(command.str());
auto cleanup_process_handle = rcpputils::make_scope_exit(
[process_handle]() {
stop_execution(process_handle);
});

ASSERT_TRUE(pub_manager.spin_and_wait_for_matched(topic_name, 30s, 1)) <<
"Expected find rosbag subscription";

wait_for_db();

pub_manager.run_publishers();

stop_execution(process_handle);
cleanup_process_handle.cancel();

rosbag2_storage::MetadataIo metadata_io;

Expand Down Expand Up @@ -574,11 +648,20 @@ TEST_F(RecordFixture, record_end_to_end_test_with_cache) {
auto process_handle = start_execution(
"ros2 bag record --output " + root_bag_path_.string() + " " + topic_name + " " +
"--max-cache-size " + std::to_string(max_cache_size));
auto cleanup_process_handle = rcpputils::make_scope_exit(
[process_handle]() {
stop_execution(process_handle);
});

ASSERT_TRUE(pub_manager.spin_and_wait_for_matched(topic_name, 30s, 1)) <<
"Expected find rosbag subscription";

wait_for_db();

pub_manager.run_publishers();

stop_execution(process_handle);
cleanup_process_handle.cancel();

// TODO(Martin-Idel-SI): Find out how to correctly send a Ctrl-C signal on Windows
// This is necessary as the process is killed hard on Windows and doesn't write a metadata file
Expand Down Expand Up @@ -621,11 +704,21 @@ TEST_F(RecordFixture, rosbag2_record_and_play_multiple_topics_with_filter) {
" --max-bag-size " << bagfile_split_size <<
" -a";
auto process_handle = start_execution(command_record.str());
auto cleanup_process_handle = rcpputils::make_scope_exit(
[process_handle]() {
stop_execution(process_handle);
});

ASSERT_TRUE(pub_manager.spin_and_wait_for_matched(first_topic_name, 30s, 1)) <<
"Expected find rosbag subscription";
ASSERT_TRUE(pub_manager.spin_and_wait_for_matched(second_topic_name, 30s, 1)) <<
"Expected find rosbag subscription";
wait_for_db();

pub_manager.run_publishers();

stop_execution(process_handle);
cleanup_process_handle.cancel();

wait_for_metadata();

Expand Down