-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][broker] Ensure previous delayed index be removed from snapshotSegmentLastIndexTable & Make load operate asynchronous #20086
Conversation
4a75481
to
0bf5554
Compare
0bf5554
to
56f4902
Compare
7ed8dba
to
84691ea
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #20086 +/- ##
=============================================
+ Coverage 35.58% 72.91% +37.33%
- Complexity 12423 31908 +19485
=============================================
Files 1691 1868 +177
Lines 128771 138317 +9546
Branches 14044 15214 +1170
=============================================
+ Hits 45822 100860 +55038
+ Misses 76931 29449 -47482
- Partials 6018 8008 +1990
Flags with carried forward coverage won't be shown. Click here to find out more.
|
bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> { | ||
long loadStartTime = System.currentTimeMillis(); | ||
stats.recordTriggerEvent(BucketDelayedMessageIndexStats.Type.load); | ||
CompletableFuture<Void> loadFuture = pendingLoad = bucket.asyncLoadNextBucketSnapshotEntry() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this line split into multi line ? var a = b = c seems not usual : - )
@@ -527,17 +528,22 @@ protected long nextDeliveryTime() { | |||
} | |||
|
|||
@Override | |||
public synchronized long getNumberOfDelayedMessages() { | |||
public long getNumberOfDelayedMessages() { | |||
return numberDelayedMessages; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need an volatile
on field numberDelayedMessage
if we access without lock.
return this.lastMutableBucket.getBufferMemoryUsage() + sharedBucketPriorityQueue.bytesCapacity(); | ||
} | ||
|
||
@Override | ||
public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) { | ||
if (!checkPendingOpDone()) { | ||
log.info("[{}] Skip getScheduledMessages to wait for bucket snapshot load finish.", dispatcher.getName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this method is on the hot path. we need change this log level to debug to avoid print logs frequently.
return this.lastMutableBucket.getBufferMemoryUsage() + sharedBucketPriorityQueue.bytesCapacity(); | ||
} | ||
|
||
@Override | ||
public synchronized NavigableSet<PositionImpl> getScheduledMessages(int maxMessages) { | ||
if (!checkPendingOpDone()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does the loading task only triggered we there is not any scheduled message left ? it seems dispatcher have to wait for the loading task finished before any scheduled message on time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the delivery of the message will be delayed. I think we can optimize this part in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree. hope to see PIP-195 in 3.0.0 : - )
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM ~
This has been cherry-picked to branch-3.0 despite the code freeze. |
This is a severe issue for PIP-195 so it needs into 3.0, thanks~ |
PIP: #16763
Motivation
If the future of load segment timeout but, In fact load segment is successful then we need to ensure the previous delayed index is removed from snapshotSegmentLastIndexTable, otherwise, tracker will load the next segment even though we don't need it yet.
Modifications
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: