Skip to content

Commit

Permalink
Do not process acks in the Netty thread (apache#271)
Browse files Browse the repository at this point in the history
(cherry picked from commit 5ed7dd1)
  • Loading branch information
eolivelli authored and dlg99 committed May 28, 2024
1 parent 82025b8 commit 7cb7da8
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -557,22 +557,29 @@ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String,

totalAckCount += ackedCount;
}
subscription.acknowledgeMessage(positionsAcked, AckType.Individual, properties);
CompletableFuture<Long> completableFuture = new CompletableFuture<>();
completableFuture.complete(totalAckCount);
if (isTransactionEnabled() && Subscription.isIndividualAckMode(subType)) {
completableFuture.whenComplete((v, e) -> positionsAcked.forEach(position -> {
//check if the position can remove from the consumer pending acks.
// the bit set is empty in pending ack handle.
if (((PositionImpl) position).getAckSet() != null) {
if (((PersistentSubscription) subscription)
.checkIsCanDeleteConsumerPendingAck((PositionImpl) position)) {
removePendingAcks((PositionImpl) position);

// we use acknowledgeMessageAsync because we don't want to perform the
// flush of the cursor (that may take much time in case of a long list of individuallyDeletedMessages)
// in the Netty eventloop thread
long totalCount = totalAckCount;
return subscription.acknowledgeMessageAsync(positionsAcked, AckType.Individual, properties)
.thenCompose(___ -> {
CompletableFuture<Long> completableFuture = new CompletableFuture<>();
completableFuture.complete(totalCount);
if (isTransactionEnabled() && Subscription.isIndividualAckMode(subType)) {
completableFuture.whenComplete((v, e) -> positionsAcked.forEach(position -> {
//check if the position can remove from the consumer pending acks.
// the bit set is empty in pending ack handle.
if (((PositionImpl) position).getAckSet() != null) {
if (((PersistentSubscription) subscription)
.checkIsCanDeleteConsumerPendingAck((PositionImpl) position)) {
removePendingAcks((PositionImpl) position);
}
}
}
}));
}
return completableFuture;
}));
}
return completableFuture;
});
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ default void removeConsumer(Consumer consumer) throws BrokerServiceException {

void acknowledgeMessage(List<Position> positions, AckType ackType, Map<String, Long> properties);

default CompletableFuture<Void> acknowledgeMessageAsync(List<Position> positions,
AckType ackType, Map<String, Long> properties) {
acknowledgeMessage(positions, ackType, properties);
return CompletableFuture.completedFuture(null);
}

String getTopicName();

boolean isReplicated();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,15 @@ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
dispatcher.consumerFlow(consumer, additionalNumberOfMessages);
}

@Override
public CompletableFuture<Void> acknowledgeMessageAsync(List<Position> positions,
AckType ackType, Map<String, Long> properties) {
// which is the best thread ?
return CompletableFuture.runAsync(() -> {
acknowledgeMessage(positions, ackType, properties);
}, topic.getBrokerService().pulsar().getExecutor());
}

@Override
public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<String, Long> properties) {
cursor.updateLastActive();
Expand Down

0 comments on commit 7cb7da8

Please sign in to comment.