diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index 5f3ad61ac86a8f..87fa51d51b5b31 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELAYED_BUCKET_KEY_PREFIX; import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER; -import static org.apache.pulsar.broker.delayed.bucket.Bucket.MaxRetryTimes; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.HashBasedTable; import com.google.common.collect.Range; @@ -100,6 +99,8 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker private final BucketDelayedMessageIndexStats stats; + private CompletableFuture pendingLoad = null; + public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis, boolean isDelayedDeliveryDeliverAtTimeStrict, @@ -527,17 +528,21 @@ protected long nextDeliveryTime() { } @Override - public synchronized long getNumberOfDelayedMessages() { + public long getNumberOfDelayedMessages() { return numberDelayedMessages; } @Override - public synchronized long getBufferMemoryUsage() { + public long getBufferMemoryUsage() { return this.lastMutableBucket.getBufferMemoryUsage() + sharedBucketPriorityQueue.bytesCapacity(); } @Override public synchronized NavigableSet getScheduledMessages(int maxMessages) { + if (!checkPendingOpDone()) { + return new TreeSet<>(); + } + long cutoffTime = getCutoffTime(); lastMutableBucket.moveScheduledMessageToSharedQueue(cutoffTime, sharedBucketPriorityQueue); @@ -556,6 +561,7 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa ImmutableBucket bucket = snapshotSegmentLastIndexTable.get(ledgerId, entryId); if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) { + // All message of current snapshot segment are scheduled, try load next snapshot segment if (bucket.merging) { log.info("[{}] Skip load to wait for bucket snapshot merge finish, bucketKey:{}", dispatcher.getName(), bucket.bucketKey()); @@ -567,64 +573,67 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa log.debug("[{}] Loading next bucket snapshot segment, bucketKey: {}, nextSegmentEntryId: {}", dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1); } - // All message of current snapshot segment are scheduled, load next snapshot segment - // TODO make it asynchronous and not blocking this process - try { - boolean createFutureDone = bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone(); - - if (!createFutureDone) { - log.info("[{}] Skip load to wait for bucket snapshot create finish, bucketKey:{}", - dispatcher.getName(), bucket.bucketKey()); - break; - } + boolean createFutureDone = bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone(); + if (!createFutureDone) { + log.info("[{}] Skip load to wait for bucket snapshot create finish, bucketKey:{}", + dispatcher.getName(), bucket.bucketKey()); + break; + } - if (bucket.currentSegmentEntryId == bucket.lastSegmentEntryId) { - immutableBuckets.asMapOfRanges().remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); - bucket.asyncDeleteBucketSnapshot(stats); - continue; - } + if (bucket.currentSegmentEntryId == bucket.lastSegmentEntryId) { + immutableBuckets.asMapOfRanges().remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); + bucket.asyncDeleteBucketSnapshot(stats); + continue; + } - long loadStartTime = System.currentTimeMillis(); - stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load); - bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> { - synchronized (BucketDelayedDeliveryTracker.this) { - this.snapshotSegmentLastIndexTable.remove(ledgerId, entryId); - if (CollectionUtils.isEmpty(indexList)) { - immutableBuckets.asMapOfRanges() - .remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); - bucket.asyncDeleteBucketSnapshot(stats); - return; - } - DelayedMessageIndexBucketSnapshotFormat.DelayedIndex - lastDelayedIndex = indexList.get(indexList.size() - 1); - this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), - lastDelayedIndex.getEntryId(), bucket); - for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : indexList) { - sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(), - index.getEntryId()); - } + long loadStartTime = System.currentTimeMillis(); + stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load); + CompletableFuture loadFuture = pendingLoad = bucket.asyncLoadNextBucketSnapshotEntry() + .thenAccept(indexList -> { + synchronized (BucketDelayedDeliveryTracker.this) { + this.snapshotSegmentLastIndexTable.remove(ledgerId, entryId); + if (CollectionUtils.isEmpty(indexList)) { + immutableBuckets.asMapOfRanges() + .remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); + bucket.asyncDeleteBucketSnapshot(stats); + return; + } + DelayedMessageIndexBucketSnapshotFormat.DelayedIndex + lastDelayedIndex = indexList.get(indexList.size() - 1); + this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), + lastDelayedIndex.getEntryId(), bucket); + for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : indexList) { + sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(), + index.getEntryId()); } - }).whenComplete((__, ex) -> { - if (ex != null) { + } + }).whenComplete((__, ex) -> { + if (ex != null) { + synchronized (this) { // Back bucket state bucket.setCurrentSegmentEntryId(preSegmentEntryId); + } - log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}, segmentEntryId: {}", - dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, ex); + log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}, segmentEntryId: {}", + dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, ex); - stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load); - } else { - log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}", - dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1); + stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load); + } else { + log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}", + dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1); - stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load, - System.currentTimeMillis() - loadStartTime); + stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load, + System.currentTimeMillis() - loadStartTime); + } + synchronized (this) { + if (timeout != null) { + timeout.cancel(); } - }).get(AsyncOperationTimeoutSeconds * (MaxRetryTimes + 2), TimeUnit.SECONDS); - } catch (Exception e) { - // Ignore exception to reload this segment on the next schedule. - log.error("[{}] An exception occurs when load next bucket snapshot, bucketKey:{},segmentEntryId:{}", - dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, e); + timeout = timer.newTimeout(this, tickTimeMillis, TimeUnit.MILLISECONDS); + } + }); + + if (!checkPendingOpDone() || loadFuture.isCompletedExceptionally()) { break; } } @@ -644,6 +653,14 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa return positions; } + private synchronized boolean checkPendingOpDone() { + if (pendingLoad == null || pendingLoad.isDone()) { + pendingLoad = null; + return true; + } + return false; + } + @Override public boolean shouldPauseAllDeliveries() { return false; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java index 95234d688f6a0a..39b3992fbd1953 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java @@ -39,6 +39,7 @@ import java.util.NavigableSet; import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -197,9 +198,11 @@ public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) { }); assertTrue(tracker.hasMessageAvailable()); - Set scheduledMessages = tracker.getScheduledMessages(100); - - assertEquals(scheduledMessages.size(), 1); + Set scheduledMessages = new TreeSet<>(); + Awaitility.await().untilAsserted(() -> { + scheduledMessages.addAll(tracker.getScheduledMessages(100)); + assertEquals(scheduledMessages.size(), 1); + }); tracker.addMessage(101, 101, 101 * 10); @@ -216,12 +219,15 @@ public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) { clockTime.set(100 * 10); assertTrue(tracker2.hasMessageAvailable()); - scheduledMessages = tracker2.getScheduledMessages(70); + Set scheduledMessages2 = new TreeSet<>(); - assertEquals(scheduledMessages.size(), 70); + Awaitility.await().untilAsserted(() -> { + scheduledMessages2.addAll(tracker2.getScheduledMessages(70)); + assertEquals(scheduledMessages2.size(), 70); + }); int i = 31; - for (PositionImpl scheduledMessage : scheduledMessages) { + for (PositionImpl scheduledMessage : scheduledMessages2) { assertEquals(scheduledMessage, PositionImpl.get(i, i)); i++; } @@ -298,7 +304,11 @@ public void testMergeSnapshot(final BucketDelayedDeliveryTracker tracker) { clockTime.set(110 * 10); - NavigableSet scheduledMessages = tracker2.getScheduledMessages(110); + NavigableSet scheduledMessages = new TreeSet<>(); + Awaitility.await().untilAsserted(() -> { + scheduledMessages.addAll(tracker2.getScheduledMessages(110)); + assertEquals(scheduledMessages.size(), 110); + }); for (int i = 1; i <= 110; i++) { PositionImpl position = scheduledMessages.pollFirst(); assertEquals(position, PositionImpl.get(i, i)); @@ -370,7 +380,11 @@ public void testWithBkException(final BucketDelayedDeliveryTracker tracker) { assertEquals(tracker2.getScheduledMessages(100).size(), 0); - assertEquals(tracker2.getScheduledMessages(100).size(), delayedMessagesInSnapshotValue); + Set scheduledMessages = new TreeSet<>(); + Awaitility.await().untilAsserted(() -> { + scheduledMessages.addAll(tracker2.getScheduledMessages(100)); + assertEquals(scheduledMessages.size(), delayedMessagesInSnapshotValue); + }); assertTrue(mockBucketSnapshotStorage.createExceptionQueue.isEmpty()); assertTrue(mockBucketSnapshotStorage.getMetaDataExceptionQueue.isEmpty());