-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][broker] Ensure previous delayed index be removed from snapshotSegmentLastIndexTable & Make load operate asynchronous #20086
Changes from all commits
1ee04ac
be3f980
3ff84a8
56f4902
253cefe
0639929
84691ea
51f2770
0239f4d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,7 +21,6 @@ | |
import static com.google.common.base.Preconditions.checkArgument; | ||
import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELAYED_BUCKET_KEY_PREFIX; | ||
import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER; | ||
import static org.apache.pulsar.broker.delayed.bucket.Bucket.MaxRetryTimes; | ||
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.common.collect.HashBasedTable; | ||
import com.google.common.collect.Range; | ||
|
@@ -82,7 +81,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker | |
|
||
private final int maxNumBuckets; | ||
|
||
private long numberDelayedMessages; | ||
private volatile long numberDelayedMessages; | ||
|
||
@Getter | ||
@VisibleForTesting | ||
|
@@ -100,6 +99,8 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker | |
|
||
private final BucketDelayedMessageIndexStats stats; | ||
|
||
private CompletableFuture<Void> pendingLoad = null; | ||
|
||
public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, | ||
Timer timer, long tickTimeMillis, | ||
boolean isDelayedDeliveryDeliverAtTimeStrict, | ||
|
@@ -267,7 +268,7 @@ private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immu | |
if (ex == null) { | ||
immutableBucket.setSnapshotSegments(null); | ||
immutableBucket.asyncUpdateSnapshotLength(); | ||
log.info("[{}] Creat bucket snapshot finish, bucketKey: {}", dispatcher.getName(), | ||
log.info("[{}] Create bucket snapshot finish, bucketKey: {}", dispatcher.getName(), | ||
immutableBucket.bucketKey()); | ||
|
||
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.create, | ||
|
@@ -527,17 +528,25 @@ protected long nextDeliveryTime() { | |
} | ||
|
||
@Override | ||
public synchronized long getNumberOfDelayedMessages() { | ||
public long getNumberOfDelayedMessages() { | ||
return numberDelayedMessages; | ||
} | ||
|
||
@Override | ||
public synchronized long getBufferMemoryUsage() { | ||
public long getBufferMemoryUsage() { | ||
return this.lastMutableBucket.getBufferMemoryUsage() + sharedBucketPriorityQueue.bytesCapacity(); | ||
} | ||
|
||
@Override | ||
public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) { | ||
if (!checkPendingOpDone()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is the loading task only triggered when there are no scheduled messages left? It seems the dispatcher has to wait for the loading task to finish before any scheduled message can be delivered on time. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, the delivery of the message will be delayed. I think we can optimize this part in the future. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Agreed. Hope to see PIP-195 in 3.0.0 :-) |
||
if (log.isDebugEnabled()) { | ||
log.debug("[{}] Skip getScheduledMessages to wait for bucket snapshot load finish.", | ||
dispatcher.getName()); | ||
} | ||
return Collections.emptyNavigableSet(); | ||
} | ||
|
||
long cutoffTime = getCutoffTime(); | ||
|
||
lastMutableBucket.moveScheduledMessageToSharedQueue(cutoffTime, sharedBucketPriorityQueue); | ||
|
@@ -556,6 +565,7 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa | |
|
||
ImmutableBucket bucket = snapshotSegmentLastIndexTable.get(ledgerId, entryId); | ||
if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) { | ||
// All message of current snapshot segment are scheduled, try load next snapshot segment | ||
if (bucket.merging) { | ||
log.info("[{}] Skip load to wait for bucket snapshot merge finish, bucketKey:{}", | ||
dispatcher.getName(), bucket.bucketKey()); | ||
|
@@ -567,26 +577,19 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa | |
log.debug("[{}] Loading next bucket snapshot segment, bucketKey: {}, nextSegmentEntryId: {}", | ||
dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1); | ||
} | ||
// All message of current snapshot segment are scheduled, load next snapshot segment | ||
// TODO make it asynchronous and not blocking this process | ||
try { | ||
boolean createFutureDone = bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone(); | ||
|
||
if (!createFutureDone) { | ||
log.info("[{}] Skip load to wait for bucket snapshot create finish, bucketKey:{}", | ||
dispatcher.getName(), bucket.bucketKey()); | ||
break; | ||
} | ||
|
||
if (bucket.currentSegmentEntryId == bucket.lastSegmentEntryId) { | ||
immutableBuckets.asMapOfRanges().remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); | ||
bucket.asyncDeleteBucketSnapshot(stats); | ||
continue; | ||
} | ||
boolean createFutureDone = bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone(); | ||
if (!createFutureDone) { | ||
log.info("[{}] Skip load to wait for bucket snapshot create finish, bucketKey:{}", | ||
dispatcher.getName(), bucket.bucketKey()); | ||
break; | ||
} | ||
|
||
long loadStartTime = System.currentTimeMillis(); | ||
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load); | ||
bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> { | ||
long loadStartTime = System.currentTimeMillis(); | ||
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load); | ||
CompletableFuture<Void> loadFuture = pendingLoad = bucket.asyncLoadNextBucketSnapshotEntry() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can this line be split into multiple lines? A chained assignment like `var a = b = c` is unusual :-) |
||
.thenAccept(indexList -> { | ||
synchronized (BucketDelayedDeliveryTracker.this) { | ||
this.snapshotSegmentLastIndexTable.remove(ledgerId, entryId); | ||
if (CollectionUtils.isEmpty(indexList)) { | ||
immutableBuckets.asMapOfRanges() | ||
.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); | ||
|
@@ -601,31 +604,36 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa | |
sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(), | ||
index.getEntryId()); | ||
} | ||
}).whenComplete((__, ex) -> { | ||
if (ex != null) { | ||
// Back bucket state | ||
bucket.setCurrentSegmentEntryId(preSegmentEntryId); | ||
|
||
log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}, segmentEntryId: {}", | ||
dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, ex); | ||
|
||
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load); | ||
} else { | ||
log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}", | ||
dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1); | ||
|
||
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load, | ||
System.currentTimeMillis() - loadStartTime); | ||
} | ||
}).whenComplete((__, ex) -> { | ||
if (ex != null) { | ||
// Back bucket state | ||
bucket.setCurrentSegmentEntryId(preSegmentEntryId); | ||
|
||
log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}, segmentEntryId: {}", | ||
dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, ex); | ||
|
||
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load); | ||
} else { | ||
log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}", | ||
dispatcher.getName(), bucket.bucketKey(), | ||
(preSegmentEntryId == bucket.lastSegmentEntryId) ? "-1" : preSegmentEntryId + 1); | ||
|
||
stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load, | ||
System.currentTimeMillis() - loadStartTime); | ||
} | ||
synchronized (this) { | ||
if (timeout != null) { | ||
timeout.cancel(); | ||
} | ||
}).get(AsyncOperationTimeoutSeconds * (MaxRetryTimes + 1), TimeUnit.SECONDS); | ||
} catch (Exception e) { | ||
// Ignore exception to reload this segment on the next schedule. | ||
log.error("[{}] An exception occurs when load next bucket snapshot, bucketKey:{}", | ||
dispatcher.getName(), bucket.bucketKey(), e); | ||
timeout = timer.newTimeout(this, tickTimeMillis, TimeUnit.MILLISECONDS); | ||
} | ||
}); | ||
|
||
if (!checkPendingOpDone() || loadFuture.isCompletedExceptionally()) { | ||
break; | ||
} | ||
} | ||
snapshotSegmentLastIndexTable.remove(ledgerId, entryId); | ||
|
||
positions.add(new PositionImpl(ledgerId, entryId)); | ||
|
||
|
@@ -641,6 +649,14 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa | |
return positions; | ||
} | ||
|
||
/**
 * Reports whether the pending bucket snapshot load, if any, has finished.
 *
 * <p>When {@code pendingLoad} is {@code null} or already done, the field is reset to
 * {@code null} and {@code true} is returned; otherwise the field is left untouched and
 * {@code false} is returned. {@code CompletableFuture#isDone()} is also true for a future
 * that completed exceptionally, so a failed load counts as finished here.
 *
 * @return {@code true} if no load is outstanding, {@code false} while one is still in flight
 */
private synchronized boolean checkPendingOpDone() { | ||
if (pendingLoad == null || pendingLoad.isDone()) { | ||
// Drop the reference to the finished (or absent) future.
pendingLoad = null; | ||
return true; | ||
} | ||
return false; | ||
} | ||
|
||
@Override | ||
public boolean shouldPauseAllDeliveries() { | ||
return false; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need a
volatile
on the field numberDelayedMessages
if we access it without a lock.