diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index f3839f750abaa..099cc5750888c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -155,20 +155,20 @@ public PendingAckHandleImpl(PersistentSubscription persistentSubscription) { .getBrokerService().getPulsar().getTransactionPendingAckStoreProvider(); pendingAckStoreProvider.checkInitializedBefore(persistentSubscription) - .thenAccept(init -> { + .thenAcceptAsync(init -> { if (init) { initPendingAckStore(); } else { completeHandleFuture(); } - }) - .exceptionally(e -> { + }, internalPinnedExecutor) + .exceptionallyAsync(e -> { Throwable t = FutureUtil.unwrapCompletionException(e); changeToErrorState(); exceptionHandleFuture(t); this.pendingAckStoreFuture.completeExceptionally(t); return null; - }); + }, internalPinnedExecutor); } private void initPendingAckStore() { @@ -937,18 +937,16 @@ public TransactionPendingAckStats getStats(boolean lowWaterMarks) { return transactionPendingAckStats; } - public synchronized void completeHandleFuture() { - if (!this.pendingAckHandleCompletableFuture.isDone()) { - this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this); - } - if (recoverTime.getRecoverStartTime() != 0L) { + public void completeHandleFuture() { + this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this); + if (recoverTime.getRecoverStartTime() != 0L && recoverTime.getRecoverEndTime() == 0L) { recoverTime.setRecoverEndTime(System.currentTimeMillis()); } } - public synchronized void exceptionHandleFuture(Throwable t) { - if (!this.pendingAckHandleCompletableFuture.isDone()) { - this.pendingAckHandleCompletableFuture.completeExceptionally(t); + public void exceptionHandleFuture(Throwable t) { + final boolean completedNow = this.pendingAckHandleCompletableFuture.completeExceptionally(t); + if (completedNow) { recoverTime.setRecoverEndTime(System.currentTimeMillis()); } }