Skip to content

Commit

Permalink
improve code
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Oct 12, 2022
1 parent 45e2f34 commit 86a76fa
Showing 1 changed file with 29 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,20 @@ private Optional<BucketState> findBucket(long ledgerId) {
return Optional.ofNullable(immutableBuckets.get(ledgerId));
}

private Long getBucketIdByBucketKey(String bucketKey) {
String bucketIdStr = cursor.getCursorProperties().get(bucketKey);
private Long getBucketId(BucketState bucketState) {
long bucketId = bucketState.getBucketId();
if (bucketId != -1L) {
return bucketId;
}

String bucketIdStr = cursor.getCursorProperties().get(bucketState.bucketKey());
if (StringUtils.isBlank(bucketIdStr)) {
return -1L;
}
return Long.valueOf(bucketIdStr);

bucketId = Long.parseLong(bucketIdStr);
bucketState.setBucketId(bucketId);
return bucketId;
}

private BucketState createImmutableBucket(long startLedgerId, long endLedgerId) {
Expand All @@ -178,16 +186,21 @@ private CompletableFuture<Long> asyncSaveBucketSnapshot(
List<SnapshotSegment> bucketSnapshotSegments) {

return bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments)
.thenCompose(newBucketId -> {
.thenApply(newBucketId -> {
bucketState.setBucketId(newBucketId);
return putBucketKeyId(bucketState.bucketKey(), newBucketId);
String bucketKey = bucketState.bucketKey();
putBucketKeyId(bucketKey, newBucketId).exceptionally(ex -> {
log.warn("Failed to record bucketId to cursor property, bucketKey: {}", bucketKey);
return null;
});
return newBucketId;
});
}

private CompletableFuture<Long> putBucketKeyId(String bucketKey, Long bucketId) {
private CompletableFuture<Void> putBucketKeyId(String bucketKey, Long bucketId) {
Objects.requireNonNull(bucketId);
return executeWithRetry(() -> cursor.putCursorProperty(bucketKey, String.valueOf(bucketId)),
ManagedLedgerException.BadVersionException.class).thenApply(__ -> bucketId);
ManagedLedgerException.BadVersionException.class);
}

private CompletableFuture<Long> asyncCreateBucketSnapshot() {
Expand Down Expand Up @@ -300,24 +313,14 @@ private CompletableFuture<Void> asyncLoadNextBucketSnapshotEntry(BucketState buc
return CompletableFuture.completedFuture(null);
}

CompletableFuture<Long> createFuture;
if (bucketState.snapshotCreateFuture != null) {
// Wait bucket snapshot create finish
createFuture = bucketState.snapshotCreateFuture;
} else {
createFuture = CompletableFuture.completedFuture(null);
// Wait bucket snapshot create finish
CompletableFuture<Long> snapshotCreateFuture = bucketState.snapshotCreateFuture;
if (snapshotCreateFuture == null) {
snapshotCreateFuture = CompletableFuture.completedFuture(-1L);
}

return createFuture.thenCompose(__ -> {
final String bucketKey = bucketState.bucketKey();
final Long bucketId;
if (bucketState.getBucketId() != -1L) {
bucketId = bucketState.getBucketId();
} else {
bucketId = getBucketIdByBucketKey(bucketKey);
bucketState.setBucketId(bucketId);
}

return snapshotCreateFuture.thenCompose(__ -> {
final Long bucketId = getBucketId(bucketState);
CompletableFuture<Integer> loadMetaDataFuture = new CompletableFuture<>();
if (isRecover) {
// TODO Recover bucket snapshot
Expand Down Expand Up @@ -377,12 +380,11 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
// Create bucket snapshot
if (ledgerId > lastMutableBucketState.endLedgerId && !getPriorityQueue().isEmpty()) {
if (getPriorityQueue().size() >= minIndexCountPerBucket || existBucket) {
if (immutableBuckets.asMapOfRanges().size() >= maxNumBuckets) {
// TODO merge bucket snapshot (synchronize operate)
}

asyncCreateBucketSnapshot();
resetLastMutableBucketRange();
if (immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
// TODO merge bucket snapshot (synchronize operate)
}
}
}

Expand Down

0 comments on commit 86a76fa

Please sign in to comment.