Skip to content

Commit

Permalink
Automate leadership concurrency adjustment based on broker metrics (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
efeg authored and Adem Efe Gencer committed Nov 3, 2020
1 parent 4389c75 commit 5af8c00
Show file tree
Hide file tree
Showing 9 changed files with 753 additions and 149 deletions.
8 changes: 4 additions & 4 deletions config/cruisecontrol.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
#

# This is an example property file for Kafka Cruise Control. See KafkaCruiseControlConfig for more details.
# This is an example property file for Kafka Cruise Control. See com.linkedin.kafka.cruisecontrol.config.constants for more details.

# Configuration for the metadata client.
# =======================================
Expand Down Expand Up @@ -73,16 +73,16 @@ metric.sampling.interval.ms=120000
# The partition metrics window size in milliseconds
partition.metrics.window.ms=300000

# The number of partition metric windows to keep in memory
num.partition.metrics.windows=1
# The number of partition metric windows to keep in memory. Partition-load-history = num.partition.metrics.windows * partition.metrics.window.ms
num.partition.metrics.windows=5

# The minimum partition metric samples required for a partition in each window
min.samples.per.partition.metrics.window=1

# The broker metrics window size in milliseconds
broker.metrics.window.ms=300000

# The number of broker metric windows to keep in memory
# The number of broker metric windows to keep in memory. Broker-load-history = num.broker.metrics.windows * broker.metrics.window.ms
num.broker.metrics.windows=20

# The minimum broker metric samples required for a broker in each window
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,8 +615,8 @@ private static boolean hasProposalsToExecute(Collection<ExecutionProposal> propo
* when executing proposals (if null, no throttling is applied).
* @param isTriggeredByUserRequest Whether the execution is triggered by a user request.
* @param uuid UUID of the execution.
* @param skipAutoRefreshingConcurrency {@code true} to skip auto refreshing concurrency even if the concurrency adjuster
* is enabled, {@code false} otherwise.
* @param skipInterBrokerReplicaConcurrencyAdjustment {@code true} to skip auto adjusting concurrency of inter-broker
* replica movements even if the concurrency adjuster is enabled, {@code false} otherwise.
*/
public void executeProposals(Set<ExecutionProposal> proposals,
Set<Integer> unthrottledBrokers,
Expand All @@ -629,12 +629,13 @@ public void executeProposals(Set<ExecutionProposal> proposals,
Long replicationThrottle,
boolean isTriggeredByUserRequest,
String uuid,
boolean skipAutoRefreshingConcurrency) throws OngoingExecutionException {
boolean skipInterBrokerReplicaConcurrencyAdjustment) throws OngoingExecutionException {
if (hasProposalsToExecute(proposals, uuid)) {
_executor.executeProposals(proposals, unthrottledBrokers, null, _loadMonitor,
concurrentInterBrokerPartitionMovements, concurrentIntraBrokerPartitionMovements,
concurrentLeaderMovements, executionProgressCheckIntervalMs, replicaMovementStrategy,
replicationThrottle, isTriggeredByUserRequest, uuid, isKafkaAssignerMode, skipAutoRefreshingConcurrency);
replicationThrottle, isTriggeredByUserRequest, uuid, isKafkaAssignerMode,
skipInterBrokerReplicaConcurrencyAdjustment);
} else {
failGeneratingProposalsForExecution(uuid);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,54 +203,87 @@ private void sanityCheckGoalNames() {
* {@link ExecutorConfig#NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG}</li>
* <li>{@link ExecutorConfig#CONCURRENCY_ADJUSTER_MAX_PARTITION_MOVEMENTS_PER_BROKER_CONFIG} <=
* {@link ExecutorConfig#MAX_NUM_CLUSTER_MOVEMENTS_CONFIG}</li>
* <li>{@link ExecutorConfig#CONCURRENCY_ADJUSTER_MIN_PARTITION_MOVEMENTS_PER_BROKER_CONFIG} <=
* {@link ExecutorConfig#NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG}</li>
* <li>{@link ExecutorConfig#CONCURRENCY_ADJUSTER_MIN_LEADERSHIP_MOVEMENTS_CONFIG} <=
* {@link ExecutorConfig#NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG}</li>
* <li>{@link ExecutorConfig#CONCURRENCY_ADJUSTER_MAX_LEADERSHIP_MOVEMENTS_CONFIG} >=
* {@link ExecutorConfig#NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG}</li>
* <li>{@link ExecutorConfig#CONCURRENCY_ADJUSTER_MAX_LEADERSHIP_MOVEMENTS_CONFIG} <=
* {@link ExecutorConfig#MAX_NUM_CLUSTER_MOVEMENTS_CONFIG}</li>
* <li>{@link ExecutorConfig#MIN_EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG} <=
* {@link ExecutorConfig#EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG}</li>
* </ul>
*/
private void sanityCheckConcurrency() {
void sanityCheckConcurrency() {
int maxClusterPartitionMovementConcurrency = getInt(ExecutorConfig.MAX_NUM_CLUSTER_MOVEMENTS_CONFIG);

int interBrokerPartitionMovementConcurrency = getInt(ExecutorConfig.NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG);
if (interBrokerPartitionMovementConcurrency >= maxClusterPartitionMovementConcurrency) {
throw new ConfigException("Inter-broker partition movement concurrency [" + interBrokerPartitionMovementConcurrency
+ "] must be smaller than the maximum number of allowed movements in cluster ["
+ maxClusterPartitionMovementConcurrency + "].");
throw new ConfigException(String.format("Inter-broker partition movement concurrency [%d] must be smaller than the "
+ "maximum number of allowed movements in cluster [%d].",
interBrokerPartitionMovementConcurrency, maxClusterPartitionMovementConcurrency));
}

int intraBrokerPartitionMovementConcurrency = getInt(ExecutorConfig.NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG);
if (intraBrokerPartitionMovementConcurrency >= maxClusterPartitionMovementConcurrency) {
throw new ConfigException("Intra-broker partition movement concurrency [" + intraBrokerPartitionMovementConcurrency
+ "] must be smaller than the maximum number of allowed movements in cluster ["
+ maxClusterPartitionMovementConcurrency + "].");
throw new ConfigException(String.format("Intra-broker partition movement concurrency [%d] must be smaller than the "
+ "maximum number of allowed movements in cluster [%d].",
intraBrokerPartitionMovementConcurrency, maxClusterPartitionMovementConcurrency));
}

int leadershipMovementConcurrency = getInt(ExecutorConfig.NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG);
if (leadershipMovementConcurrency > maxClusterPartitionMovementConcurrency) {
throw new ConfigException("Leadership movement concurrency [" + leadershipMovementConcurrency
+ "] cannot be greater than the maximum number of allowed movements in cluster ["
+ maxClusterPartitionMovementConcurrency + "].");
throw new ConfigException(String.format("Leadership movement concurrency [%d] cannot be greater than the maximum number"
+ " of allowed movements in cluster [%d].",
leadershipMovementConcurrency, maxClusterPartitionMovementConcurrency));
}

int concurrencyAdjusterMaxPartitionMovementsPerBroker = getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MAX_PARTITION_MOVEMENTS_PER_BROKER_CONFIG);
if (interBrokerPartitionMovementConcurrency >= concurrencyAdjusterMaxPartitionMovementsPerBroker) {
throw new ConfigException("Inter-broker partition movement concurrency [" + interBrokerPartitionMovementConcurrency
+ "] must be smaller than the concurrency adjuster maximum partition movements per broker ["
+ concurrencyAdjusterMaxPartitionMovementsPerBroker + "].");
throw new ConfigException(String.format("Inter-broker partition movement concurrency [%d] must be smaller than the "
+ "concurrency adjuster maximum partition movements per broker [%d].",
interBrokerPartitionMovementConcurrency, concurrencyAdjusterMaxPartitionMovementsPerBroker));
}

if (concurrencyAdjusterMaxPartitionMovementsPerBroker > maxClusterPartitionMovementConcurrency) {
throw new ConfigException("Maximum partition movements per broker of concurrency adjuster ["
+ concurrencyAdjusterMaxPartitionMovementsPerBroker
+ "] cannot be greater than the maximum number of allowed movements in cluster ["
+ maxClusterPartitionMovementConcurrency + "].");
throw new ConfigException(String.format("Maximum partition movements per broker of concurrency adjuster [%d] cannot"
+ " be greater than the maximum number of allowed movements in cluster [%d].",
concurrencyAdjusterMaxPartitionMovementsPerBroker, maxClusterPartitionMovementConcurrency));
}

int concurrencyAdjusterMinPartitionMovementsPerBroker = getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MIN_PARTITION_MOVEMENTS_PER_BROKER_CONFIG);
if (interBrokerPartitionMovementConcurrency < concurrencyAdjusterMinPartitionMovementsPerBroker) {
throw new ConfigException(String.format("Inter-broker partition movement concurrency [%d] cannot be smaller than the"
+ " concurrency adjuster minimum partition movements per broker [%d].",
interBrokerPartitionMovementConcurrency, concurrencyAdjusterMinPartitionMovementsPerBroker));
}

int concurrencyAdjusterMinLeadershipMovements = getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MIN_LEADERSHIP_MOVEMENTS_CONFIG);
if (leadershipMovementConcurrency < concurrencyAdjusterMinLeadershipMovements) {
throw new ConfigException(String.format("Leadership movement concurrency [%d] cannot be smaller than the concurrency "
+ "adjuster minimum leadership movements [%d].", leadershipMovementConcurrency,
concurrencyAdjusterMinLeadershipMovements));
}

int concurrencyAdjusterMaxLeadershipMovements = getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MAX_LEADERSHIP_MOVEMENTS_CONFIG);
if (leadershipMovementConcurrency > concurrencyAdjusterMaxLeadershipMovements) {
throw new ConfigException(String.format("Leadership movement concurrency [%d] cannot be greater than the concurrency "
+ "adjuster maximum leadership movements [%d].", leadershipMovementConcurrency,
concurrencyAdjusterMaxLeadershipMovements));
}

if (concurrencyAdjusterMaxLeadershipMovements > maxClusterPartitionMovementConcurrency) {
throw new ConfigException(String.format("Maximum leadership movements of concurrency adjuster [%d] cannot be greater "
+ "than the maximum number of allowed movements in cluster [%d].",
concurrencyAdjusterMaxLeadershipMovements, maxClusterPartitionMovementConcurrency));
}
long minExecutionProgressCheckIntervalMs = getLong(ExecutorConfig.MIN_EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG);
long defaultExecutionProgressCheckIntervalMs = getLong(ExecutorConfig.EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG);
if (minExecutionProgressCheckIntervalMs > defaultExecutionProgressCheckIntervalMs) {
throw new ConfigException("Minimum execution progress check interval ["
+ minExecutionProgressCheckIntervalMs
+ "] cannot be greater than the default execution progress check interval ["
+ defaultExecutionProgressCheckIntervalMs + "].");
throw new ConfigException(String.format("Minimum execution progress check interval [%d] cannot be greater than the "
+ "default execution progress check interval [%d].",
minExecutionProgressCheckIntervalMs, defaultExecutionProgressCheckIntervalMs));
}
}

Expand All @@ -262,8 +295,8 @@ private void sanityCheckTaskExecutionAlertingThreshold() {
long leaderMovementTimeoutMs = getLong(ExecutorConfig.LEADER_MOVEMENT_TIMEOUT_MS_CONFIG);
long taskExecutionAlertingThresholdMs = getLong(ExecutorConfig.TASK_EXECUTION_ALERTING_THRESHOLD_MS_CONFIG);
if (taskExecutionAlertingThresholdMs >= leaderMovementTimeoutMs) {
throw new ConfigException("Task execution time alerting threshold [" + taskExecutionAlertingThresholdMs
+ "ms] cannot be greater than leader movement timeout [" + leaderMovementTimeoutMs + "ms].");
throw new ConfigException(String.format("Task execution time alerting threshold [%dms] cannot be greater than leader movement"
+ " timeout [%sms].", taskExecutionAlertingThresholdMs, leaderMovementTimeoutMs));
}
}

Expand Down
Loading

0 comments on commit 5af8c00

Please sign in to comment.