Skip to content

Commit

Permalink
KAFKA-10661; Add new resigned state for graceful shutdown/initializat…
Browse files Browse the repository at this point in the history
…ion (#9531)

When initializing the raft state machine after shutting down as a leader, we were previously entering the "unattached" state, which means we have no leader and no voted candidate. This was a bug because it allowed a reinitialized leader to cast a vote for a candidate in the same epoch that it was already the leader of. This patch fixes the problem by introducing a new "resigned" state which allows us to retain the leader state so that we cannot change our vote and we will not accept additional appends.

This patch also revamps the shutdown logic to make use of the new "resigned" state. Previously we had a separate path in `KafkaRaftClient.poll` for the shutdown logic which resulted in some duplication. Instead now we incorporate shutdown behavior into each state's respective logic.

Finally, this patch changes the shutdown logic so that `EndQuorumEpoch` is only sent by resigning leaders. Previously we allowed this request to be sent by candidates as well.

Reviewers: dengziming <[email protected]>, Guozhang Wang <[email protected]>
  • Loading branch information
hachikuji authored Nov 9, 2020
1 parent d61dc0c commit f49c6c2
Show file tree
Hide file tree
Showing 11 changed files with 687 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,14 @@ public EndQuorumEpochResponse getErrorResponse(int throttleTimeMs, Throwable e)
}

public static EndQuorumEpochRequestData singletonRequest(TopicPartition topicPartition,
int replicaId,
int leaderEpoch,
int leaderId,
List<Integer> preferredSuccessors) {
return singletonRequest(topicPartition, null, replicaId, leaderEpoch, leaderId, preferredSuccessors);
return singletonRequest(topicPartition, null, leaderEpoch, leaderId, preferredSuccessors);
}

public static EndQuorumEpochRequestData singletonRequest(TopicPartition topicPartition,
String clusterId,
int replicaId,
int leaderEpoch,
int leaderId,
List<Integer> preferredSuccessors) {
Expand All @@ -91,7 +89,6 @@ public static EndQuorumEpochRequestData singletonRequest(TopicPartition topicPar
.setPartitions(Collections.singletonList(
new EndQuorumEpochRequestData.PartitionData()
.setPartitionIndex(topicPartition.partition())
.setReplicaId(replicaId)
.setLeaderEpoch(leaderEpoch)
.setLeaderId(leaderId)
.setPreferredSuccessors(preferredSuccessors))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@
"versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ReplicaId", "type": "int32", "versions": "0+",
"about": "The ID of the replica sending this request"},
{ "name": "LeaderId", "type": "int32", "versions": "0+",
"about": "The current leader ID or -1 if there is a vote in progress"},
"about": "The current leader ID that is resigning"},
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
"about": "The current epoch"},
{ "name": "PreferredSuccessors", "type": "[]int32", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,8 @@ class KafkaNetworkChannelTest {
BeginQuorumEpochRequest.singletonRequest(topicPartition, clusterId, leaderEpoch, leaderId)

case ApiKeys.END_QUORUM_EPOCH =>
val replicaId = 1
EndQuorumEpochRequest.singletonRequest(topicPartition, clusterId, replicaId,
leaderId, leaderEpoch, Collections.singletonList(2))
EndQuorumEpochRequest.singletonRequest(topicPartition, clusterId, leaderId,
leaderEpoch, Collections.singletonList(2))

case ApiKeys.VOTE =>
val lastEpoch = 4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ class RequestQuotaTest extends BaseRequestTest {

case ApiKeys.END_QUORUM_EPOCH =>
new EndQuorumEpochRequest.Builder(EndQuorumEpochRequest.singletonRequest(
tp, 10, 2, 5, Collections.singletonList(3)))
tp, 10, 5, Collections.singletonList(3)))

case ApiKeys.ALTER_ISR =>
new AlterIsrRequest.Builder(new AlterIsrRequestData())
Expand Down
Loading

0 comments on commit f49c6c2

Please sign in to comment.