[improve][broker] PIP-192 Added SplitScheduler and DefaultNamespaceBundleSplitStrategyImpl #19622
@@ -847,7 +847,6 @@ protected void splitServiceUnitOnceAndRetry(NamespaceService namespaceService,
return null;
});
}
Useless change.
Updated.
var bundleHighTrafficIterator =
bundleHighTrafficFrequency.entrySet().iterator();
while (bundleHighTrafficIterator.hasNext()) {
String bundle = bundleHighTrafficIterator.next().getKey();
if (!bundleStatsMap.containsKey(bundle)) {
bundleHighTrafficIterator.remove();
}
}
Suggested change:
- var bundleHighTrafficIterator =
- bundleHighTrafficFrequency.entrySet().iterator();
- while (bundleHighTrafficIterator.hasNext()) {
- String bundle = bundleHighTrafficIterator.next().getKey();
- if (!bundleStatsMap.containsKey(bundle)) {
- bundleHighTrafficIterator.remove();
- }
- }
+ bundleHighTrafficFrequency.keySet().retainAll(bundleStatsMap.keySet());
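For readers following the suggestion, here is a minimal sketch of why the one-liner should behave like the iterator loop. The class name, method names, and value types below are hypothetical stand-ins, not the PR's actual fields; the sketch only assumes that both maps are keyed by bundle name and that the frequency map is mutable.

```java
import java.util.HashMap;
import java.util.Map;

public class RetainAllSketch {
    // Iterator-based pruning, mirroring the loop in the original diff.
    static void pruneWithIterator(Map<String, Integer> frequency, Map<String, Long> stats) {
        var it = frequency.entrySet().iterator();
        while (it.hasNext()) {
            String bundle = it.next().getKey();
            if (!stats.containsKey(bundle)) {
                it.remove();
            }
        }
    }

    // One-line form from the review suggestion: keySet() is a live view,
    // so removing keys from it removes the entries from the backing map.
    static void pruneWithRetainAll(Map<String, Integer> frequency, Map<String, Long> stats) {
        frequency.keySet().retainAll(stats.keySet());
    }

    public static void main(String[] args) {
        // Hypothetical sample data: bundle "b2" no longer has stats.
        Map<String, Long> stats = Map.of("b1", 10L, "b3", 30L);
        Map<String, Integer> first = new HashMap<>(Map.of("b1", 3, "b2", 1, "b3", 2));
        Map<String, Integer> second = new HashMap<>(first);

        pruneWithIterator(first, stats);
        pruneWithRetainAll(second, stats);

        // Both approaches should leave the same entries behind.
        System.out.println(first.equals(second));
    }
}
```

Both forms mutate the frequency map in place, so the same thread-safety considerations apply to either one.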
Updated.
for (SplitDecision decision : decisions) {
if (decision.getLabel() == Success) {
var split = decision.getSplit();
futures.add(serviceUnitStateChannel.publishSplitEventAsync(split)
Do we need to wait until the split is finished (i.e., until the Deleted message is received)?
Great point. I see that UnloadScheduler does not currently wait for unload completion.
However, I agree that we had better wait for completion (wait on channel.getOwnerAsync) so that completion is confirmed on the same dedicated thread.
The class name SplitScheduler would sound counterintuitive if the logic both schedules splits and waits for their completion. Perhaps a better name would be SplitManager.
Do we want to make this class name change and add the waiting logic in this PR, or in a separate PR (which would also refactor UnloadScheduler)?
How about we add a method like this?
https://github.com/apache/pulsar/pull/19538/files#diff-b9a27acaccf9d79fd9c1e344f940ade6565f3c1b9fa8000e62d0c116e311a113R134
Let's continue this discussion in your PR, as you have a proposal for this waiting logic in that PR.
I will add the waiting logic once this PR gets merged. #19538
...e/pulsar/broker/loadbalance/extensions/strategy/DefaultNamespaceBundleSplitStrategyImpl.java
}

if (counter.updatedAt() > counterLastUpdatedAt) {
splitMetrics.set(counter.toMetrics(pulsar.getAdvertisedAddress()));
Why don't we use brokerRegistry().getBrokerId()?
To be consistent with the other metrics code, we use pulsar.getAdvertisedAddress() here.
Ok, I see.
}
}

if (counter.updatedAt() > counterLastUpdatedAt) {
This should be updated in FutureUtil.waitForAll(futures).whenComplete.
Updated to perform syncWaiting after FutureUtil.waitForAll.
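To illustrate the ordering discussed here, the following is a rough sketch of running a follow-up action only after every outstanding future has completed. It uses the JDK's CompletableFuture.allOf as a stand-in for Pulsar's FutureUtil.waitForAll, and the class name, method name, and Runnable parameter are hypothetical, not taken from the PR.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class UpdateAfterAllSketch {
    // Runs `afterAll` once every future in the list has completed,
    // whether normally or exceptionally. CompletableFuture.allOf is used
    // here as a stand-in for FutureUtil.waitForAll (an assumption).
    static CompletableFuture<Void> runAfterAll(List<CompletableFuture<Void>> futures,
                                               Runnable afterAll) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .whenComplete((ignored, ex) -> afterAll.run());
    }

    public static void main(String[] args) {
        AtomicBoolean metricsPublished = new AtomicBoolean(false);
        CompletableFuture<Void> pendingSplit = new CompletableFuture<>();

        runAfterAll(List.of(pendingSplit), () -> metricsPublished.set(true));

        // The follow-up has not run yet because one future is still pending.
        System.out.println("before completion: " + metricsPublished.get());

        pendingSplit.complete(null);

        // After the last future completes, the follow-up has run.
        System.out.println("after completion: " + metricsPublished.get());
    }
}
```

Because whenComplete is used rather than thenRun, the follow-up runs even if one of the futures failed, and the returned future still carries that failure.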
}
FutureUtil.waitForAll(futures).exceptionally(ex -> {
log.error("Failed to wait for split events to persist.", ex);
counter.update(Failure, Unknown);
The counter is already updated when serviceUnitStateChannel.publishSplitEventAsync(split) fails, so the update here is unnecessary.
Updated.
73be2ba to c856344
}
});
return new InFlightSplitRequest(decision, future);
}).future);
We should update the counter here, because the eventPubFuture might fail.
Moved the pub failure handling logic to SplitManager.
.exceptionally(e -> {
log.error("Failed to publish the bundle split event for bundle:{}. Skipping wait.", bundle);
counter.update(Failure, Unknown);
return null;
Using exceptionally will cause the returned future to lose the exception info. We should use whenComplete here.
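A small sketch of the difference, using only JDK CompletableFuture calls: a handler attached with exceptionally recovers the failure, so the returned future completes normally, while whenComplete observes the failure and lets it propagate. The class name, method names, and the AtomicInteger counter are hypothetical stand-ins for the PR's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class ExceptionPropagationSketch {
    // Hypothetical stand-in for the failure counter in the PR.
    static final AtomicInteger failures = new AtomicInteger();

    // exceptionally(...) recovers from the failure: the returned future
    // completes normally with null, so callers no longer see the exception.
    static CompletableFuture<Void> withExceptionally(CompletableFuture<Void> publish) {
        return publish.exceptionally(e -> {
            failures.incrementAndGet();
            return null;
        });
    }

    // whenComplete(...) only observes the outcome: the returned future
    // still completes exceptionally, so the failure stays visible downstream.
    static CompletableFuture<Void> withWhenComplete(CompletableFuture<Void> publish) {
        return publish.whenComplete((v, e) -> {
            if (e != null) {
                failures.incrementAndGet();
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> swallowed = withExceptionally(
                CompletableFuture.failedFuture(new IllegalStateException("publish failed")));
        CompletableFuture<Void> propagated = withWhenComplete(
                CompletableFuture.failedFuture(new IllegalStateException("publish failed")));

        System.out.println(swallowed.isCompletedExceptionally());  // false
        System.out.println(propagated.isCompletedExceptionally()); // true
    }
}
```

In both variants the counter is incremented exactly once per failed future; the only difference is whether code chained after the returned future can still tell that the publish failed.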
Updated.
var future = inFlightSplitRequest.future;
if (!future.isDone()) {
if (ex != null) {
counter.update(Failure, Unknown);
If we update the counter here, it will be updated twice when the future completes exceptionally.
I suggest moving the counter update to waitAsync.
Can you clarify how we are updating this failure counter twice?
Oh, I see. I think we need to clean up this part.
Nice catch. Updated.
log.warn("Timed out while waiting for the bundle split event: {}", bundle, ex);
}
});
return new InFlightSplitRequest(decision, future);
Now we don't need the InFlightSplitRequest object, right? We already pass the decision in whenComplete.
Agreed. Updated.
e71ea9f to 1ac2868
LGTM
/pulsarbot run-failure-checks
Master Issue: #16691
Motivation
We will start raising PRs to implement PIP-192, #16691
Modifications
This PR implemented
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
We will have separate PRs to update the Doc later.
Matching PR in forked repository
PR in forked repository: heesung-sn#30