-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][broker] PIP-192: Implement extensible load manager #19102
[improve][broker] PIP-192: Implement extensible load manager #19102
Conversation
5f44f74
to
acc4788
Compare
acc4788
to
d48b52a
Compare
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadManager.java
Outdated
Show resolved
Hide resolved
...broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
Outdated
Show resolved
Hide resolved
...in/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
Outdated
Show resolved
Hide resolved
public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) { | ||
long startTime = System.nanoTime(); | ||
|
||
CompletableFuture<Optional<LookupResult>> future = getBundleAsync(topic) | ||
.thenCompose(bundle -> findBrokerServiceUrl(bundle, options)); | ||
.thenCompose(bundle -> { | ||
if (isExtensibleLoadManager()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was working on these NamespaceService change as per pluggable new broker load balancer
work.
Do you want to keep this change here in this PR?
I think we need to add NamespaceService unit tests to track this variation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add the units test to cover enable the extensible load manager case.
...src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
Outdated
Show resolved
Hide resolved
...src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
Show resolved
Hide resolved
e9a9959
to
3d54c15
Compare
@@ -55,6 +55,7 @@ | |||
import org.apache.pulsar.broker.loadbalance.LeaderElectionService; | |||
import org.apache.pulsar.broker.loadbalance.LoadManager; | |||
import org.apache.pulsar.broker.loadbalance.ResourceUnit; | |||
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be great to add unit tests in NamespaceServiceTest for these if-else variations. (assert if the new/old load manager is called for these Namespace public funcs).
Or, please add a TODO comment if we want to add such tests later.
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) {
// new logic
} else {
// old logic
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I'll add it later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After thinking about it, I guess adding tests later might be better since some methods of NamespaceService
still need to add new logic when using extensible load manager.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: It would be great if we can have a separate PR to integrate ExtensibleLoadManagerImpl into the existing code(to the NamespaceService and PulsarService). This PR could be ExtensibleLoadManagerImpl (and other dependent changes) only.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to split it into two PRs, but ExtensibleLoadManagerImpl
test dependency topic lookup because we are using TableView
in ServiceUnitStateChannel
, so I have to implement the load manager function.
...src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
Outdated
Show resolved
Hide resolved
...src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
Outdated
Show resolved
Hide resolved
...src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
Show resolved
Hide resolved
...in/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java
Outdated
Show resolved
Hide resolved
ee5785f
to
cf0ad90
Compare
...src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
Show resolved
Hide resolved
...src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
Outdated
Show resolved
Hide resolved
@@ -55,6 +55,7 @@ | |||
import org.apache.pulsar.broker.loadbalance.LeaderElectionService; | |||
import org.apache.pulsar.broker.loadbalance.LoadManager; | |||
import org.apache.pulsar.broker.loadbalance.ResourceUnit; | |||
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: It would be great if we can have a separate PR to integrate ExtensibleLoadManagerImpl into the existing code(to the NamespaceService and PulsarService). This PR could be ExtensibleLoadManagerImpl (and other dependent changes) only.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
6df4f47
to
3765839
Compare
@@ -0,0 +1,222 @@ | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you remove the BrokerRegistryImpl
related code in this PR since I see they were added in #18810?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work! left some trivial comments.
@@ -0,0 +1,222 @@ | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, we'd better merge the PR #18810 first.
...src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
Show resolved
Hide resolved
...src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
Outdated
Show resolved
Hide resolved
…nager # Conflicts: # pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistry.java # pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java # pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryTest.java
...src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
Show resolved
Hide resolved
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #19102 +/- ##
============================================
- Coverage 63.24% 62.49% -0.76%
+ Complexity 25936 25660 -276
============================================
Files 1823 1829 +6
Lines 133338 133915 +577
Branches 14677 14732 +55
============================================
- Hits 84330 83690 -640
- Misses 41290 42565 +1275
+ Partials 7718 7660 -58
Flags with carried forward coverage won't be shown. Click here to find out more.
|
This PR introduced a new flaky test #20007 . @Demogorgon314 Do you have a chance to fix that? |
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(config)) { | ||
return loadManager.get().findBrokerServiceUrl(Optional.of(topic), bundle); | ||
} else { | ||
// TODO: Add unit tests cover it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My personal habit is to never add those TODOs - they never get done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your advice. We should refactor the NamespaceService instead of adding so many if else here. We can remove this to-do when we're doing refactoring.
PIP: #16691
Motivation
Implement extensible load manager.
Modifications
For the PIP-192, this PR adds
ExtensibleLoadManagerImpl
and unit tests.This PR also changes:
CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl( Optional<ServiceUnitId> topic, ServiceUnitId bundle)
toLoadManager
interface.CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> topic, ServiceUnitId bundle)
toLoadManager
interface.CompletableFuture<String> getOwnerAsync(String serviceUnit)
toCompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit)
to unify the result.Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: Demogorgon314#10