Skip to content

Commit

Permalink
Merge pull request #172 from fmauch/max_num_tries
Browse files Browse the repository at this point in the history
Add support for setting socket max num tries and reconnect timeout
  • Loading branch information
fmauch authored Sep 5, 2023
2 parents 7626fe5 + 174e0b5 commit b90a3ec
Show file tree
Hide file tree
Showing 15 changed files with 211 additions and 37 deletions.
19 changes: 16 additions & 3 deletions include/ur_client_library/comm/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,13 @@ class IProducer
public:
/*!
* \brief Set-up functionality of the producers.
*
* \param max_num_tries Maximum number of connection attempts before counting the connection as
* failed. Unlimited number of attempts when set to 0.
* \param reconnection_time time in between connection attempts to the server
*/
virtual void setupProducer()
virtual void setupProducer(const size_t max_num_tries = 0,
const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10))
{
}
/*!
Expand Down Expand Up @@ -287,9 +292,17 @@ class Pipeline
stop();
}

void init()
/*!
* \brief Initialize the pipeline. Internally calls setup of producer and consumer.
*
* \param max_num_tries Maximum number of connection attempts before counting the connection as
* failed. Unlimited number of attempts when set to 0.
* \param reconnection_time time in between connection attempts to the server
*/
void init(const size_t max_num_tries = 0,
const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10))
{
producer_.setupProducer();
producer_.setupProducer(max_num_tries, reconnection_time);
if (consumer_ != nullptr)
consumer_->setupConsumer();
}
Expand Down
9 changes: 7 additions & 2 deletions include/ur_client_library/comm/producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,19 @@ class URProducer : public IProducer<T>

/*!
* \brief Triggers the stream to connect to the robot.
*
* \param max_num_tries Maximum number of connection attempts before counting the connection as
* failed. Unlimited number of attempts when set to 0.
* \param reconnection_time time in between connection attempts to the server
*/
void setupProducer() override
void setupProducer(const size_t max_num_tries = 0,
const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10)) override
{
timeval tv;
tv.tv_sec = 1;
tv.tv_usec = 0;
stream_.setReceiveTimeout(tv);
if (!stream_.connect())
if (!stream_.connect(max_num_tries, reconnection_time))
{
throw UrException("Failed to connect to robot. Please check if the robot is booted and connected.");
}
Expand Down
10 changes: 8 additions & 2 deletions include/ur_client_library/comm/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <sys/socket.h>
#include <sys/types.h>
#include <atomic>
#include <chrono>
#include <mutex>
#include <string>
#include "ur_client_library/log.h"
Expand Down Expand Up @@ -56,11 +57,16 @@ class URStream : public TCPSocket
/*!
* \brief Connects to the configured socket.
*
* \param max_num_tries Maximum number of connection attempts before counting the connection as
* failed. Unlimited number of attempts when set to 0.
* \param reconnection_time time in between connection attempts to the server
*
* \returns True on success, false if connection could not be established
*/
bool connect()
bool connect(const size_t max_num_tries = 0,
const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10))
{
return TCPSocket::setup(host_, port_);
return TCPSocket::setup(host_, port_, max_num_tries, reconnection_time);
}

/*!
Expand Down
14 changes: 8 additions & 6 deletions include/ur_client_library/comm/tcp_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <sys/socket.h>
#include <sys/types.h>
#include <atomic>
#include <chrono>
#include <mutex>
#include <string>
#include <memory>
Expand Down Expand Up @@ -50,7 +51,8 @@ class TCPSocket
private:
std::atomic<int> socket_fd_;
std::atomic<SocketState> state_;
std::chrono::seconds reconnection_time_;
std::chrono::milliseconds reconnection_time_;
bool reconnection_time_modified_deprecated_ = false;

void setupOptions();

Expand All @@ -60,11 +62,13 @@ class TCPSocket
return ::connect(socket_fd, address, address_len) == 0;
}

bool setup(std::string& host, int port);
bool setup(const std::string& host, const int port, const size_t max_num_tries = 0,
const std::chrono::milliseconds reconnection_time = DEFAULT_RECONNECTION_TIME);

std::unique_ptr<timeval> recv_timeout_;

public:
static constexpr std::chrono::milliseconds DEFAULT_RECONNECTION_TIME{ 10000 };
/*!
* \brief Creates a TCPSocket object
*/
Expand Down Expand Up @@ -147,10 +151,8 @@ class TCPSocket
*
* \param reconnection_time time in between connection attempts to the server
*/
void setReconnectionTime(std::chrono::seconds reconnection_time)
{
reconnection_time_ = reconnection_time;
}
[[deprecated("Reconnection time is passed to setup directly now.")]] void
setReconnectionTime(const std::chrono::milliseconds reconnection_time);
};
} // namespace comm
} // namespace urcl
10 changes: 8 additions & 2 deletions include/ur_client_library/rtde/rtde_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,14 @@ class RTDEClient
* \brief Sets up RTDE communication with the robot. The handshake includes negotiation of the
* used protocol version and setting of input and output recipes.
*
* \param max_num_tries Maximum number of connection attempts before counting the connection as
* failed. Unlimited number of attempts when set to 0.
* \param reconnection_time time in between connection attempts to the server
*
* \returns Success of the handshake
*/
bool init();
bool init(const size_t max_num_tries = 0,
const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10));
/*!
* \brief Triggers the robot to start sending RTDE data packages in the negotiated format.
*
Expand Down Expand Up @@ -228,7 +233,8 @@ class RTDEClient
// the robot is booted.
std::vector<std::string> ensureTimestampIsPresent(const std::vector<std::string>& output_recipe) const;

void setupCommunication();
void setupCommunication(const size_t max_num_tries = 0,
const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10));
bool negotiateProtocolVersion(const uint16_t protocol_version);
void queryURControlVersion();
void setupOutputs(const uint16_t protocol_version);
Expand Down
9 changes: 7 additions & 2 deletions include/ur_client_library/ur/dashboard_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,19 @@ class DashboardClient : public comm::TCPSocket
DashboardClient() = delete;
virtual ~DashboardClient() = default;

const int DASHBOARD_SERVER_PORT = 29999;
static constexpr int DASHBOARD_SERVER_PORT = 29999;

/*!
* \brief Opens a connection to the dashboard server on the host as specified in the constructor.
*
* \param max_num_tries Maximum number of connection attempts before counting the connection as
* failed. Unlimited number of attempts when set to 0.
* \param reconnection_time time in between connection attempts to the server
*
* \returns True on successful connection, false otherwise.
*/
bool connect();
bool connect(const size_t max_num_tries = 0,
const std::chrono::milliseconds reconnection_time = std::chrono::seconds(10));

/*!
* \brief Makes sure no connection to the dashboard server is held inside the object.
Expand Down
37 changes: 34 additions & 3 deletions src/comm/tcp_socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <endian.h>
#include <netinet/tcp.h>
#include <unistd.h>
#include <chrono>
#include <cstring>
#include <sstream>
#include <thread>
Expand Down Expand Up @@ -55,8 +56,19 @@ void TCPSocket::setupOptions()
}
}

bool TCPSocket::setup(std::string& host, int port)
bool TCPSocket::setup(const std::string& host, const int port, const size_t max_num_tries,
const std::chrono::milliseconds reconnection_time)
{
// This can be removed once we remove the setReconnectionTime() method
auto reconnection_time_resolved = reconnection_time;
if (reconnection_time_modified_deprecated_)
{
URCL_LOG_WARN("TCPSocket::setup(): Reconnection time was modified using `setReconnectionTime()` which is "
"deprecated. Please change your code to set reconnection_time through the `setup()` method "
"directly. The value passed to this function will be ignored.");
reconnection_time_resolved = reconnection_time_;
}

if (state_ == SocketState::Connected)
return false;

Expand All @@ -74,6 +86,7 @@ bool TCPSocket::setup(std::string& host, int port)
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;

size_t connect_counter = 0;
bool connected = false;
while (!connected)
{
Expand All @@ -96,15 +109,25 @@ bool TCPSocket::setup(std::string& host, int port)

freeaddrinfo(result);

if (max_num_tries > 0)
{
if (connect_counter++ >= max_num_tries)
{
URCL_LOG_ERROR("Failed to establish connection for %s:%d after %d tries", host.c_str(), port, max_num_tries);
state_ = SocketState::Invalid;
return false;
}
}

if (!connected)
{
state_ = SocketState::Invalid;
std::stringstream ss;
ss << "Failed to connect to robot on IP " << host_name
<< ". Please check that the robot is booted and reachable on " << host_name << ". Retrying in "
<< reconnection_time_.count() << " seconds";
<< std::chrono::duration_cast<std::chrono::duration<float>>(reconnection_time_resolved).count() << " seconds";
URCL_LOG_ERROR("%s", ss.str().c_str());
std::this_thread::sleep_for(reconnection_time_);
std::this_thread::sleep_for(reconnection_time_resolved);
}
}
setupOptions();
Expand Down Expand Up @@ -210,5 +233,13 @@ void TCPSocket::setReceiveTimeout(const timeval& timeout)
}
}

void TCPSocket::setReconnectionTime(const std::chrono::milliseconds reconnection_time)
{
URCL_LOG_ERROR("Calling setReconnectionTime is deprecated. Reconnection timeout is passed to the setup method "
"directly.");
reconnection_time_ = reconnection_time;
reconnection_time_modified_deprecated_ = true;
}

} // namespace comm
} // namespace urcl
8 changes: 4 additions & 4 deletions src/rtde/rtde_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ RTDEClient::~RTDEClient()
disconnect();
}

bool RTDEClient::init()
bool RTDEClient::init(const size_t max_num_tries, const std::chrono::milliseconds reconnection_time)
{
if (client_state_ > ClientState::UNINITIALIZED)
{
Expand All @@ -79,7 +79,7 @@ bool RTDEClient::init()
unsigned int attempts = 0;
while (attempts < MAX_INITIALIZE_ATTEMPTS)
{
setupCommunication();
setupCommunication(max_num_tries, reconnection_time);
if (client_state_ == ClientState::INITIALIZED)
return true;

Expand All @@ -92,11 +92,11 @@ bool RTDEClient::init()
throw UrException(ss.str());
}

void RTDEClient::setupCommunication()
void RTDEClient::setupCommunication(const size_t max_num_tries, const std::chrono::milliseconds reconnection_time)
{
client_state_ = ClientState::INITIALIZING;
// A running pipeline is needed inside setup
pipeline_.init();
pipeline_.init(max_num_tries, reconnection_time);
pipeline_.run();

uint16_t protocol_version = MAX_RTDE_PROTOCOL_VERSION;
Expand Down
8 changes: 6 additions & 2 deletions src/ur/dashboard_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ void DashboardClient::rtrim(std::string& str, const std::string& chars)
str.erase(str.find_last_not_of(chars) + 1);
}

bool DashboardClient::connect()
bool DashboardClient::connect(const size_t max_num_tries, const std::chrono::milliseconds reconnection_time)
{
if (getState() == comm::SocketState::Connected)
{
Expand All @@ -67,11 +67,15 @@ bool DashboardClient::connect()
TCPSocket::setReceiveTimeout(tv);
try
{
if (TCPSocket::setup(host_, port_))
if (TCPSocket::setup(host_, port_, max_num_tries, reconnection_time))
{
URCL_LOG_INFO("%s", read().c_str());
ret_val = true;
}
else
{
return false;
}
}
catch (const TimeoutException&)
{
Expand Down
16 changes: 16 additions & 0 deletions tests/test_dashboard_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <ur_client_library/exceptions.h>
#include <chrono>
#include <thread>
#include "ur_client_library/comm/tcp_socket.h"
#include "ur_client_library/ur/version_information.h"
#define private public
#include <ur_client_library/ur/dashboard_client.h>
Expand Down Expand Up @@ -203,6 +204,21 @@ TEST_F(DashboardClientTest, set_receive_timeout)
}
}

TEST_F(DashboardClientTest, connect_non_running_robot)
{
std::unique_ptr<DashboardClient> dashboard_client;
// We use an IP address on the integration_test's subnet
dashboard_client.reset(new DashboardClient("192.168.56.123"));
auto start = std::chrono::system_clock::now();
EXPECT_FALSE(dashboard_client->connect(2, std::chrono::milliseconds(500)));
auto end = std::chrono::system_clock::now();
auto elapsed = end - start;
// This is only a rough estimate, obviously.
// Since this isn't done on the loopback device, trying to open a socket on a non-existing address
// takes considerably longer.
EXPECT_LT(elapsed, 2 * comm::TCPSocket::DEFAULT_RECONNECTION_TIME);
}

int main(int argc, char* argv[])
{
::testing::InitGoogleTest(&argc, argv);
Expand Down
16 changes: 16 additions & 0 deletions tests/test_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,22 @@ TEST_F(PipelineTest, consumer_pipeline)
pipeline_->stop();
}

TEST_F(PipelineTest, connect_non_connected_robot)
{
stream_.reset(new comm::URStream<rtde_interface::RTDEPackage>("127.0.0.1", 12321));
producer_.reset(new comm::URProducer<rtde_interface::RTDEPackage>(*stream_.get(), *parser_.get()));
TestConsumer consumer;
pipeline_.reset(
new comm::Pipeline<rtde_interface::RTDEPackage>(*producer_.get(), &consumer, "RTDE_PIPELINE", notifier_));

auto start = std::chrono::system_clock::now();
EXPECT_THROW(pipeline_->init(2, std::chrono::milliseconds(500)), UrException);
auto end = std::chrono::system_clock::now();
auto elapsed = end - start;
// This is only a rough estimate, obviously
EXPECT_LT(elapsed, std::chrono::milliseconds(1500));
}

int main(int argc, char* argv[])
{
::testing::InitGoogleTest(&argc, argv);
Expand Down
Loading

0 comments on commit b90a3ec

Please sign in to comment.