Skip to content

Commit

Permalink
Cache LedgerHandle in BookkeeperBucketSnapshotStorage
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Apr 17, 2023
1 parent 6304546 commit 00f09a9
Showing 1 changed file with 24 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import javax.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
Expand All @@ -48,6 +49,8 @@ public class BookkeeperBucketSnapshotStorage implements BucketSnapshotStorage {
private final ServiceConfiguration config;
private BookKeeper bookKeeper;

private final Map<Long, LedgerHandle> ledgerHandleCache = new ConcurrentHashMap<>();

public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
this.pulsar = pulsar;
this.config = pulsar.getConfig();
Expand All @@ -66,45 +69,27 @@ public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMet

@Override
public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
return openLedger(bucketId).thenCompose(ledgerHandle -> {
CompletableFuture<SnapshotMetadata> snapshotFuture =
getLedgerEntry(ledgerHandle, 0, 0)
.thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement()));

snapshotFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));

return snapshotFuture;
});
return getLedgerHandle(bucketId).thenCompose(ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0)
.thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
}

@Override
public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
long lastSegmentEntryId) {
return openLedger(bucketId).thenCompose(ledgerHandle -> {
CompletableFuture<List<SnapshotSegment>> parseFuture =
getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId)
.thenApply(this::parseSnapshotSegmentEntries);

parseFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));

return parseFuture;
});
return getLedgerHandle(bucketId).thenCompose(
ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId, lastSegmentEntryId)
.thenApply(this::parseSnapshotSegmentEntries));
}

@Override
public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
return openLedger(bucketId).thenCompose(ledgerHandle -> {
CompletableFuture<Long> lengthFuture =
CompletableFuture.completedFuture(ledgerHandle.getLength());

lengthFuture.whenComplete((__, e) -> closeLedger(ledgerHandle));

return lengthFuture;
});
return getLedgerHandle(bucketId).thenCompose(
ledgerHandle -> CompletableFuture.completedFuture(ledgerHandle.getLength()));
}

@Override
public CompletableFuture<Void> deleteBucketSnapshot(long bucketId) {
ledgerHandleCache.remove(bucketId);
return deleteLedger(bucketId);
}

Expand Down Expand Up @@ -172,12 +157,25 @@ private CompletableFuture<LedgerHandle> createLedger(String bucketKey, String to
if (rc != BKException.Code.OK) {
future.completeExceptionally(bkException("Create ledger", rc, -1));
} else {
ledgerHandleCache.put(handle.getId(), handle);
future.complete(handle);
}
}, null, metadata);
return future;
}

private CompletableFuture<LedgerHandle> getLedgerHandle(Long ledgerId) {
LedgerHandle ledgerHandle = ledgerHandleCache.get(ledgerId);
if (ledgerHandle != null) {
return CompletableFuture.completedFuture(ledgerHandle);
} else {
return openLedger(ledgerId).thenApply(ledgerHandle_ -> {
ledgerHandleCache.put(ledgerId, ledgerHandle_);
return ledgerHandle_;
});
}
}

private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
final CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
bookKeeper.asyncOpenLedger(
Expand Down

0 comments on commit 00f09a9

Please sign in to comment.