Skip to content

Commit

Permalink
Add a new endpoint to support increasing topic's replication factor. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
kun du authored May 21, 2019
1 parent 932805f commit d4638c5
Show file tree
Hide file tree
Showing 10 changed files with 378 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.linkedin.kafka.cruisecontrol.servlet.response.TrainResult;
import com.linkedin.kafka.cruisecontrol.servlet.response.stats.BrokerStats;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -61,11 +62,15 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.offlineReplicasForPartition;
import static com.linkedin.kafka.cruisecontrol.servlet.response.CruiseControlState.SubState.*;


Expand Down Expand Up @@ -923,6 +928,141 @@ public StopProposalResult stopProposalExecution(StopProposalParameters parameter
return new StopProposalResult(_config);
}

/**
 * Generate the execution proposals needed to bring the given topics to the target replication factor.
 *
 * If a partition's current replication factor is less than the target replication factor, new replicas are added to
 * the partition in a rack-aware, round-robin way. A broker which already hosts a replica of the partition is never
 * picked again, so the new replica list does not contain duplicates.
 * There are two scenarios in which the rack awareness property is not guaranteed.
 * <ul>
 * <li> If metadata does not have rack information about brokers, then it is only guaranteed that new replicas are
 * added to brokers, which currently do not host any replicas of partition.</li>
 * <li> If replication factor to set for the topic is larger than number of racks in the cluster and
 * skipTopicRackAwarenessCheck is set to true, then rack awareness property is ignored.</li>
 * </ul>
 * If a partition's current replication factor is larger than the target replication factor, one or more follower
 * replicas are removed from the partition; the leader replica is always kept.
 * Partitions which already have the target replication factor are left untouched.
 *
 * @param cluster The metadata of the cluster.
 * @param replicationFactor The replication factor to set for the topics.
 * @param topics The topics to apply the change.
 * @param skipTopicRackAwarenessCheck Whether ignore rack awareness property if number of rack in cluster is less
 *                                    than target replication factor.
 * @return Execution proposals to increase or decrease the replication factor of the topics.
 * @throws RuntimeException If the rack-awareness check fails and is not skipped, if a partition to update has
 *                          offline replicas or no leader, or if the target replication factor is larger than the
 *                          number of brokers in the cluster metadata.
 */
private Set<ExecutionProposal> maybeUpdateTopicReplicationFactor(Cluster cluster,
                                                                 short replicationFactor,
                                                                 Set<String> topics,
                                                                 boolean skipTopicRackAwarenessCheck) {
  Map<String, List<Integer>> brokersByRack = new HashMap<>();
  Map<Integer, String> rackByBroker = new HashMap<>();
  for (Node node : cluster.nodes()) {
    // If the rack is not specified, we use the broker id info as rack info.
    String rack = node.rack() == null || node.rack().isEmpty() ? String.valueOf(node.id()) : node.rack();
    brokersByRack.computeIfAbsent(rack, r -> new ArrayList<>()).add(node.id());
    rackByBroker.put(node.id(), rack);
  }

  if (replicationFactor > brokersByRack.size()) {
    if (skipTopicRackAwarenessCheck) {
      LOG.info("Target replication factor for topics {} is larger than number of racks in cluster, rack-awareness "
               + "property may not be guaranteed.", topics);
    } else {
      throw new RuntimeException(String.format("Unable to change replication factor of topics %s to %d since there are only %d "
                                               + "racks in the cluster, to skip the rack-awareness check, set "
                                               + "skipTopicRackAwarenessCheck to true in the request.",
                                               topics, replicationFactor, brokersByRack.size()));
    }
  }

  // The rack list does not depend on the topic, so it is built once; the round-robin cursors restart for each topic.
  List<String> racks = new ArrayList<>(brokersByRack.keySet());
  Set<ExecutionProposal> proposals = new HashSet<>();
  for (String topic : topics) {
    int[] cursors = new int[racks.size()];
    int rackCursor = 0;
    for (PartitionInfo partitionInfo : cluster.partitionsForTopic(topic)) {
      Node[] replicas = partitionInfo.replicas();
      if (replicas.length == replicationFactor) {
        continue;
      }

      Set<Integer> offlineReplicas = offlineReplicasForPartition(partitionInfo, cluster.nodes());
      if (!offlineReplicas.isEmpty()) {
        throw new RuntimeException(String.format("Topic partition %s-%d has offline replicas on brokers %s, unable to update "
                                                 + "its replication factor.", partitionInfo.topic(), partitionInfo.partition(), offlineReplicas));
      }
      Node leader = partitionInfo.leader();
      if (leader == null) {
        // Fail with a descriptive message rather than a bare NullPointerException further down.
        throw new RuntimeException(String.format("Topic partition %s-%d has no leader, unable to update its replication factor.",
                                                 partitionInfo.topic(), partitionInfo.partition()));
      }
      int leaderId = leader.id();

      List<Integer> currentAssignedReplica = new ArrayList<>(replicas.length);
      for (Node node : replicas) {
        currentAssignedReplica.add(node.id());
      }
      List<Integer> newAssignedReplica = new ArrayList<>();
      if (replicas.length < replicationFactor) {
        if (replicationFactor > rackByBroker.size()) {
          // Without enough distinct brokers the assignment loop below could never complete.
          throw new RuntimeException(String.format("Unable to change replication factor of topic partition %s-%d to %d since "
                                                   + "there are only %d brokers in the cluster.",
                                                   partitionInfo.topic(), partitionInfo.partition(),
                                                   replicationFactor, rackByBroker.size()));
        }
        Set<String> currentOccupiedRack = new HashSet<>();
        // Make sure the current replicas are in new replica list.
        for (Integer brokerId : currentAssignedReplica) {
          newAssignedReplica.add(brokerId);
          currentOccupiedRack.add(rackByBroker.get(brokerId));
        }
        // Add new replica to partition in rack-aware(if possible), round-robin way.
        while (newAssignedReplica.size() < replicationFactor) {
          String rack = racks.get(rackCursor);
          if (!currentOccupiedRack.contains(rack) || currentOccupiedRack.size() == racks.size()) {
            List<Integer> brokersInRack = brokersByRack.get(rack);
            int cursor = cursors[rackCursor];
            Integer candidate = brokersInRack.get(cursor);
            // Once all racks are occupied the cursor may point at a broker that already hosts a replica; skip it.
            if (!newAssignedReplica.contains(candidate)) {
              newAssignedReplica.add(candidate);
              currentOccupiedRack.add(rack);
            }
            cursors[rackCursor] = (cursor + 1) % brokersInRack.size();
          }
          rackCursor = (rackCursor + 1) % racks.size();
        }
      } else {
        // Make sure the leader replica is in new replica list.
        newAssignedReplica.add(leaderId);
        for (int brokerId : currentAssignedReplica) {
          if (newAssignedReplica.size() < replicationFactor && brokerId != leaderId) {
            newAssignedReplica.add(brokerId);
          }
        }
      }
      // TODO: get the partition size and populate into execution proposal, check https://github.com/linkedin/cruise-control/issues/722
      proposals.add(new ExecutionProposal(new TopicPartition(topic, partitionInfo.partition()),
                                          0,
                                          leaderId,
                                          currentAssignedReplica,
                                          newAssignedReplica));
    }
  }
  return proposals;
}

/**
 * Update the configuration of every topic whose name matches the given pattern. Currently the only supported change
 * is setting the replication factor of the matching topics.
 *
 * @param topic The name pattern of topics to apply the change.
 * @param replicationFactor The replication factor to set for the topics.
 * @param skipTopicRackAwarenessCheck Whether to ignore the rack awareness property if the number of racks in the
 *                                    cluster is less than the target replication factor.
 * @param uuid UUID of the execution.
 *
 * @return Topics to apply the change.
 */
public Set<String> updateTopicConfiguration(Pattern topic,
                                            short replicationFactor,
                                            boolean skipTopicRackAwarenessCheck,
                                            String uuid) {
  Cluster kafkaCluster = _loadMonitor.kafkaCluster();

  // Collect the names of all topics in the cluster metadata which fully match the pattern.
  Set<String> matchingTopics = new HashSet<>();
  for (String topicName : kafkaCluster.topics()) {
    if (topic.matcher(topicName).matches()) {
      matchingTopics.add(topicName);
    }
  }

  Set<ExecutionProposal> replicationFactorProposals =
      maybeUpdateTopicReplicationFactor(kafkaCluster, replicationFactor, matchingTopics, skipTopicRackAwarenessCheck);
  _executor.executeProposals(replicationFactorProposals, null, null, _loadMonitor, null, null, null, uuid);
  return matchingTopics;
}

/**
* Get the state with selected substates for Kafka Cruise Control.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@
import com.linkedin.kafka.cruisecontrol.servlet.response.CruiseControlState;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
Expand Down Expand Up @@ -132,4 +136,18 @@ public static boolean isPartitionUnderReplicated(Cluster cluster, TopicPartition
PartitionInfo partitionInfo = cluster.partition(tp);
return partitionInfo.inSyncReplicas().length != partitionInfo.replicas().length;
}

/**
 * Find the brokers which host a replica of the partition but are not among the given alive nodes.
 *
 * @param partitionInfo The partition information of topic partition to check.
 * @param aliveNodes The alive nodes of the cluster.
 * @return Ids of the brokers which host an offline replica of the partition.
 */
public static Set<Integer> offlineReplicasForPartition(PartitionInfo partitionInfo, List<Node> aliveNodes) {
  Node[] replicas = partitionInfo.replicas();
  // A replica counts as offline when its node is missing from the alive node list.
  return Arrays.stream(replicas)
               .filter(replica -> !aliveNodes.contains(replica))
               .map(Node::id)
               .collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.TopicConfigurationParameters;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Pattern;


/**
Expand All @@ -48,6 +50,7 @@
* boolean, Integer, Integer, boolean, java.util.regex.Pattern,
* com.linkedin.kafka.cruisecontrol.executor.strategy.ReplicaMovementStrategy, String, boolean, boolean, boolean,
* boolean, Set)}</li>
* <li>{@link KafkaCruiseControl#updateTopicConfiguration(Pattern, short, boolean, String)}</li>
* </ul>
*
* The other operations are non-blocking by default.
Expand Down Expand Up @@ -158,6 +161,16 @@ public OperationFuture demoteBrokers(String uuid, DemoteBrokerParameters paramet
return future;
}

/**
 * Submit an asynchronous topic configuration update to the session executor and return its future.
 *
 * @see KafkaCruiseControl#updateTopicConfiguration(Pattern, short, boolean, String)
 */
public OperationFuture updateTopicConfiguration(TopicConfigurationParameters parameters, String uuid) {
  OperationFuture operationFuture = new OperationFuture("UpdateTopicConfiguration");
  // Record the pending step before the task is handed to the executor.
  pending(operationFuture.operationProgress());
  UpdateTopicConfigurationRunnable task =
      new UpdateTopicConfigurationRunnable(this, operationFuture, uuid, parameters, _config);
  _sessionExecutor.submit(task);
  return operationFuture;
}

/**
 * Add a {@link Pending} step to the given operation progress.
 *
 * @param progress The operation progress to add the step to.
 */
private void pending(OperationProgress progress) {
  Pending pendingStep = new Pending();
  progress.addStep(pendingStep);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2019 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.async;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.TopicConfigurationParameters;
import com.linkedin.kafka.cruisecontrol.servlet.response.TopicConfigurationResult;
import java.util.regex.Pattern;

/**
 * The async runnable which invokes {@link KafkaCruiseControl#updateTopicConfiguration(Pattern, short, boolean, String)}
 * and wraps the returned topics in a {@link TopicConfigurationResult}.
 */
public class UpdateTopicConfigurationRunnable extends OperationRunnable {
  private final String _uuid;
  private final KafkaCruiseControlConfig _config;
  // Values extracted from the request parameters at construction time.
  private final Pattern _topicPattern;
  private final short _targetReplicationFactor;
  private final boolean _skipTopicRackAwarenessCheck;

  UpdateTopicConfigurationRunnable(KafkaCruiseControl kafkaCruiseControl,
                                   OperationFuture future,
                                   String uuid,
                                   TopicConfigurationParameters parameters,
                                   KafkaCruiseControlConfig config) {
    super(kafkaCruiseControl, future);
    _uuid = uuid;
    _config = config;
    _topicPattern = parameters.topic();
    _targetReplicationFactor = parameters.replicationFactor();
    _skipTopicRackAwarenessCheck = parameters.skipRackAwarenessCheck();
  }

  /**
   * Run the topic configuration update and build the result from the topics it returns.
   */
  @Override
  protected TopicConfigurationResult getResult() {
    return new TopicConfigurationResult(
        _kafkaCruiseControl.updateTopicConfiguration(_topicPattern, _targetReplicationFactor, _skipTopicRackAwarenessCheck, _uuid),
        _targetReplicationFactor,
        _config);
  }
}
Loading

0 comments on commit d4638c5

Please sign in to comment.