-
Notifications
You must be signed in to change notification settings - Fork 14.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16533; Update voter handling #16735
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only a few minor comments
@@ -543,6 +555,15 @@ public void initialize( | |||
quorumConfig.requestTimeoutMs(), | |||
logContext | |||
); | |||
|
|||
// Specialized remove voter handler |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: specialized update voter handler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
); | ||
} | ||
|
||
Optional<Errors> leaderValidation = validateLeaderOnlyRequest(data.currentLeaderEpoch()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I know this is the convention handleFetchSnapshotRequest uses too, but could we rename to leaderValidationError
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you referring to the variable name? Replace "leaderValidation" with "leaderValidationError"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
if (!voterEndpoints.address(channel.listenerName()).isPresent()) { | ||
return completedFuture( | ||
RaftUtil.updateVoterResponse( | ||
Errors.INVALID_REQUEST, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wonder if we might benefit from the update voter response schema including an error message field.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought about this before. The ErrorMessage field is a human readable message meant to be displayed to the end user. This is why the admin RPCs (AddVoter and RemoveVoter) have the additional ErrorMessage field. UpdateVoter is an internal RPC. Internal RPCs don't tend to have a human readable error message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about some logging in the error cases? although the RPC is internal, an invalid update could stem from operator action (and it could be useful for them to dig into logs to understand why an update failed)
); | ||
} | ||
|
||
Endpoints voterEndpoints = Endpoints.fromUpdateVoterRequest(data.listeners()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do want to catch any illegalArgumentExceptions that might be thrown from this method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm. Catching IllegalArgumentExceptions is not enough. Let me file an issue to audit all of the validation of the new RPCs: https://issues.apache.org/jira/browse/KAFKA-17264
partitionState, | ||
channel.listenerName(), | ||
time, | ||
quorumConfig.requestTimeoutMs() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this need to be passed in vs imported in UpdateVoterHandler?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I understand the question. In general inputs to a type or method should be limited to what they require. This avoid passing a configuration object with many methods where only one is used or required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry, you can ignore this comment. I had mistakenly thought it would be possible to import the underlying config in UpdateVoterHandler
if (leaderState.isOperationPending(currentTimeMs)) { | ||
return CompletableFuture.completedFuture( | ||
RaftUtil.updateVoterResponse( | ||
Errors.REQUEST_TIMED_OUT, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
another place where it would be nice if there was an error msg (or at least logging)
@@ -95,6 +95,21 @@ public Optional<Node> voterNode(int voterId, ListenerName listenerName) { | |||
.map(address -> new Node(voterId, address.getHostString(), address.getPort())); | |||
} | |||
|
|||
/** | |||
* Return true the provided voter node is a voter and would cause a change in the voter set. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Return true if the provided voter node ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -23,6 +23,16 @@ | |||
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", | |||
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, | |||
{ "name": "ErrorCode", "type": "int16", "versions": "0+", | |||
"about": "The error code, or 0 if there was no error" } | |||
"about": "The error code, or 0 if there was no error" }, | |||
{ "name": "CurrentLeader", "type": "CurrentLeader", "versions": "0+", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like RequestResponseTest.createUpdateRaftVoterResponse
has not been updated to set these new fields.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
LGTM. Committed. |
Add support for handling the update voter RPC. The update voter RPC is used to automatically update the voters supported kraft versions and available endpoints as the operator upgrades and reconfigures the KRaft controllers.
The add voter RPC is handled as follow:
Lastly, this change also implements the ability for the leader to update its own entry in the voter set when it becomes leader for an epoch. This is done by updating the voter set and writing a control batch as the first batch in a new leader epoch.
Committer Checklist (excluded from commit message)