diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index 3f6852e4085262..b5a63328c42b20 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -46,6 +46,14 @@ @InterfaceStability.Stable public interface ManagedCursor { + default CompletableFuture putCursorProperty(String key, String value) { + return CompletableFuture.completedFuture(null); + } + + default CompletableFuture removeCursorProperty(String key) { + return CompletableFuture.completedFuture(null); + } + @SuppressWarnings("checkstyle:javadoctype") enum FindPositionConstraint { SearchActiveEntries, SearchAllAvailableEntries diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/Bucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/Bucket.java new file mode 100644 index 00000000000000..4a3933f618e848 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/Bucket.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import static org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTracker.DELAYED_BUCKET_KEY_PREFIX; +import static org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTracker.DELIMITER; +import com.google.protobuf.ByteString; +import java.util.BitSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import lombok.AllArgsConstructor; +import lombok.Data; +import org.apache.commons.lang3.mutable.MutableLong; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata; + +@Data +@AllArgsConstructor +public class Bucket { + + long startLedgerId; + long endLedgerId; + + Map delayedIndexBitMap; + + long numberBucketDelayedMessages; + + int lastSegmentEntryId; + + int currentSegmentEntryId; + + long snapshotLength; + + boolean active; + + volatile CompletableFuture snapshotCreateFuture; + + Bucket(long startLedgerId, long endLedgerId, Map delayedIndexBitMap) { + this(startLedgerId, endLedgerId, delayedIndexBitMap, -1, -1, 0, 0, true, null); + } + + long covertDelayIndexMapAndCount(int startSnapshotIndex, List segmentMetadata) { + delayedIndexBitMap.clear(); + MutableLong numberMessages = new MutableLong(0); + for (int i = startSnapshotIndex; i < segmentMetadata.size(); i++) { + Map bitByteStringMap = segmentMetadata.get(i).getDelayedIndexBitMapMap(); + bitByteStringMap.forEach((k, v) -> { + boolean exist = delayedIndexBitMap.containsKey(k); + byte[] bytes = v.toByteArray(); + BitSet bitSet = BitSet.valueOf(bytes); + numberMessages.add(bitSet.cardinality()); + if (!exist) { + delayedIndexBitMap.put(k, bitSet); + } else { + delayedIndexBitMap.get(k).or(bitSet); + } + }); + } + return numberMessages.longValue(); + } + + boolean containsMessage(long ledgerId, int entryId) { + if (delayedIndexBitMap == null) { + return false; + } + + BitSet bitSet = delayedIndexBitMap.get(ledgerId); + if (bitSet == null) { + return false; + } + return bitSet.get(entryId); + } + + void putIndexBit(long ledgerId, long entryId) { + if (entryId < Integer.MAX_VALUE) { + delayedIndexBitMap.compute(ledgerId, (k, v) -> new BitSet()).set((int) entryId); + } + } + + boolean removeIndexBit(long ledgerId, int entryId) { + if (delayedIndexBitMap == null) { + return false; + } + + boolean contained = false; + BitSet bitSet = delayedIndexBitMap.get(ledgerId); + if (bitSet != null && bitSet.get(entryId)) { + contained = true; + bitSet.clear(entryId); + + if (bitSet.isEmpty()) { + delayedIndexBitMap.remove(ledgerId); + } + + if (numberBucketDelayedMessages > 0) { + numberBucketDelayedMessages--; + } + } + return contained; + } + + public String bucketKey() { + return String.join(DELIMITER, DELAYED_BUCKET_KEY_PREFIX, String.valueOf(startLedgerId), + String.valueOf(endLedgerId)); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTracker.java new file mode 100644 index 00000000000000..f5b1851eb6af41 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTracker.java @@ -0,0 +1,624 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Range; +import com.google.common.collect.RangeMap; +import com.google.common.collect.Table; +import com.google.common.collect.TreeRangeMap; +import com.google.protobuf.ByteString; +import io.netty.util.Timeout; +import io.netty.util.Timer; +import java.time.Clock; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; +import javax.annotation.concurrent.ThreadSafe; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.mutable.MutableLong; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata; +import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue; + +@Slf4j +@ThreadSafe +public class BucketDelayedDeliveryTracker extends InMemoryDelayedDeliveryTracker { + + public static final String DELAYED_BUCKET_KEY_PREFIX = "#pulsar.delayed.bucket"; + + public static final String DELIMITER = "_"; + + private final long minIndexCountPerBucket; + + private final long timeStepPerBucketSnapshotSegment; + + private final int maxNumBuckets; + + private final ManagedCursor cursor; + + public final BucketSnapshotStorage bucketSnapshotStorage; + + private long numberDelayedMessages; + + private final Bucket lastMutableBucket; + + private final TripleLongPriorityQueue sharedBucketPriorityQueue; + + private final RangeMap immutableBuckets; + + private final Table snapshotSegmentLastIndexTable; + + BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, + Timer timer, long tickTimeMillis, + boolean isDelayedDeliveryDeliverAtTimeStrict, + BucketSnapshotStorage bucketSnapshotStorage, + long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegment, + int maxNumBuckets) { + this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict, + bucketSnapshotStorage, minIndexCountPerBucket, timeStepPerBucketSnapshotSegment, maxNumBuckets); + } + + BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, + Timer timer, long tickTimeMillis, Clock clock, + boolean isDelayedDeliveryDeliverAtTimeStrict, + BucketSnapshotStorage bucketSnapshotStorage, + long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegment, + int maxNumBuckets) { + super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict); + this.minIndexCountPerBucket = minIndexCountPerBucket; + this.timeStepPerBucketSnapshotSegment = timeStepPerBucketSnapshotSegment; + this.maxNumBuckets = maxNumBuckets; + this.cursor = dispatcher.getCursor(); + this.sharedBucketPriorityQueue = new TripleLongPriorityQueue(); + this.immutableBuckets = TreeRangeMap.create(); + this.snapshotSegmentLastIndexTable = HashBasedTable.create(); + + this.bucketSnapshotStorage = bucketSnapshotStorage; + + numberDelayedMessages = recoverBucketSnapshot(); + + this.lastMutableBucket = new Bucket(-1L, -1L, new HashMap<>()); + } + + @SneakyThrows + private long recoverBucketSnapshot() { + List> completableFutures = new ArrayList<>(); + this.cursor.getCursorProperties().keySet().forEach(key -> { + if (key.startsWith(DELAYED_BUCKET_KEY_PREFIX)) { + String[] keys = key.split(DELIMITER); + checkArgument(keys.length == 3); + Bucket bucket = createImmutableBucket(Long.parseLong(keys[1]), Long.parseLong(keys[2])); + completableFutures.add(asyncLoadNextBucketSnapshotEntry(bucket, true)); + } + }); + + if (completableFutures.isEmpty()) { + return 0; + } + + FutureUtil.waitForAll(completableFutures).get(); + + MutableLong numberDelayedMessages = new MutableLong(0); + immutableBuckets.asMapOfRanges().values().forEach(bucket -> { + numberDelayedMessages.add(bucket.numberBucketDelayedMessages); + }); + + log.info("[{}] Recover delayed message index bucket snapshot finish, buckets: {}, numberDelayedMessages: {}", + dispatcher.getName(), immutableBuckets.asMapOfRanges().size(), numberDelayedMessages.getValue()); + + return numberDelayedMessages.getValue(); + } + + private void moveScheduledMessageToSharedQueue(long cutoffTime) { + TripleLongPriorityQueue priorityQueue = getPriorityQueue(); + while (!priorityQueue.isEmpty()) { + long timestamp = priorityQueue.peekN1(); + if (timestamp > cutoffTime) { + break; + } + + long ledgerId = priorityQueue.peekN2(); + long entryId = priorityQueue.peekN3(); + sharedBucketPriorityQueue.add(timestamp, ledgerId, entryId); + + priorityQueue.pop(); + } + } + + @Override + public void run(Timeout timeout) throws Exception { + synchronized (this) { + moveScheduledMessageToSharedQueue(getCutoffTime()); + } + super.run(timeout); + } + + private Optional findBucket(long ledgerId) { + if (immutableBuckets.asMapOfRanges().isEmpty()) { + return Optional.empty(); + } + + Range span = immutableBuckets.span(); + if (!span.contains(ledgerId)) { + return Optional.empty(); + } + return Optional.ofNullable(immutableBuckets.get(ledgerId)); + } + + private Long getBucketIdByBucketKey(String bucketKey) { + String bucketIdStr = cursor.getCursorProperties().get(bucketKey); + if (StringUtils.isBlank(bucketIdStr)) { + return null; + } + return Long.valueOf(bucketIdStr); + } + + private Bucket createImmutableBucket(long startLedgerId, long endLedgerId) { + Bucket newBucket = new Bucket(startLedgerId, endLedgerId, new HashMap<>()); + immutableBuckets.put(Range.closed(startLedgerId, endLedgerId), newBucket); + return newBucket; + } + + private CompletableFuture asyncSaveBucketSnapshot( + final String bucketKey, SnapshotMetadata snapshotMetadata, + List bucketSnapshotSegments) { + Long bucketId = getBucketIdByBucketKey(bucketKey); + checkArgument(bucketId == null); + + return bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments) + .thenCompose(newBucketId -> { + Objects.requireNonNull(newBucketId); + return cursor.putCursorProperty(bucketKey, String.valueOf(newBucketId)) + .thenApply(__ -> newBucketId); + }); + } + + private CompletableFuture asyncCreateBucketSnapshot() { + TripleLongPriorityQueue priorityQueue = super.getPriorityQueue(); + if (priorityQueue.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + long numMessages = 0; + + final long startLedgerId = lastMutableBucket.startLedgerId; + final long endLedgerId = lastMutableBucket.endLedgerId; + + List bucketSnapshotSegments = new ArrayList<>(); + List segmentMetadataList = new ArrayList<>(); + Map bitMap = new HashMap<>(); + SnapshotSegment.Builder snapshotSegmentBuilder = SnapshotSegment.newBuilder(); + SnapshotSegmentMetadata.Builder segmentMetadataBuilder = SnapshotSegmentMetadata.newBuilder(); + + long currentTimestampUpperLimit = 0; + while (!priorityQueue.isEmpty()) { + long timestamp = priorityQueue.peekN1(); + if (currentTimestampUpperLimit == 0) { + currentTimestampUpperLimit = timestamp + timeStepPerBucketSnapshotSegment - 1; + } + + long ledgerId = priorityQueue.peekN2(); + long entryId = priorityQueue.peekN3(); + + checkArgument(ledgerId >= startLedgerId && ledgerId <= endLedgerId); + + // Move first segment of bucket snapshot to sharedBucketPriorityQueue + if (segmentMetadataList.size() == 0) { + sharedBucketPriorityQueue.add(timestamp, ledgerId, entryId); + } + + priorityQueue.pop(); + numMessages++; + + DelayedIndex delayedIndex = DelayedIndex.newBuilder() + .setTimestamp(timestamp) + .setLedgerId(ledgerId) + .setEntryId(entryId).build(); + + if (entryId <= Integer.MAX_VALUE) { + bitMap.compute(ledgerId, (k, v) -> new BitSet()).set((int) entryId); + } + + snapshotSegmentBuilder.addIndexes(delayedIndex); + + if (priorityQueue.isEmpty() || priorityQueue.peekN1() > currentTimestampUpperLimit) { + segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp); + currentTimestampUpperLimit = 0; + + Iterator> iterator = bitMap.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + ByteString byteString = ByteString.copyFrom(entry.getValue().toByteArray()); + segmentMetadataBuilder.putDelayedIndexBitMap(entry.getKey(), byteString); + iterator.remove(); + } + + segmentMetadataList.add(segmentMetadataBuilder.build()); + segmentMetadataBuilder.clear(); + + bucketSnapshotSegments.add(snapshotSegmentBuilder.build()); + snapshotSegmentBuilder.clear(); + } + } + + SnapshotMetadata bucketSnapshotMetadata = SnapshotMetadata.newBuilder() + .addAllMetadataList(segmentMetadataList) + .build(); + + final int lastSegmentEntryId = segmentMetadataList.size(); + + Bucket bucket = this.createImmutableBucket(startLedgerId, endLedgerId); + bucket.setCurrentSegmentEntryId(1); + bucket.setNumberBucketDelayedMessages(numMessages); + bucket.setLastSegmentEntryId(lastSegmentEntryId); + + // Add the first snapshot segment last message to snapshotSegmentLastMessageTable + checkArgument(!bucketSnapshotSegments.isEmpty()); + SnapshotSegment snapshotSegment = bucketSnapshotSegments.get(0); + DelayedIndex delayedIndex = snapshotSegment.getIndexes(snapshotSegment.getIndexesCount() - 1); + snapshotSegmentLastIndexTable.put(delayedIndex.getLedgerId(), delayedIndex.getEntryId(), bucket); + + if (log.isDebugEnabled()) { + log.debug("[{}] Create bucket snapshot, bucket: {}", dispatcher.getName(), bucket); + } + + String bucketKey = bucket.bucketKey(); + CompletableFuture future = asyncSaveBucketSnapshot(bucketKey, + bucketSnapshotMetadata, bucketSnapshotSegments); + bucket.setSnapshotCreateFuture(future); + future.whenComplete((__, ex) -> { + if (ex == null) { + bucket.setSnapshotCreateFuture(null); + } else { + //TODO Record create snapshot failed + log.error("Failed to create snapshot: ", ex); + } + }); + + return future; + } + + + @SneakyThrows + private CompletableFuture asyncLoadNextBucketSnapshotEntry(Bucket bucket, boolean isRebuild) { + if (log.isDebugEnabled()) { + log.debug("[{}] Load next bucket snapshot data, bucket: {}", dispatcher.getName(), bucket); + } + if (bucket == null) { + return CompletableFuture.completedFuture(null); + } + + final CompletableFuture createFuture = bucket.snapshotCreateFuture; + if (createFuture != null) { + // Wait bucket snapshot create finish + createFuture.get(); + } + + final String bucketKey = bucket.bucketKey(); + final Long bucketId = getBucketIdByBucketKey(bucketKey); + Objects.requireNonNull(bucketId); + + CompletableFuture loadMetaDataFuture = new CompletableFuture<>(); + if (isRebuild) { + final long cutoffTime = getCutoffTime(); + // Load Metadata of bucket snapshot + bucketSnapshotStorage.getBucketSnapshotMetadata(bucketId).thenAccept(snapshotMetadata -> { + List metadataList = snapshotMetadata.getMetadataListList(); + + // Skip all already reach schedule time snapshot segments + int nextSnapshotEntryIndex = 0; + while (nextSnapshotEntryIndex < metadataList.size() + && metadataList.get(nextSnapshotEntryIndex).getMaxScheduleTimestamp() <= cutoffTime) { + nextSnapshotEntryIndex++; + } + + final int lastSegmentEntryId = metadataList.size(); + + long numberMessages = bucket.covertDelayIndexMapAndCount(nextSnapshotEntryIndex, metadataList); + bucket.setNumberBucketDelayedMessages(numberMessages); + bucket.setLastSegmentEntryId(lastSegmentEntryId); + + int nextSegmentEntryId = nextSnapshotEntryIndex + 1; + loadMetaDataFuture.complete(nextSegmentEntryId); + }); + } else { + loadMetaDataFuture.complete(bucket.currentSegmentEntryId + 1); + } + + CompletableFuture future = loadMetaDataFuture.thenCompose(nextSegmentEntryId -> { + if (nextSegmentEntryId > bucket.lastSegmentEntryId) { + // TODO Delete bucket snapshot + return CompletableFuture.completedFuture(null); + } + + return bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, nextSegmentEntryId, nextSegmentEntryId) + .thenAccept(bucketSnapshotSegments -> { + if (CollectionUtils.isEmpty(bucketSnapshotSegments)) { + return; + } + + SnapshotSegment snapshotSegment = bucketSnapshotSegments.get(0); + List indexList = snapshotSegment.getIndexesList(); + DelayedIndex lastDelayedIndex = indexList.get(indexList.size() - 1); + + // Rebuild delayed message index bucket load data in parallel, so should be use synchronized + // to ensure data consistency + if (isRebuild) { + synchronized (snapshotSegmentLastIndexTable) { + this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), + lastDelayedIndex.getEntryId(), bucket); + } + + synchronized (sharedBucketPriorityQueue) { + for (DelayedIndex index : indexList) { + sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(), + index.getEntryId()); + } + } + } else { + this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), + lastDelayedIndex.getEntryId(), bucket); + + for (DelayedIndex index : indexList) { + sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(), + index.getEntryId()); + } + } + + bucket.setCurrentSegmentEntryId(nextSegmentEntryId); + }); + }); + return future; + } + + private void resetLastMutableBucketRange() { + lastMutableBucket.setStartLedgerId(-1L); + lastMutableBucket.setEndLedgerId(-1L); + } + + @Override + public synchronized boolean addMessage(long ledgerId, long entryId, long deliverAt) { + if (containsMessage(ledgerId, entryId)) { + messagesHaveFixedDelay = false; + return true; + } + + if (deliverAt < 0 || deliverAt <= getCutoffTime()) { + messagesHaveFixedDelay = false; + return false; + } + + boolean existBucket = findBucket(ledgerId).isPresent(); + + // Create bucket snapshot + if (ledgerId > lastMutableBucket.endLedgerId && !getPriorityQueue().isEmpty()) { + if (getPriorityQueue().size() >= minIndexCountPerBucket || existBucket) { + if (immutableBuckets.asMapOfRanges().size() >= maxNumBuckets) { + // TODO merge bucket snapshot (synchronize operate) + } + + asyncCreateBucketSnapshot(); + resetLastMutableBucketRange(); + } + } + + if (ledgerId < lastMutableBucket.startLedgerId || existBucket) { + // If (ledgerId < startLedgerId || existBucket) means that message index belong to previous bucket range, + // enter sharedBucketPriorityQueue directly + sharedBucketPriorityQueue.add(deliverAt, ledgerId, entryId); + } else { + checkArgument(ledgerId >= lastMutableBucket.endLedgerId); + + getPriorityQueue().add(deliverAt, ledgerId, entryId); + + if (lastMutableBucket.startLedgerId == -1L) { + lastMutableBucket.setStartLedgerId(ledgerId); + } + lastMutableBucket.setEndLedgerId(ledgerId); + } + + // TODO If the bitSet is sparse, this memory cost very high to deduplication and skip read message + lastMutableBucket.putIndexBit(ledgerId, entryId); + numberDelayedMessages++; + + if (log.isDebugEnabled()) { + log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", dispatcher.getName(), ledgerId, entryId, + deliverAt - clock.millis()); + } + + updateTimer(); + + checkAndUpdateHighest(deliverAt); + + return true; + } + + @Override + public synchronized boolean hasMessageAvailable() { + long cutoffTime = getCutoffTime(); + + boolean hasMessageAvailable = !getPriorityQueue().isEmpty() && getPriorityQueue().peekN1() <= cutoffTime; + + hasMessageAvailable = hasMessageAvailable + || !sharedBucketPriorityQueue.isEmpty() && sharedBucketPriorityQueue.peekN1() <= cutoffTime; + if (!hasMessageAvailable) { + updateTimer(); + } + return hasMessageAvailable; + } + + @Override + protected long nextDeliveryTime() { + if (getPriorityQueue().isEmpty() && !sharedBucketPriorityQueue.isEmpty()) { + return sharedBucketPriorityQueue.peekN1(); + } else if (sharedBucketPriorityQueue.isEmpty() && !getPriorityQueue().isEmpty()) { + return getPriorityQueue().peekN1(); + } + long timestamp = getPriorityQueue().peekN1(); + long bucketTimestamp = sharedBucketPriorityQueue.peekN1(); + return Math.min(timestamp, bucketTimestamp); + } + + @Override + public synchronized long getNumberOfDelayedMessages() { + return numberDelayedMessages; + } + + @Override + public synchronized long getBufferMemoryUsage() { + return getPriorityQueue().bytesCapacity() + sharedBucketPriorityQueue.bytesCapacity(); + } + + @Override + @SneakyThrows + public synchronized Set getScheduledMessages(int maxMessages) { + long cutoffTime = getCutoffTime(); + + moveScheduledMessageToSharedQueue(cutoffTime); + + Set positions = new TreeSet<>(); + int n = maxMessages; + + while (n > 0 && !sharedBucketPriorityQueue.isEmpty()) { + long timestamp = sharedBucketPriorityQueue.peekN1(); + if (timestamp > cutoffTime) { + break; + } + + long ledgerId = sharedBucketPriorityQueue.peekN2(); + long entryId = sharedBucketPriorityQueue.peekN3(); + positions.add(new PositionImpl(ledgerId, entryId)); + + sharedBucketPriorityQueue.pop(); + removeIndexBit(ledgerId, entryId); + + Bucket bucket = snapshotSegmentLastIndexTable.remove(ledgerId, entryId); + if (bucket != null && bucket.active) { + if (log.isDebugEnabled()) { + log.debug("[{}] Load next snapshot segment, bucket: {}", dispatcher.getName(), bucket); + } + // All message of current snapshot segment are scheduled, load next snapshot segment + asyncLoadNextBucketSnapshotEntry(bucket, false).get(); + } + + --n; + --numberDelayedMessages; + } + + if (numberDelayedMessages <= 0) { + // Reset to initial state + highestDeliveryTimeTracked = 0; + messagesHaveFixedDelay = true; + } + + updateTimer(); + + return positions; + } + + @Override + @SneakyThrows + public synchronized void clear() { + super.clear(); + cleanImmutableBuckets(true); + sharedBucketPriorityQueue.clear(); + resetLastMutableBucketRange(); + lastMutableBucket.delayedIndexBitMap.clear(); + snapshotSegmentLastIndexTable.clear(); + numberDelayedMessages = 0; + } + + @Override + @SneakyThrows + public synchronized void close() { + super.close(); + cleanImmutableBuckets(false); + lastMutableBucket.delayedIndexBitMap.clear(); + sharedBucketPriorityQueue.close(); + } + + private void cleanImmutableBuckets(boolean delete) { + if (immutableBuckets != null) { + Iterator iterator = immutableBuckets.asMapOfRanges().values().iterator(); + while (iterator.hasNext()) { + Bucket bucket = iterator.next(); + if (bucket.delayedIndexBitMap != null) { + bucket.delayedIndexBitMap.clear(); + } + CompletableFuture snapshotGenerateFuture = bucket.snapshotCreateFuture; + if (snapshotGenerateFuture != null) { + if (delete) { + snapshotGenerateFuture.cancel(true); + // TODO delete bucket snapshot + } else { + try { + snapshotGenerateFuture.get(); + } catch (Exception e) { + log.warn("Failed wait to snapshot generate, bucket: {}", bucket); + } + } + } + iterator.remove(); + } + } + } + + private boolean removeIndexBit(long ledgerId, long entryId) { + if (entryId > Integer.MAX_VALUE) { + return false; + } + + if (lastMutableBucket.removeIndexBit(ledgerId, (int) entryId)) { + return true; + } + + return findBucket(ledgerId).map(bucket -> bucket.removeIndexBit(ledgerId, (int) entryId)).orElse(false); + } + + @Override + public boolean containsMessage(long ledgerId, long entryId) { + if (entryId > Integer.MAX_VALUE) { + return false; + } + + if (lastMutableBucket.containsMessage(ledgerId, (int) entryId)) { + return true; + } + + return findBucket(ledgerId).map(bucket -> bucket.containsMessage(ledgerId, (int) entryId)).orElse(false); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index 390ce5d5071b4d..d34086d722f2f2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -35,13 +35,13 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T private final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue(); - private final PersistentDispatcherMultipleConsumers dispatcher; + protected final PersistentDispatcherMultipleConsumers dispatcher; // Reference to the shared (per-broker) timer for delayed delivery private final Timer timer; // Current timeout or null if not set - private Timeout timeout; + protected Timeout timeout; // Timestamp at which the timeout is currently set private long currentTimeoutTarget; @@ -49,9 +49,9 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T // Last time the TimerTask was triggered for this class private long lastTickRun; - private long tickTimeMillis; + protected long tickTimeMillis; - private final Clock clock; + protected final Clock clock; private final boolean isDelayedDeliveryDeliverAtTimeStrict; @@ -64,10 +64,10 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T // This is the timestamp of the message with the highest delivery time // If new added messages are lower than this, it means the delivery is requested // to be out-of-order. It gets reset to 0, once the tracker is emptied. - private long highestDeliveryTimeTracked = 0; + protected long highestDeliveryTimeTracked = 0; // Track whether we have seen all messages with fixed delay so far. - private boolean messagesHaveFixedDelay = true; + protected boolean messagesHaveFixedDelay = true; InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis, boolean isDelayedDeliveryDeliverAtTimeStrict) { @@ -94,7 +94,7 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T * * @return the cutoff time to determine whether a message is ready to deliver to the consumer */ - private long getCutoffTime() { + protected long getCutoffTime() { return isDelayedDeliveryDeliverAtTimeStrict ? clock.millis() : clock.millis() + tickTimeMillis; } @@ -114,15 +114,21 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) { priorityQueue.add(deliverAt, ledgerId, entryId); updateTimer(); - // Check that new delivery time comes after the current highest, or at - // least within a single tick time interval of 1 second. + checkAndUpdateHighest(deliverAt); + + return true; + } + + /** + * Check that new delivery time comes after the current highest, or at + * least within a single tick time interval of 1 second. + */ + protected void checkAndUpdateHighest(long deliverAt) { if (deliverAt < (highestDeliveryTimeTracked - tickTimeMillis)) { messagesHaveFixedDelay = false; } highestDeliveryTimeTracked = Math.max(highestDeliveryTimeTracked, deliverAt); - - return true; } /** @@ -192,6 +198,7 @@ public long getNumberOfDelayedMessages() { return priorityQueue.size(); } + @Override public long getBufferMemoryUsage() { return priorityQueue.bytesCapacity(); } @@ -207,8 +214,8 @@ public long getBufferMemoryUsage() { * the last tick time plus the tickTimeMillis (to ensure we do not schedule the task more frequently than the * tickTimeMillis). */ - private void updateTimer() { - if (priorityQueue.isEmpty()) { + protected void updateTimer() { + if (getNumberOfDelayedMessages() == 0) { if (timeout != null) { currentTimeoutTarget = -1; timeout.cancel(); @@ -216,8 +223,7 @@ private void updateTimer() { } return; } - - long timestamp = priorityQueue.peekN1(); + long timestamp = nextDeliveryTime(); if (timestamp == currentTimeoutTarget) { // The timer is already set to the correct target time return; @@ -273,17 +279,29 @@ public void run(Timeout timeout) throws Exception { @Override public void close() { - priorityQueue.close(); if (timeout != null) { timeout.cancel(); } + priorityQueue.close(); } @Override public boolean shouldPauseAllDeliveries() { // Pause deliveries if we know all delays are fixed within the lookahead window return messagesHaveFixedDelay - && priorityQueue.size() >= DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES + && getNumberOfDelayedMessages() >= DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES && !hasMessageAvailable(); } + + @Override + public boolean containsMessage(long ledgerId, long entryId) { + return false; + } + + protected TripleLongPriorityQueue getPriorityQueue() { + return priorityQueue; + } + protected long nextDeliveryTime() { + return priorityQueue.peekN1(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 02d2e725379b65..858fd8e7217fa3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -1124,6 +1124,10 @@ public long getDelayedTrackerMemoryUsage() { return 0; } + public ManagedCursor getCursor() { + return cursor; + } + protected int getStickyKeyHash(Entry entry) { return StickyKeyConsumerSelector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer())); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BuketDelayedDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BuketDelayedDeliveryTrackerTest.java new file mode 100644 index 00000000000000..5bce448ff83db2 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BuketDelayedDeliveryTrackerTest.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.delayed; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.lang.reflect.Method; +import java.time.Clock; +import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class BuketDelayedDeliveryTrackerTest extends InMemoryDeliveryTrackerTest { + + private final Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-bucket-delayed-delivery-test"), + 500, TimeUnit.MILLISECONDS); + + private BucketSnapshotStorage bucketSnapshotStorage; + + @AfterMethod + public void clean() throws Exception { + if (bucketSnapshotStorage != null) { + bucketSnapshotStorage.close(); + } + } + + @DataProvider(name = "delayedTracker") + @Override + public Object[][] provider(Method method) throws Exception { + dispatcher = mock(PersistentDispatcherMultipleConsumers.class); + clock = mock(Clock.class); + clockTime = new AtomicLong(); + when(clock.millis()).then(x -> clockTime.get()); + + bucketSnapshotStorage = new MockBucketSnapshotStorage(); + bucketSnapshotStorage.start(); + ManagedCursor cursor = new MockManagedCursor("my_test_cursor"); + doReturn(cursor).when(dispatcher).getCursor(); + + final String methodName = method.getName(); + return switch (methodName) { + case "test" -> new Object[][]{{ + new BucketDelayedDeliveryTracker(dispatcher, timer, 1, clock, + false, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50) + }}; + case "testWithTimer" -> { + Timer timer = mock(Timer.class); + + AtomicLong clockTime = new AtomicLong(); + Clock clock = mock(Clock.class); + when(clock.millis()).then(x -> clockTime.get()); + + NavigableMap tasks = new TreeMap<>(); + + when(timer.newTimeout(any(), anyLong(), any())).then(invocation -> { + TimerTask task = invocation.getArgument(0, TimerTask.class); + long timeout = invocation.getArgument(1, Long.class); + TimeUnit unit = invocation.getArgument(2, TimeUnit.class); + long scheduleAt = clockTime.get() + unit.toMillis(timeout); + tasks.put(scheduleAt, task); + + Timeout t = mock(Timeout.class); + when(t.cancel()).then(i -> { + tasks.remove(scheduleAt, task); + return null; + }); + return t; + }); + + yield new Object[][]{{ + new BucketDelayedDeliveryTracker(dispatcher, timer, 1, clock, + false, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50), + tasks + }}; + } + case "testAddWithinTickTime" -> new Object[][]{{ + new BucketDelayedDeliveryTracker(dispatcher, timer, 100, clock, + false, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50) + }}; + case "testAddMessageWithStrictDelay" -> new Object[][]{{ + new BucketDelayedDeliveryTracker(dispatcher, timer, 100, clock, + true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50) + }}; + case "testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict" -> new Object[][]{{ + new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock, + true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50) + }}; + case "testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict", "testRecoverSnapshot" -> new Object[][]{{ + new BucketDelayedDeliveryTracker(dispatcher, timer, 100000, clock, + true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50) + }}; + case "testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict", "testExistDelayedMessage" -> new Object[][]{{ + new BucketDelayedDeliveryTracker(dispatcher, timer, 500, clock, + true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50) + }}; + default -> new Object[][]{{ + new BucketDelayedDeliveryTracker(dispatcher, timer, 1, clock, + true, bucketSnapshotStorage, 1000, TimeUnit.MILLISECONDS.toMillis(100), 50) + }}; + }; + } + + @Test(dataProvider = "delayedTracker") + public void testContainsMessage(DelayedDeliveryTracker tracker) { + tracker.addMessage(1, 1, 10); + tracker.addMessage(2, 2, 20); + + assertTrue(tracker.containsMessage(1, 1)); + clockTime.set(20); + + Set scheduledMessages = tracker.getScheduledMessages(1); + assertEquals(scheduledMessages.stream().findFirst().get().getEntryId(), 1); + + tracker.addMessage(3, 3, 30); + + tracker.addMessage(4, 4, 30); + + tracker.addMessage(5, 5, 30); + + tracker.addMessage(6, 6, 30); + + assertTrue(tracker.containsMessage(3, 3)); + + tracker.close(); + } + + @Test(dataProvider = "delayedTracker") + public void testRecoverSnapshot(DelayedDeliveryTracker tracker) throws Exception { + for (int i = 1; i <= 100; i++) { + tracker.addMessage(i, i, i * 10); + } + + assertEquals(tracker.getNumberOfDelayedMessages(), 100); + + clockTime.set(1 * 10); + + assertTrue(tracker.hasMessageAvailable()); + Set scheduledMessages = tracker.getScheduledMessages(100); + + assertEquals(scheduledMessages.size(), 1); + + tracker.addMessage(101, 101, 101 * 10); + + tracker.close(); + + clockTime.set(30 * 10); + + tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock, + true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50); + + assertFalse(tracker.containsMessage(101, 101)); + assertEquals(tracker.getNumberOfDelayedMessages(), 70); + + clockTime.set(100 * 10); + + assertTrue(tracker.hasMessageAvailable()); + scheduledMessages = tracker.getScheduledMessages(70); + + assertEquals(scheduledMessages.size(), 70); + tracker.close(); + } + diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index db2db6cc1dbb06..069f335c6036db 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -20,21 +20,20 @@ import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.atMostOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; - import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.TimerTask; - +import io.netty.util.concurrent.DefaultThreadFactory; +import java.lang.reflect.Method; import java.time.Clock; import java.util.Collections; import java.util.NavigableMap; @@ -42,14 +41,11 @@ import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; - -import io.netty.util.concurrent.DefaultThreadFactory; -import lombok.Cleanup; - import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Test(groups = "broker") @@ -58,31 +54,95 @@ public class InMemoryDeliveryTrackerTest { // Create a single shared timer for the test. private final Timer timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-in-memory-delayed-delivery-test"), 500, TimeUnit.MILLISECONDS); + protected PersistentDispatcherMultipleConsumers dispatcher; + protected Clock clock; + + protected AtomicLong clockTime; @AfterClass(alwaysRun = true) public void cleanup() { timer.stop(); } - @Test - public void test() throws Exception { - PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); - - AtomicLong clockTime = new AtomicLong(); - Clock clock = mock(Clock.class); + @DataProvider(name = "delayedTracker") + public Object[][] provider(Method method) throws Exception { + dispatcher = mock(PersistentDispatcherMultipleConsumers.class); + clock = mock(Clock.class); + clockTime = new AtomicLong(); when(clock.millis()).then(x -> clockTime.get()); - @Cleanup - InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, - false); + final String methodName = method.getName(); + return switch (methodName) { + case "test" -> new Object[][]{{ + new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, + false) + }}; + case "testWithTimer" -> { + Timer timer = mock(Timer.class); + + AtomicLong clockTime = new AtomicLong(); + Clock clock = mock(Clock.class); + when(clock.millis()).then(x -> clockTime.get()); + + NavigableMap tasks = new TreeMap<>(); + + when(timer.newTimeout(any(), anyLong(), any())).then(invocation -> { + TimerTask task = invocation.getArgument(0, TimerTask.class); + long timeout = invocation.getArgument(1, Long.class); + TimeUnit unit = invocation.getArgument(2, TimeUnit.class); + long scheduleAt = clockTime.get() + unit.toMillis(timeout); + tasks.put(scheduleAt, task); + + Timeout t = mock(Timeout.class); + when(t.cancel()).then(i -> { + tasks.remove(scheduleAt, task); + return null; + }); + return t; + }); + + yield new Object[][]{{ + new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, + false), + tasks + }}; + } + case "testAddWithinTickTime" -> new Object[][]{{ + new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock, + false) + }}; + case "testAddMessageWithStrictDelay" -> new Object[][]{{ + new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock, + true) + }}; + case "testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict" -> new Object[][]{{ + new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1000, clock, + true) + }}; + case "testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict" -> new Object[][]{{ + new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100000, clock, + true) + }}; + case "testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict" -> new Object[][]{{ + new InMemoryDelayedDeliveryTracker(dispatcher, timer, 500, clock, + true) + }}; + default -> new Object[][]{{ + new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, + true) + }}; + }; + } + @Test(dataProvider = "delayedTracker") + public void test(DelayedDeliveryTracker tracker) throws Exception { assertFalse(tracker.hasMessageAvailable()); - assertTrue(tracker.addMessage(2, 2, 20)); - assertTrue(tracker.addMessage(1, 1, 10)); + assertTrue(tracker.addMessage(1, 2, 20)); + assertTrue(tracker.addMessage(2, 1, 10)); assertTrue(tracker.addMessage(3, 3, 30)); - assertTrue(tracker.addMessage(5, 5, 50)); - assertTrue(tracker.addMessage(4, 4, 40)); + assertTrue(tracker.addMessage(4, 5, 50)); + assertTrue(tracker.addMessage(5, 4, 40)); assertFalse(tracker.hasMessageAvailable()); assertEquals(tracker.getNumberOfDelayedMessages(), 5); @@ -116,38 +176,12 @@ public void test() throws Exception { assertEquals(tracker.getNumberOfDelayedMessages(), 0); assertFalse(tracker.hasMessageAvailable()); assertEquals(tracker.getScheduledMessages(10), Collections.emptySet()); - } - - @Test - public void testWithTimer() throws Exception { - PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); - Timer timer = mock(Timer.class); - - AtomicLong clockTime = new AtomicLong(); - Clock clock = mock(Clock.class); - when(clock.millis()).then(x -> clockTime.get()); - - NavigableMap tasks = new TreeMap<>(); - - when(timer.newTimeout(any(), anyLong(), any())).then(invocation -> { - TimerTask task = invocation.getArgument(0, TimerTask.class); - long timeout = invocation.getArgument(1, Long.class); - TimeUnit unit = invocation.getArgument(2, TimeUnit.class); - long scheduleAt = clockTime.get() + unit.toMillis(timeout); - tasks.put(scheduleAt, task); - - Timeout t = mock(Timeout.class); - when(t.cancel()).then(i -> { - tasks.remove(scheduleAt, task); - return null; - }); - return t; - }); - @Cleanup - InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, - false); + tracker.close(); + } + @Test(dataProvider = "delayedTracker") + public void testWithTimer(DelayedDeliveryTracker tracker, NavigableMap tasks) throws Exception { assertTrue(tasks.isEmpty()); assertTrue(tracker.addMessage(2, 2, 20)); assertEquals(tasks.size(), 1); @@ -167,28 +201,20 @@ public void testWithTimer() throws Exception { Timeout cancelledTimeout = mock(Timeout.class); when(cancelledTimeout.isCancelled()).thenReturn(true); task.run(cancelledTimeout); - verifyZeroInteractions(dispatcher); + verify(dispatcher, atMostOnce()).readMoreEntries(); task.run(mock(Timeout.class)); verify(dispatcher).readMoreEntries(); + + tracker.close(); } /** * Adding a message that is about to expire within the tick time should lead * to a rejection from the tracker when isDelayedDeliveryDeliverAtTimeStrict is false. */ - @Test - public void testAddWithinTickTime() { - PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); - - AtomicLong clockTime = new AtomicLong(); - Clock clock = mock(Clock.class); - when(clock.millis()).then(x -> clockTime.get()); - - @Cleanup - InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock, - false); - + @Test(dataProvider = "delayedTracker") + public void testAddWithinTickTime(DelayedDeliveryTracker tracker) { clockTime.set(0); assertFalse(tracker.addMessage(1, 1, 10)); @@ -198,19 +224,12 @@ public void testAddWithinTickTime() { assertTrue(tracker.addMessage(5, 5, 200)); assertEquals(tracker.getNumberOfDelayedMessages(), 2); - } - - public void testAddMessageWithStrictDelay() { - PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); - AtomicLong clockTime = new AtomicLong(); - Clock clock = mock(Clock.class); - when(clock.millis()).then(x -> clockTime.get()); - - @Cleanup - InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, clock, - true); + tracker.close(); + } + @Test(dataProvider = "delayedTracker") + public void testAddMessageWithStrictDelay(DelayedDeliveryTracker tracker) { clockTime.set(10); // Verify behavior for the less than, equal to, and greater than deliverAt times. @@ -220,29 +239,22 @@ public void testAddMessageWithStrictDelay() { assertEquals(tracker.getNumberOfDelayedMessages(), 1); assertFalse(tracker.hasMessageAvailable()); + + tracker.close(); } /** * In this test, the deliverAt time is after now, but the deliverAt time is too early to run another tick, so the * tickTimeMillis determines the delay. */ - public void testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict() throws Exception { - PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); - - AtomicLong clockTime = new AtomicLong(); - Clock clock = mock(Clock.class); - when(clock.millis()).then(x -> clockTime.get()); - - // Use a short tick time to show that the timer task is run based on the deliverAt time in this scenario. - @Cleanup - InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, - 1000, clock, true); - + @Test(dataProvider = "delayedTracker") + public void testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict(DelayedDeliveryTracker tracker) + throws Exception { // Set clock time, then run tracker to inherit clock time as the last tick time. clockTime.set(10000); Timeout timeout = mock(Timeout.class); when(timeout.isCancelled()).then(x -> false); - tracker.run(timeout); + ((InMemoryDelayedDeliveryTracker) tracker).run(timeout); verify(dispatcher, times(1)).readMoreEntries(); // Add a message that has a delivery time just after the previous run. It will get delivered based on the @@ -257,25 +269,17 @@ public void testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithSt // Not wait for the message delivery to get triggered. Awaitility.await().atMost(10, TimeUnit.SECONDS) .untilAsserted(() -> verify(dispatcher).readMoreEntries()); + + tracker.close(); } /** * In this test, the deliverAt time is after now, but before the (tickTimeMillis + now). Because there wasn't a * recent tick run, the deliverAt time determines the delay. */ - public void testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict() { - PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); - - AtomicLong clockTime = new AtomicLong(); - Clock clock = mock(Clock.class); - when(clock.millis()).then(x -> clockTime.get()); - - // Use a large tick time to show that the message will get delivered earlier because there wasn't - // a previous tick run. - @Cleanup - InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, - 100000, clock, true); - + @Test(dataProvider = "delayedTracker") + public void testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict( + DelayedDeliveryTracker tracker) { clockTime.set(500000); assertTrue(tracker.addMessage(1, 1, 500005)); @@ -284,23 +288,16 @@ public void testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStr // should get scheduled early when the tick duration has passed since the last tick. Awaitility.await().atMost(10, TimeUnit.SECONDS) .untilAsserted(() -> verify(dispatcher).readMoreEntries()); + + tracker.close(); } /** * In this test, the deliverAt time is after now plus tickTimeMillis, so the tickTimeMillis determines the delay. */ - public void testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict() throws Exception { - PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); - - AtomicLong clockTime = new AtomicLong(); - Clock clock = mock(Clock.class); - when(clock.millis()).then(x -> clockTime.get()); - - // Use a short tick time to show that the timer task is run based on the deliverAt time in this scenario. - @Cleanup - InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, - 500, clock, true); - + @Test(dataProvider = "delayedTracker") + public void testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict(DelayedDeliveryTracker tracker) + throws Exception { clockTime.set(0); assertTrue(tracker.addMessage(1, 1, 2000)); @@ -308,25 +305,17 @@ public void testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict() throws // Wait longer than the tick time plus the HashedWheelTimer's tick time to ensure that enough time has // passed where it would have been triggered if the tick time was doing the triggering. Thread.sleep(1000); - verifyNoInteractions(dispatcher); + verify(dispatcher).getCursor(); // Not wait for the message delivery to get triggered. Awaitility.await().atMost(10, TimeUnit.SECONDS) .untilAsserted(() -> verify(dispatcher).readMoreEntries()); - } - - @Test - public void testWithFixedDelays() throws Exception { - PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); - AtomicLong clockTime = new AtomicLong(); - Clock clock = mock(Clock.class); - when(clock.millis()).then(x -> clockTime.get()); - - @Cleanup - InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, - true); + tracker.close(); + } + @Test(dataProvider = "delayedTracker") + public void testWithFixedDelays(DelayedDeliveryTracker tracker) throws Exception { assertFalse(tracker.hasMessageAvailable()); assertTrue(tracker.addMessage(1, 1, 10)); @@ -348,6 +337,7 @@ public void testWithFixedDelays() throws Exception { clockTime.set(InMemoryDelayedDeliveryTracker.DETECT_FIXED_DELAY_LOOKAHEAD_MESSAGES * 10); tracker.getScheduledMessages(100); + assertFalse(tracker.shouldPauseAllDeliveries()); // Empty the tracker @@ -357,20 +347,12 @@ public void testWithFixedDelays() throws Exception { } while (removed > 0); assertFalse(tracker.shouldPauseAllDeliveries()); - } - - @Test - public void testWithMixedDelays() throws Exception { - PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); - - AtomicLong clockTime = new AtomicLong(); - Clock clock = mock(Clock.class); - when(clock.millis()).then(x -> clockTime.get()); - @Cleanup - InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, - true); + tracker.close(); + } + @Test(dataProvider = "delayedTracker") + public void testWithMixedDelays(DelayedDeliveryTracker tracker) throws Exception { assertFalse(tracker.hasMessageAvailable()); assertTrue(tracker.addMessage(1, 1, 10)); @@ -388,23 +370,15 @@ public void testWithMixedDelays() throws Exception { assertTrue(tracker.shouldPauseAllDeliveries()); // Add message with earlier delivery time - assertTrue(tracker.addMessage(5, 5, 5)); + assertTrue(tracker.addMessage(5, 6, 5)); assertFalse(tracker.shouldPauseAllDeliveries()); - } - @Test - public void testWithNoDelays() throws Exception { - PersistentDispatcherMultipleConsumers dispatcher = mock(PersistentDispatcherMultipleConsumers.class); - - AtomicLong clockTime = new AtomicLong(); - Clock clock = mock(Clock.class); - when(clock.millis()).then(x -> clockTime.get()); - - @Cleanup - InMemoryDelayedDeliveryTracker tracker = new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, clock, - true); + tracker.close(); + } + @Test(dataProvider = "delayedTracker") + public void testWithNoDelays(DelayedDeliveryTracker tracker) throws Exception { assertFalse(tracker.hasMessageAvailable()); assertTrue(tracker.addMessage(1, 1, 10)); @@ -422,9 +396,11 @@ public void testWithNoDelays() throws Exception { assertTrue(tracker.shouldPauseAllDeliveries()); // Add message with no-delay - assertFalse(tracker.addMessage(5, 5, -1L)); + assertFalse(tracker.addMessage(5, 6, -1L)); assertFalse(tracker.shouldPauseAllDeliveries()); + + tracker.close(); } }