Skip to content

Commit

Permalink
Record all topics (ros2#30)
Browse files Browse the repository at this point in the history
* ros2GH-23 Get all topics from node and sanitize

* ros2GH-23 Move methods to node for better interface

* ros2GH-23 Use rmw_serialized_message_t consistently

* ros2GH-23 Improve santization of topics

* ros2GH-65 Introduce and use better logging macros

* ros2GH-23 Use publisher to serialized message directly

* ros2GH-23 Improve readability of sanitizing topics and types

* ros2GH-23 Allow to write all available topics

* ros2GH-23 Add test for record all

* ros2GH-23 Cleanup: add missing const ref to record interface

* Cleanup for doxygen

* Improve topic sanitization

- correctly expand topic names using rcl
- do not check type correctness (supposed to be done internally)

* Pass topic_name by reference
  • Loading branch information
Martin-Idel-SI authored and Karsten1987 committed Sep 11, 2018
1 parent 52d5ab3 commit 3b23acc
Show file tree
Hide file tree
Showing 19 changed files with 651 additions and 295 deletions.
21 changes: 13 additions & 8 deletions rosbag2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ if(BUILD_TESTING)
target_link_libraries(rosbag2_write_integration_test librosbag2)
endif()

ament_add_gmock(rosbag2_write_all_integration_test
test/rosbag2/rosbag2_write_all_integration_test.cpp
test/rosbag2/test_memory_management.cpp
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
if(TARGET rosbag2_write_all_integration_test)
target_link_libraries(rosbag2_write_all_integration_test librosbag2)
endif()

ament_add_gmock(rosbag2_read_integration_test
test/rosbag2/rosbag2_read_integration_test.cpp
test/rosbag2/test_memory_management.cpp
Expand All @@ -110,14 +118,6 @@ if(BUILD_TESTING)
)
endif()

ament_add_gmock(rosbag2_integration_test
test/rosbag2/rosbag2_integration_test.cpp
src/rosbag2/typesupport_helpers.cpp
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
if(TARGET rosbag2_integration_test)
target_link_libraries(rosbag2_integration_test librosbag2)
endif()

ament_add_gmock(rosbag2_rosbag_node_test
test/rosbag2/rosbag2_rosbag_node_test.cpp
test/rosbag2/test_memory_management.cpp
Expand All @@ -127,6 +127,11 @@ if(BUILD_TESTING)
src/rosbag2/typesupport_helpers.cpp
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
if(TARGET rosbag2_rosbag_node_test)
target_include_directories(rosbag2_rosbag_node_test
PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
)
ament_target_dependencies(rosbag2_rosbag_node_test
ament_index_cpp
Poco
Expand Down
61 changes: 61 additions & 0 deletions rosbag2/include/rosbag2/logging.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2018, Bosch Software Innovations GmbH.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2__LOGGING_HPP_
#define ROSBAG2__LOGGING_HPP_

#include <sstream>
#include <string>

#include "rcutils/logging_macros.h"

#define ROSBAG2_PACKAGE_NAME "rosbag2"

#define ROSBAG2_LOG_INFO(...) \
RCUTILS_LOG_INFO_NAMED(ROSBAG2_PACKAGE_NAME, __VA_ARGS__)

#define ROSBAG2_LOG_INFO_STREAM(args) do { \
std::stringstream __ss; \
__ss << args; \
RCUTILS_LOG_INFO_NAMED(ROSBAG2_PACKAGE_NAME, __ss.str().c_str()); \
} while (0)

#define ROSBAG2_LOG_ERROR(...) \
RCUTILS_LOG_ERROR_NAMED(ROSBAG2_PACKAGE_NAME, __VA_ARGS__)

#define ROSBAG2_LOG_ERROR_STREAM(args) do { \
std::stringstream __ss; \
__ss << args; \
RCUTILS_LOG_ERROR_NAMED(ROSBAG2_PACKAGE_NAME, __ss.str().c_str()); \
} while (0)

#define ROSBAG2_LOG_WARN(...) \
RCUTILS_LOG_WARN_NAMED(ROSBAG2_PACKAGE_NAME, __VA_ARGS__)

#define ROSBAG2_LOG_WARN_STREAM(args) do { \
std::stringstream __ss; \
__ss << args; \
RCUTILS_LOG_WARN_NAMED(ROSBAG2_PACKAGE_NAME, __ss.str().c_str()); \
} while (0)

#define ROSBAG2_LOG_DEBUG(...) \
RCUTILS_LOG_DEBUG_NAMED(ROSBAG2_PACKAGE_NAME, __VA_ARGS__)

#define ROSBAG2_LOG_DEBUG_STREAM(args) do { \
std::stringstream __ss; \
__ss << args; \
RCUTILS_LOG_DEBUG_NAMED(ROSBAG2_PACKAGE_NAME, __ss.str().c_str()); \
} while (0)

#endif // ROSBAG2__LOGGING_HPP_
22 changes: 17 additions & 5 deletions rosbag2/include/rosbag2/rosbag2.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,31 @@ class Rosbag2Node;
class Rosbag2
{
public:
/**
* Records topics to a bagfile. Subscription happens at startup time, hence the topics must
* exist when "record" is called.
*
* \param file_name Name of the bagfile to write
* \param topic_names Vector of topics to subscribe to. Topics must exist at startup time. If
* the vector is empty, all topics will be subscribed.
* \param after_write_action This function will be executed after each write to the database
* where the input parameter is the topic name of the topic written Currently needed for testing.
* Might be removed later.
*/
ROSBAG2_PUBLIC
void record(
const std::string & file_name,
std::vector<std::string> topic_names,
const std::vector<std::string> & topic_names,
std::function<void(std::string)> after_write_action = nullptr);

/**
* Replay all topics in a bagfile.
*
* \param file_name Name of the bagfile to replay
*/
ROSBAG2_PUBLIC
void play(const std::string & file_name);

ROSBAG2_PUBLIC
std::map<std::string, std::string> get_topics_with_types(
const std::vector<std::string> & topic_names, std::shared_ptr<rclcpp::Node> node);

private:
void prepare_publishers(
std::shared_ptr<Rosbag2Node> node,
Expand Down
8 changes: 5 additions & 3 deletions rosbag2/src/rosbag2/demo_record.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
int main(int argc, const char ** argv)
{
if (argc < 2) {
std::cerr << "\nThe names of topics to record must be given as parameter!\n";
std::cerr << "\nThe names of topics or `--all` must be given to record as parameter!\n";
return 0;
}
std::vector<std::string> topics;

for (int i = 1; i < argc; i++) {
topics.emplace_back(argv[i]);
if (strcmp(argv[1], "--all") != 0) {
for (int i = 1; i < argc; i++) {
topics.emplace_back(argv[i]);
}
}

// TODO(anhosi): allow output file to be specified by cli argument and do proper checking if
Expand Down
7 changes: 4 additions & 3 deletions rosbag2/src/rosbag2/generic_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include "rclcpp/any_subscription_callback.hpp"
#include "rclcpp/subscription.hpp"

#include "rosbag2/logging.hpp"

namespace rosbag2
{

Expand Down Expand Up @@ -98,9 +100,8 @@ GenericSubscription::borrow_serialized_message(size_t capacity)
auto fini_return = rmw_serialized_message_fini(msg);
delete msg;
if (fini_return != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rosbag2",
"failed to destroy serialized message: %s", rcl_get_error_string_safe());
ROSBAG2_LOG_ERROR_STREAM(
"Failed to destroy serialized message: " << rcl_get_error_string_safe());
}
});

Expand Down
8 changes: 4 additions & 4 deletions rosbag2/src/rosbag2/generic_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ class GenericSubscription : public rclcpp::SubscriptionBase
* Constructor. In order to properly subscribe to a topic, this subscription needs to be added to
* the node_topic_interface of the node passed into this constructor.
*
* @param node_handle Node handle to the node to create the subscription to
* @param ts Type support handle
* @param topic_name Topic name
* @param callback Callback for new messages of serialized form
* \param node_handle Node handle to the node to create the subscription to
* \param ts Type support handle
* \param topic_name Topic name
* \param callback Callback for new messages of serialized form
*/
GenericSubscription(
std::shared_ptr<rcl_node_t> node_handle,
Expand Down
47 changes: 9 additions & 38 deletions rosbag2/src/rosbag2/rosbag2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "rclcpp/rclcpp.hpp"
#include "rcutils/logging_macros.h"

#include "rosbag2/logging.hpp"
#include "rosbag2_storage/serialized_bag_message.hpp"
#include "rosbag2_storage/storage_factory.hpp"
#include "rosbag2_storage/storage_interfaces/read_only_interface.hpp"
Expand All @@ -42,20 +43,21 @@ const char * ROS_PACKAGE_NAME = "rosbag2";

void Rosbag2::record(
const std::string & file_name,
std::vector<std::string> topic_names,
const std::vector<std::string> & topic_names,
std::function<void(std::string)> after_write_action)
{
rosbag2_storage::StorageFactory factory;
auto storage = factory.open_read_write(file_name, "sqlite3");

if (!storage) {
throw std::runtime_error("No storage could be initialized. Abort");
return;
}

auto node = std::make_shared<Rosbag2Node>("rosbag2");

auto topics_and_types = get_topics_with_types(topic_names, node);
auto topics_and_types = topic_names.empty() ?
node->get_all_topics_with_types() :
node->get_topics_with_types(topic_names);

if (topics_and_types.empty()) {
throw std::runtime_error("No topics found. Abort");
Expand All @@ -77,45 +79,15 @@ void Rosbag2::record(

if (subscriptions_.empty()) {
throw std::runtime_error("No topics could be subscribed. Abort");
return;
}

RCUTILS_LOG_INFO_NAMED(ROS_PACKAGE_NAME, "Waiting for messages...");
ROSBAG2_LOG_INFO("Waiting for messages...");
while (rclcpp::ok()) {
rclcpp::spin(node);
}
subscriptions_.clear();
}

std::map<std::string, std::string> Rosbag2::get_topics_with_types(
const std::vector<std::string> & topic_names, std::shared_ptr<rclcpp::Node> node)
{
// TODO(Martin-Idel-SI): This is a short sleep to allow the node some time to discover the topic
// This should be replaced by an auto-discovery system in the future
std::this_thread::sleep_for(std::chrono::milliseconds(100));
auto topics = node->get_topic_names_and_types();

std::map<std::string, std::string> topic_names_and_types;
for (const auto & topic_name : topic_names) {
std::string complete_topic_name = topic_name;
if (topic_name[0] != '/') {
complete_topic_name = "/" + topic_name;
}
auto position = topics.find(complete_topic_name);
if (position != topics.end()) {
if (position->second.size() > 1) {
RCUTILS_LOG_ERROR_NAMED(
ROS_PACKAGE_NAME,
"Topic '%s' has several types associated. Only topics with one type are supported.",
position->first.c_str());
} else {
topic_names_and_types.insert({position->first, position->second[0]});
}
}
}
return topic_names_and_types;
}

std::shared_ptr<GenericSubscription>
Rosbag2::create_subscription(
const std::function<void(std::string)> & after_write_action,
Expand All @@ -133,9 +105,8 @@ Rosbag2::create_subscription(
rcutils_time_point_value_t time_stamp;
int error = rcutils_system_time_now(&time_stamp);
if (error != RCUTILS_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
ROS_PACKAGE_NAME,
"Error getting current time. Error: %s", rcutils_get_error_string_safe());
ROSBAG2_LOG_ERROR_STREAM(
"Error getting current time. Error:" << rcutils_get_error_string_safe());
}
bag_message->time_stamp = time_stamp;

Expand All @@ -161,7 +132,7 @@ void Rosbag2::play(const std::string & file_name)
// without the sleep_for() many messages are lost.
std::this_thread::sleep_for(std::chrono::milliseconds(50));
publishers_[message->topic_name]->publish(message->serialized_data);
RCUTILS_LOG_INFO_NAMED(ROS_PACKAGE_NAME, "published message");
ROSBAG2_LOG_INFO("Published message");
}
}
}
Expand Down
Loading

0 comments on commit 3b23acc

Please sign in to comment.