Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] Fix wrong behavior when removing the chunkedMessageCtx #110

Merged
merged 2 commits into from
Nov 10, 2022

Conversation

RobertIndie
Copy link
Member

Fixes #104

Motivation

Currently, the consumer ack the last message when the chunked messages exceed maxPendingChunkMessages. This is wrong behavior. This may lead to unexpected data loss.

This PR also fixes serval issues related to maxPendingChunkedMessages:

if (it == chunkedMessageCache_.end()) {
it = chunkedMessageCache_.putIfAbsent(
uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()});
}
if (maxPendingChunkedMessage_ > 0 && chunkedMessageCache_.size() >= maxPendingChunkedMessage_) {
chunkedMessageCache_.removeOldestValues(
chunkedMessageCache_.size() - maxPendingChunkedMessage_ + 1,
[this, messageId](const std::string& uuid, const ChunkedMessageCtx& ctx) {
if (autoAckOldestChunkedMessageOnQueueFull_) {
doAcknowledgeIndividual(messageId, [uuid, messageId](Result result) {
if (result != ResultOk) {
LOG_WARN("Failed to acknowledge discarded chunk, uuid: "
<< uuid << ", messageId: " << messageId);
}
});
} else {
trackMessage(messageId);
}
});
it = chunkedMessageCache_.putIfAbsent(
uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()});

In the current logic, there are two putIfAbsent operations here, and they are confusing. If a new chunk message is received, it will be added to the chunkedMessageCache. But if the size of the cache reaches the maxPendingChunkedMessages, it will remove at least 1 ctx from the cache due to chunkedMessageCache_.size() - maxPendingChunkedMessage_ + 1. But the message is then put into the cache again. This can lead to unnecessary ctx buffer memory allocations.

Here are some key point of this issue:
image

Modifications

  • Fix consumer acked the wrong message when pending chunked messages exceed maxPendingChunkMessages
  • Fix wrong behavior when remove the ctx from the chunkedMessageCache.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Documentation

  • doc-required
    (Your PR needs to update docs and you will update later)

  • doc-not-needed
    (Please explain why)

  • doc
    (Your PR contains doc changes)

  • doc-complete
    (Docs have been already added)

@RobertIndie RobertIndie added the bug Something isn't working label Nov 9, 2022
@RobertIndie RobertIndie self-assigned this Nov 9, 2022
@RobertIndie RobertIndie marked this pull request as ready for review November 9, 2022 08:29
@BewareMyPower
Copy link
Contributor

In the current logic, there are two putIfAbsent operations here, and they are confusing

The original logic should have been like the following diff based on your fix:

diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 8658e08..f103144 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -392,11 +392,8 @@ Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
     auto it = chunkedMessageCache_.find(uuid);

     if (chunkId == 0) {
-        if (it == chunkedMessageCache_.end()) {
-            it = chunkedMessageCache_.putIfAbsent(
-                uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()});
-        }
-        if (maxPendingChunkedMessage_ > 0 && chunkedMessageCache_.size() > maxPendingChunkedMessage_) {
+        if (it == chunkedMessageCache_.end() && maxPendingChunkedMessage_ > 0 &&
+            chunkedMessageCache_.size() >= maxPendingChunkedMessage_) {
             chunkedMessageCache_.removeOldestValues(
                 chunkedMessageCache_.size() - maxPendingChunkedMessage_,
                 [this](const std::string& uuid, const ChunkedMessageCtx& ctx) {
@@ -404,7 +401,10 @@ Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay
                         discardChunkMessages(uuid, msgId, autoAckOldestChunkedMessageOnQueueFull_);
                     }
                 });
-            it = chunkedMessageCache_.find(uuid);  // Need to reset the iterator after changing the cache.
+        }
+        if (it == chunkedMessageCache_.end()) {
+            it = chunkedMessageCache_.putIfAbsent(
+                uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()});
         }
     }

I intended to check the size first, then remove the oldest values if necessary. After the oldest values are removed, insert the chunk message context.

But I forgot to remove the first putIfAbsent.

@RobertIndie
Copy link
Member Author

The original logic should have been like the following diff based on your fix:

@BewareMyPower There is a small problem with this patch. If there are two duplicate messages with the same uuid and chunk-id, then the message ctx discard is also triggered, resulting in the removal of one more message ctx.

@BewareMyPower
Copy link
Contributor

BewareMyPower commented Nov 10, 2022

@RobertIndie I think not. The patch calls it == chunkedMessageCache_.end() (before maxPendingChunkedMessage_ > 0) to check if the current chunk exists in the cache, just like putIfAbsent. Could you add an example?

@RobertIndie
Copy link
Member Author

@RobertIndie I think not. The patch calls it == chunkedMessageCache_.end()

@BewareMyPower Thanks. I didn't notice that you changed here. This patch gives me some inspiration. I have pushed a commit to refactor this logic to make it more straightforward. PTAL again.

@BewareMyPower BewareMyPower merged commit ad79bec into apache:main Nov 10, 2022
@RobertIndie RobertIndie added this to the 3.1.0 milestone Nov 24, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Bug] Consumer acked the wrong message when pending chunked messages exceed maxPendingChunkMessages
2 participants