Skip to content

Commit

Permalink
KAFKA-12897: KRaft multi-partition placement on single broker (#10823)
Browse files Browse the repository at this point in the history
#10494 introduced a bug in the KRaft controller where the controller will loop forever in StripedReplicaPlacer trying to identify the racks on which to place partition replicas if there is a single unfenced broker in the cluster and the number of requested partitions in a CREATE_TOPICS request is greater than 1.

This patch refactors out some argument sanity checks and invokes those checks in both RackList and StripedReplicaPlacer, and it adds tests for this as well as the single broker placement issue.

Reviewers: Jun Rao <[email protected]>
  • Loading branch information
rondagostino authored Jun 7, 2021
1 parent 51665b9 commit 43db8ac
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -340,14 +340,14 @@ List<Optional<String>> rackNames() {
}

List<Integer> place(int replicationFactor) {
if (replicationFactor <= 0) {
throw new InvalidReplicationFactorException("Invalid replication factor " +
replicationFactor + ": the replication factor must be positive.");
}
throwInvalidReplicationFactorIfNonPositive(replicationFactor);
throwInvalidReplicationFactorIfTooFewBrokers(replicationFactor, numTotalBrokers());
throwInvalidReplicationFactorIfZero(numUnfencedBrokers());
// If we have returned as many assignments as there are unfenced brokers in
// the cluster, shuffle the rack list and broker lists to try to avoid
// repeating the same assignments again.
if (epoch == numUnfencedBrokers) {
// But don't reset the iteration epoch for a single unfenced broker -- otherwise we would loop forever
if (epoch == numUnfencedBrokers && numUnfencedBrokers > 1) {
shuffle();
epoch = 0;
}
Expand Down Expand Up @@ -400,6 +400,27 @@ void shuffle() {
}
}

private static void throwInvalidReplicationFactorIfNonPositive(int replicationFactor) {
if (replicationFactor <= 0) {
throw new InvalidReplicationFactorException("Invalid replication factor " +
replicationFactor + ": the replication factor must be positive.");
}
}

private static void throwInvalidReplicationFactorIfZero(int numUnfenced) {
if (numUnfenced == 0) {
throw new InvalidReplicationFactorException("All brokers are currently fenced.");
}
}

private static void throwInvalidReplicationFactorIfTooFewBrokers(int replicationFactor, int numTotalBrokers) {
if (replicationFactor > numTotalBrokers) {
throw new InvalidReplicationFactorException("The target replication factor " +
"of " + replicationFactor + " cannot be reached because only " +
numTotalBrokers + " broker(s) are registered.");
}
}

private final Random random;

public StripedReplicaPlacer(Random random) {
Expand All @@ -412,14 +433,9 @@ public List<List<Integer>> place(int startPartition,
short replicationFactor,
Iterator<UsableBroker> iterator) {
RackList rackList = new RackList(random, iterator);
if (rackList.numUnfencedBrokers() == 0) {
throw new InvalidReplicationFactorException("All brokers are currently fenced.");
}
if (replicationFactor > rackList.numTotalBrokers()) {
throw new InvalidReplicationFactorException("The target replication factor " +
"of " + replicationFactor + " cannot be reached because only " +
rackList.numTotalBrokers() + " broker(s) are registered.");
}
throwInvalidReplicationFactorIfNonPositive(replicationFactor);
throwInvalidReplicationFactorIfZero(rackList.numUnfencedBrokers());
throwInvalidReplicationFactorIfTooFewBrokers(replicationFactor, rackList.numTotalBrokers());
List<List<Integer>> placements = new ArrayList<>(numPartitions);
for (int partition = 0; partition < numPartitions; partition++) {
placements.add(rackList.place(replicationFactor));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,22 @@ public void testAvoidFencedReplicaIfPossibleOnSingleRack() {
assertEquals(Arrays.asList(0, 4, 3, 2), rackList.place(4));
}

/**
* Test that we perform striped replica placement as expected for a multi-partition topic
* on a single unfenced broker
*/
@Test
public void testMultiPartitionTopicPlacementOnSingleUnfencedBroker() {
MockRandom random = new MockRandom();
StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
assertEquals(Arrays.asList(Arrays.asList(0),
Arrays.asList(0),
Arrays.asList(0)),
placer.place(0, 3, (short) 1, Arrays.asList(
new UsableBroker(0, Optional.empty(), false),
new UsableBroker(1, Optional.empty(), true)).iterator()));
}

/**
* Test that we will place on the fenced replica if we need to.
*/
Expand Down Expand Up @@ -167,6 +183,17 @@ public void testNotEnoughBrokers() {
new UsableBroker(10, Optional.of("1"), false)).iterator())).getMessage());
}

@Test
public void testNonPositiveReplicationFactor() {
MockRandom random = new MockRandom();
StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
assertEquals("Invalid replication factor 0: the replication factor must be positive.",
assertThrows(InvalidReplicationFactorException.class,
() -> placer.place(0, 1, (short) 0, Arrays.asList(
new UsableBroker(11, Optional.of("1"), false),
new UsableBroker(10, Optional.of("1"), false)).iterator())).getMessage());
}

@Test
public void testSuccessfulPlacement() {
MockRandom random = new MockRandom();
Expand Down Expand Up @@ -210,4 +237,42 @@ public void testEvenDistribution() {
assertEquals(11, counts.get(Arrays.asList(3, 2)));
}

@Test
public void testRackListAllBrokersFenced() {
// ensure we can place N replicas on a rack when the rack has less than N brokers
MockRandom random = new MockRandom();
RackList rackList = new RackList(random, Arrays.asList(
new UsableBroker(0, Optional.empty(), true),
new UsableBroker(1, Optional.empty(), true),
new UsableBroker(2, Optional.empty(), true)).iterator());
assertEquals(3, rackList.numTotalBrokers());
assertEquals(0, rackList.numUnfencedBrokers());
assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames());
assertEquals("All brokers are currently fenced.",
assertThrows(InvalidReplicationFactorException.class,
() -> rackList.place(3)).getMessage());
}

@Test
public void testRackListNotEnoughBrokers() {
MockRandom random = new MockRandom();
RackList rackList = new RackList(random, Arrays.asList(
new UsableBroker(11, Optional.of("1"), false),
new UsableBroker(10, Optional.of("1"), false)).iterator());
assertEquals("The target replication factor of 3 cannot be reached because only " +
"2 broker(s) are registered.",
assertThrows(InvalidReplicationFactorException.class,
() -> rackList.place(3)).getMessage());
}

@Test
public void testRackListNonPositiveReplicationFactor() {
MockRandom random = new MockRandom();
RackList rackList = new RackList(random, Arrays.asList(
new UsableBroker(11, Optional.of("1"), false),
new UsableBroker(10, Optional.of("1"), false)).iterator());
assertEquals("Invalid replication factor -1: the replication factor must be positive.",
assertThrows(InvalidReplicationFactorException.class,
() -> rackList.place(-1)).getMessage());
}
}

0 comments on commit 43db8ac

Please sign in to comment.