diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java index e7d4f9301dd36..9c30ccf1c0b7e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java @@ -66,22 +66,41 @@ public CompletableFuture createBucketSnapshot(SnapshotMetadata snapshotMet @Override public CompletableFuture getBucketSnapshotMetadata(long bucketId) { - return openLedger(bucketId).thenCompose( - ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0). - thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement()))); + return openLedger(bucketId).thenCompose(ledgerHandle -> { + CompletableFuture snapshotFuture = + getLedgerEntry(ledgerHandle, 0, 0) + .thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())); + + snapshotFuture.whenComplete((__, e) -> closeLedger(ledgerHandle)); + + return snapshotFuture; + }); } @Override public CompletableFuture> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId, long lastSegmentEntryId) { - return openLedger(bucketId).thenCompose( - ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId, - lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries)); + return openLedger(bucketId).thenCompose(ledgerHandle -> { + CompletableFuture> parseFuture = + getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId) + .thenApply(this::parseSnapshotSegmentEntries); + + parseFuture.whenComplete((__, e) -> closeLedger(ledgerHandle)); + + return parseFuture; + }); } @Override public CompletableFuture getBucketSnapshotLength(long bucketId) { - return openLedger(bucketId).thenApply(LedgerHandle::getLength); + return openLedger(bucketId).thenCompose(ledgerHandle -> { + CompletableFuture lengthFuture = + CompletableFuture.completedFuture(ledgerHandle.getLength()); + + lengthFuture.whenComplete((__, e) -> closeLedger(ledgerHandle)); + + return lengthFuture; + }); } @Override