Skip to content

Commit

Permalink
Allow programmatic configuration of unicast relays. (#498)
Browse files Browse the repository at this point in the history
This change allows users to configure relays from code without having to
`setenv(GZ_RELAY)`.

---------

Signed-off-by: Michael Beardsworth <[email protected]>
Co-authored-by: Carlos Agüero <[email protected]>
  • Loading branch information
mbeards and caguero authored May 23, 2024
1 parent ba47741 commit 508d28b
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 20 deletions.
58 changes: 38 additions & 20 deletions include/gz/transport/Discovery.hh
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,42 @@ namespace gz
}
}

/// \brief Register a new relay address.
/// \param[in] _ip New IP address.
public: void AddRelayAddress(const std::string &_ip)
{
std::lock_guard<std::mutex> lock(this->mutex);
// Sanity check: Make sure that this IP address is not already saved.
for (auto const &addr : this->relayAddrs)
{
if (addr.sin_addr.s_addr == inet_addr(_ip.c_str()))
return;
}

sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(_ip.c_str());
addr.sin_port = htons(static_cast<u_short>(this->port));

this->relayAddrs.push_back(addr);
}

// \brief Gets this instance's relay addresses.
// \return The list of relay addresses.
public: std::vector<std::string> RelayAddresses() const
{
std::vector<std::string> result;

std::lock_guard<std::mutex> lock(this->mutex);

for (auto const &addr : this->relayAddrs) {
result.push_back(inet_ntoa(addr.sin_addr));
}

return result;
}

/// \brief Broadcast periodic heartbeats.
private: void UpdateHeartbeat()
{
Expand Down Expand Up @@ -1254,6 +1290,8 @@ namespace gz
if (_msg.SerializeToArray(buffer + sizeof(msgSize), msgSize))
{
// Send the discovery message to the unicast relays.
std::lock_guard<std::mutex> lock(this->mutex);

for (const auto &sockAddr : this->relayAddrs)
{
errno = 0;
Expand Down Expand Up @@ -1420,26 +1458,6 @@ namespace gz
return true;
}

/// \brief Register a new relay address.
/// \param[in] _ip New IP address.
private: void AddRelayAddress(const std::string &_ip)
{
// Sanity check: Make sure that this IP address is not already saved.
for (auto const &addr : this->relayAddrs)
{
if (addr.sin_addr.s_addr == inet_addr(_ip.c_str()))
return;
}

sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(_ip.c_str());
addr.sin_port = htons(static_cast<u_short>(this->port));

this->relayAddrs.push_back(addr);
}

/// \brief Default activity interval value (ms.).
/// \sa ActivityInterval.
/// \sa SetActivityInterval.
Expand Down
13 changes: 13 additions & 0 deletions include/gz/transport/Node.hh
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,19 @@ namespace gz
public: std::optional<TopicStatistics> TopicStats(
const std::string &_topic) const;

/// \brief Adds a unicast relay IP. All nodes in this process will send
/// UDP unicast traffic to the address to connect networks when UDP
/// multicast traffic is not forwarded.
/// It's also possible to use the environment variable GZ_RELAY to add
/// relays.
/// \param[in] _relayAddress IPv4 address of the relay to add.
public: void AddGlobalRelay(const std::string& _relayAddress);

/// \brief Gets the relay addresses configured for all nodes in this
/// process.
/// \return The relay addresses.
public: std::vector<std::string> GlobalRelays() const;

/// \brief Get a pointer to the shared node (singleton shared by all the
/// nodes).
/// \return The pointer to the shared node.
Expand Down
13 changes: 13 additions & 0 deletions include/gz/transport/NodeShared.hh
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,19 @@ namespace gz
public: std::optional<TopicStatistics> TopicStats(
const std::string &_topic) const;

/// \brief Adds a unicast relay IP. All nodes in this process will send
/// UDP unicast traffic to the address to connect networks when UDP
/// multicast traffic is not forwarded.
/// It's also possible to use the environment variable GZ_RELAY to add
/// relays.
/// \param[in] _relayAddress IPv4 address of the relay to add.
public: void AddGlobalRelay(const std::string& _relayAddress);

/// \brief Gets the relay addresses configured for all nodes in this
/// process.
/// \return The relay addresses.
public: std::vector<std::string> GlobalRelays() const;

/// \brief Constructor.
protected: NodeShared();

Expand Down
10 changes: 10 additions & 0 deletions src/Node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1139,3 +1139,13 @@ bool Node::RequestRaw(const std::string &_topic,
bool executed = this->Request(_topic, *req, _timeout, *res, _result);
return executed && res->SerializeToString(&_response);
}

/////////////////////////////////////////////////
void Node::AddGlobalRelay(const std::string& _relayAddress) {
Shared()->AddGlobalRelay(_relayAddress);
}

/////////////////////////////////////////////////
std::vector<std::string> Node::GlobalRelays() const {
return Shared()->GlobalRelays();
}
20 changes: 20 additions & 0 deletions src/NodeShared.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <iostream>
#include <map>
#include <mutex>
#include <set>
#include <shared_mutex> //NOLINT
#include <string>
#include <thread>
Expand Down Expand Up @@ -1941,3 +1942,22 @@ int NodeSharedPrivate::NonNegativeEnvVar(const std::string &_envVar,
}
return numVal;
}

void NodeShared::AddGlobalRelay(const std::string& _relayAddress) {
dataPtr->msgDiscovery->AddRelayAddress(_relayAddress);
dataPtr->srvDiscovery->AddRelayAddress(_relayAddress);
}

std::vector<std::string> NodeShared::GlobalRelays() const {
// Merge relays from message and service discovery. They should be identical
// since they're typically build from the same sources.
//
// This is confusing - do we want to add different handling here?
auto msgRelays = dataPtr->msgDiscovery->RelayAddresses();
std::set<std::string> msgRelaySet(msgRelays.cbegin(), msgRelays.cend());
auto srvRelays = dataPtr->srvDiscovery->RelayAddresses();
std::set<std::string> srvRelaySet(srvRelays.cbegin(), srvRelays.cend());
srvRelaySet.merge(msgRelaySet);

return std::vector<std::string>(srvRelaySet.cbegin(), srvRelaySet.cend());
}
25 changes: 25 additions & 0 deletions src/Node_TEST.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2341,6 +2341,31 @@ TEST(NodeTest, statistics)
EXPECT_EQ(std::nullopt, node.TopicStats("/test"));
}

//////////////////////////////////////////////////
/// \brief Test adding and querying relays
TEST(NodeTest, relay) {
transport::Node node;

// Make sure the relay we're about to add hasn't already been configured.
const std::vector<std::string> relaysBeforeAdd = node.GlobalRelays();
{
auto relaysBeforeIt = std::find_if(
relaysBeforeAdd.cbegin(), relaysBeforeAdd.cend(),
[](const std::string &relay) { return relay == "127.0.0.123"; });
ASSERT_EQ(relaysBeforeIt, relaysBeforeAdd.cend());
}
node.AddGlobalRelay("127.0.0.123");

const std::vector<std::string> relaysAfterAdd = node.GlobalRelays();
{
EXPECT_EQ(relaysAfterAdd.size(), relaysBeforeAdd.size() + 1);
auto relayIt = std::find_if(
relaysAfterAdd.cbegin(), relaysAfterAdd.cend(),
[](const std::string &relay) { return relay == "127.0.0.123"; });
EXPECT_NE(relayIt, relaysAfterAdd.cend());
}
}

//////////////////////////////////////////////////
int main(int argc, char **argv)
{
Expand Down

0 comments on commit 508d28b

Please sign in to comment.