Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker][PIP-195] Cut off snapshot segment according to maxIndexesPerBucketSnapshotSegment #19706

Merged
merged 3 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -571,13 +571,14 @@ delayedDeliveryMinIndexCountPerBucket=50000
# after reaching the max time step limitation, the snapshot segment will be cut off.
delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds=300

# The max number of delayed message index in per bucket snapshot segment, -1 means no limitation
# after reaching the max number limitation, the snapshot segment will be cut off.
delayedDeliveryMaxIndexesPerBucketSnapshotSegment=5000

# The max number of delayed message index bucket,
# after reaching the max buckets limitation, the adjacent buckets will be merged.
delayedDeliveryMaxNumBuckets=50

# Enable share the delayed message index across subscriptions
delayedDeliverySharedIndexEnabled=false

# Size of the lookahead window to use when detecting if all the messages in the topic
# have a fixed delay.
# Default is 50,000. Setting the lookahead window to 0 will disable the logic to handle
Expand Down
42 changes: 42 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1223,3 +1223,45 @@ configurationStoreServers=
# zookeeper.
# Deprecated: use managedLedgerMaxUnackedRangesToPersistInMetadataStore
managedLedgerMaxUnackedRangesToPersistInZooKeeper=-1

# Whether to enable the delayed delivery for messages.
# If disabled, messages will be immediately delivered and there will
# be no tracking overhead.
delayedDeliveryEnabled=true

# Class name of the factory that implements the delayed deliver tracker.
# If value is "org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory",
# will create bucket based delayed message index tracker.
delayedDeliveryTrackerFactoryClassName=org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory

# Control the tick time for when retrying on delayed delivery,
# affecting the accuracy of the delivery time compared to the scheduled time.
# Note that this time is used to configure the HashedWheelTimer's tick time for the
# InMemoryDelayedDeliveryTrackerFactory (the default DelayedDeliverTrackerFactory).
# Default is 1 second.
delayedDeliveryTickTimeMillis=1000

# When using the InMemoryDelayedDeliveryTrackerFactory (the default DelayedDeliverTrackerFactory), whether
# the deliverAt time is strictly followed. When false (default), messages may be sent to consumers before the deliverAt
# time by as much as the tickTimeMillis. This can reduce the overhead on the broker of maintaining the delayed index
# for a potentially very short time period. When true, messages will not be sent to consumer until the deliverAt time
# has passed, and they may be as late as the deliverAt time plus the tickTimeMillis for the topic plus the
# delayedDeliveryTickTimeMillis.
isDelayedDeliveryDeliverAtTimeStrict=false

# The delayed message index bucket min index count.
# When the index count of the current bucket is more than this value and all message indexes of current ledger
# have already been added to the tracker we will seal the bucket.
delayedDeliveryMinIndexCountPerBucket=50000

# The delayed message index bucket time step(in seconds) in per bucket snapshot segment,
# after reaching the max time step limitation, the snapshot segment will be cut off.
delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds=300

# The max number of delayed message index in per bucket snapshot segment, -1 means no limitation
# after reaching the max number limitation, the snapshot segment will be cut off.
delayedDeliveryMaxIndexesPerBucketSnapshotSegment=5000

# The max number of delayed message index bucket,
# after reaching the max buckets limitation, the adjacent buckets will be merged.
delayedDeliveryMaxNumBuckets=50
Original file line number Diff line number Diff line change
Expand Up @@ -361,18 +361,20 @@ public class ServiceConfiguration implements PulsarConfiguration {
private long delayedDeliveryMinIndexCountPerBucket = 50000;

@FieldContext(category = CATEGORY_SERVER, doc = """
The delayed message index bucket time step(in seconds) in per bucket snapshot segment, \
The delayed message index time step(in seconds) in per bucket snapshot segment, \
after reaching the max time step limitation, the snapshot segment will be cut off.""")
private long delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds = 300;
private int delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds = 300;

@FieldContext(category = CATEGORY_SERVER, doc = """
The max number of delayed message index in per bucket snapshot segment, -1 means no limitation\
after reaching the max number limitation, the snapshot segment will be cut off.""")
private int delayedDeliveryMaxIndexesPerBucketSnapshotSegment = 5000;

@FieldContext(category = CATEGORY_SERVER, doc = """
The max number of delayed message index bucket, \
after reaching the max buckets limitation, the adjacent buckets will be merged.""")
private int delayedDeliveryMaxNumBuckets = 50;

@FieldContext(category = CATEGORY_SERVER, doc = "Enable share the delayed message index across subscriptions")
private boolean delayedDeliverySharedIndexEnabled = false;

@FieldContext(category = CATEGORY_SERVER, doc = "Size of the lookahead window to use "
+ "when detecting if all the messages in the topic have a fixed delay. "
+ "Default is 50,000. Setting the lookahead window to 0 will disable the "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public class BucketDelayedDeliveryTrackerFactory implements DelayedDeliveryTrack

private long delayedDeliveryMinIndexCountPerBucket;

private long delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds;
private int delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds;

private int delayedDeliveryMaxIndexesPerBucketSnapshotSegment;

@Override
public void initialize(PulsarService pulsarService) throws Exception {
Expand All @@ -58,14 +60,16 @@ public void initialize(PulsarService pulsarService) throws Exception {
this.delayedDeliveryMaxNumBuckets = config.getDelayedDeliveryMaxNumBuckets();
this.delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds =
config.getDelayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds();
this.delayedDeliveryMaxIndexesPerBucketSnapshotSegment =
config.getDelayedDeliveryMaxIndexesPerBucketSnapshotSegment();
}

@Override
public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
return new BucketDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis, isDelayedDeliveryDeliverAtTimeStrict,
bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,
TimeUnit.SECONDS.toMillis(delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds),
delayedDeliveryMaxNumBuckets);
delayedDeliveryMaxIndexesPerBucketSnapshotSegment, delayedDeliveryMaxNumBuckets);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker

private final long timeStepPerBucketSnapshotSegmentInMillis;

private final int maxIndexesPerBucketSnapshotSegment;

private final int maxNumBuckets;

private long numberDelayedMessages;
Expand All @@ -89,20 +91,22 @@ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispat
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
int maxNumBuckets) {
int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets) {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict,
bucketSnapshotStorage, minIndexCountPerBucket, timeStepPerBucketSnapshotSegmentInMillis, maxNumBuckets);
bucketSnapshotStorage, minIndexCountPerBucket, timeStepPerBucketSnapshotSegmentInMillis,
maxIndexesPerBucketSnapshotSegment, maxNumBuckets);
}

public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
Timer timer, long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
int maxNumBuckets) {
int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets) {
super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict);
this.minIndexCountPerBucket = minIndexCountPerBucket;
this.timeStepPerBucketSnapshotSegmentInMillis = timeStepPerBucketSnapshotSegmentInMillis;
this.maxIndexesPerBucketSnapshotSegment = maxIndexesPerBucketSnapshotSegment;
this.maxNumBuckets = maxNumBuckets;
this.sharedBucketPriorityQueue = new TripleLongPriorityQueue();
this.immutableBuckets = TreeRangeMap.create();
Expand Down Expand Up @@ -292,7 +296,9 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
&& lastMutableBucket.size() >= minIndexCountPerBucket
&& !lastMutableBucket.isEmpty()) {
Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
lastMutableBucket.sealBucketAndAsyncPersistent(this.timeStepPerBucketSnapshotSegmentInMillis,
lastMutableBucket.sealBucketAndAsyncPersistent(
this.timeStepPerBucketSnapshotSegmentInMillis,
this.maxIndexesPerBucketSnapshotSegment,
this.sharedBucketPriorityQueue);
afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
lastMutableBucket.resetLastMutableBucketRange();
Expand Down Expand Up @@ -380,8 +386,9 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(ImmutableB
.thenAccept(combinedDelayedIndexQueue -> {
Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
lastMutableBucket.createImmutableBucketAndAsyncPersistent(
timeStepPerBucketSnapshotSegmentInMillis, sharedBucketPriorityQueue,
combinedDelayedIndexQueue, bucketA.startLedgerId, bucketB.endLedgerId);
timeStepPerBucketSnapshotSegmentInMillis, maxIndexesPerBucketSnapshotSegment,
sharedBucketPriorityQueue, combinedDelayedIndexQueue, bucketA.startLedgerId,
bucketB.endLedgerId);

// Merge bit map to new bucket
Map<Long, RoaringBitmap> delayedIndexBitMapA = bucketA.getDelayedIndexBitMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,15 @@ class MutableBucket extends Bucket implements AutoCloseable {

Pair<ImmutableBucket, DelayedIndex> sealBucketAndAsyncPersistent(
long timeStepPerBucketSnapshotSegment,
int maxIndexesPerBucketSnapshotSegment,
TripleLongPriorityQueue sharedQueue) {
return createImmutableBucketAndAsyncPersistent(timeStepPerBucketSnapshotSegment, sharedQueue,
return createImmutableBucketAndAsyncPersistent(timeStepPerBucketSnapshotSegment,
maxIndexesPerBucketSnapshotSegment, sharedQueue,
TripleLongPriorityDelayedIndexQueue.wrap(priorityQueue), startLedgerId, endLedgerId);
}

Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
final long timeStepPerBucketSnapshotSegment,
final long timeStepPerBucketSnapshotSegment, final int maxIndexesPerBucketSnapshotSegment,
TripleLongPriorityQueue sharedQueue, DelayedIndexQueue delayedIndexQueue, final long startLedgerId,
final long endLedgerId) {
log.info("[{}] Creating bucket snapshot, startLedgerId: {}, endLedgerId: {}", dispatcherName,
Expand Down Expand Up @@ -98,7 +100,9 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(

snapshotSegmentBuilder.addIndexes(delayedIndex);

if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit) {
if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit
|| (maxIndexesPerBucketSnapshotSegment != -1
&& snapshotSegmentBuilder.getIndexesCount() >= maxIndexesPerBucketSnapshotSegment)) {
segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
currentTimestampUpperLimit = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public Object[][] provider(Method method) throws Exception {
return switch (methodName) {
case "test" -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 1, clock,
false, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50)
false, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
}};
case "testWithTimer" -> {
Timer timer = mock(Timer.class);
Expand Down Expand Up @@ -113,39 +113,43 @@ public Object[][] provider(Method method) throws Exception {

yield new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 1, clock,
false, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50),
false, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1, 50),
tasks
}};
}
case "testAddWithinTickTime" -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 100, clock,
false, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50)
false, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
}};
case "testAddMessageWithStrictDelay" -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 100, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50)
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
}};
case "testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict" -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50)
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
}};
case "testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict", "testRecoverSnapshot" ->
new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 100000, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50)
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
}};
case "testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict", "testExistDelayedMessage" ->
new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 500, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50)
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1, 50)
}};
case "testMergeSnapshot", "testWithBkException", "testWithCreateFailDowngrade" -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 100000, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 10)
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1, 10)
}};
case "testMaxIndexesPerSegment" -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 100000, clock,
true, bucketSnapshotStorage, 20, TimeUnit.HOURS.toMillis(1), 5, 100)
}};
default -> new Object[][]{{
new BucketDelayedDeliveryTracker(dispatcher, timer, 1, clock,
true, bucketSnapshotStorage, 1000, TimeUnit.MILLISECONDS.toMillis(100), 50)
true, bucketSnapshotStorage, 1000, TimeUnit.MILLISECONDS.toMillis(100), -1, 50)
}};
};
}
Expand Down Expand Up @@ -196,7 +200,7 @@ public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) {
clockTime.set(30 * 10);

tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50);
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1,50);

assertFalse(tracker.containsMessage(101, 101));
assertEquals(tracker.getNumberOfDelayedMessages(), 70);
Expand Down Expand Up @@ -268,7 +272,7 @@ public void testMergeSnapshot(BucketDelayedDeliveryTracker tracker) {
tracker.close();

tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 10);
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1,10);

assertEquals(tracker.getNumberOfDelayedMessages(), delayedMessagesInSnapshot.getValue());

Expand Down Expand Up @@ -322,7 +326,7 @@ public void testWithBkException(BucketDelayedDeliveryTracker tracker) {
tracker.close();

tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 10);
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), -1,10);

Long delayedMessagesInSnapshotValue = delayedMessagesInSnapshot.getValue();
assertEquals(tracker.getNumberOfDelayedMessages(), delayedMessagesInSnapshotValue);
Expand Down Expand Up @@ -378,4 +382,17 @@ public void testWithCreateFailDowngrade(BucketDelayedDeliveryTracker tracker) {
assertEquals(position, PositionImpl.get(i, i));
}
}

@Test(dataProvider = "delayedTracker")
public void testMaxIndexesPerSegment(BucketDelayedDeliveryTracker tracker) {
for (int i = 1; i <= 101; i++) {
tracker.addMessage(i, i, i * 10);
}

assertEquals(tracker.getImmutableBuckets().asMapOfRanges().size(), 5);

tracker.getImmutableBuckets().asMapOfRanges().forEach((k, bucket) -> {
assertEquals(bucket.getLastSegmentEntryId(), 4);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public void setup() throws Exception {
conf.setDelayedDeliveryTrackerFactoryClassName(BucketDelayedDeliveryTrackerFactory.class.getName());
conf.setDelayedDeliveryMaxNumBuckets(10);
conf.setDelayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds(1);
conf.setDelayedDeliveryMaxIndexesPerBucketSnapshotSegment(10);
conf.setDelayedDeliveryMinIndexCountPerBucket(50);
conf.setManagedLedgerMaxEntriesPerLedger(50);
conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
Expand Down