Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Optimization protobuf code in the bucket delayed tracker #20158

Merged
merged 3 commits into from
Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@
<excludes>
<exclude>**/ResourceUsage.proto</exclude>
<exclude>**/TransactionPendingAck.proto</exclude>
<exclude>**/DelayedMessageIndexBucketSegment.proto</exclude>
</excludes>
</configuration>
<executions>
Expand Down Expand Up @@ -610,6 +611,7 @@
<sources>
<source>${project.basedir}/src/main/proto/TransactionPendingAck.proto</source>
<source>${project.basedir}/src/main/proto/ResourceUsage.proto</source>
<source>${project.basedir}/src/main/proto/DelayedMessageIndexBucketSegment.proto</source>
</sources>
<targetSourcesSubDir>generated-sources/lightproto/java</targetSourcesSubDir>
<targetTestSourcesSubDir>generated-sources/lightproto/java</targetTestSourcesSubDir>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.pulsar.broker.delayed.bucket;

import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
Expand All @@ -36,8 +36,8 @@
import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.common.util.FutureUtil;

@Slf4j
Expand Down Expand Up @@ -126,23 +126,23 @@ private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,

private SnapshotMetadata parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
try {
return SnapshotMetadata.parseFrom(ledgerEntry.getEntry());
ByteBuf entryBuffer = ledgerEntry.getEntryBuffer();
return SnapshotMetadata.parseFrom(entryBuffer.nioBuffer());
} catch (InvalidProtocolBufferException e) {
throw new BucketSnapshotSerializationException(e);
}
}

private List<SnapshotSegment> parseSnapshotSegmentEntries(Enumeration<LedgerEntry> entryEnumeration) {
List<SnapshotSegment> snapshotMetadataList = new ArrayList<>();
try {
while (entryEnumeration.hasMoreElements()) {
LedgerEntry ledgerEntry = entryEnumeration.nextElement();
snapshotMetadataList.add(SnapshotSegment.parseFrom(ledgerEntry.getEntry()));
}
return snapshotMetadataList;
} catch (IOException e) {
throw new BucketSnapshotSerializationException(e);
while (entryEnumeration.hasMoreElements()) {
LedgerEntry ledgerEntry = entryEnumeration.nextElement();
SnapshotSegment snapshotSegment = new SnapshotSegment();
ByteBuf entryBuffer = ledgerEntry.getEntryBuffer();
snapshotSegment.parseFrom(entryBuffer, entryBuffer.readableBytes());
snapshotMetadataList.add(snapshotSegment);
}
return snapshotMetadataList;
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.roaringbitmap.RoaringBitmap;
Expand Down Expand Up @@ -132,8 +133,8 @@ long getAndUpdateBucketId() {
}

CompletableFuture<Long> asyncSaveBucketSnapshot(
ImmutableBucket bucket, DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata,
List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments) {
ImmutableBucket bucket, SnapshotMetadata snapshotMetadata,
List<SnapshotSegment> bucketSnapshotSegments) {
final String bucketKey = bucket.bucketKey();
final String cursorName = Codec.decode(cursor.getName());
final String topicName = dispatcherName.substring(0, dispatcherName.lastIndexOf(" / " + cursorName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -286,8 +286,7 @@ private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immu
// Put indexes back into the shared queue and downgrade to memory mode
synchronized (BucketDelayedDeliveryTracker.this) {
immutableBucket.getSnapshotSegments().ifPresent(snapshotSegments -> {
for (DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment :
snapshotSegments) {
for (SnapshotSegment snapshotSegment : snapshotSegments) {
for (DelayedIndex delayedIndex : snapshotSegment.getIndexesList()) {
sharedBucketPriorityQueue.add(delayedIndex.getTimestamp(),
delayedIndex.getLedgerId(), delayedIndex.getEntryId());
Expand Down Expand Up @@ -450,7 +449,7 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(List<Immut
return FutureUtil.failedFuture(new RuntimeException("Can't merge buckets due to bucket create failed"));
}

List<CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>> getRemainFutures =
List<CompletableFuture<List<SnapshotSegment>>> getRemainFutures =
buckets.stream().map(ImmutableBucket::getRemainSnapshotSegment).toList();

return FutureUtil.waitForAll(getRemainFutures)
Expand Down Expand Up @@ -601,11 +600,11 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa
bucket.asyncDeleteBucketSnapshot(stats);
return;
}
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex
DelayedIndex
lastDelayedIndex = indexList.get(indexList.size() - 1);
this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(),
lastDelayedIndex.getEntryId(), bucket);
for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : indexList) {
for (DelayedIndex index : indexList) {
sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
index.getEntryId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;

public interface BucketSnapshotStorage {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import java.util.PriorityQueue;
import javax.annotation.concurrent.NotThreadSafe;
import lombok.AllArgsConstructor;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;

@NotThreadSafe
class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {
Expand All @@ -40,8 +40,8 @@ static class Node {
}

private static final Comparator<Node> COMPARATOR_NODE = (node1, node2) -> DelayedIndexQueue.COMPARATOR.compare(
node1.segmentList.get(node1.segmentListCursor).getIndexes(node1.segmentCursor),
node2.segmentList.get(node2.segmentListCursor).getIndexes(node2.segmentCursor));
node1.segmentList.get(node1.segmentListCursor).getIndexeAt(node1.segmentCursor),
node2.segmentList.get(node2.segmentListCursor).getIndexeAt(node2.segmentCursor));

private final PriorityQueue<Node> kpq;

Expand Down Expand Up @@ -77,7 +77,7 @@ private DelayedIndex getValue(boolean needAdvanceCursor) {
Objects.requireNonNull(node);

SnapshotSegment snapshotSegment = node.segmentList.get(node.segmentListCursor);
DelayedIndex delayedIndex = snapshotSegment.getIndexes(node.segmentCursor);
DelayedIndex delayedIndex = snapshotSegment.getIndexeAt(node.segmentCursor);
if (!needAdvanceCursor) {
return delayedIndex;
}
Expand All @@ -104,4 +104,16 @@ private DelayedIndex getValue(boolean needAdvanceCursor) {

return delayedIndex;
}

@Override
public void popToObject(DelayedIndex delayedIndex) {
DelayedIndex value = getValue(true);
delayedIndex.copyFrom(value);
}

@Override
public long peekTimestamp() {
DelayedIndex value = getValue(false);
return value.getTimestamp();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

import java.util.Comparator;
import java.util.Objects;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;

interface DelayedIndexQueue {
Comparator<DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> COMPARATOR = (o1, o2) -> {
Comparator<DelayedIndex> COMPARATOR = (o1, o2) -> {
if (!Objects.equals(o1.getTimestamp(), o2.getTimestamp())) {
return Long.compare(o1.getTimestamp(), o2.getTimestamp());
} else if (!Objects.equals(o1.getLedgerId(), o2.getLedgerId())) {
Expand All @@ -35,7 +35,11 @@ interface DelayedIndexQueue {

boolean isEmpty();

DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek();
DelayedIndex peek();

DelayedMessageIndexBucketSnapshotFormat.DelayedIndex pop();
DelayedIndex pop();

void popToObject(DelayedIndex delayedIndex);

long peekTimestamp();
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata;
import org.apache.pulsar.common.util.FutureUtil;
import org.roaringbitmap.InvalidRoaringFormat;
import org.roaringbitmap.RoaringBitmap;
Expand All @@ -43,7 +43,7 @@
class ImmutableBucket extends Bucket {

@Setter
private List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> snapshotSegments;
private List<SnapshotSegment> snapshotSegments;

boolean merging = false;

Expand All @@ -55,7 +55,7 @@ class ImmutableBucket extends Bucket {
super(dispatcherName, cursor, sequencer, storage, startLedgerId, endLedgerId);
}

public Optional<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> getSnapshotSegments() {
public Optional<List<SnapshotSegment>> getSnapshotSegments() {
return Optional.ofNullable(snapshotSegments);
}

Expand Down Expand Up @@ -84,7 +84,7 @@ private CompletableFuture<List<DelayedIndex>> asyncLoadNextBucketSnapshotEntry(b
}
}), BucketSnapshotPersistenceException.class, MaxRetryTimes)
.thenApply(snapshotMetadata -> {
List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata> metadataList =
List<SnapshotSegmentMetadata> metadataList =
snapshotMetadata.getMetadataListList();

// Skip all already reach schedule time snapshot segments
Expand Down Expand Up @@ -125,10 +125,9 @@ private CompletableFuture<List<DelayedIndex>> asyncLoadNextBucketSnapshotEntry(b
return Collections.emptyList();
}

DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment =
SnapshotSegment snapshotSegment =
bucketSnapshotSegments.get(0);
List<DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> indexList =
snapshotSegment.getIndexesList();
List<DelayedIndex> indexList = snapshotSegment.getIndexesList();
this.setCurrentSegmentEntryId(nextSegmentEntryId);
if (isRecover) {
this.asyncUpdateSnapshotLength();
Expand Down Expand Up @@ -171,7 +170,7 @@ private void recoverDelayedIndexBitMapAndNumber(int startSnapshotIndex,
setNumberBucketDelayedMessages(numberMessages.getValue());
}

CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> getRemainSnapshotSegment() {
CompletableFuture<List<SnapshotSegment>> getRemainSnapshotSegment() {
int nextSegmentEntryId = currentSegmentEntryId + 1;
if (nextSegmentEntryId > lastSegmentEntryId) {
return CompletableFuture.completedFuture(Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
import org.roaringbitmap.RoaringBitmap;
Expand Down Expand Up @@ -75,14 +75,16 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
List<SnapshotSegmentMetadata> segmentMetadataList = new ArrayList<>();
Map<Long, RoaringBitmap> bitMap = new HashMap<>();
SnapshotSegment.Builder snapshotSegmentBuilder = SnapshotSegment.newBuilder();
SnapshotSegment snapshotSegment = new SnapshotSegment();
SnapshotSegmentMetadata.Builder segmentMetadataBuilder = SnapshotSegmentMetadata.newBuilder();

List<Long> firstScheduleTimestamps = new ArrayList<>();
long currentTimestampUpperLimit = 0;
long currentFirstTimestamp = 0L;
while (!delayedIndexQueue.isEmpty()) {
DelayedIndex delayedIndex = delayedIndexQueue.peek();
DelayedIndex delayedIndex = snapshotSegment.addIndexe();
delayedIndexQueue.popToObject(delayedIndex);

long timestamp = delayedIndex.getTimestamp();
if (currentTimestampUpperLimit == 0) {
currentFirstTimestamp = timestamp;
Expand All @@ -100,16 +102,13 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
sharedQueue.add(timestamp, ledgerId, entryId);
}

delayedIndexQueue.pop();
numMessages++;

bitMap.computeIfAbsent(ledgerId, k -> new RoaringBitmap()).add(entryId, entryId + 1);

snapshotSegmentBuilder.addIndexes(delayedIndex);

if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit
if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peekTimestamp() > currentTimestampUpperLimit
|| (maxIndexesPerBucketSnapshotSegment != -1
&& snapshotSegmentBuilder.getIndexesCount() >= maxIndexesPerBucketSnapshotSegment)) {
&& snapshotSegment.getIndexesCount() >= maxIndexesPerBucketSnapshotSegment)) {
segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
segmentMetadataBuilder.setMinScheduleTimestamp(currentFirstTimestamp);
currentTimestampUpperLimit = 0;
Expand All @@ -129,8 +128,8 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
segmentMetadataList.add(segmentMetadataBuilder.build());
segmentMetadataBuilder.clear();

bucketSnapshotSegments.add(snapshotSegmentBuilder.build());
snapshotSegmentBuilder.clear();
bucketSnapshotSegments.add(snapshotSegment);
snapshotSegment = new SnapshotSegment();
}
}

Expand All @@ -153,8 +152,8 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(

// Add the first snapshot segment last message to snapshotSegmentLastMessageTable
checkArgument(!bucketSnapshotSegments.isEmpty());
SnapshotSegment snapshotSegment = bucketSnapshotSegments.get(0);
DelayedIndex lastDelayedIndex = snapshotSegment.getIndexes(snapshotSegment.getIndexesCount() - 1);
SnapshotSegment firstSnapshotSegment = bucketSnapshotSegments.get(0);
DelayedIndex lastDelayedIndex = firstSnapshotSegment.getIndexeAt(firstSnapshotSegment.getIndexesCount() - 1);
Pair<ImmutableBucket, DelayedIndex> result = Pair.of(bucket, lastDelayedIndex);

CompletableFuture<Long> future = asyncSaveBucketSnapshot(bucket,
Expand Down
Loading