From 1d1a3ef864a65c995ceda4b7875ed934c2574298 Mon Sep 17 00:00:00 2001 From: lifepuzzlefun Date: Mon, 17 Apr 2023 19:32:13 +0800 Subject: [PATCH] [improve] [broker] Close temporary open ledger in BookkeeperBucketSnapshotStorage (#20111) (cherry picked from commit b50e8802a5224dd68832e263e7046650771a1a4e) --- .../BookkeeperBucketSnapshotStorage.java | 33 +++++++++++++++---- 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java index e7d4f9301dd36..9c30ccf1c0b7e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java @@ -66,22 +66,41 @@ public CompletableFuture createBucketSnapshot(SnapshotMetadata snapshotMet @Override public CompletableFuture getBucketSnapshotMetadata(long bucketId) { - return openLedger(bucketId).thenCompose( - ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0). - thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement()))); + return openLedger(bucketId).thenCompose(ledgerHandle -> { + CompletableFuture snapshotFuture = + getLedgerEntry(ledgerHandle, 0, 0) + .thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())); + + snapshotFuture.whenComplete((__, e) -> closeLedger(ledgerHandle)); + + return snapshotFuture; + }); } @Override public CompletableFuture> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId, long lastSegmentEntryId) { - return openLedger(bucketId).thenCompose( - ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId, - lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries)); + return openLedger(bucketId).thenCompose(ledgerHandle -> { + CompletableFuture> parseFuture = + getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId) + .thenApply(this::parseSnapshotSegmentEntries); + + parseFuture.whenComplete((__, e) -> closeLedger(ledgerHandle)); + + return parseFuture; + }); } @Override public CompletableFuture getBucketSnapshotLength(long bucketId) { - return openLedger(bucketId).thenApply(LedgerHandle::getLength); + return openLedger(bucketId).thenCompose(ledgerHandle -> { + CompletableFuture lengthFuture = + CompletableFuture.completedFuture(ledgerHandle.getLength()); + + lengthFuture.whenComplete((__, e) -> closeLedger(ledgerHandle)); + + return lengthFuture; + }); } @Override