KAFKA-12788: improve KRaft replica placement #10494
Conversation
Implement the existing Kafka replica placement algorithm for KRaft. This also means implementing rack awareness. Previously, we just chose replicas randomly in a non-rack-aware fashion. Also, allow replicas to be placed on fenced brokers if there are no other choices. This was specified in KIP-631 but previously not implemented.
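As a rough orientation for the rest of the review, here is a minimal, hypothetical Java sketch of the placement idea described above: stripe replicas across racks and fall back to fenced brokers only when there is no other choice. All names here are illustrative; the actual StripedReplicaPlacer in this PR additionally randomizes the starting rack and per-list offsets and reshuffles the lists periodically, as discussed in the comments below.

import java.util.ArrayList;
import java.util.List;

public class RackAwarePlacementSketch {

    static class Broker {
        final int id;
        final boolean fenced;
        Broker(int id, boolean fenced) {
            this.id = id;
            this.fenced = fenced;
        }
    }

    // Place one partition's replicas by walking the racks round-robin,
    // preferring unfenced brokers and using fenced brokers only if the
    // requested replication factor cannot otherwise be reached.
    static List<Integer> place(List<List<Broker>> racks, int startRack, int replicationFactor) {
        List<Integer> unfenced = new ArrayList<>();
        List<Integer> fenced = new ArrayList<>();
        int totalBrokers = 0;
        for (List<Broker> rack : racks) {
            totalBrokers += rack.size();
        }
        int rack = startRack;
        int round = 0;
        // Take at most one broker per rack per round, so replicas are striped
        // across racks rather than packed into a single rack.
        while (unfenced.size() + fenced.size() < totalBrokers
                && unfenced.size() < replicationFactor) {
            List<Broker> brokersInRack = racks.get(rack);
            if (round < brokersInRack.size()) {
                Broker broker = brokersInRack.get(round);
                (broker.fenced ? fenced : unfenced).add(broker.id);
            }
            rack = (rack + 1) % racks.size();
            if (rack == startRack) {
                round++;
            }
        }
        // Fenced brokers are appended only when there are no other choices.
        List<Integer> placement = new ArrayList<>(unfenced);
        for (int id : fenced) {
            if (placement.size() >= replicationFactor) {
                break;
            }
            placement.add(id);
        }
        return placement;
    }
}

With three racks of one broker each, where broker 1 is fenced, this sketch returns the fenced broker last, which matches the behavior discussed in the comments below.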
@cmccabe : Thanks for the PR. A few comments below. Also, we probably want to associate a jira with the PR since it's a bit large.
* Randomly shuffle the brokers in this list.
*/
void shuffle(Random random) {
    Collections.shuffle(brokers, random);
Should we reset the offset here too?
Hmm, I don't think resetting the position is necessary... when RackList#place shuffles, it also sets the epoch to 0, so the offset will be reset anyway.
// Iteration state: epoch tracks when the list was last shuffled, index is the
// next position to return, and offset is randomized so lower broker ids are
// not favored (see the discussion of BrokerList#offset below).
private final List<Integer> brokers = new ArrayList<>(0);
private int epoch = 0;
private int index = 0;
private int offset = 0;
Could we add a brief comment explaining epoch, index, and offset?
assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames());
assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
assertEquals(Arrays.asList(2, 3, 1), rackList.place(3));
assertEquals(Arrays.asList(3, 2, 1), rackList.place(3));
Hmm, we should have shuffled the broker list here. Why is the assignment pattern repeating?
Because broker 1 is fenced, we don't place a replica there until we have to (when no other brokers remain). So it will always be the last, least preferred replica if we have 3 brokers and need a partition with replication factor 3.
    Optional.of("2"),
    Optional.of("3"),
    Optional.of("4")), rackList.rackNames());
assertEquals(Arrays.asList(41, 11, 21, 30), rackList.place(4));
Why didn't the leader start from rack 1, which is the first in the rack list? Also, why didn't rack 2 start with 20, which sorts first during initialization?
Why didn't the leader start from rack 1, which is the first in the rack list?

The starting rack is randomized. If the first partition always placed its leader on a specific rack, that would create skew, since a lot of topics are created with only one or two partitions.

Also, why didn't rack 2 start with 20, which sorts first during initialization?

BrokerList#offset is also randomized, for the same reason (to avoid favoring brokers with a lower id).
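To make the randomization concrete, here is a small, self-contained illustration (hypothetical names, not the actual RackList/BrokerList code): both the starting rack and the offset into a rack's broker list are chosen randomly, so small topics don't pile their leaders onto one rack, and broker 20 isn't preferred just because it sorts first.

import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class RandomizedStartSketch {
    public static void main(String[] args) {
        Random random = new Random();
        List<String> racks = Arrays.asList("1", "2", "3");
        List<Integer> brokersInRack2 = Arrays.asList(20, 21, 22);

        // Randomized starting rack: the first replica of the first partition
        // can land on any rack with equal probability.
        int startRackIndex = random.nextInt(racks.size());

        // Randomized offset within the rack: broker 20 is not always chosen
        // first just because it has the lowest id.
        int offset = random.nextInt(brokersInRack2.size());
        int firstBrokerFromRack2 = brokersInRack2.get(offset);

        System.out.println("start rack = " + racks.get(startRackIndex)
                + ", first broker from rack 2 = " + firstBrokerFromRack2);
    }
}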
}

@Test
public void testSuccessfulPlacement() {
Could we add a test that verifies not only that the first replicas are distributed evenly, but also that, for partitions with the same first replica, their second replicas are distributed evenly?
I will add a distribution test.
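A sketch of what such a distribution test could look like (the helper and the numbers are hypothetical, and the stand-in placer below is a plain round-robin rather than the placer from this PR): place many partitions, then count how often each broker appears as the first replica, and how the second replica is distributed among partitions that share the same first replica.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DistributionTestSketch {

    // Stand-in placer for the sketch: a plain round-robin over numBrokers,
    // NOT the algorithm in this PR; it only exists so the counting below runs.
    static List<List<Integer>> placeManyPartitions(int numPartitions, int replicationFactor, int numBrokers) {
        List<List<Integer>> result = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add((p + r) % numBrokers);
            }
            result.add(replicas);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> replicas = placeManyPartitions(10_000, 3, 5);

        Map<Integer, Integer> firstReplicaCounts = new HashMap<>();
        Map<Integer, Integer> secondReplicaCountsGivenFirstIsZero = new HashMap<>();
        for (List<Integer> partitionReplicas : replicas) {
            int first = partitionReplicas.get(0);
            firstReplicaCounts.merge(first, 1, Integer::sum);
            if (first == 0) {
                // How the second replica is spread among partitions led by broker 0.
                secondReplicaCountsGivenFirstIsZero.merge(partitionReplicas.get(1), 1, Integer::sum);
            }
        }
        // A real test would assert these counts are roughly equal, e.g. within
        // some tolerance of numPartitions / numBrokers, rather than print them.
        System.out.println("first replica counts: " + firstReplicaCounts);
        System.out.println("second replica counts when first is 0: " + secondReplicaCountsGivenFirstIsZero);
    }
}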
* each partition would try to get a replica there. In general racks are supposed to be
* about the same size -- if they aren't, this is a user error.
*
* Thirdly, we would prefer to place replicas on unfenced brokers, rather than on fenced
It would be useful to document another goal: for partitions with the same first replica, we want to distribute their second replicas evenly. This way, if the first broker fails, the new leaders will be evenly distributed among the surviving brokers. The algorithm achieves that by forcing a shuffle when the partition index is a multiple of the number of brokers.
I added some text saying that we want new leaders to be evenly distributed if any one broker is fenced.
// If we have returned as many assignments as there are unfenced brokers in
// the cluster, shuffle the rack list and broker lists to try to avoid
// repeating the same assignments again.
if (epoch == numUnfencedBrokers) {
Should this be (epoch % numUnfencedBrokers) == 0?
The epoch gets reset to 0 once it reaches numUnfencedBrokers. This avoids doing the modulus check, which is expensive, as I understand it.
Fair. I created KAFKA-12788 for this PR.
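A small, self-contained illustration of the reset-instead-of-modulus point made above (hypothetical names, not the actual RackList code): the counter is simply set back to 0 when it reaches the list size, so no modulus needs to be computed on every call.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class EpochResetSketch {
    private final List<Integer> brokers = new ArrayList<>(Arrays.asList(0, 1, 2));
    private final Random random = new Random();
    private int epoch = 0;

    List<Integer> place() {
        // Reset instead of modulus: once epoch reaches the number of brokers,
        // shuffle and set it back to 0, so the next round of assignments does
        // not repeat the previous one.
        if (epoch == brokers.size()) {
            Collections.shuffle(brokers, random);
            epoch = 0;
        }
        epoch++;
        return new ArrayList<>(brokers);
    }

    public static void main(String[] args) {
        EpochResetSketch sketch = new EpochResetSketch();
        for (int i = 0; i < 7; i++) {
            System.out.println(sketch.place());
        }
    }
}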
for (List<Integer> partitionReplicas : replicas) {
    counts.put(partitionReplicas, counts.getOrDefault(partitionReplicas, 0) + 1);
}
assertEquals(14, counts.get(Arrays.asList(0, 1)));
For even distribution, it would be useful to verify 2 things. (1) The leaders are distributed evenly when all brokers are unfenced. (2) If any broker is fenced, the new leaders are still distributed evenly.
Hmm. Currently, we don't place partitions on fenced brokers unless there are no other options. We also never make a fenced broker the leader. So condition #2 does not hold in general.
The thinking is that when a broker is fenced, it may potentially stay offline for a long time. So we don't really want to place anything there unless there is absolutely no other choice.
@cmccabe : Thanks for the updated PR. Just one more comment.
@cmccabe : Thanks for the explanation. The PR LGTM.
#10494 introduced a bug in the KRaft controller where the controller loops forever in StripedReplicaPlacer trying to identify the racks on which to place partition replicas when there is a single unfenced broker in the cluster and the number of requested partitions in a CREATE_TOPICS request is greater than 1. This patch refactors out some argument sanity checks and invokes those checks in both RackList and StripedReplicaPlacer, and it adds tests for this as well as the single-broker placement issue.
Reviewers: Jun Rao <[email protected]>
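The exact checks added by that follow-up patch are not shown here; as a hedged sketch of the shape such shared sanity checks might take, validating the request arguments up front in both RackList and StripedReplicaPlacer lets an unsatisfiable CREATE_TOPICS request fail fast instead of looping. The class name, method name, and messages below are hypothetical; InvalidReplicationFactorException is an existing Kafka exception type that checks like these could plausibly use.

import org.apache.kafka.common.errors.InvalidReplicationFactorException;

public final class PlacementChecksSketch {

    // Shared sanity check invoked before entering the placement loop, so bad
    // or unsatisfiable requests are rejected up front rather than retried.
    static void checkArguments(int replicationFactor, int numUnfencedBrokers) {
        if (replicationFactor <= 0) {
            throw new InvalidReplicationFactorException(
                "The replication factor must be positive, but it was " + replicationFactor + ".");
        }
        if (numUnfencedBrokers == 0) {
            throw new InvalidReplicationFactorException(
                "All brokers are currently fenced; unable to place replicas.");
        }
    }
}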