Skip to content

Commit

Permalink
iox-#27 Implement getQueueIndex and deliverToQueue in ChunkDistributor
Browse files Browse the repository at this point in the history
  • Loading branch information
elBoberido committed Jan 6, 2022
1 parent d56ee26 commit dd56107
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,20 @@ class ChunkDistributor
/// @brief Deliver the provided shared chunk to the chunk queue with the provided ID. The chunk will NOT be added
/// to the chunk history
/// @param[in] uniqueQueueId is an unique ID which identifies the queue to which this chunk shall be delivered
/// @param[in] lastKnownQueueIndex is used for a fast lookup of the queue with uniqueQueueId; if the queue is not
/// found at the index, the queue is searched by iteration over all stored queues and the new index is stored in
/// this parameter
/// @param[in] lastKnownQueueIndex is used for a fast lookup of the queue with uniqueQueueId
/// @param[in] chunk is the SharedChunk to be delivered
/// @return ChunkDistributorError if the queue was not found
cxx::expected<ChunkDistributorError>
deliverToQueue(const cxx::UniqueId uniqueQueueId, uint32_t& lastKnownQueueIndex, mepoo::SharedChunk chunk) noexcept;
cxx::expected<ChunkDistributorError> deliverToQueue(const cxx::UniqueId uniqueQueueId,
const uint32_t lastKnownQueueIndex,
mepoo::SharedChunk chunk) noexcept;

/// @brief Lookup for the index of a queue with a specific cxx::UniqueId
/// @param[in] uniqueQueueId is the unique ID of the queue to query the index
/// @param[in] lastKnownQueueIndex is used for a fast lookup of the queue with uniqueQueueId; if the queue is not
/// found at the index, the queue is searched by iteration over all stored queues
/// @return the index of the queue with uniqueQueueId or cxx::nullopt if the queue was not found
cxx::optional<uint32_t> getQueueIndex(const cxx::UniqueId uniqueQueueId,
const uint32_t lastKnownQueueIndex) const noexcept;

/// @brief Update the chunk history but do not deliver the chunk to any chunk queue. E.g. use case is to to update a
/// non offered field in ara
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,16 +209,72 @@ inline bool ChunkDistributor<ChunkDistributorDataType>::deliverToQueue(cxx::not_

template <typename ChunkDistributorDataType>
inline cxx::expected<ChunkDistributorError>
ChunkDistributor<ChunkDistributorDataType>::deliverToQueue(const cxx::UniqueId uniqueQueueId IOX_MAYBE_UNUSED,
uint32_t& lastKnownQueueIndex IOX_MAYBE_UNUSED,
ChunkDistributor<ChunkDistributorDataType>::deliverToQueue(const cxx::UniqueId uniqueQueueId,
const uint32_t lastKnownQueueIndex,
mepoo::SharedChunk chunk IOX_MAYBE_UNUSED) noexcept
{
/// @todo iox-#27
/// - find queue
/// - deliver to queue but respect `SubscriberTooSlowPolicy` like in `deliverToAllStoredQueues`
bool retry{false};
do
{
typename MemberType_t::LockGuard_t lock(*getMembers());

auto queueIndex = getQueueIndex(uniqueQueueId, lastKnownQueueIndex);

if (!queueIndex.has_value())
{
return cxx::error<ChunkDistributorError>(ChunkDistributorError::QUEUE_NOT_IN_CONTAINER);
}

auto& queue = getMembers()->m_queues[queueIndex.value()];

bool willWaitForSubscriber =
getMembers()->m_subscriberTooSlowPolicy == SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER;

bool isBlockingQueue = (willWaitForSubscriber && queue->m_queueFullPolicy == QueueFullPolicy::BLOCK_PUBLISHER);

retry = false;
if (!deliverToQueue(queue.get(), chunk))
{
if (isBlockingQueue)
{
retry = true;
}
else
{
ChunkQueuePusher_t(queue.get()).lostAChunk();
}
}
} while (!retry);

return cxx::success<>();
}

template <typename ChunkDistributorDataType>
inline cxx::optional<uint32_t>
ChunkDistributor<ChunkDistributorDataType>::getQueueIndex(const cxx::UniqueId uniqueQueueId,
const uint32_t lastKnownQueueIndex) const noexcept
{
typename MemberType_t::LockGuard_t lock(*getMembers());

auto& queues = getMembers()->m_queues;

if (queues.size() > lastKnownQueueIndex && queues[lastKnownQueueIndex]->m_uniqueId == uniqueQueueId)
{
return lastKnownQueueIndex;
}

uint32_t index{0};
for (auto& queue : queues)
{
if (queue->m_uniqueId == uniqueQueueId)
{
return index;
}
++index;
}
return cxx::nullopt;
}

template <typename ChunkDistributorDataType>
inline void ChunkDistributor<ChunkDistributorDataType>::addToHistoryWithoutDelivery(mepoo::SharedChunk chunk) noexcept
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class ChunkSender : public ChunkDistributor<typename ChunkSenderDataType::ChunkD

bool sendToQueue(mepoo::ChunkHeader* const chunkHeader,
const cxx::UniqueId uniqueQueuId,
uint32_t& lastKnownQueueIndex) noexcept;
const uint32_t lastKnownQueueIndex) noexcept;

/// @brief Push an allocated chunk to the history without sending it
/// @param[in] chunkHeader, pointer to the ChunkHeader to push to the history
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ inline void ChunkSender<ChunkSenderDataType>::send(mepoo::ChunkHeader* const chu
template <typename ChunkSenderDataType>
inline bool ChunkSender<ChunkSenderDataType>::sendToQueue(mepoo::ChunkHeader* const chunkHeader,
const cxx::UniqueId uniqueQueuId,
uint32_t& lastKnownQueueIndex) noexcept
const uint32_t lastKnownQueueIndex) noexcept
{
mepoo::SharedChunk chunk(nullptr);
// BEGIN of critical section, chunk will be lost if process gets hard terminated in between
Expand Down

0 comments on commit dd56107

Please sign in to comment.