diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index 6678c6df254b0..adb48499b5468 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -101,7 +101,14 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker private final BucketDelayedMessageIndexStats stats; - private CompletableFuture pendingLoad = null; + private State state = State.READY; + + enum State { + RECOVERING, + LOADING, + READY, + ClOSE + } public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis, @@ -136,6 +143,7 @@ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispat } private synchronized long recoverBucketSnapshot() throws RuntimeException { + state = State.RECOVERING; ManagedCursor cursor = this.lastMutableBucket.getCursor(); FutureUtil.Sequencer sequencer = this.lastMutableBucket.getSequencer(); Map, ImmutableBucket> toBeDeletedBucketMap = new HashMap<>(); @@ -319,6 +327,10 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver return false; } + if (state == State.RECOVERING) { + return false; + } + boolean existBucket = findImmutableBucket(ledgerId).isPresent(); // Create bucket snapshot @@ -541,7 +553,7 @@ public long getBufferMemoryUsage() { @Override public synchronized NavigableSet getScheduledMessages(int maxMessages) { - if (!checkPendingOpDone()) { + if (state != State.READY) { if (log.isDebugEnabled()) { log.debug("[{}] Skip getScheduledMessages to wait for bucket snapshot load finish.", dispatcher.getName()); @@ -588,7 +600,7 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa long loadStartTime = System.currentTimeMillis(); stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load); - CompletableFuture loadFuture = pendingLoad = bucket.asyncLoadNextBucketSnapshotEntry() + CompletableFuture loadFuture = bucket.asyncLoadNextBucketSnapshotEntry() .thenAccept(indexList -> { synchronized (BucketDelayedDeliveryTracker.this) { this.snapshotSegmentLastIndexTable.remove(ledgerId, entryId); @@ -632,7 +644,7 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa } }); - if (!checkPendingOpDone() || loadFuture.isCompletedExceptionally()) { + if (!loadFuture.isDone() || loadFuture.isCompletedExceptionally()) { break; } } @@ -651,17 +663,17 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa return positions; } - private synchronized boolean checkPendingOpDone() { - if (pendingLoad == null || pendingLoad.isDone()) { - pendingLoad = null; - return true; - } - return false; - } +// private synchronized boolean checkPendingOpDone() { +// if (pendingLoad == null || pendingLoad.isDone()) { +// pendingLoad = null; +// return true; +// } +// return false; +// } @Override public boolean shouldPauseAllDeliveries() { - return false; + return state == State.READY; } @Override