From 1ee04ac71811f22c9750c5001973f115d983e7fe Mon Sep 17 00:00:00 2001 From: coderzc Date: Thu, 13 Apr 2023 12:15:55 +0800 Subject: [PATCH 1/9] Ensure previous delayed index be removed from snapshotSegmentLastIndexTable --- .../broker/delayed/bucket/BucketDelayedDeliveryTracker.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index a34bd51af98e4..0ab3e03af665b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -595,6 +595,7 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa } DelayedMessageIndexBucketSnapshotFormat.DelayedIndex lastDelayedIndex = indexList.get(indexList.size() - 1); + this.snapshotSegmentLastIndexTable.remove(ledgerId, entryId); this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), lastDelayedIndex.getEntryId(), bucket); for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : indexList) { @@ -620,8 +621,8 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa }).get(AsyncOperationTimeoutSeconds * (MaxRetryTimes + 1), TimeUnit.SECONDS); } catch (Exception e) { // Ignore exception to reload this segment on the next schedule. - log.error("[{}] An exception occurs when load next bucket snapshot, bucketKey:{}", - dispatcher.getName(), bucket.bucketKey(), e); + log.error("[{}] An exception occurs when load next bucket snapshot, bucketKey:{},segmentEntryId:{}", + dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, e); break; } } From be3f980df88910be0af5a07a1974b99a5f16718f Mon Sep 17 00:00:00 2001 From: coderzc Date: Thu, 13 Apr 2023 12:24:50 +0800 Subject: [PATCH 2/9] Increase timeout of future --- .../broker/delayed/bucket/BucketDelayedDeliveryTracker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index 0ab3e03af665b..fc7fe02639c96 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -618,7 +618,7 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load, System.currentTimeMillis() - loadStartTime); } - }).get(AsyncOperationTimeoutSeconds * (MaxRetryTimes + 1), TimeUnit.SECONDS); + }).get(AsyncOperationTimeoutSeconds * (MaxRetryTimes + 2), TimeUnit.SECONDS); } catch (Exception e) { // Ignore exception to reload this segment on the next schedule. log.error("[{}] An exception occurs when load next bucket snapshot, bucketKey:{},segmentEntryId:{}", From 3ff84a8ddf4875a4e0206957b00be1976e4c02a8 Mon Sep 17 00:00:00 2001 From: coderzc Date: Thu, 13 Apr 2023 12:43:34 +0800 Subject: [PATCH 3/9] Improve code --- .../bucket/BucketDelayedDeliveryTracker.java | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index fc7fe02639c96..5f3ad61ac86a8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -587,20 +587,22 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa long loadStartTime = System.currentTimeMillis(); stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load); bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> { - if (CollectionUtils.isEmpty(indexList)) { - immutableBuckets.asMapOfRanges() - .remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); - bucket.asyncDeleteBucketSnapshot(stats); - return; - } - DelayedMessageIndexBucketSnapshotFormat.DelayedIndex - lastDelayedIndex = indexList.get(indexList.size() - 1); - this.snapshotSegmentLastIndexTable.remove(ledgerId, entryId); - this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), - lastDelayedIndex.getEntryId(), bucket); - for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : indexList) { - sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(), - index.getEntryId()); + synchronized (BucketDelayedDeliveryTracker.this) { + this.snapshotSegmentLastIndexTable.remove(ledgerId, entryId); + if (CollectionUtils.isEmpty(indexList)) { + immutableBuckets.asMapOfRanges() + .remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); + bucket.asyncDeleteBucketSnapshot(stats); + return; + } + DelayedMessageIndexBucketSnapshotFormat.DelayedIndex + lastDelayedIndex = indexList.get(indexList.size() - 1); + this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), + lastDelayedIndex.getEntryId(), bucket); + for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : indexList) { + sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(), + index.getEntryId()); + } } }).whenComplete((__, ex) -> { if (ex != null) { From 56f4902c80befafc75858d7d836dfd62a7f31a76 Mon Sep 17 00:00:00 2001 From: coderzc Date: Thu, 13 Apr 2023 23:36:19 +0800 Subject: [PATCH 4/9] Make load operate asynchronous --- .../bucket/BucketDelayedDeliveryTracker.java | 123 ++++++++++-------- .../BucketDelayedDeliveryTrackerTest.java | 30 +++-- 2 files changed, 91 insertions(+), 62 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index 5f3ad61ac86a8..cc62e1611164a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELAYED_BUCKET_KEY_PREFIX; import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER; -import static org.apache.pulsar.broker.delayed.bucket.Bucket.MaxRetryTimes; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.HashBasedTable; import com.google.common.collect.Range; @@ -100,6 +99,8 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker private final BucketDelayedMessageIndexStats stats; + private CompletableFuture pendingLoad = null; + public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis, boolean isDelayedDeliveryDeliverAtTimeStrict, @@ -527,17 +528,21 @@ protected long nextDeliveryTime() { } @Override - public synchronized long getNumberOfDelayedMessages() { + public long getNumberOfDelayedMessages() { return numberDelayedMessages; } @Override - public synchronized long getBufferMemoryUsage() { + public long getBufferMemoryUsage() { return this.lastMutableBucket.getBufferMemoryUsage() + sharedBucketPriorityQueue.bytesCapacity(); } @Override public synchronized NavigableSet getScheduledMessages(int maxMessages) { + if (!checkPendingOpDone()) { + return new TreeSet<>(); + } + long cutoffTime = getCutoffTime(); lastMutableBucket.moveScheduledMessageToSharedQueue(cutoffTime, sharedBucketPriorityQueue); @@ -556,6 +561,7 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa ImmutableBucket bucket = snapshotSegmentLastIndexTable.get(ledgerId, entryId); if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) { + // All message of current snapshot segment are scheduled, try load next snapshot segment if (bucket.merging) { log.info("[{}] Skip load to wait for bucket snapshot merge finish, bucketKey:{}", dispatcher.getName(), bucket.bucketKey()); @@ -567,64 +573,65 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa log.debug("[{}] Loading next bucket snapshot segment, bucketKey: {}, nextSegmentEntryId: {}", dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1); } - // All message of current snapshot segment are scheduled, load next snapshot segment - // TODO make it asynchronous and not blocking this process - try { - boolean createFutureDone = bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone(); - - if (!createFutureDone) { - log.info("[{}] Skip load to wait for bucket snapshot create finish, bucketKey:{}", - dispatcher.getName(), bucket.bucketKey()); - break; - } + boolean createFutureDone = bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone(); + if (!createFutureDone) { + log.info("[{}] Skip load to wait for bucket snapshot create finish, bucketKey:{}", + dispatcher.getName(), bucket.bucketKey()); + break; + } - if (bucket.currentSegmentEntryId == bucket.lastSegmentEntryId) { - immutableBuckets.asMapOfRanges().remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); - bucket.asyncDeleteBucketSnapshot(stats); - continue; - } + if (bucket.currentSegmentEntryId == bucket.lastSegmentEntryId) { + immutableBuckets.asMapOfRanges().remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); + bucket.asyncDeleteBucketSnapshot(stats); + continue; + } - long loadStartTime = System.currentTimeMillis(); - stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load); - bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> { - synchronized (BucketDelayedDeliveryTracker.this) { - this.snapshotSegmentLastIndexTable.remove(ledgerId, entryId); - if (CollectionUtils.isEmpty(indexList)) { - immutableBuckets.asMapOfRanges() - .remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); - bucket.asyncDeleteBucketSnapshot(stats); - return; - } - DelayedMessageIndexBucketSnapshotFormat.DelayedIndex - lastDelayedIndex = indexList.get(indexList.size() - 1); - this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), - lastDelayedIndex.getEntryId(), bucket); - for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : indexList) { - sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(), - index.getEntryId()); - } + long loadStartTime = System.currentTimeMillis(); + stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load); + CompletableFuture loadFuture = pendingLoad = bucket.asyncLoadNextBucketSnapshotEntry() + .thenAccept(indexList -> { + synchronized (BucketDelayedDeliveryTracker.this) { + this.snapshotSegmentLastIndexTable.remove(ledgerId, entryId); + if (CollectionUtils.isEmpty(indexList)) { + immutableBuckets.asMapOfRanges() + .remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); + bucket.asyncDeleteBucketSnapshot(stats); + return; } - }).whenComplete((__, ex) -> { - if (ex != null) { - // Back bucket state - bucket.setCurrentSegmentEntryId(preSegmentEntryId); + DelayedMessageIndexBucketSnapshotFormat.DelayedIndex + lastDelayedIndex = indexList.get(indexList.size() - 1); + this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), + lastDelayedIndex.getEntryId(), bucket); + for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : indexList) { + sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(), + index.getEntryId()); + } + } + }).whenComplete((__, ex) -> { + if (ex != null) { + // Back bucket state + bucket.setCurrentSegmentEntryId(preSegmentEntryId); - log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}, segmentEntryId: {}", - dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, ex); + log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}, segmentEntryId: {}", + dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, ex); - stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load); - } else { - log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}", - dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1); + stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load); + } else { + log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}", + dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1); - stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load, - System.currentTimeMillis() - loadStartTime); + stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load, + System.currentTimeMillis() - loadStartTime); + } + synchronized (this) { + if (timeout != null) { + timeout.cancel(); } - }).get(AsyncOperationTimeoutSeconds * (MaxRetryTimes + 2), TimeUnit.SECONDS); - } catch (Exception e) { - // Ignore exception to reload this segment on the next schedule. - log.error("[{}] An exception occurs when load next bucket snapshot, bucketKey:{},segmentEntryId:{}", - dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, e); + timeout = timer.newTimeout(this, tickTimeMillis, TimeUnit.MILLISECONDS); + } + }); + + if (!checkPendingOpDone() || loadFuture.isCompletedExceptionally()) { break; } } @@ -644,6 +651,14 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa return positions; } + private synchronized boolean checkPendingOpDone() { + if (pendingLoad == null || pendingLoad.isDone()) { + pendingLoad = null; + return true; + } + return false; + } + @Override public boolean shouldPauseAllDeliveries() { return false; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java index 95234d688f6a0..39b3992fbd195 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java @@ -39,6 +39,7 @@ import java.util.NavigableSet; import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -197,9 +198,11 @@ public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) { }); assertTrue(tracker.hasMessageAvailable()); - Set scheduledMessages = tracker.getScheduledMessages(100); - - assertEquals(scheduledMessages.size(), 1); + Set scheduledMessages = new TreeSet<>(); + Awaitility.await().untilAsserted(() -> { + scheduledMessages.addAll(tracker.getScheduledMessages(100)); + assertEquals(scheduledMessages.size(), 1); + }); tracker.addMessage(101, 101, 101 * 10); @@ -216,12 +219,15 @@ public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) { clockTime.set(100 * 10); assertTrue(tracker2.hasMessageAvailable()); - scheduledMessages = tracker2.getScheduledMessages(70); + Set scheduledMessages2 = new TreeSet<>(); - assertEquals(scheduledMessages.size(), 70); + Awaitility.await().untilAsserted(() -> { + scheduledMessages2.addAll(tracker2.getScheduledMessages(70)); + assertEquals(scheduledMessages2.size(), 70); + }); int i = 31; - for (PositionImpl scheduledMessage : scheduledMessages) { + for (PositionImpl scheduledMessage : scheduledMessages2) { assertEquals(scheduledMessage, PositionImpl.get(i, i)); i++; } @@ -298,7 +304,11 @@ public void testMergeSnapshot(final BucketDelayedDeliveryTracker tracker) { clockTime.set(110 * 10); - NavigableSet scheduledMessages = tracker2.getScheduledMessages(110); + NavigableSet scheduledMessages = new TreeSet<>(); + Awaitility.await().untilAsserted(() -> { + scheduledMessages.addAll(tracker2.getScheduledMessages(110)); + assertEquals(scheduledMessages.size(), 110); + }); for (int i = 1; i <= 110; i++) { PositionImpl position = scheduledMessages.pollFirst(); assertEquals(position, PositionImpl.get(i, i)); @@ -370,7 +380,11 @@ public void testWithBkException(final BucketDelayedDeliveryTracker tracker) { assertEquals(tracker2.getScheduledMessages(100).size(), 0); - assertEquals(tracker2.getScheduledMessages(100).size(), delayedMessagesInSnapshotValue); + Set scheduledMessages = new TreeSet<>(); + Awaitility.await().untilAsserted(() -> { + scheduledMessages.addAll(tracker2.getScheduledMessages(100)); + assertEquals(scheduledMessages.size(), delayedMessagesInSnapshotValue); + }); assertTrue(mockBucketSnapshotStorage.createExceptionQueue.isEmpty()); assertTrue(mockBucketSnapshotStorage.getMetaDataExceptionQueue.isEmpty()); From 253cefe06cd6d75f5d405ceebdfd28c009e7e251 Mon Sep 17 00:00:00 2001 From: coderzc Date: Fri, 14 Apr 2023 12:19:50 +0800 Subject: [PATCH 5/9] improve code --- .../broker/delayed/bucket/BucketDelayedDeliveryTracker.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index cc62e1611164a..a37de0a5309af 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -268,7 +268,7 @@ private void afterCreateImmutableBucket(Pair immu if (ex == null) { immutableBucket.setSnapshotSegments(null); immutableBucket.asyncUpdateSnapshotLength(); - log.info("[{}] Creat bucket snapshot finish, bucketKey: {}", dispatcher.getName(), + log.info("[{}] Create bucket snapshot finish, bucketKey: {}", dispatcher.getName(), immutableBucket.bucketKey()); stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.create, @@ -540,7 +540,8 @@ public long getBufferMemoryUsage() { @Override public synchronized NavigableSet getScheduledMessages(int maxMessages) { if (!checkPendingOpDone()) { - return new TreeSet<>(); + log.info("[{}] Skip getScheduledMessages to wait for bucket snapshot load finish.", dispatcher.getName()); + return Collections.emptyNavigableSet(); } long cutoffTime = getCutoffTime(); From 06399298c2b3ec77572b5842d4ce0a08e7b92c41 Mon Sep 17 00:00:00 2001 From: coderzc Date: Fri, 14 Apr 2023 12:40:12 +0800 Subject: [PATCH 6/9] remove synchronized --- .../PersistentDispatcherMultipleConsumers.java | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 4ac755860fc7b..679c6f285e70a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -46,7 +46,6 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.Pair; -import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker; import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory; import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker; import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker; @@ -1088,7 +1087,7 @@ protected synchronized boolean shouldPauseDeliveryForDelayTracker() { } @Override - public synchronized long getNumberOfDelayedMessages() { + public long getNumberOfDelayedMessages() { return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L); } @@ -1172,15 +1171,7 @@ public PersistentTopic getTopic() { public long getDelayedTrackerMemoryUsage() { - if (delayedDeliveryTracker.isEmpty()) { - return 0; - } - - if (delayedDeliveryTracker.get() instanceof AbstractDelayedDeliveryTracker) { - return delayedDeliveryTracker.get().getBufferMemoryUsage(); - } - - return 0; + return delayedDeliveryTracker.map(DelayedDeliveryTracker::getBufferMemoryUsage).orElse(0L); } public Map getBucketDelayedIndexStats() { From 84691ea7931b69c5a1867d8c6651bab3a9356e9a Mon Sep 17 00:00:00 2001 From: coderzc Date: Fri, 14 Apr 2023 12:48:59 +0800 Subject: [PATCH 7/9] remove unnecessary code --- .../delayed/bucket/BucketDelayedDeliveryTracker.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index a37de0a5309af..a958e71fede57 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -581,12 +581,6 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa break; } - if (bucket.currentSegmentEntryId == bucket.lastSegmentEntryId) { - immutableBuckets.asMapOfRanges().remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); - bucket.asyncDeleteBucketSnapshot(stats); - continue; - } - long loadStartTime = System.currentTimeMillis(); stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load); CompletableFuture loadFuture = pendingLoad = bucket.asyncLoadNextBucketSnapshotEntry() @@ -636,7 +630,6 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa break; } } - snapshotSegmentLastIndexTable.remove(ledgerId, entryId); positions.add(new PositionImpl(ledgerId, entryId)); From 51f277022c719d2ae03759143d1d14a442f0fd8d Mon Sep 17 00:00:00 2001 From: coderzc Date: Fri, 14 Apr 2023 13:25:20 +0800 Subject: [PATCH 8/9] improve log --- .../broker/delayed/bucket/BucketDelayedDeliveryTracker.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index a958e71fede57..0d20926404273 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -613,7 +613,8 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load); } else { log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}", - dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1); + dispatcher.getName(), bucket.bucketKey(), + (preSegmentEntryId == bucket.lastSegmentEntryId) ? "-1" : preSegmentEntryId + 1); stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load, System.currentTimeMillis() - loadStartTime); From 0239f4d3bb51371d4d990856157015909e1af8fc Mon Sep 17 00:00:00 2001 From: coderzc Date: Sun, 16 Apr 2023 13:15:23 +0800 Subject: [PATCH 9/9] Address comment --- .../delayed/bucket/BucketDelayedDeliveryTracker.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index 0d20926404273..9f9e44bdded6d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -81,7 +81,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker private final int maxNumBuckets; - private long numberDelayedMessages; + private volatile long numberDelayedMessages; @Getter @VisibleForTesting @@ -540,7 +540,10 @@ public long getBufferMemoryUsage() { @Override public synchronized NavigableSet getScheduledMessages(int maxMessages) { if (!checkPendingOpDone()) { - log.info("[{}] Skip getScheduledMessages to wait for bucket snapshot load finish.", dispatcher.getName()); + if (log.isDebugEnabled()) { + log.debug("[{}] Skip getScheduledMessages to wait for bucket snapshot load finish.", + dispatcher.getName()); + } return Collections.emptyNavigableSet(); }