diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java index 08202bb19155dc..99a92eb2515a75 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java @@ -81,11 +81,7 @@ public CompletableFuture> getBucketSnapshotSegment(long bu @Override public CompletableFuture getBucketSnapshotLength(long bucketId) { - return openLedger(bucketId).thenApply(ledgerHandle -> { - long length = ledgerHandle.getLength(); - closeLedger(ledgerHandle); - return length; - }); + return openLedger(bucketId).thenApply(LedgerHandle::getLength); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index f06ca2bf85e3fa..0435971496c2f2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -266,6 +266,7 @@ private void afterCreateImmutableBucket(Pair immu CompletableFuture future = createFuture.handle((bucketId, ex) -> { if (ex == null) { immutableBucket.setSnapshotSegments(null); + immutableBucket.asyncUpdateSnapshotLength(); log.info("[{}] Creat bucket snapshot finish, bucketKey: {}", dispatcher.getName(), immutableBucket.bucketKey()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java index 96a95588590693..e49ebe9606e01f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java @@ -155,9 +155,7 @@ Pair createImmutableBucketAndAsyncPersistent( Pair result = Pair.of(bucket, lastDelayedIndex); CompletableFuture future = asyncSaveBucketSnapshot(bucket, - bucketSnapshotMetadata, bucketSnapshotSegments).thenCompose(__ -> - bucket.asyncUpdateSnapshotLength().exceptionally(ex -> null) - ); + bucketSnapshotMetadata, bucketSnapshotSegments); bucket.setSnapshotCreateFuture(future); return result;