diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java index 9c30ccf1c0b7e6..00a64947ff9bda 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import javax.validation.constraints.NotNull; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BKException; @@ -48,6 +49,8 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage { private final ServiceConfiguration config; private BookKeeper bookKeeper; + private final Map ledgerHandleCache = new ConcurrentHashMap<>(); + public BookkeeperBucketSnapshotStorage(PulsarService pulsar) { this.pulsar = pulsar; this.config = pulsar.getConfig(); @@ -66,45 +69,27 @@ public CompletableFuture createBucketSnapshot(SnapshotMetadata snapshotMet @Override public CompletableFuture getBucketSnapshotMetadata(long bucketId) { - return openLedger(bucketId).thenCompose(ledgerHandle -> { - CompletableFuture snapshotFuture = - getLedgerEntry(ledgerHandle, 0, 0) - .thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())); - - snapshotFuture.whenComplete((__, e) -> closeLedger(ledgerHandle)); - - return snapshotFuture; - }); + return getLedgerHandle(bucketId).thenCompose(ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0) + .thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement()))); } @Override public CompletableFuture> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId, long lastSegmentEntryId) { - return openLedger(bucketId).thenCompose(ledgerHandle -> { - CompletableFuture> parseFuture = - getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId) - .thenApply(this::parseSnapshotSegmentEntries); - - parseFuture.whenComplete((__, e) -> closeLedger(ledgerHandle)); - - return parseFuture; - }); + return getLedgerHandle(bucketId).thenCompose( + ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId) + .thenApply(this::parseSnapshotSegmentEntries)); } @Override public CompletableFuture getBucketSnapshotLength(long bucketId) { - return openLedger(bucketId).thenCompose(ledgerHandle -> { - CompletableFuture lengthFuture = - CompletableFuture.completedFuture(ledgerHandle.getLength()); - - lengthFuture.whenComplete((__, e) -> closeLedger(ledgerHandle)); - - return lengthFuture; - }); + return getLedgerHandle(bucketId).thenCompose( + ledgerHandle -> CompletableFuture.completedFuture(ledgerHandle.getLength())); } @Override public CompletableFuture deleteBucketSnapshot(long bucketId) { + ledgerHandleCache.remove(bucketId); return deleteLedger(bucketId); } @@ -178,6 +163,18 @@ private CompletableFuture createLedger(String bucketKey, String to return future; } + private CompletableFuture getLedgerHandle(Long ledgerId) { + LedgerHandle ledgerHandle = ledgerHandleCache.get(ledgerId); + if (ledgerHandle != null) { + return CompletableFuture.completedFuture(ledgerHandle); + } else { + return openLedger(ledgerId).thenApply(ledgerHandle_ -> { + ledgerHandleCache.put(ledgerId, ledgerHandle_); + return ledgerHandle_; + }); + } + } + private CompletableFuture openLedger(Long ledgerId) { final CompletableFuture future = new CompletableFuture<>(); bookKeeper.asyncOpenLedger(