Skip to content

Commit

Permalink
Callback not reached, using callback group and Fast-RTPS
Browse files Browse the repository at this point in the history
  ros2/rclcpp#1611

Signed-off-by: Tomoya Fujita <[email protected]>
  • Loading branch information
fujitatomoya committed Apr 18, 2021
1 parent 367d4c8 commit 6904c1f
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 0 deletions.
3 changes: 3 additions & 0 deletions prover_rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ custom_executable(rclcpp_1566)
#custom_executable(rclcpp_1580)
custom_executable(rclcpp_1585)
#custom_executable(rclcpp_1597)
custom_executable(rclcpp_1611_client)
custom_executable(rclcpp_1611_publisher)
custom_executable(rclcpp_1611_server)

#custom_executable(ros2_644)
#custom_executable(ros2_946_pub)
Expand Down
94 changes: 94 additions & 0 deletions prover_rclcpp/src/rclcpp_1611_client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#include <chrono>
#include <functional>
#include <memory>
#include <string>

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/bool.hpp>
#include <std_srvs/srv/trigger.hpp>

class CallbackGroupSrvClient: public rclcpp::Node {
public:
// Constructor
CallbackGroupSrvClient()
: Node("callback_group_srv_client"){

// Init callback group
callback_group_ = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, false);
callback_group_executor_thread_ = std::thread([this]() {
callback_group_executor_.add_callback_group(callback_group_, this->get_node_base_interface());
callback_group_executor_.spin();
});

// Init subscription
subscription_ = this->create_subscription<std_msgs::msg::Bool>(
"topic_bool",
rclcpp::QoS(1),
std::bind(
&CallbackGroupSrvClient::sub_callback,
this,
std::placeholders::_1
)
);

// Create service client and link it to callback group
srv_client_ = this->create_client<std_srvs::srv::Trigger>(
"trigger_srv",
rmw_qos_profile_services_default,
callback_group_
);
}

// Destructor
~CallbackGroupSrvClient(){
callback_group_executor_.cancel();
callback_group_executor_thread_.join();
}

private:
// Methods
void sub_callback(const std_msgs::msg::Bool::SharedPtr ){
RCLCPP_INFO_STREAM(this->get_logger(), "Received new msg! ");

// Send request and wait for future
auto req = std::make_shared<std_srvs::srv::Trigger::Request>();
auto future = srv_client_->async_send_request(req);
RCLCPP_INFO_STREAM(this->get_logger(), "Sending request");
auto future_status = future.wait_for(std::chrono::seconds(10));

// Process result
if (future_status == std::future_status::ready) {
if (future.get()->success) {
RCLCPP_INFO_STREAM(this->get_logger(), "Srv suceeded");
}
else {
RCLCPP_INFO_STREAM(this->get_logger(), "Srv failed");
}
}
else if (future_status == std::future_status::timeout) {
RCLCPP_INFO_STREAM(this->get_logger(), "Response not received within timeout");
}
else if (future_status == std::future_status::deferred) {
RCLCPP_INFO_STREAM(this->get_logger(), "Srv deferred");
}

return;
}

// Attributes
rclcpp::Subscription<std_msgs::msg::Bool>::SharedPtr subscription_;
rclcpp::CallbackGroup::SharedPtr callback_group_;
rclcpp::executors::SingleThreadedExecutor callback_group_executor_;
std::thread callback_group_executor_thread_;
rclcpp::Client<std_srvs::srv::Trigger>::SharedPtr srv_client_;
};



int main(int argc, char * argv[]){
rclcpp::init(argc, argv);
auto client = std::make_shared<CallbackGroupSrvClient>();
rclcpp::spin(client);
rclcpp::shutdown();
return 0;
}
46 changes: 46 additions & 0 deletions prover_rclcpp/src/rclcpp_1611_publisher.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#include <chrono>
#include <functional>
#include <memory>
#include <string>

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/bool.hpp>


class MinimalPublisher: public rclcpp::Node {
public:
// constructor
MinimalPublisher()
: Node("minimal_publisher"){
publisher_ = this->create_publisher<std_msgs::msg::Bool>("topic_bool", rclcpp::QoS(1));
timer_ = this->create_wall_timer(
std::chrono::seconds(5),
std::bind(
&MinimalPublisher::timer_callback,
this
)
);
}

private:
// method
void timer_callback(){
std_msgs::msg::Bool msg;
msg.data = false;
publisher_->publish(msg);
RCLCPP_INFO_STREAM(this->get_logger(), "Publishing data! ");
return;
}

// attributes
rclcpp::TimerBase::SharedPtr timer_;
rclcpp::Publisher<std_msgs::msg::Bool>::SharedPtr publisher_;
};

int main(int argc, char * argv[]){
rclcpp::init(argc, argv);
auto pub = std::make_shared<MinimalPublisher>();
rclcpp::spin(pub);
rclcpp::shutdown();
return 0;
}
92 changes: 92 additions & 0 deletions prover_rclcpp/src/rclcpp_1611_server.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#include <chrono>
#include <functional>
#include <memory>
#include <string>

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/bool.hpp>
#include <std_srvs/srv/trigger.hpp>


class CallbackGroupSrvServer: public rclcpp::Node {
public:
// Constructor
CallbackGroupSrvServer()
: Node("callback_group_srv_server"){
// Init callback group and spin it
callback_group_ = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive, false);
//callback_group_ = this->create_callback_group(rclcpp::CallbackGroupType::Reentrant, false);
callback_group_executor_thread_ = std::thread([this]() {
callback_group_executor_.add_callback_group(callback_group_, this->get_node_base_interface());
callback_group_executor_.spin();
});

// Init service server and link it to callback group
service_ = this->create_service<std_srvs::srv::Trigger>(
"trigger_srv",
std::bind(
&CallbackGroupSrvServer::srv_callback,
this,
std::placeholders::_1,
std::placeholders::_2)
);
}

// Destructor
~CallbackGroupSrvServer(){
callback_group_executor_.cancel();
callback_group_executor_thread_.join();
}

private:
// Methods
void sub_callback(const std_msgs::msg::Bool::SharedPtr ){
new_msg_ = true;
RCLCPP_INFO_STREAM(this->get_logger(), "Received new msg!");
return;
}

void srv_callback(const std_srvs::srv::Trigger::Request::SharedPtr, std_srvs::srv::Trigger::Response::SharedPtr res){
RCLCPP_INFO_STREAM(this->get_logger(), "Received request");
new_msg_ = false;

// Create subscription
auto sub_options_ = rclcpp::SubscriptionOptions();
sub_options_.callback_group = callback_group_;
auto subscription = this->create_subscription<std_msgs::msg::Bool>(
"topic_bool",
rclcpp::QoS(1),
std::bind(
&CallbackGroupSrvServer::sub_callback,
this,
std::placeholders::_1),
sub_options_
);

// Wait for new_msg
while (rclcpp::ok()
&& new_msg_ == false) {
rclcpp::sleep_for(std::chrono::seconds(1));
RCLCPP_INFO_STREAM(this->get_logger(), "Sleep...");
}

res->message = "";
res->success = true;
return;
}

// Attributes
rclcpp::Service<std_srvs::srv::Trigger>::SharedPtr service_;
rclcpp::CallbackGroup::SharedPtr callback_group_;
rclcpp::executors::SingleThreadedExecutor callback_group_executor_;
std::thread callback_group_executor_thread_;
bool new_msg_;
};

int main(int argc, char * argv[]){
rclcpp::init(argc, argv);
auto srv_server = std::make_shared<CallbackGroupSrvServer>();
rclcpp::spin(srv_server);
rclcpp::shutdown();
return 0;
}

0 comments on commit 6904c1f

Please sign in to comment.