diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java index dc1d1eb6c9ac5..f5ad77a71d8c4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java @@ -24,6 +24,7 @@ import java.util.function.Supplier; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.pulsar.common.util.FutureUtil; /** * Conveniences to use with {@link CompletableFuture}. @@ -78,7 +79,8 @@ public static CompletableFuture executeWithRetry(Supplier 0) { + Throwable throwable = FutureUtil.unwrapCompletionException(ex); + if (needRetryExceptionClass.isAssignableFrom(throwable.getClass()) && maxRetryTimes > 0) { executeWithRetry(op, needRetryExceptionClass, maxRetryTimes - 1).whenComplete((res2, ex2) -> { if (ex2 == null) { resultFuture.complete(res2); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java index 1cadc6d98e268..7dd6266e2115c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java @@ -154,7 +154,7 @@ private CompletableFuture createLedger(String bucketKey) { LedgerPassword, (rc, handle, ctx) -> { if (rc != BKException.Code.OK) { - future.completeExceptionally(bkException("Failed to create ledger", rc, -1)); + future.completeExceptionally(bkException("Create ledger", rc, -1)); } else { future.complete(handle); } @@ -170,7 +170,7 @@ private CompletableFuture openLedger(Long ledgerId) { LedgerPassword, (rc, handle, ctx) -> { if (rc != BKException.Code.OK) { - future.completeExceptionally(bkException("Failed to open ledger", rc, ledgerId)); + future.completeExceptionally(bkException("Open ledger", rc, ledgerId)); } else { future.complete(handle); } @@ -184,7 +184,7 @@ private CompletableFuture closeLedger(LedgerHandle ledgerHandle) { ledgerHandle.asyncClose((rc, handle, ctx) -> { if (rc != BKException.Code.OK) { log.warn("Failed to close a Ledger Handle: {}", ledgerHandle.getId()); - future.completeExceptionally(bkException("Failed to close ledger", rc, ledgerHandle.getId())); + future.completeExceptionally(bkException("Close ledger", rc, ledgerHandle.getId())); } else { future.complete(null); } @@ -197,7 +197,7 @@ private CompletableFuture addEntry(LedgerHandle ledgerHandle, byte[] data) ledgerHandle.asyncAddEntry(data, (rc, handle, entryId, ctx) -> { if (rc != BKException.Code.OK) { - future.completeExceptionally(bkException("Failed to add entry", rc, ledgerHandle.getId())); + future.completeExceptionally(bkException("Add entry", rc, ledgerHandle.getId())); } else { future.complete(null); } @@ -217,7 +217,7 @@ CompletableFuture> getLedgerEntryThenCloseLedger(Ledger ledger.asyncReadEntries(firstEntryId, lastEntryId, (rc, handle, entries, ctx) -> { if (rc != BKException.Code.OK) { - future.completeExceptionally(bkException("Failed to read entry", rc, ledger.getId())); + future.completeExceptionally(bkException("Read entry", rc, ledger.getId())); } else { future.complete(entries); } @@ -231,7 +231,7 @@ private CompletableFuture deleteLedger(long ledgerId) { CompletableFuture future = new CompletableFuture<>(); bookKeeper.asyncDeleteLedger(ledgerId, (int rc, Object cnx) -> { if (rc != BKException.Code.OK) { - future.completeExceptionally(bkException("Failed to delete ledger", rc, ledgerId)); + future.completeExceptionally(bkException("Delete ledger", rc, ledgerId)); } else { future.complete(null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java index 5d2a556337a6e..50b5cd12ead07 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java @@ -42,6 +42,8 @@ abstract class Bucket { static final String DELIMITER = "_"; static final int MaxRetryTimes = 3; + protected final String dispatcherName; + protected final ManagedCursor cursor; protected final BucketSnapshotStorage bucketSnapshotStorage; @@ -54,17 +56,18 @@ abstract class Bucket { int lastSegmentEntryId; - int currentSegmentEntryId; + volatile int currentSegmentEntryId; - long snapshotLength; + volatile long snapshotLength; private volatile Long bucketId; private volatile CompletableFuture snapshotCreateFuture; - Bucket(ManagedCursor cursor, BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) { - this(cursor, storage, startLedgerId, endLedgerId, new HashMap<>(), -1, -1, 0, 0, null, null); + Bucket(String dispatcherName, ManagedCursor cursor, + BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) { + this(dispatcherName, cursor, storage, startLedgerId, endLedgerId, new HashMap<>(), -1, -1, 0, 0, null, null); } boolean containsMessage(long ledgerId, long entryId) { @@ -126,13 +129,19 @@ CompletableFuture asyncSaveBucketSnapshot( ImmutableBucket bucket, DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata, List bucketSnapshotSegments) { final String bucketKey = bucket.bucketKey(); - return bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments, bucketKey) - .thenCompose(newBucketId -> { + return executeWithRetry( + () -> bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments, bucketKey) + .whenComplete((__, ex) -> { + if (ex != null) { + log.warn("[{}] Failed to create bucket snapshot, bucketKey: {}", + dispatcherName, bucketKey, ex); + } + }), BucketSnapshotPersistenceException.class, MaxRetryTimes).thenCompose(newBucketId -> { bucket.setBucketId(newBucketId); return putBucketKeyId(bucketKey, newBucketId).exceptionally(ex -> { - log.warn("Failed to record bucketId to cursor property, bucketKey: {}, bucketId: {}", - bucketKey, bucketId); + log.warn("[{}] Failed to record bucketId to cursor property, bucketKey: {}, bucketId: {}", + dispatcherName, bucketKey, newBucketId, ex); return null; }).thenApply(__ -> newBucketId); }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index bd4ef92cc7320..a77b272297be4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELAYED_BUCKET_KEY_PREFIX; import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER; +import static org.apache.pulsar.broker.delayed.bucket.Bucket.MaxRetryTimes; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.HashBasedTable; import com.google.common.collect.Range; @@ -61,7 +62,9 @@ @ThreadSafe public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker { - static final int AsyncOperationTimeoutSeconds = 30; + static final CompletableFuture NULL_LONG_PROMISE = CompletableFuture.completedFuture(null); + + static final int AsyncOperationTimeoutSeconds = 60; private final long minIndexCountPerBucket; @@ -104,20 +107,19 @@ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispat this.sharedBucketPriorityQueue = new TripleLongPriorityQueue(); this.immutableBuckets = TreeRangeMap.create(); this.snapshotSegmentLastIndexTable = HashBasedTable.create(); - ManagedCursor cursor = dispatcher.getCursor(); - this.lastMutableBucket = new MutableBucket(cursor, bucketSnapshotStorage); + this.lastMutableBucket = new MutableBucket(dispatcher.getName(), dispatcher.getCursor(), bucketSnapshotStorage); this.numberDelayedMessages = recoverBucketSnapshot(); } private synchronized long recoverBucketSnapshot() throws RuntimeException { - ManagedCursor cursor = this.lastMutableBucket.cursor; + ManagedCursor cursor = this.lastMutableBucket.getCursor(); Map, ImmutableBucket> toBeDeletedBucketMap = new HashMap<>(); cursor.getCursorProperties().keySet().forEach(key -> { if (key.startsWith(DELAYED_BUCKET_KEY_PREFIX)) { String[] keys = key.split(DELIMITER); checkArgument(keys.length == 3); ImmutableBucket immutableBucket = - new ImmutableBucket(cursor, this.lastMutableBucket.bucketSnapshotStorage, + new ImmutableBucket(dispatcher.getName(), cursor, this.lastMutableBucket.bucketSnapshotStorage, Long.parseLong(keys[1]), Long.parseLong(keys[2])); putAndCleanOverlapRange(Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId), immutableBucket, toBeDeletedBucketMap); @@ -126,6 +128,8 @@ private synchronized long recoverBucketSnapshot() throws RuntimeException { Map, ImmutableBucket> immutableBucketMap = immutableBuckets.asMapOfRanges(); if (immutableBucketMap.isEmpty()) { + log.info("[{}] Recover delayed message index bucket snapshot finish, don't find bucket snapshot", + dispatcher.getName()); return 0; } @@ -232,10 +236,42 @@ private void afterCreateImmutableBucket(Pair immu DelayedIndex lastDelayedIndex = immutableBucketDelayedIndexPair.getRight(); snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), lastDelayedIndex.getEntryId(), immutableBucket); - if (log.isDebugEnabled()) { - log.debug("[{}] Create bucket snapshot, bucket: {}", dispatcher.getName(), - lastMutableBucket); - } + + immutableBucket.getSnapshotCreateFuture().ifPresent(createFuture -> { + CompletableFuture future = createFuture.whenComplete((__, ex) -> { + if (ex == null) { + immutableBucket.setSnapshotSegments(null); + log.info("[{}] Creat bucket snapshot finish, bucketKey: {}", dispatcher.getName(), + immutableBucket.bucketKey()); + return; + } + + //TODO Record create snapshot failed + log.error("[{}] Failed to create bucket snapshot, bucketKey: {}", + dispatcher.getName(), immutableBucket.bucketKey(), ex); + + // Put indexes back into the shared queue and downgrade to memory mode + synchronized (BucketDelayedDeliveryTracker.this) { + immutableBucket.getSnapshotSegments().ifPresent(snapshotSegments -> { + for (DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment : + snapshotSegments) { + for (DelayedIndex delayedIndex : snapshotSegment.getIndexesList()) { + sharedBucketPriorityQueue.add(delayedIndex.getTimestamp(), + delayedIndex.getLedgerId(), delayedIndex.getEntryId()); + } + } + immutableBucket.setSnapshotSegments(null); + }); + + immutableBucket.setCurrentSegmentEntryId(immutableBucket.lastSegmentEntryId); + immutableBuckets.remove( + Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId)); + snapshotSegmentLastIndexTable.remove(lastDelayedIndex.getLedgerId(), + lastDelayedIndex.getTimestamp()); + } + }); + immutableBucket.setSnapshotCreateFuture(future); + }); } } @@ -263,12 +299,10 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver if (immutableBuckets.asMapOfRanges().size() > maxNumBuckets) { try { - asyncMergeBucketSnapshot().get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - throw new RuntimeException(e); + asyncMergeBucketSnapshot().get(2 * AsyncOperationTimeoutSeconds * MaxRetryTimes, TimeUnit.SECONDS); + } catch (Exception e) { + // Ignore exception to merge bucket on the next schedule. + log.error("[{}] An exception occurs when merge bucket snapshot.", dispatcher.getName(), e); } } } @@ -304,7 +338,10 @@ private synchronized CompletableFuture asyncMergeBucketSnapshot() { long numberMessages = bucketL.numberBucketDelayedMessages + bucketR.numberBucketDelayedMessages; if (numberMessages < minNumberMessages) { minNumberMessages = (int) numberMessages; - if (bucketL.lastSegmentEntryId > bucketL.getCurrentSegmentEntryId()) { + if (bucketL.lastSegmentEntryId > bucketL.getCurrentSegmentEntryId() + && bucketR.lastSegmentEntryId > bucketR.getCurrentSegmentEntryId() + && bucketL.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone() + && bucketR.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone()) { minIndex = i; } } @@ -314,62 +351,66 @@ private synchronized CompletableFuture asyncMergeBucketSnapshot() { log.warn("[{}] Can't find able merged bucket", dispatcher.getName()); return CompletableFuture.completedFuture(null); } - return asyncMergeBucketSnapshot(values.get(minIndex), values.get(minIndex + 1)); + + ImmutableBucket immutableBucketA = values.get(minIndex); + ImmutableBucket immutableBucketB = values.get(minIndex + 1); + + if (log.isDebugEnabled()) { + log.info("[{}] Merging bucket snapshot, bucketAKey: {}, bucketBKey: {}", dispatcher.getName(), + immutableBucketA.bucketKey(), immutableBucketB.bucketKey()); + } + return asyncMergeBucketSnapshot(immutableBucketA, immutableBucketB).whenComplete((__, ex) -> { + if (ex != null) { + log.error("[{}] Failed to merge bucket snapshot, bucketAKey: {}, bucketBKey: {}", + dispatcher.getName(), immutableBucketA.bucketKey(), immutableBucketB.bucketKey(), ex); + } else { + log.info("[{}] Merge bucket snapshot finish, bucketAKey: {}, bucketBKey: {}", + dispatcher.getName(), immutableBucketA.bucketKey(), immutableBucketB.bucketKey()); + } + }); } private synchronized CompletableFuture asyncMergeBucketSnapshot(ImmutableBucket bucketA, ImmutableBucket bucketB) { - CompletableFuture snapshotCreateFutureA = - bucketA.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null)); - CompletableFuture snapshotCreateFutureB = - bucketB.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null)); - - return CompletableFuture.allOf(snapshotCreateFutureA, snapshotCreateFutureB).thenCompose(__ -> { - CompletableFuture> futureA = - bucketA.getRemainSnapshotSegment(); - CompletableFuture> futureB = - bucketB.getRemainSnapshotSegment(); - return futureA.thenCombine(futureB, CombinedSegmentDelayedIndexQueue::wrap) - .thenAccept(combinedDelayedIndexQueue -> { - Pair immutableBucketDelayedIndexPair = - lastMutableBucket.createImmutableBucketAndAsyncPersistent( - timeStepPerBucketSnapshotSegmentInMillis, sharedBucketPriorityQueue, - combinedDelayedIndexQueue, bucketA.startLedgerId, bucketB.endLedgerId); - - // Merge bit map to new bucket - Map delayedIndexBitMapA = bucketA.getDelayedIndexBitMap(); - Map delayedIndexBitMapB = bucketB.getDelayedIndexBitMap(); - Map delayedIndexBitMap = new HashMap<>(delayedIndexBitMapA); - delayedIndexBitMapB.forEach((ledgerId, bitMapB) -> { - delayedIndexBitMap.compute(ledgerId, (k, bitMapA) -> { - if (bitMapA == null) { - return bitMapB; - } - - bitMapA.or(bitMapB); - return bitMapA; - }); + CompletableFuture> futureA = + bucketA.getRemainSnapshotSegment(); + CompletableFuture> futureB = + bucketB.getRemainSnapshotSegment(); + return futureA.thenCombine(futureB, CombinedSegmentDelayedIndexQueue::wrap) + .thenAccept(combinedDelayedIndexQueue -> { + Pair immutableBucketDelayedIndexPair = + lastMutableBucket.createImmutableBucketAndAsyncPersistent( + timeStepPerBucketSnapshotSegmentInMillis, sharedBucketPriorityQueue, + combinedDelayedIndexQueue, bucketA.startLedgerId, bucketB.endLedgerId); + + // Merge bit map to new bucket + Map delayedIndexBitMapA = bucketA.getDelayedIndexBitMap(); + Map delayedIndexBitMapB = bucketB.getDelayedIndexBitMap(); + Map delayedIndexBitMap = new HashMap<>(delayedIndexBitMapA); + delayedIndexBitMapB.forEach((ledgerId, bitMapB) -> { + delayedIndexBitMap.compute(ledgerId, (k, bitMapA) -> { + if (bitMapA == null) { + return bitMapB; + } + + bitMapA.or(bitMapB); + return bitMapA; }); - immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap); - - afterCreateImmutableBucket(immutableBucketDelayedIndexPair); - - CompletableFuture snapshotCreateFuture = CompletableFuture.completedFuture(null); - if (immutableBucketDelayedIndexPair != null) { - snapshotCreateFuture = immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture() - .orElse(CompletableFuture.completedFuture(null)); - } + }); + immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap); - snapshotCreateFuture.thenCompose(___ -> { - CompletableFuture removeAFuture = bucketA.asyncDeleteBucketSnapshot(); - CompletableFuture removeBFuture = bucketB.asyncDeleteBucketSnapshot(); - return CompletableFuture.allOf(removeAFuture, removeBFuture); - }); + afterCreateImmutableBucket(immutableBucketDelayedIndexPair); - immutableBuckets.remove(Range.closed(bucketA.startLedgerId, bucketA.endLedgerId)); - immutableBuckets.remove(Range.closed(bucketB.startLedgerId, bucketB.endLedgerId)); + immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture() + .orElse(NULL_LONG_PROMISE).thenCompose(___ -> { + CompletableFuture removeAFuture = bucketA.asyncDeleteBucketSnapshot(); + CompletableFuture removeBFuture = bucketB.asyncDeleteBucketSnapshot(); + return CompletableFuture.allOf(removeAFuture, removeBFuture); }); - }); + + immutableBuckets.remove(Range.closed(bucketA.startLedgerId, bucketA.endLedgerId)); + immutableBuckets.remove(Range.closed(bucketB.startLedgerId, bucketB.endLedgerId)); + }); } @Override @@ -422,19 +463,31 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa long ledgerId = sharedBucketPriorityQueue.peekN2(); long entryId = sharedBucketPriorityQueue.peekN3(); - positions.add(new PositionImpl(ledgerId, entryId)); - sharedBucketPriorityQueue.pop(); - removeIndexBit(ledgerId, entryId); - - ImmutableBucket bucket = snapshotSegmentLastIndexTable.remove(ledgerId, entryId); + ImmutableBucket bucket = snapshotSegmentLastIndexTable.get(ledgerId, entryId); if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) { + final int preSegmentEntryId = bucket.currentSegmentEntryId; if (log.isDebugEnabled()) { - log.debug("[{}] Load next snapshot segment, bucket: {}", dispatcher.getName(), bucket); + log.debug("[{}] Loading next bucket snapshot segment, bucketKey: {}, nextSegmentEntryId: {}", + dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1); } // All message of current snapshot segment are scheduled, load next snapshot segment // TODO make it asynchronous and not blocking this process try { + boolean createFutureDone = bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone(); + + if (!createFutureDone) { + log.info("[{}] Skip load to wait for bucket snapshot create finish, bucketKey:{}", + dispatcher.getName(), bucket.bucketKey()); + break; + } + + if (bucket.currentSegmentEntryId == bucket.lastSegmentEntryId) { + immutableBuckets.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); + bucket.asyncDeleteBucketSnapshot(); + continue; + } + bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> { if (CollectionUtils.isEmpty(indexList)) { immutableBuckets.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); @@ -449,13 +502,32 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(), index.getEntryId()); } - }).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - // TODO make this segment load again - throw new RuntimeException(e); + }).whenComplete((__, ex) -> { + if (ex != null) { + // Back bucket state + bucket.setCurrentSegmentEntryId(preSegmentEntryId); + + log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}", + dispatcher.getName(), bucket.bucketKey(), ex); + } else { + log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}", + dispatcher.getName(), bucket.bucketKey(), bucket.currentSegmentEntryId); + } + }).get(AsyncOperationTimeoutSeconds * MaxRetryTimes, TimeUnit.SECONDS); + snapshotSegmentLastIndexTable.remove(ledgerId, entryId); + } catch (Exception e) { + // Ignore exception to reload this segment on the next schedule. + log.error("[{}] An exception occurs when load next bucket snapshot, bucketKey:{}", + dispatcher.getName(), bucket.bucketKey(), e); + break; } } + positions.add(new PositionImpl(ledgerId, entryId)); + + sharedBucketPriorityQueue.pop(); + removeIndexBit(ledgerId, entryId); + --n; --numberDelayedMessages; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java index 3e9c577454fed..c947e53843a85 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java @@ -18,14 +18,17 @@ */ package org.apache.pulsar.broker.delayed.bucket; +import static org.apache.bookkeeper.mledger.util.Futures.executeWithRetry; import static org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.AsyncOperationTimeoutSeconds; import com.google.protobuf.ByteString; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.commons.collections4.CollectionUtils; @@ -38,8 +41,17 @@ @Slf4j class ImmutableBucket extends Bucket { - ImmutableBucket(ManagedCursor cursor, BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) { - super(cursor, storage, startLedgerId, endLedgerId); + + @Setter + private volatile List snapshotSegments; + + ImmutableBucket(String dispatcherName, ManagedCursor cursor, + BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) { + super(dispatcherName, cursor, storage, startLedgerId, endLedgerId); + } + + public Optional> getSnapshotSegments() { + return Optional.ofNullable(snapshotSegments); } CompletableFuture> asyncLoadNextBucketSnapshotEntry() { @@ -52,61 +64,65 @@ CompletableFuture> asyncRecoverBucketSnapshotEntry(Supplier> asyncLoadNextBucketSnapshotEntry(boolean isRecover, Supplier cutoffTimeSupplier) { - if (log.isDebugEnabled()) { - log.debug("[{}] Load next bucket snapshot data, bucket: {}", cursor.getName(), this); + final long bucketId = getAndUpdateBucketId(); + final CompletableFuture loadMetaDataFuture; + if (isRecover) { + final long cutoffTime = cutoffTimeSupplier.get(); + // Load Metadata of bucket snapshot + final String bucketKey = bucketKey(); + loadMetaDataFuture = executeWithRetry(() -> bucketSnapshotStorage.getBucketSnapshotMetadata(bucketId) + .whenComplete((___, ex) -> { + if (ex != null) { + log.warn("[{}] Failed to get bucket snapshot metadata," + + " bucketKey: {}, bucketId: {}", + dispatcherName, bucketKey, bucketId, ex); + } + }), BucketSnapshotPersistenceException.class, MaxRetryTimes) + .thenApply(snapshotMetadata -> { + List metadataList = + snapshotMetadata.getMetadataListList(); + + // Skip all already reach schedule time snapshot segments + int nextSnapshotEntryIndex = 0; + while (nextSnapshotEntryIndex < metadataList.size() + && metadataList.get(nextSnapshotEntryIndex).getMaxScheduleTimestamp() <= cutoffTime) { + nextSnapshotEntryIndex++; + } + + this.setLastSegmentEntryId(metadataList.size()); + this.recoverDelayedIndexBitMapAndNumber(nextSnapshotEntryIndex, metadataList); + + return nextSnapshotEntryIndex + 1; + }); + } else { + loadMetaDataFuture = CompletableFuture.completedFuture(currentSegmentEntryId + 1); } - // Wait bucket snapshot create finish - CompletableFuture snapshotCreateFuture = - getSnapshotCreateFuture().orElseGet(() -> CompletableFuture.completedFuture(null)) - .thenApply(__ -> null); - - return snapshotCreateFuture.thenCompose(__ -> { - final long bucketId = getAndUpdateBucketId(); - final CompletableFuture loadMetaDataFuture; - if (isRecover) { - final long cutoffTime = cutoffTimeSupplier.get(); - // Load Metadata of bucket snapshot - loadMetaDataFuture = bucketSnapshotStorage.getBucketSnapshotMetadata(bucketId) - .thenApply(snapshotMetadata -> { - List metadataList = - snapshotMetadata.getMetadataListList(); - - // Skip all already reach schedule time snapshot segments - int nextSnapshotEntryIndex = 0; - while (nextSnapshotEntryIndex < metadataList.size() - && metadataList.get(nextSnapshotEntryIndex).getMaxScheduleTimestamp() <= cutoffTime) { - nextSnapshotEntryIndex++; - } - - this.setLastSegmentEntryId(metadataList.size()); - this.recoverDelayedIndexBitMapAndNumber(nextSnapshotEntryIndex, metadataList); - - return nextSnapshotEntryIndex + 1; - }); - } else { - loadMetaDataFuture = CompletableFuture.completedFuture(currentSegmentEntryId + 1); + return loadMetaDataFuture.thenCompose(nextSegmentEntryId -> { + if (nextSegmentEntryId > lastSegmentEntryId) { + return CompletableFuture.completedFuture(null); } - return loadMetaDataFuture.thenCompose(nextSegmentEntryId -> { - if (nextSegmentEntryId > lastSegmentEntryId) { - return CompletableFuture.completedFuture(null); - } - - return bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, nextSegmentEntryId, nextSegmentEntryId) - .thenApply(bucketSnapshotSegments -> { - if (CollectionUtils.isEmpty(bucketSnapshotSegments)) { - return Collections.emptyList(); - } - - DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment = - bucketSnapshotSegments.get(0); - List indexList = - snapshotSegment.getIndexesList(); - this.setCurrentSegmentEntryId(nextSegmentEntryId); - return indexList; - }); - }); + return executeWithRetry( + () -> bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, nextSegmentEntryId, + nextSegmentEntryId).whenComplete((___, ex) -> { + if (ex != null) { + log.warn("[{}] Failed to get bucket snapshot segment. bucketKey: {}, bucketId: {}", + dispatcherName, bucketKey(), bucketId, ex); + } + }), BucketSnapshotPersistenceException.class, MaxRetryTimes) + .thenApply(bucketSnapshotSegments -> { + if (CollectionUtils.isEmpty(bucketSnapshotSegments)) { + return Collections.emptyList(); + } + + DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment = + bucketSnapshotSegments.get(0); + List indexList = + snapshotSegment.getIndexesList(); + this.setCurrentSegmentEntryId(nextSegmentEntryId); + return indexList; + }); }); } @@ -136,18 +152,34 @@ CompletableFuture> if (nextSegmentEntryId > lastSegmentEntryId) { return CompletableFuture.completedFuture(Collections.emptyList()); } - return bucketSnapshotStorage.getBucketSnapshotSegment(getAndUpdateBucketId(), nextSegmentEntryId, - lastSegmentEntryId); + return executeWithRetry(() -> { + return bucketSnapshotStorage.getBucketSnapshotSegment(getAndUpdateBucketId(), nextSegmentEntryId, + lastSegmentEntryId).whenComplete((__, ex) -> { + if (ex != null) { + log.warn("[{}] Failed to get remain bucket snapshot segment, bucketKey: {}.", + dispatcherName, bucketKey(), ex); + } + }); + }, BucketSnapshotPersistenceException.class, MaxRetryTimes); } CompletableFuture asyncDeleteBucketSnapshot() { String bucketKey = bucketKey(); long bucketId = getAndUpdateBucketId(); return removeBucketCursorProperty(bucketKey).thenCompose(__ -> - bucketSnapshotStorage.deleteBucketSnapshot(bucketId)).whenComplete((__, ex) -> { + executeWithRetry(() -> bucketSnapshotStorage.deleteBucketSnapshot(bucketId).whenComplete((___, ex) -> { + if (ex != null) { + log.warn("[{}] Failed to delete bucket snapshot. bucketKey: {}, bucketId: {}", + dispatcherName, bucketKey, bucketId, ex); + } + }), + BucketSnapshotPersistenceException.class, MaxRetryTimes)).whenComplete((__, ex) -> { if (ex != null) { - log.warn("Failed to delete bucket snapshot, bucketId: {}, bucketKey: {}", - bucketId, bucketKey, ex); + log.warn("[{}] Failed to delete bucket snapshot, bucketId: {}, bucketKey: {}", + dispatcherName, bucketId, bucketKey, ex); + } else { + log.info("[{}] Delete bucket snapshot finish, bucketId: {}, bucketKey: {}", + dispatcherName, bucketId, bucketKey); } }); } @@ -179,8 +211,8 @@ void clear(boolean delete) { try { snapshotGenerateFuture.get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS); } catch (Exception e) { - log.warn("Failed wait to snapshot generate, bucketId: {}, bucketKey: {}", getBucketId(), - bucketKey()); + log.warn("[{}] Failed wait to snapshot generate, bucketId: {}, bucketKey: {}", dispatcherName, + getBucketId(), bucketKey()); } } }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java index ad457329c427f..40ba8f4c4b593 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java @@ -42,9 +42,9 @@ class MutableBucket extends Bucket implements AutoCloseable { private final TripleLongPriorityQueue priorityQueue; - MutableBucket(ManagedCursor cursor, + MutableBucket(String dispatcherName, ManagedCursor cursor, BucketSnapshotStorage bucketSnapshotStorage) { - super(cursor, bucketSnapshotStorage, -1L, -1L); + super(dispatcherName, cursor, bucketSnapshotStorage, -1L, -1L); this.priorityQueue = new TripleLongPriorityQueue(); } @@ -59,6 +59,9 @@ Pair createImmutableBucketAndAsyncPersistent( final long timeStepPerBucketSnapshotSegment, TripleLongPriorityQueue sharedQueue, DelayedIndexQueue delayedIndexQueue, final long startLedgerId, final long endLedgerId) { + log.info("[{}] Creating bucket snapshot, startLedgerId: {}, endLedgerId: {}", dispatcherName, + startLedgerId, endLedgerId); + if (delayedIndexQueue.isEmpty()) { return null; } @@ -122,11 +125,16 @@ Pair createImmutableBucketAndAsyncPersistent( final int lastSegmentEntryId = segmentMetadataList.size(); - ImmutableBucket bucket = new ImmutableBucket(cursor, bucketSnapshotStorage, startLedgerId, endLedgerId); + ImmutableBucket bucket = new ImmutableBucket(dispatcherName, cursor, bucketSnapshotStorage, + startLedgerId, endLedgerId); bucket.setCurrentSegmentEntryId(1); bucket.setNumberBucketDelayedMessages(numMessages); bucket.setLastSegmentEntryId(lastSegmentEntryId); + // Skip first segment, because it has already been loaded + List snapshotSegments = bucketSnapshotSegments.subList(1, bucketSnapshotSegments.size()); + bucket.setSnapshotSegments(snapshotSegments); + // Add the first snapshot segment last message to snapshotSegmentLastMessageTable checkArgument(!bucketSnapshotSegments.isEmpty()); SnapshotSegment snapshotSegment = bucketSnapshotSegments.get(0); @@ -136,12 +144,6 @@ Pair createImmutableBucketAndAsyncPersistent( CompletableFuture future = asyncSaveBucketSnapshot(bucket, bucketSnapshotMetadata, bucketSnapshotSegments); bucket.setSnapshotCreateFuture(future); - future.whenComplete((__, ex) -> { - if (ex != null) { - //TODO Record create snapshot failed - log.error("Failed to create snapshot: ", ex); - } - }); return result; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java index 9b2fbda4195da..b106642915587 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java @@ -23,8 +23,10 @@ import io.netty.buffer.PooledByteBufAllocator; import io.netty.util.concurrent.DefaultThreadFactory; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -36,6 +38,7 @@ import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment; +import org.apache.pulsar.common.util.FutureUtil; @Slf4j public class MockBucketSnapshotStorage implements BucketSnapshotStorage { @@ -53,9 +56,35 @@ public MockBucketSnapshotStorage() { this.maxBucketId = new AtomicLong(); } + public Queue createExceptionQueue = new LinkedList<>(); + public Queue getMetaDataExceptionQueue = new LinkedList<>(); + public Queue getSegmentExceptionQueue = new LinkedList<>(); + public Queue deleteExceptionQueue = new LinkedList<>(); + + + public void injectCreateException(Throwable throwable) { + createExceptionQueue.add(throwable); + } + + public void injectGetMetaDataException(Throwable throwable) { + getMetaDataExceptionQueue.add(throwable); + } + + public void injectGetSegmentException(Throwable throwable) { + getSegmentExceptionQueue.add(throwable); + } + + public void injectDeleteException(Throwable throwable) { + deleteExceptionQueue.add(throwable); + } + @Override public CompletableFuture createBucketSnapshot( SnapshotMetadata snapshotMetadata, List bucketSnapshotSegments, String bucketKey) { + Throwable throwable = createExceptionQueue.poll(); + if (throwable != null) { + return FutureUtil.failedFuture(throwable); + } return CompletableFuture.supplyAsync(() -> { long bucketId = maxBucketId.getAndIncrement(); List entries = new ArrayList<>(); @@ -81,6 +110,10 @@ public CompletableFuture createBucketSnapshot( @Override public CompletableFuture getBucketSnapshotMetadata(long bucketId) { + Throwable throwable = getMetaDataExceptionQueue.poll(); + if (throwable != null) { + return FutureUtil.failedFuture(throwable); + } return CompletableFuture.supplyAsync(() -> { ByteBuf byteBuf = this.bucketSnapshots.get(bucketId).get(0); SnapshotMetadata snapshotMetadata; @@ -96,6 +129,10 @@ public CompletableFuture getBucketSnapshotMetadata(long bucket @Override public CompletableFuture> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId, long lastSegmentEntryId) { + Throwable throwable = getSegmentExceptionQueue.poll(); + if (throwable != null) { + return FutureUtil.failedFuture(throwable); + } return CompletableFuture.supplyAsync(() -> { List snapshotSegments = new ArrayList<>(); long lastEntryId = Math.min(lastSegmentEntryId, this.bucketSnapshots.get(bucketId).size()); @@ -115,6 +152,10 @@ public CompletableFuture> getBucketSnapshotSegment(long bu @Override public CompletableFuture deleteBucketSnapshot(long bucketId) { + Throwable throwable = deleteExceptionQueue.poll(); + if (throwable != null) { + return FutureUtil.failedFuture(throwable); + } return CompletableFuture.supplyAsync(() -> { List remove = this.bucketSnapshots.remove(bucketId); if (remove != null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java index efb0fa7ab7ba2..499262c1e60b9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java @@ -46,7 +46,7 @@ public MockManagedCursor(String name) { @Override public String getName() { - return null; + return this.name; } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java index 920f2cf2b64b3..08e1f78725bf4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java @@ -50,6 +50,8 @@ import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.roaringbitmap.RoaringBitmap; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.testcontainers.shaded.org.apache.commons.lang3.mutable.MutableLong; +import org.testcontainers.shaded.org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -77,6 +79,7 @@ public Object[][] provider(Method method) throws Exception { bucketSnapshotStorage.start(); ManagedCursor cursor = new MockManagedCursor("my_test_cursor"); doReturn(cursor).when(dispatcher).getCursor(); + doReturn(cursor.getName()).when(dispatcher).getName(); final String methodName = method.getName(); return switch (methodName) { @@ -136,7 +139,7 @@ public Object[][] provider(Method method) throws Exception { new BucketDelayedDeliveryTracker(dispatcher, timer, 500, clock, true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50) }}; - case "testMergeSnapshot" -> new Object[][]{{ + case "testMergeSnapshot", "testWithBkException", "testWithCreateFailDowngrade" -> new Object[][]{{ new BucketDelayedDeliveryTracker(dispatcher, timer, 100000, clock, true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 10) }}; @@ -255,6 +258,24 @@ public void testMergeSnapshot(BucketDelayedDeliveryTracker tracker) { assertEquals(10, size); + tracker.addMessage(111, 1011, 111 * 10); + + MutableLong delayedMessagesInSnapshot = new MutableLong(); + tracker.getImmutableBuckets().asMapOfRanges().forEach((k, v) -> { + delayedMessagesInSnapshot.add(v.getNumberBucketDelayedMessages()); + }); + + tracker.close(); + + tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock, + true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 10); + + assertEquals(tracker.getNumberOfDelayedMessages(), delayedMessagesInSnapshot.getValue()); + + for (int i = 1; i <= 110; i++) { + tracker.addMessage(i, i, i * 10); + } + clockTime.set(110 * 10); NavigableSet scheduledMessages = tracker.getScheduledMessages(110); @@ -263,4 +284,98 @@ public void testMergeSnapshot(BucketDelayedDeliveryTracker tracker) { assertEquals(position, PositionImpl.get(i, i)); } } + + @Test(dataProvider = "delayedTracker") + public void testWithBkException(BucketDelayedDeliveryTracker tracker) { + MockBucketSnapshotStorage mockBucketSnapshotStorage = (MockBucketSnapshotStorage) bucketSnapshotStorage; + mockBucketSnapshotStorage.injectCreateException( + new BucketSnapshotPersistenceException("Bookie operation timeout, op: Create entry")); + mockBucketSnapshotStorage.injectGetMetaDataException( + new BucketSnapshotPersistenceException("Bookie operation timeout, op: Get entry")); + mockBucketSnapshotStorage.injectGetSegmentException( + new BucketSnapshotPersistenceException("Bookie operation timeout, op: Get entry")); + mockBucketSnapshotStorage.injectDeleteException( + new BucketSnapshotPersistenceException("Bookie operation timeout, op: Delete entry")); + + assertEquals(1, mockBucketSnapshotStorage.createExceptionQueue.size()); + assertEquals(1, mockBucketSnapshotStorage.getMetaDataExceptionQueue.size()); + assertEquals(1, mockBucketSnapshotStorage.getSegmentExceptionQueue.size()); + assertEquals(1, mockBucketSnapshotStorage.deleteExceptionQueue.size()); + + for (int i = 1; i <= 110; i++) { + tracker.addMessage(i, i, i * 10); + } + + assertEquals(110, tracker.getNumberOfDelayedMessages()); + + int size = tracker.getImmutableBuckets().asMapOfRanges().size(); + + assertEquals(10, size); + + tracker.addMessage(111, 1011, 111 * 10); + + MutableLong delayedMessagesInSnapshot = new MutableLong(); + tracker.getImmutableBuckets().asMapOfRanges().forEach((k, v) -> { + delayedMessagesInSnapshot.add(v.getNumberBucketDelayedMessages()); + }); + + tracker.close(); + + tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock, + true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 10); + + Long delayedMessagesInSnapshotValue = delayedMessagesInSnapshot.getValue(); + assertEquals(tracker.getNumberOfDelayedMessages(), delayedMessagesInSnapshotValue); + + clockTime.set(110 * 10); + + mockBucketSnapshotStorage.injectGetSegmentException( + new BucketSnapshotPersistenceException("Bookie operation timeout1, op: Get entry")); + mockBucketSnapshotStorage.injectGetSegmentException( + new BucketSnapshotPersistenceException("Bookie operation timeout2, op: Get entry")); + mockBucketSnapshotStorage.injectGetSegmentException( + new BucketSnapshotPersistenceException("Bookie operation timeout3, op: Get entry")); + mockBucketSnapshotStorage.injectGetSegmentException( + new BucketSnapshotPersistenceException("Bookie operation timeout4, op: Get entry")); + + assertEquals(tracker.getScheduledMessages(100).size(), 0); + + assertEquals(tracker.getScheduledMessages(100).size(), delayedMessagesInSnapshotValue); + + assertTrue(mockBucketSnapshotStorage.createExceptionQueue.isEmpty()); + assertTrue(mockBucketSnapshotStorage.getMetaDataExceptionQueue.isEmpty()); + assertTrue(mockBucketSnapshotStorage.getSegmentExceptionQueue.isEmpty()); + assertTrue(mockBucketSnapshotStorage.deleteExceptionQueue.isEmpty()); + } + + @Test(dataProvider = "delayedTracker") + public void testWithCreateFailDowngrade(BucketDelayedDeliveryTracker tracker) { + MockBucketSnapshotStorage mockBucketSnapshotStorage = (MockBucketSnapshotStorage) bucketSnapshotStorage; + mockBucketSnapshotStorage.injectCreateException( + new BucketSnapshotPersistenceException("Bookie operation timeout, op: Create entry")); + mockBucketSnapshotStorage.injectCreateException( + new BucketSnapshotPersistenceException("Bookie operation timeout, op: Create entry")); + mockBucketSnapshotStorage.injectCreateException( + new BucketSnapshotPersistenceException("Bookie operation timeout, op: Create entry")); + mockBucketSnapshotStorage.injectCreateException( + new BucketSnapshotPersistenceException("Bookie operation timeout, op: Create entry")); + + assertEquals(4, mockBucketSnapshotStorage.createExceptionQueue.size()); + + for (int i = 1; i <= 6; i++) { + tracker.addMessage(i, i, i * 10); + } + + Awaitility.await().untilAsserted(() -> assertEquals(0, tracker.getImmutableBuckets().asMapOfRanges().size())); + + clockTime.set(5 * 10); + + assertEquals(6, tracker.getNumberOfDelayedMessages()); + + NavigableSet scheduledMessages = tracker.getScheduledMessages(5); + for (int i = 1; i <= 5; i++) { + PositionImpl position = scheduledMessages.pollFirst(); + assertEquals(position, PositionImpl.get(i, i)); + } + } }