Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to configure and disable the size of lookahead for detecting fixed delays in messages #17907

Merged
merged 1 commit into from
Oct 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,12 @@ delayedDeliveryMaxNumBuckets=50
# Enable share the delayed message index across subscriptions
delayedDeliverySharedIndexEnabled=false

# Size of the lookahead window to use when detecting if all the messages in the topic
# have a fixed delay.
# Default is 50,000. Setting the lookahead window to 0 will disable the logic to handle
# fixed delays in messages in a different way.
delayedDeliveryFixedDelayDetectionLookahead=50000

# Whether to enable acknowledge of batch local index.
acknowledgmentAtBatchIndexLevelEnabled=false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,12 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
@FieldContext(category = CATEGORY_SERVER, doc = "Enable share the delayed message index across subscriptions")
private boolean delayedDeliverySharedIndexEnabled = false;

@FieldContext(category = CATEGORY_SERVER, doc = "Size of the lookahead window to use "
+ "when detecting if all the messages in the topic have a fixed delay. "
+ "Default is 50,000. Setting the lookahead window to 0 will disable the "
+ "logic to handle fixed delays in messages in a different way.")
private long delayedDeliveryFixedDelayDetectionLookahead = 50_000;

@FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the acknowledge of batch local index")
private boolean acknowledgmentAtBatchIndexLevelEnabled = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
// always going to be in FIFO order, then we can avoid pulling all the messages in
// tracker. Instead, we use the lookahead for detection and pause the read from
// the cursor if the delays are fixed.
public static final long DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES = 50_000;
private final long fixedDelayDetectionLookahead;

// This is the timestamp of the message with the highest delivery time
// If new added messages are lower than this, it means the delivery is requested
Expand All @@ -70,17 +70,22 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
private boolean messagesHaveFixedDelay = true;

InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict) {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict);
boolean isDelayedDeliveryDeliverAtTimeStrict,
long fixedDelayDetectionLookahead) {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict,
fixedDelayDetectionLookahead);
}

InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis, Clock clock, boolean isDelayedDeliveryDeliverAtTimeStrict) {
long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict,
long fixedDelayDetectionLookahead) {
this.dispatcher = dispatcher;
this.timer = timer;
this.tickTimeMillis = tickTimeMillis;
this.clock = clock;
this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
}

/**
Expand Down Expand Up @@ -283,8 +288,9 @@ public void close() {
@Override
public boolean shouldPauseAllDeliveries() {
// Pause deliveries if we know all delays are fixed within the lookahead window
return messagesHaveFixedDelay
&& priorityQueue.size() >= DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES
return fixedDelayDetectionLookahead > 0
&& messagesHaveFixedDelay
&& priorityQueue.size() >= fixedDelayDetectionLookahead
&& !hasMessageAvailable();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,21 @@ public class InMemoryDelayedDeliveryTrackerFactory implements DelayedDeliveryTra

private boolean isDelayedDeliveryDeliverAtTimeStrict;

private long fixedDelayDetectionLookahead;

@Override
public void initialize(ServiceConfiguration config) {
this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-delayed-delivery"),
config.getDelayedDeliveryTickTimeMillis(), TimeUnit.MILLISECONDS);
this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis();
this.isDelayedDeliveryDeliverAtTimeStrict = config.isDelayedDeliveryDeliverAtTimeStrict();
this.fixedDelayDetectionLookahead = config.getDelayedDeliveryFixedDelayDetectionLookahead();
}

@Override
public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
isDelayedDeliveryDeliverAtTimeStrict);
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void test() throws Exception {

@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
false);
false, 0);

assertFalse(tracker.hasMessageAvailable());

Expand Down Expand Up @@ -146,7 +146,7 @@ public void testWithTimer() throws Exception {

@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
false);
false, 0);

assertTrue(tasks.isEmpty());
assertTrue(tracker.addMessage(2, 2, 20));
Expand Down Expand Up @@ -187,7 +187,7 @@ public void testAddWithinTickTime() {

@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
false);
false, 0);

clockTime.set(0);

Expand All @@ -209,7 +209,7 @@ public void testAddMessageWithStrictDelay() {

@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock,
true);
true, 0);

clockTime.set(10);

Expand All @@ -236,7 +236,7 @@ public void testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithSt
// Use a short tick time to show that the timer task is run based on the deliverAt time in this scenario.
@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer,
1000, clock, true);
1000, clock, true, 0);

// Set clock time, then run tracker to inherit clock time as the last tick time.
clockTime.set(10000);
Expand Down Expand Up @@ -274,7 +274,7 @@ public void testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStr
// a previous tick run.
@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer,
100000, clock, true);
100000, clock, true, 0);

clockTime.set(500000);

Expand All @@ -299,7 +299,7 @@ public void testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict() throws
// Use a short tick time to show that the timer task is run based on the deliverAt time in this scenario.
@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer,
500, clock, true);
500, clock, true, 0);

clockTime.set(0);

Expand All @@ -323,9 +323,11 @@ public void testWithFixedDelays() throws Exception {
Clock clock = mock(Clock.class);
when(clock.millis()).then(x -> clockTime.get());

final long fixedDelayLookahead = 100;

@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
true);
true, fixedDelayLookahead);

assertFalse(tracker.hasMessageAvailable());

Expand All @@ -339,13 +341,13 @@ public void testWithFixedDelays() throws Exception {
assertEquals(tracker.getNumberOfDelayedMessages(), 5);
assertFalse(tracker.shouldPauseAllDeliveries());

for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
for (int i = 6; i <= fixedDelayLookahead; i++) {
assertTrue(tracker.addMessage(i, i, i * 10));
}

assertTrue(tracker.shouldPauseAllDeliveries());

clockTime.set(InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES * 10);
clockTime.set(fixedDelayLookahead * 10);

tracker.getScheduledMessages(100);
assertFalse(tracker.shouldPauseAllDeliveries());
Expand All @@ -367,9 +369,11 @@ public void testWithMixedDelays() throws Exception {
Clock clock = mock(Clock.class);
when(clock.millis()).then(x -> clockTime.get());

long fixedDelayLookahead = 100;

@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
true);
true, fixedDelayLookahead);

assertFalse(tracker.hasMessageAvailable());

Expand All @@ -381,7 +385,7 @@ public void testWithMixedDelays() throws Exception {

assertFalse(tracker.shouldPauseAllDeliveries());

for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
for (int i = 6; i <= fixedDelayLookahead; i++) {
assertTrue(tracker.addMessage(i, i, i * 10));
}

Expand All @@ -401,9 +405,11 @@ public void testWithNoDelays() throws Exception {
Clock clock = mock(Clock.class);
when(clock.millis()).then(x -> clockTime.get());

long fixedDelayLookahead = 100;

@Cleanup
InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock,
true);
true, fixedDelayLookahead);

assertFalse(tracker.hasMessageAvailable());

Expand All @@ -415,7 +421,7 @@ public void testWithNoDelays() throws Exception {

assertFalse(tracker.shouldPauseAllDeliveries());

for (int i = 6; i <= InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES; i++) {
for (int i = 6; i <= fixedDelayLookahead; i++) {
assertTrue(tracker.addMessage(i, i, i * 10));
}

Expand Down