From 7cb7da8d60a69fc939267ea00719f666b5ef66ff Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Thu, 16 May 2024 17:05:24 +0200 Subject: [PATCH] Do not process acks in the Netty thread (#271) (cherry picked from commit 5ed7dd102577bb9d029e71042b20c760fa1a4ea5) --- .../pulsar/broker/service/Consumer.java | 37 +++++++++++-------- .../pulsar/broker/service/Subscription.java | 6 +++ .../persistent/PersistentSubscription.java | 9 +++++ 3 files changed, 37 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index c9f417c4bc4f7..3232a01829a63 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -557,22 +557,29 @@ private CompletableFuture individualAckNormal(CommandAck ack, Map completableFuture = new CompletableFuture<>(); - completableFuture.complete(totalAckCount); - if (isTransactionEnabled() && Subscription.isIndividualAckMode(subType)) { - completableFuture.whenComplete((v, e) -> positionsAcked.forEach(position -> { - //check if the position can remove from the consumer pending acks. - // the bit set is empty in pending ack handle. - if (((PositionImpl) position).getAckSet() != null) { - if (((PersistentSubscription) subscription) - .checkIsCanDeleteConsumerPendingAck((PositionImpl) position)) { - removePendingAcks((PositionImpl) position); + + // we use acknowledgeMessageAsync because we don't want to perform the + // flush of the cursor (that may take much time in case of a long list of individuallyDeletedMessages) + // in the Netty eventloop thread + long totalCount = totalAckCount; + return subscription.acknowledgeMessageAsync(positionsAcked, AckType.Individual, properties) + .thenCompose(___ -> { + CompletableFuture completableFuture = new CompletableFuture<>(); + completableFuture.complete(totalCount); + if (isTransactionEnabled() && Subscription.isIndividualAckMode(subType)) { + completableFuture.whenComplete((v, e) -> positionsAcked.forEach(position -> { + //check if the position can remove from the consumer pending acks. + // the bit set is empty in pending ack handle. + if (((PositionImpl) position).getAckSet() != null) { + if (((PersistentSubscription) subscription) + .checkIsCanDeleteConsumerPendingAck((PositionImpl) position)) { + removePendingAcks((PositionImpl) position); + } } - } - })); - } - return completableFuture; + })); + } + return completableFuture; + }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index 61107b7b0dbb3..13f5f0eeeca2b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -51,6 +51,12 @@ default void removeConsumer(Consumer consumer) throws BrokerServiceException { void acknowledgeMessage(List positions, AckType ackType, Map properties); + default CompletableFuture acknowledgeMessageAsync(List positions, + AckType ackType, Map properties) { + acknowledgeMessage(positions, ackType, properties); + return CompletableFuture.completedFuture(null); + } + String getTopicName(); boolean isReplicated(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index dbbf92aa76dce..5cdad418d6903 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -367,6 +367,15 @@ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) { dispatcher.consumerFlow(consumer, additionalNumberOfMessages); } + @Override + public CompletableFuture acknowledgeMessageAsync(List positions, + AckType ackType, Map properties) { + // which is the best thread ? + return CompletableFuture.runAsync(() -> { + acknowledgeMessage(positions, ackType, properties); + }, topic.getBrokerService().pulsar().getExecutor()); + } + @Override public void acknowledgeMessage(List positions, AckType ackType, Map properties) { cursor.updateLastActive();