Skip to content

Commit

Permalink
iox-eclipse-iceoryx#65 Implement forwarding with peek then take strat…
Browse files Browse the repository at this point in the history
…egy.

Signed-off-by: Ithier Jeff (CC-AD/EYF1) <[email protected]>
  • Loading branch information
orecham committed Jul 7, 2020
1 parent 2aaedfe commit 7282455
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 46 deletions.
3 changes: 2 additions & 1 deletion iceoryx_dds/include/iceoryx_dds/dds/data_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ enum class DataReaderError : uint8_t
{
NOT_CONNECTED,
INVALID_RECV_BUFFER,
INVALID_DATA,
SAMPLE_SIZE_MISMATCH,
RECV_BUFFER_TOO_SMALL
};

constexpr char DataReaderErrorString[][64] = {
"NOT_CONNECTED", "INVALID_RECV_BUFFER", "SAMPLE_SIZE_MISMATCH", "RECV_BUFFER_TOO_SMALL"};
"NOT_CONNECTED", "INVALID_RECV_BUFFER", "INVALID_DATA", "SAMPLE_SIZE_MISMATCH", "RECV_BUFFER_TOO_SMALL"};

class DataReader
{
Expand Down
55 changes: 17 additions & 38 deletions iceoryx_dds/include/iceoryx_dds/internal/gateway/dds_to_iox.inl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include "iceoryx_posh/capro/service_description.hpp"
#include "iceoryx_utils/cxx/string.hpp"

#include "iceoryx_dds/gateway/dds_to_iox.hpp"

namespace iox
{
namespace dds
Expand Down Expand Up @@ -51,53 +53,30 @@ inline void DDS2IceoryxGateway<channel_t, gateway_t>::discover(const iox::capro:
template <typename channel_t, typename gateway_t>
inline void DDS2IceoryxGateway<channel_t, gateway_t>::forward(const channel_t& channel) noexcept
{
// Don't forward across channels that don't provide their size.
// Data size is required to determine the required memchunk size.
auto dataSize = channel.getDataSize();
if (!dataSize.has_value())
{
// nothing to do
LogWarn() << "[DDS2IceoryxGateway] Attempted to forward over a channel with an unknown data size.";
return;
}

auto publisher = channel.getIceoryxTerminal();
auto reader = channel.getDDSTerminal();

// reserve a chunk for initial sample
if (m_reservedChunk == nullptr)
auto peekResult = reader->peekNext();
if(peekResult.has_value())
{
m_reservedChunk = publisher->allocateChunk(static_cast<uint32_t>(dataSize.value()));
}
// reserve a chunk for the sample
auto size = peekResult.value();

// read exactly one sample into the reserved chunk
auto buffer = static_cast<uint8_t*>(m_reservedChunk);
auto const numToRead = 1u;
auto result = reader->take(buffer, dataSize.value(), dataSize.value(), numToRead);
if (!result.has_error())
{
auto num = result.get_value();
if (num == 0)
{
// nothing to do.
}
else if (num == 1)
{
// publish the sample
publisher->sendChunk(buffer);
// reserve a new chunk for the next sample
m_reservedChunk = publisher->allocateChunk(static_cast<uint32_t>(dataSize.value()));
}
else
m_reservedChunk = publisher->allocateChunk(static_cast<uint32_t>(size));

// read sample into reserved chunk
auto buffer = static_cast<uint8_t*>(m_reservedChunk);
auto takeResult = reader->takeNext(buffer, size);
if(takeResult.has_error())
{
// sample is corrupt, don't publish.
LogWarn() << "[DDS2IceoryxGateway] Received corrupt sample. Buffer is larger than expected. Skipping.";
LogWarn() << "[DDS2IceoryxGateway] Encountered error reading from DDS network: " << iox::dds::DataReaderErrorString[static_cast<uint8_t>(takeResult.get_error())];
}

// publish the sample
publisher->sendChunk(buffer);
}
else
{
LogWarn() << "[DDS2IceoryxGateway] Encountered error reading from DDS network: " << iox::dds::DataReaderErrorString[static_cast<uint8_t>(result.get_error())];
}

}

// ======================================== Private ======================================== //
Expand Down
21 changes: 14 additions & 7 deletions iceoryx_dds/source/iceoryx_dds/dds/cyclone_data_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,22 @@ iox::cxx::optional<uint64_t> iox::dds::CycloneDataReader::peekNext()
.max_samples(1u)
.state(::dds::sub::status::SampleState::any())
.read();

if(readSamples.length() > 0)
{
auto nextSample = readSamples.begin();
auto nextSampleSize = nextSample->data().payload().size();
return iox::cxx::optional<uint64_t>(static_cast<uint64_t>(nextSampleSize));
}
else
{
return iox::cxx::nullopt_t();

// Ignore samples with no payload
if (nextSampleSize != 0)
{
return iox::cxx::optional<uint64_t>(static_cast<uint64_t>(nextSampleSize));
}
}

// No valid samples available
return iox::cxx::nullopt_t();

}

iox::cxx::expected<iox::dds::DataReaderError>
Expand Down Expand Up @@ -134,10 +139,12 @@ iox::cxx::expected<uint64_t, iox::dds::DataReaderError> iox::dds::CycloneDataRea
// Sample validation checks
uint64_t size = samples.begin()->data().payload().size();

if (size == 0)
{
return iox::cxx::error<iox::dds::DataReaderError>(iox::dds::DataReaderError::INVALID_DATA);
}
if (size != sampleSize)
{
// Received invalid data.
// NOTE: This causes other data points received in this read to be lost...
return iox::cxx::error<iox::dds::DataReaderError>(iox::dds::DataReaderError::SAMPLE_SIZE_MISMATCH);
}

Expand Down
2 changes: 2 additions & 0 deletions iceoryx_dds/test/mocks/google_mocks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class MockDataReader
public:
MockDataReader(const iox::capro::ServiceDescription& sd){};
MOCK_METHOD0(connect, void(void));
MOCK_METHOD0(peekNext, iox::cxx::optional<uint64_t>(void));
MOCK_METHOD2(takeNext, iox::cxx::expected<iox::dds::DataReaderError>(uint8_t* const, const uint64_t&));
MOCK_METHOD3(take,
iox::cxx::expected<uint64_t, iox::dds::DataReaderError>(uint8_t* const buffer,
const uint64_t&,
Expand Down

0 comments on commit 7282455

Please sign in to comment.