Skip to content

Commit

Permalink
Refs #2428. Improve onParticipantDiscovery
Browse files Browse the repository at this point in the history
  • Loading branch information
richiware committed Dec 19, 2017
1 parent 672bcf0 commit adb0014
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 44 deletions.
92 changes: 48 additions & 44 deletions src/cpp/rtps/builtin/discovery/participant/PDPSimpleListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,60 +133,64 @@ void PDPSimpleListener::onNewCacheChangeAdded(RTPSReader* reader, const CacheCha
{
GUID_t guid;
iHandle2GUID(guid,change->instanceHandle);
this->mp_SPDP->removeRemoteParticipant(guid);
RTPSParticipantDiscoveryInfo info;
info.m_status = REMOVED_RTPSPARTICIPANT;
info.m_guid = guid;
if(this->mp_SPDP->getRTPSParticipant()->getListener()!=nullptr)
this->mp_SPDP->getRTPSParticipant()->getListener()->onRTPSParticipantDiscovery(
this->mp_SPDP->getRTPSParticipant()->getUserRTPSParticipant(),
info);
if(this->mp_SPDP->removeRemoteParticipant(guid))
{
if(this->mp_SPDP->getRTPSParticipant()->getListener()!=nullptr)
{
RTPSParticipantDiscoveryInfo info;
info.m_status = REMOVED_RTPSPARTICIPANT;
info.m_guid = guid;
this->mp_SPDP->getRTPSParticipant()->getListener()->onRTPSParticipantDiscovery(
this->mp_SPDP->getRTPSParticipant()->getUserRTPSParticipant(),
info);
}
}
}

//Remove change form history.
this->mp_SPDP->mp_SPDPReaderHistory->remove_change(change);

return;
return;
}

bool PDPSimpleListener::getKey(CacheChange_t* change)
{
SerializedPayload_t* pl = &change->serializedPayload;
CDRMessage::initCDRMsg(&aux_msg);
SerializedPayload_t* pl = &change->serializedPayload;
CDRMessage::initCDRMsg(&aux_msg);
// TODO CHange because it create a buffer to remove after.
free(aux_msg.buffer);
aux_msg.buffer = pl->data;
aux_msg.length = pl->length;
aux_msg.max_size = pl->max_size;
aux_msg.msg_endian = pl->encapsulation == PL_CDR_BE ? BIGEND : LITTLEEND;
bool valid = false;
uint16_t pid;
uint16_t plength;
while(aux_msg.pos < aux_msg.length)
{
valid = true;
valid&=CDRMessage::readUInt16(&aux_msg,(uint16_t*)&pid);
valid&=CDRMessage::readUInt16(&aux_msg,&plength);
if(pid == PID_SENTINEL)
{
break;
}
if(pid == PID_PARTICIPANT_GUID)
{
valid &= CDRMessage::readData(&aux_msg,change->instanceHandle.value,16);
aux_msg.buffer = nullptr;
return true;
}
if(pid == PID_KEY_HASH)
{
valid &= CDRMessage::readData(&aux_msg,change->instanceHandle.value,16);
aux_msg.buffer = nullptr;
return true;
}
aux_msg.pos+=plength;
}
aux_msg.buffer = nullptr;
return false;
aux_msg.buffer = pl->data;
aux_msg.length = pl->length;
aux_msg.max_size = pl->max_size;
aux_msg.msg_endian = pl->encapsulation == PL_CDR_BE ? BIGEND : LITTLEEND;
bool valid = false;
uint16_t pid;
uint16_t plength;
while(aux_msg.pos < aux_msg.length)
{
valid = true;
valid&=CDRMessage::readUInt16(&aux_msg,(uint16_t*)&pid);
valid&=CDRMessage::readUInt16(&aux_msg,&plength);
if(pid == PID_SENTINEL)
{
break;
}
if(pid == PID_PARTICIPANT_GUID)
{
valid &= CDRMessage::readData(&aux_msg,change->instanceHandle.value,16);
aux_msg.buffer = nullptr;
return true;
}
if(pid == PID_KEY_HASH)
{
valid &= CDRMessage::readData(&aux_msg,change->instanceHandle.value,16);
aux_msg.buffer = nullptr;
return true;
}
aux_msg.pos+=plength;
}
aux_msg.buffer = nullptr;
return false;
}


Expand Down
40 changes: 40 additions & 0 deletions test/blackbox/BlackboxTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1740,6 +1740,46 @@ BLACKBOXTEST(BlackBox, PubSubAsReliableHelloworldUserData)
ASSERT_TRUE(reader.getDiscoveryResult());
}

BLACKBOXTEST(BlackBox, PubSubAsReliableHelloworldParticipantDiscovery)
{
PubSubReader<HelloWorldType> reader(TEST_TOPIC_NAME);
PubSubWriter<HelloWorldType> writer(TEST_TOPIC_NAME);

GUID_t participant_guid;
reader.setOnDiscoveryFunction([&participant_guid](const ParticipantDiscoveryInfo& info) -> bool{
if(info.rtps.m_status == DISCOVERED_RTPSPARTICIPANT)
{
std::cout << "Discovered participant " << info.rtps.m_guid << std::endl;
participant_guid = info.rtps.m_guid;
}
else if(info.rtps.m_status == REMOVED_RTPSPARTICIPANT)
{
std::cout << "Removed participant " << info.rtps.m_guid << std::endl;
return participant_guid == info.rtps.m_guid;
}

return false;
});

reader.history_depth(100).
reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS).init();

ASSERT_TRUE(reader.isInitialized());

writer.history_depth(100).init();

ASSERT_TRUE(writer.isInitialized());

reader.waitDiscovery();
writer.waitDiscovery();

writer.destroy();

reader.wait_undiscovery();

ASSERT_TRUE(reader.getDiscoveryResult());
}

BLACKBOXTEST(BlackBox, EDPSlaveReaderAttachment)
{
PubSubWriter<HelloWorldType> checker(TEST_TOPIC_NAME);
Expand Down
13 changes: 13 additions & 0 deletions test/blackbox/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,19 @@ class PubSubReader
std::cout << "Reader discovery finished..." << std::endl;
}

void wait_undiscovery()
{
std::unique_lock<std::mutex> lock(mutexDiscovery_);

std::cout << "Reader is waiting undiscovery..." << std::endl;

if(matched_ != 0)
cvDiscovery_.wait(lock);

ASSERT_EQ(matched_, 0u);
std::cout << "Reader undiscovery finished..." << std::endl;
}

void waitRemoval()
{
std::unique_lock<std::mutex> lock(mutexDiscovery_);
Expand Down
13 changes: 13 additions & 0 deletions test/blackbox/PubSubWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,19 @@ class PubSubWriter
std::cout << "Writer discovery finished..." << std::endl;
}

void wait_undiscovery()
{
std::unique_lock<std::mutex> lock(mutexDiscovery_);

std::cout << "Writer is waiting undiscovery..." << std::endl;

if(matched_ != 0)
cv_.wait(lock);

ASSERT_EQ(matched_, 0u);
std::cout << "Writer undiscovery finished..." << std::endl;
}

void waitRemoval()
{
std::unique_lock<std::mutex> lock(mutexDiscovery_);
Expand Down

0 comments on commit adb0014

Please sign in to comment.