Skip to content

Commit

Permalink
Cut off bucket delayed index snapshot segment according to maxIndexes…
Browse files Browse the repository at this point in the history
…PerBucketSnapshotSegment.
  • Loading branch information
coderzc committed Mar 3, 2023
1 parent 0e6ef10 commit 1adaaa4
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -361,18 +361,20 @@ public class ServiceConfiguration implements PulsarConfiguration {
private long delayedDeliveryMinIndexCountPerBucket = 50000;

@FieldContext(category = CATEGORY_SERVER, doc = """
The delayed message index bucket time step(in seconds) in per bucket snapshot segment, \
The delayed message index time step(in seconds) in per bucket snapshot segment, \
after reaching the max time step limitation, the snapshot segment will be cut off.""")
private long delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds = 300;
private int delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds = 300;

@FieldContext(category = CATEGORY_SERVER, doc = """
The max number of delayed message index in per bucket snapshot segment, \
after reaching the max number limitation, the snapshot segment will be cut off.""")
private int delayedDeliveryMaxIndexesPerBucketSnapshotSegment = 500;

@FieldContext(category = CATEGORY_SERVER, doc = """
The max number of delayed message index bucket, \
after reaching the max buckets limitation, the adjacent buckets will be merged.""")
private int delayedDeliveryMaxNumBuckets = 50;

@FieldContext(category = CATEGORY_SERVER, doc = "Enable share the delayed message index across subscriptions")
private boolean delayedDeliverySharedIndexEnabled = false;

@FieldContext(category = CATEGORY_SERVER, doc = "Size of the lookahead window to use "
+ "when detecting if all the messages in the topic have a fixed delay. "
+ "Default is 50,000. Setting the lookahead window to 0 will disable the "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public class BucketDelayedDeliveryTrackerFactory implements DelayedDeliveryTrack

private long delayedDeliveryMinIndexCountPerBucket;

private long delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds;
private int delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds;

private int delayedDeliveryMaxIndexesPerBucketSnapshotSegment;

@Override
public void initialize(PulsarService pulsarService) throws Exception {
Expand All @@ -58,14 +60,16 @@ public void initialize(PulsarService pulsarService) throws Exception {
this.delayedDeliveryMaxNumBuckets = config.getDelayedDeliveryMaxNumBuckets();
this.delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds =
config.getDelayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds();
this.delayedDeliveryMaxIndexesPerBucketSnapshotSegment =
config.getDelayedDeliveryMaxIndexesPerBucketSnapshotSegment();
}

@Override
public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
return new BucketDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict,
bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,
TimeUnit.SECONDS.toMillis(delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds),
delayedDeliveryMaxNumBuckets);
delayedDeliveryMaxIndexesPerBucketSnapshotSegment, delayedDeliveryMaxNumBuckets);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker

private final long timeStepPerBucketSnapshotSegmentInMillis;

private final int maxIndexesPerBucketSnapshotSegment;

private final int maxNumBuckets;

private long numberDelayedMessages;
Expand All @@ -89,20 +91,22 @@ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispat
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
int maxNumBuckets) {
int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets) {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict,
bucketSnapshotStorage, minIndexCountPerBucket, timeStepPerBucketSnapshotSegmentInMillis, maxNumBuckets);
bucketSnapshotStorage, minIndexCountPerBucket, timeStepPerBucketSnapshotSegmentInMillis,
maxIndexesPerBucketSnapshotSegment, maxNumBuckets);
}

public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
Timer timer, long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
int maxNumBuckets) {
int maxIndexesPerBucketSnapshotSegment,int maxNumBuckets) {
super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict);
this.minIndexCountPerBucket = minIndexCountPerBucket;
this.timeStepPerBucketSnapshotSegmentInMillis = timeStepPerBucketSnapshotSegmentInMillis;
this.maxIndexesPerBucketSnapshotSegment = maxIndexesPerBucketSnapshotSegment;
this.maxNumBuckets = maxNumBuckets;
this.sharedBucketPriorityQueue = new TripleLongPriorityQueue();
this.immutableBuckets = TreeRangeMap.create();
Expand Down Expand Up @@ -292,7 +296,9 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
&& lastMutableBucket.size() >= minIndexCountPerBucket
&& !lastMutableBucket.isEmpty()) {
Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
lastMutableBucket.sealBucketAndAsyncPersistent(this.timeStepPerBucketSnapshotSegmentInMillis,
lastMutableBucket.sealBucketAndAsyncPersistent(
this.timeStepPerBucketSnapshotSegmentInMillis,
this.maxIndexesPerBucketSnapshotSegment,
this.sharedBucketPriorityQueue);
afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
lastMutableBucket.resetLastMutableBucketRange();
Expand Down Expand Up @@ -380,8 +386,9 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(ImmutableB
.thenAccept(combinedDelayedIndexQueue -> {
Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
lastMutableBucket.createImmutableBucketAndAsyncPersistent(
timeStepPerBucketSnapshotSegmentInMillis, sharedBucketPriorityQueue,
combinedDelayedIndexQueue, bucketA.startLedgerId, bucketB.endLedgerId);
timeStepPerBucketSnapshotSegmentInMillis, maxIndexesPerBucketSnapshotSegment,
sharedBucketPriorityQueue, combinedDelayedIndexQueue, bucketA.startLedgerId,
bucketB.endLedgerId);

// Merge bit map to new bucket
Map<Long, RoaringBitmap> delayedIndexBitMapA = bucketA.getDelayedIndexBitMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,15 @@ class MutableBucket extends Bucket implements AutoCloseable {

Pair<ImmutableBucket, DelayedIndex> sealBucketAndAsyncPersistent(
long timeStepPerBucketSnapshotSegment,
int maxIndexesPerBucketSnapshotSegment,
TripleLongPriorityQueue sharedQueue) {
return createImmutableBucketAndAsyncPersistent(timeStepPerBucketSnapshotSegment, sharedQueue,
return createImmutableBucketAndAsyncPersistent(timeStepPerBucketSnapshotSegment,
maxIndexesPerBucketSnapshotSegment, sharedQueue,
TripleLongPriorityDelayedIndexQueue.wrap(priorityQueue), startLedgerId, endLedgerId);
}

Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
final long timeStepPerBucketSnapshotSegment,
final long timeStepPerBucketSnapshotSegment, final int maxIndexesPerBucketSnapshotSegment,
TripleLongPriorityQueue sharedQueue, DelayedIndexQueue delayedIndexQueue, final long startLedgerId,
final long endLedgerId) {
log.info("[{}] Creating bucket snapshot, startLedgerId: {}, endLedgerId: {}", dispatcherName,
Expand Down Expand Up @@ -98,7 +100,9 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(

snapshotSegmentBuilder.addIndexes(delayedIndex);

if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit) {
if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit
|| (maxIndexesPerBucketSnapshotSegment != -1
&& snapshotSegmentBuilder.getIndexesCount() >= maxIndexesPerBucketSnapshotSegment)) {
segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
currentTimestampUpperLimit = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public Object[][] provider(Method method) throws Exception {
return switch (methodName) {
case "test" -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 1, clock,
false, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50)
false, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1,50)
}};
case "testWithTimer" -> {
Timer timer = mock(Timer.class);
Expand Down Expand Up @@ -113,39 +113,39 @@ public Object[][] provider(Method method) throws Exception {

yield new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 1, clock,
false, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50),
false, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1, 50),
tasks
}};
}
case "testAddWithinTickTime" -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 100, clock,
false, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50)
false, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1,50)
}};
case "testAddMessageWithStrictDelay" -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 100, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50)
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1,50)
}};
case "testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict" -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50)
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1,50)
}};
case "testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict", "testRecoverSnapshot" ->
new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 100000, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50)
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1,50)
}};
case "testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict", "testExistDelayedMessage" ->
new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 500, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50)
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1,50)
}};
case "testMergeSnapshot", "testWithBkException", "testWithCreateFailDowngrade" -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 100000, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 10)
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1,10)
}};
default -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 1, clock,
true, bucketSnapshotStorage, 1000, TimeUnit.MILLISECONDS.toMillis(100), 50)
true, bucketSnapshotStorage, 1000, TimeUnit.MILLISECONDS.toMillis(100), -1,50)
}};
};
}
Expand Down Expand Up @@ -196,7 +196,7 @@ public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) {
clockTime.set(30 * 10);

tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50);
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1,50);

assertFalse(tracker.containsMessage(101, 101));
assertEquals(tracker.getNumberOfDelayedMessages(), 70);
Expand Down Expand Up @@ -268,7 +268,7 @@ public void testMergeSnapshot(BucketDelayedDeliveryTracker tracker) {
tracker.close();

tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 10);
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1,10);

assertEquals(tracker.getNumberOfDelayedMessages(), delayedMessagesInSnapshot.getValue());

Expand Down Expand Up @@ -322,7 +322,7 @@ public void testWithBkException(BucketDelayedDeliveryTracker tracker) {
tracker.close();

tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 10);
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1,10);

Long delayedMessagesInSnapshotValue = delayedMessagesInSnapshot.getValue();
assertEquals(tracker.getNumberOfDelayedMessages(), delayedMessagesInSnapshotValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public void setup() throws Exception {
conf.setDelayedDeliveryTrackerFactoryClassName(BucketDelayedDeliveryTrackerFactory.class.getName());
conf.setDelayedDeliveryMaxNumBuckets(10);
conf.setDelayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds(1);
conf.setDelayedDeliveryMaxIndexesPerBucketSnapshotSegment(10);
conf.setDelayedDeliveryMinIndexCountPerBucket(50);
conf.setManagedLedgerMaxEntriesPerLedger(50);
conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
Expand Down

0 comments on commit 1adaaa4

Please sign in to comment.