diff --git a/iceoryx_dds/include/iceoryx_dds/dds/data_reader.hpp b/iceoryx_dds/include/iceoryx_dds/dds/data_reader.hpp index cdb7f8269e..4d0e9d7130 100644 --- a/iceoryx_dds/include/iceoryx_dds/dds/data_reader.hpp +++ b/iceoryx_dds/include/iceoryx_dds/dds/data_reader.hpp @@ -29,12 +29,13 @@ enum class DataReaderError : uint8_t { NOT_CONNECTED, INVALID_RECV_BUFFER, + INVALID_DATA, SAMPLE_SIZE_MISMATCH, RECV_BUFFER_TOO_SMALL }; constexpr char DataReaderErrorString[][64] = { - "NOT_CONNECTED", "INVALID_RECV_BUFFER", "SAMPLE_SIZE_MISMATCH", "RECV_BUFFER_TOO_SMALL"}; + "NOT_CONNECTED", "INVALID_RECV_BUFFER", "INVALID_DATA", "SAMPLE_SIZE_MISMATCH", "RECV_BUFFER_TOO_SMALL"}; class DataReader { diff --git a/iceoryx_dds/include/iceoryx_dds/internal/gateway/dds_to_iox.inl b/iceoryx_dds/include/iceoryx_dds/internal/gateway/dds_to_iox.inl index 9001c2d73b..dd1c61dc8d 100644 --- a/iceoryx_dds/include/iceoryx_dds/internal/gateway/dds_to_iox.inl +++ b/iceoryx_dds/include/iceoryx_dds/internal/gateway/dds_to_iox.inl @@ -19,6 +19,8 @@ #include "iceoryx_posh/capro/service_description.hpp" #include "iceoryx_utils/cxx/string.hpp" +#include "iceoryx_dds/gateway/dds_to_iox.hpp" + namespace iox { namespace dds @@ -51,53 +53,30 @@ inline void DDS2IceoryxGateway::discover(const iox::capro: template inline void DDS2IceoryxGateway::forward(const channel_t& channel) noexcept { - // Don't forward across channels that don't provide their size. - // Data size is required to determine the required memchunk size. - auto dataSize = channel.getDataSize(); - if (!dataSize.has_value()) - { - // nothing to do - LogWarn() << "[DDS2IceoryxGateway] Attempted to forward over a channel with an unknown data size."; - return; - } auto publisher = channel.getIceoryxTerminal(); auto reader = channel.getDDSTerminal(); - // reserve a chunk for initial sample - if (m_reservedChunk == nullptr) + auto peekResult = reader->peekNext(); + if(peekResult.has_value()) { - m_reservedChunk = publisher->allocateChunk(static_cast(dataSize.value())); - } + // reserve a chunk for the sample + auto size = peekResult.value(); - // read exactly one sample into the reserved chunk - auto buffer = static_cast(m_reservedChunk); - auto const numToRead = 1u; - auto result = reader->take(buffer, dataSize.value(), dataSize.value(), numToRead); - if (!result.has_error()) - { - auto num = result.get_value(); - if (num == 0) - { - // nothing to do. - } - else if (num == 1) - { - // publish the sample - publisher->sendChunk(buffer); - // reserve a new chunk for the next sample - m_reservedChunk = publisher->allocateChunk(static_cast(dataSize.value())); - } - else + m_reservedChunk = publisher->allocateChunk(static_cast(size)); + + // read sample into reserved chunk + auto buffer = static_cast(m_reservedChunk); + auto takeResult = reader->takeNext(buffer, size); + if(takeResult.has_error()) { - // sample is corrupt, don't publish. - LogWarn() << "[DDS2IceoryxGateway] Received corrupt sample. Buffer is larger than expected. Skipping."; + LogWarn() << "[DDS2IceoryxGateway] Encountered error reading from DDS network: " << iox::dds::DataReaderErrorString[static_cast(takeResult.get_error())]; } + + // publish the sample + publisher->sendChunk(buffer); } - else - { - LogWarn() << "[DDS2IceoryxGateway] Encountered error reading from DDS network: " << iox::dds::DataReaderErrorString[static_cast(result.get_error())]; - } + } // ======================================== Private ======================================== // diff --git a/iceoryx_dds/source/iceoryx_dds/dds/cyclone_data_reader.cpp b/iceoryx_dds/source/iceoryx_dds/dds/cyclone_data_reader.cpp index c762bdcccf..5998a9283c 100644 --- a/iceoryx_dds/source/iceoryx_dds/dds/cyclone_data_reader.cpp +++ b/iceoryx_dds/source/iceoryx_dds/dds/cyclone_data_reader.cpp @@ -56,17 +56,22 @@ iox::cxx::optional iox::dds::CycloneDataReader::peekNext() .max_samples(1u) .state(::dds::sub::status::SampleState::any()) .read(); + if(readSamples.length() > 0) { auto nextSample = readSamples.begin(); auto nextSampleSize = nextSample->data().payload().size(); - return iox::cxx::optional(static_cast(nextSampleSize)); - } - else - { - return iox::cxx::nullopt_t(); + // Ignore samples with no payload + if (nextSampleSize != 0) + { + return iox::cxx::optional(static_cast(nextSampleSize)); + } } + + // No valid samples available + return iox::cxx::nullopt_t(); + } iox::cxx::expected @@ -134,10 +139,12 @@ iox::cxx::expected iox::dds::CycloneDataRea // Sample validation checks uint64_t size = samples.begin()->data().payload().size(); + if (size == 0) + { + return iox::cxx::error(iox::dds::DataReaderError::INVALID_DATA); + } if (size != sampleSize) { - // Received invalid data. - // NOTE: This causes other data points received in this read to be lost... return iox::cxx::error(iox::dds::DataReaderError::SAMPLE_SIZE_MISMATCH); } diff --git a/iceoryx_dds/test/mocks/google_mocks.hpp b/iceoryx_dds/test/mocks/google_mocks.hpp index 29092a2fbc..f08a3d8826 100644 --- a/iceoryx_dds/test/mocks/google_mocks.hpp +++ b/iceoryx_dds/test/mocks/google_mocks.hpp @@ -65,6 +65,8 @@ class MockDataReader public: MockDataReader(const iox::capro::ServiceDescription& sd){}; MOCK_METHOD0(connect, void(void)); + MOCK_METHOD0(peekNext, iox::cxx::optional(void)); + MOCK_METHOD2(takeNext, iox::cxx::expected(uint8_t* const, const uint64_t&)); MOCK_METHOD3(take, iox::cxx::expected(uint8_t* const buffer, const uint64_t&,