Skip to content

Commit

Permalink
Make load operate asynchronous
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Apr 14, 2023
1 parent 3ff84a8 commit 56f4902
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELAYED_BUCKET_KEY_PREFIX;
import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER;
import static org.apache.pulsar.broker.delayed.bucket.Bucket.MaxRetryTimes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Range;
Expand Down Expand Up @@ -100,6 +99,8 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker

private final BucketDelayedMessageIndexStats stats;

private CompletableFuture<Void> pendingLoad = null;

public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
Expand Down Expand Up @@ -527,17 +528,21 @@ protected long nextDeliveryTime() {
}

@Override
public synchronized long getNumberOfDelayedMessages() {
public long getNumberOfDelayedMessages() {
return numberDelayedMessages;
}

@Override
public synchronized long getBufferMemoryUsage() {
public long getBufferMemoryUsage() {
return this.lastMutableBucket.getBufferMemoryUsage() + sharedBucketPriorityQueue.bytesCapacity();
}

@Override
public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) {
if (!checkPendingOpDone()) {
return new TreeSet<>();
}

long cutoffTime = getCutoffTime();

lastMutableBucket.moveScheduledMessageToSharedQueue(cutoffTime, sharedBucketPriorityQueue);
Expand All @@ -556,6 +561,7 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa

ImmutableBucket bucket = snapshotSegmentLastIndexTable.get(ledgerId, entryId);
if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) {
// All message of current snapshot segment are scheduled, try load next snapshot segment
if (bucket.merging) {
log.info("[{}] Skip load to wait for bucket snapshot merge finish, bucketKey:{}",
dispatcher.getName(), bucket.bucketKey());
Expand All @@ -567,64 +573,65 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa
log.debug("[{}] Loading next bucket snapshot segment, bucketKey: {}, nextSegmentEntryId: {}",
dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1);
}
// All message of current snapshot segment are scheduled, load next snapshot segment
// TODO make it asynchronous and not blocking this process
try {
boolean createFutureDone = bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone();

if (!createFutureDone) {
log.info("[{}] Skip load to wait for bucket snapshot create finish, bucketKey:{}",
dispatcher.getName(), bucket.bucketKey());
break;
}
boolean createFutureDone = bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone();
if (!createFutureDone) {
log.info("[{}] Skip load to wait for bucket snapshot create finish, bucketKey:{}",
dispatcher.getName(), bucket.bucketKey());
break;
}

if (bucket.currentSegmentEntryId == bucket.lastSegmentEntryId) {
immutableBuckets.asMapOfRanges().remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
bucket.asyncDeleteBucketSnapshot(stats);
continue;
}
if (bucket.currentSegmentEntryId == bucket.lastSegmentEntryId) {
immutableBuckets.asMapOfRanges().remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
bucket.asyncDeleteBucketSnapshot(stats);
continue;
}

long loadStartTime = System.currentTimeMillis();
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load);
bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> {
synchronized (BucketDelayedDeliveryTracker.this) {
this.snapshotSegmentLastIndexTable.remove(ledgerId, entryId);
if (CollectionUtils.isEmpty(indexList)) {
immutableBuckets.asMapOfRanges()
.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
bucket.asyncDeleteBucketSnapshot(stats);
return;
}
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex
lastDelayedIndex = indexList.get(indexList.size() - 1);
this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(),
lastDelayedIndex.getEntryId(), bucket);
for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : indexList) {
sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
index.getEntryId());
}
long loadStartTime = System.currentTimeMillis();
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load);
CompletableFuture<Void> loadFuture = pendingLoad = bucket.asyncLoadNextBucketSnapshotEntry()
.thenAccept(indexList -> {
synchronized (BucketDelayedDeliveryTracker.this) {
this.snapshotSegmentLastIndexTable.remove(ledgerId, entryId);
if (CollectionUtils.isEmpty(indexList)) {
immutableBuckets.asMapOfRanges()
.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
bucket.asyncDeleteBucketSnapshot(stats);
return;
}
}).whenComplete((__, ex) -> {
if (ex != null) {
// Back bucket state
bucket.setCurrentSegmentEntryId(preSegmentEntryId);
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex
lastDelayedIndex = indexList.get(indexList.size() - 1);
this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(),
lastDelayedIndex.getEntryId(), bucket);
for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : indexList) {
sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
index.getEntryId());
}
}
}).whenComplete((__, ex) -> {
if (ex != null) {
// Back bucket state
bucket.setCurrentSegmentEntryId(preSegmentEntryId);

log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}, segmentEntryId: {}",
dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, ex);
log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}, segmentEntryId: {}",
dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, ex);

stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load);
} else {
log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}",
dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1);
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load);
} else {
log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}",
dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1);

stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load,
System.currentTimeMillis() - loadStartTime);
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load,
System.currentTimeMillis() - loadStartTime);
}
synchronized (this) {
if (timeout != null) {
timeout.cancel();
}
}).get(AsyncOperationTimeoutSeconds * (MaxRetryTimes + 2), TimeUnit.SECONDS);
} catch (Exception e) {
// Ignore exception to reload this segment on the next schedule.
log.error("[{}] An exception occurs when load next bucket snapshot, bucketKey:{},segmentEntryId:{}",
dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, e);
timeout = timer.newTimeout(this, tickTimeMillis, TimeUnit.MILLISECONDS);
}
});

if (!checkPendingOpDone() || loadFuture.isCompletedExceptionally()) {
break;
}
}
Expand All @@ -644,6 +651,14 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa
return positions;
}

private synchronized boolean checkPendingOpDone() {
if (pendingLoad == null || pendingLoad.isDone()) {
pendingLoad = null;
return true;
}
return false;
}

@Override
public boolean shouldPauseAllDeliveries() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -197,9 +198,11 @@ public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) {
});

assertTrue(tracker.hasMessageAvailable());
Set<PositionImpl> scheduledMessages = tracker.getScheduledMessages(100);

assertEquals(scheduledMessages.size(), 1);
Set<PositionImpl> scheduledMessages = new TreeSet<>();
Awaitility.await().untilAsserted(() -> {
scheduledMessages.addAll(tracker.getScheduledMessages(100));
assertEquals(scheduledMessages.size(), 1);
});

tracker.addMessage(101, 101, 101 * 10);

Expand All @@ -216,12 +219,15 @@ public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) {
clockTime.set(100 * 10);

assertTrue(tracker2.hasMessageAvailable());
scheduledMessages = tracker2.getScheduledMessages(70);
Set<PositionImpl> scheduledMessages2 = new TreeSet<>();

assertEquals(scheduledMessages.size(), 70);
Awaitility.await().untilAsserted(() -> {
scheduledMessages2.addAll(tracker2.getScheduledMessages(70));
assertEquals(scheduledMessages2.size(), 70);
});

int i = 31;
for (PositionImpl scheduledMessage : scheduledMessages) {
for (PositionImpl scheduledMessage : scheduledMessages2) {
assertEquals(scheduledMessage, PositionImpl.get(i, i));
i++;
}
Expand Down Expand Up @@ -298,7 +304,11 @@ public void testMergeSnapshot(final BucketDelayedDeliveryTracker tracker) {

clockTime.set(110 * 10);

NavigableSet<PositionImpl> scheduledMessages = tracker2.getScheduledMessages(110);
NavigableSet<PositionImpl> scheduledMessages = new TreeSet<>();
Awaitility.await().untilAsserted(() -> {
scheduledMessages.addAll(tracker2.getScheduledMessages(110));
assertEquals(scheduledMessages.size(), 110);
});
for (int i = 1; i <= 110; i++) {
PositionImpl position = scheduledMessages.pollFirst();
assertEquals(position, PositionImpl.get(i, i));
Expand Down Expand Up @@ -370,7 +380,11 @@ public void testWithBkException(final BucketDelayedDeliveryTracker tracker) {

assertEquals(tracker2.getScheduledMessages(100).size(), 0);

assertEquals(tracker2.getScheduledMessages(100).size(), delayedMessagesInSnapshotValue);
Set<PositionImpl> scheduledMessages = new TreeSet<>();
Awaitility.await().untilAsserted(() -> {
scheduledMessages.addAll(tracker2.getScheduledMessages(100));
assertEquals(scheduledMessages.size(), delayedMessagesInSnapshotValue);
});

assertTrue(mockBucketSnapshotStorage.createExceptionQueue.isEmpty());
assertTrue(mockBucketSnapshotStorage.getMetaDataExceptionQueue.isEmpty());
Expand Down

0 comments on commit 56f4902

Please sign in to comment.