Skip to content

Commit

Permalink
Optimization protobuf code
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Apr 20, 2023
1 parent 2b41e4e commit aec64c3
Show file tree
Hide file tree
Showing 15 changed files with 170 additions and 166 deletions.
2 changes: 2 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@
<excludes>
<exclude>**/ResourceUsage.proto</exclude>
<exclude>**/TransactionPendingAck.proto</exclude>
<exclude>**/DelayedMessageIndexBucketSegment.proto</exclude>
</excludes>
</configuration>
<executions>
Expand Down Expand Up @@ -610,6 +611,7 @@
<sources>
<source>${project.basedir}/src/main/proto/TransactionPendingAck.proto</source>
<source>${project.basedir}/src/main/proto/ResourceUsage.proto</source>
<source>${project.basedir}/src/main/proto/DelayedMessageIndexBucketSegment.proto</source>
</sources>
<targetSourcesSubDir>generated-sources/lightproto/java</targetSourcesSubDir>
<targetTestSourcesSubDir>generated-sources/lightproto/java</targetTestSourcesSubDir>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.pulsar.broker.delayed.bucket;

import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
Expand All @@ -36,8 +36,8 @@
import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.common.util.FutureUtil;

@Slf4j
Expand Down Expand Up @@ -126,23 +126,22 @@ private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,

private SnapshotMetadata parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
try {
return SnapshotMetadata.parseFrom(ledgerEntry.getEntry());
ByteBuf entryBuffer = ledgerEntry.getEntryBuffer();
return SnapshotMetadata.parseFrom(entryBuffer.nioBuffer());
} catch (InvalidProtocolBufferException e) {
throw new BucketSnapshotSerializationException(e);
}
}

private List<SnapshotSegment> parseSnapshotSegmentEntries(Enumeration<LedgerEntry> entryEnumeration) {
List<SnapshotSegment> snapshotMetadataList = new ArrayList<>();
try {
while (entryEnumeration.hasMoreElements()) {
LedgerEntry ledgerEntry = entryEnumeration.nextElement();
snapshotMetadataList.add(SnapshotSegment.parseFrom(ledgerEntry.getEntry()));
}
return snapshotMetadataList;
} catch (IOException e) {
throw new BucketSnapshotSerializationException(e);
while (entryEnumeration.hasMoreElements()) {
LedgerEntry ledgerEntry = entryEnumeration.nextElement();
SnapshotSegment snapshotSegment = new SnapshotSegment();
snapshotSegment.parseFrom(ledgerEntry.getEntryBuffer(), ledgerEntry.getEntryBuffer().readableBytes());
snapshotMetadataList.add(snapshotSegment);
}
return snapshotMetadataList;
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.roaringbitmap.RoaringBitmap;
Expand Down Expand Up @@ -132,8 +133,8 @@ long getAndUpdateBucketId() {
}

CompletableFuture<Long> asyncSaveBucketSnapshot(
ImmutableBucket bucket, DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata,
List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments) {
ImmutableBucket bucket, SnapshotMetadata snapshotMetadata,
List<SnapshotSegment> bucketSnapshotSegments) {
final String bucketKey = bucket.bucketKey();
final String cursorName = Codec.decode(cursor.getName());
final String topicName = dispatcherName.substring(0, dispatcherName.lastIndexOf(" / " + cursorName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -286,8 +286,7 @@ private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immu
// Put indexes back into the shared queue and downgrade to memory mode
synchronized (BucketDelayedDeliveryTracker.this) {
immutableBucket.getSnapshotSegments().ifPresent(snapshotSegments -> {
for (DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment :
snapshotSegments) {
for (SnapshotSegment snapshotSegment : snapshotSegments) {
for (DelayedIndex delayedIndex : snapshotSegment.getIndexesList()) {
sharedBucketPriorityQueue.add(delayedIndex.getTimestamp(),
delayedIndex.getLedgerId(), delayedIndex.getEntryId());
Expand Down Expand Up @@ -450,7 +449,7 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(List<Immut
return FutureUtil.failedFuture(new RuntimeException("Can't merge buckets due to bucket create failed"));
}

List<CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>>> getRemainFutures =
List<CompletableFuture<List<SnapshotSegment>>> getRemainFutures =
buckets.stream().map(ImmutableBucket::getRemainSnapshotSegment).toList();

return FutureUtil.waitForAll(getRemainFutures)
Expand Down Expand Up @@ -601,11 +600,11 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa
bucket.asyncDeleteBucketSnapshot(stats);
return;
}
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex
DelayedIndex
lastDelayedIndex = indexList.get(indexList.size() - 1);
this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(),
lastDelayedIndex.getEntryId(), bucket);
for (DelayedMessageIndexBucketSnapshotFormat.DelayedIndex index : indexList) {
for (DelayedIndex index : indexList) {
sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
index.getEntryId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;

public interface BucketSnapshotStorage {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import java.util.PriorityQueue;
import javax.annotation.concurrent.NotThreadSafe;
import lombok.AllArgsConstructor;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;

@NotThreadSafe
class CombinedSegmentDelayedIndexQueue implements DelayedIndexQueue {
Expand All @@ -40,8 +40,8 @@ static class Node {
}

private static final Comparator<Node> COMPARATOR_NODE = (node1, node2) -> DelayedIndexQueue.COMPARATOR.compare(
node1.segmentList.get(node1.segmentListCursor).getIndexes(node1.segmentCursor),
node2.segmentList.get(node2.segmentListCursor).getIndexes(node2.segmentCursor));
node1.segmentList.get(node1.segmentListCursor).getIndexeAt(node1.segmentCursor),
node2.segmentList.get(node2.segmentListCursor).getIndexeAt(node2.segmentCursor));

private final PriorityQueue<Node> kpq;

Expand Down Expand Up @@ -77,7 +77,7 @@ private DelayedIndex getValue(boolean needAdvanceCursor) {
Objects.requireNonNull(node);

SnapshotSegment snapshotSegment = node.segmentList.get(node.segmentListCursor);
DelayedIndex delayedIndex = snapshotSegment.getIndexes(node.segmentCursor);
DelayedIndex delayedIndex = snapshotSegment.getIndexeAt(node.segmentCursor);
if (!needAdvanceCursor) {
return delayedIndex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

import java.util.Comparator;
import java.util.Objects;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;

interface DelayedIndexQueue {
Comparator<DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> COMPARATOR = (o1, o2) -> {
Comparator<DelayedIndex> COMPARATOR = (o1, o2) -> {
if (!Objects.equals(o1.getTimestamp(), o2.getTimestamp())) {
return Long.compare(o1.getTimestamp(), o2.getTimestamp());
} else if (!Objects.equals(o1.getLedgerId(), o2.getLedgerId())) {
Expand All @@ -35,7 +35,7 @@ interface DelayedIndexQueue {

boolean isEmpty();

DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek();
DelayedIndex peek();

DelayedMessageIndexBucketSnapshotFormat.DelayedIndex pop();
DelayedIndex pop();
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata;
import org.apache.pulsar.common.util.FutureUtil;
import org.roaringbitmap.InvalidRoaringFormat;
import org.roaringbitmap.RoaringBitmap;
Expand All @@ -43,7 +43,7 @@
class ImmutableBucket extends Bucket {

@Setter
private List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> snapshotSegments;
private List<SnapshotSegment> snapshotSegments;

boolean merging = false;

Expand All @@ -55,7 +55,7 @@ class ImmutableBucket extends Bucket {
super(dispatcherName, cursor, sequencer, storage, startLedgerId, endLedgerId);
}

public Optional<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> getSnapshotSegments() {
public Optional<List<SnapshotSegment>> getSnapshotSegments() {
return Optional.ofNullable(snapshotSegments);
}

Expand Down Expand Up @@ -84,7 +84,7 @@ private CompletableFuture<List<DelayedIndex>> asyncLoadNextBucketSnapshotEntry(b
}
}), BucketSnapshotPersistenceException.class, MaxRetryTimes)
.thenApply(snapshotMetadata -> {
List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata> metadataList =
List<SnapshotSegmentMetadata> metadataList =
snapshotMetadata.getMetadataListList();

// Skip all already reach schedule time snapshot segments
Expand Down Expand Up @@ -125,10 +125,9 @@ private CompletableFuture<List<DelayedIndex>> asyncLoadNextBucketSnapshotEntry(b
return Collections.emptyList();
}

DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment =
SnapshotSegment snapshotSegment =
bucketSnapshotSegments.get(0);
List<DelayedMessageIndexBucketSnapshotFormat.DelayedIndex> indexList =
snapshotSegment.getIndexesList();
List<DelayedIndex> indexList = snapshotSegment.getIndexesList();
this.setCurrentSegmentEntryId(nextSegmentEntryId);
if (isRecover) {
this.asyncUpdateSnapshotLength();
Expand Down Expand Up @@ -171,7 +170,7 @@ private void recoverDelayedIndexBitMapAndNumber(int startSnapshotIndex,
setNumberBucketDelayedMessages(numberMessages.getValue());
}

CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> getRemainSnapshotSegment() {
CompletableFuture<List<SnapshotSegment>> getRemainSnapshotSegment() {
int nextSegmentEntryId = currentSegmentEntryId + 1;
if (nextSegmentEntryId > lastSegmentEntryId) {
return CompletableFuture.completedFuture(Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegmentMetadata;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
import org.roaringbitmap.RoaringBitmap;
Expand Down Expand Up @@ -75,7 +75,7 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
List<SnapshotSegment> bucketSnapshotSegments = new ArrayList<>();
List<SnapshotSegmentMetadata> segmentMetadataList = new ArrayList<>();
Map<Long, RoaringBitmap> bitMap = new HashMap<>();
SnapshotSegment.Builder snapshotSegmentBuilder = SnapshotSegment.newBuilder();
SnapshotSegment snapshotSegment = new SnapshotSegment();
SnapshotSegmentMetadata.Builder segmentMetadataBuilder = SnapshotSegmentMetadata.newBuilder();

List<Long> firstScheduleTimestamps = new ArrayList<>();
Expand Down Expand Up @@ -105,11 +105,11 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(

bitMap.computeIfAbsent(ledgerId, k -> new RoaringBitmap()).add(entryId, entryId + 1);

snapshotSegmentBuilder.addIndexes(delayedIndex);
snapshotSegment.addIndexe().copyFrom(delayedIndex);

if (delayedIndexQueue.isEmpty() || delayedIndexQueue.peek().getTimestamp() > currentTimestampUpperLimit
|| (maxIndexesPerBucketSnapshotSegment != -1
&& snapshotSegmentBuilder.getIndexesCount() >= maxIndexesPerBucketSnapshotSegment)) {
&& snapshotSegment.getIndexesCount() >= maxIndexesPerBucketSnapshotSegment)) {
segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
segmentMetadataBuilder.setMinScheduleTimestamp(currentFirstTimestamp);
currentTimestampUpperLimit = 0;
Expand All @@ -129,8 +129,8 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
segmentMetadataList.add(segmentMetadataBuilder.build());
segmentMetadataBuilder.clear();

bucketSnapshotSegments.add(snapshotSegmentBuilder.build());
snapshotSegmentBuilder.clear();
bucketSnapshotSegments.add(snapshotSegment);
snapshotSegment = new SnapshotSegment();
}
}

Expand All @@ -153,8 +153,8 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(

// Add the first snapshot segment last message to snapshotSegmentLastMessageTable
checkArgument(!bucketSnapshotSegments.isEmpty());
SnapshotSegment snapshotSegment = bucketSnapshotSegments.get(0);
DelayedIndex lastDelayedIndex = snapshotSegment.getIndexes(snapshotSegment.getIndexesCount() - 1);
SnapshotSegment firstSnapshotSegment = bucketSnapshotSegments.get(0);
DelayedIndex lastDelayedIndex = firstSnapshotSegment.getIndexeAt(snapshotSegment.getIndexesCount() - 1);
Pair<ImmutableBucket, DelayedIndex> result = Pair.of(bucket, lastDelayedIndex);

CompletableFuture<Long> future = asyncSaveBucketSnapshot(bucket,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.pulsar.broker.delayed.bucket;

import javax.annotation.concurrent.NotThreadSafe;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;

@NotThreadSafe
Expand All @@ -41,16 +41,15 @@ public boolean isEmpty() {
}

@Override
public DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek() {
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex delayedIndex =
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex.newBuilder().setTimestamp(queue.peekN1())
.setLedgerId(queue.peekN2()).setEntryId(queue.peekN3()).build();
public DelayedIndex peek() {
DelayedIndex delayedIndex = new DelayedIndex().setTimestamp(queue.peekN1())
.setLedgerId(queue.peekN2()).setEntryId(queue.peekN3());
return delayedIndex;
}

@Override
public DelayedMessageIndexBucketSnapshotFormat.DelayedIndex pop() {
DelayedMessageIndexBucketSnapshotFormat.DelayedIndex peek = peek();
public DelayedIndex pop() {
DelayedIndex peek = peek();
queue.pop();
return peek;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,14 @@ syntax = "proto2";
package pulsar.delay;
option java_package = "org.apache.pulsar.broker.delayed.proto";
option optimize_for = SPEED;

message DelayedIndex {
required uint64 timestamp = 1;
required uint64 ledger_id = 2;
required uint64 entry_id = 3;
}
option java_multiple_files = true;

message SnapshotSegmentMetadata {
map<uint64, bytes> delayed_index_bit_map = 1;
required uint64 max_schedule_timestamp = 2;
required uint64 min_schedule_timestamp = 3;
}

message SnapshotSegment {
repeated DelayedIndex indexes = 1;
}

message SnapshotMetadata {
repeated SnapshotSegmentMetadata metadata_list = 1;
}
Loading

0 comments on commit aec64c3

Please sign in to comment.