From bff14025bc01aa0ecb936178a65ad22bab9cdf2d Mon Sep 17 00:00:00 2001 From: coderzc Date: Sun, 26 Mar 2023 14:57:32 +0800 Subject: [PATCH] Fix code --- .../bucket/BucketDelayedDeliveryTracker.java | 13 +++++++------ .../bucket/BucketDelayedMessageIndexStats.java | 2 +- .../bucket/BucketDelayedDeliveryTrackerTest.java | 2 +- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index 41f9479ae860e0..88e1279b211e41 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -436,12 +436,13 @@ private synchronized CompletableFuture asyncMergeBucketSnapshot(ImmutableB .thenAccept(combinedDelayedIndexQueue -> { synchronized (BucketDelayedDeliveryTracker.this) { long createStartTime = System.currentTimeMillis(); - stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create); - Pair immutableBucketDelayedIndexPair = - lastMutableBucket.createImmutableBucketAndAsyncPersistent( - timeStepPerBucketSnapshotSegmentInMillis, maxIndexesPerBucketSnapshotSegment, - sharedBucketPriorityQueue, combinedDelayedIndexQueue, bucketA.startLedgerId, - bucketB.endLedgerId); + stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create); + Pair immutableBucketDelayedIndexPair = + lastMutableBucket.createImmutableBucketAndAsyncPersistent( + timeStepPerBucketSnapshotSegmentInMillis, + maxIndexesPerBucketSnapshotSegment, + sharedBucketPriorityQueue, combinedDelayedIndexQueue, bucketA.startLedgerId, + bucketB.endLedgerId); // Merge bit map to new bucket Map delayedIndexBitMapA = bucketA.getDelayedIndexBitMap(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedMessageIndexStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedMessageIndexStats.java index f479a03092eaf2..68788c359d5605 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedMessageIndexStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedMessageIndexStats.java @@ -75,7 +75,7 @@ public Map genTopicMetricMap() { String[] labels = splitKey(k); String[] labelsAndValues = new String[] {"state", labels[0], "type", labels[1]}; String key = OP_COUNT_NAME + joinKey(labelsAndValues); - metrics.put(key, new TopicMetricBean(OP_COUNT_NAME, count.longValue(), labelsAndValues)); + metrics.put(key, new TopicMetricBean(OP_COUNT_NAME, count.sumThenReset(), labelsAndValues)); }); delayedMessageIndexBucketOpLatencyMs.forEach((typeName, statsBuckets) -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java index fbb866d48ed692..4016bf361d2d16 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java @@ -158,7 +158,7 @@ public Object[][] provider(Method method) throws Exception { } @Test(dataProvider = "delayedTracker") - public void testContainsMessage(DelayedDeliveryTracker tracker) { + public void testContainsMessage(BucketDelayedDeliveryTracker tracker) { tracker.addMessage(1, 1, 10); tracker.addMessage(2, 2, 20);