-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][broker] Make BucketDelayedDeliveryTracker can retry snapshot operation & improve logs #19577
Conversation
7e5a16c
to
af32f57
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there are still some unit tests missing
623d557
to
b47aa6f
Compare
@315157973 I already add unit tests to cover the change, please take a look. |
b47aa6f
to
f1566c8
Compare
05f1c73
to
4c6b0b4
Compare
4c6b0b4
to
82178f0
Compare
return asyncMergeBucketSnapshot(values.get(minIndex), values.get(minIndex + 1)); | ||
ImmutableBucket immutableBucketA = values.get(minIndex); | ||
ImmutableBucket immutableBucketB = values.get(minIndex + 1); | ||
log.info("[{}] Merging bucket snapshot, bucketAKey: {}, bucketBKey: {}", dispatcher.getName(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should change to debug level since it will make a lot of noise if you have many topics on the broker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
log.debug("[{}] Load next snapshot segment, bucket: {}", dispatcher.getName(), bucket); | ||
} | ||
final int lastSegmentEntryId = bucket.currentSegmentEntryId; | ||
log.info("[{}] Loading next bucket snapshot segment, bucketKey: {}, nextSegmentEntryId: {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why change to debug level?
c0184a7
to
2316e13
Compare
@@ -154,7 +154,7 @@ private CompletableFuture<LedgerHandle> createLedger(String bucketKey) { | |||
LedgerPassword, | |||
(rc, handle, ctx) -> { | |||
if (rc != BKException.Code.OK) { | |||
future.completeExceptionally(bkException("Failed to create ledger", rc, -1)); | |||
future.completeExceptionally(bkException("Create ledger", rc, -1)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why modify these logs, the previous description looks more accurate
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This field is the name of the operation.
|
||
private volatile Long bucketId; | ||
|
||
private volatile CompletableFuture<Long> snapshotCreateFuture; | ||
|
||
|
||
Bucket(ManagedCursor cursor, BucketSnapshotStorage storage, long startLedgerId, long endLedgerId) { | ||
this(cursor, storage, startLedgerId, endLedgerId, new HashMap<>(), -1, -1, 0, 0, null, null); | ||
Bucket(PersistentDispatcherMultipleConsumers dispatcher, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why change to dispatch, just to get the name of dispatch and print log?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, I need print name of dispatcher
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we don't need to access other APIs of the dispatcher, we'd better only pass the topic name and subscription name to the Bucket.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I improve it.
# Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java # pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java # pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerTest.java
6db0a93
to
99e6ee9
Compare
4916f53
to
a184148
Compare
60e1eae
to
51de068
Compare
51de068
to
be9092b
Compare
PIP: #16763
Motivation & Modifications
executeWithRetry
method to auto retry operation.doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: