[improve][broker] Make BucketDelayedDeliveryTracker can retry snapshot operation & improve logs #19577

Merged: 15 commits, Mar 2, 2023
@@ -24,6 +24,7 @@
import java.util.function.Supplier;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.pulsar.common.util.FutureUtil;

/**
* Conveniences to use with {@link CompletableFuture}.
@@ -78,7 +79,8 @@ public static <T> CompletableFuture<T> executeWithRetry(...)
if (ex == null) {
resultFuture.complete(res);
} else {
-if (needRetryExceptionClass.isAssignableFrom(ex.getClass()) && maxRetryTimes > 0) {
+Throwable throwable = FutureUtil.unwrapCompletionException(ex);
+if (needRetryExceptionClass.isAssignableFrom(throwable.getClass()) && maxRetryTimes > 0) {
executeWithRetry(op, needRetryExceptionClass, maxRetryTimes - 1).whenComplete((res2, ex2) -> {
if (ex2 == null) {
resultFuture.complete(res2);
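To make the change in this hunk concrete, here is a minimal, self-contained sketch (plain JDK, hypothetical class name, not the Pulsar utility itself) of the retry pattern it implements: unwrap the CompletionException that dependent stages wrap around the original failure, then re-invoke the supplier only while the unwrapped cause is of the retryable type and attempts remain.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

// Sketch only: mirrors the retry/unwrap logic shown in the hunk above.
final class RetrySketch {

    // Same idea as FutureUtil.unwrapCompletionException(ex).
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    static <T> CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> op,
                                                     Class<? extends Throwable> retryOn,
                                                     int maxRetryTimes) {
        CompletableFuture<T> result = new CompletableFuture<>();
        op.get().whenComplete((res, ex) -> {
            if (ex == null) {
                result.complete(res);
                return;
            }
            Throwable cause = unwrap(ex);
            if (retryOn.isAssignableFrom(cause.getClass()) && maxRetryTimes > 0) {
                // Retry with one fewer attempt and forward whatever the retry produces.
                executeWithRetry(op, retryOn, maxRetryTimes - 1).whenComplete((res2, ex2) -> {
                    if (ex2 == null) {
                        result.complete(res2);
                    } else {
                        result.completeExceptionally(ex2);
                    }
                });
            } else {
                result.completeExceptionally(ex);
            }
        });
        return result;
    }
}
```

Without the unwrap step, a failure that travels through a dependent stage arrives as a CompletionException, so the isAssignableFrom check against the retryable class never matches and no retry happens; that is exactly what the added FutureUtil.unwrapCompletionException call addresses.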
@@ -154,7 +154,7 @@ private CompletableFuture<LedgerHandle> createLedger(String bucketKey) {
LedgerPassword,
(rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
future.completeExceptionally(bkException("Failed to create ledger", rc, -1));
future.completeExceptionally(bkException("Create ledger", rc, -1));
Reviewer (Contributor): Why modify these logs? The previous description looks more accurate.

Author (Member): This field is the name of the operation.
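For readers following the thread above: the call sites now pass only an operation name (for example "Create ledger"), and the helper is expected to add the failure wording and the BookKeeper return code itself. A hypothetical sketch of such a helper (the real bkException in this file may differ) is:

```java
// Hypothetical illustration of the convention discussed above, not the actual helper in this file.
final class BkExceptionSketch {
    static Exception bkException(String operation, int rc, long ledgerId) {
        // The first argument names the operation; "failed" and the details are appended here,
        // so callers pass "Create ledger" rather than "Failed to create ledger".
        return new RuntimeException(operation + " failed, rc: " + rc + ", ledgerId: " + ledgerId);
    }
}
```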

} else {
future.complete(handle);
}
@@ -170,7 +170,7 @@ private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
LedgerPassword,
(rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
future.completeExceptionally(bkException("Failed to open ledger", rc, ledgerId));
future.completeExceptionally(bkException("Open ledger", rc, ledgerId));
} else {
future.complete(handle);
}
@@ -184,7 +184,7 @@ private CompletableFuture<Void> closeLedger(LedgerHandle ledgerHandle) {
ledgerHandle.asyncClose((rc, handle, ctx) -> {
if (rc != BKException.Code.OK) {
log.warn("Failed to close a Ledger Handle: {}", ledgerHandle.getId());
future.completeExceptionally(bkException("Failed to close ledger", rc, ledgerHandle.getId()));
future.completeExceptionally(bkException("Close ledger", rc, ledgerHandle.getId()));
} else {
future.complete(null);
}
@@ -197,7 +197,7 @@ private CompletableFuture<Void> addEntry(LedgerHandle ledgerHandle, byte[] data)
ledgerHandle.asyncAddEntry(data,
(rc, handle, entryId, ctx) -> {
if (rc != BKException.Code.OK) {
future.completeExceptionally(bkException("Failed to add entry", rc, ledgerHandle.getId()));
future.completeExceptionally(bkException("Add entry", rc, ledgerHandle.getId()));
} else {
future.complete(null);
}
@@ -217,7 +217,7 @@ CompletableFuture<Enumeration<LedgerEntry>> getLedgerEntryThenCloseLedger(...)
ledger.asyncReadEntries(firstEntryId, lastEntryId,
(rc, handle, entries, ctx) -> {
if (rc != BKException.Code.OK) {
future.completeExceptionally(bkException("Failed to read entry", rc, ledger.getId()));
future.completeExceptionally(bkException("Read entry", rc, ledger.getId()));
} else {
future.complete(entries);
}
@@ -231,7 +231,7 @@ private CompletableFuture<Void> deleteLedger(long ledgerId) {
CompletableFuture<Void> future = new CompletableFuture<>();
bookKeeper.asyncDeleteLedger(ledgerId, (int rc, Object cnx) -> {
if (rc != BKException.Code.OK) {
future.completeExceptionally(bkException("Failed to delete ledger", rc, ledgerId));
future.completeExceptionally(bkException("Delete ledger", rc, ledgerId));
} else {
future.complete(null);
}
@@ -42,6 +42,8 @@ abstract class Bucket {
static final String DELIMITER = "_";
static final int MaxRetryTimes = 3;

+protected final String dispatcherName;
+
protected final ManagedCursor cursor;
protected final BucketSnapshotStorage bucketSnapshotStorage;

@@ -54,17 +56,18 @@ abstract class Bucket {

int lastSegmentEntryId;

-int currentSegmentEntryId;
+volatile int currentSegmentEntryId;

-long snapshotLength;
+volatile long snapshotLength;

private volatile Long bucketId;

private volatile CompletableFuture<Long> snapshotCreateFuture;


-Bucket(ManagedCursor cursor, BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) {
-this(cursor, storage, startLedgerId, endLedgerId, new HashMap<>(), -1, -1, 0, 0, null, null);
+Bucket(String dispatcherName, ManagedCursor cursor,
+BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) {
+this(dispatcherName, cursor, storage, startLedgerId, endLedgerId, new HashMap<>(), -1, -1, 0, 0, null, null);
}

boolean containsMessage(long ledgerId, long entryId) {
@@ -126,13 +129,19 @@ CompletableFuture<Long> asyncSaveBucketSnapshot(
ImmutableBucket bucket, DelayedMessageIndexBucketSnapshotFormat.SnapshotMetadata snapshotMetadata,
List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment> bucketSnapshotSegments) {
final String bucketKey = bucket.bucketKey();
-return bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments, bucketKey)
-.thenCompose(newBucketId -> {
+return executeWithRetry(
+() -> bucketSnapshotStorage.createBucketSnapshot(snapshotMetadata, bucketSnapshotSegments, bucketKey)
+.whenComplete((__, ex) -> {
+if (ex != null) {
+log.warn("[{}] Failed to create bucket snapshot, bucketKey: {}",
+dispatcherName, bucketKey, ex);
+}
+}), BucketSnapshotPersistenceException.class, MaxRetryTimes).thenCompose(newBucketId -> {
bucket.setBucketId(newBucketId);

return putBucketKeyId(bucketKey, newBucketId).exceptionally(ex -> {
log.warn("Failed to record bucketId to cursor property, bucketKey: {}, bucketId: {}",
bucketKey, bucketId);
log.warn("[{}] Failed to record bucketId to cursor property, bucketKey: {}, bucketId: {}",
dispatcherName, bucketKey, newBucketId, ex);
return null;
}).thenApply(__ -> newBucketId);
});
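As a rough usage sketch (hypothetical names, reusing the RetrySketch.executeWithRetry helper sketched earlier on this page), only failures whose unwrapped cause matches the retryable exception type trigger another attempt, which is how the snapshot creation above can ride out transient storage errors:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in types and values; the real code retries BucketSnapshotPersistenceException up to MaxRetryTimes (3).
final class SnapshotRetryUsage {
    static class SnapshotPersistenceException extends RuntimeException {
        SnapshotPersistenceException(String msg) { super(msg); }
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        CompletableFuture<Long> saved = RetrySketch.executeWithRetry(() -> {
            // Stand-in for bucketSnapshotStorage.createBucketSnapshot(...): fail twice, then succeed.
            if (attempts.incrementAndGet() < 3) {
                return CompletableFuture.failedFuture(new SnapshotPersistenceException("transient write failure"));
            }
            return CompletableFuture.completedFuture(42L); // pretend bucketId
        }, SnapshotPersistenceException.class, 3);

        System.out.println(saved.join()); // prints 42 after two retried attempts
    }
}
```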
@@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELAYED_BUCKET_KEY_PREFIX;
import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER;
+import static org.apache.pulsar.broker.delayed.bucket.Bucket.MaxRetryTimes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Range;
@@ -60,7 +61,7 @@
@ThreadSafe
public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker {

-static final int AsyncOperationTimeoutSeconds = 30;
+static final int AsyncOperationTimeoutSeconds = 60;

private final long minIndexCountPerBucket;

@@ -103,20 +104,19 @@ public BucketDelayedDeliveryTracker(...)
this.sharedBucketPriorityQueue = new TripleLongPriorityQueue();
this.immutableBuckets = TreeRangeMap.create();
this.snapshotSegmentLastIndexTable = HashBasedTable.create();
-ManagedCursor cursor = dispatcher.getCursor();
-this.lastMutableBucket = new MutableBucket(cursor, bucketSnapshotStorage);
+this.lastMutableBucket = new MutableBucket(dispatcher.getName(), dispatcher.getCursor(), bucketSnapshotStorage);
this.numberDelayedMessages = recoverBucketSnapshot();
}

private synchronized long recoverBucketSnapshot() throws RuntimeException {
-ManagedCursor cursor = this.lastMutableBucket.cursor;
+ManagedCursor cursor = this.lastMutableBucket.getCursor();
Map<Range<Long>, ImmutableBucket> toBeDeletedBucketMap = new HashMap<>();
cursor.getCursorProperties().keySet().forEach(key -> {
if (key.startsWith(DELAYED_BUCKET_KEY_PREFIX)) {
String[] keys = key.split(DELIMITER);
checkArgument(keys.length == 3);
ImmutableBucket immutableBucket =
-new ImmutableBucket(cursor, this.lastMutableBucket.bucketSnapshotStorage,
+new ImmutableBucket(dispatcher.getName(), cursor, this.lastMutableBucket.bucketSnapshotStorage,
Long.parseLong(keys[1]), Long.parseLong(keys[2]));
putAndCleanOverlapRange(Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId),
immutableBucket, toBeDeletedBucketMap);
@@ -125,6 +125,8 @@ private synchronized long recoverBucketSnapshot() throws RuntimeException {

Map<Range<Long>, ImmutableBucket> immutableBucketMap = immutableBuckets.asMapOfRanges();
if (immutableBucketMap.isEmpty()) {
log.info("[{}] Recover delayed message index bucket snapshot finish, don't find bucket snapshot",
dispatcher.getName());
return 0;
}

@@ -231,10 +233,6 @@ private void afterCreateImmutableBucket(...)
DelayedIndex lastDelayedIndex = immutableBucketDelayedIndexPair.getRight();
snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), lastDelayedIndex.getEntryId(),
immutableBucket);
-if (log.isDebugEnabled()) {
-log.debug("[{}] Create bucket snapshot, bucket: {}", dispatcher.getName(),
-lastMutableBucket);
-}
}
}

@@ -262,7 +260,7 @@ public synchronized boolean addMessage(...)

if (immutableBuckets.asMapOfRanges().size() > maxNumBuckets) {
try {
-asyncMergeBucketSnapshot().get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
+asyncMergeBucketSnapshot().get(2 * AsyncOperationTimeoutSeconds * MaxRetryTimes, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
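With the constants in this PR, the synchronous wait above is bounded by 2 × AsyncOperationTimeoutSeconds × MaxRetryTimes = 2 × 60 s × 3 = 360 s; the extra factors presumably leave headroom for the snapshot operations of both merged buckets plus their retries before the timeout fires.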
@@ -306,7 +304,21 @@ private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot() {
minIndex = i;
}
}
-return asyncMergeBucketSnapshot(values.get(minIndex), values.get(minIndex + 1));
+ImmutableBucket immutableBucketA = values.get(minIndex);
+ImmutableBucket immutableBucketB = values.get(minIndex + 1);
+if (log.isDebugEnabled()) {
+log.debug("[{}] Merging bucket snapshot, bucketAKey: {}, bucketBKey: {}", dispatcher.getName(),
+immutableBucketA.bucketKey(), immutableBucketB.bucketKey());
+}
+return asyncMergeBucketSnapshot(immutableBucketA, immutableBucketB).whenComplete((__, ex) -> {
+if (ex != null) {
+log.error("[{}] Failed to merge bucket snapshot, bucketAKey: {}, bucketBKey: {}",
+dispatcher.getName(), immutableBucketA.bucketKey(), immutableBucketB.bucketKey(), ex);
+} else {
+log.info("[{}] Merge bucket snapshot finish, bucketAKey: {}, bucketBKey: {}",
+dispatcher.getName(), immutableBucketA.bucketKey(), immutableBucketB.bucketKey());
+}
+});
}

private synchronized CompletableFuture<Void> asyncMergeBucketSnapshot(ImmutableBucket bucketA,
@@ -392,15 +404,13 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(...)

long ledgerId = sharedBucketPriorityQueue.peekN2();
long entryId = sharedBucketPriorityQueue.peekN3();
-positions.add(new PositionImpl(ledgerId, entryId));

-sharedBucketPriorityQueue.pop();
-removeIndexBit(ledgerId, entryId);
-
ImmutableBucket bucket = snapshotSegmentLastIndexTable.remove(ledgerId, entryId);
if (bucket != null && immutableBuckets.asMapOfRanges().containsValue(bucket)) {
+final int lastSegmentEntryId = bucket.currentSegmentEntryId;
if (log.isDebugEnabled()) {
log.debug("[{}] Load next snapshot segment, bucket: {}", dispatcher.getName(), bucket);
log.debug("[{}] Loading next bucket snapshot segment, bucketKey: {}, nextSegmentEntryId: {}",
dispatcher.getName(), bucket.bucketKey(), lastSegmentEntryId + 1);
}
// All message of current snapshot segment are scheduled, load next snapshot segment
// TODO make it asynchronous and not blocking this process
@@ -419,13 +429,30 @@ public synchronized NavigableSet<PositionImpl> getScheduledMessages(...)
sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(),
index.getEntryId());
}
-}).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
-} catch (InterruptedException | ExecutionException | TimeoutException e) {
-// TODO make this segment load again
-throw new RuntimeException(e);
+}).whenComplete((__, ex) -> {
+if (ex != null) {
+// Back bucket state
+snapshotSegmentLastIndexTable.put(ledgerId, entryId, bucket);
+bucket.setCurrentSegmentEntryId(lastSegmentEntryId);
+
+log.error("[{}] Failed to load bucket snapshot segment, bucketKey: {}",
+dispatcher.getName(), bucket.bucketKey(), ex);
+} else {
+log.info("[{}] Load next bucket snapshot segment finish, bucketKey: {}, segmentEntryId: {}",
+dispatcher.getName(), bucket.bucketKey(), bucket.currentSegmentEntryId);
+}
+}).get(AsyncOperationTimeoutSeconds * MaxRetryTimes, TimeUnit.SECONDS);
+} catch (Exception e) {
+// Ignore exception to reload this segment on the next schedule.
+break;
}
}

+positions.add(new PositionImpl(ledgerId, entryId));

+sharedBucketPriorityQueue.pop();
+removeIndexBit(ledgerId, entryId);

--n;
--numberDelayedMessages;
}