Skip to content

Commit

Permalink
Revert stats related changes
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower committed Dec 1, 2022
1 parent bba4802 commit c12340a
Show file tree
Hide file tree
Showing 6 changed files with 8 additions and 13 deletions.
3 changes: 1 addition & 2 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1036,8 +1036,7 @@ void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCall
const auto& msgIdToAck = pair.first;
const auto& readyToAck = pair.second;
if (readyToAck) {
int numRemoved = unAckedMessageTrackerPtr_->removeMessagesTill(msgIdToAck);
consumerStatsBasePtr_->messageAcknowledged(ResultOk, CommandAck_AckType_Cumulative, numRemoved);
unAckedMessageTrackerPtr_->removeMessagesTill(msgIdToAck);
ackGroupingTrackerPtr_->addAcknowledgeCumulative(msgIdToAck);
}
if (callback) {
Expand Down
2 changes: 1 addition & 1 deletion lib/UnAckedMessageTrackerDisabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class UnAckedMessageTrackerDisabled : public UnAckedMessageTrackerInterface {
bool add(const MessageId& m) { return false; }
bool remove(const MessageId& m) { return false; }
void remove(const MessageIdList& msgIds) {}
int removeMessagesTill(const MessageId& msgId) { return 0; }
void removeMessagesTill(const MessageId& msgId) {}
void removeTopicMessage(const std::string& topic) {}

void clear() {}
Expand Down
5 changes: 1 addition & 4 deletions lib/UnAckedMessageTrackerEnabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,20 +137,17 @@ long UnAckedMessageTrackerEnabled::size() {
return messageIdPartitionMap.size();
}

int UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
std::lock_guard<std::recursive_mutex> acquire(lock_);
int numRemoved = 0;
for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end();) {
MessageId msgIdInMap = it->first;
if (msgIdInMap <= msgId) {
it->second.erase(msgIdInMap);
messageIdPartitionMap.erase(it++);
numRemoved++;
} else {
it++;
}
}
return numRemoved;
}

// this is only for MultiTopicsConsumerImpl, when un-subscribe a single topic, should remove all it's message.
Expand Down
2 changes: 1 addition & 1 deletion lib/UnAckedMessageTrackerEnabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
bool add(const MessageId& msgId);
bool remove(const MessageId& msgId);
void remove(const MessageIdList& msgIds);
int removeMessagesTill(const MessageId& msgId);
void removeMessagesTill(const MessageId& msgId);
void removeTopicMessage(const std::string& topic);
void timeoutHandler();

Expand Down
2 changes: 1 addition & 1 deletion lib/UnAckedMessageTrackerInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class UnAckedMessageTrackerInterface {
virtual bool add(const MessageId& m) = 0;
virtual bool remove(const MessageId& m) = 0;
virtual void remove(const MessageIdList& msgIds) = 0;
virtual int removeMessagesTill(const MessageId& msgId) = 0;
virtual void removeMessagesTill(const MessageId& msgId) = 0;
virtual void clear() = 0;
// this is only for MultiTopicsConsumerImpl, when un-subscribe a single topic, should remove all it's
// message.
Expand Down
7 changes: 3 additions & 4 deletions tests/AcknowledgeTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -200,15 +200,14 @@ TEST_F(AcknowledgeTest, testBatchedMessageId) {
consumers[2].acknowledgeAndRedeliver({batchingMaxMessages + 1}, CommandAck_AckType_Cumulative);
ASSERT_EQ(consumers[2].receive(msg), ResultOk);
EXPECT_EQ(msg.getMessageId(), consumers[2].messageIdList()[batchingMaxMessages]);
// NOTE: Currently the unacked message tracker doesn't count the batch size, so the result is the number
// of entries (not messages)
ASSERT_EQ(consumers[2].getNumAcked(CommandAck_AckType_Cumulative), 1);
// TODO: Currently there is no stats for cumulative ACK
ASSERT_EQ(consumers[2].getNumAcked(CommandAck_AckType_Cumulative), 0);

// the whole 2nd batch is acknowledged
consumers[3].acknowledgeAndRedeliver({batchingMaxMessages + 2}, CommandAck_AckType_Cumulative);
ASSERT_EQ(consumers[3].receive(msg), ResultOk);
EXPECT_EQ(msg.getMessageId(), consumers[3].messageIdList()[batchingMaxMessages * 2]);
ASSERT_EQ(consumers[3].getNumAcked(CommandAck_AckType_Cumulative), 2);
ASSERT_EQ(consumers[3].getNumAcked(CommandAck_AckType_Cumulative), 0);
}

INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, AcknowledgeTest, testing::Values(100, 0));

0 comments on commit c12340a

Please sign in to comment.