diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 318384fca9c0b..6495ee8e0defd 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -232,6 +232,7 @@ + diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java index d78ac8833cad8..f5ed4a287c890 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; @@ -272,6 +273,11 @@ BrokerHeartbeatStateList unfenced() { return unfenced; } + // VisibleForTesting + Collection brokers() { + return brokers.values(); + } + /** * Mark a broker as fenced. * @@ -439,7 +445,7 @@ List findStaleBrokers() { * @param numPartitions The number of partitions to place. * @param numReplicas The number of replicas for each partition. * @param idToRack A function mapping broker id to broker rack. - * @param policy The replica placement policy to use. + * @param placer The replica placer to use. * * @return A list of replica lists. * @@ -449,12 +455,10 @@ List> placeReplicas(int startPartition, int numPartitions, short numReplicas, Function> idToRack, - ReplicaPlacementPolicy policy) { - // TODO: support using fenced brokers here if necessary to get to the desired - // number of replicas. We probably need to add a fenced boolean in UsableBroker. + ReplicaPlacer placer) { Iterator iterator = new UsableBrokerIterator( - unfenced.iterator(), idToRack); - return policy.createPlacement(startPartition, numPartitions, numReplicas, iterator); + brokers.values().iterator(), idToRack); + return placer.place(startPartition, numPartitions, numReplicas, iterator); } static class UsableBrokerIterator implements Iterator { @@ -482,7 +486,7 @@ public boolean hasNext() { result = iterator.next(); } while (result.shuttingDown()); Optional rack = idToRack.apply(result.id()); - next = new UsableBroker(result.id(), rack); + next = new UsableBroker(result.id(), rack, result.fenced()); return true; } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java index 6d34024ced922..db639432ee194 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java @@ -103,9 +103,9 @@ boolean check() { private final long sessionTimeoutNs; /** - * The replica placement policy to use. + * The replica placer to use. */ - private final ReplicaPlacementPolicy placementPolicy; + private final ReplicaPlacer replicaPlacer; /** * Maps broker IDs to broker registrations. @@ -127,12 +127,12 @@ boolean check() { Time time, SnapshotRegistry snapshotRegistry, long sessionTimeoutNs, - ReplicaPlacementPolicy placementPolicy) { + ReplicaPlacer replicaPlacer) { this.logContext = logContext; this.log = logContext.logger(ClusterControlManager.class); this.time = time; this.sessionTimeoutNs = sessionTimeoutNs; - this.placementPolicy = placementPolicy; + this.replicaPlacer = replicaPlacer; this.brokerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0); this.heartbeatManager = null; this.readyBrokersFuture = Optional.empty(); @@ -317,7 +317,7 @@ public List> placeReplicas(int startPartition, throw new RuntimeException("ClusterControlManager is not active."); } return heartbeatManager.placeReplicas(startPartition, numPartitions, numReplicas, - id -> brokerRegistrations.get(id).rack(), placementPolicy); + id -> brokerRegistrations.get(id).rack(), replicaPlacer); } public boolean unfenced(int brokerId) { diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 2645b8615ce60..c45a6f1125ce8 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -122,8 +122,7 @@ static public class Builder { private Map supportedFeatures = Collections.emptyMap(); private short defaultReplicationFactor = 3; private int defaultNumPartitions = 1; - private ReplicaPlacementPolicy replicaPlacementPolicy = - new SimpleReplicaPlacementPolicy(new Random()); + private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new Random()); private Function snapshotWriterBuilder; private SnapshotReader snapshotReader; private long sessionTimeoutNs = NANOSECONDS.convert(18, TimeUnit.SECONDS); @@ -173,8 +172,8 @@ public Builder setDefaultNumPartitions(int defaultNumPartitions) { return this; } - public Builder setReplicaPlacementPolicy(ReplicaPlacementPolicy replicaPlacementPolicy) { - this.replicaPlacementPolicy = replicaPlacementPolicy; + public Builder setReplicaPlacer(ReplicaPlacer replicaPlacer) { + this.replicaPlacer = replicaPlacer; return this; } @@ -224,7 +223,7 @@ public QuorumController build() throws Exception { queue = new KafkaEventQueue(time, logContext, threadNamePrefix); return new QuorumController(logContext, nodeId, queue, time, configDefs, logManager, supportedFeatures, defaultReplicationFactor, - defaultNumPartitions, replicaPlacementPolicy, snapshotWriterBuilder, + defaultNumPartitions, replicaPlacer, snapshotWriterBuilder, snapshotReader, sessionTimeoutNs, controllerMetrics); } catch (Exception e) { Utils.closeQuietly(queue, "event queue"); @@ -906,7 +905,7 @@ private QuorumController(LogContext logContext, Map supportedFeatures, short defaultReplicationFactor, int defaultNumPartitions, - ReplicaPlacementPolicy replicaPlacementPolicy, + ReplicaPlacer replicaPlacer, Function snapshotWriterBuilder, SnapshotReader snapshotReader, long sessionTimeoutNs, @@ -923,7 +922,7 @@ private QuorumController(LogContext logContext, snapshotRegistry, configDefs); this.clientQuotaControlManager = new ClientQuotaControlManager(snapshotRegistry); this.clusterControl = new ClusterControlManager(logContext, time, - snapshotRegistry, sessionTimeoutNs, replicaPlacementPolicy); + snapshotRegistry, sessionTimeoutNs, replicaPlacer); this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry); this.snapshotGeneratorManager = new SnapshotGeneratorManager(snapshotWriterBuilder); this.replicationControl = new ReplicationControlManager(snapshotRegistry, diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicaPlacementPolicy.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicaPlacer.java similarity index 86% rename from metadata/src/main/java/org/apache/kafka/controller/ReplicaPlacementPolicy.java rename to metadata/src/main/java/org/apache/kafka/controller/ReplicaPlacer.java index 097463f7e76da..9a705f43d8445 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicaPlacementPolicy.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicaPlacer.java @@ -28,7 +28,7 @@ * The interface which a Kafka replica placement policy must implement. */ @InterfaceStability.Unstable -interface ReplicaPlacementPolicy { +interface ReplicaPlacer { /** * Create a new replica placement. * @@ -42,9 +42,9 @@ interface ReplicaPlacementPolicy { * * @throws InvalidReplicationFactorException If too many replicas were requested. */ - List> createPlacement(int startPartition, - int numPartitions, - short numReplicas, - Iterator iterator) + List> place(int startPartition, + int numPartitions, + short numReplicas, + Iterator iterator) throws InvalidReplicationFactorException; } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index b270e35205adb..fc4ab6f705c57 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -510,7 +510,7 @@ private ApiError createTopic(CreatableTopic topic, } catch (InvalidReplicationFactorException e) { return new ApiError(Errors.INVALID_REPLICATION_FACTOR, "Unable to replicate the partition " + replicationFactor + - " times: " + e.getMessage()); + " time(s): " + e.getMessage()); } } Uuid topicId = Uuid.randomUuid(); diff --git a/metadata/src/main/java/org/apache/kafka/controller/SimpleReplicaPlacementPolicy.java b/metadata/src/main/java/org/apache/kafka/controller/SimpleReplicaPlacementPolicy.java deleted file mode 100644 index a2f7c892e45dc..0000000000000 --- a/metadata/src/main/java/org/apache/kafka/controller/SimpleReplicaPlacementPolicy.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.controller; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Random; - -import org.apache.kafka.common.errors.InvalidReplicationFactorException; -import org.apache.kafka.metadata.UsableBroker; - - -/** - * A simple uniformly random placement policy. - * - * TODO: implement the current "striped" placement policy, plus rack aware placement - * policies, etc. - */ -public class SimpleReplicaPlacementPolicy implements ReplicaPlacementPolicy { - private final Random random; - - public SimpleReplicaPlacementPolicy(Random random) { - this.random = random; - } - - @Override - public List> createPlacement(int startPartition, - int numPartitions, - short numReplicas, - Iterator iterator) { - List usable = new ArrayList<>(); - while (iterator.hasNext()) { - usable.add(iterator.next()); - } - if (usable.size() < numReplicas) { - throw new InvalidReplicationFactorException("there are only " + usable.size() + - " usable brokers"); - } - List> results = new ArrayList<>(); - for (int p = 0; p < numPartitions; p++) { - List choices = new ArrayList<>(); - // TODO: rack-awareness - List indexes = new ArrayList<>(); - int initialIndex = random.nextInt(usable.size()); - for (int i = 0; i < numReplicas; i++) { - indexes.add((initialIndex + i) % usable.size()); - } - indexes.sort(Integer::compareTo); - Iterator iter = usable.iterator(); - for (int i = 0; choices.size() < indexes.size(); i++) { - int brokerId = iter.next().id(); - if (indexes.get(choices.size()) == i) { - choices.add(brokerId); - } - } - Collections.shuffle(choices, random); - results.add(choices); - } - return results; - } -} diff --git a/metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java b/metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java new file mode 100644 index 0000000000000..8d3c90ef80863 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/controller/StripedReplicaPlacer.java @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; + +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.metadata.OptionalStringComparator; +import org.apache.kafka.metadata.UsableBroker; + + +/** + * The striped replica placer. + * + * + * GOALS + * The design of this placer attempts to satisfy a few competing goals. Firstly, we want + * to spread the replicas as evenly as we can across racks. In the simple case where + * broker racks have not been configured, this goal is a no-op, of course. But it is the + * highest priority goal in multi-rack clusters. + * + * Our second goal is to spread the replicas evenly across brokers. Since we are placing + * multiple partitions, we try to avoid putting each partition on the same set of + * replicas, even if it does satisfy the rack placement goal. However, we treat the rack + * placement goal as higher priority than this goal-- if you configure 10 brokers in rack + * A and B, and 1 broker in rack C, you will end up with a lot of partitions on that one + * broker in rack C. If you were to place a lot of partitions with replication factor 3, + * each partition would try to get a replica there. In general racks are supposed to be + * about the same size -- if they aren't, this is a user error. + * + * Thirdly, we would prefer to place replicas on unfenced brokers, rather than on fenced + * brokers. + * + * + * CONSTRAINTS + * In addition to these goals, we have two constraints. Unlike the goals, these are not + * optional -- they are mandatory. Placement will fail if a constraint cannot be + * satisfied. The first constraint is that we can't place more than one replica on the + * same broker. This imposes an upper limit on replication factor-- for example, a 3-node + * cluster can't have any topics with replication factor 4. This constraint comes from + * Kafka's internal design. + * + * The second constraint is that the leader of each partition must be an unfenced broker. + * This constraint is a bit arbitrary. In theory, we could allow people to create + * new topics even if every broker were fenced. However, this would be confusing for + * users. + * + * + * ALGORITHM + * The StripedReplicaPlacer constructor loads the broker data into rack objects. Each + * rack object contains a sorted list of fenced brokers, and a separate sorted list of + * unfenced brokers. The racks themselves are organized into a sorted list, stored inside + * the top-level RackList object. + * + * The general idea is that we place replicas on to racks in a round-robin fashion. So if + * we had racks A, B, C, and D, and we were creating a new partition with replication + * factor 3, our first replica might come from A, our second from B, and our third from C. + * Of course our placement would not be very fair if we always started with rack A. + * Therefore, we generate a random starting offset when the RackList is created. So one + * time we might go B, C, D. Another time we might go C, D, A. And so forth. + * + * Note that each partition we generate advances the starting offset by one. + * So in our 4-rack cluster, with 3 partitions, we might choose these racks: + * + * partition 1: A, B, C + * partition 2: B, C, A + * partition 3: C, A, B + * + * This is what generates the characteristic "striped" pattern of this placer. + * + * So far I haven't said anything about how we choose a replica from within a rack. In + * fact, this is also done in a round-robin fashion. So if rack A had replica A0, A1, A2, + * and A3, we might return A0 the first time, A1, the second, A2 the third, and so on. + * Just like with the racks, we add a random starting offset to mix things up a bit. + * + * So let's say you had a cluster with racks A, B, and C, and each rack had 3 replicas, + * for 9 nodes in total. + * If all the offsets were 0, you'd get placements like this: + * + * partition 1: A0, B0, C0 + * partition 2: B1, C1, A1 + * partition 3: C2, A2, B2 + * + * One additional complication with choosing a replica within a rack is that we want to + * choose the unfenced replicas first. In a big cluster with lots of nodes available, + * we'd prefer not to place a new partition on a node that is fenced. Therefore, we + * actually maintain two lists, rather than the single list I described above. + * We only start using the fenced node list when the unfenced node list is totally + * exhausted. + * + * Furthermore, we cannot place the first replica (the leader) of a new partition on a + * fenced replica. Therefore, we have some special logic to ensure that this doesn't + * happen. + */ +public class StripedReplicaPlacer implements ReplicaPlacer { + /** + * A list of brokers that we can iterate through. + */ + static class BrokerList { + final static BrokerList EMPTY = new BrokerList(); + private final List brokers = new ArrayList<>(0); + private int epoch = 0; + private int index = 0; + private int offset = 0; + + BrokerList add(int broker) { + this.brokers.add(broker); + return this; + } + + /** + * Initialize this broker list by sorting it and randomizing the start offset. + * + * @param random The random number generator. + */ + void initialize(Random random) { + if (!brokers.isEmpty()) { + brokers.sort(Integer::compareTo); + this.offset = random.nextInt(brokers.size()); + } + } + + /** + * Randomly shuffle the brokers in this list. + */ + void shuffle(Random random) { + Collections.shuffle(brokers, random); + } + + /** + * @return The number of brokers in this list. + */ + int size() { + return brokers.size(); + } + + /** + * Get the next broker in this list, or -1 if there are no more elements to be + * returned. + * + * @param epoch The current iteration epoch. + * + * @return The broker ID, or -1 if there are no more brokers to be + * returned in this epoch. + */ + int next(int epoch) { + if (brokers.size() == 0) return -1; + if (this.epoch != epoch) { + this.epoch = epoch; + this.index = 0; + this.offset = (offset + 1) % brokers.size(); + } + if (index >= brokers.size()) return -1; + int broker = brokers.get((index + offset) % brokers.size()); + index++; + return broker; + } + } + + /** + * A rack in the cluster, which contains brokers. + */ + static class Rack { + private final BrokerList fenced = new BrokerList(); + private final BrokerList unfenced = new BrokerList(); + + /** + * Initialize this rack. + * + * @param random The random number generator. + */ + void initialize(Random random) { + fenced.initialize(random); + unfenced.initialize(random); + } + + void shuffle(Random random) { + fenced.shuffle(random); + unfenced.shuffle(random); + } + + BrokerList fenced() { + return fenced; + } + + BrokerList unfenced() { + return unfenced; + } + + /** + * Get the next unfenced broker in this rack, or -1 if there are no more brokers + * to be returned. + * + * @param epoch The current iteration epoch. + * + * @return The broker ID, or -1 if there are no more brokers to be + * returned in this epoch. + */ + int nextUnfenced(int epoch) { + return unfenced.next(epoch); + } + + /** + * Get the next broker in this rack, or -1 if there are no more brokers to be + * returned. + * + * @param epoch The current iteration epoch. + * + * @return The broker ID, or -1 if there are no more brokers to be + * returned in this epoch. + */ + int next(int epoch) { + int result = unfenced.next(epoch); + if (result >= 0) return result; + return fenced.next(epoch); + } + } + + /** + * A list of racks that we can iterate through. + */ + static class RackList { + /** + * The random number generator. + */ + private final Random random; + + /** + * A map from rack names to the brokers contained within them. + */ + private final Map, Rack> racks = new HashMap<>(); + + /** + * The names of all the racks in the cluster. + * + * Racks which have at least one unfenced broker come first (in sorted order), + * followed by racks which have only fenced brokers (also in sorted order). + */ + private final List> rackNames = new ArrayList<>(); + + /** + * The total number of brokers in the cluster, both fenced and unfenced. + */ + private final int numTotalBrokers; + + /** + * The total number of unfenced brokers in the cluster. + */ + private final int numUnfencedBrokers; + + /** + * The iteration epoch. + */ + private int epoch = 0; + + /** + * The offset we use to determine which rack is returned first. + */ + private int offset; + + RackList(Random random, Iterator iterator) { + this.random = random; + int numTotalBrokersCount = 0, numUnfencedBrokersCount = 0; + while (iterator.hasNext()) { + UsableBroker broker = iterator.next(); + Rack rack = racks.get(broker.rack()); + if (rack == null) { + rackNames.add(broker.rack()); + rack = new Rack(); + racks.put(broker.rack(), rack); + } + if (broker.fenced()) { + rack.fenced().add(broker.id()); + } else { + numUnfencedBrokersCount++; + rack.unfenced().add(broker.id()); + } + numTotalBrokersCount++; + } + for (Rack rack : racks.values()) { + rack.initialize(random); + } + this.rackNames.sort(OptionalStringComparator.INSTANCE); + this.numTotalBrokers = numTotalBrokersCount; + this.numUnfencedBrokers = numUnfencedBrokersCount; + this.offset = rackNames.isEmpty() ? 0 : random.nextInt(rackNames.size()); + } + + int numTotalBrokers() { + return numTotalBrokers; + } + + int numUnfencedBrokers() { + return numUnfencedBrokers; + } + + // VisibleForTesting + List> rackNames() { + return rackNames; + } + + List place(int replicationFactor) { + if (replicationFactor <= 0) { + throw new InvalidReplicationFactorException("Invalid replication factor " + + replicationFactor + ": the replication factor must be positive."); + } + // If we have returned as many assignments as there are unfenced brokers in + // the cluster, shuffle the rack list and broker lists to try to avoid + // repeating the same assignments again. + if (epoch == numUnfencedBrokers) { + shuffle(); + epoch = 0; + } + if (offset == rackNames.size()) { + offset = 0; + } + List brokers = new ArrayList<>(replicationFactor); + int firstRackIndex = offset; + while (true) { + Optional name = rackNames.get(firstRackIndex); + Rack rack = racks.get(name); + int result = rack.nextUnfenced(epoch); + if (result >= 0) { + brokers.add(result); + break; + } + firstRackIndex++; + if (firstRackIndex == rackNames.size()) { + firstRackIndex = 0; + } + } + int rackIndex = offset; + for (int replica = 1; replica < replicationFactor; replica++) { + int result = -1; + do { + if (rackIndex == firstRackIndex) { + firstRackIndex = -1; + } else { + Optional rackName = rackNames.get(rackIndex); + Rack rack = racks.get(rackName); + result = rack.next(epoch); + } + rackIndex++; + if (rackIndex == rackNames.size()) { + rackIndex = 0; + } + } while (result < 0); + brokers.add(result); + } + epoch++; + offset++; + return brokers; + } + + void shuffle() { + Collections.shuffle(rackNames, random); + for (Rack rack : racks.values()) { + rack.shuffle(random); + } + } + } + + private final Random random; + + public StripedReplicaPlacer(Random random) { + this.random = random; + } + + @Override + public List> place(int startPartition, + int numPartitions, + short replicationFactor, + Iterator iterator) { + RackList rackList = new RackList(random, iterator); + if (rackList.numUnfencedBrokers() == 0) { + throw new InvalidReplicationFactorException("All brokers are currently fenced."); + } + if (replicationFactor > rackList.numTotalBrokers()) { + throw new InvalidReplicationFactorException("The target replication factor " + + "of " + replicationFactor + " cannot be reached because only " + + rackList.numTotalBrokers() + " broker(s) are registered."); + } + List> placements = new ArrayList<>(numPartitions); + for (int partition = 0; partition < numPartitions; partition++) { + placements.add(rackList.place(replicationFactor)); + } + return placements; + } +} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/OptionalStringComparator.java b/metadata/src/main/java/org/apache/kafka/metadata/OptionalStringComparator.java new file mode 100644 index 0000000000000..3f20507b33c39 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/OptionalStringComparator.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata; + +import java.util.Comparator; +import java.util.Optional; + + +public class OptionalStringComparator implements Comparator> { + public static final OptionalStringComparator INSTANCE = new OptionalStringComparator(); + + @Override + public int compare(Optional a, Optional b) { + if (!a.isPresent()) { + if (!b.isPresent()) { + return 0; + } else { + return -1; + } + } else if (!b.isPresent()) { + return 1; + } + return a.get().compareTo(b.get()); + } +} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/UsableBroker.java b/metadata/src/main/java/org/apache/kafka/metadata/UsableBroker.java index 9a2db10fc0ab1..9c04ebd480b8c 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/UsableBroker.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/UsableBroker.java @@ -29,9 +29,12 @@ public class UsableBroker { private final Optional rack; - public UsableBroker(int id, Optional rack) { + private final boolean fenced; + + public UsableBroker(int id, Optional rack, boolean fenced) { this.id = id; this.rack = rack; + this.fenced = fenced; } public int id() { @@ -42,20 +45,24 @@ public Optional rack() { return rack; } + public boolean fenced() { + return fenced; + } + @Override public boolean equals(Object o) { if (!(o instanceof UsableBroker)) return false; UsableBroker other = (UsableBroker) o; - return other.id == id && other.rack.equals(rack); + return other.id == id && other.rack.equals(rack) && other.fenced == fenced; } @Override public int hashCode() { - return Objects.hash(id, rack); + return Objects.hash(id, rack, fenced); } @Override public String toString() { - return "UsableBroker(id=" + id + ", rack=" + rack + ")"; + return "UsableBroker(id=" + id + ", rack=" + rack + ", fenced=" + fenced + ")"; } } diff --git a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java index d70cc5c37d4ac..6c8ef7e7e88fd 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java @@ -174,7 +174,7 @@ public void testMetadataOffsetComparator() { private static Set usableBrokersToSet(BrokerHeartbeatManager manager) { Set brokers = new HashSet<>(); for (Iterator iterator = new UsableBrokerIterator( - manager.unfenced().iterator(), + manager.brokers().iterator(), id -> id % 2 == 0 ? Optional.of("rack1") : Optional.of("rack2")); iterator.hasNext(); ) { brokers.add(iterator.next()); @@ -193,10 +193,11 @@ public void testUsableBrokerIterator() { manager.touch(4, true, 100); assertEquals(98L, manager.lowestActiveOffset()); Set expected = new HashSet<>(); - expected.add(new UsableBroker(0, Optional.of("rack1"))); - expected.add(new UsableBroker(1, Optional.of("rack2"))); - expected.add(new UsableBroker(2, Optional.of("rack1"))); - expected.add(new UsableBroker(3, Optional.of("rack2"))); + expected.add(new UsableBroker(0, Optional.of("rack1"), false)); + expected.add(new UsableBroker(1, Optional.of("rack2"), false)); + expected.add(new UsableBroker(2, Optional.of("rack1"), false)); + expected.add(new UsableBroker(3, Optional.of("rack2"), false)); + expected.add(new UsableBroker(4, Optional.of("rack1"), true)); assertEquals(expected, usableBrokersToSet(manager)); manager.updateControlledShutdownOffset(2, 0); assertEquals(100L, manager.lowestActiveOffset()); @@ -204,7 +205,8 @@ public void testUsableBrokerIterator() { () -> manager.updateControlledShutdownOffset(4, 0)); manager.touch(4, false, 100); manager.updateControlledShutdownOffset(4, 0); - expected.remove(new UsableBroker(2, Optional.of("rack1"))); + expected.remove(new UsableBroker(2, Optional.of("rack1"), false)); + expected.remove(new UsableBroker(4, Optional.of("rack1"), true)); assertEquals(expected, usableBrokersToSet(manager)); } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java index 04fc8168a952e..e5f321a47894a 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java @@ -57,7 +57,7 @@ public void testReplay() { SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); ClusterControlManager clusterControl = new ClusterControlManager( new LogContext(), time, snapshotRegistry, 1000, - new SimpleReplicaPlacementPolicy(new Random())); + new StripedReplicaPlacer(new Random())); clusterControl.activate(); assertFalse(clusterControl.unfenced(0)); @@ -98,7 +98,7 @@ public void testUnregister() throws Exception { SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); ClusterControlManager clusterControl = new ClusterControlManager( new LogContext(), new MockTime(0, 0, 0), snapshotRegistry, 1000, - new SimpleReplicaPlacementPolicy(new Random())); + new StripedReplicaPlacer(new Random())); clusterControl.activate(); clusterControl.replay(brokerRecord); assertEquals(new BrokerRegistration(1, 100, @@ -121,7 +121,7 @@ public void testPlaceReplicas(int numUsableBrokers) throws Exception { MockRandom random = new MockRandom(); ClusterControlManager clusterControl = new ClusterControlManager( new LogContext(), time, snapshotRegistry, 1000, - new SimpleReplicaPlacementPolicy(random)); + new StripedReplicaPlacer(random)); clusterControl.activate(); for (int i = 0; i < numUsableBrokers; i++) { RegisterBrokerRecord brokerRecord = @@ -158,7 +158,7 @@ public void testIterator() throws Exception { SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); ClusterControlManager clusterControl = new ClusterControlManager( new LogContext(), time, snapshotRegistry, 1000, - new SimpleReplicaPlacementPolicy(new Random())); + new StripedReplicaPlacer(new Random())); clusterControl.activate(); assertFalse(clusterControl.unfenced(0)); for (int i = 0; i < 3; i++) { diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index b6eea362c9024..bc8dc13cc7420 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -164,9 +164,11 @@ public void testUnregisterBroker() throws Throwable { new CreatableTopicCollection(Collections.singleton( new CreatableTopic().setName("foo").setNumPartitions(1). setReplicationFactor((short) 1)).iterator())); - // TODO: place on a fenced broker if we have no choice assertEquals(Errors.INVALID_REPLICATION_FACTOR.code(), active.createTopics( createTopicsRequestData).get().topics().find("foo").errorCode()); + assertEquals("Unable to replicate the partition 1 time(s): All brokers " + + "are currently fenced.", active.createTopics( + createTopicsRequestData).get().topics().find("foo").errorMessage()); assertEquals(new BrokerHeartbeatReply(true, false, false, false), active.processBrokerHeartbeat(new BrokerHeartbeatRequestData(). setWantFence(false).setBrokerEpoch(0L).setBrokerId(0). diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index f7e0277eb87bf..3924ab5518d99 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -87,7 +87,7 @@ private static class ReplicationControlTestContext { final MockRandom random = new MockRandom(); final ClusterControlManager clusterControl = new ClusterControlManager( logContext, time, snapshotRegistry, 1000, - new SimpleReplicaPlacementPolicy(random)); + new StripedReplicaPlacer(random)); final ConfigurationControlManager configurationControl = new ConfigurationControlManager( new LogContext(), snapshotRegistry, Collections.emptyMap()); final ReplicationControlManager replicationControl = new ReplicationControlManager(snapshotRegistry, @@ -162,7 +162,8 @@ public void testCreateTopics() throws Exception { CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData(); expectedResponse.topics().add(new CreatableTopicResult().setName("foo"). setErrorCode(Errors.INVALID_REPLICATION_FACTOR.code()). - setErrorMessage("Unable to replicate the partition 3 times: there are only 0 usable brokers")); + setErrorMessage("Unable to replicate the partition 3 time(s): All " + + "brokers are currently fenced.")); assertEquals(expectedResponse, result.response()); registerBroker(0, ctx); @@ -180,8 +181,8 @@ public void testCreateTopics() throws Exception { setTopicId(result2.response().topics().find("foo").topicId())); assertEquals(expectedResponse2, result2.response()); ctx.replay(result2.records()); - assertEquals(new PartitionControlInfo(new int[] {2, 0, 1}, - new int[] {2, 0, 1}, null, null, 2, 0, 0), + assertEquals(new PartitionControlInfo(new int[] {1, 2, 0}, + new int[] {1, 2, 0}, null, null, 1, 0, 0), replicationControl.getPartition( ((TopicRecord) result2.records().get(0).message()).topicId(), 0)); ControllerResult result3 = @@ -195,8 +196,8 @@ public void testCreateTopics() throws Exception { ControllerTestUtils.assertBatchIteratorContains(Arrays.asList( Arrays.asList(new ApiMessageAndVersion(new PartitionRecord(). setPartitionId(0).setTopicId(fooId). - setReplicas(Arrays.asList(2, 0, 1)).setIsr(Arrays.asList(2, 0, 1)). - setRemovingReplicas(null).setAddingReplicas(null).setLeader(2). + setReplicas(Arrays.asList(1, 2, 0)).setIsr(Arrays.asList(1, 2, 0)). + setRemovingReplicas(null).setAddingReplicas(null).setLeader(1). setLeaderEpoch(0).setPartitionEpoch(0), (short) 0), new ApiMessageAndVersion(new TopicRecord(). setTopicId(fooId).setName("foo"), (short) 0))), diff --git a/metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java new file mode 100644 index 0000000000000..f03105d99a503 --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/controller/StripedReplicaPlacerTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.errors.InvalidReplicationFactorException; +import org.apache.kafka.controller.StripedReplicaPlacer.BrokerList; +import org.apache.kafka.controller.StripedReplicaPlacer.RackList; +import org.apache.kafka.metadata.UsableBroker; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + + +@Timeout(value = 40) +public class StripedReplicaPlacerTest { + /** + * Test that the BrokerList class works as expected. + */ + @Test + public void testBrokerList() { + assertEquals(0, BrokerList.EMPTY.size()); + assertEquals(-1, BrokerList.EMPTY.next(1)); + BrokerList brokers = new BrokerList().add(0).add(1).add(2).add(3); + assertEquals(4, brokers.size()); + assertEquals(0, brokers.next(0)); + assertEquals(1, brokers.next(0)); + assertEquals(2, brokers.next(0)); + assertEquals(3, brokers.next(0)); + assertEquals(-1, brokers.next(0)); + assertEquals(-1, brokers.next(0)); + assertEquals(1, brokers.next(1)); + assertEquals(2, brokers.next(1)); + assertEquals(3, brokers.next(1)); + assertEquals(0, brokers.next(1)); + assertEquals(-1, brokers.next(1)); + } + + /** + * Test that we perform striped replica placement as expected, and don't use the + * fenced replica if we don't have to. + */ + @Test + public void testAvoidFencedReplicaIfPossibleOnSingleRack() { + MockRandom random = new MockRandom(); + RackList rackList = new RackList(random, Arrays.asList( + new UsableBroker(3, Optional.empty(), false), + new UsableBroker(1, Optional.empty(), true), + new UsableBroker(0, Optional.empty(), false), + new UsableBroker(4, Optional.empty(), false), + new UsableBroker(2, Optional.empty(), false)).iterator()); + assertEquals(5, rackList.numTotalBrokers()); + assertEquals(4, rackList.numUnfencedBrokers()); + assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames()); + assertThrows(InvalidReplicationFactorException.class, () -> rackList.place(0)); + assertThrows(InvalidReplicationFactorException.class, () -> rackList.place(-1)); + assertEquals(Arrays.asList(3, 4, 0, 2), rackList.place(4)); + assertEquals(Arrays.asList(4, 0, 2, 3), rackList.place(4)); + assertEquals(Arrays.asList(0, 2, 3, 4), rackList.place(4)); + assertEquals(Arrays.asList(2, 3, 4, 0), rackList.place(4)); + assertEquals(Arrays.asList(0, 4, 3, 2), rackList.place(4)); + } + + /** + * Test that we will place on the fenced replica if we need to. + */ + @Test + public void testPlacementOnFencedReplicaOnSingleRack() { + MockRandom random = new MockRandom(); + RackList rackList = new RackList(random, Arrays.asList( + new UsableBroker(3, Optional.empty(), false), + new UsableBroker(1, Optional.empty(), true), + new UsableBroker(2, Optional.empty(), false)).iterator()); + assertEquals(3, rackList.numTotalBrokers()); + assertEquals(2, rackList.numUnfencedBrokers()); + assertEquals(Collections.singletonList(Optional.empty()), rackList.rackNames()); + assertEquals(Arrays.asList(3, 2, 1), rackList.place(3)); + assertEquals(Arrays.asList(2, 3, 1), rackList.place(3)); + assertEquals(Arrays.asList(3, 2, 1), rackList.place(3)); + assertEquals(Arrays.asList(2, 3, 1), rackList.place(3)); + } + + @Test + public void testRackListWithMultipleRacks() { + MockRandom random = new MockRandom(); + RackList rackList = new RackList(random, Arrays.asList( + new UsableBroker(11, Optional.of("1"), false), + new UsableBroker(10, Optional.of("1"), false), + new UsableBroker(30, Optional.of("3"), false), + new UsableBroker(31, Optional.of("3"), false), + new UsableBroker(21, Optional.of("2"), false), + new UsableBroker(20, Optional.of("2"), true)).iterator()); + assertEquals(6, rackList.numTotalBrokers()); + assertEquals(5, rackList.numUnfencedBrokers()); + assertEquals(Arrays.asList(Optional.of("1"), Optional.of("2"), Optional.of("3")), rackList.rackNames()); + assertEquals(Arrays.asList(11, 21, 31, 10), rackList.place(4)); + assertEquals(Arrays.asList(21, 30, 10, 20), rackList.place(4)); + assertEquals(Arrays.asList(31, 11, 21, 30), rackList.place(4)); + } + + @Test + public void testRackListWithInvalidRacks() { + MockRandom random = new MockRandom(); + RackList rackList = new RackList(random, Arrays.asList( + new UsableBroker(11, Optional.of("1"), false), + new UsableBroker(10, Optional.of("1"), false), + new UsableBroker(30, Optional.of("3"), true), + new UsableBroker(31, Optional.of("3"), true), + new UsableBroker(20, Optional.of("2"), true), + new UsableBroker(21, Optional.of("2"), true), + new UsableBroker(41, Optional.of("4"), false), + new UsableBroker(40, Optional.of("4"), true)).iterator()); + assertEquals(8, rackList.numTotalBrokers()); + assertEquals(3, rackList.numUnfencedBrokers()); + assertEquals(Arrays.asList(Optional.of("1"), + Optional.of("2"), + Optional.of("3"), + Optional.of("4")), rackList.rackNames()); + assertEquals(Arrays.asList(41, 11, 21, 30), rackList.place(4)); + assertEquals(Arrays.asList(10, 20, 31, 41), rackList.place(4)); + assertEquals(Arrays.asList(41, 21, 30, 11), rackList.place(4)); + } + + @Test + public void testAllBrokersFenced() { + MockRandom random = new MockRandom(); + StripedReplicaPlacer placer = new StripedReplicaPlacer(random); + assertEquals("All brokers are currently fenced.", + assertThrows(InvalidReplicationFactorException.class, + () -> placer.place(0, 1, (short) 1, Arrays.asList( + new UsableBroker(11, Optional.of("1"), true), + new UsableBroker(10, Optional.of("1"), true)).iterator())).getMessage()); + } + + @Test + public void testNotEnoughBrokers() { + MockRandom random = new MockRandom(); + StripedReplicaPlacer placer = new StripedReplicaPlacer(random); + assertEquals("The target replication factor of 3 cannot be reached because only " + + "2 broker(s) are registered.", + assertThrows(InvalidReplicationFactorException.class, + () -> placer.place(0, 1, (short) 3, Arrays.asList( + new UsableBroker(11, Optional.of("1"), false), + new UsableBroker(10, Optional.of("1"), false)).iterator())).getMessage()); + } + + @Test + public void testSuccessfulPlacement() { + MockRandom random = new MockRandom(); + StripedReplicaPlacer placer = new StripedReplicaPlacer(random); + assertEquals(Arrays.asList(Arrays.asList(2, 3, 0), + Arrays.asList(3, 0, 1), + Arrays.asList(0, 1, 2), + Arrays.asList(1, 2, 3), + Arrays.asList(1, 0, 2)), + placer.place(0, 5, (short) 3, Arrays.asList( + new UsableBroker(0, Optional.empty(), false), + new UsableBroker(3, Optional.empty(), false), + new UsableBroker(2, Optional.empty(), false), + new UsableBroker(1, Optional.empty(), false)).iterator())); + } +} diff --git a/metadata/src/test/java/org/apache/kafka/metadata/OptionalStringComparatorTest.java b/metadata/src/test/java/org/apache/kafka/metadata/OptionalStringComparatorTest.java new file mode 100644 index 0000000000000..68f25444aecae --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/metadata/OptionalStringComparatorTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Optional; + +import static org.apache.kafka.metadata.OptionalStringComparator.INSTANCE; +import static org.junit.jupiter.api.Assertions.assertEquals; + + +@Timeout(value = 40) +public class OptionalStringComparatorTest { + @Test + public void testComparisons() { + assertEquals(0, INSTANCE.compare(Optional.of("foo"), Optional.of("foo"))); + assertEquals(-1, INSTANCE.compare(Optional.of("a"), Optional.of("b"))); + assertEquals(1, INSTANCE.compare(Optional.of("b"), Optional.of("a"))); + assertEquals(-1, INSTANCE.compare(Optional.empty(), Optional.of("a"))); + assertEquals(1, INSTANCE.compare(Optional.of("a"), Optional.empty())); + assertEquals(0, INSTANCE.compare(Optional.empty(), Optional.empty())); + } +} diff --git a/tests/kafkatest/tests/core/round_trip_fault_test.py b/tests/kafkatest/tests/core/round_trip_fault_test.py index d8873b2d84235..b9085cb8b5b71 100644 --- a/tests/kafkatest/tests/core/round_trip_fault_test.py +++ b/tests/kafkatest/tests/core/round_trip_fault_test.py @@ -75,11 +75,13 @@ def remote_quorum_nodes(self): return [] @cluster(num_nodes=9) + @matrix(metadata_quorum=quorum.all_non_upgrade) def test_round_trip_workload(self, metadata_quorum=quorum.zk): workload1 = self.trogdor.create_task("workload1", self.round_trip_spec) workload1.wait_for_done(timeout_sec=600) @cluster(num_nodes=9) + @matrix(metadata_quorum=quorum.all_non_upgrade) def test_round_trip_workload_with_broker_partition(self, metadata_quorum=quorum.zk): workload1 = self.trogdor.create_task("workload1", self.round_trip_spec) time.sleep(2) @@ -93,6 +95,7 @@ def test_round_trip_workload_with_broker_partition(self, metadata_quorum=quorum. partition1.wait_for_done() @cluster(num_nodes=9) + @matrix(metadata_quorum=quorum.all_non_upgrade) def test_produce_consume_with_broker_pause(self, metadata_quorum=quorum.zk): workload1 = self.trogdor.create_task("workload1", self.round_trip_spec) time.sleep(2) @@ -105,6 +108,7 @@ def test_produce_consume_with_broker_pause(self, metadata_quorum=quorum.zk): self.kafka.stop_node(self.kafka.nodes[0], False) @cluster(num_nodes=9) + @matrix(metadata_quorum=quorum.all_non_upgrade) def test_produce_consume_with_client_partition(self, metadata_quorum=quorum.zk): workload1 = self.trogdor.create_task("workload1", self.round_trip_spec) time.sleep(2) @@ -117,6 +121,7 @@ def test_produce_consume_with_client_partition(self, metadata_quorum=quorum.zk): stop1.wait_for_done() @cluster(num_nodes=9) + @matrix(metadata_quorum=quorum.all_non_upgrade) def test_produce_consume_with_latency(self, metadata_quorum=quorum.zk): workload1 = self.trogdor.create_task("workload1", self.round_trip_spec) time.sleep(2)