Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker][PIP-195] Make bucket merge operation asynchronous #19873

Merged
merged 4 commits into from
Mar 25, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker

private final Table<Long, Long, ImmutableBucket> snapshotSegmentLastIndexTable;

private static final Long INVALID_BUCKET_ID = -1L;

public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
Expand Down Expand Up @@ -246,12 +248,12 @@ private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immu
immutableBucket);

immutableBucket.getSnapshotCreateFuture().ifPresent(createFuture -> {
CompletableFuture<Long> future = createFuture.whenComplete((__, ex) -> {
CompletableFuture<Long> future = createFuture.handle((bucketId, ex) -> {
if (ex == null) {
immutableBucket.setSnapshotSegments(null);
log.info("[{}] Creat bucket snapshot finish, bucketKey: {}", dispatcher.getName(),
immutableBucket.bucketKey());
return;
return bucketId;
}

//TODO Record create snapshot failed
Expand All @@ -277,6 +279,7 @@ private void afterCreateImmutableBucket(Pair<ImmutableBucket, DelayedIndex> immu
snapshotSegmentLastIndexTable.remove(lastDelayedIndex.getLedgerId(),
lastDelayedIndex.getTimestamp());
}
return INVALID_BUCKET_ID;
});
immutableBucket.setSnapshotCreateFuture(future);
});
Expand Down Expand Up @@ -308,12 +311,7 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
lastMutableBucket.resetLastMutableBucketRange();

if (immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
try {
asyncMergeBucketSnapshot().get(2 * AsyncOperationTimeoutSeconds * MaxRetryTimes, TimeUnit.SECONDS);
} catch (Exception e) {
// Ignore exception to merge bucket on the next schedule.
log.error("[{}] An exception occurs when merge bucket snapshot.", dispatcher.getName(), e);
}
asyncMergeBucketSnapshot();
}
}

Expand Down Expand Up @@ -341,18 +339,26 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver
private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {
List<ImmutableBucket> values = immutableBuckets.asMapOfRanges().values().stream().toList();
long minNumberMessages = Long.MAX_VALUE;
long minScheduleTimestamp = Long.MAX_VALUE;
int minIndex = -1;
for (int i = 0; i + 1 < values.size(); i++) {
ImmutableBucket bucketL = values.get(i);
ImmutableBucket bucketR = values.get(i + 1);
long numberMessages = bucketL.numberBucketDelayedMessages + bucketR.numberBucketDelayedMessages;
if (numberMessages < minNumberMessages) {
minNumberMessages = (int) numberMessages;
if (bucketL.lastSegmentEntryId > bucketL.getCurrentSegmentEntryId()
&& bucketR.lastSegmentEntryId > bucketR.getCurrentSegmentEntryId()
&& bucketL.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone()
&& bucketR.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone()) {
minIndex = i;
                // We should skip buckets whose last segment has already been loaded into memory, to avoid recording duplicated indexes.
if (bucketL.lastSegmentEntryId > bucketL.getCurrentSegmentEntryId()
poorbarcode marked this conversation as resolved.
Show resolved Hide resolved
&& bucketR.lastSegmentEntryId > bucketR.getCurrentSegmentEntryId()
// Skip the bucket that is merging
&& !bucketL.merging && !bucketR.merging){
long scheduleTimestamp =
Math.min(bucketL.firstScheduleTimestamps.get(bucketL.currentSegmentEntryId + 1),
bucketR.firstScheduleTimestamps.get(bucketR.currentSegmentEntryId + 1));
long numberMessages = bucketL.numberBucketDelayedMessages + bucketR.numberBucketDelayedMessages;
if (scheduleTimestamp <= minScheduleTimestamp) {
minScheduleTimestamp = scheduleTimestamp;
if (numberMessages < minNumberMessages) {
minNumberMessages = numberMessages;
minIndex = i;
}
}
}
}
Expand All @@ -369,7 +375,14 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {
log.info("[{}] Merging bucket snapshot, bucketAKey: {}, bucketBKey: {}", dispatcher.getName(),
immutableBucketA.bucketKey(), immutableBucketB.bucketKey());
}

immutableBucketA.merging = true;
immutableBucketB.merging = true;
return asyncMergeBucketSnapshot(immutableBucketA, immutableBucketB).whenComplete((__, ex) -> {
synchronized (this) {
immutableBucketA.merging = false;
immutableBucketB.merging = false;
}
if (ex != null) {
log.error("[{}] Failed to merge bucket snapshot, bucketAKey: {}, bucketBKey: {}",
dispatcher.getName(), immutableBucketA.bucketKey(), immutableBucketB.bucketKey(), ex);
Expand All @@ -382,46 +395,58 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {

private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(ImmutableBucket bucketA,
ImmutableBucket bucketB) {
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> futureA =
bucketA.getRemainSnapshotSegment();
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> futureB =
bucketB.getRemainSnapshotSegment();
return futureA.thenCombine(futureB, CombinedSegmentDelayedIndexQueue::wrap)
.thenAccept(combinedDelayedIndexQueue -> {
Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
lastMutableBucket.createImmutableBucketAndAsyncPersistent(
timeStepPerBucketSnapshotSegmentInMillis, maxIndexesPerBucketSnapshotSegment,
sharedBucketPriorityQueue, combinedDelayedIndexQueue, bucketA.startLedgerId,
bucketB.endLedgerId);

// Merge bit map to new bucket
Map<Long, RoaringBitmap> delayedIndexBitMapA = bucketA.getDelayedIndexBitMap();
Map<Long, RoaringBitmap> delayedIndexBitMapB = bucketB.getDelayedIndexBitMap();
Map<Long, RoaringBitmap> delayedIndexBitMap = new HashMap<>(delayedIndexBitMapA);
delayedIndexBitMapB.forEach((ledgerId, bitMapB) -> {
delayedIndexBitMap.compute(ledgerId, (k, bitMapA) -> {
if (bitMapA == null) {
return bitMapB;
}
CompletableFuture<Long> createAFuture = bucketA.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE);
CompletableFuture<Long> createBFuture = bucketB.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE);

bitMapA.or(bitMapB);
return bitMapA;
});
});
immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap);

afterCreateImmutableBucket(immutableBucketDelayedIndexPair);
return CompletableFuture.allOf(createAFuture, createBFuture).thenCompose(bucketId -> {
if (INVALID_BUCKET_ID.equals(createAFuture.join()) || INVALID_BUCKET_ID.equals(createBFuture.join())) {
return FutureUtil.failedFuture(new RuntimeException("Can't merge buckets due to bucket create failed"));
}

immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture()
.orElse(NULL_LONG_PROMISE).thenCompose(___ -> {
CompletableFuture<Void> removeAFuture = bucketA.asyncDeleteBucketSnapshot();
CompletableFuture<Void> removeBFuture = bucketB.asyncDeleteBucketSnapshot();
return CompletableFuture.allOf(removeAFuture, removeBFuture);
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> futureA =
bucketA.getRemainSnapshotSegment();
CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> futureB =
bucketB.getRemainSnapshotSegment();
return futureA.thenCombine(futureB, CombinedSegmentDelayedIndexQueue::wrap)
.thenAccept(combinedDelayedIndexQueue -> {
synchronized (BucketDelayedDeliveryTracker.this) {
Pair<ImmutableBucket, DelayedIndex> immutableBucketDelayedIndexPair =
lastMutableBucket.createImmutableBucketAndAsyncPersistent(
timeStepPerBucketSnapshotSegmentInMillis,
maxIndexesPerBucketSnapshotSegment,
sharedBucketPriorityQueue, combinedDelayedIndexQueue, bucketA.startLedgerId,
bucketB.endLedgerId);

// Merge bit map to new bucket
Map<Long, RoaringBitmap> delayedIndexBitMapA = bucketA.getDelayedIndexBitMap();
Map<Long, RoaringBitmap> delayedIndexBitMapB = bucketB.getDelayedIndexBitMap();
Map<Long, RoaringBitmap> delayedIndexBitMap = new HashMap<>(delayedIndexBitMapA);
delayedIndexBitMapB.forEach((ledgerId, bitMapB) -> {
delayedIndexBitMap.compute(ledgerId, (k, bitMapA) -> {
if (bitMapA == null) {
return bitMapB;
}

bitMapA.or(bitMapB);
return bitMapA;
});
});
immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap);

afterCreateImmutableBucket(immutableBucketDelayedIndexPair);

immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture()
.orElse(NULL_LONG_PROMISE).thenCompose(___ -> {
CompletableFuture<Void> removeAFuture = bucketA.asyncDeleteBucketSnapshot();
CompletableFuture<Void> removeBFuture = bucketB.asyncDeleteBucketSnapshot();
return CompletableFuture.allOf(removeAFuture, removeBFuture);
});

immutableBuckets.remove(Range.closed(bucketA.startLedgerId, bucketA.endLedgerId));
immutableBuckets.remove(Range.closed(bucketB.startLedgerId, bucketB.endLedgerId));
}
});

immutableBuckets.remove(Range.closed(bucketA.startLedgerId, bucketA.endLedgerId));
immutableBuckets.remove(Range.closed(bucketB.startLedgerId, bucketB.endLedgerId));
});
});
}

@Override
Expand Down Expand Up @@ -477,6 +502,12 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa

ImmutableBucket bucket = snapshotSegmentLastIndexTable.get(ledgerId, entryId);
if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) {
if (bucket.merging) {
log.info("[{}] Skip load to wait for bucket snapshot merge finish, bucketKey:{}",
dispatcher.getName(), bucket.bucketKey());
break;
}

final int preSegmentEntryId = bucket.currentSegmentEntryId;
if (log.isDebugEnabled()) {
log.debug("[{}] Loading next bucket snapshot segment, bucketKey: {}, nextSegmentEntryId: {}",
Expand Down Expand Up @@ -525,14 +556,14 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa
dispatcher.getName(), bucket.bucketKey(), bucket.currentSegmentEntryId);
}
}).get(AsyncOperationTimeoutSeconds * MaxRetryTimes, TimeUnit.SECONDS);
snapshotSegmentLastIndexTable.remove(ledgerId, entryId);
} catch (Exception e) {
// Ignore exception to reload this segment on the next schedule.
log.error("[{}] An exception occurs when load next bucket snapshot, bucketKey:{}",
dispatcher.getName(), bucket.bucketKey(), e);
break;
}
}
snapshotSegmentLastIndexTable.remove(ledgerId, entryId);

positions.add(new PositionImpl(ledgerId, entryId));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.AsyncOperationTimeoutSeconds;
import static org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.NULL_LONG_PROMISE;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -44,7 +45,12 @@
class ImmutableBucket extends Bucket {

@Setter
private volatile List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> snapshotSegments;
private List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> snapshotSegments;

boolean merging = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable `merging` should be marked `volatile`.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because all reads and writes of `merging` happen inside `synchronized` blocks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right; the Java SE documentation describes this guarantee as follows:

Intrinsic Locks and Synchronization

Synchronization is built around an internal entity known as the intrinsic lock or monitor lock. (The API specification often refers to this entity simply as a "monitor.") Intrinsic locks play a role in both aspects of synchronization: enforcing exclusive access to an object's state and establishing happens-before relationships that are essential to visibility.

Synchronized Methods

Second, when a synchronized method exits, it automatically establishes a happens-before relationship with any subsequent invocation of a synchronized method for the same object. This guarantees that changes to the state of the object are visible to all threads.


@Setter
List<Long> firstScheduleTimestamps = new ArrayList<>();

ImmutableBucket(String dispatcherName, ManagedCursor cursor,
BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) {
Expand Down Expand Up @@ -92,6 +98,9 @@ private CompletableFuture<List<DelayedIndex>> asyncLoadNextBucketSnapshotEntry(b

this.setLastSegmentEntryId(metadataList.size());
this.recoverDelayedIndexBitMapAndNumber(nextSnapshotEntryIndex, metadataList);
List<Long> firstScheduleTimestamps = metadataList.stream().map(
SnapshotSegmentMetadata::getMinScheduleTimestamp).toList();
this.setFirstScheduleTimestamps(firstScheduleTimestamps);

return nextSnapshotEntryIndex + 1;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,15 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
SnapshotSegment.Builder snapshotSegmentBuilder = SnapshotSegment.newBuilder();
SnapshotSegmentMetadata.Builder segmentMetadataBuilder = SnapshotSegmentMetadata.newBuilder();

List<Long> firstScheduleTimestamps = new ArrayList<>();
long currentTimestampUpperLimit = 0;
long currentFirstTimestamp = 0L;
while (!delayedIndexQueue.isEmpty()) {
DelayedIndex delayedIndex = delayedIndexQueue.peek();
long timestamp = delayedIndex.getTimestamp();
if (currentTimestampUpperLimit == 0) {
currentFirstTimestamp = timestamp;
firstScheduleTimestamps.add(currentFirstTimestamp);
currentTimestampUpperLimit = timestamp + timeStepPerBucketSnapshotSegment - 1;
}

Expand All @@ -104,6 +108,7 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
|| (maxIndexesPerBucketSnapshotSegment != -1
&& snapshotSegmentBuilder.getIndexesCount() >= maxIndexesPerBucketSnapshotSegment)) {
segmentMetadataBuilder.setMaxScheduleTimestamp(timestamp);
segmentMetadataBuilder.setMinScheduleTimestamp(currentFirstTimestamp);
currentTimestampUpperLimit = 0;

Iterator<Map.Entry<Long, RoaringBitmap>> iterator = bitMap.entrySet().iterator();
Expand Down Expand Up @@ -134,6 +139,7 @@ Pair<ImmutableBucket, DelayedIndex> createImmutableBucketAndAsyncPersistent(
bucket.setCurrentSegmentEntryId(1);
bucket.setNumberBucketDelayedMessages(numMessages);
bucket.setLastSegmentEntryId(lastSegmentEntryId);
bucket.setFirstScheduleTimestamps(firstScheduleTimestamps);

// Skip first segment, because it has already been loaded
List<SnapshotSegment> snapshotSegments = bucketSnapshotSegments.subList(1, bucketSnapshotSegments.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ message DelayedIndex {
message SnapshotSegmentMetadata {
map<uint64, bytes> delayed_index_bit_map = 1;
required uint64 max_schedule_timestamp = 2;
required uint64 min_schedule_timestamp = 3;
}

message SnapshotSegment {
Expand Down
Loading