diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java new file mode 100644 index 00000000000000..5c99e4c307d7cd --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; +import java.time.Clock; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; + +@Slf4j +public abstract class AbstractDelayedDeliveryTracker implements DelayedDeliveryTracker, TimerTask { + + protected final PersistentDispatcherMultipleConsumers dispatcher; + + // Reference to the shared (per-broker) timer for delayed delivery + protected final Timer timer; + + // Current timeout or null if not set + protected Timeout timeout; + + // Timestamp at which the timeout is currently set + private long currentTimeoutTarget; + + // Last time the TimerTask was triggered for this class + private long lastTickRun; + + protected long tickTimeMillis; + + protected final Clock clock; + + private final boolean isDelayedDeliveryDeliverAtTimeStrict; + + public AbstractDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, + long tickTimeMillis, + boolean isDelayedDeliveryDeliverAtTimeStrict) { + this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict); + } + + public AbstractDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, + long tickTimeMillis, Clock clock, + boolean isDelayedDeliveryDeliverAtTimeStrict) { + this.dispatcher = dispatcher; + this.timer = timer; + this.tickTimeMillis = tickTimeMillis; + this.clock = clock; + this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + } + + + /** + * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is false, we allow for early delivery by as much as the + * {@link #tickTimeMillis} because it is a slight optimization to let messages skip going back into the delay + * tracker for a brief amount of time when we're already trying to dispatch to the consumer. + * + * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is true, we use the current time to determine when messages + * can be delivered. As a consequence, there are two delays that will affect delivery. The first is the + * {@link #tickTimeMillis} and the second is the {@link Timer}'s granularity. + * + * @return the cutoff time to determine whether a message is ready to deliver to the consumer + */ + protected long getCutoffTime() { + return isDelayedDeliveryDeliverAtTimeStrict ? clock.millis() : clock.millis() + tickTimeMillis; + } + + public void resetTickTime(long tickTime) { + if (this.tickTimeMillis != tickTime) { + this.tickTimeMillis = tickTime; + } + } + + protected void updateTimer() { + if (getNumberOfDelayedMessages() == 0) { + if (timeout != null) { + currentTimeoutTarget = -1; + timeout.cancel(); + timeout = null; + } + return; + } + long timestamp = nextDeliveryTime(); + if (timestamp == currentTimeoutTarget) { + // The timer is already set to the correct target time + return; + } + + if (timeout != null) { + timeout.cancel(); + } + + long now = clock.millis(); + long delayMillis = timestamp - now; + + if (delayMillis < 0) { + // There are messages that are already ready to be delivered. If + // the dispatcher is not getting them is because the consumer is + // either not connected or slow. + // We don't need to keep retriggering the timer. When the consumer + // catches up, the dispatcher will do the readMoreEntries() and + // get these messages + return; + } + + // Compute the earliest time that we schedule the timer to run. + long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now; + long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis); + + if (log.isDebugEnabled()) { + log.debug("[{}] Start timer in {} millis", dispatcher.getName(), calculatedDelayMillis); + } + + // Even though we may delay longer than this timestamp because of the tick delay, we still track the + // current timeout with reference to the next message's timestamp. + currentTimeoutTarget = timestamp; + timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS); + } + + @Override + public void run(Timeout timeout) throws Exception { + if (log.isDebugEnabled()) { + log.debug("[{}] Timer triggered", dispatcher.getName()); + } + if (timeout == null || timeout.isCancelled()) { + return; + } + + synchronized (dispatcher) { + lastTickRun = clock.millis(); + currentTimeoutTarget = -1; + this.timeout = null; + dispatcher.readMoreEntries(); + } + } + + @Override + public void close() { + if (timeout != null) { + timeout.cancel(); + timeout = null; + } + } + + protected abstract long nextDeliveryTime(); +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index 58c86deb410e12..f55d5fd11694bf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -19,13 +19,10 @@ package org.apache.pulsar.broker.delayed; import com.google.common.annotations.VisibleForTesting; -import io.netty.util.Timeout; import io.netty.util.Timer; -import io.netty.util.TimerTask; import java.time.Clock; import java.util.NavigableSet; import java.util.TreeSet; -import java.util.concurrent.TimeUnit; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -33,30 +30,10 @@ import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue; @Slf4j -public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, TimerTask { +public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker { protected final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue(); - protected final PersistentDispatcherMultipleConsumers dispatcher; - - // Reference to the shared (per-broker) timer for delayed delivery - private final Timer timer; - - // Current timeout or null if not set - protected Timeout timeout; - - // Timestamp at which the timeout is currently set - private long currentTimeoutTarget; - - // Last time the TimerTask was triggered for this class - private long lastTickRun; - - protected long tickTimeMillis; - - protected final Clock clock; - - private final boolean isDelayedDeliveryDeliverAtTimeStrict; - // If we detect that all messages have fixed delay time, such that the delivery is // always going to be in FIFO order, then we can avoid pulling all the messages in // tracker. Instead, we use the lookahead for detection and pause the read from @@ -84,29 +61,10 @@ public InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers disp long tickTimeMillis, Clock clock, boolean isDelayedDeliveryDeliverAtTimeStrict, long fixedDelayDetectionLookahead) { - this.dispatcher = dispatcher; - this.timer = timer; - this.tickTimeMillis = tickTimeMillis; - this.clock = clock; - this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict; + super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict); this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead; } - /** - * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is false, we allow for early delivery by as much as the - * {@link #tickTimeMillis} because it is a slight optimization to let messages skip going back into the delay - * tracker for a brief amount of time when we're already trying to dispatch to the consumer. - * - * When {@link #isDelayedDeliveryDeliverAtTimeStrict} is true, we use the current time to determine when messages - * can be delivered. As a consequence, there are two delays that will affect delivery. The first is the - * {@link #tickTimeMillis} and the second is the {@link Timer}'s granularity. - * - * @return the cutoff time to determine whether a message is ready to deliver to the consumer - */ - protected long getCutoffTime() { - return isDelayedDeliveryDeliverAtTimeStrict ? clock.millis() : clock.millis() + tickTimeMillis; - } - @Override public boolean addMessage(long ledgerId, long entryId, long deliverAt) { if (deliverAt < 0 || deliverAt <= getCutoffTime()) { @@ -119,7 +77,6 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) { deliverAt - clock.millis()); } - priorityQueue.add(deliverAt, ledgerId, entryId); updateTimer(); @@ -189,14 +146,6 @@ public NavigableSet getScheduledMessages(int maxMessages) { return positions; } - @Override - public void resetTickTime(long tickTime) { - - if (this.tickTimeMillis != tickTime) { - this.tickTimeMillis = tickTime; - } - } - @Override public void clear() { this.priorityQueue.clear(); @@ -212,87 +161,10 @@ public long getBufferMemoryUsage() { return priorityQueue.bytesCapacity(); } - /** - * Update the scheduled timer task such that: - * 1. If there are no delayed messages, return and do not schedule a timer task. - * 2. If the next message in the queue has the same deliverAt time as the timer task, return and leave existing - * timer task in place. - * 3. If the deliverAt time for the next delayed message has already passed (i.e. the delay is negative), return - * without scheduling a timer task since the subscription is backlogged. - * 4. Else, schedule a timer task where the delay is the greater of these two: the next message's deliverAt time or - * the last tick time plus the tickTimeMillis (to ensure we do not schedule the task more frequently than the - * tickTimeMillis). - */ - protected void updateTimer() { - if (getNumberOfDelayedMessages() == 0) { - if (timeout != null) { - currentTimeoutTarget = -1; - timeout.cancel(); - timeout = null; - } - return; - } - long timestamp = nextDeliveryTime(); - if (timestamp == currentTimeoutTarget) { - // The timer is already set to the correct target time - return; - } - - if (timeout != null) { - timeout.cancel(); - } - - long now = clock.millis(); - long delayMillis = timestamp - now; - - if (delayMillis < 0) { - // There are messages that are already ready to be delivered. If - // the dispatcher is not getting them is because the consumer is - // either not connected or slow. - // We don't need to keep retriggering the timer. When the consumer - // catches up, the dispatcher will do the readMoreEntries() and - // get these messages - return; - } - - // Compute the earliest time that we schedule the timer to run. - long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now; - long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis); - - if (log.isDebugEnabled()) { - log.debug("[{}] Start timer in {} millis", dispatcher.getName(), calculatedDelayMillis); - } - - // Even though we may delay longer than this timestamp because of the tick delay, we still track the - // current timeout with reference to the next message's timestamp. - currentTimeoutTarget = timestamp; - timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS); - } - - @Override - public void run(Timeout timeout) throws Exception { - if (log.isDebugEnabled()) { - log.debug("[{}] Timer triggered", dispatcher.getName()); - } - if (timeout == null || timeout.isCancelled()) { - return; - } - - synchronized (dispatcher) { - lastTickRun = clock.millis(); - currentTimeoutTarget = -1; - this.timeout = null; - dispatcher.readMoreEntries(); - } - } - @Override public void close() { + super.close(); priorityQueue.close(); - if (timeout != null) { - timeout.cancel(); - timeout = null; - } } @Override @@ -309,9 +181,6 @@ public boolean containsMessage(long ledgerId, long entryId) { return false; } - protected TripleLongPriorityQueue getPriorityQueue() { - return priorityQueue; - } protected long nextDeliveryTime() { return priorityQueue.peekN1(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java index 25ff9d033e595b..fbd6d765705d40 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java @@ -36,7 +36,7 @@ @Slf4j @Data @AllArgsConstructor -public abstract class Bucket { +abstract class Bucket { static final String DELAYED_BUCKET_KEY_PREFIX = "#pulsar.internal.delayed.bucket"; static final String DELIMITER = "_"; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index 71b554fef41d5c..47347d582d4827 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -28,8 +28,8 @@ import io.netty.util.Timer; import java.time.Clock; import java.util.Iterator; +import java.util.NavigableSet; import java.util.Optional; -import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -40,7 +40,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.tuple.Pair; -import org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTracker; +import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; @@ -48,7 +48,7 @@ @Slf4j @ThreadSafe -public class BucketDelayedDeliveryTracker extends InMemoryDelayedDeliveryTracker { +public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker { static final int AsyncOperationTimeoutSeconds = 30; @@ -84,35 +84,16 @@ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispat BucketSnapshotStorage bucketSnapshotStorage, long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegment, int maxNumBuckets) { - super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict, - -1L); + super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict); this.minIndexCountPerBucket = minIndexCountPerBucket; this.timeStepPerBucketSnapshotSegment = timeStepPerBucketSnapshotSegment; this.maxNumBuckets = maxNumBuckets; - ManagedCursor cursor = dispatcher.getCursor(); this.sharedBucketPriorityQueue = new TripleLongPriorityQueue(); this.immutableBuckets = TreeRangeMap.create(); this.snapshotSegmentLastIndexTable = HashBasedTable.create(); - this.numberDelayedMessages = 0L; - - this.lastMutableBucket = new MutableBucket(cursor, bucketSnapshotStorage, super.getPriorityQueue()); - } - - private void moveScheduledMessageToSharedQueue(long cutoffTime) { - TripleLongPriorityQueue priorityQueue = getPriorityQueue(); - while (!priorityQueue.isEmpty()) { - long timestamp = priorityQueue.peekN1(); - if (timestamp > cutoffTime) { - break; - } - - long ledgerId = priorityQueue.peekN2(); - long entryId = priorityQueue.peekN3(); - sharedBucketPriorityQueue.add(timestamp, ledgerId, entryId); - - priorityQueue.pop(); - } + ManagedCursor cursor = dispatcher.getCursor(); + this.lastMutableBucket = new MutableBucket(cursor, bucketSnapshotStorage); } @Override @@ -121,7 +102,7 @@ public void run(Timeout timeout) throws Exception { if (timeout == null || timeout.isCancelled()) { return; } - moveScheduledMessageToSharedQueue(getCutoffTime()); + lastMutableBucket.moveScheduledMessageToSharedQueue(getCutoffTime(), sharedBucketPriorityQueue); } super.run(timeout); } @@ -166,13 +147,14 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver boolean existBucket = findImmutableBucket(ledgerId).isPresent(); // Create bucket snapshot - if (ledgerId > lastMutableBucket.endLedgerId && !getPriorityQueue().isEmpty()) { - if (getPriorityQueue().size() >= minIndexCountPerBucket && !existBucket) { - sealBucket(); - lastMutableBucket.resetLastMutableBucketRange(); - if (immutableBuckets.asMapOfRanges().size() > maxNumBuckets) { - // TODO merge bucket snapshot (synchronize operate) - } + if (!existBucket && ledgerId > lastMutableBucket.endLedgerId + && lastMutableBucket.size() >= minIndexCountPerBucket + && !lastMutableBucket.isEmpty()) { + sealBucket(); + lastMutableBucket.resetLastMutableBucketRange(); + + if (immutableBuckets.asMapOfRanges().size() > maxNumBuckets) { + // TODO merge bucket snapshot (synchronize operate) } } @@ -183,7 +165,7 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver } else { checkArgument(ledgerId >= lastMutableBucket.endLedgerId); - getPriorityQueue().add(deliverAt, ledgerId, entryId); + lastMutableBucket.addMessage(ledgerId, entryId, deliverAt); if (lastMutableBucket.startLedgerId == -1L) { lastMutableBucket.setStartLedgerId(ledgerId); @@ -217,12 +199,12 @@ public synchronized boolean hasMessageAvailable() { @Override protected long nextDeliveryTime() { - if (getPriorityQueue().isEmpty() && !sharedBucketPriorityQueue.isEmpty()) { + if (lastMutableBucket.isEmpty() && !sharedBucketPriorityQueue.isEmpty()) { return sharedBucketPriorityQueue.peekN1(); - } else if (sharedBucketPriorityQueue.isEmpty() && !getPriorityQueue().isEmpty()) { - return getPriorityQueue().peekN1(); + } else if (sharedBucketPriorityQueue.isEmpty() && !lastMutableBucket.isEmpty()) { + return lastMutableBucket.nextDeliveryTime(); } - long timestamp = getPriorityQueue().peekN1(); + long timestamp = lastMutableBucket.nextDeliveryTime(); long bucketTimestamp = sharedBucketPriorityQueue.peekN1(); return Math.min(timestamp, bucketTimestamp); } @@ -234,16 +216,16 @@ public synchronized long getNumberOfDelayedMessages() { @Override public synchronized long getBufferMemoryUsage() { - return getPriorityQueue().bytesCapacity() + sharedBucketPriorityQueue.bytesCapacity(); + return this.lastMutableBucket.getBufferMemoryUsage() + sharedBucketPriorityQueue.bytesCapacity(); } @Override - public synchronized Set getScheduledMessages(int maxMessages) { + public synchronized NavigableSet getScheduledMessages(int maxMessages) { long cutoffTime = getCutoffTime(); - moveScheduledMessageToSharedQueue(cutoffTime); + lastMutableBucket.moveScheduledMessageToSharedQueue(cutoffTime, sharedBucketPriorityQueue); - Set positions = new TreeSet<>(); + NavigableSet positions = new TreeSet<>(); int n = maxMessages; while (n > 0 && !sharedBucketPriorityQueue.isEmpty()) { @@ -302,7 +284,6 @@ public boolean shouldPauseAllDeliveries() { @Override public synchronized void clear() { - super.clear(); cleanImmutableBuckets(true); sharedBucketPriorityQueue.clear(); lastMutableBucket.clear(); @@ -313,6 +294,7 @@ public synchronized void clear() { @Override public synchronized void close() { super.close(); + lastMutableBucket.close(); cleanImmutableBuckets(false); sharedBucketPriorityQueue.close(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java index 4673809105b048..23d6b131624351 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java @@ -38,14 +38,14 @@ import org.roaringbitmap.RoaringBitmap; @Slf4j -public class MutableBucket extends Bucket { +class MutableBucket extends Bucket implements AutoCloseable { private final TripleLongPriorityQueue priorityQueue; MutableBucket(ManagedCursor cursor, - BucketSnapshotStorage bucketSnapshotStorage, TripleLongPriorityQueue priorityQueue) { + BucketSnapshotStorage bucketSnapshotStorage) { super(cursor, bucketSnapshotStorage, -1L, -1L); - this.priorityQueue = priorityQueue; + this.priorityQueue = new TripleLongPriorityQueue(); } Pair sealBucketAndAsyncPersistent( @@ -147,6 +147,21 @@ Pair sealBucketAndAsyncPersistent( return result; } + void moveScheduledMessageToSharedQueue(long cutoffTime, TripleLongPriorityQueue sharedBucketPriorityQueue) { + while (!priorityQueue.isEmpty()) { + long timestamp = priorityQueue.peekN1(); + if (timestamp > cutoffTime) { + break; + } + + long ledgerId = priorityQueue.peekN2(); + long entryId = priorityQueue.peekN3(); + sharedBucketPriorityQueue.add(timestamp, ledgerId, entryId); + + priorityQueue.pop(); + } + } + void resetLastMutableBucketRange() { this.setStartLedgerId(-1L); this.setEndLedgerId(-1L); @@ -156,4 +171,28 @@ void clear() { this.resetLastMutableBucketRange(); this.delayedIndexBitMap.clear(); } + + public void close() { + priorityQueue.close(); + } + + long getBufferMemoryUsage() { + return priorityQueue.bytesCapacity(); + } + + boolean isEmpty() { + return priorityQueue.isEmpty(); + } + + long nextDeliveryTime() { + return priorityQueue.peekN1(); + } + + long size() { + return priorityQueue.size(); + } + + void addMessage(long ledgerId, long entryId, long deliverAt) { + priorityQueue.add(deliverAt, ledgerId, entryId); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/AbstractDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/AbstractDeliveryTrackerTest.java new file mode 100644 index 00000000000000..1d166a8db5c9ef --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/AbstractDeliveryTrackerTest.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import static org.mockito.Mockito.atMostOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.time.Clock; +import java.util.Collections; +import java.util.NavigableMap; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +public abstract class AbstractDeliveryTrackerTest { + + // Create a single shared timer for the test. + protected final Timer timer = + new HashedWheelTimer(new DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"), + 500, TimeUnit.MILLISECONDS); + protected PersistentDispatcherMultipleConsumers dispatcher; + protected Clock clock; + + protected AtomicLong clockTime; + + @AfterClass(alwaysRun = true) + public void cleanup() { + timer.stop(); + } + + @Test(dataProvider = "delayedTracker") + public void test(DelayedDeliveryTracker tracker) throws Exception { + assertFalse(tracker.hasMessageAvailable()); + + assertTrue(tracker.addMessage(1, 2, 20)); + assertTrue(tracker.addMessage(2, 1, 10)); + assertTrue(tracker.addMessage(3, 3, 30)); + assertTrue(tracker.addMessage(4, 5, 50)); + assertTrue(tracker.addMessage(5, 4, 40)); + + assertFalse(tracker.hasMessageAvailable()); + assertEquals(tracker.getNumberOfDelayedMessages(), 5); + + assertEquals(tracker.getScheduledMessages(10), Collections.emptySet()); + + // Move time forward + clockTime.set(15); + + // Message is rejected by tracker since it's already ready to send + assertFalse(tracker.addMessage(6, 6, 10)); + + assertEquals(tracker.getNumberOfDelayedMessages(), 5); + assertTrue(tracker.hasMessageAvailable()); + Set scheduled = tracker.getScheduledMessages(10); + assertEquals(scheduled.size(), 1); + + // Move time forward + clockTime.set(60); + + assertEquals(tracker.getNumberOfDelayedMessages(), 4); + assertTrue(tracker.hasMessageAvailable()); + scheduled = tracker.getScheduledMessages(1); + assertEquals(scheduled.size(), 1); + + assertEquals(tracker.getNumberOfDelayedMessages(), 3); + assertTrue(tracker.hasMessageAvailable()); + scheduled = tracker.getScheduledMessages(3); + assertEquals(scheduled.size(), 3); + + assertEquals(tracker.getNumberOfDelayedMessages(), 0); + assertFalse(tracker.hasMessageAvailable()); + assertEquals(tracker.getScheduledMessages(10), Collections.emptySet()); + + tracker.close(); + } + + @Test(dataProvider = "delayedTracker") + public void testWithTimer(DelayedDeliveryTracker tracker, NavigableMap tasks) throws Exception { + assertTrue(tasks.isEmpty()); + assertTrue(tracker.addMessage(2, 2, 20)); + assertEquals(tasks.size(), 1); + assertEquals(tasks.firstKey().longValue(), 20); + + assertTrue(tracker.addMessage(1, 1, 10)); + assertEquals(tasks.size(), 1); + assertEquals(tasks.firstKey().longValue(), 10); + + assertTrue(tracker.addMessage(3, 3, 30)); + assertEquals(tasks.size(), 1); + assertEquals(tasks.firstKey().longValue(), 10); + + clockTime.set(15); + + TimerTask task = tasks.pollFirstEntry().getValue(); + Timeout cancelledTimeout = mock(Timeout.class); + when(cancelledTimeout.isCancelled()).thenReturn(true); + task.run(cancelledTimeout); + verify(dispatcher, atMostOnce()).readMoreEntries(); + + task.run(mock(Timeout.class)); + verify(dispatcher).readMoreEntries(); + + tracker.close(); + } + + /** + * Adding a message that is about to expire within the tick time should lead + * to a rejection from the tracker when isDelayedDeliveryDeliverAtTimeStrict is false. + */ + @Test(dataProvider = "delayedTracker") + public void testAddWithinTickTime(DelayedDeliveryTracker tracker) { + clockTime.set(0); + + assertFalse(tracker.addMessage(1, 1, 10)); + assertFalse(tracker.addMessage(2, 2, 99)); + assertFalse(tracker.addMessage(3, 3, 100)); + assertTrue(tracker.addMessage(4, 4, 101)); + assertTrue(tracker.addMessage(5, 5, 200)); + + assertEquals(tracker.getNumberOfDelayedMessages(), 2); + + tracker.close(); + } + + @Test(dataProvider = "delayedTracker") + public void testAddMessageWithStrictDelay(DelayedDeliveryTracker tracker) { + clockTime.set(10); + + // Verify behavior for the less than, equal to, and greater than deliverAt times. + assertFalse(tracker.addMessage(1, 1, 9)); + assertFalse(tracker.addMessage(4, 4, 10)); + assertTrue(tracker.addMessage(1, 1, 11)); + + assertEquals(tracker.getNumberOfDelayedMessages(), 1); + assertFalse(tracker.hasMessageAvailable()); + + tracker.close(); + } + + /** + * In this test, the deliverAt time is after now, but the deliverAt time is too early to run another tick, so the + * tickTimeMillis determines the delay. + */ + @Test(dataProvider = "delayedTracker") + public void testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict(DelayedDeliveryTracker tracker) + throws Exception { + // Set clock time, then run tracker to inherit clock time as the last tick time. + clockTime.set(10000); + Timeout timeout = mock(Timeout.class); + when(timeout.isCancelled()).then(x -> false); + ((AbstractDelayedDeliveryTracker) tracker).run(timeout); + verify(dispatcher, times(1)).readMoreEntries(); + + // Add a message that has a delivery time just after the previous run. It will get delivered based on the + // tick delay plus the last tick run. + assertTrue(tracker.addMessage(1, 1, 10001)); + + // Wait longer than the tick time plus the HashedWheelTimer's tick time to ensure that enough time has + // passed where it would have been triggered if the tick time was doing the triggering. + Thread.sleep(600); + verify(dispatcher, times(1)).readMoreEntries(); + + // Not wait for the message delivery to get triggered. + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> verify(dispatcher).readMoreEntries()); + + tracker.close(); + } + + /** + * In this test, the deliverAt time is after now, but before the (tickTimeMillis + now). Because there wasn't a + * recent tick run, the deliverAt time determines the delay. + */ + @Test(dataProvider = "delayedTracker") + public void testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict( + DelayedDeliveryTracker tracker) { + clockTime.set(500000); + + assertTrue(tracker.addMessage(1, 1, 500005)); + + // Wait long enough for the runnable to run, but not longer than the tick time. The point is that the delivery + // should get scheduled early when the tick duration has passed since the last tick. + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> verify(dispatcher).readMoreEntries()); + + tracker.close(); + } + + /** + * In this test, the deliverAt time is after now plus tickTimeMillis, so the tickTimeMillis determines the delay. + */ + @Test(dataProvider = "delayedTracker") + public void testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict(DelayedDeliveryTracker tracker) + throws Exception { + clockTime.set(0); + + assertTrue(tracker.addMessage(1, 1, 2000)); + + // Wait longer than the tick time plus the HashedWheelTimer's tick time to ensure that enough time has + // passed where it would have been triggered if the tick time was doing the triggering. + Thread.sleep(1000); + + // Not wait for the message delivery to get triggered. + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> verify(dispatcher).readMoreEntries()); + + tracker.close(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerTest.java index c4fcc5a5ff9c58..331ceb83a99943 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerTest.java @@ -25,11 +25,9 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; -import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.TimerTask; -import io.netty.util.concurrent.DefaultThreadFactory; import java.lang.reflect.Method; import java.time.Clock; import java.util.NavigableMap; @@ -46,10 +44,8 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -public class BucketDelayedDeliveryTrackerTest extends InMemoryDeliveryTrackerTest { - - private final Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-bucket-delayed-delivery-test"), - 500, TimeUnit.MILLISECONDS); +@Test(groups = "broker") +public class BucketDelayedDeliveryTrackerTest extends AbstractDeliveryTrackerTest { private BucketSnapshotStorage bucketSnapshotStorage; @@ -61,7 +57,6 @@ public void clean() throws Exception { } @DataProvider(name = "delayedTracker") - @Override public Object[][] provider(Method method) throws Exception { dispatcher = mock(PersistentDispatcherMultipleConsumers.class); clock = mock(Clock.class); @@ -161,25 +156,4 @@ public void testContainsMessage(DelayedDeliveryTracker tracker) { tracker.close(); } - - @Override - @Test(dataProvider = "delayedTracker") - public void testWithFixedDelays(InMemoryDelayedDeliveryTracker tracker) throws Exception { - assertEquals(tracker.getFixedDelayDetectionLookahead(), -1L); - tracker.close(); - } - - @Override - @Test(dataProvider = "delayedTracker") - public void testWithMixedDelays(InMemoryDelayedDeliveryTracker tracker) throws Exception { - assertEquals(tracker.getFixedDelayDetectionLookahead(), -1L); - tracker.close(); - } - - @Override - @Test(dataProvider = "delayedTracker") - public void testWithNoDelays(InMemoryDelayedDeliveryTracker tracker) throws Exception { - assertEquals(tracker.getFixedDelayDetectionLookahead(), -1L); - tracker.close(); - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index 7d50c2a05cdc68..6711aed924c20b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,17 +18,14 @@ */ package org.apache.pulsar.broker.delayed; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyLong; -import static org.mockito.Mockito.atMostOnce; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; - import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.Timer; @@ -36,34 +33,16 @@ import io.netty.util.concurrent.DefaultThreadFactory; import java.lang.reflect.Method; import java.time.Clock; -import java.util.Collections; import java.util.NavigableMap; -import java.util.Set; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; -import org.awaitility.Awaitility; -import org.testng.annotations.AfterClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "broker") -public class InMemoryDeliveryTrackerTest { - - // Create a single shared timer for the test. - private final Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"), - 500, TimeUnit.MILLISECONDS); - protected PersistentDispatcherMultipleConsumers dispatcher; - protected Clock clock; - - protected AtomicLong clockTime; - - @AfterClass(alwaysRun = true) - public void cleanup() { - timer.stop(); - } +public class InMemoryDeliveryTrackerTest extends AbstractDeliveryTrackerTest { @DataProvider(name = "delayedTracker") public Object[][] provider(Method method) throws Exception { @@ -139,185 +118,6 @@ public Object[][] provider(Method method) throws Exception { }; } - @Test(dataProvider = "delayedTracker") - public void test(DelayedDeliveryTracker tracker) throws Exception { - assertFalse(tracker.hasMessageAvailable()); - - assertTrue(tracker.addMessage(1, 2, 20)); - assertTrue(tracker.addMessage(2, 1, 10)); - assertTrue(tracker.addMessage(3, 3, 30)); - assertTrue(tracker.addMessage(4, 5, 50)); - assertTrue(tracker.addMessage(5, 4, 40)); - - assertFalse(tracker.hasMessageAvailable()); - assertEquals(tracker.getNumberOfDelayedMessages(), 5); - - assertEquals(tracker.getScheduledMessages(10), Collections.emptySet()); - - // Move time forward - clockTime.set(15); - - // Message is rejected by tracker since it's already ready to send - assertFalse(tracker.addMessage(6, 6, 10)); - - assertEquals(tracker.getNumberOfDelayedMessages(), 5); - assertTrue(tracker.hasMessageAvailable()); - Set scheduled = tracker.getScheduledMessages(10); - assertEquals(scheduled.size(), 1); - - // Move time forward - clockTime.set(60); - - assertEquals(tracker.getNumberOfDelayedMessages(), 4); - assertTrue(tracker.hasMessageAvailable()); - scheduled = tracker.getScheduledMessages(1); - assertEquals(scheduled.size(), 1); - - assertEquals(tracker.getNumberOfDelayedMessages(), 3); - assertTrue(tracker.hasMessageAvailable()); - scheduled = tracker.getScheduledMessages(3); - assertEquals(scheduled.size(), 3); - - assertEquals(tracker.getNumberOfDelayedMessages(), 0); - assertFalse(tracker.hasMessageAvailable()); - assertEquals(tracker.getScheduledMessages(10), Collections.emptySet()); - - tracker.close(); - } - - @Test(dataProvider = "delayedTracker") - public void testWithTimer(DelayedDeliveryTracker tracker, NavigableMap tasks) throws Exception { - assertTrue(tasks.isEmpty()); - assertTrue(tracker.addMessage(2, 2, 20)); - assertEquals(tasks.size(), 1); - assertEquals(tasks.firstKey().longValue(), 20); - - assertTrue(tracker.addMessage(1, 1, 10)); - assertEquals(tasks.size(), 1); - assertEquals(tasks.firstKey().longValue(), 10); - - assertTrue(tracker.addMessage(3, 3, 30)); - assertEquals(tasks.size(), 1); - assertEquals(tasks.firstKey().longValue(), 10); - - clockTime.set(15); - - TimerTask task = tasks.pollFirstEntry().getValue(); - Timeout cancelledTimeout = mock(Timeout.class); - when(cancelledTimeout.isCancelled()).thenReturn(true); - task.run(cancelledTimeout); - verify(dispatcher, atMostOnce()).readMoreEntries(); - - task.run(mock(Timeout.class)); - verify(dispatcher).readMoreEntries(); - - tracker.close(); - } - - /** - * Adding a message that is about to expire within the tick time should lead - * to a rejection from the tracker when isDelayedDeliveryDeliverAtTimeStrict is false. - */ - @Test(dataProvider = "delayedTracker") - public void testAddWithinTickTime(DelayedDeliveryTracker tracker) { - clockTime.set(0); - - assertFalse(tracker.addMessage(1, 1, 10)); - assertFalse(tracker.addMessage(2, 2, 99)); - assertFalse(tracker.addMessage(3, 3, 100)); - assertTrue(tracker.addMessage(4, 4, 101)); - assertTrue(tracker.addMessage(5, 5, 200)); - - assertEquals(tracker.getNumberOfDelayedMessages(), 2); - - tracker.close(); - } - - @Test(dataProvider = "delayedTracker") - public void testAddMessageWithStrictDelay(DelayedDeliveryTracker tracker) { - clockTime.set(10); - - // Verify behavior for the less than, equal to, and greater than deliverAt times. - assertFalse(tracker.addMessage(1, 1, 9)); - assertFalse(tracker.addMessage(4, 4, 10)); - assertTrue(tracker.addMessage(1, 1, 11)); - - assertEquals(tracker.getNumberOfDelayedMessages(), 1); - assertFalse(tracker.hasMessageAvailable()); - - tracker.close(); - } - - /** - * In this test, the deliverAt time is after now, but the deliverAt time is too early to run another tick, so the - * tickTimeMillis determines the delay. - */ - @Test(dataProvider = "delayedTracker") - public void testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict(DelayedDeliveryTracker tracker) - throws Exception { - // Set clock time, then run tracker to inherit clock time as the last tick time. - clockTime.set(10000); - Timeout timeout = mock(Timeout.class); - when(timeout.isCancelled()).then(x -> false); - ((InMemoryDelayedDeliveryTracker) tracker).run(timeout); - verify(dispatcher, times(1)).readMoreEntries(); - - // Add a message that has a delivery time just after the previous run. It will get delivered based on the - // tick delay plus the last tick run. - assertTrue(tracker.addMessage(1, 1, 10001)); - - // Wait longer than the tick time plus the HashedWheelTimer's tick time to ensure that enough time has - // passed where it would have been triggered if the tick time was doing the triggering. - Thread.sleep(600); - verify(dispatcher, times(1)).readMoreEntries(); - - // Not wait for the message delivery to get triggered. - Awaitility.await().atMost(10, TimeUnit.SECONDS) - .untilAsserted(() -> verify(dispatcher).readMoreEntries()); - - tracker.close(); - } - - /** - * In this test, the deliverAt time is after now, but before the (tickTimeMillis + now). Because there wasn't a - * recent tick run, the deliverAt time determines the delay. - */ - @Test(dataProvider = "delayedTracker") - public void testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict( - DelayedDeliveryTracker tracker) { - clockTime.set(500000); - - assertTrue(tracker.addMessage(1, 1, 500005)); - - // Wait long enough for the runnable to run, but not longer than the tick time. The point is that the delivery - // should get scheduled early when the tick duration has passed since the last tick. - Awaitility.await().atMost(10, TimeUnit.SECONDS) - .untilAsserted(() -> verify(dispatcher).readMoreEntries()); - - tracker.close(); - } - - /** - * In this test, the deliverAt time is after now plus tickTimeMillis, so the tickTimeMillis determines the delay. - */ - @Test(dataProvider = "delayedTracker") - public void testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict(DelayedDeliveryTracker tracker) - throws Exception { - clockTime.set(0); - - assertTrue(tracker.addMessage(1, 1, 2000)); - - // Wait longer than the tick time plus the HashedWheelTimer's tick time to ensure that enough time has - // passed where it would have been triggered if the tick time was doing the triggering. - Thread.sleep(1000); - - // Not wait for the message delivery to get triggered. - Awaitility.await().atMost(10, TimeUnit.SECONDS) - .untilAsserted(() -> verify(dispatcher).readMoreEntries()); - - tracker.close(); - } - @Test(dataProvider = "delayedTracker") public void testWithFixedDelays(InMemoryDelayedDeliveryTracker tracker) throws Exception { assertFalse(tracker.hasMessageAvailable()); @@ -407,4 +207,45 @@ public void testWithNoDelays(InMemoryDelayedDeliveryTracker tracker) throws Exce tracker.close(); } + @Test + public void testClose() throws Exception { + Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"), + 1, TimeUnit.MILLISECONDS); + + PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); + + AtomicLong clockTime = new AtomicLong(); + Clock clock = mock(Clock.class); + when(clock.millis()).then(x -> clockTime.get()); + + final Exception[] exceptions = new Exception[1]; + + InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, + true, 0) { + @Override + public void run(Timeout timeout) throws Exception { + super.timeout = timer.newTimeout(this, 1, TimeUnit.MILLISECONDS); + if (timeout == null || timeout.isCancelled()) { + return; + } + try { + this.priorityQueue.peekN1(); + } catch (Exception e) { + e.printStackTrace(); + exceptions[0] = e; + } + } + }; + + tracker.addMessage(1, 1, 10); + clockTime.set(10); + + Thread.sleep(300); + + tracker.close(); + + assertNull(exceptions[0]); + + timer.stop(); + } }