Skip to content

Commit

Permalink
fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Mar 1, 2023
1 parent fbd4bf3 commit 60e1eae
Showing 1 changed file with 17 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
@ThreadSafe
public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker {

static final CompletableFuture<Long> NULL_PROMISE = CompletableFuture.completedFuture(null);

static final int AsyncOperationTimeoutSeconds = 60;

private final long minIndexCountPerBucket;
Expand Down Expand Up @@ -238,6 +240,7 @@ private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immu
immutableBucket.getSnapshotCreateFuture().ifPresent(createFuture -> {
CompletableFuture<Long> future = createFuture.whenComplete((__, ex) -> {
if (ex == null) {
immutableBucket.setSnapshotSegments(null);
log.info("[{}] Creat bucket snapshot finish, bucketKey: {}", dispatcher.getName(),
immutableBucket.bucketKey());
return;
Expand All @@ -247,14 +250,17 @@ private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immu
log.error("[{}] Failed to create bucket snapshot, bucketKey: {}",
dispatcher.getName(), immutableBucket.bucketKey(), ex);

// Put the index back into the shared queue and downgrade to memory mode
// Put indexes back into the shared queue and downgrade to memory mode
synchronized (BucketDelayedDeliveryTracker.this) {
immutableBucket.getSnapshotSegments().ifPresent(snapshotSegments -> {
for (DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment :
snapshotSegments) {
for (DelayedIndex delayedIndex : snapshotSegment.getIndexesList()) {
Iterator<DelayedIndex> iterator = snapshotSegment.getIndexesList().iterator();
while (iterator.hasNext()) {
DelayedIndex delayedIndex = iterator.next();
sharedBucketPriorityQueue.add(delayedIndex.getTimestamp(),
delayedIndex.getLedgerId(), delayedIndex.getEntryId());
iterator.remove();
}
}
immutableBucket.setSnapshotSegments(null);
Expand All @@ -263,15 +269,9 @@ private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immu
immutableBucket.setCurrentSegmentEntryId(immutableBucket.lastSegmentEntryId);
immutableBuckets.remove(
Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId));

immutableBucket.setSnapshotCreateFuture(null);
}
});
synchronized (BucketDelayedDeliveryTracker.this) {
if (!future.isDone()) {
immutableBucket.setSnapshotCreateFuture(future);
}
}
immutableBucket.setSnapshotCreateFuture(future);
});
}
}
Expand Down Expand Up @@ -341,7 +341,10 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {
long numberMessages = bucketL.numberBucketDelayedMessages + bucketR.numberBucketDelayedMessages;
if (numberMessages < minNumberMessages) {
minNumberMessages = (int) numberMessages;
if (bucketL.lastSegmentEntryId > bucketL.getCurrentSegmentEntryId()) {
if (bucketL.lastSegmentEntryId > bucketL.getCurrentSegmentEntryId()
&& bucketR.lastSegmentEntryId > bucketR.getCurrentSegmentEntryId()
&& bucketL.getSnapshotCreateFuture().orElse(NULL_PROMISE).isDone()
&& bucketR.getSnapshotCreateFuture().orElse(NULL_PROMISE).isDone()) {
minIndex = i;
}
}
Expand All @@ -355,21 +358,6 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {
ImmutableBucket immutableBucketA = values.get(minIndex);
ImmutableBucket immutableBucketB = values.get(minIndex + 1);

CompletableFuture<Long> snapshotCreateFutureA =
immutableBucketA.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
if (!snapshotCreateFutureA.isDone()) {
log.info("[{}] Wait for bucket snapshot create finish to merge, bucketKey:{}", dispatcher.getName(),
immutableBucketA.bucketKey());
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Long> snapshotCreateFutureB =
immutableBucketB.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null));
if (!snapshotCreateFutureB.isDone()) {
log.info("[{}] Wait for bucket snapshot create finish to merge, bucketKey:{}", dispatcher.getName(),
immutableBucketB.bucketKey());
return CompletableFuture.completedFuture(null);
}

if (log.isDebugEnabled()) {
log.info("[{}] Merging bucket snapshot, bucketAKey: {}, bucketBKey: {}", dispatcher.getName(),
immutableBucketA.bucketKey(), immutableBucketB.bucketKey());
Expand Down Expand Up @@ -416,13 +404,8 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(ImmutableB

afterCreateImmutableBucket(immutableBucketDelayedIndexPair);

CompletableFuture<Long> snapshotCreateFuture = CompletableFuture.completedFuture(null);
if (immutableBucketDelayedIndexPair != null) {
snapshotCreateFuture = immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture()
.orElse(CompletableFuture.completedFuture(null));
}

snapshotCreateFuture.thenCompose(___ -> {
immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture()
.orElse(NULL_PROMISE).thenCompose(___ -> {
CompletableFuture<Void> removeAFuture = bucketA.asyncDeleteBucketSnapshot();
CompletableFuture<Void> removeBFuture = bucketB.asyncDeleteBucketSnapshot();
return CompletableFuture.allOf(removeAFuture, removeBFuture);
Expand Down Expand Up @@ -498,15 +481,15 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa
.orElse(CompletableFuture.completedFuture(null)).isDone();

if (!createFutureDone) {
log.info("[{}] Wait for bucket snapshot create finish to load, bucketKey:{}",
log.info("[{}] Skip load to wait for bucket snapshot create finish, bucketKey:{}",
dispatcher.getName(), bucket.bucketKey());
break;
}

if (bucket.currentSegmentEntryId == bucket.lastSegmentEntryId) {
immutableBuckets.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
bucket.asyncDeleteBucketSnapshot();
break;
continue;
}

bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> {
Expand Down

0 comments on commit 60e1eae

Please sign in to comment.