Skip to content

Commit

Permalink
Implement delayed message bucket recover
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Nov 9, 2022
1 parent ef5346b commit 2ba0f47
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.delayed.bucket;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELAYED_BUCKET_KEY_PREFIX;
import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Range;
import com.google.common.collect.RangeMap;
Expand All @@ -27,10 +29,13 @@
import io.netty.util.Timeout;
import io.netty.util.Timer;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -39,11 +44,13 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;

@Slf4j
Expand Down Expand Up @@ -91,9 +98,66 @@ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispat
this.sharedBucketPriorityQueue = new TripleLongPriorityQueue();
this.immutableBuckets = TreeRangeMap.create();
this.snapshotSegmentLastIndexTable = HashBasedTable.create();
this.numberDelayedMessages = 0L;
ManagedCursor cursor = dispatcher.getCursor();
this.lastMutableBucket = new MutableBucket(cursor, bucketSnapshotStorage);
this.numberDelayedMessages = recoverBucketSnapshot();
}

private long recoverBucketSnapshot() throws RuntimeException {
ManagedCursor cursor = this.lastMutableBucket.cursor;
cursor.getCursorProperties().keySet().forEach(key -> {
if (key.startsWith(DELAYED_BUCKET_KEY_PREFIX)) {
String[] keys = key.split(DELIMITER);
checkArgument(keys.length == 3);
ImmutableBucket immutableBucket =
new ImmutableBucket(cursor, this.lastMutableBucket.bucketSnapshotStorage,
Long.parseLong(keys[1]), Long.parseLong(keys[2]));
immutableBuckets.put(Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId),
immutableBucket);
}
});

if (immutableBuckets.asMapOfRanges().isEmpty()) {
return 0;
}

List<CompletableFuture<Void>> futures = new ArrayList<>(immutableBuckets.asMapOfRanges().size());
for (ImmutableBucket immutableBucket : immutableBuckets.asMapOfRanges().values()) {
CompletableFuture<Void> future =
immutableBucket.asyncRecoverBucketSnapshotEntry(this::getCutoffTime).thenAccept(indexList -> {
if (CollectionUtils.isEmpty(indexList)) {
return;
}
DelayedIndex lastDelayedIndex = indexList.get(indexList.size() - 1);
synchronized (this.snapshotSegmentLastIndexTable) {
this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(),
lastDelayedIndex.getEntryId(), immutableBucket);
}
synchronized (this.sharedBucketPriorityQueue) {
for (DelayedIndex index : indexList) {
this.sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
index.getEntryId());
}
}
});
futures.add(future);
}

try {
FutureUtil.waitForAll(futures).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}

MutableLong numberDelayedMessages = new MutableLong(0);
immutableBuckets.asMapOfRanges().values().forEach(bucket -> {
numberDelayedMessages.add(bucket.numberBucketDelayedMessages);
});

log.info("[{}] Recover delayed message index bucket snapshot finish, buckets: {}, numberDelayedMessages: {}",
dispatcher.getName(), immutableBuckets.asMapOfRanges().size(), numberDelayedMessages.getValue());

return numberDelayedMessages.getValue();
}

@Override
Expand Down Expand Up @@ -242,7 +306,7 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessa
// All message of current snapshot segment are scheduled, load next snapshot segment
// TODO make it asynchronous and not blocking this process
try {
bucket.asyncLoadNextBucketSnapshotEntry(false).thenAccept(indexList -> {
bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> {
if (CollectionUtils.isEmpty(indexList)) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,39 @@
package org.apache.pulsar.broker.delayed.bucket;

import static org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.AsyncOperationTimeoutSeconds;
import com.google.protobuf.ByteString;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;

@Slf4j
class ImmutableBucket extends Bucket {
ImmutableBucket(ManagedCursor cursor, BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) {
super(cursor, storage, startLedgerId, endLedgerId);
}

/**
* Asynchronous load next bucket snapshot entry.
* @param isRecover whether used to recover bucket snapshot
* @return CompletableFuture
*/
CompletableFuture<List<DelayedIndex>> asyncLoadNextBucketSnapshotEntry(boolean isRecover) {
CompletableFuture<List<DelayedIndex>> asyncLoadNextBucketSnapshotEntry() {
return asyncLoadNextBucketSnapshotEntry(false, null);
}

CompletableFuture<List<DelayedIndex>> asyncRecoverBucketSnapshotEntry(Supplier<Long> cutoffTimeSupplier) {
return asyncLoadNextBucketSnapshotEntry(true, cutoffTimeSupplier);
}

private CompletableFuture<List<DelayedIndex>> asyncLoadNextBucketSnapshotEntry(boolean isRecover,
Supplier<Long> cutoffTimeSupplier) {
if (log.isDebugEnabled()) {
log.debug("[{}] Load next bucket snapshot data, bucket: {}", cursor.getName(), this);
}
Expand All @@ -54,7 +65,28 @@ CompletableFuture<List<DelayedIndex>> asyncLoadNextBucketSnapshotEntry(boolean i
final long bucketId = getAndUpdateBucketId();
CompletableFuture<Integer> loadMetaDataFuture = new CompletableFuture<>();
if (isRecover) {
// TODO Recover bucket snapshot
final long cutoffTime = cutoffTimeSupplier.get();
// Load Metadata of bucket snapshot
bucketSnapshotStorage.getBucketSnapshotMetadata(bucketId).thenAccept(snapshotMetadata -> {
List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata> metadataList =
snapshotMetadata.getMetadataListList();

// Skip all already reach schedule time snapshot segments
int nextSnapshotEntryIndex = 0;
while (nextSnapshotEntryIndex < metadataList.size()
&& metadataList.get(nextSnapshotEntryIndex).getMaxScheduleTimestamp() <= cutoffTime) {
nextSnapshotEntryIndex++;
}

this.setLastSegmentEntryId(metadataList.size());
this.recoverDelayedIndexBitMapAndNumber(nextSnapshotEntryIndex, metadataList);

int nextSegmentEntryId = nextSnapshotEntryIndex + 1;
loadMetaDataFuture.complete(nextSegmentEntryId);
}).exceptionally(ex -> {
loadMetaDataFuture.completeExceptionally(ex);
return null;
});
} else {
loadMetaDataFuture.complete(currentSegmentEntryId + 1);
}
Expand Down Expand Up @@ -82,6 +114,27 @@ CompletableFuture<List<DelayedIndex>> asyncLoadNextBucketSnapshotEntry(boolean i
});
}

private void recoverDelayedIndexBitMapAndNumber(int startSnapshotIndex,
List<SnapshotSegmentMetadata> segmentMetadata) {
this.delayedIndexBitMap.clear();
MutableLong numberMessages = new MutableLong(0);
for (int i = startSnapshotIndex; i < segmentMetadata.size(); i++) {
Map<Long, ByteString> bitByteStringMap = segmentMetadata.get(i).getDelayedIndexBitMapMap();
bitByteStringMap.forEach((leaderId, bitSetString) -> {
boolean exist = this.delayedIndexBitMap.containsKey(leaderId);
RoaringBitmap bitSet =
new ImmutableRoaringBitmap(bitSetString.asReadOnlyByteBuffer()).toRoaringBitmap();
numberMessages.add(bitSet.getCardinality());
if (!exist) {
this.delayedIndexBitMap.put(leaderId, bitSet);
} else {
this.delayedIndexBitMap.get(leaderId).or(bitSet);
}
});
}
this.setNumberBucketDelayedMessages(numberMessages.getValue());
}

void clear(boolean delete) {
delayedIndexBitMap.clear();
getSnapshotCreateFuture().ifPresent(snapshotGenerateFuture -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,17 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotSame;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertFalse;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.time.Clock;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
Expand All @@ -40,6 +45,8 @@
import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -156,4 +163,76 @@ public void testContainsMessage(DelayedDeliveryTracker tracker) {

tracker.close();
}

@Test(dataProvider = "delayedTracker", invocationCount = 10)
public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) {
for (int i = 1; i <= 100; i++) {
tracker.addMessage(i, i, i * 10);
}

assertEquals(tracker.getNumberOfDelayedMessages(), 100);

clockTime.set(1 * 10);

assertTrue(tracker.hasMessageAvailable());
Set<PositionImpl> scheduledMessages = tracker.getScheduledMessages(100);

assertEquals(scheduledMessages.size(), 1);

tracker.addMessage(101, 101, 101 * 10);

tracker.close();

clockTime.set(30 * 10);

tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock,
true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50);

assertFalse(tracker.containsMessage(101, 101));
assertEquals(tracker.getNumberOfDelayedMessages(), 70);

clockTime.set(100 * 10);

assertTrue(tracker.hasMessageAvailable());
scheduledMessages = tracker.getScheduledMessages(70);

assertEquals(scheduledMessages.size(), 70);

int i = 31;
for (PositionImpl scheduledMessage : scheduledMessages) {
assertEquals(scheduledMessage, PositionImpl.get(i, i));
i++;
}

tracker.close();
}

@Test
public void testRoaringBitmapSerialize() {
List<Long> data = List.of(1L, 3L, 5L, 10L, 16L, 18L, 999L, 0L);
RoaringBitmap roaringBitmap = new RoaringBitmap();
for (Long datum : data) {
roaringBitmap.add(datum, datum + 1);
}

assertEquals(roaringBitmap.getCardinality(), data.size());
for (Long datum : data) {
assertTrue(roaringBitmap.contains(datum, datum + 1));
}

byte[] array = new byte[roaringBitmap.serializedSizeInBytes()];
roaringBitmap.serialize(ByteBuffer.wrap(array));

RoaringBitmap roaringBitmap2 = new ImmutableRoaringBitmap(ByteBuffer.wrap(array)).toRoaringBitmap();
assertEquals(roaringBitmap2.getCardinality(), data.size());
for (Long datum : data) {
assertTrue(roaringBitmap2.contains(datum, datum + 1));
}

byte[] array2 = new byte[roaringBitmap2.serializedSizeInBytes()];
roaringBitmap.serialize(ByteBuffer.wrap(array2));

assertTrue(Arrays.equals(array, array2));
assertNotSame(array, array2);
}
}

0 comments on commit 2ba0f47

Please sign in to comment.