Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] Fix wrong behavior when removing the chunkedMessageCtx #110

Merged
merged 2 commits into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 20 additions & 23 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,19 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) {
}
}

void ConsumerImpl::discardChunkMessages(std::string uuid, MessageId messageId, bool autoAck) {
if (autoAck) {
doAcknowledgeIndividual(messageId, [uuid, messageId](Result result) {
if (result != ResultOk) {
LOG_WARN("Failed to acknowledge discarded chunk, uuid: " << uuid
<< ", messageId: " << messageId);
}
});
} else {
trackMessage(messageId);
}
}

void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
checkExpiredChunkedTimer_->expires_from_now(
boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
Expand All @@ -347,12 +360,7 @@ void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
}
for (const MessageId& msgId : ctx.getChunkedMessageIds()) {
LOG_INFO("Removing expired chunk messages: uuid: " << uuid << ", messageId: " << msgId);
doAcknowledgeIndividual(msgId, [uuid, msgId](Result result) {
if (result != ResultOk) {
LOG_WARN("Failed to acknowledge discarded chunk, uuid: "
<< uuid << ", messageId: " << msgId);
}
});
discardChunkMessages(uuid, msgId, true);
}
return true;
});
Expand Down Expand Up @@ -383,29 +391,18 @@ Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& pay

auto it = chunkedMessageCache_.find(uuid);

if (chunkId == 0) {
if (it == chunkedMessageCache_.end()) {
it = chunkedMessageCache_.putIfAbsent(
uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()});
}
if (chunkId == 0 && it == chunkedMessageCache_.end()) {
if (maxPendingChunkedMessage_ > 0 && chunkedMessageCache_.size() >= maxPendingChunkedMessage_) {
chunkedMessageCache_.removeOldestValues(
chunkedMessageCache_.size() - maxPendingChunkedMessage_ + 1,
[this, messageId](const std::string& uuid, const ChunkedMessageCtx& ctx) {
if (autoAckOldestChunkedMessageOnQueueFull_) {
doAcknowledgeIndividual(messageId, [uuid, messageId](Result result) {
if (result != ResultOk) {
LOG_WARN("Failed to acknowledge discarded chunk, uuid: "
<< uuid << ", messageId: " << messageId);
}
});
} else {
trackMessage(messageId);
[this](const std::string& uuid, const ChunkedMessageCtx& ctx) {
for (const MessageId& msgId : ctx.getChunkedMessageIds()) {
discardChunkMessages(uuid, msgId, autoAckOldestChunkedMessageOnQueueFull_);
}
});
it = chunkedMessageCache_.putIfAbsent(
uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()});
}
it = chunkedMessageCache_.putIfAbsent(
uuid, ChunkedMessageCtx{metadata.num_chunks_from_msg(), metadata.total_chunk_msg_size()});
}

auto& chunkedMsgCtx = it->second;
Expand Down
1 change: 1 addition & 0 deletions lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ class ConsumerImpl : public ConsumerImplBase {
std::atomic_bool expireChunkMessageTaskScheduled_{false};

void triggerCheckExpiredChunkedTimer();
void discardChunkMessages(std::string uuid, MessageId messageId, bool autoAck);

/**
* Process a chunk. If the chunk is the last chunk of a message, concatenate all buffered chunks into the
Expand Down
73 changes: 73 additions & 0 deletions tests/MessageChunkingTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,79 @@ TEST_P(MessageChunkingTest, testExpireIncompleteChunkMessage) {
consumer.close();
}

TEST_P(MessageChunkingTest, testMaxPendingChunkMessages) {
if (toString(GetParam()) != "None") {
return;
}
const std::string topic = "MessageChunkingTest-testMaxPendingChunkMessages-" + toString(GetParam()) +
std::to_string(time(nullptr));
Consumer consumer;
ConsumerConfiguration consumerConf;
consumerConf.setMaxPendingChunkedMessage(1);
consumerConf.setAutoAckOldestChunkedMessageOnQueueFull(true);
createConsumer(topic, consumer, consumerConf);
Producer producer;
createProducer(topic, producer);

auto msg = MessageBuilder().setContent("chunk-0-0|").build();
auto& metadata = PulsarFriend::getMessageMetadata(msg);
metadata.set_num_chunks_from_msg(2);
metadata.set_chunk_id(0);
metadata.set_uuid("0");
metadata.set_total_chunk_msg_size(100);

producer.send(msg);

auto msg2 = MessageBuilder().setContent("chunk-1-0|").build();
auto& metadata2 = PulsarFriend::getMessageMetadata(msg2);
metadata2.set_num_chunks_from_msg(2);
metadata2.set_uuid("1");
metadata2.set_chunk_id(0);
metadata2.set_total_chunk_msg_size(100);

producer.send(msg2);

auto msg3 = MessageBuilder().setContent("chunk-1-1|").build();
auto& metadata3 = PulsarFriend::getMessageMetadata(msg3);
metadata3.set_num_chunks_from_msg(2);
metadata3.set_uuid("1");
metadata3.set_chunk_id(1);
metadata3.set_total_chunk_msg_size(100);

producer.send(msg3);

Message receivedMsg;
ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 3000));
ASSERT_EQ(receivedMsg.getDataAsString(), "chunk-1-0|chunk-1-1|");

consumer.redeliverUnacknowledgedMessages();

// The consumer may acknowledge the wrong message(the latest message) in the old version of codes. This
// test case ensure that it should not happen again.
Message receivedMsg2;
ASSERT_EQ(ResultOk, consumer.receive(receivedMsg2, 3000));
ASSERT_EQ(receivedMsg2.getDataAsString(), "chunk-1-0|chunk-1-1|");

consumer.acknowledge(receivedMsg2);

consumer.redeliverUnacknowledgedMessages();
auto msg4 = MessageBuilder().setContent("chunk-0-1|").build();
auto& metadata4 = PulsarFriend::getMessageMetadata(msg4);
metadata4.set_num_chunks_from_msg(2);
metadata4.set_uuid("0");
metadata4.set_chunk_id(1);
metadata4.set_total_chunk_msg_size(100);

producer.send(msg4);

// This ensures that the message chunk-0-0 was acknowledged successfully. So we cannot receive it anymore.
Message receivedMsg3;
consumer.receive(receivedMsg3, 3000);

producer.close();
consumer.close();
}

// The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P
INSTANTIATE_TEST_CASE_P(Pulsar, MessageChunkingTest,
::testing::Values(CompressionNone, CompressionLZ4, CompressionZLib, CompressionZSTD,
Expand Down