Add a new endpoint to support dynamically increasing topic's replication factor. #710

Merged
merged 6 commits on May 21, 2019
Changes from 3 commits
@@ -47,6 +47,7 @@
import com.linkedin.kafka.cruisecontrol.servlet.response.TrainResult;
import com.linkedin.kafka.cruisecontrol.servlet.response.stats.BrokerStats;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -61,6 +62,9 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
@@ -923,6 +927,142 @@ public StopProposalResult stopProposalExecution(StopProposalParameters parameter
return new StopProposalResult(_config);
}

private void ensureNoOfflineReplicaForPartition(PartitionInfo partitionInfo, List<Node> aliveNodes) {
for (Node node : partitionInfo.replicas()) {
if (!aliveNodes.contains(node)) {
throw new RuntimeException(String.format("Topic partition %s-%d has an offline replica on broker %d, unable to update "
+ "its replication factor.", partitionInfo.topic(), partitionInfo.partition(), node.id()));
}
}
}

/**
* Update the replication factor of Kafka topics.
* If a partition's current replication factor is less than the target replication factor, add new replicas to the partition
* in a rack-aware, round-robin way.
* There are two scenarios in which the rack awareness property is not guaranteed.
* <ul>
* <li> If metadata does not have rack information about brokers, then it is only guaranteed that new replicas are
* added to brokers that currently do not host any replicas of the partition.</li>
Collaborator: Nit: missing comma before which.

Collaborator: This is definitely not critical -- just fyi, but feel free to skip this: https://www.grammarly.com/blog/comma-before-which/

* <li> If the replication factor to set for the topic is larger than the number of racks in the cluster and
* skipTopicRackAwarenessCheck is set to true, then the rack awareness property is ignored.</li>
* </ul>
* If a partition's current replication factor is larger than the target replication factor, remove one or more follower replicas
* from the partition.
*
* @param cluster The metadata of the cluster.
* @param replicationFactor The replication factor to set for the topics.
* @param topics The topics to apply the change.
* @param skipTopicRackAwarenessCheck Whether to skip the rack awareness check if the number of racks in the cluster is less
* than the target replication factor.
* @return Execution proposals to update the replication factor of topics.
*/
private List<ExecutionProposal> maybeUpdateTopicReplicationFactor(Cluster cluster,
int replicationFactor,
Set<String> topics,
boolean skipTopicRackAwarenessCheck) {
Map<String, List<Integer>> brokersByRack = new HashMap<>();
Map<Integer, String> rackByBroker = new HashMap<>();
for (Node node : cluster.nodes()) {
// If the rack is not specified, we use the broker id info as rack info.
String rack = node.rack() == null || node.rack().isEmpty() ? String.valueOf(node.id()) : node.rack();
brokersByRack.putIfAbsent(rack, new ArrayList<>());
brokersByRack.get(rack).add(node.id());
rackByBroker.put(node.id(), rack);
}

if (replicationFactor > brokersByRack.size()) {
if (skipTopicRackAwarenessCheck) {
LOG.info(String.format("Target replication factor for topics %s is larger than the number of racks in the cluster, new replicas"
+ " may be added in a non-rack-aware way.", topics));
} else {
throw new RuntimeException(String.format("Unable to increase the replication factor of topics %s to %d since there are only %d "
+ "racks in the cluster.", topics, replicationFactor, brokersByRack.size()));
}
}

List<ExecutionProposal> proposals = new ArrayList<>();
for (String topic : topics) {
List<String> racks = new ArrayList<>(brokersByRack.keySet());
int[] cursors = new int[racks.size()];
int rackCursor = 0;
for (PartitionInfo partitionInfo : cluster.partitionsForTopic(topic)) {
Collaborator: Nit: For ease of readability and to avoid repetition (in the if and else parts), is it possible to move this logic to separate smaller functions?

Author: I feel it is hard to extract the common logic, since the only common logic is to check for no offline replicas and initialize local variables.

Collaborator: Is it possible to move the relevant content of the loop to a smaller function -- e.g. something like (potentially with more/fewer parameters):

private Set<ExecutionProposal> updateReplicationFactorFor(PartitionInfo partitionInfo, short replicationFactor) {...}

This makes the logic easier to parse and the method more modular.

if (partitionInfo.replicas().length < replicationFactor) {
ensureNoOfflineReplicaForPartition(partitionInfo, cluster.nodes());
List<Integer> currentAssignedReplica = new ArrayList<>(partitionInfo.replicas().length);
List<Integer> newAssignedReplica = new ArrayList<>();
Set<String> currentOccupiedRack = new HashSet<>();
// Make sure the current replicas are in new replica list.
for (Node node : partitionInfo.replicas()) {
currentAssignedReplica.add(node.id());
newAssignedReplica.add(node.id());
currentOccupiedRack.add(rackByBroker.get(node.id()));
}
// Add new replicas to the partition in a rack-aware (if possible), round-robin way.
while (newAssignedReplica.size() < replicationFactor) {
if (!currentOccupiedRack.contains(racks.get(rackCursor)) || currentOccupiedRack.size() == racks.size()) {
String rack = racks.get(rackCursor);
int cursor = cursors[rackCursor];
newAssignedReplica.add(brokersByRack.get(rack).get(cursor));
currentOccupiedRack.add(rack);
cursors[rackCursor] = (cursor + 1) % brokersByRack.get(rack).size();
}
rackCursor = (rackCursor + 1) % racks.size();
}
proposals.add(new ExecutionProposal(new TopicPartition(topic, partitionInfo.partition()),
0,
partitionInfo.leader().id(),
currentAssignedReplica,
newAssignedReplica));
} else if (partitionInfo.replicas().length > replicationFactor) {
ensureNoOfflineReplicaForPartition(partitionInfo, cluster.nodes());
List<Integer> currentAssignedReplica = new ArrayList<>(partitionInfo.replicas().length);
List<Integer> newAssignedReplica = new ArrayList<>();
// Make sure the leader replica is in new replica list.
newAssignedReplica.add(partitionInfo.leader().id());
for (Node node : partitionInfo.replicas()) {
currentAssignedReplica.add(node.id());
if (newAssignedReplica.size() < replicationFactor && node.id() != newAssignedReplica.get(0)) {
newAssignedReplica.add(node.id());
}
}
proposals.add(new ExecutionProposal(new TopicPartition(topic, partitionInfo.partition()),
0,
partitionInfo.leader().id(),
currentAssignedReplica,
newAssignedReplica));
}
}
}
return proposals;
}
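The rack-aware placement above can be sketched in isolation. The following is a minimal standalone illustration, not the Cruise Control implementation: `expand` mirrors the cursor-based loop that grows a partition's replica list, and `shrink` mirrors the leader-preserving branch that trims it. The class and method names here are invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ReplicaAssignmentSketch {

  // Grow `replicas` to `replicationFactor` broker ids, preferring racks that
  // do not yet host a replica; reuses racks only once every rack is occupied.
  static List<Integer> expand(List<Integer> replicas,
                              int replicationFactor,
                              Map<String, List<Integer>> brokersByRack,
                              Map<Integer, String> rackByBroker) {
    List<String> racks = new ArrayList<>(brokersByRack.keySet());
    int[] cursors = new int[racks.size()]; // per-rack round-robin position
    int rackCursor = 0;
    List<Integer> result = new ArrayList<>(replicas);
    Set<String> occupiedRacks = new HashSet<>();
    for (int broker : replicas) {
      occupiedRacks.add(rackByBroker.get(broker));
    }
    while (result.size() < replicationFactor) {
      String rack = racks.get(rackCursor);
      if (!occupiedRacks.contains(rack) || occupiedRacks.size() == racks.size()) {
        List<Integer> rackBrokers = brokersByRack.get(rack);
        result.add(rackBrokers.get(cursors[rackCursor]));
        occupiedRacks.add(rack);
        cursors[rackCursor] = (cursors[rackCursor] + 1) % rackBrokers.size();
      }
      rackCursor = (rackCursor + 1) % racks.size();
    }
    return result;
  }

  // Trim `replicas` to `replicationFactor`, always keeping the leader first
  // and then the earliest-listed followers.
  static List<Integer> shrink(List<Integer> replicas, int leader, int replicationFactor) {
    List<Integer> result = new ArrayList<>();
    result.add(leader);
    for (int broker : replicas) {
      if (result.size() < replicationFactor && broker != leader) {
        result.add(broker);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, List<Integer>> brokersByRack = new LinkedHashMap<>();
    brokersByRack.put("r1", Arrays.asList(0, 1));
    brokersByRack.put("r2", Arrays.asList(2));
    brokersByRack.put("r3", Arrays.asList(3));
    Map<Integer, String> rackByBroker = new HashMap<>();
    rackByBroker.put(0, "r1");
    rackByBroker.put(1, "r1");
    rackByBroker.put(2, "r2");
    rackByBroker.put(3, "r3");
    // Partition on broker 0 (rack r1), grown to replication factor 3:
    System.out.println(expand(Arrays.asList(0), 3, brokersByRack, rackByBroker)); // [0, 2, 3]
    // Partition on brokers 1,2,3 with leader 2, shrunk to replication factor 2:
    System.out.println(shrink(Arrays.asList(1, 2, 3), 2, 2)); // [2, 1]
  }
}
```

A `LinkedHashMap` is used only so that the demo's rack iteration order is deterministic; the production code's rack order depends on the cluster metadata.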

/**
* Update topic configuration. Currently only changing the topic's replication factor is supported.
*
* @param topic The name pattern of topics to apply the change.
* @param replicationFactor The replication factor to set for the topics.
* @param skipTopicRackAwarenessCheck Whether to skip the rack awareness check if the number of racks in the cluster is less
* than the target replication factor.
* @param uuid UUID of the execution.
*
* @return Topics to apply the change.
*/
public Set<String> updateTopicConfiguration(Pattern topic,
int replicationFactor,
boolean skipTopicRackAwarenessCheck,
String uuid) {
Cluster cluster = _loadMonitor.kafkaCluster();
Set<String> topics = cluster.topics().stream().filter(t -> topic.matcher(t).matches()).collect(Collectors.toSet());
List<ExecutionProposal> proposals = maybeUpdateTopicReplicationFactor(cluster,
replicationFactor,
topics,
skipTopicRackAwarenessCheck);
_executor.executeProposals(proposals, null, null, _loadMonitor, null, null, null, uuid);
return topics;
}
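Topic selection in `updateTopicConfiguration` is a whole-name regex match against the cluster's topic set. A minimal sketch of just that filtering step (the class and topic names here are invented for illustration):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternSketch {

  // Select the topics whose entire name matches the request's regex,
  // mirroring the stream filter in updateTopicConfiguration.
  static Set<String> matchingTopics(Pattern topicPattern, Set<String> clusterTopics) {
    return clusterTopics.stream()
                        .filter(t -> topicPattern.matcher(t).matches())
                        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    Set<String> topics = new HashSet<>(Arrays.asList("orders", "orders-audit", "payments"));
    // matches() anchors the regex to the whole topic name, so "orders.*"
    // selects "orders" and "orders-audit" but not "payments".
    System.out.println(matchingTopics(Pattern.compile("orders.*"), topics));
  }
}
```

Because `Matcher.matches()` requires a full match, a request whose pattern is the literal `orders` selects only the topic `orders`, not `orders-audit`; a prefix-style selection needs an explicit wildcard such as `orders.*`.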

/**
* Get the state with selected substates for Kafka Cruise Control.
*/
@@ -19,9 +19,11 @@
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.RemoveBrokerParameters;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.UpdateTopicConfigurationParameters;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Pattern;


/**
@@ -48,6 +50,7 @@
* boolean, Integer, Integer, boolean, java.util.regex.Pattern,
* com.linkedin.kafka.cruisecontrol.executor.strategy.ReplicaMovementStrategy, String, boolean, boolean, boolean,
* boolean, Set)}</li>
* <li>{@link KafkaCruiseControl#updateTopicConfiguration(Pattern, int, boolean, String)}</li>
* </ul>
*
* The other operations are non-blocking by default.
@@ -158,6 +161,16 @@ public OperationFuture demoteBrokers(String uuid, DemoteBrokerParameters paramet
return future;
}

/**
* @see KafkaCruiseControl#updateTopicConfiguration(Pattern, int, boolean, String)
*/
public OperationFuture updateTopicConfiguration(UpdateTopicConfigurationParameters parameters, String uuid) {
OperationFuture future = new OperationFuture("UpdateTopicConfiguration");
pending(future.operationProgress());
_sessionExecutor.submit(new UpdateTopicConfigurationRunnable(this, future, uuid, parameters, _config));
return future;
}

private void pending(OperationProgress progress) {
progress.addStep(new Pending());
}
@@ -0,0 +1,45 @@
/*
* Copyright 2019 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.async;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.servlet.parameters.UpdateTopicConfigurationParameters;
import com.linkedin.kafka.cruisecontrol.servlet.response.UpdateTopicConfigurationResult;
import java.util.regex.Pattern;

/**
* The async runnable for {@link KafkaCruiseControl#updateTopicConfiguration(Pattern, int, boolean, String)}
*/
public class UpdateTopicConfigurationRunnable extends OperationRunnable {
private final Pattern _topic;
private final int _replicationFactor;
private final boolean _skipRackAwarenessCheck;
private final String _uuid;
private final KafkaCruiseControlConfig _config;

UpdateTopicConfigurationRunnable(KafkaCruiseControl kafkaCruiseControl,
OperationFuture future,
String uuid,
UpdateTopicConfigurationParameters parameters,
KafkaCruiseControlConfig config) {
super(kafkaCruiseControl, future);
_uuid = uuid;
_topic = parameters.topic();
_replicationFactor = parameters.replicationFactor();
_skipRackAwarenessCheck = parameters.skipRackAwarenessCheck();
_config = config;
}

@Override
protected UpdateTopicConfigurationResult getResult() {
return new UpdateTopicConfigurationResult(_kafkaCruiseControl.updateTopicConfiguration(_topic,
_replicationFactor,
_skipRackAwarenessCheck,
_uuid),
_replicationFactor,
_config);
}
}
@@ -23,9 +23,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import kafka.admin.AdminUtils;
import kafka.admin.BrokerMetadata;
import kafka.admin.RackAwareMode;
import kafka.common.TopicAndPartition;
import kafka.log.LogConfig;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.consumer.Consumer;
@@ -50,15 +48,9 @@
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
import scala.collection.Seq;

import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils.ensureNoPartitionUnderPartitionReassignment;
import static com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils.ensureTopicNotUnderPartitionReassignment;
import static org.apache.kafka.common.requests.MetadataResponse.TopicMetadata;
import static org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;

/**
* The sample store that implements the {@link SampleStore}. It stores the partition metric samples and broker metric
@@ -235,96 +227,11 @@ private void ensureTopicsCreated(Map<String, ?> config) {
replicationFactor, _partitionSampleStoreTopicPartitionCount);
ensureTopicCreated(zkUtils, topics.keySet(), _brokerMetricSampleStoreTopic, brokerSampleRetentionMs,
replicationFactor, _brokerSampleStoreTopicPartitionCount);
maybeIncreaseTopicReplicationFactor(zkUtils, replicationFactor,
new HashSet<>(Arrays.asList(_partitionMetricSampleStoreTopic, _brokerMetricSampleStoreTopic)));
} finally {
KafkaCruiseControlUtils.closeZkUtilsWithTimeout(zkUtils, ZK_UTILS_CLOSE_TIMEOUT_MS);
}
}

/**
* Increase the replication factor of Kafka topics through adding new replicas in a rack-aware, round-robin way.
* There are two scenarios that rack awareness property is not guaranteed.
* <ul>
* <li> If Zookeeper does not have rack information about brokers, then it is only guaranteed that new replicas are
* added to brokers which do not currently host the partition.</li>
* <li> If replication factor to set for the topic is larger than number of racks in the cluster, then rack awareness
* property is ignored.</li>
* </ul>
*
* @param zkUtils ZkUtils class to use to increase replication factor.
* @param replicationFactor The replication factor to set for the topic.
* @param topics The topics to check.
*/
private void maybeIncreaseTopicReplicationFactor(ZkUtils zkUtils,
int replicationFactor,
Set<String> topics) {
if (!ensureNoPartitionUnderPartitionReassignment(zkUtils)) {
LOG.warn("There are ongoing partition reassignments, skip checking replication factor of topics {}.", topics);
return;
}
Map<String, List<Integer>> brokersByRack = new HashMap<>();
Map<Integer, String> rackByBroker = new HashMap<>();
for (BrokerMetadata bm :
JavaConversions.seqAsJavaList(AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced$.MODULE$, Option.empty()))) {
// If the rack is not specified, we use the broker id info as rack info.
String rack = bm.rack().isEmpty() ? String.valueOf(bm.id()) : bm.rack().get();
brokersByRack.putIfAbsent(rack, new ArrayList<>());
brokersByRack.get(rack).add(bm.id());
rackByBroker.put(bm.id(), rack);
}

if (replicationFactor > brokersByRack.size()) {
if (_skipSampleStoreTopicRackAwarenessCheck) {
LOG.warn("Target replication factor for topics " + topics + " is larger than number of racks in cluster, new replica maybe"
+ " added in none rack-aware way.");
} else {
throw new RuntimeException("Unable to increase topics " + topics + " replica factor to " + replicationFactor
+ " since there are only " + brokersByRack.size() + " racks in the cluster.");
}
}

scala.collection.mutable.Map<TopicAndPartition, Seq<Object>> newReplicaAssignment = new scala.collection.mutable.HashMap<>();
scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils.fetchTopicMetadataFromZk(JavaConversions.asScalaSet(topics), zkUtils,
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT));
for (scala.collection.Iterator<TopicMetadata> iter = topicMetadatas.iterator(); iter.hasNext();) {
TopicMetadata topicMetadata = iter.next();
String topic = topicMetadata.topic();
List<String> racks = new ArrayList<>(brokersByRack.keySet());
int[] cursors = new int[racks.size()];
int rackCursor = 0;
for (PartitionMetadata pm : topicMetadata.partitionMetadata()) {
if (pm.replicas().size() < replicationFactor) {
List<Object> newAssignedReplica = new ArrayList<>();
Set<String> currentOccupiedRack = new HashSet<>();
// Make sure the current replicas are in new replica list.
pm.replicas().forEach(node -> {
newAssignedReplica.add(node.id());
currentOccupiedRack.add(rackByBroker.get(node.id()));
});
// Add new replica to partition in rack-aware(if possible), round-robin way.
while (newAssignedReplica.size() < replicationFactor) {
if (!currentOccupiedRack.contains(racks.get(rackCursor)) || currentOccupiedRack.size() == racks.size()) {
String rack = racks.get(rackCursor);
int cursor = cursors[rackCursor];
newAssignedReplica.add(brokersByRack.get(rack).get(cursor));
currentOccupiedRack.add(rack);
cursors[rackCursor] = (cursor + 1) % brokersByRack.get(rack).size();
}
rackCursor = (rackCursor + 1) % racks.size();
}
newReplicaAssignment.put(new TopicAndPartition(topic, pm.partition()),
JavaConverters.asScalaIteratorConverter(newAssignedReplica.iterator()).asScala().toSeq());
}
}
}
if (newReplicaAssignment.nonEmpty()) {
zkUtils.updatePartitionReassignmentData(newReplicaAssignment);
LOG.info(String.format("The replication factor of topic partition %s is increased to %d.",
newReplicaAssignment.keySet(), replicationFactor));
}
}

/**
* Add new partitions to the Kafka topic.
*
@@ -30,7 +30,8 @@ public enum EndPoint {
USER_TASKS,
REVIEW_BOARD,
ADMIN,
REVIEW;
REVIEW,
TOPIC_CONFIGURATION;

private static final List<EndPoint> GET_ENDPOINT = Arrays.asList(BOOTSTRAP,
TRAIN,
@@ -49,7 +50,8 @@
RESUME_SAMPLING,
DEMOTE_BROKER,
ADMIN,
REVIEW);
REVIEW,
TOPIC_CONFIGURATION);
private static final List<EndPoint> CACHED_VALUES = Collections.unmodifiableList(Arrays.asList(values()));

public static List<EndPoint> getEndpoint() {