-
Notifications
You must be signed in to change notification settings - Fork 596
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a new endpoint to support dynamically increasing topic's replication factor. #710
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the patch! -- added some comments.
cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java
Outdated
Show resolved
Hide resolved
cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java
Outdated
Show resolved
Hide resolved
.../com/linkedin/kafka/cruisecontrol/servlet/parameters/UpdateTopicConfigurationParameters.java
Outdated
Show resolved
Hide resolved
@@ -411,6 +418,12 @@ private void demoteBroker(HttpServletRequest request, HttpServletResponse respon | |||
request, response); | |||
} | |||
|
|||
private void updateTopicConfiguration(HttpServletRequest request, HttpServletResponse response, Supplier<UpdateTopicConfigurationParameters> paramSupplier) | |||
throws IOException, ExecutionException, InterruptedException { | |||
asyncRequest(paramSupplier, parameters -> (uuid -> _asyncKafkaCruiseControl.updateTopicConfiguration(parameters, uuid)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you clarify why we choose to make this endpoint async?
(Currently we only make the endpoints that need generating a cluster model async)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are two reasons
- Executor needs a UUID when we submit a task to it, using current syncRequest, we cannot get UUID
- in the follow-up patch, we plan to generate cluster mode and consider goals to generate proposal.
...l/src/main/java/com/linkedin/kafka/cruisecontrol/async/UpdateTopicConfigurationRunnable.java
Outdated
Show resolved
Hide resolved
List<String> racks = new ArrayList<>(brokersByRack.keySet()); | ||
int[] cursors = new int[racks.size()]; | ||
int rackCursor = 0; | ||
for (PartitionInfo partitionInfo : cluster.partitionsForTopic(topic)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: For ease of readability and avoid repetition (in if
and else parts
), is it possible to move this logic to separate smaller functions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel it is hard to extract the common logic since the only common logic is to check no offline replica and initialize local variable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to move the relevant content of the loop to a smaller function -- e.g. something like (potentially with more/less parameters):
private Set<ExecutionProposal> updateReplicationFactorFor(PartitionInfo partitionInfo, short replicationFactor) {...}
This helps parsing the logic easier and makes it more modular.
...n/java/com/linkedin/kafka/cruisecontrol/servlet/response/UpdateTopicConfigurationResult.java
Outdated
Show resolved
Hide resolved
...n/java/com/linkedin/kafka/cruisecontrol/servlet/response/UpdateTopicConfigurationResult.java
Outdated
Show resolved
Hide resolved
...n/java/com/linkedin/kafka/cruisecontrol/servlet/response/UpdateTopicConfigurationResult.java
Outdated
Show resolved
Hide resolved
...n/java/com/linkedin/kafka/cruisecontrol/servlet/response/UpdateTopicConfigurationResult.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a couple comments -- otherwise LGTM!
cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java
Show resolved
Hide resolved
List<String> racks = new ArrayList<>(brokersByRack.keySet()); | ||
int[] cursors = new int[racks.size()]; | ||
int rackCursor = 0; | ||
for (PartitionInfo partitionInfo : cluster.partitionsForTopic(topic)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to move the relevant content of the loop to a smaller function -- e.g. something like (potentially with more/less parameters):
private Set<ExecutionProposal> updateReplicationFactorFor(PartitionInfo partitionInfo, short replicationFactor) {...}
This helps parsing the logic easier and makes it more modular.
cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java
Show resolved
Hide resolved
* There are two scenarios that rack awareness property is not guaranteed. | ||
* <ul> | ||
* <li> If metadata does not have rack information about brokers, then it is only guaranteed that new replicas are | ||
* added to brokers which currently do not host any replicas of partition.</li> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is definitely not critical -- just fyi, but feel free to skip this: https://www.grammarly.com/blog/comma-before-which/
patch for feature request 590.