From 51beb74a4d3c794e2cf34a1d2c2897d34798e307 Mon Sep 17 00:00:00 2001 From: coderzc Date: Tue, 21 Feb 2023 15:19:54 +0800 Subject: [PATCH 01/14] Auto retry bucket snapshot create/load/delete --- .../apache/pulsar/broker/delayed/bucket/Bucket.java | 5 +++-- .../pulsar/broker/delayed/bucket/ImmutableBucket.java | 11 ++++++++--- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java index 5d2a556337a6e..f932a243607d9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java @@ -126,8 +126,9 @@ CompletableFuture asyncSaveBucketSnapshot( ImmutableBucket bucket, DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata, List bucketSnapshotSegments) { final String bucketKey = bucket.bucketKey(); - return bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments, bucketKey) - .thenCompose(newBucketId -> { + return executeWithRetry( + () -> bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments, bucketKey), + BucketSnapshotPersistenceException.class, MaxRetryTimes).thenCompose(newBucketId -> { bucket.setBucketId(newBucketId); return putBucketKeyId(bucketKey, newBucketId).exceptionally(ex -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java index 8348b4999ed80..15f726a0eb1d9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.delayed.bucket; +import static 
org.apache.bookkeeper.mledger.util.Futures.executeWithRetry; import static org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.AsyncOperationTimeoutSeconds; import com.google.protobuf.ByteString; import java.util.Collections; @@ -69,7 +70,8 @@ private CompletableFuture> asyncLoadNextBucketSnapshotEntry(b if (isRecover) { final long cutoffTime = cutoffTimeSupplier.get(); // Load Metadata of bucket snapshot - loadMetaDataFuture = bucketSnapshotStorage.getBucketSnapshotMetadata(bucketId) + loadMetaDataFuture = executeWithRetry(() -> bucketSnapshotStorage.getBucketSnapshotMetadata(bucketId), + BucketSnapshotPersistenceException.class, MaxRetryTimes) .thenApply(snapshotMetadata -> { List metadataList = snapshotMetadata.getMetadataListList(); @@ -95,7 +97,9 @@ private CompletableFuture> asyncLoadNextBucketSnapshotEntry(b return CompletableFuture.completedFuture(null); } - return bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, nextSegmentEntryId, nextSegmentEntryId) + return executeWithRetry( + () -> bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, nextSegmentEntryId, + nextSegmentEntryId), BucketSnapshotPersistenceException.class, MaxRetryTimes) .thenApply(bucketSnapshotSegments -> { if (CollectionUtils.isEmpty(bucketSnapshotSegments)) { return Collections.emptyList(); @@ -142,7 +146,8 @@ CompletableFuture asyncDeleteBucketSnapshot() { String bucketKey = bucketKey(); long bucketId = getAndUpdateBucketId(); return removeBucketCursorProperty(bucketKey).thenCompose(__ -> - bucketSnapshotStorage.deleteBucketSnapshot(bucketId)).whenComplete((__, ex) -> { + executeWithRetry(() -> bucketSnapshotStorage.deleteBucketSnapshot(bucketId), + BucketSnapshotPersistenceException.class, MaxRetryTimes)).whenComplete((__, ex) -> { if (ex != null) { log.warn("Failed to delete bucket snapshot, bucketId: {}, bucketKey: {}", bucketId, bucketKey, ex); From 3f749a0570562a9b642648e3f61286d886a0cf9b Mon Sep 17 00:00:00 2001 From: coderzc Date: Tue, 21 Feb 2023 
15:45:41 +0800 Subject: [PATCH 02/14] improve logs --- .../BookkeeperBucketSnapshotStorage.java | 12 ++-- .../pulsar/broker/delayed/bucket/Bucket.java | 28 ++++++---- .../bucket/BucketDelayedDeliveryTracker.java | 43 +++++++++++---- .../delayed/bucket/ImmutableBucket.java | 55 +++++++++++++------ .../broker/delayed/bucket/MutableBucket.java | 16 ++++-- 5 files changed, 104 insertions(+), 50 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java index 1cadc6d98e268..7dd6266e2115c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BookkeeperBucketSnapshotStorage.java @@ -154,7 +154,7 @@ private CompletableFuture createLedger(String bucketKey) { LedgerPassword, (rc, handle, ctx) -> { if (rc != BKException.Code.OK) { - future.completeExceptionally(bkException("Failed to create ledger", rc, -1)); + future.completeExceptionally(bkException("Create ledger", rc, -1)); } else { future.complete(handle); } @@ -170,7 +170,7 @@ private CompletableFuture openLedger(Long ledgerId) { LedgerPassword, (rc, handle, ctx) -> { if (rc != BKException.Code.OK) { - future.completeExceptionally(bkException("Failed to open ledger", rc, ledgerId)); + future.completeExceptionally(bkException("Open ledger", rc, ledgerId)); } else { future.complete(handle); } @@ -184,7 +184,7 @@ private CompletableFuture closeLedger(LedgerHandle ledgerHandle) { ledgerHandle.asyncClose((rc, handle, ctx) -> { if (rc != BKException.Code.OK) { log.warn("Failed to close a Ledger Handle: {}", ledgerHandle.getId()); - future.completeExceptionally(bkException("Failed to close ledger", rc, ledgerHandle.getId())); + future.completeExceptionally(bkException("Close ledger", rc, ledgerHandle.getId())); } 
else { future.complete(null); } @@ -197,7 +197,7 @@ private CompletableFuture addEntry(LedgerHandle ledgerHandle, byte[] data) ledgerHandle.asyncAddEntry(data, (rc, handle, entryId, ctx) -> { if (rc != BKException.Code.OK) { - future.completeExceptionally(bkException("Failed to add entry", rc, ledgerHandle.getId())); + future.completeExceptionally(bkException("Add entry", rc, ledgerHandle.getId())); } else { future.complete(null); } @@ -217,7 +217,7 @@ CompletableFuture> getLedgerEntryThenCloseLedger(Ledger ledger.asyncReadEntries(firstEntryId, lastEntryId, (rc, handle, entries, ctx) -> { if (rc != BKException.Code.OK) { - future.completeExceptionally(bkException("Failed to read entry", rc, ledger.getId())); + future.completeExceptionally(bkException("Read entry", rc, ledger.getId())); } else { future.complete(entries); } @@ -231,7 +231,7 @@ private CompletableFuture deleteLedger(long ledgerId) { CompletableFuture future = new CompletableFuture<>(); bookKeeper.asyncDeleteLedger(ledgerId, (int rc, Object cnx) -> { if (rc != BKException.Code.OK) { - future.completeExceptionally(bkException("Failed to delete ledger", rc, ledgerId)); + future.completeExceptionally(bkException("Delete ledger", rc, ledgerId)); } else { future.complete(null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java index f932a243607d9..d82dad076bf63 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java @@ -28,9 +28,9 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat; +import 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.roaringbitmap.RoaringBitmap; @Slf4j @@ -42,7 +42,7 @@ abstract class Bucket { static final String DELIMITER = "_"; static final int MaxRetryTimes = 3; - protected final ManagedCursor cursor; + protected final PersistentDispatcherMultipleConsumers dispatcher; protected final BucketSnapshotStorage bucketSnapshotStorage; long startLedgerId; @@ -63,8 +63,9 @@ abstract class Bucket { private volatile CompletableFuture snapshotCreateFuture; - Bucket(ManagedCursor cursor, BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) { - this(cursor, storage, startLedgerId, endLedgerId, new HashMap<>(), -1, -1, 0, 0, null, null); + Bucket(PersistentDispatcherMultipleConsumers dispatcher, + BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) { + this(dispatcher, storage, startLedgerId, endLedgerId, new HashMap<>(), -1, -1, 0, 0, null, null); } boolean containsMessage(long ledgerId, long entryId) { @@ -116,7 +117,7 @@ long getAndUpdateBucketId() { return bucketIdOptional.get(); } - String bucketIdStr = cursor.getCursorProperties().get(bucketKey()); + String bucketIdStr = dispatcher.getCursor().getCursorProperties().get(bucketKey()); long bucketId = Long.parseLong(bucketIdStr); setBucketId(bucketId); return bucketId; @@ -127,13 +128,18 @@ CompletableFuture asyncSaveBucketSnapshot( List bucketSnapshotSegments) { final String bucketKey = bucket.bucketKey(); return executeWithRetry( - () -> bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments, bucketKey), - BucketSnapshotPersistenceException.class, MaxRetryTimes).thenCompose(newBucketId -> { + () -> bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments, bucketKey) + .whenComplete((__, ex) -> { + if (ex != null) { + log.warn("[{}] Failed to create bucket snapshot. 
bucketKey: {}, bucketId: {}", + dispatcher.getName(), bucketKey, bucketId, ex); + } + }), BucketSnapshotPersistenceException.class, MaxRetryTimes).thenCompose(newBucketId -> { bucket.setBucketId(newBucketId); return putBucketKeyId(bucketKey, newBucketId).exceptionally(ex -> { - log.warn("Failed to record bucketId to cursor property, bucketKey: {}, bucketId: {}", - bucketKey, bucketId); + log.warn("[{}] Failed to record bucketId to cursor property, bucketKey: {}, bucketId: {}", + dispatcher.getName(), bucketKey, bucketId); return null; }).thenApply(__ -> newBucketId); }); @@ -141,12 +147,12 @@ CompletableFuture asyncSaveBucketSnapshot( private CompletableFuture putBucketKeyId(String bucketKey, Long bucketId) { Objects.requireNonNull(bucketId); - return executeWithRetry(() -> cursor.putCursorProperty(bucketKey, String.valueOf(bucketId)), + return executeWithRetry(() -> dispatcher.getCursor().putCursorProperty(bucketKey, String.valueOf(bucketId)), ManagedLedgerException.BadVersionException.class, MaxRetryTimes); } protected CompletableFuture removeBucketCursorProperty(String bucketKey) { - return executeWithRetry(() -> cursor.removeCursorProperty(bucketKey), + return executeWithRetry(() -> dispatcher.getCursor().removeCursorProperty(bucketKey), ManagedLedgerException.BadVersionException.class, MaxRetryTimes); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index 77c1dfb1eea14..f3817ff3b00d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELAYED_BUCKET_KEY_PREFIX; import static 
org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER; +import static org.apache.pulsar.broker.delayed.bucket.Bucket.MaxRetryTimes; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.HashBasedTable; import com.google.common.collect.Range; @@ -60,7 +61,7 @@ @ThreadSafe public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker { - static final int AsyncOperationTimeoutSeconds = 30; + static final int AsyncOperationTimeoutSeconds = 60; private final long minIndexCountPerBucket; @@ -103,20 +104,19 @@ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispat this.sharedBucketPriorityQueue = new TripleLongPriorityQueue(); this.immutableBuckets = TreeRangeMap.create(); this.snapshotSegmentLastIndexTable = HashBasedTable.create(); - ManagedCursor cursor = dispatcher.getCursor(); - this.lastMutableBucket = new MutableBucket(cursor, bucketSnapshotStorage); + this.lastMutableBucket = new MutableBucket(dispatcher, bucketSnapshotStorage); this.numberDelayedMessages = recoverBucketSnapshot(); } private synchronized long recoverBucketSnapshot() throws RuntimeException { - ManagedCursor cursor = this.lastMutableBucket.cursor; + ManagedCursor cursor = this.lastMutableBucket.dispatcher.getCursor(); Map, ImmutableBucket> toBeDeletedBucketMap = new HashMap<>(); cursor.getCursorProperties().keySet().forEach(key -> { if (key.startsWith(DELAYED_BUCKET_KEY_PREFIX)) { String[] keys = key.split(DELIMITER); checkArgument(keys.length == 3); ImmutableBucket immutableBucket = - new ImmutableBucket(cursor, this.lastMutableBucket.bucketSnapshotStorage, + new ImmutableBucket(dispatcher, this.lastMutableBucket.bucketSnapshotStorage, Long.parseLong(keys[1]), Long.parseLong(keys[2])); putAndCleanOverlapRange(Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId), immutableBucket, toBeDeletedBucketMap); @@ -125,6 +125,8 @@ private synchronized long recoverBucketSnapshot() throws RuntimeException { 
Map, ImmutableBucket> immutableBucketMap = immutableBuckets.asMapOfRanges(); if (immutableBucketMap.isEmpty()) { + log.info("[{}] Recover delayed message index bucket snapshot finish, don't find bucket snapshot", + dispatcher.getName()); return 0; } @@ -262,7 +264,7 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver if (immutableBuckets.asMapOfRanges().size() > maxNumBuckets) { try { - asyncMergeBucketSnapshot().get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS); + asyncMergeBucketSnapshot().get(2 * AsyncOperationTimeoutSeconds * MaxRetryTimes, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); @@ -306,7 +308,19 @@ private synchronized CompletableFuture asyncMergeBucketSnapshot() { minIndex = i; } } - return asyncMergeBucketSnapshot(values.get(minIndex), values.get(minIndex + 1)); + ImmutableBucket immutableBucketA = values.get(minIndex); + ImmutableBucket immutableBucketB = values.get(minIndex + 1); + log.info("[{}] Merging bucket snapshot, bucketAKey: {}, bucketBKey: {}", dispatcher.getName(), + immutableBucketA.bucketKey(), immutableBucketB.bucketKey()); + return asyncMergeBucketSnapshot(immutableBucketA, immutableBucketB).whenComplete((__, ex) -> { + if (ex != null) { + log.error("[{}] Failed to merge bucket snapshot, bucketAKey: {}, bucketBKey: {}", + dispatcher.getName(), immutableBucketA.bucketKey(), immutableBucketB.bucketKey(), ex); + } else { + log.error("[{}] Merge bucket snapshot finish, bucketAKey: {}, bucketBKey: {}", + dispatcher.getName(), immutableBucketA.bucketKey(), immutableBucketB.bucketKey()); + } + }); } private synchronized CompletableFuture asyncMergeBucketSnapshot(ImmutableBucket bucketA, @@ -399,9 +413,8 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa ImmutableBucket bucket = snapshotSegmentLastIndexTable.remove(ledgerId, entryId); if (bucket != null && 
immutableBuckets.asMapOfRanges().containsValue(bucket)) { - if (log.isDebugEnabled()) { - log.debug("[{}] Load next snapshot segment, bucket: {}", dispatcher.getName(), bucket); - } + log.info("[{}] Loading next bucket snapshot segment, bucketKey: {}, nextSegmentEntryId: {}", + dispatcher.getName(), bucket.bucketKey(), bucket.currentSegmentEntryId + 1); // All message of current snapshot segment are scheduled, load next snapshot segment // TODO make it asynchronous and not blocking this process try { @@ -419,7 +432,15 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(), index.getEntryId()); } - }).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS); + }).whenComplete((__, ex) -> { + if (ex != null) { + log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}", + dispatcher.getName(), bucket.bucketKey(), ex); + } else { + log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}", + dispatcher.getName(), bucket.bucketKey(), bucket.currentSegmentEntryId); + } + }).get(AsyncOperationTimeoutSeconds * MaxRetryTimes, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { // TODO make this segment load again throw new RuntimeException(e); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java index 15f726a0eb1d9..315531d6617fb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java @@ -30,19 +30,20 @@ import java.util.concurrent.TimeoutException; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.commons.collections4.CollectionUtils; 
import org.apache.commons.lang3.mutable.MutableLong; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata; +import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.roaringbitmap.RoaringBitmap; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; @Slf4j class ImmutableBucket extends Bucket { - ImmutableBucket(ManagedCursor cursor, BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) { - super(cursor, storage, startLedgerId, endLedgerId); + ImmutableBucket(PersistentDispatcherMultipleConsumers dispatcher, + BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) { + super(dispatcher, storage, startLedgerId, endLedgerId); } CompletableFuture> asyncLoadNextBucketSnapshotEntry() { @@ -55,10 +56,6 @@ CompletableFuture> asyncRecoverBucketSnapshotEntry(Supplier> asyncLoadNextBucketSnapshotEntry(boolean isRecover, Supplier cutoffTimeSupplier) { - if (log.isDebugEnabled()) { - log.debug("[{}] Load next bucket snapshot data, bucket: {}", cursor.getName(), this); - } - // Wait bucket snapshot create finish CompletableFuture snapshotCreateFuture = getSnapshotCreateFuture().orElseGet(() -> CompletableFuture.completedFuture(null)) @@ -70,8 +67,15 @@ private CompletableFuture> asyncLoadNextBucketSnapshotEntry(b if (isRecover) { final long cutoffTime = cutoffTimeSupplier.get(); // Load Metadata of bucket snapshot - loadMetaDataFuture = executeWithRetry(() -> bucketSnapshotStorage.getBucketSnapshotMetadata(bucketId), - BucketSnapshotPersistenceException.class, MaxRetryTimes) + final String bucketKey = bucketKey(); + loadMetaDataFuture = executeWithRetry(() -> bucketSnapshotStorage.getBucketSnapshotMetadata(bucketId) + .whenComplete((___, ex) -> { + if (ex != null) { + 
log.warn("[{}] Failed to get bucket snapshot metadata," + + " bucketKey: {}, bucketId: {}", + dispatcher.getName(), bucketKey, bucketId, ex); + } + }), BucketSnapshotPersistenceException.class, MaxRetryTimes) .thenApply(snapshotMetadata -> { List metadataList = snapshotMetadata.getMetadataListList(); @@ -99,7 +103,12 @@ private CompletableFuture> asyncLoadNextBucketSnapshotEntry(b return executeWithRetry( () -> bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, nextSegmentEntryId, - nextSegmentEntryId), BucketSnapshotPersistenceException.class, MaxRetryTimes) + nextSegmentEntryId).whenComplete((___, ex) -> { + if (ex != null) { + log.warn("[{}] Failed to get bucket snapshot segment. bucketKey: {}, bucketId: {}", + dispatcher.getName(), bucketKey(), bucketId, ex); + } + }), BucketSnapshotPersistenceException.class, MaxRetryTimes) .thenApply(bucketSnapshotSegments -> { if (CollectionUtils.isEmpty(bucketSnapshotSegments)) { return Collections.emptyList(); @@ -138,19 +147,31 @@ private void recoverDelayedIndexBitMapAndNumber(int startSnapshotIndex, } CompletableFuture> getRemainSnapshotSegment() { - return bucketSnapshotStorage.getBucketSnapshotSegment(getAndUpdateBucketId(), currentSegmentEntryId, - lastSegmentEntryId); + return executeWithRetry(() -> { + return bucketSnapshotStorage.getBucketSnapshotSegment(getAndUpdateBucketId(), currentSegmentEntryId, + lastSegmentEntryId).whenComplete((__, ex) -> { + if (ex != null) { + log.warn("[{}] Failed to get remain bucket snapshot segment, bucketKey: {}.", + dispatcher.getName(), bucketKey(), ex); + } + }); + }, BucketSnapshotPersistenceException.class, MaxRetryTimes); } CompletableFuture asyncDeleteBucketSnapshot() { String bucketKey = bucketKey(); long bucketId = getAndUpdateBucketId(); return removeBucketCursorProperty(bucketKey).thenCompose(__ -> - executeWithRetry(() -> bucketSnapshotStorage.deleteBucketSnapshot(bucketId), + executeWithRetry(() -> 
bucketSnapshotStorage.deleteBucketSnapshot(bucketId).whenComplete((___, ex) -> { + if (ex != null) { + log.warn("[{}] Failed to delete bucket snapshot. bucketKey: {}, bucketId: {}", + dispatcher.getName(), bucketKey, bucketId, ex); + } + }), BucketSnapshotPersistenceException.class, MaxRetryTimes)).whenComplete((__, ex) -> { if (ex != null) { - log.warn("Failed to delete bucket snapshot, bucketId: {}, bucketKey: {}", - bucketId, bucketKey, ex); + log.warn("[{}] Failed to delete bucket snapshot, bucketId: {}, bucketKey: {}", + dispatcher.getName(), bucketId, bucketKey, ex); } }); } @@ -172,8 +193,8 @@ void clear(boolean delete) { try { snapshotGenerateFuture.get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS); } catch (Exception e) { - log.warn("Failed wait to snapshot generate, bucketId: {}, bucketKey: {}", getBucketId(), - bucketKey()); + log.warn("[{}] Failed wait to snapshot generate, bucketId: {}, bucketKey: {}", dispatcher.getName(), + getBucketId(), bucketKey()); } } }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java index ad457329c427f..5d80bab2a3812 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java @@ -28,12 +28,12 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment; import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata; +import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue; import org.roaringbitmap.RoaringBitmap; @@ -42,9 +42,9 @@ class MutableBucket extends Bucket implements AutoCloseable { private final TripleLongPriorityQueue priorityQueue; - MutableBucket(ManagedCursor cursor, + MutableBucket(PersistentDispatcherMultipleConsumers dispatcher, BucketSnapshotStorage bucketSnapshotStorage) { - super(cursor, bucketSnapshotStorage, -1L, -1L); + super(dispatcher, bucketSnapshotStorage, -1L, -1L); this.priorityQueue = new TripleLongPriorityQueue(); } @@ -59,6 +59,9 @@ Pair createImmutableBucketAndAsyncPersistent( final long timeStepPerBucketSnapshotSegment, TripleLongPriorityQueue sharedQueue, DelayedIndexQueue delayedIndexQueue, final long startLedgerId, final long endLedgerId) { + log.info("[{}] Creating bucket snapshot, startLedgerId: {}, endLedgerId: {}", dispatcher.getName(), + startLedgerId, endLedgerId); + if (delayedIndexQueue.isEmpty()) { return null; } @@ -122,7 +125,7 @@ Pair createImmutableBucketAndAsyncPersistent( final int lastSegmentEntryId = segmentMetadataList.size(); - ImmutableBucket bucket = new ImmutableBucket(cursor, bucketSnapshotStorage, startLedgerId, endLedgerId); + ImmutableBucket bucket = new ImmutableBucket(dispatcher, bucketSnapshotStorage, startLedgerId, endLedgerId); bucket.setCurrentSegmentEntryId(1); bucket.setNumberBucketDelayedMessages(numMessages); bucket.setLastSegmentEntryId(lastSegmentEntryId); @@ -139,7 +142,10 @@ Pair createImmutableBucketAndAsyncPersistent( future.whenComplete((__, ex) -> { if (ex != null) { //TODO Record create snapshot failed - log.error("Failed to create snapshot: ", ex); + log.error("[{}] Failed to create bucket snapshot, bucketKey: {}, ex: ", + dispatcher.getName(), bucketKey(), ex); + } else { + log.info("[{}] 
Creat bucket snapshot finish, bucketKey: {}", dispatcher.getName(), bucket.bucketKey()); } }); From 43c165fb56db6a417a5c08a933ae0f5061e7e4bf Mon Sep 17 00:00:00 2001 From: coderzc Date: Thu, 23 Feb 2023 13:44:59 +0800 Subject: [PATCH 03/14] fix executeWithRetry --- .../main/java/org/apache/bookkeeper/mledger/util/Futures.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java index dc1d1eb6c9ac5..f5ad77a71d8c4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java @@ -24,6 +24,7 @@ import java.util.function.Supplier; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.pulsar.common.util.FutureUtil; /** * Conveniences to use with {@link CompletableFuture}. 
@@ -78,7 +79,8 @@ public static CompletableFuture executeWithRetry(Supplier 0) { + Throwable throwable = FutureUtil.unwrapCompletionException(ex); + if (needRetryExceptionClass.isAssignableFrom(throwable.getClass()) && maxRetryTimes > 0) { executeWithRetry(op, needRetryExceptionClass, maxRetryTimes - 1).whenComplete((res2, ex2) -> { if (ex2 == null) { resultFuture.complete(res2); From d5cf128e8c64a5d8119308e0e453a8f391652efe Mon Sep 17 00:00:00 2001 From: coderzc Date: Thu, 23 Feb 2023 16:07:59 +0800 Subject: [PATCH 04/14] add test --- .../delayed/MockBucketSnapshotStorage.java | 41 ++++++++++++++ .../broker/delayed/MockManagedCursor.java | 2 +- .../BucketDelayedDeliveryTrackerTest.java | 55 ++++++++++++++++++- 3 files changed, 96 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java index 9b2fbda4195da..d49c9280d8629 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java @@ -23,8 +23,10 @@ import io.netty.buffer.PooledByteBufAllocator; import io.netty.util.concurrent.DefaultThreadFactory; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -36,6 +38,7 @@ import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment; +import org.apache.pulsar.common.util.FutureUtil; @Slf4j public class MockBucketSnapshotStorage implements 
BucketSnapshotStorage { @@ -53,9 +56,35 @@ public MockBucketSnapshotStorage() { this.maxBucketId = new AtomicLong(); } + public Queue createExceptionQueue = new LinkedList<>(); + public Queue getMetaDataExceptionQueue = new LinkedList<>(); + public Queue getSegmentExceptionQueue = new LinkedList<>(); + public Queue deleteExceptionQueue = new LinkedList<>(); + + + public void injectCreateException(Throwable throwable) { + createExceptionQueue.add(throwable); + } + + public void injectGetMetaDataExceptionQueue(Throwable throwable) { + getMetaDataExceptionQueue.add(throwable); + } + + public void injectGetSegmentExceptionQueue(Throwable throwable) { + getSegmentExceptionQueue.add(throwable); + } + + public void injectDeleteExceptionQueue(Throwable throwable) { + deleteExceptionQueue.add(throwable); + } + @Override public CompletableFuture createBucketSnapshot( SnapshotMetadata snapshotMetadata, List bucketSnapshotSegments, String bucketKey) { + Throwable throwable = createExceptionQueue.poll(); + if (throwable != null) { + return FutureUtil.failedFuture(throwable); + } return CompletableFuture.supplyAsync(() -> { long bucketId = maxBucketId.getAndIncrement(); List entries = new ArrayList<>(); @@ -81,6 +110,10 @@ public CompletableFuture createBucketSnapshot( @Override public CompletableFuture getBucketSnapshotMetadata(long bucketId) { + Throwable throwable = getMetaDataExceptionQueue.poll(); + if (throwable != null) { + return FutureUtil.failedFuture(throwable); + } return CompletableFuture.supplyAsync(() -> { ByteBuf byteBuf = this.bucketSnapshots.get(bucketId).get(0); SnapshotMetadata snapshotMetadata; @@ -96,6 +129,10 @@ public CompletableFuture getBucketSnapshotMetadata(long bucket @Override public CompletableFuture> getBucketSnapshotSegment(long bucketId, long firstSegmentEntryId, long lastSegmentEntryId) { + Throwable throwable = getSegmentExceptionQueue.poll(); + if (throwable != null) { + return FutureUtil.failedFuture(throwable); + } return 
CompletableFuture.supplyAsync(() -> { List snapshotSegments = new ArrayList<>(); long lastEntryId = Math.min(lastSegmentEntryId, this.bucketSnapshots.get(bucketId).size()); @@ -115,6 +152,10 @@ public CompletableFuture> getBucketSnapshotSegment(long bu @Override public CompletableFuture deleteBucketSnapshot(long bucketId) { + Throwable throwable = deleteExceptionQueue.poll(); + if (throwable != null) { + return FutureUtil.failedFuture(throwable); + } return CompletableFuture.supplyAsync(() -> { List remove = this.bucketSnapshots.remove(bucketId); if (remove != null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java index efb0fa7ab7ba2..499262c1e60b9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java @@ -46,7 +46,7 @@ public MockManagedCursor(String name) { @Override public String getName() { - return null; + return this.name; } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java index 0a2a76ec339eb..f7c983defa541 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java @@ -76,6 +76,7 @@ public Object[][] provider(Method method) throws Exception { bucketSnapshotStorage.start(); ManagedCursor cursor = new MockManagedCursor("my_test_cursor"); doReturn(cursor).when(dispatcher).getCursor(); + doReturn(cursor.getName()).when(dispatcher).getName(); final String methodName = method.getName(); return switch (methodName) { @@ -135,7 +136,7 @@ public 
Object[][] provider(Method method) throws Exception { new BucketDelayedDeliveryTracker(dispatcher, timer, 500, clock, true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50) }}; - case "testMergeSnapshot" -> new Object[][]{{ + case "testMergeSnapshot", "testWithBkException" -> new Object[][]{{ new BucketDelayedDeliveryTracker(dispatcher, timer, 100000, clock, true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 10) }}; @@ -253,5 +254,57 @@ public void testMergeSnapshot(BucketDelayedDeliveryTracker tracker) { int size = tracker.getImmutableBuckets().asMapOfRanges().size(); assertEquals(10, size); + + tracker.addMessage(111, 1011, 111 * 10); + + tracker.close(); + + tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock, + true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 10); + + assertEquals(110, tracker.getNumberOfDelayedMessages()); + } + + @Test(dataProvider = "delayedTracker") + public void testWithBkException(BucketDelayedDeliveryTracker tracker) { + MockBucketSnapshotStorage mockBucketSnapshotStorage = (MockBucketSnapshotStorage) bucketSnapshotStorage; + mockBucketSnapshotStorage.injectCreateException( + new BucketSnapshotPersistenceException("Bookie operation timeout, op: Create entry")); + mockBucketSnapshotStorage.injectGetMetaDataExceptionQueue( + new BucketSnapshotPersistenceException("Bookie operation timeout, op: Get entry")); + mockBucketSnapshotStorage.injectGetSegmentExceptionQueue( + new BucketSnapshotPersistenceException("Bookie operation timeout, op: Get entry")); + mockBucketSnapshotStorage.injectDeleteExceptionQueue( + new BucketSnapshotPersistenceException("Bookie operation timeout, op: Delete entry")); + + for (int i = 1; i <= 110; i++) { + tracker.addMessage(i, i, i * 10); + } + + assertEquals(110, tracker.getNumberOfDelayedMessages()); + + int size = tracker.getImmutableBuckets().asMapOfRanges().size(); + + assertEquals(10, size); + + tracker.addMessage(111, 1011, 111 * 
10); + + tracker.close(); + + clockTime.set(30 * 10); + + tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock, + true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 10); + + assertEquals(80, tracker.getNumberOfDelayedMessages()); + + clockTime.set(110 * 10); + + assertEquals(tracker.getScheduledMessages(100).size(), 80); + + assertTrue(mockBucketSnapshotStorage.createExceptionQueue.isEmpty()); + assertTrue(mockBucketSnapshotStorage.getMetaDataExceptionQueue.isEmpty()); + assertTrue(mockBucketSnapshotStorage.getSegmentExceptionQueue.isEmpty()); + assertTrue(mockBucketSnapshotStorage.deleteExceptionQueue.isEmpty()); } } From 82178f00c4b98006f087fc92837bb3ef83cc17cc Mon Sep 17 00:00:00 2001 From: coderzc Date: Thu, 23 Feb 2023 16:39:04 +0800 Subject: [PATCH 05/14] improve test --- .../pulsar/broker/delayed/bucket/ImmutableBucket.java | 3 +++ .../broker/delayed/MockBucketSnapshotStorage.java | 6 +++--- .../bucket/BucketDelayedDeliveryTrackerTest.java | 11 ++++++++--- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java index 315531d6617fb..cdddbd953a261 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java @@ -172,6 +172,9 @@ CompletableFuture asyncDeleteBucketSnapshot() { if (ex != null) { log.warn("[{}] Failed to delete bucket snapshot, bucketId: {}, bucketKey: {}", dispatcher.getName(), bucketId, bucketKey, ex); + } else { + log.info("[{}] Delete bucket snapshot finish, bucketId: {}, bucketKey: {}", + dispatcher.getName(), bucketId, bucketKey); } }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java index d49c9280d8629..b106642915587 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockBucketSnapshotStorage.java @@ -66,15 +66,15 @@ public void injectCreateException(Throwable throwable) { createExceptionQueue.add(throwable); } - public void injectGetMetaDataExceptionQueue(Throwable throwable) { + public void injectGetMetaDataException(Throwable throwable) { getMetaDataExceptionQueue.add(throwable); } - public void injectGetSegmentExceptionQueue(Throwable throwable) { + public void injectGetSegmentException(Throwable throwable) { getSegmentExceptionQueue.add(throwable); } - public void injectDeleteExceptionQueue(Throwable throwable) { + public void injectDeleteException(Throwable throwable) { deleteExceptionQueue.add(throwable); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java index f7c983defa541..2ef7dff90a243 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java @@ -270,13 +270,18 @@ public void testWithBkException(BucketDelayedDeliveryTracker tracker) { MockBucketSnapshotStorage mockBucketSnapshotStorage = (MockBucketSnapshotStorage) bucketSnapshotStorage; mockBucketSnapshotStorage.injectCreateException( new BucketSnapshotPersistenceException("Bookie operation timeout, op: Create entry")); - mockBucketSnapshotStorage.injectGetMetaDataExceptionQueue( + mockBucketSnapshotStorage.injectGetMetaDataException( new BucketSnapshotPersistenceException("Bookie operation timeout, op: Get entry")); - 
mockBucketSnapshotStorage.injectGetSegmentExceptionQueue( + mockBucketSnapshotStorage.injectGetSegmentException( new BucketSnapshotPersistenceException("Bookie operation timeout, op: Get entry")); - mockBucketSnapshotStorage.injectDeleteExceptionQueue( + mockBucketSnapshotStorage.injectDeleteException( new BucketSnapshotPersistenceException("Bookie operation timeout, op: Delete entry")); + assertEquals(1, mockBucketSnapshotStorage.createExceptionQueue.size()); + assertEquals(1, mockBucketSnapshotStorage.getMetaDataExceptionQueue.size()); + assertEquals(1, mockBucketSnapshotStorage.getSegmentExceptionQueue.size()); + assertEquals(1, mockBucketSnapshotStorage.deleteExceptionQueue.size()); + for (int i = 1; i <= 110; i++) { tracker.addMessage(i, i, i * 10); } From 6c43792023bf3ebde632538dc4a42ec2ab15aae0 Mon Sep 17 00:00:00 2001 From: coderzc Date: Thu, 23 Feb 2023 18:54:18 +0800 Subject: [PATCH 06/14] Ignore exception to reload this segment on the next schedule. --- .../pulsar/broker/delayed/bucket/Bucket.java | 4 ++-- .../bucket/BucketDelayedDeliveryTracker.java | 22 ++++++++++++------- .../BucketDelayedDeliveryTrackerTest.java | 11 ++++++++++ 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java index d82dad076bf63..f3dbbb7b73937 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java @@ -54,9 +54,9 @@ abstract class Bucket { int lastSegmentEntryId; - int currentSegmentEntryId; + volatile int currentSegmentEntryId; - long snapshotLength; + volatile long snapshotLength; private volatile Long bucketId; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index f3817ff3b00d7..f24203a954a86 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -406,15 +406,12 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa long ledgerId = sharedBucketPriorityQueue.peekN2(); long entryId = sharedBucketPriorityQueue.peekN3(); - positions.add(new PositionImpl(ledgerId, entryId)); - - sharedBucketPriorityQueue.pop(); - removeIndexBit(ledgerId, entryId); ImmutableBucket bucket = snapshotSegmentLastIndexTable.remove(ledgerId, entryId); if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) { + final int lastSegmentEntryId = bucket.currentSegmentEntryId; log.info("[{}] Loading next bucket snapshot segment, bucketKey: {}, nextSegmentEntryId: {}", - dispatcher.getName(), bucket.bucketKey(), bucket.currentSegmentEntryId + 1); + dispatcher.getName(), bucket.bucketKey(), lastSegmentEntryId + 1); // All message of current snapshot segment are scheduled, load next snapshot segment // TODO make it asynchronous and not blocking this process try { @@ -434,6 +431,10 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa } }).whenComplete((__, ex) -> { if (ex != null) { + // Back bucket state + snapshotSegmentLastIndexTable.put(ledgerId, entryId, bucket); + bucket.setCurrentSegmentEntryId(lastSegmentEntryId); + log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}", dispatcher.getName(), bucket.bucketKey(), ex); } else { @@ -441,12 +442,17 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa dispatcher.getName(), bucket.bucketKey(), bucket.currentSegmentEntryId); } }).get(AsyncOperationTimeoutSeconds * MaxRetryTimes, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException | 
TimeoutException e) { - // TODO make this segment load again - throw new RuntimeException(e); + } catch (Exception e) { + // Ignore exception to reload this segment on the next schedule. + break; } } + positions.add(new PositionImpl(ledgerId, entryId)); + + sharedBucketPriorityQueue.pop(); + removeIndexBit(ledgerId, entryId); + --n; --numberDelayedMessages; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java index 2ef7dff90a243..3dd7a5f574705 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java @@ -305,6 +305,17 @@ public void testWithBkException(BucketDelayedDeliveryTracker tracker) { clockTime.set(110 * 10); + mockBucketSnapshotStorage.injectGetSegmentException( + new BucketSnapshotPersistenceException("Bookie operation timeout1, op: Get entry")); + mockBucketSnapshotStorage.injectGetSegmentException( + new BucketSnapshotPersistenceException("Bookie operation timeout2, op: Get entry")); + mockBucketSnapshotStorage.injectGetSegmentException( + new BucketSnapshotPersistenceException("Bookie operation timeout3, op: Get entry")); + mockBucketSnapshotStorage.injectGetSegmentException( + new BucketSnapshotPersistenceException("Bookie operation timeout4, op: Get entry")); + + assertEquals(tracker.getScheduledMessages(100).size(), 0); + assertEquals(tracker.getScheduledMessages(100).size(), 80); assertTrue(mockBucketSnapshotStorage.createExceptionQueue.isEmpty()); From 2316e13142d803b42ac4889a68a0f38ab5858aca Mon Sep 17 00:00:00 2001 From: coderzc Date: Mon, 27 Feb 2023 18:32:30 +0800 Subject: [PATCH 07/14] improve logs --- .../bucket/BucketDelayedDeliveryTracker.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 
8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index f24203a954a86..0702adb6f32fc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -233,10 +233,6 @@ private void afterCreateImmutableBucket(Pair immu DelayedIndex lastDelayedIndex = immutableBucketDelayedIndexPair.getRight(); snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), lastDelayedIndex.getEntryId(), immutableBucket); - if (log.isDebugEnabled()) { - log.debug("[{}] Create bucket snapshot, bucket: {}", dispatcher.getName(), - lastMutableBucket); - } } } @@ -310,8 +306,10 @@ private synchronized CompletableFuture asyncMergeBucketSnapshot() { } ImmutableBucket immutableBucketA = values.get(minIndex); ImmutableBucket immutableBucketB = values.get(minIndex + 1); - log.info("[{}] Merging bucket snapshot, bucketAKey: {}, bucketBKey: {}", dispatcher.getName(), - immutableBucketA.bucketKey(), immutableBucketB.bucketKey()); + if (log.isDebugEnabled()) { + log.info("[{}] Merging bucket snapshot, bucketAKey: {}, bucketBKey: {}", dispatcher.getName(), + immutableBucketA.bucketKey(), immutableBucketB.bucketKey()); + } return asyncMergeBucketSnapshot(immutableBucketA, immutableBucketB).whenComplete((__, ex) -> { if (ex != null) { log.error("[{}] Failed to merge bucket snapshot, bucketAKey: {}, bucketBKey: {}", @@ -410,8 +408,10 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa ImmutableBucket bucket = snapshotSegmentLastIndexTable.remove(ledgerId, entryId); if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) { final int lastSegmentEntryId = bucket.currentSegmentEntryId; - log.info("[{}] Loading next bucket 
snapshot segment, bucketKey: {}, nextSegmentEntryId: {}", - dispatcher.getName(), bucket.bucketKey(), lastSegmentEntryId + 1); + if (log.isDebugEnabled()) { + log.debug("[{}] Loading next bucket snapshot segment, bucketKey: {}, nextSegmentEntryId: {}", + dispatcher.getName(), bucket.bucketKey(), lastSegmentEntryId + 1); + } // All message of current snapshot segment are scheduled, load next snapshot segment // TODO make it asynchronous and not blocking this process try { From beb73a16ab1feb7b1ab4931a40c391dc26e0dd3c Mon Sep 17 00:00:00 2001 From: coderzc Date: Tue, 28 Feb 2023 11:01:45 +0800 Subject: [PATCH 08/14] only pass the dispatcherName to the Bucket. --- .../pulsar/broker/delayed/bucket/Bucket.java | 22 ++++++++++--------- .../bucket/BucketDelayedDeliveryTracker.java | 6 ++--- .../delayed/bucket/ImmutableBucket.java | 20 ++++++++--------- .../broker/delayed/bucket/MutableBucket.java | 15 +++++++------ 4 files changed, 33 insertions(+), 30 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java index f3dbbb7b73937..50b5cd12ead07 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/Bucket.java @@ -28,9 +28,9 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat; -import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.roaringbitmap.RoaringBitmap; @Slf4j @@ -42,7 +42,9 @@ abstract class Bucket { static final String DELIMITER = "_"; static final int MaxRetryTimes = 3; - protected final PersistentDispatcherMultipleConsumers dispatcher; + 
protected final String dispatcherName; + + protected final ManagedCursor cursor; protected final BucketSnapshotStorage bucketSnapshotStorage; long startLedgerId; @@ -63,9 +65,9 @@ abstract class Bucket { private volatile CompletableFuture snapshotCreateFuture; - Bucket(PersistentDispatcherMultipleConsumers dispatcher, + Bucket(String dispatcherName, ManagedCursor cursor, BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) { - this(dispatcher, storage, startLedgerId, endLedgerId, new HashMap<>(), -1, -1, 0, 0, null, null); + this(dispatcherName, cursor, storage, startLedgerId, endLedgerId, new HashMap<>(), -1, -1, 0, 0, null, null); } boolean containsMessage(long ledgerId, long entryId) { @@ -117,7 +119,7 @@ long getAndUpdateBucketId() { return bucketIdOptional.get(); } - String bucketIdStr = dispatcher.getCursor().getCursorProperties().get(bucketKey()); + String bucketIdStr = cursor.getCursorProperties().get(bucketKey()); long bucketId = Long.parseLong(bucketIdStr); setBucketId(bucketId); return bucketId; @@ -131,15 +133,15 @@ CompletableFuture asyncSaveBucketSnapshot( () -> bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments, bucketKey) .whenComplete((__, ex) -> { if (ex != null) { - log.warn("[{}] Failed to create bucket snapshot. 
bucketKey: {}, bucketId: {}", - dispatcher.getName(), bucketKey, bucketId, ex); + log.warn("[{}] Failed to create bucket snapshot, bucketKey: {}", + dispatcherName, bucketKey, ex); } }), BucketSnapshotPersistenceException.class, MaxRetryTimes).thenCompose(newBucketId -> { bucket.setBucketId(newBucketId); return putBucketKeyId(bucketKey, newBucketId).exceptionally(ex -> { log.warn("[{}] Failed to record bucketId to cursor property, bucketKey: {}, bucketId: {}", - dispatcher.getName(), bucketKey, bucketId); + dispatcherName, bucketKey, newBucketId, ex); return null; }).thenApply(__ -> newBucketId); }); @@ -147,12 +149,12 @@ CompletableFuture asyncSaveBucketSnapshot( private CompletableFuture putBucketKeyId(String bucketKey, Long bucketId) { Objects.requireNonNull(bucketId); - return executeWithRetry(() -> dispatcher.getCursor().putCursorProperty(bucketKey, String.valueOf(bucketId)), + return executeWithRetry(() -> cursor.putCursorProperty(bucketKey, String.valueOf(bucketId)), ManagedLedgerException.BadVersionException.class, MaxRetryTimes); } protected CompletableFuture removeBucketCursorProperty(String bucketKey) { - return executeWithRetry(() -> dispatcher.getCursor().removeCursorProperty(bucketKey), + return executeWithRetry(() -> cursor.removeCursorProperty(bucketKey), ManagedLedgerException.BadVersionException.class, MaxRetryTimes); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index 0702adb6f32fc..2bbb3a439ac83 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -104,19 +104,19 @@ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispat this.sharedBucketPriorityQueue = new 
TripleLongPriorityQueue(); this.immutableBuckets = TreeRangeMap.create(); this.snapshotSegmentLastIndexTable = HashBasedTable.create(); - this.lastMutableBucket = new MutableBucket(dispatcher, bucketSnapshotStorage); + this.lastMutableBucket = new MutableBucket(dispatcher.getName(), dispatcher.getCursor(), bucketSnapshotStorage); this.numberDelayedMessages = recoverBucketSnapshot(); } private synchronized long recoverBucketSnapshot() throws RuntimeException { - ManagedCursor cursor = this.lastMutableBucket.dispatcher.getCursor(); + ManagedCursor cursor = this.lastMutableBucket.getCursor(); Map, ImmutableBucket> toBeDeletedBucketMap = new HashMap<>(); cursor.getCursorProperties().keySet().forEach(key -> { if (key.startsWith(DELAYED_BUCKET_KEY_PREFIX)) { String[] keys = key.split(DELIMITER); checkArgument(keys.length == 3); ImmutableBucket immutableBucket = - new ImmutableBucket(dispatcher, this.lastMutableBucket.bucketSnapshotStorage, + new ImmutableBucket(dispatcher.getName(), cursor, this.lastMutableBucket.bucketSnapshotStorage, Long.parseLong(keys[1]), Long.parseLong(keys[2])); putAndCleanOverlapRange(Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId), immutableBucket, toBeDeletedBucketMap); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java index cdddbd953a261..65f65742bc40d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java @@ -30,20 +30,20 @@ import java.util.concurrent.TimeoutException; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.mutable.MutableLong; import 
org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata; -import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.roaringbitmap.RoaringBitmap; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; @Slf4j class ImmutableBucket extends Bucket { - ImmutableBucket(PersistentDispatcherMultipleConsumers dispatcher, + ImmutableBucket(String dispatcherName, ManagedCursor cursor, BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) { - super(dispatcher, storage, startLedgerId, endLedgerId); + super(dispatcherName, cursor, storage, startLedgerId, endLedgerId); } CompletableFuture> asyncLoadNextBucketSnapshotEntry() { @@ -73,7 +73,7 @@ private CompletableFuture> asyncLoadNextBucketSnapshotEntry(b if (ex != null) { log.warn("[{}] Failed to get bucket snapshot metadata," + " bucketKey: {}, bucketId: {}", - dispatcher.getName(), bucketKey, bucketId, ex); + dispatcherName, bucketKey, bucketId, ex); } }), BucketSnapshotPersistenceException.class, MaxRetryTimes) .thenApply(snapshotMetadata -> { @@ -106,7 +106,7 @@ private CompletableFuture> asyncLoadNextBucketSnapshotEntry(b nextSegmentEntryId).whenComplete((___, ex) -> { if (ex != null) { log.warn("[{}] Failed to get bucket snapshot segment. 
bucketKey: {}, bucketId: {}", - dispatcher.getName(), bucketKey(), bucketId, ex); + dispatcherName, bucketKey(), bucketId, ex); } }), BucketSnapshotPersistenceException.class, MaxRetryTimes) .thenApply(bucketSnapshotSegments -> { @@ -152,7 +152,7 @@ CompletableFuture> lastSegmentEntryId).whenComplete((__, ex) -> { if (ex != null) { log.warn("[{}] Failed to get remain bucket snapshot segment, bucketKey: {}.", - dispatcher.getName(), bucketKey(), ex); + dispatcherName, bucketKey(), ex); } }); }, BucketSnapshotPersistenceException.class, MaxRetryTimes); @@ -165,16 +165,16 @@ CompletableFuture asyncDeleteBucketSnapshot() { executeWithRetry(() -> bucketSnapshotStorage.deleteBucketSnapshot(bucketId).whenComplete((___, ex) -> { if (ex != null) { log.warn("[{}] Failed to delete bucket snapshot. bucketKey: {}, bucketId: {}", - dispatcher.getName(), bucketKey, bucketId, ex); + dispatcherName, bucketKey, bucketId, ex); } }), BucketSnapshotPersistenceException.class, MaxRetryTimes)).whenComplete((__, ex) -> { if (ex != null) { log.warn("[{}] Failed to delete bucket snapshot, bucketId: {}, bucketKey: {}", - dispatcher.getName(), bucketId, bucketKey, ex); + dispatcherName, bucketId, bucketKey, ex); } else { log.info("[{}] Delete bucket snapshot finish, bucketId: {}, bucketKey: {}", - dispatcher.getName(), bucketId, bucketKey); + dispatcherName, bucketId, bucketKey); } }); } @@ -196,7 +196,7 @@ void clear(boolean delete) { try { snapshotGenerateFuture.get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS); } catch (Exception e) { - log.warn("[{}] Failed wait to snapshot generate, bucketId: {}, bucketKey: {}", dispatcher.getName(), + log.warn("[{}] Failed wait to snapshot generate, bucketId: {}, bucketKey: {}", dispatcherName, getBucketId(), bucketKey()); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java index 5d80bab2a3812..9cba6dbb7ef77 
100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java @@ -28,12 +28,12 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata; -import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue; import org.roaringbitmap.RoaringBitmap; @@ -42,9 +42,9 @@ class MutableBucket extends Bucket implements AutoCloseable { private final TripleLongPriorityQueue priorityQueue; - MutableBucket(PersistentDispatcherMultipleConsumers dispatcher, + MutableBucket(String dispatcherName, ManagedCursor cursor, BucketSnapshotStorage bucketSnapshotStorage) { - super(dispatcher, bucketSnapshotStorage, -1L, -1L); + super(dispatcherName, cursor, bucketSnapshotStorage, -1L, -1L); this.priorityQueue = new TripleLongPriorityQueue(); } @@ -59,7 +59,7 @@ Pair createImmutableBucketAndAsyncPersistent( final long timeStepPerBucketSnapshotSegment, TripleLongPriorityQueue sharedQueue, DelayedIndexQueue delayedIndexQueue, final long startLedgerId, final long endLedgerId) { - log.info("[{}] Creating bucket snapshot, startLedgerId: {}, endLedgerId: {}", dispatcher.getName(), + log.info("[{}] Creating bucket snapshot, startLedgerId: {}, endLedgerId: {}", dispatcherName, startLedgerId, endLedgerId); if 
(delayedIndexQueue.isEmpty()) { @@ -125,7 +125,8 @@ Pair createImmutableBucketAndAsyncPersistent( final int lastSegmentEntryId = segmentMetadataList.size(); - ImmutableBucket bucket = new ImmutableBucket(dispatcher, bucketSnapshotStorage, startLedgerId, endLedgerId); + ImmutableBucket bucket = new ImmutableBucket(dispatcherName, cursor, bucketSnapshotStorage, + startLedgerId, endLedgerId); bucket.setCurrentSegmentEntryId(1); bucket.setNumberBucketDelayedMessages(numMessages); bucket.setLastSegmentEntryId(lastSegmentEntryId); @@ -143,9 +144,9 @@ Pair createImmutableBucketAndAsyncPersistent( if (ex != null) { //TODO Record create snapshot failed log.error("[{}] Failed to create bucket snapshot, bucketKey: {}, ex: ", - dispatcher.getName(), bucketKey(), ex); + dispatcherName, bucketKey(), ex); } else { - log.info("[{}] Creat bucket snapshot finish, bucketKey: {}", dispatcher.getName(), bucket.bucketKey()); + log.info("[{}] Creat bucket snapshot finish, bucketKey: {}", dispatcherName, bucket.bucketKey()); } }); From 99e6ee98225b2920be695d8682f04bdc85c438eb Mon Sep 17 00:00:00 2001 From: coderzc Date: Wed, 1 Mar 2023 10:33:54 +0800 Subject: [PATCH 09/14] improve code --- .../broker/delayed/bucket/BucketDelayedDeliveryTracker.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index 8561e3cc7a567..12f2a29a313d6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -438,10 +438,10 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa ImmutableBucket bucket = snapshotSegmentLastIndexTable.remove(ledgerId, entryId); if (bucket != null && 
immutableBuckets.asMapOfRanges().containsValue(bucket)) { - final int lastSegmentEntryId = bucket.currentSegmentEntryId; + final int preSegmentEntryId = bucket.currentSegmentEntryId; if (log.isDebugEnabled()) { log.debug("[{}] Loading next bucket snapshot segment, bucketKey: {}, nextSegmentEntryId: {}", - dispatcher.getName(), bucket.bucketKey(), lastSegmentEntryId + 1); + dispatcher.getName(), bucket.bucketKey(), preSegmentEntryId + 1); } // All message of current snapshot segment are scheduled, load next snapshot segment // TODO make it asynchronous and not blocking this process @@ -464,7 +464,7 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa if (ex != null) { // Back bucket state snapshotSegmentLastIndexTable.put(ledgerId, entryId, bucket); - bucket.setCurrentSegmentEntryId(lastSegmentEntryId); + bucket.setCurrentSegmentEntryId(preSegmentEntryId); log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}", dispatcher.getName(), bucket.bucketKey(), ex); From 4a22c2b031f8593569d027d128aa2e3fb18c33db Mon Sep 17 00:00:00 2001 From: coderzc Date: Wed, 1 Mar 2023 13:43:44 +0800 Subject: [PATCH 10/14] fix test --- .../BucketDelayedDeliveryTrackerTest.java | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java index 79d040868d92f..e0d62f682708d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java @@ -50,6 +50,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.roaringbitmap.RoaringBitmap; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import 
org.testcontainers.shaded.org.apache.commons.lang3.mutable.MutableLong; import org.testng.annotations.AfterMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -258,12 +259,21 @@ public void testMergeSnapshot(BucketDelayedDeliveryTracker tracker) { tracker.addMessage(111, 1011, 111 * 10); + MutableLong delayedMessagesInSnapshot = new MutableLong(); + tracker.getImmutableBuckets().asMapOfRanges().forEach((k, v) -> { + delayedMessagesInSnapshot.add(v.getNumberBucketDelayedMessages()); + }); + tracker.close(); tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock, true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 10); - assertEquals(110, tracker.getNumberOfDelayedMessages()); + assertEquals(tracker.getNumberOfDelayedMessages(), delayedMessagesInSnapshot.getValue()); + + for (int i = 1; i <= 110; i++) { + tracker.addMessage(i, i, i * 10); + } clockTime.set(110 * 10); @@ -303,14 +313,18 @@ public void testWithBkException(BucketDelayedDeliveryTracker tracker) { tracker.addMessage(111, 1011, 111 * 10); - tracker.close(); + MutableLong delayedMessagesInSnapshot = new MutableLong(); + tracker.getImmutableBuckets().asMapOfRanges().forEach((k, v) -> { + delayedMessagesInSnapshot.add(v.getNumberBucketDelayedMessages()); + }); - clockTime.set(30 * 10); + tracker.close(); tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock, true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 10); - assertEquals(80, tracker.getNumberOfDelayedMessages()); + Long delayedMessagesInSnapshotValue = delayedMessagesInSnapshot.getValue(); + assertEquals(tracker.getNumberOfDelayedMessages(), delayedMessagesInSnapshotValue); clockTime.set(110 * 10); @@ -325,7 +339,7 @@ public void testWithBkException(BucketDelayedDeliveryTracker tracker) { assertEquals(tracker.getScheduledMessages(100).size(), 0); - assertEquals(tracker.getScheduledMessages(100).size(), 80); + 
assertEquals(tracker.getScheduledMessages(100).size(), delayedMessagesInSnapshotValue); assertTrue(mockBucketSnapshotStorage.createExceptionQueue.isEmpty()); assertTrue(mockBucketSnapshotStorage.getMetaDataExceptionQueue.isEmpty()); From 0d40adc6ec4b805750aba8709c23e36953ac2ca6 Mon Sep 17 00:00:00 2001 From: coderzc Date: Wed, 1 Mar 2023 14:24:10 +0800 Subject: [PATCH 11/14] Put the index back into the shared queue and downgrade to memory mode --- .../bucket/BucketDelayedDeliveryTracker.java | 39 +++++++++++++++++++ .../delayed/bucket/ImmutableBucket.java | 10 +++++ .../broker/delayed/bucket/MutableBucket.java | 13 ++----- .../BucketDelayedDeliveryTrackerTest.java | 33 +++++++++++++++- 4 files changed, 85 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index 12f2a29a313d6..9916d296f2ceb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -234,6 +234,45 @@ private void afterCreateImmutableBucket(Pair immu DelayedIndex lastDelayedIndex = immutableBucketDelayedIndexPair.getRight(); snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), lastDelayedIndex.getEntryId(), immutableBucket); + + immutableBucket.getSnapshotCreateFuture().ifPresent(createFuture -> { + CompletableFuture future = createFuture.whenComplete((__, ex) -> { + if (ex == null) { + log.info("[{}] Creat bucket snapshot finish, bucketKey: {}", dispatcher.getName(), + immutableBucket.bucketKey()); + return; + } + + //TODO Record create snapshot failed + log.error("[{}] Failed to create bucket snapshot, bucketKey: {}", + dispatcher.getName(), immutableBucket.bucketKey(), ex); + + // Put the index back into the shared queue and 
downgrade to memory mode + synchronized (BucketDelayedDeliveryTracker.this) { + immutableBucket.getSnapshotSegments().ifPresent(snapshotSegments -> { + for (DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment : + snapshotSegments) { + for (DelayedIndex delayedIndex : snapshotSegment.getIndexesList()) { + sharedBucketPriorityQueue.add(delayedIndex.getTimestamp(), + delayedIndex.getLedgerId(), delayedIndex.getEntryId()); + } + } + immutableBucket.setSnapshotSegments(null); + }); + + immutableBucket.setCurrentSegmentEntryId(immutableBucket.lastSegmentEntryId); + immutableBuckets.remove( + Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId)); + + immutableBucket.setSnapshotCreateFuture(null); + } + }); + synchronized (BucketDelayedDeliveryTracker.this) { + if (!future.isDone()) { + immutableBucket.setSnapshotCreateFuture(future); + } + } + }); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java index dedd3bdf7c0d8..50e838b3b9dfd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java @@ -24,9 +24,11 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.commons.collections4.CollectionUtils; @@ -39,11 +41,19 @@ @Slf4j class ImmutableBucket extends Bucket { + + @Setter + private volatile List snapshotSegments; + ImmutableBucket(String dispatcherName, ManagedCursor cursor, BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) { 
super(dispatcherName, cursor, storage, startLedgerId, endLedgerId); } + public Optional> getSnapshotSegments() { + return Optional.ofNullable(snapshotSegments); + } + CompletableFuture> asyncLoadNextBucketSnapshotEntry() { return asyncLoadNextBucketSnapshotEntry(false, null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java index 9cba6dbb7ef77..40ba8f4c4b593 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java @@ -131,6 +131,10 @@ Pair createImmutableBucketAndAsyncPersistent( bucket.setNumberBucketDelayedMessages(numMessages); bucket.setLastSegmentEntryId(lastSegmentEntryId); + // Skip first segment, because it has already been loaded + List snapshotSegments = bucketSnapshotSegments.subList(1, bucketSnapshotSegments.size()); + bucket.setSnapshotSegments(snapshotSegments); + // Add the first snapshot segment last message to snapshotSegmentLastMessageTable checkArgument(!bucketSnapshotSegments.isEmpty()); SnapshotSegment snapshotSegment = bucketSnapshotSegments.get(0); @@ -140,15 +144,6 @@ Pair createImmutableBucketAndAsyncPersistent( CompletableFuture future = asyncSaveBucketSnapshot(bucket, bucketSnapshotMetadata, bucketSnapshotSegments); bucket.setSnapshotCreateFuture(future); - future.whenComplete((__, ex) -> { - if (ex != null) { - //TODO Record create snapshot failed - log.error("[{}] Failed to create bucket snapshot, bucketKey: {}, ex: ", - dispatcherName, bucketKey(), ex); - } else { - log.info("[{}] Creat bucket snapshot finish, bucketKey: {}", dispatcherName, bucket.bucketKey()); - } - }); return result; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java index e0d62f682708d..7faee82cd75e1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java @@ -138,7 +138,7 @@ public Object[][] provider(Method method) throws Exception { new BucketDelayedDeliveryTracker(dispatcher, timer, 500, clock, true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50) }}; - case "testMergeSnapshot", "testWithBkException" -> new Object[][]{{ + case "testMergeSnapshot", "testWithBkException", "testWithCreateEx" -> new Object[][]{{ new BucketDelayedDeliveryTracker(dispatcher, timer, 100000, clock, true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 10) }}; @@ -346,4 +346,35 @@ public void testWithBkException(BucketDelayedDeliveryTracker tracker) { assertTrue(mockBucketSnapshotStorage.getSegmentExceptionQueue.isEmpty()); assertTrue(mockBucketSnapshotStorage.deleteExceptionQueue.isEmpty()); } + + @Test(dataProvider = "delayedTracker") + public void testWithCreateFailDowngrade(BucketDelayedDeliveryTracker tracker) { + MockBucketSnapshotStorage mockBucketSnapshotStorage = (MockBucketSnapshotStorage) bucketSnapshotStorage; + mockBucketSnapshotStorage.injectCreateException( + new BucketSnapshotPersistenceException("Bookie operation timeout, op: Create entry")); + mockBucketSnapshotStorage.injectCreateException( + new BucketSnapshotPersistenceException("Bookie operation timeout, op: Create entry")); + mockBucketSnapshotStorage.injectCreateException( + new BucketSnapshotPersistenceException("Bookie operation timeout, op: Create entry")); + mockBucketSnapshotStorage.injectCreateException( + new BucketSnapshotPersistenceException("Bookie operation timeout, op: Create entry")); + + assertEquals(4, 
mockBucketSnapshotStorage.createExceptionQueue.size()); + + for (int i = 1; i <= 6; i++) { + tracker.addMessage(i, i, i * 10); + } + + assertEquals(0, tracker.getImmutableBuckets().asMapOfRanges().size()); + + clockTime.set(5 * 10); + + assertEquals(6, tracker.getNumberOfDelayedMessages()); + + NavigableSet scheduledMessages = tracker.getScheduledMessages(5); + for (int i = 1; i <= 5; i++) { + PositionImpl position = scheduledMessages.pollFirst(); + assertEquals(position, PositionImpl.get(i, i)); + } + } } From fbd4bf35fd24801ba303373e7572d1c660200594 Mon Sep 17 00:00:00 2001 From: coderzc Date: Wed, 1 Mar 2023 14:51:54 +0800 Subject: [PATCH 12/14] improve code --- .../bucket/BucketDelayedDeliveryTracker.java | 120 +++++++++++------- .../delayed/bucket/ImmutableBucket.java | 119 ++++++++--------- 2 files changed, 128 insertions(+), 111 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index 9916d296f2ceb..a1d5de4550261 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -354,6 +354,22 @@ private synchronized CompletableFuture asyncMergeBucketSnapshot() { ImmutableBucket immutableBucketA = values.get(minIndex); ImmutableBucket immutableBucketB = values.get(minIndex + 1); + + CompletableFuture snapshotCreateFutureA = + immutableBucketA.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null)); + if (!snapshotCreateFutureA.isDone()) { + log.info("[{}] Wait for bucket snapshot create finish to merge, bucketKey:{}", dispatcher.getName(), + immutableBucketA.bucketKey()); + return CompletableFuture.completedFuture(null); + } + CompletableFuture snapshotCreateFutureB = + 
immutableBucketB.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null)); + if (!snapshotCreateFutureB.isDone()) { + log.info("[{}] Wait for bucket snapshot create finish to merge, bucketKey:{}", dispatcher.getName(), + immutableBucketB.bucketKey()); + return CompletableFuture.completedFuture(null); + } + if (log.isDebugEnabled()) { log.info("[{}] Merging bucket snapshot, bucketAKey: {}, bucketBKey: {}", dispatcher.getName(), immutableBucketA.bucketKey(), immutableBucketB.bucketKey()); @@ -363,7 +379,7 @@ private synchronized CompletableFuture asyncMergeBucketSnapshot() { log.error("[{}] Failed to merge bucket snapshot, bucketAKey: {}, bucketBKey: {}", dispatcher.getName(), immutableBucketA.bucketKey(), immutableBucketB.bucketKey(), ex); } else { - log.error("[{}] Merge bucket snapshot finish, bucketAKey: {}, bucketBKey: {}", + log.info("[{}] Merge bucket snapshot finish, bucketAKey: {}, bucketBKey: {}", dispatcher.getName(), immutableBucketA.bucketKey(), immutableBucketB.bucketKey()); } }); @@ -371,57 +387,50 @@ private synchronized CompletableFuture asyncMergeBucketSnapshot() { private synchronized CompletableFuture asyncMergeBucketSnapshot(ImmutableBucket bucketA, ImmutableBucket bucketB) { - CompletableFuture snapshotCreateFutureA = - bucketA.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null)); - CompletableFuture snapshotCreateFutureB = - bucketB.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null)); - - return CompletableFuture.allOf(snapshotCreateFutureA, snapshotCreateFutureB).thenCompose(__ -> { - CompletableFuture> futureA = - bucketA.getRemainSnapshotSegment(); - CompletableFuture> futureB = - bucketB.getRemainSnapshotSegment(); - return futureA.thenCombine(futureB, CombinedSegmentDelayedIndexQueue::wrap) - .thenAccept(combinedDelayedIndexQueue -> { - Pair immutableBucketDelayedIndexPair = - lastMutableBucket.createImmutableBucketAndAsyncPersistent( - timeStepPerBucketSnapshotSegmentInMillis, 
sharedBucketPriorityQueue, - combinedDelayedIndexQueue, bucketA.startLedgerId, bucketB.endLedgerId); - - // Merge bit map to new bucket - Map delayedIndexBitMapA = bucketA.getDelayedIndexBitMap(); - Map delayedIndexBitMapB = bucketB.getDelayedIndexBitMap(); - Map delayedIndexBitMap = new HashMap<>(delayedIndexBitMapA); - delayedIndexBitMapB.forEach((ledgerId, bitMapB) -> { - delayedIndexBitMap.compute(ledgerId, (k, bitMapA) -> { - if (bitMapA == null) { - return bitMapB; - } + CompletableFuture> futureA = + bucketA.getRemainSnapshotSegment(); + CompletableFuture> futureB = + bucketB.getRemainSnapshotSegment(); + return futureA.thenCombine(futureB, CombinedSegmentDelayedIndexQueue::wrap) + .thenAccept(combinedDelayedIndexQueue -> { + Pair immutableBucketDelayedIndexPair = + lastMutableBucket.createImmutableBucketAndAsyncPersistent( + timeStepPerBucketSnapshotSegmentInMillis, sharedBucketPriorityQueue, + combinedDelayedIndexQueue, bucketA.startLedgerId, bucketB.endLedgerId); + + // Merge bit map to new bucket + Map delayedIndexBitMapA = bucketA.getDelayedIndexBitMap(); + Map delayedIndexBitMapB = bucketB.getDelayedIndexBitMap(); + Map delayedIndexBitMap = new HashMap<>(delayedIndexBitMapA); + delayedIndexBitMapB.forEach((ledgerId, bitMapB) -> { + delayedIndexBitMap.compute(ledgerId, (k, bitMapA) -> { + if (bitMapA == null) { + return bitMapB; + } - bitMapA.or(bitMapB); - return bitMapA; - }); + bitMapA.or(bitMapB); + return bitMapA; }); - immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap); - - afterCreateImmutableBucket(immutableBucketDelayedIndexPair); + }); + immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap); - CompletableFuture snapshotCreateFuture = CompletableFuture.completedFuture(null); - if (immutableBucketDelayedIndexPair != null) { - snapshotCreateFuture = immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture() - .orElse(CompletableFuture.completedFuture(null)); - } + 
afterCreateImmutableBucket(immutableBucketDelayedIndexPair); - snapshotCreateFuture.thenCompose(___ -> { - CompletableFuture removeAFuture = bucketA.asyncDeleteBucketSnapshot(); - CompletableFuture removeBFuture = bucketB.asyncDeleteBucketSnapshot(); - return CompletableFuture.allOf(removeAFuture, removeBFuture); - }); + CompletableFuture snapshotCreateFuture = CompletableFuture.completedFuture(null); + if (immutableBucketDelayedIndexPair != null) { + snapshotCreateFuture = immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture() + .orElse(CompletableFuture.completedFuture(null)); + } - immutableBuckets.remove(Range.closed(bucketA.startLedgerId, bucketA.endLedgerId)); - immutableBuckets.remove(Range.closed(bucketB.startLedgerId, bucketB.endLedgerId)); + snapshotCreateFuture.thenCompose(___ -> { + CompletableFuture removeAFuture = bucketA.asyncDeleteBucketSnapshot(); + CompletableFuture removeBFuture = bucketB.asyncDeleteBucketSnapshot(); + return CompletableFuture.allOf(removeAFuture, removeBFuture); }); - }); + + immutableBuckets.remove(Range.closed(bucketA.startLedgerId, bucketA.endLedgerId)); + immutableBuckets.remove(Range.closed(bucketB.startLedgerId, bucketB.endLedgerId)); + }); } @Override @@ -475,7 +484,7 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa long ledgerId = sharedBucketPriorityQueue.peekN2(); long entryId = sharedBucketPriorityQueue.peekN3(); - ImmutableBucket bucket = snapshotSegmentLastIndexTable.remove(ledgerId, entryId); + ImmutableBucket bucket = snapshotSegmentLastIndexTable.get(ledgerId, entryId); if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) { final int preSegmentEntryId = bucket.currentSegmentEntryId; if (log.isDebugEnabled()) { @@ -485,6 +494,21 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa // All message of current snapshot segment are scheduled, load next snapshot segment // TODO make it asynchronous and not blocking this process try { + boolean 
createFutureDone = bucket.getSnapshotCreateFuture() + .orElse(CompletableFuture.completedFuture(null)).isDone(); + + if (!createFutureDone) { + log.info("[{}] Wait for bucket snapshot create finish to load, bucketKey:{}", + dispatcher.getName(), bucket.bucketKey()); + break; + } + + if (bucket.currentSegmentEntryId == bucket.lastSegmentEntryId) { + immutableBuckets.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); + bucket.asyncDeleteBucketSnapshot(); + break; + } + bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> { if (CollectionUtils.isEmpty(indexList)) { immutableBuckets.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); @@ -502,7 +526,6 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa }).whenComplete((__, ex) -> { if (ex != null) { // Back bucket state - snapshotSegmentLastIndexTable.put(ledgerId, entryId, bucket); bucket.setCurrentSegmentEntryId(preSegmentEntryId); log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}", @@ -512,6 +535,7 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa dispatcher.getName(), bucket.bucketKey(), bucket.currentSegmentEntryId); } }).get(AsyncOperationTimeoutSeconds * MaxRetryTimes, TimeUnit.SECONDS); + snapshotSegmentLastIndexTable.remove(ledgerId, entryId); } catch (Exception e) { // Ignore exception to reload this segment on the next schedule. 
break; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java index 50e838b3b9dfd..c947e53843a85 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java @@ -64,72 +64,65 @@ CompletableFuture> asyncRecoverBucketSnapshotEntry(Supplier> asyncLoadNextBucketSnapshotEntry(boolean isRecover, Supplier cutoffTimeSupplier) { - // Wait bucket snapshot create finish - CompletableFuture snapshotCreateFuture = - getSnapshotCreateFuture().orElseGet(() -> CompletableFuture.completedFuture(null)) - .thenApply(__ -> null); - - return snapshotCreateFuture.thenCompose(__ -> { - final long bucketId = getAndUpdateBucketId(); - final CompletableFuture loadMetaDataFuture; - if (isRecover) { - final long cutoffTime = cutoffTimeSupplier.get(); - // Load Metadata of bucket snapshot - final String bucketKey = bucketKey(); - loadMetaDataFuture = executeWithRetry(() -> bucketSnapshotStorage.getBucketSnapshotMetadata(bucketId) - .whenComplete((___, ex) -> { - if (ex != null) { - log.warn("[{}] Failed to get bucket snapshot metadata," - + " bucketKey: {}, bucketId: {}", - dispatcherName, bucketKey, bucketId, ex); - } - }), BucketSnapshotPersistenceException.class, MaxRetryTimes) - .thenApply(snapshotMetadata -> { - List metadataList = - snapshotMetadata.getMetadataListList(); - - // Skip all already reach schedule time snapshot segments - int nextSnapshotEntryIndex = 0; - while (nextSnapshotEntryIndex < metadataList.size() - && metadataList.get(nextSnapshotEntryIndex).getMaxScheduleTimestamp() <= cutoffTime) { - nextSnapshotEntryIndex++; - } - - this.setLastSegmentEntryId(metadataList.size()); - this.recoverDelayedIndexBitMapAndNumber(nextSnapshotEntryIndex, metadataList); + final long bucketId = getAndUpdateBucketId(); + 
final CompletableFuture loadMetaDataFuture; + if (isRecover) { + final long cutoffTime = cutoffTimeSupplier.get(); + // Load Metadata of bucket snapshot + final String bucketKey = bucketKey(); + loadMetaDataFuture = executeWithRetry(() -> bucketSnapshotStorage.getBucketSnapshotMetadata(bucketId) + .whenComplete((___, ex) -> { + if (ex != null) { + log.warn("[{}] Failed to get bucket snapshot metadata," + + " bucketKey: {}, bucketId: {}", + dispatcherName, bucketKey, bucketId, ex); + } + }), BucketSnapshotPersistenceException.class, MaxRetryTimes) + .thenApply(snapshotMetadata -> { + List metadataList = + snapshotMetadata.getMetadataListList(); + + // Skip all already reach schedule time snapshot segments + int nextSnapshotEntryIndex = 0; + while (nextSnapshotEntryIndex < metadataList.size() + && metadataList.get(nextSnapshotEntryIndex).getMaxScheduleTimestamp() <= cutoffTime) { + nextSnapshotEntryIndex++; + } + + this.setLastSegmentEntryId(metadataList.size()); + this.recoverDelayedIndexBitMapAndNumber(nextSnapshotEntryIndex, metadataList); + + return nextSnapshotEntryIndex + 1; + }); + } else { + loadMetaDataFuture = CompletableFuture.completedFuture(currentSegmentEntryId + 1); + } - return nextSnapshotEntryIndex + 1; - }); - } else { - loadMetaDataFuture = CompletableFuture.completedFuture(currentSegmentEntryId + 1); + return loadMetaDataFuture.thenCompose(nextSegmentEntryId -> { + if (nextSegmentEntryId > lastSegmentEntryId) { + return CompletableFuture.completedFuture(null); } - return loadMetaDataFuture.thenCompose(nextSegmentEntryId -> { - if (nextSegmentEntryId > lastSegmentEntryId) { - return CompletableFuture.completedFuture(null); - } - - return executeWithRetry( - () -> bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, nextSegmentEntryId, - nextSegmentEntryId).whenComplete((___, ex) -> { - if (ex != null) { - log.warn("[{}] Failed to get bucket snapshot segment. 
bucketKey: {}, bucketId: {}", - dispatcherName, bucketKey(), bucketId, ex); - } - }), BucketSnapshotPersistenceException.class, MaxRetryTimes) - .thenApply(bucketSnapshotSegments -> { - if (CollectionUtils.isEmpty(bucketSnapshotSegments)) { - return Collections.emptyList(); - } - - DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment = - bucketSnapshotSegments.get(0); - List indexList = - snapshotSegment.getIndexesList(); - this.setCurrentSegmentEntryId(nextSegmentEntryId); - return indexList; - }); - }); + return executeWithRetry( + () -> bucketSnapshotStorage.getBucketSnapshotSegment(bucketId, nextSegmentEntryId, + nextSegmentEntryId).whenComplete((___, ex) -> { + if (ex != null) { + log.warn("[{}] Failed to get bucket snapshot segment. bucketKey: {}, bucketId: {}", + dispatcherName, bucketKey(), bucketId, ex); + } + }), BucketSnapshotPersistenceException.class, MaxRetryTimes) + .thenApply(bucketSnapshotSegments -> { + if (CollectionUtils.isEmpty(bucketSnapshotSegments)) { + return Collections.emptyList(); + } + + DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment = + bucketSnapshotSegments.get(0); + List indexList = + snapshotSegment.getIndexesList(); + this.setCurrentSegmentEntryId(nextSegmentEntryId); + return indexList; + }); }); } From be9092bdb08fac067e119c4fcf20d1a45aed51df Mon Sep 17 00:00:00 2001 From: coderzc Date: Wed, 1 Mar 2023 16:22:38 +0800 Subject: [PATCH 13/14] fix bug --- .../bucket/BucketDelayedDeliveryTracker.java | 51 ++++++------------- .../BucketDelayedDeliveryTrackerTest.java | 5 +- 2 files changed, 19 insertions(+), 37 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index a1d5de4550261..ad2c2076daa0b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -62,6 +62,8 @@ @ThreadSafe public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker { + static final CompletableFuture NULL_LONG_PROMISE = CompletableFuture.completedFuture(null); + static final int AsyncOperationTimeoutSeconds = 60; private final long minIndexCountPerBucket; @@ -238,6 +240,7 @@ private void afterCreateImmutableBucket(Pair immu immutableBucket.getSnapshotCreateFuture().ifPresent(createFuture -> { CompletableFuture future = createFuture.whenComplete((__, ex) -> { if (ex == null) { + immutableBucket.setSnapshotSegments(null); log.info("[{}] Creat bucket snapshot finish, bucketKey: {}", dispatcher.getName(), immutableBucket.bucketKey()); return; @@ -247,7 +250,7 @@ private void afterCreateImmutableBucket(Pair immu log.error("[{}] Failed to create bucket snapshot, bucketKey: {}", dispatcher.getName(), immutableBucket.bucketKey(), ex); - // Put the index back into the shared queue and downgrade to memory mode + // Put indexes back into the shared queue and downgrade to memory mode synchronized (BucketDelayedDeliveryTracker.this) { immutableBucket.getSnapshotSegments().ifPresent(snapshotSegments -> { for (DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment snapshotSegment : @@ -263,15 +266,11 @@ private void afterCreateImmutableBucket(Pair immu immutableBucket.setCurrentSegmentEntryId(immutableBucket.lastSegmentEntryId); immutableBuckets.remove( Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId)); - - immutableBucket.setSnapshotCreateFuture(null); + snapshotSegmentLastIndexTable.remove(lastDelayedIndex.getLedgerId(), + lastDelayedIndex.getEntryId()); } }); - synchronized (BucketDelayedDeliveryTracker.this) { - if (!future.isDone()) { - immutableBucket.setSnapshotCreateFuture(future); - } - } + immutableBucket.setSnapshotCreateFuture(future); }); } } @@ -341,7 +340,10 @@ private synchronized
CompletableFuture asyncMergeBucketSnapshot() { long numberMessages = bucketL.numberBucketDelayedMessages + bucketR.numberBucketDelayedMessages; if (numberMessages < minNumberMessages) { minNumberMessages = (int) numberMessages; - if (bucketL.lastSegmentEntryId > bucketL.getCurrentSegmentEntryId()) { + if (bucketL.lastSegmentEntryId > bucketL.getCurrentSegmentEntryId() + && bucketR.lastSegmentEntryId > bucketR.getCurrentSegmentEntryId() + && bucketL.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone() + && bucketR.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone()) { minIndex = i; } } @@ -355,21 +357,6 @@ private synchronized CompletableFuture asyncMergeBucketSnapshot() { ImmutableBucket immutableBucketA = values.get(minIndex); ImmutableBucket immutableBucketB = values.get(minIndex + 1); - CompletableFuture snapshotCreateFutureA = - immutableBucketA.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null)); - if (!snapshotCreateFutureA.isDone()) { - log.info("[{}] Wait for bucket snapshot create finish to merge, bucketKey:{}", dispatcher.getName(), - immutableBucketA.bucketKey()); - return CompletableFuture.completedFuture(null); - } - CompletableFuture snapshotCreateFutureB = - immutableBucketB.getSnapshotCreateFuture().orElse(CompletableFuture.completedFuture(null)); - if (!snapshotCreateFutureB.isDone()) { - log.info("[{}] Wait for bucket snapshot create finish to merge, bucketKey:{}", dispatcher.getName(), - immutableBucketB.bucketKey()); - return CompletableFuture.completedFuture(null); - } - if (log.isDebugEnabled()) { log.info("[{}] Merging bucket snapshot, bucketAKey: {}, bucketBKey: {}", dispatcher.getName(), immutableBucketA.bucketKey(), immutableBucketB.bucketKey()); @@ -416,13 +403,8 @@ private synchronized CompletableFuture asyncMergeBucketSnapshot(ImmutableB afterCreateImmutableBucket(immutableBucketDelayedIndexPair); - CompletableFuture snapshotCreateFuture = CompletableFuture.completedFuture(null); - if 
(immutableBucketDelayedIndexPair != null) { - snapshotCreateFuture = immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture() - .orElse(CompletableFuture.completedFuture(null)); - } - - snapshotCreateFuture.thenCompose(___ -> { + immutableBucketDelayedIndexPair.getLeft().getSnapshotCreateFuture() + .orElse(NULL_LONG_PROMISE).thenCompose(___ -> { CompletableFuture removeAFuture = bucketA.asyncDeleteBucketSnapshot(); CompletableFuture removeBFuture = bucketB.asyncDeleteBucketSnapshot(); return CompletableFuture.allOf(removeAFuture, removeBFuture); @@ -494,11 +476,10 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa // All message of current snapshot segment are scheduled, load next snapshot segment // TODO make it asynchronous and not blocking this process try { - boolean createFutureDone = bucket.getSnapshotCreateFuture() - .orElse(CompletableFuture.completedFuture(null)).isDone(); + boolean createFutureDone = bucket.getSnapshotCreateFuture().orElse(NULL_LONG_PROMISE).isDone(); if (!createFutureDone) { - log.info("[{}] Wait for bucket snapshot create finish to load, bucketKey:{}", + log.info("[{}] Skip load to wait for bucket snapshot create finish, bucketKey:{}", dispatcher.getName(), bucket.bucketKey()); break; } @@ -506,7 +487,7 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa if (bucket.currentSegmentEntryId == bucket.lastSegmentEntryId) { immutableBuckets.remove(Range.closed(bucket.startLedgerId, bucket.endLedgerId)); bucket.asyncDeleteBucketSnapshot(); - break; + continue; } bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java index 7faee82cd75e1..08e1f78725bf4 100644 --- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java @@ -51,6 +51,7 @@ import org.roaringbitmap.RoaringBitmap; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; import org.testcontainers.shaded.org.apache.commons.lang3.mutable.MutableLong; +import org.testcontainers.shaded.org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -138,7 +139,7 @@ public Object[][] provider(Method method) throws Exception { new BucketDelayedDeliveryTracker(dispatcher, timer, 500, clock, true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50) }}; - case "testMergeSnapshot", "testWithBkException", "testWithCreateEx" -> new Object[][]{{ + case "testMergeSnapshot", "testWithBkException", "testWithCreateFailDowngrade" -> new Object[][]{{ new BucketDelayedDeliveryTracker(dispatcher, timer, 100000, clock, true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 10) }}; @@ -365,7 +366,7 @@ public void testWithCreateFailDowngrade(BucketDelayedDeliveryTracker tracker) { tracker.addMessage(i, i, i * 10); } - assertEquals(0, tracker.getImmutableBuckets().asMapOfRanges().size()); + Awaitility.await().untilAsserted(() -> assertEquals(0, tracker.getImmutableBuckets().asMapOfRanges().size())); clockTime.set(5 * 10); From 8f73e8fb19377f24cd4db66763a1acb4906d1a38 Mon Sep 17 00:00:00 2001 From: coderzc Date: Wed, 1 Mar 2023 18:38:00 +0800 Subject: [PATCH 14/14] improve code --- .../delayed/bucket/BucketDelayedDeliveryTracker.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java 
index ad2c2076daa0b..a77b272297be4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -300,11 +300,9 @@ public synchronized boolean addMessage(long ledgerId, long entryId, long deliver if (immutableBuckets.asMapOfRanges().size() > maxNumBuckets) { try { asyncMergeBucketSnapshot().get(2 * AsyncOperationTimeoutSeconds * MaxRetryTimes, TimeUnit.SECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - throw new RuntimeException(e); + } catch (Exception e) { + // Ignore exception to merge bucket on the next schedule. + log.error("[{}] An exception occurs when merge bucket snapshot.", dispatcher.getName(), e); } } } @@ -519,6 +517,8 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa snapshotSegmentLastIndexTable.remove(ledgerId, entryId); } catch (Exception e) { // Ignore exception to reload this segment on the next schedule. + log.error("[{}] An exception occurs when load next bucket snapshot, bucketKey:{}", + dispatcher.getName(), bucket.bucketKey(), e); break; } }