Skip to content

Commit

Permalink
[fix][cpp] Fix issue where unexpected ack timeout occurred (#17503)
Browse files Browse the repository at this point in the history
(cherry picked from commit a98f025)
  • Loading branch information
Masahiro Sakamoto authored and congbobo184 committed Dec 7, 2022
1 parent 8036bd0 commit 3b8ebf8
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 4 deletions.
17 changes: 13 additions & 4 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,8 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
ReceiveCallback callback = pendingReceives_.front();
pendingReceives_.pop();
lock.unlock();
unAckedMessageTrackerPtr_->add(msg.getMessageId());
listenerExecutor_->postWork(std::bind(callback, ResultOk, msg));
listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
shared_from_this(), ResultOk, msg, callback));
} else {
if (messages_.full()) {
lock.unlock();
Expand All @@ -469,7 +469,7 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
void MultiTopicsConsumerImpl::internalListener(Consumer consumer) {
Message m;
messages_.pop(m);

unAckedMessageTrackerPtr_->add(m.getMessageId());
try {
messageListener_(Consumer(shared_from_this()), m);
} catch (const std::exception& e) {
Expand Down Expand Up @@ -535,11 +535,20 @@ void MultiTopicsConsumerImpl::failPendingReceiveCallback() {
while (!pendingReceives_.empty()) {
ReceiveCallback callback = pendingReceives_.front();
pendingReceives_.pop();
listenerExecutor_->postWork(std::bind(callback, ResultAlreadyClosed, msg));
listenerExecutor_->postWork(std::bind(&MultiTopicsConsumerImpl::notifyPendingReceivedCallback,
shared_from_this(), ResultAlreadyClosed, msg, callback));
}
lock.unlock();
}

void MultiTopicsConsumerImpl::notifyPendingReceivedCallback(Result result, Message& msg,
const ReceiveCallback& callback) {
if (result == ResultOk) {
unAckedMessageTrackerPtr_->add(msg.getMessageId());
}
callback(result, msg);
}

void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
if (state_ != Ready) {
callback(ResultAlreadyClosed);
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
void internalListener(Consumer consumer);
void receiveMessages();
void failPendingReceiveCallback();
void notifyPendingReceivedCallback(Result result, Message& message, const ReceiveCallback& callback);

void handleOneTopicSubscribed(Result result, Consumer consumer, const std::string& topic,
std::shared_ptr<std::atomic<int>> topicsNeedCreate);
Expand Down
71 changes: 71 additions & 0 deletions pulsar-client-cpp/tests/ConsumerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,77 @@ TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery) {
client.close();
}

TEST(ConsumerTest, testPartitionedConsumerUnexpectedAckTimeout) {
ClientConfiguration clientConfig;
clientConfig.setMessageListenerThreads(1);
Client client(lookupUrl, clientConfig);

const std::string partitionedTopic =
"testPartitionedConsumerUnexpectedAckTimeout" + std::to_string(time(nullptr));
std::string subName = "sub";
constexpr int numPartitions = 2;
constexpr int numOfMessages = 3;
constexpr int unAckedMessagesTimeoutMs = 10000;
constexpr int tickDurationInMs = 1000;
pulsar::Latch latch(numOfMessages);
std::vector<Message> messages;
std::mutex mtx;

int res =
makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions",
std::to_string(numPartitions));
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;

Consumer consumer;
ConsumerConfiguration consumerConfig;
consumerConfig.setConsumerType(ConsumerShared);
consumerConfig.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
consumerConfig.setTickDurationInMs(tickDurationInMs);
consumerConfig.setMessageListener([&](Consumer cons, const Message& msg) {
// acknowledge received messages immediately, so no ack timeout is expected
ASSERT_EQ(ResultOk, cons.acknowledge(msg.getMessageId()));
ASSERT_EQ(0, msg.getRedeliveryCount());

{
std::lock_guard<std::mutex> lock(mtx);
messages.emplace_back(msg);
}

if (latch.getCount() > 0) {
std::this_thread::sleep_for(
std::chrono::milliseconds(unAckedMessagesTimeoutMs + tickDurationInMs * 2));
latch.countdown();
}
});
ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic, subName, consumerConfig, consumer));

// send messages
ProducerConfiguration producerConfig;
producerConfig.setBatchingEnabled(false);
producerConfig.setBlockIfQueueFull(true);
producerConfig.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, producerConfig, producer));
std::string prefix = "message-";
for (int i = 0; i < numOfMessages; i++) {
std::string messageContent = prefix + std::to_string(i);
Message msg = MessageBuilder().setContent(messageContent).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}
producer.close();

bool wasUnblocked = latch.wait(
std::chrono::milliseconds((unAckedMessagesTimeoutMs + tickDurationInMs * 2) * numOfMessages + 5000));
ASSERT_TRUE(wasUnblocked);

std::this_thread::sleep_for(std::chrono::milliseconds(5000));
// messages are expected not to be redelivered
ASSERT_EQ(numOfMessages, messages.size());

consumer.close();
client.close();
}

TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery) {
Client client(lookupUrl);
const std::string nonPartitionedTopic =
Expand Down

0 comments on commit 3b8ebf8

Please sign in to comment.