MINOR: improve KRaft replica placement
Implement the existing Kafka replica placement algorithm for KRaft.
This also means implementing rack awareness; previously, we just chose
replicas randomly in a non-rack-aware fashion. Also, allow replicas to
be placed on fenced brokers when there are no other choices, as
specified in KIP-631 but not previously implemented.
cmccabe committed May 12, 2021
1 parent 13ffebe commit aebd84f
Showing 17 changed files with 740 additions and 124 deletions.
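
To make the commit message concrete, here is a minimal, hypothetical sketch of a placer that satisfies the renamed ReplicaPlacer interface shown further down (place(startPartition, numPartitions, numReplicas, iterator)). It is not the StripedReplicaPlacer that this commit actually adds: it ignores racks for brevity and only illustrates the fenced-broker fallback. It assumes it lives in the same package as ReplicaPlacer, and that UsableBroker's package and its id() and fenced() accessors match the constructor and import-control changes in the diff.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.metadata.UsableBroker; // package assumed from the import-control change below

class FencedFallbackReplicaPlacer implements ReplicaPlacer {
    @Override
    public List<List<Integer>> place(int startPartition,
                                     int numPartitions,
                                     short numReplicas,
                                     Iterator<UsableBroker> iterator)
            throws InvalidReplicationFactorException {
        // Split the candidate brokers into unfenced and fenced groups.
        // UsableBroker#id() and #fenced() are assumed accessors matching its constructor.
        List<Integer> unfenced = new ArrayList<>();
        List<Integer> fenced = new ArrayList<>();
        while (iterator.hasNext()) {
            UsableBroker broker = iterator.next();
            if (broker.fenced()) {
                fenced.add(broker.id());
            } else {
                unfenced.add(broker.id());
            }
        }
        if (numReplicas > unfenced.size() + fenced.size()) {
            throw new InvalidReplicationFactorException("The target replication factor of " +
                numReplicas + " cannot be reached because only " +
                (unfenced.size() + fenced.size()) + " broker(s) are registered.");
        }
        List<List<Integer>> placements = new ArrayList<>(numPartitions);
        for (int p = 0; p < numPartitions; p++) {
            List<Integer> replicas = new ArrayList<>(numReplicas);
            // Prefer unfenced brokers, rotating the starting offset per partition.
            int fromUnfenced = Math.min(numReplicas, unfenced.size());
            for (int r = 0; r < fromUnfenced; r++) {
                replicas.add(unfenced.get((startPartition + p + r) % unfenced.size()));
            }
            // Fall back to fenced brokers only if there are no other choices.
            for (int r = 0; r < numReplicas - fromUnfenced; r++) {
                replicas.add(fenced.get((startPartition + p + r) % fenced.size()));
            }
            placements.add(replicas);
        }
        return placements;
    }
}

The StripedReplicaPlacer added by this commit additionally stripes replicas across racks, which is the rack awareness the commit message refers to.
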
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
@@ -232,6 +232,7 @@
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.metadata" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.test" />
</subpackage>
@@ -25,6 +25,7 @@
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
@@ -272,6 +273,11 @@ BrokerHeartbeatStateList unfenced() {
return unfenced;
}

// VisibleForTesting
Collection<BrokerHeartbeatState> brokers() {
return brokers.values();
}

/**
* Mark a broker as fenced.
*
@@ -439,7 +445,7 @@ List<Integer> findStaleBrokers() {
* @param numPartitions The number of partitions to place.
* @param numReplicas The number of replicas for each partition.
* @param idToRack A function mapping broker id to broker rack.
* @param policy The replica placement policy to use.
* @param placer The replica placer to use.
*
* @return A list of replica lists.
*
@@ -449,12 +455,10 @@ List<List<Integer>> placeReplicas(int startPartition,
int numPartitions,
short numReplicas,
Function<Integer, Optional<String>> idToRack,
ReplicaPlacementPolicy policy) {
// TODO: support using fenced brokers here if necessary to get to the desired
// number of replicas. We probably need to add a fenced boolean in UsableBroker.
ReplicaPlacer placer) {
Iterator<UsableBroker> iterator = new UsableBrokerIterator(
unfenced.iterator(), idToRack);
return policy.createPlacement(startPartition, numPartitions, numReplicas, iterator);
brokers.values().iterator(), idToRack);
return placer.place(startPartition, numPartitions, numReplicas, iterator);
}

static class UsableBrokerIterator implements Iterator<UsableBroker> {
@@ -482,7 +486,7 @@ public boolean hasNext() {
result = iterator.next();
} while (result.shuttingDown());
Optional<String> rack = idToRack.apply(result.id());
next = new UsableBroker(result.id(), rack);
next = new UsableBroker(result.id(), rack, result.fenced());
return true;
}

@@ -103,9 +103,9 @@ boolean check() {
private final long sessionTimeoutNs;

/**
* The replica placement policy to use.
* The replica placer to use.
*/
private final ReplicaPlacementPolicy placementPolicy;
private final ReplicaPlacer replicaPlacer;

/**
* Maps broker IDs to broker registrations.
@@ -127,12 +127,12 @@ boolean check() {
Time time,
SnapshotRegistry snapshotRegistry,
long sessionTimeoutNs,
ReplicaPlacementPolicy placementPolicy) {
ReplicaPlacer replicaPlacer) {
this.logContext = logContext;
this.log = logContext.logger(ClusterControlManager.class);
this.time = time;
this.sessionTimeoutNs = sessionTimeoutNs;
this.placementPolicy = placementPolicy;
this.replicaPlacer = replicaPlacer;
this.brokerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
this.heartbeatManager = null;
this.readyBrokersFuture = Optional.empty();
@@ -317,7 +317,7 @@ public List<List<Integer>> placeReplicas(int startPartition,
throw new RuntimeException("ClusterControlManager is not active.");
}
return heartbeatManager.placeReplicas(startPartition, numPartitions, numReplicas,
id -> brokerRegistrations.get(id).rack(), placementPolicy);
id -> brokerRegistrations.get(id).rack(), replicaPlacer);
}

public boolean unfenced(int brokerId) {
@@ -122,8 +122,7 @@ static public class Builder {
private Map<String, VersionRange> supportedFeatures = Collections.emptyMap();
private short defaultReplicationFactor = 3;
private int defaultNumPartitions = 1;
private ReplicaPlacementPolicy replicaPlacementPolicy =
new SimpleReplicaPlacementPolicy(new Random());
private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new Random());
private Function<Long, SnapshotWriter> snapshotWriterBuilder;
private SnapshotReader snapshotReader;
private long sessionTimeoutNs = NANOSECONDS.convert(18, TimeUnit.SECONDS);
@@ -173,8 +172,8 @@ public Builder setDefaultNumPartitions(int defaultNumPartitions) {
return this;
}

public Builder setReplicaPlacementPolicy(ReplicaPlacementPolicy replicaPlacementPolicy) {
this.replicaPlacementPolicy = replicaPlacementPolicy;
public Builder setReplicaPlacer(ReplicaPlacer replicaPlacer) {
this.replicaPlacer = replicaPlacer;
return this;
}

@@ -224,7 +223,7 @@ public QuorumController build() throws Exception {
queue = new KafkaEventQueue(time, logContext, threadNamePrefix);
return new QuorumController(logContext, nodeId, queue, time, configDefs,
logManager, supportedFeatures, defaultReplicationFactor,
defaultNumPartitions, replicaPlacementPolicy, snapshotWriterBuilder,
defaultNumPartitions, replicaPlacer, snapshotWriterBuilder,
snapshotReader, sessionTimeoutNs, controllerMetrics);
} catch (Exception e) {
Utils.closeQuietly(queue, "event queue");
@@ -906,7 +905,7 @@ private QuorumController(LogContext logContext,
Map<String, VersionRange> supportedFeatures,
short defaultReplicationFactor,
int defaultNumPartitions,
ReplicaPlacementPolicy replicaPlacementPolicy,
ReplicaPlacer replicaPlacer,
Function<Long, SnapshotWriter> snapshotWriterBuilder,
SnapshotReader snapshotReader,
long sessionTimeoutNs,
@@ -923,7 +922,7 @@ private QuorumController(LogContext logContext,
snapshotRegistry, configDefs);
this.clientQuotaControlManager = new ClientQuotaControlManager(snapshotRegistry);
this.clusterControl = new ClusterControlManager(logContext, time,
snapshotRegistry, sessionTimeoutNs, replicaPlacementPolicy);
snapshotRegistry, sessionTimeoutNs, replicaPlacer);
this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry);
this.snapshotGeneratorManager = new SnapshotGeneratorManager(snapshotWriterBuilder);
this.replicationControl = new ReplicationControlManager(snapshotRegistry,
@@ -28,7 +28,7 @@
* The interface which a Kafka replica placement policy must implement.
*/
@InterfaceStability.Unstable
interface ReplicaPlacementPolicy {
interface ReplicaPlacer {
/**
* Create a new replica placement.
*
Expand All @@ -42,9 +42,9 @@ interface ReplicaPlacementPolicy {
*
* @throws InvalidReplicationFactorException If too many replicas were requested.
*/
List<List<Integer>> createPlacement(int startPartition,
int numPartitions,
short numReplicas,
Iterator<UsableBroker> iterator)
List<List<Integer>> place(int startPartition,
int numPartitions,
short numReplicas,
Iterator<UsableBroker> iterator)
throws InvalidReplicationFactorException;
}
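
As a usage illustration (not code from this commit), a caller could drive any ReplicaPlacer as below. The UsableBroker constructor arguments (id, optional rack, fenced flag) follow the hunk above, and the StripedReplicaPlacer constructor follows the QuorumController builder change; the example class itself is hypothetical and assumes it sits in the same package as ReplicaPlacer.

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Random;

import org.apache.kafka.metadata.UsableBroker; // package assumed, as above

public class ReplicaPlacerExample {
    public static void main(String[] args) {
        // Three unfenced brokers on two racks, plus one fenced broker that should
        // only be used when the unfenced ones cannot satisfy the replication factor.
        List<UsableBroker> brokers = Arrays.asList(
            new UsableBroker(0, Optional.of("rack-a"), false),
            new UsableBroker(1, Optional.of("rack-b"), false),
            new UsableBroker(2, Optional.of("rack-a"), false),
            new UsableBroker(3, Optional.of("rack-b"), true));
        ReplicaPlacer placer = new StripedReplicaPlacer(new Random());
        // Place 4 partitions with 3 replicas each, starting at partition 0.
        List<List<Integer>> assignments = placer.place(0, 4, (short) 3, brokers.iterator());
        for (int partition = 0; partition < assignments.size(); partition++) {
            System.out.println("partition " + partition + " -> " + assignments.get(partition));
        }
    }
}
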
@@ -510,7 +510,7 @@ private ApiError createTopic(CreatableTopic topic,
} catch (InvalidReplicationFactorException e) {
return new ApiError(Errors.INVALID_REPLICATION_FACTOR,
"Unable to replicate the partition " + replicationFactor +
" times: " + e.getMessage());
" time(s): " + e.getMessage());
}
}
Uuid topicId = Uuid.randomUuid();

This file was deleted.

