Skip to content

Commit

Permalink
[fix] Fix consumer doesn't acknowledge all chunk message Ids (#321)
Browse files Browse the repository at this point in the history
  • Loading branch information
RobertIndie authored Oct 7, 2023
1 parent f2c580b commit eea59bb
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 62 deletions.
33 changes: 29 additions & 4 deletions lib/AckGroupingTracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@

#include <atomic>
#include <limits>
#include <set>

#include "BitSet.h"
#include "ChunkMessageIdImpl.h"
#include "ClientConnection.h"
#include "Commands.h"
#include "LogUtils.h"
Expand All @@ -42,6 +44,17 @@ void AckGroupingTracker::doImmediateAck(const MessageId& msgId, ResultCallback c
}
return;
}
if (ackType == CommandAck_AckType_Individual) {
// For an individual ack, we need to acknowledge every message ID in a chunked message ID.
// For a cumulative ack, we only need to ack the last message ID of the chunked message.
// ChunkMessageIdImpl returns the last chunk's message ID by default, so no extra handling is needed.
if (auto chunkMessageId =
std::dynamic_pointer_cast<ChunkMessageIdImpl>(Commands::getMessageIdImpl(msgId))) {
auto msgIdList = chunkMessageId->getChunkedMessageIds();
doImmediateAck(std::set<MessageId>(msgIdList.begin(), msgIdList.end()), callback);
return;
}
}
const auto& ackSet = Commands::getMessageIdImpl(msgId)->getBitSet();
if (waitResponse_) {
const auto requestId = requestIdSupplier_();
Expand Down Expand Up @@ -84,29 +97,41 @@ void AckGroupingTracker::doImmediateAck(const std::set<MessageId>& msgIds, Resul
return;
}

std::set<MessageId> ackMsgIds;

for (const auto& msgId : msgIds) {
if (auto chunkMessageId =
std::dynamic_pointer_cast<ChunkMessageIdImpl>(Commands::getMessageIdImpl(msgId))) {
auto msgIdList = chunkMessageId->getChunkedMessageIds();
ackMsgIds.insert(msgIdList.begin(), msgIdList.end());
} else {
ackMsgIds.insert(msgId);
}
}

if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
if (waitResponse_) {
const auto requestId = requestIdSupplier_();
cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, msgIds, requestId), requestId)
cnx->sendRequestWithId(Commands::newMultiMessageAck(consumerId_, ackMsgIds, requestId), requestId)
.addListener([callback](Result result, const ResponseData&) {
if (callback) {
callback(result);
}
});
} else {
cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, msgIds));
cnx->sendCommand(Commands::newMultiMessageAck(consumerId_, ackMsgIds));
if (callback) {
callback(ResultOk);
}
}
} else {
auto count = std::make_shared<std::atomic<size_t>>(msgIds.size());
auto count = std::make_shared<std::atomic<size_t>>(ackMsgIds.size());
auto wrappedCallback = [callback, count](Result result) {
if (--*count == 0 && callback) {
callback(result);
}
};
for (auto&& msgId : msgIds) {
for (auto&& msgId : ackMsgIds) {
doImmediateAck(msgId, wrappedCallback, CommandAck_AckType_Individual);
}
}
Expand Down
18 changes: 8 additions & 10 deletions lib/ChunkMessageIdImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,19 @@ class ChunkMessageIdImpl;
typedef std::shared_ptr<ChunkMessageIdImpl> ChunkMessageIdImplPtr;
class ChunkMessageIdImpl : public MessageIdImpl, public std::enable_shared_from_this<ChunkMessageIdImpl> {
public:
ChunkMessageIdImpl() : firstChunkMsgId_(std::make_shared<MessageIdImpl>()) {}

void setFirstChunkMessageId(const MessageId& msgId) { *firstChunkMsgId_ = *msgId.impl_; }

void setLastChunkMessageId(const MessageId& msgId) {
this->ledgerId_ = msgId.ledgerId();
this->entryId_ = msgId.entryId();
this->partition_ = msgId.partition();
explicit ChunkMessageIdImpl(std::vector<MessageId>&& chunkedMessageIds)
: chunkedMessageIds_(std::move(chunkedMessageIds)) {
auto lastChunkMsgId = chunkedMessageIds_.back();
this->ledgerId_ = lastChunkMsgId.ledgerId();
this->entryId_ = lastChunkMsgId.entryId();
this->partition_ = lastChunkMsgId.partition();
}

std::shared_ptr<const MessageIdImpl> getFirstChunkMessageId() const { return firstChunkMsgId_; }
const std::vector<MessageId>& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; }

MessageId build() { return MessageId{std::dynamic_pointer_cast<MessageIdImpl>(shared_from_this())}; }

private:
std::shared_ptr<MessageIdImpl> firstChunkMsgId_;
std::vector<MessageId> chunkedMessageIds_;
};
} // namespace pulsar
6 changes: 3 additions & 3 deletions lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -583,9 +583,9 @@ SharedBuffer Commands::newSeek(uint64_t consumerId, uint64_t requestId, const Me

auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
if (chunkMsgId) {
auto firstId = chunkMsgId->getFirstChunkMessageId();
messageIdData.set_ledgerid(firstId->ledgerId_);
messageIdData.set_entryid(firstId->entryId_);
const auto& firstId = chunkMsgId->getChunkedMessageIds().front();
messageIdData.set_ledgerid(firstId.ledgerId());
messageIdData.set_entryid(firstId.entryId());
} else {
messageIdData.set_ledgerid(messageId.ledgerId());
messageIdData.set_entryid(messageId.entryId());
Expand Down
8 changes: 4 additions & 4 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -479,10 +479,7 @@ boost::optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuff
return boost::none;
}

ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
chunkMsgId->setFirstChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().front());
chunkMsgId->setLastChunkMessageId(chunkedMsgCtx.getChunkedMessageIds().back());
messageId = chunkMsgId->build();
messageId = std::make_shared<ChunkMessageIdImpl>(chunkedMsgCtx.moveChunkedMessageIds())->build();

LOG_DEBUG("Chunked message completed chunkId: " << chunkId << ", ChunkedMessageCtx: " << chunkedMsgCtx
<< ", sequenceId: " << metadata.sequence_id());
Expand Down Expand Up @@ -1174,6 +1171,9 @@ std::pair<MessageId, bool> ConsumerImpl::prepareIndividualAck(const MessageId& m
(batchSize > 0) ? batchSize : 1);
unAckedMessageTrackerPtr_->remove(messageId);
possibleSendToDeadLetterTopicMessages_.remove(messageId);
if (std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageIdImpl)) {
return std::make_pair(messageId, true);
}
return std::make_pair(discardBatch(messageId), true);
} else if (config_.isBatchIndexAckEnabled()) {
return std::make_pair(messageId, true);
Expand Down
4 changes: 2 additions & 2 deletions lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ class ConsumerImpl : public ConsumerImplBase {

const std::vector<MessageId>& getChunkedMessageIds() const noexcept { return chunkedMessageIds_; }

std::vector<MessageId> moveChunkedMessageIds() noexcept { return std::move(chunkedMessageIds_); }

long getReceivedTimeMs() const noexcept { return receivedTimeMs_; }

friend std::ostream& operator<<(std::ostream& os, const ChunkedMessageCtx& ctx) {
Expand All @@ -292,8 +294,6 @@ class ConsumerImpl : public ConsumerImplBase {
// concurrently on the topic) then it guards against broken chunked message which was not fully published
const bool autoAckOldestChunkedMessageOnQueueFull_;

// The key is a UUID; the value is the associated ChunkedMessageCtx of the chunked message.
std::unordered_map<std::string, ChunkedMessageCtx> chunkedMessagesMap_;
// This list contains all the keys of `chunkedMessagesMap_`; each key is a UUID that identifies a pending
// chunked message. Once the number of pending chunked messages exceeds the limit, the oldest UUIDs and
// the associated ChunkedMessageCtx will be removed.
Expand Down
19 changes: 9 additions & 10 deletions lib/MessageId.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ void MessageId::serialize(std::string& result) const {
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(impl_);
if (chunkMsgId) {
proto::MessageIdData& firstChunkIdData = *idData.mutable_first_chunk_message_id();
auto firstChunkId = chunkMsgId->getFirstChunkMessageId();
firstChunkIdData.set_ledgerid(firstChunkId->ledgerId_);
firstChunkIdData.set_entryid(firstChunkId->entryId_);
const auto& firstChunkId = chunkMsgId->getChunkedMessageIds().front();
firstChunkIdData.set_ledgerid(firstChunkId.ledgerId());
firstChunkIdData.set_entryid(firstChunkId.entryId());
if (chunkMsgId->partition_ != -1) {
firstChunkIdData.set_partition(firstChunkId->partition_);
firstChunkIdData.set_partition(firstChunkId.partition());
}
}

Expand All @@ -99,9 +99,8 @@ MessageId MessageId::deserialize(const std::string& serializedMessageId) {
MessageId msgId = MessageIdBuilder::from(idData).build();

if (idData.has_first_chunk_message_id()) {
ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
chunkMsgId->setFirstChunkMessageId(MessageIdBuilder::from(idData.first_chunk_message_id()).build());
chunkMsgId->setLastChunkMessageId(msgId);
ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>(
std::vector<MessageId>({MessageIdBuilder::from(idData.first_chunk_message_id()).build(), msgId}));
return chunkMsgId->build();
}

Expand All @@ -121,9 +120,9 @@ int32_t MessageId::batchSize() const { return impl_->batchSize_; }
PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) {
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(messageId.impl_);
if (chunkMsgId) {
auto firstId = chunkMsgId->getFirstChunkMessageId();
s << '(' << firstId->ledgerId_ << ',' << firstId->entryId_ << ',' << firstId->partition_ << ','
<< firstId->batchIndex_ << ");";
const auto& firstId = chunkMsgId->getChunkedMessageIds().front();
s << '(' << firstId.ledgerId() << ',' << firstId.entryId() << ',' << firstId.partition() << ','
<< firstId.batchIndex() << ");";
}
s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ << ','
<< messageId.impl_->partition_ << ',' << messageId.impl_->batchIndex_ << ')';
Expand Down
8 changes: 5 additions & 3 deletions lib/OpSendMsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ struct SendArguments {
SendArguments& operator=(const SendArguments&) = delete;
};

typedef std::shared_ptr<std::vector<MessageId>> ChunkMessageIdListPtr;

struct OpSendMsg {
const Result result;
const int32_t chunkId;
Expand All @@ -54,7 +56,7 @@ struct OpSendMsg {
const boost::posix_time::ptime timeout;
const SendCallback sendCallback;
std::vector<std::function<void(Result)>> trackerCallbacks;
ChunkMessageIdImplPtr chunkedMessageId;
ChunkMessageIdListPtr chunkMessageIdList;
// Use shared_ptr here because the producer might resend the message with the same arguments
const std::shared_ptr<SendArguments> sendArgs;

Expand Down Expand Up @@ -89,7 +91,7 @@ struct OpSendMsg {
sendArgs(nullptr) {}

OpSendMsg(const proto::MessageMetadata& metadata, uint32_t messagesCount, uint64_t messagesSize,
int sendTimeoutMs, SendCallback&& callback, ChunkMessageIdImplPtr chunkedMessageId,
int sendTimeoutMs, SendCallback&& callback, ChunkMessageIdListPtr chunkMessageIdList,
uint64_t producerId, SharedBuffer payload)
: result(ResultOk),
chunkId(metadata.chunk_id()),
Expand All @@ -98,7 +100,7 @@ struct OpSendMsg {
messagesSize(messagesSize),
timeout(TimeUtils::now() + boost::posix_time::milliseconds(sendTimeoutMs)),
sendCallback(std::move(callback)),
chunkedMessageId(chunkedMessageId),
chunkMessageIdList(std::move(chunkMessageIdList)),
sendArgs(new SendArguments(producerId, metadata.sequence_id(), metadata, payload)) {}
};

Expand Down
19 changes: 9 additions & 10 deletions lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -572,14 +572,14 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c
}
} else {
const bool sendChunks = (totalChunks > 1);
ChunkMessageIdListPtr chunkMessageIdList;
if (sendChunks) {
msgMetadata.set_uuid(producerName_ + "-" + std::to_string(sequenceId));
msgMetadata.set_num_chunks_from_msg(totalChunks);
msgMetadata.set_total_chunk_msg_size(compressedSize);
chunkMessageIdList = std::make_shared<std::vector<MessageId>>();
}

auto chunkMessageId = totalChunks > 1 ? std::make_shared<ChunkMessageIdImpl>() : nullptr;

int beginIndex = 0;
for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
if (sendChunks) {
Expand All @@ -596,7 +596,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c
}

auto op = OpSendMsg::create(msgMetadata, 1, uncompressedSize, conf_.getSendTimeout(),
(chunkId == totalChunks - 1) ? callback : nullptr, chunkMessageId,
(chunkId == totalChunks - 1) ? callback : nullptr, chunkMessageIdList,
producerId_, encryptedPayload);

if (!chunkingEnabled_) {
Expand Down Expand Up @@ -887,7 +887,7 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
return true;
}

const auto& op = *pendingMessagesQueue_.front();
auto& op = *pendingMessagesQueue_.front();
if (op.result != ResultOk) {
LOG_ERROR("Unexpected OpSendMsg whose result is " << op.result << " for " << sequenceId << " and "
<< rawMessageId);
Expand All @@ -911,13 +911,12 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
// Message was persisted correctly
LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);

if (op.chunkedMessageId) {
if (op.chunkMessageIdList) {
// Handling the chunk message id.
if (op.chunkId == 0) {
op.chunkedMessageId->setFirstChunkMessageId(messageId);
} else if (op.chunkId == op.numChunks - 1) {
op.chunkedMessageId->setLastChunkMessageId(messageId);
messageId = op.chunkedMessageId->build();
op.chunkMessageIdList->push_back(messageId);
if (op.chunkId == op.numChunks - 1) {
auto chunkedMessageId = std::make_shared<ChunkMessageIdImpl>(std::move(*op.chunkMessageIdList));
messageId = chunkedMessageId->build();
}
}

Expand Down
47 changes: 31 additions & 16 deletions tests/MessageChunkingTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ class MessageChunkingTest : public ::testing::TestWithParam<CompressionType> {
}

void createConsumer(const std::string& topic, Consumer& consumer) {
ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", consumer));
ConsumerConfiguration conf;
conf.setBrokerConsumerStatsCacheTimeInMs(1000);
ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", conf, consumer));
}

void createConsumer(const std::string& topic, Consumer& consumer, ConsumerConfiguration& conf) {
Expand Down Expand Up @@ -118,9 +120,6 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
for (int i = 0; i < numMessages; i++) {
MessageId messageId;
ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeMessage).build(), messageId));
auto chunkMsgId =
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
ASSERT_TRUE(chunkMsgId);
LOG_INFO("Send " << i << " to " << messageId);
sendMessageIds.emplace_back(messageId);
}
Expand All @@ -134,19 +133,35 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
ASSERT_EQ(msg.getMessageId().batchIndex(), -1);
ASSERT_EQ(msg.getMessageId().batchSize(), 0);
auto messageId = msg.getMessageId();
auto chunkMsgId =
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId));
ASSERT_TRUE(chunkMsgId);
receivedMessageIds.emplace_back(messageId);
consumer.acknowledge(messageId);
}
ASSERT_EQ(receivedMessageIds, sendMessageIds);
ASSERT_EQ(receivedMessageIds.front().ledgerId(), receivedMessageIds.front().ledgerId());
for (int i = 0; i < sendMessageIds.size(); ++i) {
auto sendChunkMsgId =
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(sendMessageIds[i]));
ASSERT_TRUE(sendChunkMsgId);
auto receiveChunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(
PulsarFriend::getMessageIdImpl(receivedMessageIds[i]));
ASSERT_TRUE(receiveChunkMsgId);
ASSERT_EQ(sendChunkMsgId->getChunkedMessageIds(), receiveChunkMsgId->getChunkedMessageIds());
}
ASSERT_GT(receivedMessageIds.back().entryId(), numMessages);

// Verify the cache has been cleared
auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer);
ASSERT_EQ(chunkedMessageCache.size(), 0);

BrokerConsumerStats consumerStats;
waitUntil(
std::chrono::seconds(10),
[&] {
return consumer.getBrokerConsumerStats(consumerStats) == ResultOk &&
consumerStats.getMsgBacklog() == 0;
},
1000);
ASSERT_EQ(consumerStats.getMsgBacklog(), 0);

producer.close();
consumer.close();
}
Expand Down Expand Up @@ -317,9 +332,9 @@ TEST_P(MessageChunkingTest, testSeekChunkMessages) {
TEST(ChunkMessageIdTest, testSetChunkMessageId) {
MessageId msgId;
{
ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>();
chunkMsgId->setFirstChunkMessageId(MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build());
chunkMsgId->setLastChunkMessageId(MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build());
ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>(
std::vector<MessageId>({MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build(),
MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build()}));
msgId = chunkMsgId->build();
// Test that the destructor of the underlying message ID also works for the generated messageId.
}
Expand All @@ -332,13 +347,13 @@ TEST(ChunkMessageIdTest, testSetChunkMessageId) {
ASSERT_EQ(deserializedMsgId.entryId(), 5);
ASSERT_EQ(deserializedMsgId.partition(), 6);

auto chunkMsgId =
const auto& chunkMsgId =
std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(deserializedMsgId));
ASSERT_TRUE(chunkMsgId);
auto firstChunkMsgId = chunkMsgId->getFirstChunkMessageId();
ASSERT_EQ(firstChunkMsgId->ledgerId_, 1);
ASSERT_EQ(firstChunkMsgId->entryId_, 2);
ASSERT_EQ(firstChunkMsgId->partition_, 3);
auto firstChunkMsgId = chunkMsgId->getChunkedMessageIds().front();
ASSERT_EQ(firstChunkMsgId.ledgerId(), 1);
ASSERT_EQ(firstChunkMsgId.entryId(), 2);
ASSERT_EQ(firstChunkMsgId.partition(), 3);
}

// The CI env is Ubuntu 16.04, whose gtest-dev version is 1.8.0, which doesn't have INSTANTIATE_TEST_SUITE_P
Expand Down

0 comments on commit eea59bb

Please sign in to comment.