Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[17283] Fix issues in Dynamic Network Interfaces (backport #5282) #5303

Merged
merged 1 commit into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
#endif // if HAVE_SECURITY
#include <utils/shared_mutex.hpp>
#include <utils/TimeConversion.hpp>
#include <rtps/writer/BaseWriter.hpp>
#include <rtps/reader/BaseReader.hpp>

namespace eprosima {
namespace fastdds {
Expand Down Expand Up @@ -1698,6 +1700,121 @@ void PDP::add_builtin_security_attributes(

#endif // HAVE_SECURITY

void PDP::local_participant_attributes_update_nts(
const RTPSParticipantAttributes& new_atts)
{
// Update user data
auto participant_data = getLocalParticipantProxyData();
participant_data->m_userData.data_vec(new_atts.userData);

// If we are intraprocess only, we do not need to update locators
bool announce_locators = !mp_RTPSParticipant->is_intraprocess_only();
if (announce_locators)
{
// Clear all locators
participant_data->metatraffic_locators.unicast.clear();
participant_data->metatraffic_locators.multicast.clear();
participant_data->default_locators.unicast.clear();
participant_data->default_locators.multicast.clear();

// Update default locators
for (const Locator_t& loc : new_atts.defaultUnicastLocatorList)
{
participant_data->default_locators.add_unicast_locator(loc);
}
for (const Locator_t& loc : new_atts.defaultMulticastLocatorList)
{
participant_data->default_locators.add_multicast_locator(loc);
}

// Update metatraffic locators
for (const auto& locator : new_atts.builtin.metatrafficUnicastLocatorList)
{
participant_data->metatraffic_locators.add_unicast_locator(locator);
}
if (!new_atts.builtin.avoid_builtin_multicast || participant_data->metatraffic_locators.unicast.empty())
{
for (const auto& locator : new_atts.builtin.metatrafficMulticastLocatorList)
{
participant_data->metatraffic_locators.add_multicast_locator(locator);
}
}

fastdds::rtps::network::external_locators::add_external_locators(*participant_data,
new_atts.builtin.metatraffic_external_unicast_locators,
new_atts.default_external_unicast_locators);
}
}

void PDP::update_endpoint_locators_if_default_nts(
const std::vector<BaseWriter*>& writers,
const std::vector<BaseReader*>& readers,
const RTPSParticipantAttributes& old_atts,
const RTPSParticipantAttributes& new_atts)
{
// Check if default locators have changed
const auto& old_default_unicast = old_atts.defaultUnicastLocatorList;
const auto& old_default_multicast = old_atts.defaultMulticastLocatorList;
const auto& new_default_unicast = new_atts.defaultUnicastLocatorList;
const auto& new_default_multicast = new_atts.defaultMulticastLocatorList;

// Early return if there is no change in default unicast locators
if ((old_default_unicast == new_default_unicast) &&
(old_default_multicast == new_default_multicast))
{
return;
}

// Update proxies of endpoints with default configured locators
EDP* edp = get_edp();
for (BaseWriter* writer : writers)
{
if ((old_default_multicast == writer->getAttributes().multicastLocatorList) &&
(old_default_unicast == writer->getAttributes().unicastLocatorList))
{
writer->getAttributes().multicastLocatorList = new_default_multicast;
writer->getAttributes().unicastLocatorList = new_default_unicast;

WriterProxyData* wdata = nullptr;
GUID_t participant_guid;
wdata = addWriterProxyData(writer->getGuid(), participant_guid,
[](WriterProxyData* proxy, bool is_update, const ParticipantProxyData& participant)
{
static_cast<void>(is_update);
assert(is_update);

proxy->set_locators(participant.default_locators);
return true;
});
assert(wdata != nullptr);
edp->process_writer_proxy_data(writer, wdata);
}
}
for (BaseReader* reader : readers)
{
if ((old_default_multicast == reader->getAttributes().multicastLocatorList) &&
(old_default_unicast == reader->getAttributes().unicastLocatorList))
{
reader->getAttributes().multicastLocatorList = new_default_multicast;
reader->getAttributes().unicastLocatorList = new_default_unicast;

ReaderProxyData* rdata = nullptr;
GUID_t participant_guid;
rdata = addReaderProxyData(reader->getGuid(), participant_guid,
[](ReaderProxyData* proxy, bool is_update, const ParticipantProxyData& participant)
{
static_cast<void>(is_update);
assert(is_update);

proxy->set_locators(participant.default_locators);
return true;
});
assert(rdata != nullptr);
edp->process_reader_proxy_data(reader, rdata);
}
}
}

} /* namespace rtps */
} /* namespace fastdds */
} /* namespace eprosima */
38 changes: 27 additions & 11 deletions src/cpp/rtps/builtin/discovery/participant/PDP.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,27 @@
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include <fastcdr/cdr/fixed_size_string.hpp>

#include <fastdds/dds/core/Time_t.hpp>
#include <fastdds/dds/core/policy/ParameterTypes.hpp>
#include <fastdds/dds/core/policy/QosPolicies.hpp>
#include <fastdds/rtps/attributes/ReaderAttributes.hpp>
#include <fastdds/rtps/attributes/RTPSParticipantAttributes.hpp>
#include <fastdds/rtps/attributes/WriterAttributes.hpp>
#include <fastdds/rtps/common/CDRMessage_t.hpp>
#include <fastdds/rtps/common/Guid.hpp>
#include <fastdds/rtps/common/GuidPrefix_t.hpp>
#include <fastdds/rtps/common/InstanceHandle.hpp>
#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/common/Types.hpp>
#include <fastdds/rtps/common/WriteParams.hpp>
#include <fastdds/rtps/history/IPayloadPool.hpp>
#include <fastdds/rtps/participant/ParticipantDiscoveryInfo.hpp>
Expand Down Expand Up @@ -65,19 +79,11 @@ class TypeIdentifier;

namespace rtps {

class PDPServerListener;
class PDPEndpoints;

} // namespace rtps
} // namespace fastdds

namespace fastdds {
namespace rtps {

class RTPSWriter;
class RTPSReader;
class BaseWriter;
class BaseReader;
class WriterHistory;
class ReaderHistory;
struct RTPSParticipantAllocationAttributes;
class RTPSParticipantImpl;
class RTPSParticipantListener;
class BuiltinProtocols;
Expand All @@ -87,6 +93,7 @@ class ReaderProxyData;
class WriterProxyData;
class ParticipantProxyData;
class ReaderListener;
class PDPEndpoints;
class PDPListener;
class PDPServerListener;
class ITopicPayloadPool;
Expand Down Expand Up @@ -484,6 +491,15 @@ class PDP : public fastdds::statistics::rtps::IProxyQueryable

#endif // FASTDDS_STATISTICS

virtual void local_participant_attributes_update_nts(
const RTPSParticipantAttributes& new_atts);

virtual void update_endpoint_locators_if_default_nts(
const std::vector<BaseWriter*>& writers,
const std::vector<BaseReader*>& readers,
const RTPSParticipantAttributes& old_atts,
const RTPSParticipantAttributes& new_atts);

protected:

//!Pointer to the builtin protocols object.
Expand Down
23 changes: 6 additions & 17 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1445,6 +1445,7 @@ void RTPSParticipantImpl::update_attributes(
if (internal_metatraffic_locators_)
{
LocatorList_t metatraffic_unicast_locator_list = temp_atts.builtin.metatrafficUnicastLocatorList;
temp_atts.builtin.metatrafficUnicastLocatorList.clear();
get_default_metatraffic_locators(temp_atts);
if (!(metatraffic_unicast_locator_list == temp_atts.builtin.metatrafficUnicastLocatorList))
{
Expand All @@ -1455,6 +1456,7 @@ void RTPSParticipantImpl::update_attributes(
if (internal_default_locators_)
{
LocatorList_t default_unicast_locator_list = temp_atts.defaultUnicastLocatorList;
temp_atts.defaultUnicastLocatorList.clear();
get_default_unicast_locators(temp_atts);
if (!(default_unicast_locator_list == temp_atts.defaultUnicastLocatorList))
{
Expand Down Expand Up @@ -1529,25 +1531,12 @@ void RTPSParticipantImpl::update_attributes(

{
std::lock_guard<std::recursive_mutex> lock(*pdp->getMutex());
pdp->local_participant_attributes_update_nts(temp_atts);

// Update user data
auto local_participant_proxy_data = pdp->getLocalParticipantProxyData();
local_participant_proxy_data->m_userData.data_vec(temp_atts.userData);

// Update metatraffic locators
for (auto locator : temp_atts.builtin.metatrafficMulticastLocatorList)
{
local_participant_proxy_data->metatraffic_locators.add_multicast_locator(locator);
}
for (auto locator : temp_atts.builtin.metatrafficUnicastLocatorList)
{
local_participant_proxy_data->metatraffic_locators.add_unicast_locator(locator);
}

// Update default locators
for (auto locator : temp_atts.defaultUnicastLocatorList)
if (local_interfaces_changed && internal_default_locators_)
{
local_participant_proxy_data->default_locators.add_unicast_locator(locator);
std::lock_guard<shared_mutex> _(endpoints_list_mutex);
pdp->update_endpoint_locators_if_default_nts(m_userWriterList, m_userReaderList, m_att, temp_atts);
}

if (local_interfaces_changed)
Expand Down
4 changes: 4 additions & 0 deletions test/dds/communication/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -315,3 +315,7 @@ if(Python3_Interpreter_FOUND)
endif()

endif()

if(UNIX AND NOT(APPLE) AND NOT(QNXNTO) AND NOT(ANDROID))
add_subdirectory(dyn_network)
endif()
6 changes: 3 additions & 3 deletions test/dds/communication/PubSubMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void publisher_run(
publisher->wait_discovery(wait);
}

publisher->run(samples, loops, interval);
publisher->run(samples, 0, loops, interval);
}

int main(
Expand Down Expand Up @@ -196,7 +196,7 @@ int main(
DomainParticipantFactory::get_instance()->load_XML_profiles_file(xml_file);
}

SubscriberModule subscriber(publishers, samples, fixed_type, zero_copy);
SubscriberModule subscriber(publishers, samples, fixed_type, zero_copy, false, false);
PublisherModule publisher(exit_on_lost_liveliness, fixed_type, zero_copy);

uint32_t result = 1;
Expand All @@ -207,7 +207,7 @@ int main(

if (subscriber.init(seed, magic))
{
result = subscriber.run(notexit, timeout) ? 0 : -1;
result = subscriber.run(notexit, 0, timeout) ? 0 : -1;
}

publisher_thread.join();
Expand Down
28 changes: 26 additions & 2 deletions test/dds/communication/PublisherMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ using namespace eprosima::fastdds::dds;
* --seed <int>
* --wait <int>
* --samples <int>
* --loops <int>
* --interval <int>
* --magic <str>
* --xmlfile <path>
* --interval <int>
* --rescan <int>
*/

int main(
Expand All @@ -46,7 +48,9 @@ int main(
uint32_t wait = 0;
char* xml_file = nullptr;
uint32_t samples = 4;
uint32_t loops = 0;
uint32_t interval = 250;
uint32_t rescan_interval_seconds = 0;
std::string magic;

while (arg_count < argc)
Expand Down Expand Up @@ -93,6 +97,16 @@ int main(

samples = strtol(argv[arg_count], nullptr, 10);
}
else if (strcmp(argv[arg_count], "--loops") == 0)
{
if (++arg_count >= argc)
{
std::cout << "--loops expects a parameter" << std::endl;
return -1;
}

loops = strtol(argv[arg_count], nullptr, 10);
}
else if (strcmp(argv[arg_count], "--interval") == 0)
{
if (++arg_count >= argc)
Expand Down Expand Up @@ -123,6 +137,16 @@ int main(

xml_file = argv[arg_count];
}
else if (strcmp(argv[arg_count], "--rescan") == 0)
{
if (++arg_count >= argc)
{
std::cout << "--rescan expects a parameter" << std::endl;
return -1;
}

rescan_interval_seconds = strtol(argv[arg_count], nullptr, 10);
}
else
{
std::cout << "Wrong argument " << argv[arg_count] << std::endl;
Expand All @@ -146,7 +170,7 @@ int main(
publisher.wait_discovery(wait);
}

publisher.run(samples, 0, interval);
publisher.run(samples, rescan_interval_seconds, loops, interval);
return 0;
}

Expand Down
20 changes: 20 additions & 0 deletions test/dds/communication/PublisherModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,30 @@ void PublisherModule::wait_discovery(

void PublisherModule::run(
uint32_t samples,
const uint32_t rescan_interval,
const uint32_t loops,
uint32_t interval)
{
uint32_t current_loop = 0;
uint16_t index = 1;
void* sample = nullptr;

std::thread net_rescan_thread([this, rescan_interval]()
{
if (rescan_interval > 0)
{
auto interval = std::chrono::seconds(rescan_interval);
while (run_)
{
std::this_thread::sleep_for(interval);
if (run_)
{
participant_->set_qos(participant_->get_qos());
}
}
}
});

while (run_ && (loops == 0 || loops > current_loop))
{
if (zero_copy_)
Expand Down Expand Up @@ -187,6 +204,9 @@ void PublisherModule::run(

std::this_thread::sleep_for(std::chrono::milliseconds(interval));
}

run_ = false;
net_rescan_thread.join();
}

void PublisherModule::on_publication_matched(
Expand Down
Loading
Loading