[feat][broker][PIP-195] Add metrics for bucket delayed message tracker #19716

Merged
merged 11 commits on Mar 30, 2023
@@ -366,7 +366,7 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
private int delayedDeliveryMaxTimeStepPerBucketSnapshotSegmentSeconds = 300;

@FieldContext(category = CATEGORY_SERVER, doc = """
The max number of delayed message index in per bucket snapshot segment, -1 means no limitation\
The max number of delayed message index in per bucket snapshot segment, -1 means no limitation, \
after reaching the max number limitation, the snapshot segment will be cut off.""")
private int delayedDeliveryMaxIndexesPerBucketSnapshotSegment = 5000;
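
The doc fix above clarifies that -1 disables the per-segment index limit and that a segment is cut off once the limit is reached. A minimal sketch of just that count-based cut-off rule, using a hypothetical generic helper rather than the bucket code touched by this PR:

import java.util.ArrayList;
import java.util.List;

// Sketch only: illustrates the cut-off rule described by the config doc above
// (-1 disables the limit; otherwise a new segment starts once the cap is reached).
// This is a hypothetical helper, not the snapshot-segment code in Pulsar.
final class SegmentCutoffSketch {

    static <T> List<List<T>> cutIntoSegments(List<T> delayedIndexes, int maxIndexesPerSegment) {
        List<List<T>> segments = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T index : delayedIndexes) {
            current.add(index);
            if (maxIndexesPerSegment != -1 && current.size() >= maxIndexesPerSegment) {
                segments.add(current);       // cut off the full segment
                current = new ArrayList<>(); // and start the next one
            }
        }
        if (!current.isEmpty()) {
            segments.add(current); // trailing partial segment
        }
        return segments;
    }
}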

@@ -67,12 +67,6 @@ public interface DelayedDeliveryTracker extends AutoCloseable {
*/
boolean shouldPauseAllDeliveries();

/**
* Tells whether this DelayedDeliveryTracker contains this message index,
* if the tracker is not supported it or disabled this feature also will return false.
*/
boolean containsMessage(long ledgerId, long entryId);
Member:

Why did we remove this method in this PR?

Member Author:

Oh, sorry, this addresses a previous comment.
See: #17677 (review)

Member:

Thanks for the context!


/**
* Reset tick time use zk policies cache.
* @param tickTime
@@ -178,11 +178,6 @@ && getNumberOfDelayedMessages() >= fixedDelayDetectionLookahead
&& !hasMessageAvailable();
}

@Override
public boolean containsMessage(long ledgerId, long entryId) {
return false;
}

protected long nextDeliveryTime() {
return priorityQueue.peekN1();
}
@@ -67,25 +67,21 @@ public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMet
@Override
public CompletableFuture<SnapshotMetadata> getBucketSnapshotMetadata(long bucketId) {
return openLedger(bucketId).thenCompose(
ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, 0, 0).
ledgerHandle -> getLedgerEntry(ledgerHandle, 0, 0).
thenApply(entryEnumeration -> parseSnapshotMetadataEntry(entryEnumeration.nextElement())));
}

@Override
public CompletableFuture<List<SnapshotSegment>> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId,
long lastSegmentEntryId) {
return openLedger(bucketId).thenCompose(
ledgerHandle -> getLedgerEntryThenCloseLedger(ledgerHandle, firstSegmentEntryId,
ledgerHandle -> getLedgerEntry(ledgerHandle, firstSegmentEntryId,
lastSegmentEntryId).thenApply(this::parseSnapshotSegmentEntries));
}

@Override
public CompletableFuture<Long> getBucketSnapshotLength(long bucketId) {
return openLedger(bucketId).thenApply(ledgerHandle -> {
long length = ledgerHandle.getLength();
closeLedger(ledgerHandle);
return length;
});
return openLedger(bucketId).thenApply(LedgerHandle::getLength);
}

@Override
@@ -212,8 +208,8 @@ private CompletableFuture<Void> addEntry(LedgerHandle ledgerHandle, byte[] data)
});
}

CompletableFuture<Enumeration<LedgerEntry>> getLedgerEntryThenCloseLedger(LedgerHandle ledger,
long firstEntryId, long lastEntryId) {
CompletableFuture<Enumeration<LedgerEntry>> getLedgerEntry(LedgerHandle ledger,
Member Author:

Ledger is already closed after createBucketSnapshot, so we don't need to close it again.

long firstEntryId, long lastEntryId) {
final CompletableFuture<Enumeration<LedgerEntry>> future = new CompletableFuture<>();
ledger.asyncReadEntries(firstEntryId, lastEntryId,
(rc, handle, entries, ctx) -> {
@@ -222,7 +218,6 @@ CompletableFuture<Enumeration<LedgerEntry>> getLedgerEntryThenCloseLedger(Ledger
} else {
future.complete(entries);
}
closeLedger(handle);
}, null
);
return future;
@@ -57,6 +57,7 @@
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
import org.roaringbitmap.RoaringBitmap;
@@ -69,6 +70,10 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker

static final int AsyncOperationTimeoutSeconds = 60;

private static final Long INVALID_BUCKET_ID = -1L;

private static final int MAX_MERGE_NUM = 4;

private final long minIndexCountPerBucket;

private final long timeStepPerBucketSnapshotSegmentInMillis;
@@ -93,9 +98,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker

private final Table<Long, Long, ImmutableBucket> snapshotSegmentLastIndexTable;

private static final Long INVALID_BUCKET_ID = -1L;

private static final int MAX_MERGE_NUM = 4;
private final BucketDelayedMessageIndexStats stats;

public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
Timer timer, long tickTimeMillis,
@@ -125,6 +128,7 @@ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispat
this.lastMutableBucket =
new MutableBucket(dispatcher.getName(), dispatcher.getCursor(), FutureUtil.Sequencer.create(),
bucketSnapshotStorage);
this.stats = new BucketDelayedMessageIndexStats();
this.numberDelayedMessages = recoverBucketSnapshot();
}

@@ -161,8 +165,9 @@ private synchronized long recoverBucketSnapshot() throws RuntimeException {
}

try {
FutureUtil.waitForAll(futures.values()).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
FutureUtil.waitForAll(futures.values()).get(AsyncOperationTimeoutSeconds * 2, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
log.error("[{}] Failed to recover delayed message index bucket snapshot.", dispatcher.getName(), e);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
@@ -193,7 +198,7 @@ private synchronized long recoverBucketSnapshot() throws RuntimeException {
ImmutableBucket immutableBucket = mapEntry.getValue();
immutableBucketMap.remove(key);
// delete asynchronously without waiting for completion
immutableBucket.asyncDeleteBucketSnapshot();
immutableBucket.asyncDeleteBucketSnapshot(stats);
}

MutableLong numberDelayedMessages = new MutableLong(0);
@@ -246,7 +251,8 @@ private Optional<ImmutableBucket> findImmutableBucket(long ledgerId) {
return Optional.ofNullable(immutableBuckets.get(ledgerId));
}

private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair) {
private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair,
long startTime) {
if (immutableBucketDelayedIndexPair != null) {
ImmutableBucket immutableBucket = immutableBucketDelayedIndexPair.getLeft();
immutableBuckets.put(Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId),
@@ -260,14 +266,19 @@ private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immu
CompletableFuture<Long> future = createFuture.handle((bucketId, ex) -> {
if (ex == null) {
immutableBucket.setSnapshotSegments(null);
immutableBucket.asyncUpdateSnapshotLength();
log.info("[{}] Creat bucket snapshot finish, bucketKey: {}", dispatcher.getName(),
immutableBucket.bucketKey());

stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.create,
System.currentTimeMillis() - startTime);

return bucketId;
}

//TODO Record create snapshot failed
log.error("[{}] Failed to create bucket snapshot, bucketKey: {}",
dispatcher.getName(), immutableBucket.bucketKey(), ex);
log.error("[{}] Failed to create bucket snapshot, bucketKey: {}", dispatcher.getName(),
immutableBucket.bucketKey(), ex);
stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.create);

// Put indexes back into the shared queue and downgrade to memory mode
synchronized (BucketDelayedDeliveryTracker.this) {
@@ -311,12 +322,14 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
if (!existBucket && ledgerId > lastMutableBucket.endLedgerId
&& lastMutableBucket.size() >= minIndexCountPerBucket
&& !lastMutableBucket.isEmpty()) {
long createStartTime = System.currentTimeMillis();
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
lastMutableBucket.sealBucketAndAsyncPersistent(
this.timeStepPerBucketSnapshotSegmentInMillis,
this.maxIndexesPerBucketSnapshotSegment,
this.sharedBucketPriorityQueue);
afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);
lastMutableBucket.resetLastMutableBucketRange();

if (immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
@@ -374,7 +387,7 @@ private synchronized List<ImmutableBucket> selectMergedBuckets(final List<Immuta
}

if (minIndex >= 0) {
return values.subList(minIndex, minIndex + MAX_MERGE_NUM);
return values.subList(minIndex, minIndex + mergeNum);
} else if (mergeNum > 2){
return selectMergedBuckets(values, mergeNum - 1);
} else {
@@ -400,6 +413,9 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {
for (ImmutableBucket immutableBucket : toBeMergeImmutableBuckets) {
immutableBucket.merging = true;
}

long mergeStartTime = System.currentTimeMillis();
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.merge);
return asyncMergeBucketSnapshot(toBeMergeImmutableBuckets).whenComplete((__, ex) -> {
synchronized (this) {
for (ImmutableBucket immutableBucket : toBeMergeImmutableBuckets) {
@@ -409,9 +425,14 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {
if (ex != null) {
log.error("[{}] Failed to merge bucket snapshot, bucketKeys: {}",
dispatcher.getName(), bucketsStr, ex);

stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.merge);
} else {
log.info("[{}] Merge bucket snapshot finish, bucketKeys: {}, bucketNum: {}",
dispatcher.getName(), bucketsStr, immutableBuckets.asMapOfRanges().size());

stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.merge,
System.currentTimeMillis() - mergeStartTime);
}
});
}
@@ -436,6 +457,8 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(List<Immut
})
.thenAccept(combinedDelayedIndexQueue -> {
synchronized (BucketDelayedDeliveryTracker.this) {
long createStartTime = System.currentTimeMillis();
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.create);
Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
lastMutableBucket.createImmutableBucketAndAsyncPersistent(
timeStepPerBucketSnapshotSegmentInMillis,
@@ -461,12 +484,12 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(List<Immut
}
immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap);

afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);

immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture()
.orElse(NULL_LONG_PROMISE).thenCompose(___ -> {
List<CompletableFuture<Void>> removeFutures =
buckets.stream().map(ImmutableBucket::asyncDeleteBucketSnapshot)
buckets.stream().map(bucket -> bucket.asyncDeleteBucketSnapshot(stats))
.toList();
return FutureUtil.waitForAll(removeFutures);
});
@@ -557,15 +580,17 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa

if (bucket.currentSegmentEntryId == bucket.lastSegmentEntryId) {
immutableBuckets.asMapOfRanges().remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
bucket.asyncDeleteBucketSnapshot();
bucket.asyncDeleteBucketSnapshot(stats);
continue;
}

long loadStartTime = System.currentTimeMillis();
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load);
bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> {
if (CollectionUtils.isEmpty(indexList)) {
immutableBuckets.asMapOfRanges()
.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId));
bucket.asyncDeleteBucketSnapshot();
bucket.asyncDeleteBucketSnapshot(stats);
return;
}
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex
@@ -583,9 +608,14 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa

log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}, segmentEntryId: {}",
dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1, ex);

stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.load);
} else {
log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}",
dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1);

stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.load,
System.currentTimeMillis() - loadStartTime);
}
}).get(AsyncOperationTimeoutSeconds * (MaxRetryTimes + 1), TimeUnit.SECONDS);
} catch (Exception e) {
@@ -645,7 +675,7 @@ private CompletableFuture<Void> cleanImmutableBuckets() {
Iterator<ImmutableBucket> iterator = immutableBuckets.asMapOfRanges().values().iterator();
while (iterator.hasNext()) {
ImmutableBucket bucket = iterator.next();
futures.add(bucket.clear());
futures.add(bucket.clear(stats));
numberDelayedMessages -= bucket.getNumberBucketDelayedMessages();
iterator.remove();
}
@@ -661,7 +691,6 @@ private boolean removeIndexBit(long ledgerId, long entryId) {
.orElse(false);
}

@Override
public boolean containsMessage(long ledgerId, long entryId) {
if (lastMutableBucket.containsMessage(ledgerId, entryId)) {
return true;
Expand All @@ -670,4 +699,15 @@ public boolean containsMessage(long ledgerId, long entryId) {
return findImmutableBucket(ledgerId).map(bucket -> bucket.containsMessage(ledgerId, entryId))
.orElse(false);
}

public Map<String, TopicMetricBean> genTopicMetricMap() {
stats.recordNumOfBuckets(immutableBuckets.asMapOfRanges().size() + 1);
stats.recordDelayedMessageIndexLoaded(this.sharedBucketPriorityQueue.size() + this.lastMutableBucket.size());
MutableLong totalSnapshotLength = new MutableLong();
immutableBuckets.asMapOfRanges().values().forEach(immutableBucket -> {
totalSnapshotLength.add(immutableBucket.getSnapshotLength());
});
stats.recordBucketSnapshotSizeBytes(totalSnapshotLength.longValue());
return stats.genTopicMetricMap();
}
}
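
The metrics added in this diff follow one pattern per bucket-snapshot operation: recordTriggerEvent when the operation starts, recordSuccessEvent with the elapsed milliseconds when it completes, and recordFailEvent when it fails, plus the gauge-style values reported through genTopicMetricMap (bucket count, loaded index count, total snapshot size). A minimal sketch of a recorder with that shape, using assumed names and signatures; it is not the actual BucketDelayedMessageIndexStats class added by this PR:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

// Sketch only: a simplified recorder mirroring the recordTriggerEvent / recordSuccessEvent /
// recordFailEvent calls used in the tracker above. Names and signatures here are assumptions.
final class DelayedIndexOpStatsSketch {

    enum Type { create, load, merge }

    private static final class OpCounters {
        final LongAdder triggered = new LongAdder();       // operation started
        final LongAdder succeeded = new LongAdder();       // operation finished successfully
        final LongAdder failed = new LongAdder();          // operation failed
        final AtomicLong maxLatencyMs = new AtomicLong();  // worst observed latency
    }

    private final Map<Type, OpCounters> counters = new ConcurrentHashMap<>();

    private OpCounters countersFor(Type type) {
        return counters.computeIfAbsent(type, t -> new OpCounters());
    }

    void recordTriggerEvent(Type type) {
        countersFor(type).triggered.increment();
    }

    void recordSuccessEvent(Type type, long latencyMs) {
        OpCounters c = countersFor(type);
        c.succeeded.increment();
        c.maxLatencyMs.accumulateAndGet(latencyMs, Math::max);
    }

    void recordFailEvent(Type type) {
        countersFor(type).failed.increment();
    }
}

In the tracker above the same pattern wraps create, merge, and load, with System.currentTimeMillis() deltas supplying the latency value on success.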