Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Ensure previous delayed index be removed from snapshotSegmentLastIndexTable & Make load operate asynchronous #20086

Merged
merged 9 commits into from
Apr 16, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELAYED_BUCKET_KEY_PREFIX;
import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER;
import static org.apache.pulsar.broker.delayed.bucket.Bucket.MaxRetryTimes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Range;
Expand Down Expand Up @@ -100,6 +99,8 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker

private final BucketDelayedMessageIndexStats stats;

private CompletableFuture<Void> pendingLoad = null;

public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
Expand Down Expand Up @@ -267,7 +268,7 @@ private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immu
if (ex == null) {
immutableBucket.setSnapshotSegments(null);
immutableBucket.asyncUpdateSnapshotLength();
log.info("[{}] Creat bucket snapshot finish, bucketKey: {}", dispatcher.getName(),
log.info("[{}] Create bucket snapshot finish, bucketKey: {}", dispatcher.getName(),
immutableBucket.bucketKey());

stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.create,
Expand Down Expand Up @@ -527,17 +528,22 @@ protected long nextDeliveryTime() {
}

@Override
public synchronized long getNumberOfDelayedMessages() {
public long getNumberOfDelayedMessages() {
return numberDelayedMessages;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need an volatile on field numberDelayedMessage if we access without lock.

}

@Override
public synchronized long getBufferMemoryUsage() {
public long getBufferMemoryUsage() {
return this.lastMutableBucket.getBufferMemoryUsage() + sharedBucketPriorityQueue.bytesCapacity();
}

@Override
public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) {
if (!checkPendingOpDone()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the loading task only triggered we there is not any scheduled message left ? it seems dispatcher have to wait for the loading task finished before any scheduled message on time.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the delivery of the message will be delayed. I think we can optimize this part in the future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree. hope to see PIP-195 in 3.0.0 : - )

log.info("[{}] Skip getScheduledMessages to wait for bucket snapshot load finish.", dispatcher.getName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this method is on the hot path. we need change this log level to debug to avoid print logs frequently.

return Collections.emptyNavigableSet();
}

long cutoffTime = getCutoffTime();

lastMutableBucket.moveScheduledMessageToSharedQueue(cutoffTime, sharedBucketPriorityQueue);
Expand All @@ -556,6 +562,7 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa

ImmutableBucket bucket = snapshotSegmentLastIndexTable.get(ledgerId, entryId);
if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) {
// All message of current snapshot segment are scheduled, try load next snapshot segment
if (bucket.merging) {
log.info("[{}] Skip load to wait for bucket snapshot merge finish, bucketKey:{}",
dispatcher.getName(), bucket.bucketKey());
Expand All @@ -567,26 +574,19 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa
log.debug("[{}] Loading next bucket snapshot segment, bucketKey: {}, nextSegmentEntryId: {}",
dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1);
}
// All message of current snapshot segment are scheduled, load next snapshot segment
// TODO make it asynchronous and not blocking this process
try {
boolean createFutureDone = bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone();

if (!createFutureDone) {
log.info("[{}] Skip load to wait for bucket snapshot create finish, bucketKey:{}",
dispatcher.getName(), bucket.bucketKey());
break;
}

if (bucket.currentSegmentEntryId == bucket.lastSegmentEntryId) {
immutableBuckets.asMapOfRanges().remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
bucket.asyncDeleteBucketSnapshot(stats);
continue;
}
boolean createFutureDone = bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone();
if (!createFutureDone) {
log.info("[{}] Skip load to wait for bucket snapshot create finish, bucketKey:{}",
dispatcher.getName(), bucket.bucketKey());
break;
}

long loadStartTime = System.currentTimeMillis();
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load);
bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> {
long loadStartTime = System.currentTimeMillis();
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load);
CompletableFuture<Void> loadFuture = pendingLoad = bucket.asyncLoadNextBucketSnapshotEntry()
Copy link
Contributor

@lifepuzzlefun lifepuzzlefun Apr 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this line split into multi line ? var a = b = c seems not usual : - )

.thenAccept(indexList -> {
synchronized (BucketDelayedDeliveryTracker.this) {
this.snapshotSegmentLastIndexTable.remove(ledgerId, entryId);
if (CollectionUtils.isEmpty(indexList)) {
immutableBuckets.asMapOfRanges()
.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
Expand All @@ -601,31 +601,36 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa
sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
index.getEntryId());
}
}).whenComplete((__, ex) -> {
if (ex != null) {
// Back bucket state
bucket.setCurrentSegmentEntryId(preSegmentEntryId);

log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}, segmentEntryId: {}",
dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, ex);

stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load);
} else {
log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}",
dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1);

stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load,
System.currentTimeMillis() - loadStartTime);
}
}).whenComplete((__, ex) -> {
if (ex != null) {
// Back bucket state
bucket.setCurrentSegmentEntryId(preSegmentEntryId);

log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}, segmentEntryId: {}",
dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, ex);

stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load);
} else {
log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}",
dispatcher.getName(), bucket.bucketKey(),
(preSegmentEntryId == bucket.lastSegmentEntryId) ? "-1" : preSegmentEntryId + 1);

stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load,
System.currentTimeMillis() - loadStartTime);
}
synchronized (this) {
if (timeout != null) {
timeout.cancel();
}
}).get(AsyncOperationTimeoutSeconds * (MaxRetryTimes + 1), TimeUnit.SECONDS);
} catch (Exception e) {
// Ignore exception to reload this segment on the next schedule.
log.error("[{}] An exception occurs when load next bucket snapshot, bucketKey:{}",
dispatcher.getName(), bucket.bucketKey(), e);
timeout = timer.newTimeout(this, tickTimeMillis, TimeUnit.MILLISECONDS);
}
});

if (!checkPendingOpDone() || loadFuture.isCompletedExceptionally()) {
break;
}
}
snapshotSegmentLastIndexTable.remove(ledgerId, entryId);

positions.add(new PositionImpl(ledgerId, entryId));

Expand All @@ -641,6 +646,14 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa
return positions;
}

private synchronized boolean checkPendingOpDone() {
if (pendingLoad == null || pendingLoad.isDone()) {
pendingLoad = null;
return true;
}
return false;
}

@Override
public boolean shouldPauseAllDeliveries() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker;
Expand Down Expand Up @@ -1088,7 +1087,7 @@ protected synchronized boolean shouldPauseDeliveryForDelayTracker() {
}

@Override
public synchronized long getNumberOfDelayedMessages() {
public long getNumberOfDelayedMessages() {
return delayedDeliveryTracker.map(DelayedDeliveryTracker::getNumberOfDelayedMessages).orElse(0L);
}

Expand Down Expand Up @@ -1172,15 +1171,7 @@ public PersistentTopic getTopic() {


public long getDelayedTrackerMemoryUsage() {
if (delayedDeliveryTracker.isEmpty()) {
return 0;
}

if (delayedDeliveryTracker.get() instanceof AbstractDelayedDeliveryTracker) {
return delayedDeliveryTracker.get().getBufferMemoryUsage();
}

return 0;
return delayedDeliveryTracker.map(DelayedDeliveryTracker::getBufferMemoryUsage).orElse(0L);
}

public Map<String, TopicMetricBean> getBucketDelayedIndexStats() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -197,9 +198,11 @@ public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) {
});

assertTrue(tracker.hasMessageAvailable());
Set<PositionImpl> scheduledMessages = tracker.getScheduledMessages(100);

assertEquals(scheduledMessages.size(), 1);
Set<PositionImpl> scheduledMessages = new TreeSet<>();
Awaitility.await().untilAsserted(() -> {
scheduledMessages.addAll(tracker.getScheduledMessages(100));
assertEquals(scheduledMessages.size(), 1);
});

tracker.addMessage(101, 101, 101 * 10);

Expand All @@ -216,12 +219,15 @@ public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) {
clockTime.set(100 * 10);

assertTrue(tracker2.hasMessageAvailable());
scheduledMessages = tracker2.getScheduledMessages(70);
Set<PositionImpl> scheduledMessages2 = new TreeSet<>();

assertEquals(scheduledMessages.size(), 70);
Awaitility.await().untilAsserted(() -> {
scheduledMessages2.addAll(tracker2.getScheduledMessages(70));
assertEquals(scheduledMessages2.size(), 70);
});

int i = 31;
for (PositionImpl scheduledMessage : scheduledMessages) {
for (PositionImpl scheduledMessage : scheduledMessages2) {
assertEquals(scheduledMessage, PositionImpl.get(i, i));
i++;
}
Expand Down Expand Up @@ -298,7 +304,11 @@ public void testMergeSnapshot(final BucketDelayedDeliveryTracker tracker) {

clockTime.set(110 * 10);

NavigableSet<PositionImpl> scheduledMessages = tracker2.getScheduledMessages(110);
NavigableSet<PositionImpl> scheduledMessages = new TreeSet<>();
Awaitility.await().untilAsserted(() -> {
scheduledMessages.addAll(tracker2.getScheduledMessages(110));
assertEquals(scheduledMessages.size(), 110);
});
for (int i = 1; i <= 110; i++) {
PositionImpl position = scheduledMessages.pollFirst();
assertEquals(position, PositionImpl.get(i, i));
Expand Down Expand Up @@ -370,7 +380,11 @@ public void testWithBkException(final BucketDelayedDeliveryTracker tracker) {

assertEquals(tracker2.getScheduledMessages(100).size(), 0);

assertEquals(tracker2.getScheduledMessages(100).size(), delayedMessagesInSnapshotValue);
Set<PositionImpl> scheduledMessages = new TreeSet<>();
Awaitility.await().untilAsserted(() -> {
scheduledMessages.addAll(tracker2.getScheduledMessages(100));
assertEquals(scheduledMessages.size(), delayedMessagesInSnapshotValue);
});

assertTrue(mockBucketSnapshotStorage.createExceptionQueue.isEmpty());
assertTrue(mockBucketSnapshotStorage.getMetaDataExceptionQueue.isEmpty());
Expand Down