Skip to content

Commit

Permalink
[fix] Fix acknowledge MessageId list does not work when ackGroupingTi…
Browse files Browse the repository at this point in the history
…meMs is 0 (apache#128)
  • Loading branch information
BewareMyPower authored and RobertIndie committed Dec 1, 2022
1 parent 411843e commit f9cca17
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 125 deletions.
24 changes: 22 additions & 2 deletions lib/AckGroupingTracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@ bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uin
return true;
}

static std::ostream& operator<<(std::ostream& os, const std::set<MessageId>& msgIds) {
bool first = true;
for (auto&& msgId : msgIds) {
if (first) {
first = false;
} else {
os << ", ";
}
os << "[" << msgId << "]";
}
return os;
}

bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId,
const std::set<MessageId>& msgIds) {
auto cnx = connWeakPtr.lock();
Expand All @@ -54,8 +67,15 @@ bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uin
return false;
}

for (const auto& msgId : msgIds) {
sendAck(cnx, consumerId, msgId, CommandAck_AckType_Individual);
if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
auto cmd = Commands::newMultiMessageAck(consumerId, msgIds);
cnx->sendCommand(cmd);
LOG_DEBUG("ACK request is sent for " << msgIds.size() << " messages: " << msgIds);
} else {
// Broker does not support multi-message ACK, use multiple individual ACKs instead.
for (const auto& msgId : msgIds) {
sendAck(cnx, consumerId, msgId, CommandAck_AckType_Individual);
}
}
return true;
}
Expand Down
8 changes: 8 additions & 0 deletions lib/AckGroupingTrackerDisabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ void AckGroupingTrackerDisabled::addAcknowledge(const MessageId& msgId) {
this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, CommandAck_AckType_Individual);
}

void AckGroupingTrackerDisabled::addAcknowledgeList(const MessageIdList& msgIds) {
std::set<MessageId> msgIdSet;
for (auto&& msgId : msgIds) {
msgIdSet.emplace(msgId);
}
this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgIdSet);
}

void AckGroupingTrackerDisabled::addAcknowledgeCumulative(const MessageId& msgId) {
this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, CommandAck_AckType_Cumulative);
}
Expand Down
1 change: 1 addition & 0 deletions lib/AckGroupingTrackerDisabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class AckGroupingTrackerDisabled : public AckGroupingTracker {
AckGroupingTrackerDisabled(HandlerBase& handler, uint64_t consumerId);

void addAcknowledge(const MessageId& msgId) override;
void addAcknowledgeList(const MessageIdList& msgIds) override;
void addAcknowledgeCumulative(const MessageId& msgId) override;

private:
Expand Down
8 changes: 1 addition & 7 deletions lib/AckGroupingTrackerEnabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,7 @@ void AckGroupingTrackerEnabled::flush() {
// Send ACK for individual ACK requests.
std::lock_guard<std::recursive_mutex> lock(this->rmutexPendingIndAcks_);
if (!this->pendingIndividualAcks_.empty()) {
if (Commands::peerSupportsMultiMessageAcknowledgement(cnx->getServerProtocolVersion())) {
auto cmd = Commands::newMultiMessageAck(this->consumerId_, this->pendingIndividualAcks_);
cnx->sendCommand(cmd);
} else {
// Broker does not support multi-message ACK, use multiple individual ACK instead.
this->doImmediateAck(cnx, this->consumerId_, this->pendingIndividualAcks_);
}
this->doImmediateAck(cnx, consumerId_, this->pendingIndividualAcks_);
this->pendingIndividualAcks_.clear();
}
}
Expand Down
154 changes: 154 additions & 0 deletions tests/AcknowledgeTest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>

#include "HttpHelper.h"
#include "PulsarFriend.h"
#include "lib/LogUtils.h"

DECLARE_LOG_OBJECT()

using namespace pulsar;

static std::string lookupUrl = "pulsar://localhost:6650";
static std::string adminUrl = "http://localhost:8080/";

extern std::string unique_str();

class AcknowledgeTest : public testing::TestWithParam<int> {};

TEST_P(AcknowledgeTest, testAckMsgList) {
Client client(lookupUrl);
auto clientImplPtr = PulsarFriend::getClientImplPtr(client);

constexpr auto numMsg = 100;
std::string uniqueChunk = unique_str();
std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk;
const std::string subName = "sub-ack-list";

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));

ConsumerConfiguration consumerConfig;
consumerConfig.setAckGroupingMaxSize(numMsg);
consumerConfig.setAckGroupingTimeMs(GetParam());
consumerConfig.setUnAckedMessagesTimeoutMs(10000);
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer));

// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}

std::vector<MessageId> recvMsgId;
for (auto count = 0; count < numMsg; ++count) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
recvMsgId.emplace_back(msg.getMessageId());
}
ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId));

// try redeliver unack messages.
consumer.redeliverUnacknowledgedMessages();

auto consumerStats = PulsarFriend::getConsumerStatsPtr(consumer);
auto ackMap = consumerStats->getAckedMsgMap();
unsigned long totalAck = ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)];
ASSERT_EQ(totalAck, numMsg);

Message msg;
auto ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();

producer.close();
consumer.close();
client.close();
}

TEST_P(AcknowledgeTest, testAckMsgListWithMultiConsumer) {
Client client(lookupUrl);
auto clientImplPtr = PulsarFriend::getClientImplPtr(client);

std::string uniqueChunk = unique_str();
std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk;

// call admin api to make it partitioned
std::string url =
adminUrl + "admin/v2/persistent/public/default/test-ack-msgs" + uniqueChunk + "/partitions";
int res = makePutRequest(url, "5");
LOG_INFO("res = " << res);
ASSERT_FALSE(res != 204 && res != 409);

constexpr auto numMsg = 100;
const std::string subName = "sub-ack-list";

Producer producer;
ProducerConfiguration producerConfig;
// Turn off batch to ensure even distribution
producerConfig.setBatchingEnabled(false);
producerConfig.setPartitionsRoutingMode(pulsar::ProducerConfiguration::RoundRobinDistribution);
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));

ConsumerConfiguration consumerConfig;
// set ack grouping max size is 10
consumerConfig.setAckGroupingMaxSize(10);
consumerConfig.setAckGroupingTimeMs(GetParam());
consumerConfig.setUnAckedMessagesTimeoutMs(10000);
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer));

// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}

std::vector<MessageId> recvMsgId;
for (auto count = 0; count < numMsg; ++count) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
recvMsgId.emplace_back(msg.getMessageId());
}
ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId));

// try redeliver unack messages.
consumer.redeliverUnacknowledgedMessages();

// assert stats
unsigned long totalAck = 0;
auto consumerStatsList = PulsarFriend::getConsumerStatsPtrList(consumer);
for (auto consumerStats : consumerStatsList) {
auto ackMap = consumerStats->getAckedMsgMap();
totalAck += ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)];
}
ASSERT_EQ(totalAck, numMsg);

Message msg;
auto ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();

producer.close();
consumer.close();
client.close();
}

INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, AcknowledgeTest, testing::Values(100, 0));
118 changes: 2 additions & 116 deletions tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <pulsar/Client.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstring>
#include <functional>
Expand Down Expand Up @@ -58,7 +59,7 @@ static int globalCount = 0;
static long globalResendMessageCount = 0;
std::string lookupUrl = "pulsar://localhost:6650";
static std::string adminUrl = "http://localhost:8080/";
static int uniqueCounter = 0;
static std::atomic_int uniqueCounter{0};

std::string unique_str() {
long nanos = std::chrono::duration_cast<std::chrono::milliseconds>(
Expand Down Expand Up @@ -4276,118 +4277,3 @@ void testBatchReceiveClose(bool multiConsumer) {
TEST(BasicEndToEndTest, testBatchReceiveClose) { testBatchReceiveClose(false); }

TEST(BasicEndToEndTest, testBatchReceiveCloseWithMultiConsumer) { testBatchReceiveClose(true); }

TEST(BasicEndToEndTest, testAckMsgList) {
Client client(lookupUrl);
auto clientImplPtr = PulsarFriend::getClientImplPtr(client);

constexpr auto numMsg = 100;
std::string uniqueChunk = unique_str();
std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk;
const std::string subName = "sub-ack-list";

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));

ConsumerConfiguration consumerConfig;
consumerConfig.setAckGroupingMaxSize(numMsg);
consumerConfig.setUnAckedMessagesTimeoutMs(10000);
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer));

// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}

std::vector<MessageId> recvMsgId;
for (auto count = 0; count < numMsg; ++count) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
recvMsgId.emplace_back(msg.getMessageId());
}
ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId));

// try redeliver unack messages.
consumer.redeliverUnacknowledgedMessages();

auto consumerStats = PulsarFriend::getConsumerStatsPtr(consumer);
auto ackMap = consumerStats->getAckedMsgMap();
unsigned long totalAck = ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)];
ASSERT_EQ(totalAck, numMsg);

Message msg;
auto ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();

producer.close();
consumer.close();
client.close();
}

TEST(BasicEndToEndTest, testAckMsgListWithMultiConsumer) {
Client client(lookupUrl);
auto clientImplPtr = PulsarFriend::getClientImplPtr(client);

std::string uniqueChunk = unique_str();
std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk;

// call admin api to make it partitioned
std::string url =
adminUrl + "admin/v2/persistent/public/default/test-ack-msgs" + uniqueChunk + "/partitions";
int res = makePutRequest(url, "5");
LOG_INFO("res = " << res);
ASSERT_FALSE(res != 204 && res != 409);

constexpr auto numMsg = 100;
const std::string subName = "sub-ack-list";

Producer producer;
ProducerConfiguration producerConfig;
// Turn off batch to ensure even distribution
producerConfig.setBatchingEnabled(false);
producerConfig.setPartitionsRoutingMode(pulsar::ProducerConfiguration::RoundRobinDistribution);
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));

ConsumerConfiguration consumerConfig;
// set ack grouping max size is 10
consumerConfig.setAckGroupingMaxSize(10);
consumerConfig.setUnAckedMessagesTimeoutMs(10000);
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer));

// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}

std::vector<MessageId> recvMsgId;
for (auto count = 0; count < numMsg; ++count) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
recvMsgId.emplace_back(msg.getMessageId());
}
ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId));

// try redeliver unack messages.
consumer.redeliverUnacknowledgedMessages();

// assert stats
unsigned long totalAck = 0;
auto consumerStatsList = PulsarFriend::getConsumerStatsPtrList(consumer);
for (auto consumerStats : consumerStatsList) {
auto ackMap = consumerStats->getAckedMsgMap();
totalAck += ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)];
}
ASSERT_EQ(totalAck, numMsg);

Message msg;
auto ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();

producer.close();
consumer.close();
client.close();
}

0 comments on commit f9cca17

Please sign in to comment.