Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker][PIP-195] Cut off snapshot segment according to maxIndexesPerBucketSnapshotSegment #19706

Merged
merged 3 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -361,18 +361,20 @@ public class ServiceConfiguration implements PulsarConfiguration {
private long delayedDeliveryMinIndexCountPerBucket = 50000;

@FieldContext(category = CATEGORY_SERVER, doc = """
The delayed message index bucket time step(in seconds) in per bucket snapshot segment, \
The delayed message index time step(in seconds) in per bucket snapshot segment, \
after reaching the max time step limitation, the snapshot segment will be cut off.""")
private long delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds = 300;
private int delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds = 300;

@FieldContext(category = CATEGORY_SERVER, doc = """
The max number of delayed message index in per bucket snapshot segment, -1 means no limitation\
after reaching the max number limitation, the snapshot segment will be cut off.""")
private int delayedDeliveryMaxIndexesPerBucketSnapshotSegment = 500;

@FieldContext(category = CATEGORY_SERVER, doc = """
The max number of delayed message index bucket, \
after reaching the max buckets limitation, the adjacent buckets will be merged.""")
private int delayedDeliveryMaxNumBuckets = 50;

@FieldContext(category = CATEGORY_SERVER, doc = "Enable share the delayed message index across subscriptions")
private boolean delayedDeliverySharedIndexEnabled = false;

@FieldContext(category = CATEGORY_SERVER, doc = "Size of the lookahead window to use "
+ "when detecting if all the messages in the topic have a fixed delay. "
+ "Default is 50,000. Setting the lookahead window to 0 will disable the "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public class BucketDelayedDeliveryTrackerFactory implements DelayedDeliveryTrack

private long delayedDeliveryMinIndexCountPerBucket;

private long delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds;
private int delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds;

private int delayedDeliveryMaxIndexesPerBucketSnapshotSegment;

@Override
public void initialize(PulsarService pulsarService) throws Exception {
Expand All @@ -58,14 +60,16 @@ public void initialize(PulsarService pulsarService) throws Exception {
this.delayedDeliveryMaxNumBuckets = config.getDelayedDeliveryMaxNumBuckets();
this.delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds =
config.getDelayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds();
this.delayedDeliveryMaxIndexesPerBucketSnapshotSegment =
config.getDelayedDeliveryMaxIndexesPerBucketSnapshotSegment();
}

@Override
public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
return new BucketDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict,
bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,
TimeUnit.SECONDS.toMillis(delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds),
delayedDeliveryMaxNumBuckets);
delayedDeliveryMaxIndexesPerBucketSnapshotSegment, delayedDeliveryMaxNumBuckets);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker

private final long timeStepPerBucketSnapshotSegmentInMillis;

private final int maxIndexesPerBucketSnapshotSegment;

private final int maxNumBuckets;

private long numberDelayedMessages;
Expand All @@ -89,20 +91,22 @@ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispat
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
int maxNumBuckets) {
int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets) {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict,
bucketSnapshotStorage, minIndexCountPerBucket, timeStepPerBucketSnapshotSegmentInMillis, maxNumBuckets);
bucketSnapshotStorage, minIndexCountPerBucket, timeStepPerBucketSnapshotSegmentInMillis,
maxIndexesPerBucketSnapshotSegment, maxNumBuckets);
}

public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
Timer timer, long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
int maxNumBuckets) {
int maxIndexesPerBucketSnapshotSegment,int maxNumBuckets) {
super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict);
this.minIndexCountPerBucket = minIndexCountPerBucket;
this.timeStepPerBucketSnapshotSegmentInMillis = timeStepPerBucketSnapshotSegmentInMillis;
this.maxIndexesPerBucketSnapshotSegment = maxIndexesPerBucketSnapshotSegment;
this.maxNumBuckets = maxNumBuckets;
this.sharedBucketPriorityQueue = new TripleLongPriorityQueue();
this.immutableBuckets = TreeRangeMap.create();
Expand Down Expand Up @@ -292,7 +296,9 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
&& lastMutableBucket.size() >= minIndexCountPerBucket
&& !lastMutableBucket.isEmpty()) {
Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
lastMutableBucket.sealBucketAndAsyncPersistent(this.timeStepPerBucketSnapshotSegmentInMillis,
lastMutableBucket.sealBucketAndAsyncPersistent(
this.timeStepPerBucketSnapshotSegmentInMillis,
this.maxIndexesPerBucketSnapshotSegment,
this.sharedBucketPriorityQueue);
afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
lastMutableBucket.resetLastMutableBucketRange();
Expand Down Expand Up @@ -380,8 +386,9 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(ImmutableB
.thenAccept(combinedDelayedIndexQueue -> {
Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
lastMutableBucket.createImmutableBucketAndAsyncPersistent(
timeStepPerBucketSnapshotSegmentInMillis, sharedBucketPriorityQueue,
combinedDelayedIndexQueue, bucketA.startLedgerId, bucketB.endLedgerId);
timeStepPerBucketSnapshotSegmentInMillis, maxIndexesPerBucketSnapshotSegment,
sharedBucketPriorityQueue, combinedDelayedIndexQueue, bucketA.startLedgerId,
bucketB.endLedgerId);

// Merge bit map to new bucket
Map<Long, RoaringBitmap> delayedIndexBitMapA = bucketA.getDelayedIndexBitMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,15 @@ class MutableBucket extends Bucket implements AutoCloseable {

Pair<ImmutableBucket, DelayedIndex> sealBucketAndAsyncPersistent(
long timeStepPerBucketSnapshotSegment,
int maxIndexesPerBucketSnapshotSegment,
TripleLongPriorityQueue sharedQueue) {
return createImmutableBucketAndAsyncPersistent(timeStepPerBucketSnapshotSegment, sharedQueue,
return createImmutableBucketAndAsyncPersistent(timeStepPerBucketSnapshotSegment,
maxIndexesPerBucketSnapshotSegment, sharedQueue,
TripleLongPriorityDelayedIndexQueue.wrap(priorityQueue), startLedgerId, endLedgerId);
}

Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
final long timeStepPerBucketSnapshotSegment,
final long timeStepPerBucketSnapshotSegment, final int maxIndexesPerBucketSnapshotSegment,
TripleLongPriorityQueue sharedQueue, DelayedIndexQueue delayedIndexQueue, final long startLedgerId,
final long endLedgerId) {
log.info("[{}] Creating bucket snapshot, startLedgerId: {}, endLedgerId: {}", dispatcherName,
Expand Down Expand Up @@ -98,7 +100,9 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(

snapshotSegmentBuilder.addIndexes(delayedIndex);

if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit) {
if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit
|| (maxIndexesPerBucketSnapshotSegment != -1
&& snapshotSegmentBuilder.getIndexesCount() >= maxIndexesPerBucketSnapshotSegment)) {
segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
currentTimestampUpperLimit = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public Object[][] provider(Method method) throws Exception {
return switch (methodName) {
case "test" -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 1, clock,
false, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50)
false, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
}};
case "testWithTimer" -> {
Timer timer = mock(Timer.class);
Expand Down Expand Up @@ -113,39 +113,43 @@ public Object[][] provider(Method method) throws Exception {

yield new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 1, clock,
false, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50),
false, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1, 50),
tasks
}};
}
case "testAddWithinTickTime" -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 100, clock,
false, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50)
false, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
}};
case "testAddMessageWithStrictDelay" -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 100, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50)
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
}};
case "testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict" -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50)
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
}};
case "testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict", "testRecoverSnapshot" ->
new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 100000, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50)
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
}};
case "testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict", "testExistDelayedMessage" ->
new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 500, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50)
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
}};
case "testMergeSnapshot", "testWithBkException", "testWithCreateFailDowngrade" -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 100000, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 10)
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1, 10)
}};
case "testMaxIndexesPerSegment" -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 100000, clock,
true, bucketSnapshotStorage, 20, TimeUnit.HOURS.toMillis(1), 5, 100)
}};
default -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 1, clock,
true, bucketSnapshotStorage, 1000, TimeUnit.MILLISECONDS.toMillis(100), 50)
true, bucketSnapshotStorage, 1000, TimeUnit.MILLISECONDS.toMillis(100), -1, 50)
}};
};
}
Expand Down Expand Up @@ -196,7 +200,7 @@ public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) {
clockTime.set(30 * 10);

tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50);
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1,50);

assertFalse(tracker.containsMessage(101, 101));
assertEquals(tracker.getNumberOfDelayedMessages(), 70);
Expand Down Expand Up @@ -268,7 +272,7 @@ public void testMergeSnapshot(BucketDelayedDeliveryTracker tracker) {
tracker.close();

tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 10);
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1,10);

assertEquals(tracker.getNumberOfDelayedMessages(), delayedMessagesInSnapshot.getValue());

Expand Down Expand Up @@ -322,7 +326,7 @@ public void testWithBkException(BucketDelayedDeliveryTracker tracker) {
tracker.close();

tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 10);
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1,10);

Long delayedMessagesInSnapshotValue = delayedMessagesInSnapshot.getValue();
assertEquals(tracker.getNumberOfDelayedMessages(), delayedMessagesInSnapshotValue);
Expand Down Expand Up @@ -378,4 +382,17 @@ public void testWithCreateFailDowngrade(BucketDelayedDeliveryTracker tracker) {
assertEquals(position, PositionImpl.get(i, i));
}
}

@Test(dataProvider = "delayedTracker")
public void testMaxIndexesPerSegment(BucketDelayedDeliveryTracker tracker) {
for (int i = 1; i <= 101; i++) {
tracker.addMessage(i, i, i * 10);
}

assertEquals(tracker.getImmutableBuckets().asMapOfRanges().size(), 5);

tracker.getImmutableBuckets().asMapOfRanges().forEach((k, bucket) -> {
assertEquals(bucket.getLastSegmentEntryId(), 4);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public void setup() throws Exception {
conf.setDelayedDeliveryTrackerFactoryClassName(BucketDelayedDeliveryTrackerFactory.class.getName());
conf.setDelayedDeliveryMaxNumBuckets(10);
conf.setDelayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds(1);
conf.setDelayedDeliveryMaxIndexesPerBucketSnapshotSegment(10);
conf.setDelayedDeliveryMinIndexCountPerBucket(50);
conf.setManagedLedgerMaxEntriesPerLedger(50);
conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
Expand Down