diff --git a/config/cruisecontrol.properties b/config/cruisecontrol.properties index 40fc5224a11ba..00c19a314bca5 100644 --- a/config/cruisecontrol.properties +++ b/config/cruisecontrol.properties @@ -2,7 +2,7 @@ # Copyright 2017 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. # -# This is an example property file for Kafka Cruise Control. See KafkaCruiseControlConfig for more details. +# This is an example property file for Kafka Cruise Control. See com.linkedin.kafka.cruisecontrol.config.constants for more details. # Configuration for the metadata client. # ======================================= @@ -73,8 +73,8 @@ metric.sampling.interval.ms=120000 # The partition metrics window size in milliseconds partition.metrics.window.ms=300000 -# The number of partition metric windows to keep in memory -num.partition.metrics.windows=1 +# The number of partition metric windows to keep in memory. Partition-load-history = num.partition.metrics.windows * partition.metrics.window.ms +num.partition.metrics.windows=5 # The minimum partition metric samples required for a partition in each window min.samples.per.partition.metrics.window=1 @@ -82,7 +82,7 @@ min.samples.per.partition.metrics.window=1 # The broker metrics window size in milliseconds broker.metrics.window.ms=300000 -# The number of broker metric windows to keep in memory +# The number of broker metric windows to keep in memory. Broker-load-history = num.broker.metrics.windows * broker.metrics.window.ms num.broker.metrics.windows=20 # The minimum broker metric samples required for a partition in each window diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java index f2fbd149e1fed..1d9020962003d 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java @@ -615,8 +615,8 @@ private static boolean hasProposalsToExecute(Collection propo * when executing proposals (if null, no throttling is applied). * @param isTriggeredByUserRequest Whether the execution is triggered by a user request. * @param uuid UUID of the execution. - * @param skipAutoRefreshingConcurrency {@code true} to skip auto refreshing concurrency even if the concurrency adjuster - * is enabled, {@code false} otherwise. + * @param skipInterBrokerReplicaConcurrencyAdjustment {@code true} to skip auto adjusting concurrency of inter-broker + * replica movements even if the concurrency adjuster is enabled, {@code false} otherwise. */ public void executeProposals(Set proposals, Set unthrottledBrokers, @@ -629,12 +629,13 @@ public void executeProposals(Set proposals, Long replicationThrottle, boolean isTriggeredByUserRequest, String uuid, - boolean skipAutoRefreshingConcurrency) throws OngoingExecutionException { + boolean skipInterBrokerReplicaConcurrencyAdjustment) throws OngoingExecutionException { if (hasProposalsToExecute(proposals, uuid)) { _executor.executeProposals(proposals, unthrottledBrokers, null, _loadMonitor, concurrentInterBrokerPartitionMovements, concurrentIntraBrokerPartitionMovements, concurrentLeaderMovements, executionProgressCheckIntervalMs, replicaMovementStrategy, - replicationThrottle, isTriggeredByUserRequest, uuid, isKafkaAssignerMode, skipAutoRefreshingConcurrency); + replicationThrottle, isTriggeredByUserRequest, uuid, isKafkaAssignerMode, + skipInterBrokerReplicaConcurrencyAdjustment); } else { failGeneratingProposalsForExecution(uuid); } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaCruiseControlConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaCruiseControlConfig.java index 66e5cc2b8bd56..06bc881450890 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaCruiseControlConfig.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaCruiseControlConfig.java @@ -203,54 +203,87 @@ private void sanityCheckGoalNames() { * {@link ExecutorConfig#NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG} *
  • {@link ExecutorConfig#CONCURRENCY_ADJUSTER_MAX_PARTITION_MOVEMENTS_PER_BROKER_CONFIG} <= * {@link ExecutorConfig#MAX_NUM_CLUSTER_MOVEMENTS_CONFIG}
  • + *
  • {@link ExecutorConfig#CONCURRENCY_ADJUSTER_MIN_PARTITION_MOVEMENTS_PER_BROKER_CONFIG} <= + * {@link ExecutorConfig#NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG}
  • + *
  • {@link ExecutorConfig#CONCURRENCY_ADJUSTER_MIN_LEADERSHIP_MOVEMENTS_CONFIG} <= + * {@link ExecutorConfig#NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG}
  • + *
  • {@link ExecutorConfig#CONCURRENCY_ADJUSTER_MAX_LEADERSHIP_MOVEMENTS_CONFIG} > + * {@link ExecutorConfig#NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG}
  • + *
  • {@link ExecutorConfig#CONCURRENCY_ADJUSTER_MAX_LEADERSHIP_MOVEMENTS_CONFIG} <= + * {@link ExecutorConfig#MAX_NUM_CLUSTER_MOVEMENTS_CONFIG}
  • *
  • {@link ExecutorConfig#MIN_EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG} <= * {@link ExecutorConfig#EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG}
  • * */ - private void sanityCheckConcurrency() { + void sanityCheckConcurrency() { int maxClusterPartitionMovementConcurrency = getInt(ExecutorConfig.MAX_NUM_CLUSTER_MOVEMENTS_CONFIG); int interBrokerPartitionMovementConcurrency = getInt(ExecutorConfig.NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG); if (interBrokerPartitionMovementConcurrency >= maxClusterPartitionMovementConcurrency) { - throw new ConfigException("Inter-broker partition movement concurrency [" + interBrokerPartitionMovementConcurrency - + "] must be smaller than the maximum number of allowed movements in cluster [" - + maxClusterPartitionMovementConcurrency + "]."); + throw new ConfigException(String.format("Inter-broker partition movement concurrency [%d] must be smaller than the " + + "maximum number of allowed movements in cluster [%d].", + interBrokerPartitionMovementConcurrency, maxClusterPartitionMovementConcurrency)); } int intraBrokerPartitionMovementConcurrency = getInt(ExecutorConfig.NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG); if (intraBrokerPartitionMovementConcurrency >= maxClusterPartitionMovementConcurrency) { - throw new ConfigException("Intra-broker partition movement concurrency [" + intraBrokerPartitionMovementConcurrency - + "] must be smaller than the maximum number of allowed movements in cluster [" - + maxClusterPartitionMovementConcurrency + "]."); + throw new ConfigException(String.format("Intra-broker partition movement concurrency [%d] must be smaller than the " + + "maximum number of allowed movements in cluster [%d].", + intraBrokerPartitionMovementConcurrency, maxClusterPartitionMovementConcurrency)); } int leadershipMovementConcurrency = getInt(ExecutorConfig.NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG); if (leadershipMovementConcurrency > maxClusterPartitionMovementConcurrency) { - throw new ConfigException("Leadership movement concurrency [" + leadershipMovementConcurrency - + "] cannot be greater than the maximum number of allowed movements in cluster [" - + maxClusterPartitionMovementConcurrency + "]."); + throw new ConfigException(String.format("Leadership movement concurrency [%d] cannot be greater than the maximum number" + + " of allowed movements in cluster [%d].", + leadershipMovementConcurrency, maxClusterPartitionMovementConcurrency)); } int concurrencyAdjusterMaxPartitionMovementsPerBroker = getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MAX_PARTITION_MOVEMENTS_PER_BROKER_CONFIG); if (interBrokerPartitionMovementConcurrency >= concurrencyAdjusterMaxPartitionMovementsPerBroker) { - throw new ConfigException("Inter-broker partition movement concurrency [" + interBrokerPartitionMovementConcurrency - + "] must be smaller than the concurrency adjuster maximum partition movements per broker [" - + concurrencyAdjusterMaxPartitionMovementsPerBroker + "]."); + throw new ConfigException(String.format("Inter-broker partition movement concurrency [%d] must be smaller than the " + + "concurrency adjuster maximum partition movements per broker [%d].", + interBrokerPartitionMovementConcurrency, concurrencyAdjusterMaxPartitionMovementsPerBroker)); } if (concurrencyAdjusterMaxPartitionMovementsPerBroker > maxClusterPartitionMovementConcurrency) { - throw new ConfigException("Maximum partition movements per broker of concurrency adjuster [" - + concurrencyAdjusterMaxPartitionMovementsPerBroker - + "] cannot be greater than the maximum number of allowed movements in cluster [" - + maxClusterPartitionMovementConcurrency + "]."); + throw new ConfigException(String.format("Maximum partition movements per broker of concurrency adjuster [%d] cannot" + + " be greater than the maximum number of allowed movements in cluster [%d].", + concurrencyAdjusterMaxPartitionMovementsPerBroker, maxClusterPartitionMovementConcurrency)); + } + + int concurrencyAdjusterMinPartitionMovementsPerBroker = getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MIN_PARTITION_MOVEMENTS_PER_BROKER_CONFIG); + if (interBrokerPartitionMovementConcurrency < concurrencyAdjusterMinPartitionMovementsPerBroker) { + throw new ConfigException(String.format("Inter-broker partition movement concurrency [%d] cannot be smaller than the" + + " concurrency adjuster minimum partition movements per broker [%d].", + interBrokerPartitionMovementConcurrency, concurrencyAdjusterMinPartitionMovementsPerBroker)); + } + + int concurrencyAdjusterMinLeadershipMovements = getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MIN_LEADERSHIP_MOVEMENTS_CONFIG); + if (leadershipMovementConcurrency < concurrencyAdjusterMinLeadershipMovements) { + throw new ConfigException(String.format("Leadership movement concurrency [%d] cannot be smaller than the concurrency " + + "adjuster minimum leadership movements [%d].", leadershipMovementConcurrency, + concurrencyAdjusterMinLeadershipMovements)); + } + + int concurrencyAdjusterMaxLeadershipMovements = getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MAX_LEADERSHIP_MOVEMENTS_CONFIG); + if (leadershipMovementConcurrency > concurrencyAdjusterMaxLeadershipMovements) { + throw new ConfigException(String.format("Leadership movement concurrency [%d] cannot be greater than the concurrency " + + "adjuster maximum leadership movements [%d].", leadershipMovementConcurrency, + concurrencyAdjusterMaxLeadershipMovements)); + } + + if (concurrencyAdjusterMaxLeadershipMovements > maxClusterPartitionMovementConcurrency) { + throw new ConfigException(String.format("Maximum leadership movements of concurrency adjuster [%d] cannot be greater " + + "than the maximum number of allowed movements in cluster [%d].", + concurrencyAdjusterMaxLeadershipMovements, maxClusterPartitionMovementConcurrency)); } long minExecutionProgressCheckIntervalMs = getLong(ExecutorConfig.MIN_EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG); long defaultExecutionProgressCheckIntervalMs = getLong(ExecutorConfig.EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG); if (minExecutionProgressCheckIntervalMs > defaultExecutionProgressCheckIntervalMs) { - throw new ConfigException("Minimum execution progress check interval [" - + minExecutionProgressCheckIntervalMs - + "] cannot be greater than the default execution progress check interval [" - + defaultExecutionProgressCheckIntervalMs + "]."); + throw new ConfigException(String.format("Minimum execution progress check interval [%d] cannot be greater than the " + + "default execution progress check interval [%d].", + minExecutionProgressCheckIntervalMs, defaultExecutionProgressCheckIntervalMs)); } } @@ -262,8 +295,8 @@ private void sanityCheckTaskExecutionAlertingThreshold() { long leaderMovementTimeoutMs = getLong(ExecutorConfig.LEADER_MOVEMENT_TIMEOUT_MS_CONFIG); long taskExecutionAlertingThresholdMs = getLong(ExecutorConfig.TASK_EXECUTION_ALERTING_THRESHOLD_MS_CONFIG); if (taskExecutionAlertingThresholdMs >= leaderMovementTimeoutMs) { - throw new ConfigException("Task execution time alerting threshold [" + taskExecutionAlertingThresholdMs - + "ms] cannot be greater than leader movement timeout [" + leaderMovementTimeoutMs + "ms]."); + throw new ConfigException(String.format("Task execution time alerting threshold [%dms] cannot be greater than leader movement" + + " timeout [%sms].", taskExecutionAlertingThresholdMs, leaderMovementTimeoutMs)); } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java index 87179bf60f369..7c6cdf009bf9c 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/constants/ExecutorConfig.java @@ -202,8 +202,7 @@ private ExecutorConfig() { */ public static final String CONCURRENCY_ADJUSTER_INTERVAL_MS_CONFIG = "concurrency.adjuster.interval.ms"; public static final long DEFAULT_CONCURRENCY_ADJUSTER_INTERVAL_MS = 360000L; - public static final String CONCURRENCY_ADJUSTER_INTERVAL_MS_DOC = "The interval of concurrency auto adjustment for" - + " inter-broker partition movements."; + public static final String CONCURRENCY_ADJUSTER_INTERVAL_MS_DOC = "The interval of concurrency auto adjustment."; /** * concurrency.adjuster.max.partition.movements.per.broker @@ -216,13 +215,64 @@ private ExecutorConfig() { + "It enforces a cap on the maximum concurrent inter-broker partition movements to avoid overwhelming the cluster. It " + "must be greater than num.concurrent.partition.movements.per.broker and not more than max.num.cluster.movements."; + /** + * concurrency.adjuster.max.leadership.movements + */ + public static final String CONCURRENCY_ADJUSTER_MAX_LEADERSHIP_MOVEMENTS_CONFIG = "concurrency.adjuster.max.leadership.movements"; + public static final int DEFAULT_CONCURRENCY_ADJUSTER_MAX_LEADERSHIP_MOVEMENTS = 1100; + public static final String CONCURRENCY_ADJUSTER_MAX_LEADERSHIP_MOVEMENTS_DOC = "The maximum number of leadership movements " + + "the concurrency auto adjustment will allow the executor to perform in one batch to avoid overwhelming the cluster. " + + "It cannot be (1) smaller than num.concurrent.leader.movements and (2) greater than max.num.cluster.movements."; + + /** + * concurrency.adjuster.min.partition.movements.per.broker + */ + public static final String CONCURRENCY_ADJUSTER_MIN_PARTITION_MOVEMENTS_PER_BROKER_CONFIG = + "concurrency.adjuster.min.partition.movements.per.broker"; + public static final int DEFAULT_CONCURRENCY_ADJUSTER_MIN_PARTITION_MOVEMENTS_PER_BROKER = 1; + public static final String CONCURRENCY_ADJUSTER_MIN_PARTITION_MOVEMENTS_PER_BROKER_DOC = "The minimum number of " + + "partitions the concurrency auto adjustment will allow the executor to move in or out of a broker at the same time. " + + "It enforces a cap on the minimum concurrent inter-broker partition movements to avoid an unacceptable execution pace." + + " It cannot be greater than num.concurrent.partition.movements.per.broker."; + + /** + * concurrency.adjuster.min.leadership.movements + */ + public static final String CONCURRENCY_ADJUSTER_MIN_LEADERSHIP_MOVEMENTS_CONFIG = "concurrency.adjuster.min.leadership.movements"; + public static final int DEFAULT_CONCURRENCY_ADJUSTER_MIN_LEADERSHIP_MOVEMENTS = 100; + public static final String CONCURRENCY_ADJUSTER_MIN_LEADERSHIP_MOVEMENTS_DOC = "The minimum number of leadership movements " + + "the concurrency auto adjustment will allow the executor to perform in one batch to avoid an unacceptable execution pace." + + " It cannot be greater than num.concurrent.leader.movements."; + /** * concurrency.adjuster.enabled + * @deprecated This config will be removed in a future release. Please enable concurrency adjusters individually using: + *
      + *
    • {@link #CONCURRENCY_ADJUSTER_INTER_BROKER_REPLICA_ENABLED_CONFIG}
    • + *
    • {@link #CONCURRENCY_ADJUSTER_LEADERSHIP_ENABLED_CONFIG}
    • + *
    */ + @Deprecated public static final String CONCURRENCY_ADJUSTER_ENABLED_CONFIG = "concurrency.adjuster.enabled"; public static final boolean DEFAULT_CONCURRENCY_ADJUSTER_ENABLED = false; public static final String CONCURRENCY_ADJUSTER_ENABLED_DOC = "The flag to indicate whether the concurrency of " - + "inter-broker partition movements will be auto-adjusted based on dynamically changing broker metrics."; + + "all supported movements will be auto-adjusted based on dynamically changing broker metrics. It enables concurrency adjuster " + + "for all supported concurrency types, regardless of whether the particular concurrency type is disabled."; + + /** + * concurrency.adjuster.inter.broker.replica.enabled + */ + public static final String CONCURRENCY_ADJUSTER_INTER_BROKER_REPLICA_ENABLED_CONFIG = "concurrency.adjuster.inter.broker.replica.enabled"; + public static final boolean DEFAULT_CONCURRENCY_ADJUSTER_INTER_BROKER_REPLICA_ENABLED = false; + public static final String CONCURRENCY_ADJUSTER_INTER_BROKER_REPLICA_ENABLED_DOC = "Enable concurrency adjuster for " + + "inter-broker replica reassignments."; + + /** + * concurrency.adjuster.leadership.enabled + */ + public static final String CONCURRENCY_ADJUSTER_LEADERSHIP_ENABLED_CONFIG = "concurrency.adjuster.leadership.enabled"; + public static final boolean DEFAULT_CONCURRENCY_ADJUSTER_LEADERSHIP_ENABLED = false; + public static final String CONCURRENCY_ADJUSTER_LEADERSHIP_ENABLED_DOC = "Enable concurrency adjuster for leadership reassignments."; /** * concurrency.adjuster.limit.log.flush.time.ms @@ -230,8 +280,8 @@ private ExecutorConfig() { public static final String CONCURRENCY_ADJUSTER_LIMIT_LOG_FLUSH_TIME_MS_CONFIG = "concurrency.adjuster.limit.log.flush.time.ms"; public static final double DEFAULT_CONCURRENCY_ADJUSTER_LIMIT_LOG_FLUSH_TIME_MS = 2000.0; public static final String CONCURRENCY_ADJUSTER_LIMIT_LOG_FLUSH_TIME_MS_DOC = "The limit on the 99.9th percentile broker metric" - + " value of log flush time. If any broker exceeds this limit during an ongoing inter-broker partition reassignment, the " - + "concurrency adjuster (if enabled) attempts to decrease the number of allowed concurrent inter-broker partition movements."; + + " value of log flush time. If any broker exceeds this limit during an ongoing reassignment, the relevant concurrency " + + "adjuster (if enabled) attempts to decrease the number of allowed concurrent movements."; /** * concurrency.adjuster.limit.follower.fetch.local.time.ms @@ -240,8 +290,8 @@ private ExecutorConfig() { = "concurrency.adjuster.limit.follower.fetch.local.time.ms"; public static final double DEFAULT_CONCURRENCY_ADJUSTER_LIMIT_FOLLOWER_FETCH_LOCAL_TIME_MS = 500.0; public static final String CONCURRENCY_ADJUSTER_LIMIT_FOLLOWER_FETCH_LOCAL_TIME_MS_DOC = "The limit on the 99.9th percentile broker metric" - + " value of follower fetch local time. If any broker exceeds this limit during an ongoing inter-broker partition reassignment, the " - + "concurrency adjuster (if enabled) attempts to decrease the number of allowed concurrent inter-broker partition movements."; + + " value of follower fetch local time. If any broker exceeds this limit during an ongoing reassignment, the relevant concurrency " + + "adjuster (if enabled) attempts to decrease the number of allowed concurrent movements."; /** * concurrency.adjuster.limit.produce.local.time.ms @@ -249,8 +299,8 @@ private ExecutorConfig() { public static final String CONCURRENCY_ADJUSTER_LIMIT_PRODUCE_LOCAL_TIME_MS_CONFIG = "concurrency.adjuster.limit.produce.local.time.ms"; public static final double DEFAULT_CONCURRENCY_ADJUSTER_LIMIT_PRODUCE_LOCAL_TIME_MS = 1000.0; public static final String CONCURRENCY_ADJUSTER_LIMIT_PRODUCE_LOCAL_TIME_MS_DOC = "The limit on the 99.9th percentile broker metric" - + " value of produce local time. If any broker exceeds this limit during an ongoing inter-broker partition reassignment, the " - + "concurrency adjuster (if enabled) attempts to decrease the number of allowed concurrent inter-broker partition movements."; + + " value of produce local time. If any broker exceeds this limit during an ongoing reassignment, the relevant concurrency " + + "adjuster (if enabled) attempts to decrease the number of allowed concurrent movements."; /** * concurrency.adjuster.limit.consumer.fetch.local.time.ms @@ -259,8 +309,8 @@ private ExecutorConfig() { = "concurrency.adjuster.limit.consumer.fetch.local.time.ms"; public static final double DEFAULT_CONCURRENCY_ADJUSTER_LIMIT_CONSUMER_FETCH_LOCAL_TIME_MS = 500.0; public static final String CONCURRENCY_ADJUSTER_LIMIT_CONSUMER_FETCH_LOCAL_TIME_MS_DOC = "The limit on the 99.9th percentile broker metric" - + " value of consumer fetch local time. If any broker exceeds this limit during an ongoing inter-broker partition reassignment, the " - + "concurrency adjuster (if enabled) attempts to decrease the number of allowed concurrent inter-broker partition movements."; + + " value of consumer fetch local time. If any broker exceeds this limit during an ongoing reassignment, the relevant concurrency " + + "adjuster (if enabled) attempts to decrease the number of allowed concurrent movements."; /** * concurrency.adjuster.limit.request.queue.size @@ -268,8 +318,48 @@ private ExecutorConfig() { public static final String CONCURRENCY_ADJUSTER_LIMIT_REQUEST_QUEUE_SIZE_CONFIG = "concurrency.adjuster.limit.request.queue.size"; public static final double DEFAULT_CONCURRENCY_ADJUSTER_LIMIT_REQUEST_QUEUE_SIZE = 1000.0; public static final String CONCURRENCY_ADJUSTER_LIMIT_REQUEST_QUEUE_SIZE_DOC = "The limit on the broker metric value of request " - + "queue size. If any broker exceeds this limit during an ongoing inter-broker partition reassignment, the concurrency adjuster" - + " (if enabled) attempts to decrease the number of allowed concurrent inter-broker partition movements."; + + "queue size. If any broker exceeds this limit during an ongoing reassignment, the relevant concurrency adjuster" + + " (if enabled) attempts to decrease the number of allowed concurrent movements."; + + /** + * concurrency.adjuster.additive.increase.inter.broker.replica + */ + public static final String CONCURRENCY_ADJUSTER_ADDITIVE_INCREASE_INTER_BROKER_REPLICA_CONFIG + = "concurrency.adjuster.additive.increase.inter.broker.replica"; + public static final int DEFAULT_CONCURRENCY_ADJUSTER_ADDITIVE_INCREASE_INTER_BROKER_REPLICA = 1; + public static final String CONCURRENCY_ADJUSTER_ADDITIVE_INCREASE_INTER_BROKER_REPLICA_DOC = "The fixed number by which the " + + "concurrency cap on inter-broker replica movements will be increased by the concurrency adjuster (if enabled) when all " + + "considered metrics are within the concurrency adjuster limit."; + + /** + * concurrency.adjuster.additive.increase.leadership + */ + public static final String CONCURRENCY_ADJUSTER_ADDITIVE_INCREASE_LEADERSHIP_CONFIG + = "concurrency.adjuster.additive.increase.leadership"; + public static final int DEFAULT_CONCURRENCY_ADJUSTER_ADDITIVE_INCREASE_LEADERSHIP = 100; + public static final String CONCURRENCY_ADJUSTER_ADDITIVE_INCREASE_LEADERSHIP_DOC = "The fixed number by which the " + + "concurrency cap on leadership movements will be increased by the concurrency adjuster (if enabled) when all " + + "considered metrics are within the concurrency adjuster limit."; + + /** + * concurrency.adjuster.multiplicative.decrease.inter.broker.replica + */ + public static final String CONCURRENCY_ADJUSTER_MULTIPLICATIVE_DECREASE_INTER_BROKER_REPLICA_CONFIG + = "concurrency.adjuster.multiplicative.decrease.inter.broker.replica"; + public static final int DEFAULT_CONCURRENCY_ADJUSTER_MULTIPLICATIVE_DECREASE_INTER_BROKER_REPLICA = 2; + public static final String CONCURRENCY_ADJUSTER_MULTIPLICATIVE_DECREASE_INTER_BROKER_REPLICA_DOC = "The fixed number by which the " + + "concurrency cap on inter-broker replica movements will be divided by the concurrency adjuster (if enabled) when any " + + "considered metric exceeds the concurrency adjuster limit."; + + /** + * concurrency.adjuster.multiplicative.decrease.leadership + */ + public static final String CONCURRENCY_ADJUSTER_MULTIPLICATIVE_DECREASE_LEADERSHIP_CONFIG + = "concurrency.adjuster.multiplicative.decrease.leadership"; + public static final int DEFAULT_CONCURRENCY_ADJUSTER_MULTIPLICATIVE_DECREASE_LEADERSHIP = 2; + public static final String CONCURRENCY_ADJUSTER_MULTIPLICATIVE_DECREASE_LEADERSHIP_DOC = "The fixed number by which the " + + "concurrency cap on leadership movements will be divided by the concurrency adjuster (if enabled) when any " + + "considered metric exceeds the concurrency adjuster limit."; /** * list.partition.reassignment.timeout.ms @@ -424,11 +514,39 @@ public static ConfigDef define(ConfigDef configDef) { atLeast(1), ConfigDef.Importance.MEDIUM, CONCURRENCY_ADJUSTER_MAX_PARTITION_MOVEMENTS_PER_BROKER_DOC) + .define(CONCURRENCY_ADJUSTER_MAX_LEADERSHIP_MOVEMENTS_CONFIG, + ConfigDef.Type.INT, + DEFAULT_CONCURRENCY_ADJUSTER_MAX_LEADERSHIP_MOVEMENTS, + atLeast(1), + ConfigDef.Importance.LOW, + CONCURRENCY_ADJUSTER_MAX_LEADERSHIP_MOVEMENTS_DOC) + .define(CONCURRENCY_ADJUSTER_MIN_PARTITION_MOVEMENTS_PER_BROKER_CONFIG, + ConfigDef.Type.INT, + DEFAULT_CONCURRENCY_ADJUSTER_MIN_PARTITION_MOVEMENTS_PER_BROKER, + atLeast(1), + ConfigDef.Importance.LOW, + CONCURRENCY_ADJUSTER_MIN_PARTITION_MOVEMENTS_PER_BROKER_DOC) + .define(CONCURRENCY_ADJUSTER_MIN_LEADERSHIP_MOVEMENTS_CONFIG, + ConfigDef.Type.INT, + DEFAULT_CONCURRENCY_ADJUSTER_MIN_LEADERSHIP_MOVEMENTS, + atLeast(1), + ConfigDef.Importance.LOW, + CONCURRENCY_ADJUSTER_MIN_LEADERSHIP_MOVEMENTS_DOC) .define(CONCURRENCY_ADJUSTER_ENABLED_CONFIG, ConfigDef.Type.BOOLEAN, DEFAULT_CONCURRENCY_ADJUSTER_ENABLED, ConfigDef.Importance.HIGH, CONCURRENCY_ADJUSTER_ENABLED_DOC) + .define(CONCURRENCY_ADJUSTER_INTER_BROKER_REPLICA_ENABLED_CONFIG, + ConfigDef.Type.BOOLEAN, + DEFAULT_CONCURRENCY_ADJUSTER_INTER_BROKER_REPLICA_ENABLED, + ConfigDef.Importance.HIGH, + CONCURRENCY_ADJUSTER_INTER_BROKER_REPLICA_ENABLED_DOC) + .define(CONCURRENCY_ADJUSTER_LEADERSHIP_ENABLED_CONFIG, + ConfigDef.Type.BOOLEAN, + DEFAULT_CONCURRENCY_ADJUSTER_LEADERSHIP_ENABLED, + ConfigDef.Importance.HIGH, + CONCURRENCY_ADJUSTER_LEADERSHIP_ENABLED_DOC) .define(CONCURRENCY_ADJUSTER_LIMIT_LOG_FLUSH_TIME_MS_CONFIG, ConfigDef.Type.DOUBLE, DEFAULT_CONCURRENCY_ADJUSTER_LIMIT_LOG_FLUSH_TIME_MS, @@ -459,6 +577,30 @@ public static ConfigDef define(ConfigDef configDef) { atLeast(10.0), ConfigDef.Importance.MEDIUM, CONCURRENCY_ADJUSTER_LIMIT_REQUEST_QUEUE_SIZE_DOC) + .define(CONCURRENCY_ADJUSTER_ADDITIVE_INCREASE_INTER_BROKER_REPLICA_CONFIG, + ConfigDef.Type.INT, + DEFAULT_CONCURRENCY_ADJUSTER_ADDITIVE_INCREASE_INTER_BROKER_REPLICA, + atLeast(1), + ConfigDef.Importance.LOW, + CONCURRENCY_ADJUSTER_ADDITIVE_INCREASE_INTER_BROKER_REPLICA_DOC) + .define(CONCURRENCY_ADJUSTER_ADDITIVE_INCREASE_LEADERSHIP_CONFIG, + ConfigDef.Type.INT, + DEFAULT_CONCURRENCY_ADJUSTER_ADDITIVE_INCREASE_LEADERSHIP, + atLeast(1), + ConfigDef.Importance.LOW, + CONCURRENCY_ADJUSTER_ADDITIVE_INCREASE_LEADERSHIP_DOC) + .define(CONCURRENCY_ADJUSTER_MULTIPLICATIVE_DECREASE_INTER_BROKER_REPLICA_CONFIG, + ConfigDef.Type.INT, + DEFAULT_CONCURRENCY_ADJUSTER_MULTIPLICATIVE_DECREASE_INTER_BROKER_REPLICA, + atLeast(2), + ConfigDef.Importance.LOW, + CONCURRENCY_ADJUSTER_MULTIPLICATIVE_DECREASE_INTER_BROKER_REPLICA_DOC) + .define(CONCURRENCY_ADJUSTER_MULTIPLICATIVE_DECREASE_LEADERSHIP_CONFIG, + ConfigDef.Type.INT, + DEFAULT_CONCURRENCY_ADJUSTER_MULTIPLICATIVE_DECREASE_LEADERSHIP, + atLeast(2), + ConfigDef.Importance.LOW, + CONCURRENCY_ADJUSTER_MULTIPLICATIVE_DECREASE_LEADERSHIP_DOC) .define(LIST_PARTITION_REASSIGNMENTS_TIMEOUT_MS_CONFIG, ConfigDef.Type.LONG, DEFAULT_LIST_PARTITION_REASSIGNMENTS_TIMEOUT_MS, diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionUtils.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionUtils.java index a182470e50d16..c520bf7313fd1 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionUtils.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ExecutionUtils.java @@ -65,8 +65,10 @@ public final class ExecutionUtils { public static final String GAUGE_EXECUTION_LEADERSHIP_MOVEMENTS_GLOBAL_CAP = "leadership-movements-global-cap"; public static final long EXECUTION_HISTORY_SCANNER_PERIOD_SECONDS = 5; public static final long EXECUTION_HISTORY_SCANNER_INITIAL_DELAY_SECONDS = 0; - public static final int ADDITIVE_INCREASE_PARAM = 1; - public static final int MULTIPLICATIVE_DECREASE_PARAM = 2; + static final Map ADDITIVE_INCREASE = new HashMap<>(ConcurrencyType.cachedValues().size()); + static final Map MULTIPLICATIVE_DECREASE = new HashMap<>(ConcurrencyType.cachedValues().size()); + static final Map MAX_CONCURRENCY = new HashMap<>(ConcurrencyType.cachedValues().size()); + static final Map MIN_CONCURRENCY = new HashMap<>(ConcurrencyType.cachedValues().size()); static final Map CONCURRENCY_ADJUSTER_LIMIT_BY_METRIC_NAME = new HashMap<>(5); public static final long EXECUTION_TASK_FUTURE_ERROR_VERIFICATION_TIMEOUT_MS = 10000L; static long LIST_PARTITION_REASSIGNMENTS_TIMEOUT_MS; @@ -91,6 +93,22 @@ static void init(KafkaCruiseControlConfig config) { config.getDouble(ExecutorConfig.CONCURRENCY_ADJUSTER_LIMIT_CONSUMER_FETCH_LOCAL_TIME_MS_CONFIG)); CONCURRENCY_ADJUSTER_LIMIT_BY_METRIC_NAME.put(BROKER_REQUEST_QUEUE_SIZE.name(), config.getDouble(ExecutorConfig.CONCURRENCY_ADJUSTER_LIMIT_REQUEST_QUEUE_SIZE_CONFIG)); + ADDITIVE_INCREASE.put(ConcurrencyType.INTER_BROKER_REPLICA, + config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_ADDITIVE_INCREASE_INTER_BROKER_REPLICA_CONFIG)); + ADDITIVE_INCREASE.put(ConcurrencyType.LEADERSHIP, + config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_ADDITIVE_INCREASE_LEADERSHIP_CONFIG)); + MULTIPLICATIVE_DECREASE.put(ConcurrencyType.INTER_BROKER_REPLICA, + config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MULTIPLICATIVE_DECREASE_INTER_BROKER_REPLICA_CONFIG)); + MULTIPLICATIVE_DECREASE.put(ConcurrencyType.LEADERSHIP, + config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MULTIPLICATIVE_DECREASE_LEADERSHIP_CONFIG)); + MAX_CONCURRENCY.put(ConcurrencyType.INTER_BROKER_REPLICA, + config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MAX_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)); + MAX_CONCURRENCY.put(ConcurrencyType.LEADERSHIP, + config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MAX_LEADERSHIP_MOVEMENTS_CONFIG)); + MIN_CONCURRENCY.put(ConcurrencyType.INTER_BROKER_REPLICA, + config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MIN_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)); + MIN_CONCURRENCY.put(ConcurrencyType.LEADERSHIP, + config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MIN_LEADERSHIP_MOVEMENTS_CONFIG)); LIST_PARTITION_REASSIGNMENTS_TIMEOUT_MS = config.getLong(ExecutorConfig.LIST_PARTITION_REASSIGNMENTS_TIMEOUT_MS_CONFIG); LIST_PARTITION_REASSIGNMENTS_MAX_ATTEMPTS = config.getInt(ExecutorConfig.LIST_PARTITION_REASSIGNMENTS_MAX_ATTEMPTS_CONFIG); } @@ -153,31 +171,33 @@ static boolean withinConcurrencyAdjusterLimit(Map currentMetricsByBroker, - int currentInterBrokerPartitionMovementConcurrency, - int maxPartitionMovementsPerBroker) { - boolean withinAdjusterLimit = ExecutionUtils.withinConcurrencyAdjusterLimit(currentMetricsByBroker); + int currentMovementConcurrency, + ConcurrencyType concurrencyType) { + boolean withinAdjusterLimit = withinConcurrencyAdjusterLimit(currentMetricsByBroker); Integer recommendedConcurrency = null; if (withinAdjusterLimit) { - // Additive-increase inter-broker replica reassignment concurrency (MAX: maxPartitionMovementsPerBroker). - if (currentInterBrokerPartitionMovementConcurrency < maxPartitionMovementsPerBroker) { - recommendedConcurrency = currentInterBrokerPartitionMovementConcurrency + ADDITIVE_INCREASE_PARAM; - LOG.info("Concurrency adjuster recommended an increase in inter-broker partition movement concurrency to {}", recommendedConcurrency); + int maxMovementsConcurrency = MAX_CONCURRENCY.get(concurrencyType); + // Additive-increase reassignment concurrency (MAX: maxMovementsConcurrency). + if (currentMovementConcurrency < maxMovementsConcurrency) { + recommendedConcurrency = Math.min(maxMovementsConcurrency, currentMovementConcurrency + ADDITIVE_INCREASE.get(concurrencyType)); + LOG.info("Concurrency adjuster recommended an increase in {} movement concurrency to {}", concurrencyType, recommendedConcurrency); } } else { - // Multiplicative-decrease inter-broker replica reassignment concurrency (MIN: 1). - if (currentInterBrokerPartitionMovementConcurrency > 1) { - recommendedConcurrency = Math.max(1, currentInterBrokerPartitionMovementConcurrency / MULTIPLICATIVE_DECREASE_PARAM); - LOG.info("Concurrency adjuster recommended a decrease in inter-broker partition movement concurrency to {}", recommendedConcurrency); + int minMovementsConcurrency = MIN_CONCURRENCY.get(concurrencyType); + // Multiplicative-decrease reassignment concurrency (MIN: minMovementsConcurrency). + if (currentMovementConcurrency > minMovementsConcurrency) { + recommendedConcurrency = Math.max(minMovementsConcurrency, currentMovementConcurrency / MULTIPLICATIVE_DECREASE.get(concurrencyType)); + LOG.info("Concurrency adjuster recommended a decrease in {} movement concurrency to {}", concurrencyType, recommendedConcurrency); } } return recommendedConcurrency; @@ -385,12 +405,12 @@ public static void deleteZNodesToForceStopLeadershipMoves(KafkaZkClient kafkaZkC /** * For an inter-broker replica movement action, the completion depends on the task state: *
      - *
    • {@link ExecutionTask.State#IN_PROGRESS}: when the current replica list is the same as the new replica list + *
    • {@link ExecutionTaskState#IN_PROGRESS}: when the current replica list is the same as the new replica list * and all replicas are in-sync.
    • - *
    • {@link ExecutionTask.State#ABORTING}: done when the current replica list is the same as the old replica list. + *
    • {@link ExecutionTaskState#ABORTING}: done when the current replica list is the same as the old replica list. * Due to race condition, we also consider it done if the current replica list is the same as the new replica list * and all replicas are in-sync.
    • - *
    • {@link ExecutionTask.State#DEAD}: always considered as done because we neither move forward or rollback.
    • + *
    • {@link ExecutionTaskState#DEAD}: always considered as done because we neither move forward or rollback.
    • *
    * * There should be no other task state seen here. @@ -433,10 +453,10 @@ static boolean isIntraBrokerReplicaActionDone(Map - *
  • {@link ExecutionTask.State#IN_PROGRESS}: Done when either (1) the proposed leader becomes the leader, (2) the + *
  • {@link ExecutionTaskState#IN_PROGRESS}: Done when either (1) the proposed leader becomes the leader, (2) the * partition has no leader in the cluster (e.g. deleted or offline), or (3) the partition has another leader and the * proposed leader is out of ISR.
  • - *
  • {@link ExecutionTask.State#ABORTING} or {@link ExecutionTask.State#DEAD}: Always considered as done. The + *
  • {@link ExecutionTaskState#ABORTING} or {@link ExecutionTaskState#DEAD}: Always considered as done. The * destination cannot become leader anymore.
  • * * diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java index f27ea603ad5ed..3100202220ff9 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/Executor.java @@ -105,7 +105,7 @@ public class Executor { private final AtomicInteger _numExecutionStartedInKafkaAssignerMode; private final AtomicInteger _numExecutionStartedInNonKafkaAssignerMode; private volatile boolean _isKafkaAssignerMode; - private volatile boolean _skipAutoRefreshingConcurrency; + private volatile boolean _skipInterBrokerReplicaConcurrencyAdjustment; // TODO: Execution history is currently kept in memory, but ideally we should move it to a persistent store. private final long _demotionHistoryRetentionTimeMs; private final long _removalHistoryRetentionTimeMs; @@ -116,7 +116,7 @@ public class Executor { private final AnomalyDetectorManager _anomalyDetectorManager; private final ConcurrencyAdjuster _concurrencyAdjuster; private final ScheduledExecutorService _concurrencyAdjusterExecutor; - private volatile boolean _concurrencyAdjusterEnabled; + private final ConcurrentMap _concurrencyAdjusterEnabled; private final long _minExecutionProgressCheckIntervalMs; public final long _slowTaskAlertingBackoffTimeMs; private final KafkaCruiseControlConfig _config; @@ -153,7 +153,7 @@ public Executor(KafkaCruiseControlConfig config, _numExecutionStartedInKafkaAssignerMode = new AtomicInteger(0); _numExecutionStartedInNonKafkaAssignerMode = new AtomicInteger(0); _isKafkaAssignerMode = false; - _skipAutoRefreshingConcurrency = false; + _skipInterBrokerReplicaConcurrencyAdjustment = false; ExecutionUtils.init(config); _config = config; @@ -198,7 +198,14 @@ public Executor(KafkaCruiseControlConfig config, _removalHistoryRetentionTimeMs = config.getLong(ExecutorConfig.REMOVAL_HISTORY_RETENTION_TIME_MS_CONFIG); _minExecutionProgressCheckIntervalMs = config.getLong(ExecutorConfig.MIN_EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG); _slowTaskAlertingBackoffTimeMs = config.getLong(ExecutorConfig.SLOW_TASK_ALERTING_BACKOFF_TIME_MS_CONFIG); - _concurrencyAdjusterEnabled = config.getBoolean(ExecutorConfig.CONCURRENCY_ADJUSTER_ENABLED_CONFIG); + _concurrencyAdjusterEnabled = new ConcurrentHashMap<>(ConcurrencyType.cachedValues().size()); + boolean allEnabled = config.getBoolean(ExecutorConfig.CONCURRENCY_ADJUSTER_ENABLED_CONFIG); + _concurrencyAdjusterEnabled.put(ConcurrencyType.INTER_BROKER_REPLICA, + allEnabled || config.getBoolean(ExecutorConfig.CONCURRENCY_ADJUSTER_INTER_BROKER_REPLICA_ENABLED_CONFIG)); + _concurrencyAdjusterEnabled.put(ConcurrencyType.LEADERSHIP, + allEnabled || config.getBoolean(ExecutorConfig.CONCURRENCY_ADJUSTER_LEADERSHIP_ENABLED_CONFIG)); + // Support for intra-broker replica movement is pending https://github.com/linkedin/cruise-control/issues/1299. + _concurrencyAdjusterEnabled.put(ConcurrencyType.INTRA_BROKER_REPLICA, false); _concurrencyAdjusterExecutor = Executors.newSingleThreadScheduledExecutor( new KafkaCruiseControlThreadFactory(ConcurrencyAdjuster.class.getSimpleName(), true, null)); long intervalMs = config.getLong(ExecutorConfig.CONCURRENCY_ADJUSTER_INTERVAL_MS_CONFIG); @@ -299,54 +306,78 @@ public void run() { } /** - * A runnable class to auto-adjust the allowed inter-broker partition reassignment concurrency for ongoing executions + * A runnable class to auto-adjust the allowed reassignment concurrency for ongoing executions * using selected broker metrics and based on additive-increase/multiplicative-decrease (AIMD) feedback control algorithm. * Skips concurrency adjustment for demote operations. */ private class ConcurrencyAdjuster implements Runnable { - private final int _maxPartitionMovementsPerBroker; private LoadMonitor _loadMonitor; public ConcurrencyAdjuster() { - _maxPartitionMovementsPerBroker = _config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MAX_PARTITION_MOVEMENTS_PER_BROKER_CONFIG); _loadMonitor = null; } /** - * Initialize the inter-broker partition reassignment concurrency adjustment with the load monitor and the initially - * requested inter-broker partition reassignment concurrency. + * Initialize the reassignment concurrency adjustment with the load monitor and the initially requested reassignment concurrency. * * @param loadMonitor Load monitor. * @param requestedInterBrokerPartitionMovementConcurrency The maximum number of concurrent inter-broker partition movements * per broker(if null, use num.concurrent.partition.movements.per.broker). + * @param requestedLeadershipMovementConcurrency The maximum number of concurrent leader movements + * (if null, use num.concurrent.leader.movements). */ - public synchronized void initAdjustment(LoadMonitor loadMonitor, Integer requestedInterBrokerPartitionMovementConcurrency) { + public synchronized void initAdjustment(LoadMonitor loadMonitor, + Integer requestedInterBrokerPartitionMovementConcurrency, + Integer requestedLeadershipMovementConcurrency) { _loadMonitor = loadMonitor; setRequestedInterBrokerPartitionMovementConcurrency(requestedInterBrokerPartitionMovementConcurrency); + setRequestedLeadershipMovementConcurrency(requestedLeadershipMovementConcurrency); } - private boolean canRefreshConcurrency() { - return _concurrencyAdjusterEnabled && _executorState.state() == ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS - && !_skipAutoRefreshingConcurrency && _loadMonitor != null; + private boolean canRefreshConcurrency(ConcurrencyType concurrencyType) { + if (!_concurrencyAdjusterEnabled.get(concurrencyType) || _loadMonitor == null) { + return false; + } + switch (concurrencyType) { + case LEADERSHIP: + return _executorState.state() == LEADER_MOVEMENT_TASK_IN_PROGRESS; + case INTER_BROKER_REPLICA: + return _executorState.state() == ExecutorState.State.INTER_BROKER_REPLICA_MOVEMENT_TASK_IN_PROGRESS + && !_skipInterBrokerReplicaConcurrencyAdjustment; + default: + throw new IllegalArgumentException("Unsupported concurrency type " + concurrencyType + " is provided."); + } } - private synchronized void refreshConcurrency() { - if (canRefreshConcurrency()) { + private synchronized void refreshInterBrokerReplicaConcurrency() { + if (canRefreshConcurrency(ConcurrencyType.INTER_BROKER_REPLICA)) { Integer recommendedConcurrency = ExecutionUtils.recommendedConcurrency(_loadMonitor.currentBrokerMetricValues(), _executionTaskManager.interBrokerPartitionMovementConcurrency(), - _maxPartitionMovementsPerBroker); + ConcurrencyType.INTER_BROKER_REPLICA); if (recommendedConcurrency != null) { setRequestedInterBrokerPartitionMovementConcurrency(recommendedConcurrency); } } } + private synchronized void refreshLeadershipConcurrency() { + if (canRefreshConcurrency(ConcurrencyType.LEADERSHIP)) { + Integer recommendedConcurrency = ExecutionUtils.recommendedConcurrency(_loadMonitor.currentBrokerMetricValues(), + _executionTaskManager.leadershipMovementConcurrency(), + ConcurrencyType.LEADERSHIP); + if (recommendedConcurrency != null) { + setRequestedLeadershipMovementConcurrency(recommendedConcurrency); + } + } + } + @Override public void run() { try { - refreshConcurrency(); + refreshInterBrokerReplicaConcurrency(); + refreshLeadershipConcurrency(); } catch (Throwable t) { - LOG.warn("Received exception when trying to adjust inter-broker replica reassignment concurrency.", t); + LOG.warn("Received exception when trying to adjust reassignment concurrency.", t); } } } @@ -428,22 +459,18 @@ public ExecutorState state() { * Enable or disable concurrency adjuster for the given concurrency type in the executor. * *
      - *
    • TODO: Support for concurrency adjusters of {@link ConcurrencyType#LEADERSHIP leadership} and - * {@link ConcurrencyType#INTRA_BROKER_REPLICA intra-broker replica} movements are pending - * #1298 and - * #1299.
    • + *
    • TODO: Support for concurrency adjuster of {@link ConcurrencyType#INTRA_BROKER_REPLICA intra-broker replica movement} + * is pending #1299.
    • *
    * @param concurrencyType Type of concurrency for which to enable or disable concurrency adjuster. * @param isConcurrencyAdjusterEnabled {@code true} if concurrency adjuster is enabled for the given type, {@code false} otherwise. * @return {@code true} if concurrency adjuster was enabled before for the given concurrency type, {@code false} otherwise. */ - public boolean setConcurrencyAdjusterFor(ConcurrencyType concurrencyType, boolean isConcurrencyAdjusterEnabled) { - if (concurrencyType != ConcurrencyType.INTER_BROKER_REPLICA) { + public Boolean setConcurrencyAdjusterFor(ConcurrencyType concurrencyType, boolean isConcurrencyAdjusterEnabled) { + if (concurrencyType != ConcurrencyType.INTER_BROKER_REPLICA && concurrencyType != ConcurrencyType.LEADERSHIP) { throw new IllegalArgumentException(String.format("Concurrency adjuster for %s is not yet supported.", concurrencyType)); } - boolean oldValue = _concurrencyAdjusterEnabled; - _concurrencyAdjusterEnabled = isConcurrencyAdjusterEnabled; - return oldValue; + return _concurrencyAdjusterEnabled.put(concurrencyType, isConcurrencyAdjusterEnabled); } /** @@ -467,8 +494,8 @@ public boolean setConcurrencyAdjusterFor(ConcurrencyType concurrencyType, boolea * @param isTriggeredByUserRequest Whether the execution is triggered by a user request. * @param uuid UUID of the execution. * @param isKafkaAssignerMode {@code true} if kafka assigner mode, {@code false} otherwise. - * @param skipAutoRefreshingConcurrency {@code true} to skip auto refreshing concurrency even if the concurrency adjuster - * is enabled, {@code false} otherwise. + * @param skipInterBrokerReplicaConcurrencyAdjustment {@code true} to skip auto adjusting concurrency of inter-broker + * replica movements even if the concurrency adjuster is enabled, {@code false} otherwise. */ public synchronized void executeProposals(Collection proposals, Set unthrottledBrokers, @@ -483,10 +510,10 @@ public synchronized void executeProposals(Collection proposal boolean isTriggeredByUserRequest, String uuid, boolean isKafkaAssignerMode, - boolean skipAutoRefreshingConcurrency) throws OngoingExecutionException { + boolean skipInterBrokerReplicaConcurrencyAdjustment) throws OngoingExecutionException { setExecutionMode(isKafkaAssignerMode); sanityCheckExecuteProposals(loadMonitor, uuid); - _skipAutoRefreshingConcurrency = skipAutoRefreshingConcurrency; + _skipInterBrokerReplicaConcurrencyAdjustment = skipInterBrokerReplicaConcurrencyAdjustment; try { initProposalExecution(proposals, unthrottledBrokers, requestedInterBrokerPartitionMovementConcurrency, requestedIntraBrokerPartitionMovementConcurrency, requestedLeadershipMovementConcurrency, @@ -529,9 +556,8 @@ private synchronized void initProposalExecution(Collection pr _executionTaskManager.setExecutionModeForTaskTracker(_isKafkaAssignerMode); _executionTaskManager.addExecutionProposals(proposals, brokersToSkipConcurrencyCheck, _metadataClient.refreshMetadata().cluster(), replicaMovementStrategy); - _concurrencyAdjuster.initAdjustment(loadMonitor, requestedInterBrokerPartitionMovementConcurrency); + _concurrencyAdjuster.initAdjustment(loadMonitor, requestedInterBrokerPartitionMovementConcurrency, requestedLeadershipMovementConcurrency); setRequestedIntraBrokerPartitionMovementConcurrency(requestedIntraBrokerPartitionMovementConcurrency); - setRequestedLeadershipMovementConcurrency(requestedLeadershipMovementConcurrency); setRequestedExecutionProgressCheckIntervalMs(requestedExecutionProgressCheckIntervalMs); } @@ -564,7 +590,7 @@ public synchronized void executeDemoteProposals(Collection pr String uuid) throws OngoingExecutionException { setExecutionMode(false); sanityCheckExecuteProposals(loadMonitor, uuid); - _skipAutoRefreshingConcurrency = true; + _skipInterBrokerReplicaConcurrencyAdjustment = true; try { initProposalExecution(proposals, demotedBrokers, concurrentSwaps, 0, requestedLeadershipMovementConcurrency, requestedExecutionProgressCheckIntervalMs, replicaMovementStrategy, isTriggeredByUserRequest, loadMonitor); diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/config/ConcurrencyConfigTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/config/ConcurrencyConfigTest.java new file mode 100644 index 0000000000000..4b56d271fa9c2 --- /dev/null +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/config/ConcurrencyConfigTest.java @@ -0,0 +1,292 @@ +/* + * Copyright 2020 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + */ + +package com.linkedin.kafka.cruisecontrol.config; + +import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig; +import org.apache.kafka.common.config.ConfigException; +import org.easymock.EasyMock; +import org.junit.Test; + +import static org.junit.Assert.assertThrows; + + +public class ConcurrencyConfigTest { + private static final String GET_INT_METHOD = "getInt"; + private static final String GET_LONG_METHOD = "getLong"; + + @Test + public void testNumConcurrentPartitionMovementsPerBrokerNotSmallerThanMaxNumClusterMovements() { + KafkaCruiseControlConfig config = EasyMock.partialMockBuilder(KafkaCruiseControlConfig.class) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .createNiceMock(); + EasyMock.expect(config.getInt(ExecutorConfig.MAX_NUM_CLUSTER_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_MAX_NUM_CLUSTER_MOVEMENTS_CONFIG); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_MAX_NUM_CLUSTER_MOVEMENTS_CONFIG); + + EasyMock.replay(config); + assertThrows(ConfigException.class, config::sanityCheckConcurrency); + EasyMock.verify(config); + } + + @Test + public void testNumConcurrentIntraBrokerPartitionMovementsNotSmallerThanMaxNumClusterMovements() { + KafkaCruiseControlConfig config = EasyMock.partialMockBuilder(KafkaCruiseControlConfig.class) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .createNiceMock(); + EasyMock.expect(config.getInt(ExecutorConfig.MAX_NUM_CLUSTER_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_MAX_NUM_CLUSTER_MOVEMENTS_CONFIG); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_MAX_NUM_CLUSTER_MOVEMENTS_CONFIG); + + EasyMock.replay(config); + assertThrows(ConfigException.class, config::sanityCheckConcurrency); + EasyMock.verify(config); + } + + @Test + public void testNumConcurrentLeaderMovementsGreaterThanMaxNumClusterMovements() { + KafkaCruiseControlConfig config = EasyMock.partialMockBuilder(KafkaCruiseControlConfig.class) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .createNiceMock(); + EasyMock.expect(config.getInt(ExecutorConfig.MAX_NUM_CLUSTER_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_MAX_NUM_CLUSTER_MOVEMENTS_CONFIG); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_MAX_NUM_CLUSTER_MOVEMENTS_CONFIG + 1); + + EasyMock.replay(config); + assertThrows(ConfigException.class, config::sanityCheckConcurrency); + EasyMock.verify(config); + } + + @Test + public void testNumConcurrentPartitionMovementsPerBrokerNotSmallerThanConcurrencyAdjusterMaxPartitionMovementsPerBroker() { + KafkaCruiseControlConfig config = EasyMock.partialMockBuilder(KafkaCruiseControlConfig.class) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .createNiceMock(); + EasyMock.expect(config.getInt(ExecutorConfig.MAX_NUM_CLUSTER_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_MAX_NUM_CLUSTER_MOVEMENTS_CONFIG); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_LEADER_MOVEMENTS); + EasyMock.expect(config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MAX_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER); + + EasyMock.replay(config); + assertThrows(ConfigException.class, config::sanityCheckConcurrency); + EasyMock.verify(config); + } + + @Test + public void testConcurrencyAdjusterMaxPartitionMovementsPerBrokerGreaterThanMaxNumClusterMovements() { + KafkaCruiseControlConfig config = EasyMock.partialMockBuilder(KafkaCruiseControlConfig.class) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .createNiceMock(); + EasyMock.expect(config.getInt(ExecutorConfig.MAX_NUM_CLUSTER_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_MAX_NUM_CLUSTER_MOVEMENTS_CONFIG); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_LEADER_MOVEMENTS); + EasyMock.expect(config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MAX_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_MAX_NUM_CLUSTER_MOVEMENTS_CONFIG + 1); + + EasyMock.replay(config); + assertThrows(ConfigException.class, config::sanityCheckConcurrency); + EasyMock.verify(config); + } + + @Test + public void testConcurrencyAdjusterMinPartitionMovementsPerBrokerGreaterThanNumConcurrentPartitionMovementsPerBroker() { + KafkaCruiseControlConfig config = EasyMock.partialMockBuilder(KafkaCruiseControlConfig.class) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .createNiceMock(); + EasyMock.expect(config.getInt(ExecutorConfig.MAX_NUM_CLUSTER_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_MAX_NUM_CLUSTER_MOVEMENTS_CONFIG); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_LEADER_MOVEMENTS); + EasyMock.expect(config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MAX_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_CONCURRENCY_ADJUSTER_MAX_PARTITION_MOVEMENTS_PER_BROKER); + EasyMock.expect(config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MIN_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER + 1); + + EasyMock.replay(config); + assertThrows(ConfigException.class, config::sanityCheckConcurrency); + EasyMock.verify(config); + } + + @Test + public void testConcurrencyAdjusterMinLeadershipMovementsGreaterThanNumConcurrentLeaderMovements() { + KafkaCruiseControlConfig config = EasyMock.partialMockBuilder(KafkaCruiseControlConfig.class) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .createNiceMock(); + EasyMock.expect(config.getInt(ExecutorConfig.MAX_NUM_CLUSTER_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_MAX_NUM_CLUSTER_MOVEMENTS_CONFIG); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_LEADER_MOVEMENTS); + EasyMock.expect(config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MAX_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_CONCURRENCY_ADJUSTER_MAX_PARTITION_MOVEMENTS_PER_BROKER); + EasyMock.expect(config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MIN_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_CONCURRENCY_ADJUSTER_MIN_PARTITION_MOVEMENTS_PER_BROKER); + EasyMock.expect(config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MIN_LEADERSHIP_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_LEADER_MOVEMENTS + 1); + + EasyMock.replay(config); + assertThrows(ConfigException.class, config::sanityCheckConcurrency); + EasyMock.verify(config); + } + + @Test + public void testConcurrencyAdjusterMaxLeadershipMovementsSmallerThanNumConcurrentLeaderMovements() { + KafkaCruiseControlConfig config = EasyMock.partialMockBuilder(KafkaCruiseControlConfig.class) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .createNiceMock(); + EasyMock.expect(config.getInt(ExecutorConfig.MAX_NUM_CLUSTER_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_MAX_NUM_CLUSTER_MOVEMENTS_CONFIG); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_LEADER_MOVEMENTS); + EasyMock.expect(config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MAX_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_CONCURRENCY_ADJUSTER_MAX_PARTITION_MOVEMENTS_PER_BROKER); + EasyMock.expect(config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MIN_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_CONCURRENCY_ADJUSTER_MIN_PARTITION_MOVEMENTS_PER_BROKER); + EasyMock.expect(config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MIN_LEADERSHIP_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_CONCURRENCY_ADJUSTER_MIN_LEADERSHIP_MOVEMENTS); + EasyMock.expect(config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MAX_LEADERSHIP_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_LEADER_MOVEMENTS - 1); + + EasyMock.replay(config); + assertThrows(ConfigException.class, config::sanityCheckConcurrency); + EasyMock.verify(config); + } + + @Test + public void testConcurrencyAdjusterMaxLeadershipMovementsGreaterThanMaxNumClusterMovements() { + KafkaCruiseControlConfig config = EasyMock.partialMockBuilder(KafkaCruiseControlConfig.class) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .createNiceMock(); + EasyMock.expect(config.getInt(ExecutorConfig.MAX_NUM_CLUSTER_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_MAX_NUM_CLUSTER_MOVEMENTS_CONFIG); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_LEADER_MOVEMENTS); + EasyMock.expect(config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MAX_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_CONCURRENCY_ADJUSTER_MAX_PARTITION_MOVEMENTS_PER_BROKER); + EasyMock.expect(config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MIN_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_CONCURRENCY_ADJUSTER_MIN_PARTITION_MOVEMENTS_PER_BROKER); + EasyMock.expect(config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MIN_LEADERSHIP_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_CONCURRENCY_ADJUSTER_MIN_LEADERSHIP_MOVEMENTS); + EasyMock.expect(config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MAX_LEADERSHIP_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_MAX_NUM_CLUSTER_MOVEMENTS_CONFIG + 1); + + EasyMock.replay(config); + assertThrows(ConfigException.class, config::sanityCheckConcurrency); + EasyMock.verify(config); + } + + @Test + public void testMinExecutionProgressCheckIntervalMsGreaterThanExecutionProgressCheckIntervalMs() { + KafkaCruiseControlConfig config = EasyMock.partialMockBuilder(KafkaCruiseControlConfig.class) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_INT_METHOD) + .addMockedMethod(GET_LONG_METHOD) + .addMockedMethod(GET_LONG_METHOD) + .createNiceMock(); + EasyMock.expect(config.getInt(ExecutorConfig.MAX_NUM_CLUSTER_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_MAX_NUM_CLUSTER_MOVEMENTS_CONFIG); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_PARTITION_MOVEMENTS_PER_BROKER); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_INTRA_BROKER_PARTITION_MOVEMENTS); + EasyMock.expect(config.getInt(ExecutorConfig.NUM_CONCURRENT_LEADER_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_NUM_CONCURRENT_LEADER_MOVEMENTS); + EasyMock.expect(config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MAX_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_CONCURRENCY_ADJUSTER_MAX_PARTITION_MOVEMENTS_PER_BROKER); + EasyMock.expect(config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MIN_PARTITION_MOVEMENTS_PER_BROKER_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_CONCURRENCY_ADJUSTER_MIN_PARTITION_MOVEMENTS_PER_BROKER); + EasyMock.expect(config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MIN_LEADERSHIP_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_CONCURRENCY_ADJUSTER_MIN_LEADERSHIP_MOVEMENTS); + EasyMock.expect(config.getInt(ExecutorConfig.CONCURRENCY_ADJUSTER_MAX_LEADERSHIP_MOVEMENTS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_CONCURRENCY_ADJUSTER_MAX_LEADERSHIP_MOVEMENTS); + EasyMock.expect(config.getLong(ExecutorConfig.MIN_EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_EXECUTION_PROGRESS_CHECK_INTERVAL_MS + 1); + EasyMock.expect(config.getLong(ExecutorConfig.EXECUTION_PROGRESS_CHECK_INTERVAL_MS_CONFIG)) + .andReturn(ExecutorConfig.DEFAULT_EXECUTION_PROGRESS_CHECK_INTERVAL_MS); + + EasyMock.replay(config); + assertThrows(ConfigException.class, config::sanityCheckConcurrency); + EasyMock.verify(config); + } +} diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ConcurrencyAdjusterTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ConcurrencyAdjusterTest.java index bd3e118196492..0547e6a0bf31d 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ConcurrencyAdjusterTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/executor/ConcurrencyAdjusterTest.java @@ -34,7 +34,14 @@ public class ConcurrencyAdjusterTest { private static final double MOCK_COMMON_CONCURRENCY_ADJUSTER_LIMIT = 100.0; private static final int NUM_BROKERS = 4; private static final Random RANDOM = new Random(0xDEADBEEF); - private static final int MAX_PARTITION_MOVEMENTS_PER_BROKER = 12; + private static final int MOCK_ADDITIVE_INCREASE_INTER_BROKER_REPLICA = 2; + private static final int MOCK_ADDITIVE_INCREASE_LEADERSHIP = 50; + private static final int MOCK_MD_INTER_BROKER_REPLICA = 2; + private static final int MOCK_MD_LEADERSHIP = 3; + private static final int MOCK_MAX_PARTITION_MOVEMENTS_PER_BROKER = 12; + private static final int MOCK_MAX_LEADERSHIP_MOVEMENTS = 1000; + private static final int MOCK_MIN_PARTITION_MOVEMENTS_PER_BROKER = 1; + private static final int MOCK_MIN_LEADERSHIP_MOVEMENTS_CONFIG = 50; /** * Setup the test. @@ -87,35 +94,95 @@ public static Map createCurrentMetrics(Li @Test public void testRecommendedConcurrency() { - // Verify a recommended increase in concurrency. + // 1. Verify a recommended increase in concurrency for different concurrency types. List> metricValueByIdPerBroker = new ArrayList<>(NUM_BROKERS); for (int i = 0; i < NUM_BROKERS; i++) { metricValueByIdPerBroker.add(populateMetricValues(0)); } Map currentMetrics = createCurrentMetrics(metricValueByIdPerBroker); + + // 1.1. Inter-broker replica reassignment (non-capped) + int currentMovementConcurrency = MOCK_MAX_PARTITION_MOVEMENTS_PER_BROKER - MOCK_ADDITIVE_INCREASE_INTER_BROKER_REPLICA - 1; Integer recommendedConcurrency = ExecutionUtils.recommendedConcurrency(currentMetrics, - MAX_PARTITION_MOVEMENTS_PER_BROKER - 3, - MAX_PARTITION_MOVEMENTS_PER_BROKER); - assertEquals(MAX_PARTITION_MOVEMENTS_PER_BROKER - 2, recommendedConcurrency.intValue()); + currentMovementConcurrency, + ConcurrencyType.INTER_BROKER_REPLICA); + assertEquals(currentMovementConcurrency + MOCK_ADDITIVE_INCREASE_INTER_BROKER_REPLICA, recommendedConcurrency.intValue()); + + // 1.2. Leadership reassignment (non-capped) + currentMovementConcurrency = MOCK_MAX_LEADERSHIP_MOVEMENTS - MOCK_ADDITIVE_INCREASE_LEADERSHIP - 1; + recommendedConcurrency = ExecutionUtils.recommendedConcurrency(currentMetrics, + currentMovementConcurrency, + ConcurrencyType.LEADERSHIP); + assertEquals(currentMovementConcurrency + MOCK_ADDITIVE_INCREASE_LEADERSHIP, recommendedConcurrency.intValue()); + + // 1.3. Inter-broker replica reassignment (capped) + currentMovementConcurrency = MOCK_MAX_PARTITION_MOVEMENTS_PER_BROKER - MOCK_ADDITIVE_INCREASE_INTER_BROKER_REPLICA + 1; + recommendedConcurrency = ExecutionUtils.recommendedConcurrency(currentMetrics, + currentMovementConcurrency, + ConcurrencyType.INTER_BROKER_REPLICA); + assertEquals(MOCK_MAX_PARTITION_MOVEMENTS_PER_BROKER, recommendedConcurrency.intValue()); + + // 1.4. Leadership reassignment (capped) + currentMovementConcurrency = MOCK_MAX_LEADERSHIP_MOVEMENTS - MOCK_ADDITIVE_INCREASE_LEADERSHIP + 1; + recommendedConcurrency = ExecutionUtils.recommendedConcurrency(currentMetrics, + currentMovementConcurrency, + ConcurrencyType.LEADERSHIP); + assertEquals(MOCK_MAX_LEADERSHIP_MOVEMENTS, recommendedConcurrency.intValue()); + + // 2. Verify no change in concurrency due to hitting max limit for different concurrency types. + // 2.1. Inter-broker replica reassignment + recommendedConcurrency = ExecutionUtils.recommendedConcurrency(currentMetrics, + MOCK_MAX_PARTITION_MOVEMENTS_PER_BROKER, + ConcurrencyType.INTER_BROKER_REPLICA); + assertNull(recommendedConcurrency); - // Verify no change in concurrency due to hitting max limit. + // 2.2. Leadership reassignment recommendedConcurrency = ExecutionUtils.recommendedConcurrency(currentMetrics, - MAX_PARTITION_MOVEMENTS_PER_BROKER, - MAX_PARTITION_MOVEMENTS_PER_BROKER); + MOCK_MAX_LEADERSHIP_MOVEMENTS, + ConcurrencyType.LEADERSHIP); assertNull(recommendedConcurrency); - // Verify a recommended decrease in concurrency. + // 3. Verify a recommended decrease in concurrency for different concurrency types. metricValueByIdPerBroker.add(populateMetricValues(1)); currentMetrics = createCurrentMetrics(metricValueByIdPerBroker); + // 3.1. Inter-broker replica reassignment (non-capped) recommendedConcurrency = ExecutionUtils.recommendedConcurrency(currentMetrics, - MAX_PARTITION_MOVEMENTS_PER_BROKER, - MAX_PARTITION_MOVEMENTS_PER_BROKER); - assertEquals(MAX_PARTITION_MOVEMENTS_PER_BROKER / 2, recommendedConcurrency.intValue()); + MOCK_MAX_PARTITION_MOVEMENTS_PER_BROKER, + ConcurrencyType.INTER_BROKER_REPLICA); + assertEquals(MOCK_MAX_PARTITION_MOVEMENTS_PER_BROKER / MOCK_MD_INTER_BROKER_REPLICA, recommendedConcurrency.intValue()); + + // 3.2. Leadership reassignment (non-capped) + recommendedConcurrency = ExecutionUtils.recommendedConcurrency(currentMetrics, + MOCK_MAX_LEADERSHIP_MOVEMENTS, + ConcurrencyType.LEADERSHIP); + assertEquals(MOCK_MAX_LEADERSHIP_MOVEMENTS / MOCK_MD_LEADERSHIP, recommendedConcurrency.intValue()); + + + // 3.3. Inter-broker replica reassignment (capped) + currentMovementConcurrency = (MOCK_MIN_PARTITION_MOVEMENTS_PER_BROKER * MOCK_MD_INTER_BROKER_REPLICA + 1) - 1; + recommendedConcurrency = ExecutionUtils.recommendedConcurrency(currentMetrics, + currentMovementConcurrency, + ConcurrencyType.INTER_BROKER_REPLICA); + assertEquals(MOCK_MIN_PARTITION_MOVEMENTS_PER_BROKER, recommendedConcurrency.intValue()); + + // 3.4. Leadership reassignment (capped) + currentMovementConcurrency = (MOCK_MIN_LEADERSHIP_MOVEMENTS_CONFIG * MOCK_MD_LEADERSHIP) - 1; + recommendedConcurrency = ExecutionUtils.recommendedConcurrency(currentMetrics, + currentMovementConcurrency, + ConcurrencyType.LEADERSHIP); + assertEquals(MOCK_MIN_LEADERSHIP_MOVEMENTS_CONFIG, recommendedConcurrency.intValue()); + + // 4. Verify no change in concurrency due to hitting lower limit. + // 4.1. Inter-broker replica reassignment + recommendedConcurrency = ExecutionUtils.recommendedConcurrency(currentMetrics, + MOCK_MIN_PARTITION_MOVEMENTS_PER_BROKER, + ConcurrencyType.INTER_BROKER_REPLICA); + assertNull(recommendedConcurrency); - // Verify no change in concurrency due to hitting lower limit. + // 4.2. Leadership reassignment recommendedConcurrency = ExecutionUtils.recommendedConcurrency(currentMetrics, - 1, - MAX_PARTITION_MOVEMENTS_PER_BROKER); + MOCK_MIN_LEADERSHIP_MOVEMENTS_CONFIG, + ConcurrencyType.LEADERSHIP); assertNull(recommendedConcurrency); } @@ -160,6 +227,22 @@ private static Properties getExecutorProperties() { Double.toString(MOCK_COMMON_CONCURRENCY_ADJUSTER_LIMIT)); props.setProperty(ExecutorConfig.CONCURRENCY_ADJUSTER_LIMIT_REQUEST_QUEUE_SIZE_CONFIG, Double.toString(MOCK_COMMON_CONCURRENCY_ADJUSTER_LIMIT)); + props.setProperty(ExecutorConfig.CONCURRENCY_ADJUSTER_ADDITIVE_INCREASE_INTER_BROKER_REPLICA_CONFIG, + Integer.toString(MOCK_ADDITIVE_INCREASE_INTER_BROKER_REPLICA)); + props.setProperty(ExecutorConfig.CONCURRENCY_ADJUSTER_ADDITIVE_INCREASE_LEADERSHIP_CONFIG, + Integer.toString(MOCK_ADDITIVE_INCREASE_LEADERSHIP)); + props.setProperty(ExecutorConfig.CONCURRENCY_ADJUSTER_MULTIPLICATIVE_DECREASE_INTER_BROKER_REPLICA_CONFIG, + Integer.toString(MOCK_MD_INTER_BROKER_REPLICA)); + props.setProperty(ExecutorConfig.CONCURRENCY_ADJUSTER_MULTIPLICATIVE_DECREASE_LEADERSHIP_CONFIG, + Integer.toString(MOCK_MD_LEADERSHIP)); + props.setProperty(ExecutorConfig.CONCURRENCY_ADJUSTER_MAX_PARTITION_MOVEMENTS_PER_BROKER_CONFIG, + Integer.toString(MOCK_MAX_PARTITION_MOVEMENTS_PER_BROKER)); + props.setProperty(ExecutorConfig.CONCURRENCY_ADJUSTER_MAX_LEADERSHIP_MOVEMENTS_CONFIG, + Integer.toString(MOCK_MAX_LEADERSHIP_MOVEMENTS)); + props.setProperty(ExecutorConfig.CONCURRENCY_ADJUSTER_MIN_PARTITION_MOVEMENTS_PER_BROKER_CONFIG, + Integer.toString(MOCK_MIN_PARTITION_MOVEMENTS_PER_BROKER)); + props.setProperty(ExecutorConfig.CONCURRENCY_ADJUSTER_MIN_LEADERSHIP_MOVEMENTS_CONFIG, + Integer.toString(MOCK_MIN_LEADERSHIP_MOVEMENTS_CONFIG)); return props; } } diff --git a/docs/wiki/User Guide/Configurations.md b/docs/wiki/User Guide/Configurations.md index 351751f17c1a6..0c523082283f5 100644 --- a/docs/wiki/User Guide/Configurations.md +++ b/docs/wiki/User Guide/Configurations.md @@ -114,39 +114,46 @@ The following configurations are inherited from the open source Kafka client con | goal.violation.distribution.threshold.multiplier | Double | N | 1.0 | The multiplier applied to the threshold of distribution goals used for detecting and fixing anomalies. For example, 2.50 means the threshold for each distribution goal (i.e. Replica Distribution, Leader Replica Distribution, Resource Distribution, and Topic Replica Distribution Goals) will be 2.50x of the value used in manual goal optimization requests (e.g. rebalance). | ### Executor Configurations -| Name | Type | Required? | Default Value | Descriptions | -|-----------------------------------------------------------|---------|-----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| zookeeper.connect | String | Y | | The zookeeper path used by the Kafka cluster (see https://kafka.apache.org/documentation/#zookeeper.connect). | -| num.concurrent.partition.movements.per.broker | Integer | N | 5 | The maximum number of partitions the executor will move to or out of a broker at the same time. e.g. setting the value to 10 means that the executor will at most allow 10 partitions move out of a broker and 10 partitions move into a broker at any given point. This is to avoid overwhelming the cluster by partition movements. | -| max.num.cluster.movements | Integer | N | 1250 | The maximum number of allowed movements (e.g. partition, leadership) in cluster. This global limit cannot be exceeded regardless of the per-broker replica movement concurrency. When determining this limit, ensure that the (number-of-allowed-movements * maximum-size-of-each-request) is smaller than the default zNode size limit. | -| num.concurrent.intra.broker.partition.movements | Integer | N | 2 | The maximum number of partitions the executor will move across disks within a broker at the same time. e.g. setting the value to 10 means that the executor will at most allow 10 partitions to move across disks within a broker at any given point. This is to avoid overwhelming the cluster by intra-broker partition movements. | -| num.concurrent.leader.movements | Integer | N | 1000 | The maximum number of leader movements the executor will take as one batch. This is mainly because the ZNode has a 1 MB size upper limit. And it will also reduce the controller burden. | -| execution.progress.check.interval.ms | Integer | N | 10,000 | The interval in milliseconds that the " +,"executor will check on the execution progress. | -| metric.anomaly.analyzer.metrics | String | N | "" | The metric ids that the metric anomaly detector should detect if they are violated. | -| topics.excluded.from.partition.movement | String | N | "" | The topics that should be excluded from the partition movement. It is a regex. Notice that this regex will be ignored when decommission a broker is invoked. | -| default.replication.throttle | Long | N | null | The replication throttle applied to replicas being moved, in bytes per second. | -| replica.movement.strategies | List | N | [com.linkedin.kafka.cruisecontrol.executor.strategy.PostponeUrpReplicaMovementStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeLargeReplicaMovementStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeSmallReplicaMovementStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.BaseReplicaMovementStrategy] | A list of supported strategies used to determine execution order for generated partition movement tasks. | -| default.replica.movement.strategies | List | N | [com.linkedin.kafka.cruisecontrol.executor.strategy.BaseReplicaMovementStrategy] | The list of replica movement strategies that will be used by default if no replica movement strategy list is provided. | -| executor.notifier.class | Class | N | class com.linkedin.kafka.cruisecontrol.executor.ExecutorNoopNotifier | The executor notifier class to trigger an alert when an execution finishes or is stopped (by a user or by Cruise Control). | -| demotion.history.retention.time.ms | Long | N | 1209600000 | The maximum time in milliseconds to retain the demotion history of brokers. | -| removal.history.retention.time.ms | Long | N | 1209600000 | The maximum time in milliseconds to retain the removal history of brokers. | -| logdir.response.timeout.ms | Long | N | 10000 | Timeout in ms for broker logdir to respond | -| leader.movement.timeout.ms | Long | N | 180000 | The maximum time to wait for a leader movement to finish. A leader movement will be marked as failed if it takes longer than this time to finish. | -| task.execution.alerting.threshold.ms | Long | N | 90000 | Threshold of execution time to alert a replica/leader movement task. If the task's execution time exceeds this threshold and the data movement rate is lower than the threshold set for inter-broker/intra-broker replica, alert will be sent out by notifier set via executor.notifier.class. | -| inter.broker.replica.movement.rate.alerting.threshold | Double | N | 0.1 | Threshold of data movement rate(in MB/s) for inter-broker replica movement task. If the task's data movement rate is lower than this and the task's execution time exceeds the threshold set via task.execution.alerting.threshold.ms, alert will be sent out by notifier set via executor.notifier.class. | -| intra.broker.replica.movement.rate.alerting.threshold | Double | N | 0.2 | Threshold of data movement rate(in MB/s) for intra-broker replica movement task. If the task's data movement rate is lower than this and the task's execution time exceeds the threshold set via task.execution.alerting.threshold.ms, alert will be sent out by notifier set via executor.notifier.class. | -| concurrency.adjuster.interval.ms | Long | N | 360000 | The interval of concurrency auto adjustment for inter-broker partition movements. | -| concurrency.adjuster.max.partition.movements.per.broker | Integer | N | 12 | The maximum number of partitions the concurrency auto adjustment will allow the executor to move in or out of a broker at the same time. It enforces a cap on the maximum concurrent inter-broker partition movements to avoid overwhelming the cluster. It must be greater than num.concurrent.partition.movements.per.broker and not more than max.num.cluster.movements. | -| concurrency.adjuster.enabled | Boolean | N | false | The flag to indicate whether the concurrency of inter-broker partition movements will be auto-adjusted based on dynamically changing broker metrics. | -| concurrency.adjuster.limit.log.flush.time.ms | Double | N | 2000.0 | The limit on the 99.9th percentile broker metric value of log flush time. If any broker exceeds this limit during an ongoing inter-broker partition reassignment, the concurrency adjuster (if enabled) attempts to decrease the number of allowed concurrent inter-broker partition movements. | -| concurrency.adjuster.limit.follower.fetch.local.time.ms | Double | N | 500.0 | The limit on the 99.9th percentile broker metric value of follower fetch local time. If any broker exceeds this limit during an ongoing inter-broker partition reassignment, the concurrency adjuster (if enabled) attempts to decrease the number of allowed concurrent inter-broker partition movements. | -| concurrency.adjuster.limit.produce.local.time.ms | Double | N | 1000.0 | The limit on the 99.9th percentile broker metric value of produce local time. If any broker exceeds this limit during an ongoing inter-broker partition reassignment, the concurrency adjuster (if enabled) attempts to decrease the number of allowed concurrent inter-broker partition movements. | -| concurrency.adjuster.limit.consumer.fetch.local.time.ms | Double | N | 500.0 | The limit on the 99.9th percentile broker metric value of consumer fetch local time. If any broker exceeds this limit during an ongoing inter-broker partition reassignment, the concurrency adjuster (if enabled) attempts to decrease the number of allowed concurrent inter-broker partition movements. | -| concurrency.adjuster.limit.request.queue.size | Double | N | 1000.0 | The limit on the broker metric value of request queue size. If any broker exceeds this limit during an ongoing inter-broker partition reassignment, the concurrency adjuster (if enabled) attempts to decrease the number of allowed concurrent inter-broker partition movements. | -| list.partition.reassignment.timeout.ms | Long | N | 60000 | The maximum time to wait for the response of an Admin#listPartitionReassignments() request to be available. | -| list.partition.reassignment.max.attempts | Integer | N | 3 | The maximum number of attempts to get an available response for an Admin#listPartitionReassignments() request in case of a timeout. Each attempt recalculates the allowed timeout using: list-partition-reassignments-timeout-for-the-initial-response * (base-backoff ^ attempt). | -| min.execution.progress.check.interval.ms | Double | N | 5000 | The minimum execution progress check interval that users can dynamically set the execution progress check interval to. | -| slow.task.alerting.backoff.ms | Double | N | 60000 | The minimum interval between slow task alerts. This backoff helps bundling slow tasks to report rather than individually reporting them upon detection. | +| Name | Type | Required? | Default Value | Descriptions | +|-------------------------------------------------------------------|---------|-----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| zookeeper.connect | String | Y | | The zookeeper path used by the Kafka cluster (see https://kafka.apache.org/documentation/#zookeeper.connect). | +| num.concurrent.partition.movements.per.broker | Integer | N | 5 | The maximum number of partitions the executor will move to or out of a broker at the same time. e.g. setting the value to 10 means that the executor will at most allow 10 partitions move out of a broker and 10 partitions move into a broker at any given point. This is to avoid overwhelming the cluster by partition movements. | +| max.num.cluster.movements | Integer | N | 1250 | The maximum number of allowed movements (e.g. partition, leadership) in cluster. This global limit cannot be exceeded regardless of the per-broker replica movement concurrency. When determining this limit, ensure that the (number-of-allowed-movements * maximum-size-of-each-request) is smaller than the default zNode size limit. | +| num.concurrent.intra.broker.partition.movements | Integer | N | 2 | The maximum number of partitions the executor will move across disks within a broker at the same time. e.g. setting the value to 10 means that the executor will at most allow 10 partitions to move across disks within a broker at any given point. This is to avoid overwhelming the cluster by intra-broker partition movements. | +| num.concurrent.leader.movements | Integer | N | 1000 | The maximum number of leader movements the executor will take as one batch. This is mainly because the ZNode has a 1 MB size upper limit. And it will also reduce the controller burden. | +| execution.progress.check.interval.ms | Integer | N | 10,000 | The interval in milliseconds that the " +,"executor will check on the execution progress. | +| metric.anomaly.analyzer.metrics | String | N | "" | The metric ids that the metric anomaly detector should detect if they are violated. | +| topics.excluded.from.partition.movement | String | N | "" | The topics that should be excluded from the partition movement. It is a regex. Notice that this regex will be ignored when decommission a broker is invoked. | +| default.replication.throttle | Long | N | null | The replication throttle applied to replicas being moved, in bytes per second. | +| replica.movement.strategies | List | N | [com.linkedin.kafka.cruisecontrol.executor.strategy.PostponeUrpReplicaMovementStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeLargeReplicaMovementStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.PrioritizeSmallReplicaMovementStrategy, com.linkedin.kafka.cruisecontrol.executor.strategy.BaseReplicaMovementStrategy] | A list of supported strategies used to determine execution order for generated partition movement tasks. | +| default.replica.movement.strategies | List | N | [com.linkedin.kafka.cruisecontrol.executor.strategy.BaseReplicaMovementStrategy] | The list of replica movement strategies that will be used by default if no replica movement strategy list is provided. | +| executor.notifier.class | Class | N | class com.linkedin.kafka.cruisecontrol.executor.ExecutorNoopNotifier | The executor notifier class to trigger an alert when an execution finishes or is stopped (by a user or by Cruise Control). | +| demotion.history.retention.time.ms | Long | N | 1209600000 | The maximum time in milliseconds to retain the demotion history of brokers. | +| removal.history.retention.time.ms | Long | N | 1209600000 | The maximum time in milliseconds to retain the removal history of brokers. | +| logdir.response.timeout.ms | Long | N | 10000 | Timeout in ms for broker logdir to respond | +| leader.movement.timeout.ms | Long | N | 180000 | The maximum time to wait for a leader movement to finish. A leader movement will be marked as failed if it takes longer than this time to finish. | +| task.execution.alerting.threshold.ms | Long | N | 90000 | Threshold of execution time to alert a replica/leader movement task. If the task's execution time exceeds this threshold and the data movement rate is lower than the threshold set for inter-broker/intra-broker replica, alert will be sent out by notifier set via executor.notifier.class. | +| inter.broker.replica.movement.rate.alerting.threshold | Double | N | 0.1 | Threshold of data movement rate(in MB/s) for inter-broker replica movement task. If the task's data movement rate is lower than this and the task's execution time exceeds the threshold set via task.execution.alerting.threshold.ms, alert will be sent out by notifier set via executor.notifier.class. | +| intra.broker.replica.movement.rate.alerting.threshold | Double | N | 0.2 | Threshold of data movement rate(in MB/s) for intra-broker replica movement task. If the task's data movement rate is lower than this and the task's execution time exceeds the threshold set via task.execution.alerting.threshold.ms, alert will be sent out by notifier set via executor.notifier.class. | +| concurrency.adjuster.interval.ms | Long | N | 360000 | The interval of concurrency auto adjustment. | +| concurrency.adjuster.max.partition.movements.per.broker | Integer | N | 12 | The maximum number of partitions the concurrency auto adjustment will allow the executor to move in or out of a broker at the same time. It enforces a cap on the maximum concurrent inter-broker partition movements to avoid overwhelming the cluster. It must be greater than num.concurrent.partition.movements.per.broker and not more than max.num.cluster.movements. | +| concurrency.adjuster.max.leadership.movements | Integer | N | 1100 | The maximum number of leadership movements the concurrency auto adjustment will allow the executor to perform in one batch to avoid overwhelming the cluster. It cannot be (1) smaller than num.concurrent.leader.movements and (2) greater than max.num.cluster.movements. | +| concurrency.adjuster.enabled | Boolean | N | false | The flag to indicate whether the concurrency of all supported movements will be auto-adjusted based on dynamically changing broker metrics. It enables concurrency adjuster for all supported concurrency types, regardless of whether the particular concurrency type is disabled. | +| concurrency.adjuster.inter.broker.replica.enabled | Boolean | N | false | Enable concurrency adjuster for inter-broker replica reassignments. | +| concurrency.adjuster.leadership.enabled | Boolean | N | false | Enable concurrency adjuster for leadership reassignments. | +| concurrency.adjuster.limit.log.flush.time.ms | Double | N | 2000.0 | The limit on the 99.9th percentile broker metric value of log flush time. If any broker exceeds this limit during an ongoing reassignment, the relevant concurrency adjuster (if enabled) attempts to decrease the number of allowed concurrent movements. | +| concurrency.adjuster.limit.follower.fetch.local.time.ms | Double | N | 500.0 | The limit on the 99.9th percentile broker metric value of follower fetch local time. If any broker exceeds this limit during an ongoing reassignment, the relevant concurrency adjuster (if enabled) attempts to decrease the number of allowed concurrent movements. | +| concurrency.adjuster.limit.produce.local.time.ms | Double | N | 1000.0 | The limit on the 99.9th percentile broker metric value of produce local time. If any broker exceeds this limit during an ongoing reassignment, the relevant concurrency adjuster (if enabled) attempts to decrease the number of allowed concurrent movements. | +| concurrency.adjuster.limit.consumer.fetch.local.time.ms | Double | N | 500.0 | The limit on the 99.9th percentile broker metric value of consumer fetch local time. If any broker exceeds this limit during an ongoing reassignment, the relevant concurrency adjuster (if enabled) attempts to decrease the number of allowed concurrent movements. | +| concurrency.adjuster.limit.request.queue.size | Double | N | 1000.0 | The limit on the broker metric value of request queue size. If any broker exceeds this limit during an ongoing reassignment, the relevant concurrency adjuster (if enabled) attempts to decrease the number of allowed concurrent movements. | +| concurrency.adjuster.additive.increase.inter.broker.replica | Integer | N | 1 | The fixed number by which the concurrency cap on inter-broker replica movements will be increased by the concurrency adjuster (if enabled) when all considered metrics are within the concurrency adjuster limit. | +| concurrency.adjuster.additive.increase.leadership | Integer | N | 100 | The fixed number by which the concurrency cap on leadership movements will be increased by the concurrency adjuster (if enabled) when all considered metrics are within the concurrency adjuster limit. | +| concurrency.adjuster.multiplicative.decrease.inter.broker.replica | Integer | N | 2 | The fixed number by which the concurrency cap on inter-broker replica movements will be divided by the concurrency adjuster (if enabled) when any considered metric exceeds the concurrency adjuster limit. | +| concurrency.adjuster.multiplicative.decrease.leadership | Integer | N | 2 | The fixed number by which the concurrency cap on leadership movements will be divided by the concurrency adjuster (if enabled) when any considered metric exceeds the concurrency adjuster limit. | +| list.partition.reassignment.timeout.ms | Long | N | 60000 | The maximum time to wait for the response of an Admin#listPartitionReassignments() request to be available. | +| list.partition.reassignment.max.attempts | Integer | N | 3 | The maximum number of attempts to get an available response for an Admin#listPartitionReassignments() request in case of a timeout. Each attempt recalculates the allowed timeout using: list-partition-reassignments-timeout-for-the-initial-response * (base-backoff ^ attempt). | +| min.execution.progress.check.interval.ms | Double | N | 5000 | The minimum execution progress check interval that users can dynamically set the execution progress check interval to. | +| slow.task.alerting.backoff.ms | Double | N | 60000 | The minimum interval between slow task alerts. This backoff helps bundling slow tasks to report rather than individually reporting them upon detection. | ### AnomalyDetector Configurations | Name | Type | Required? | Default Value | Description |