diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java index bc531c9ceb190..f2fbd149e1fed 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControl.java @@ -116,7 +116,7 @@ public KafkaCruiseControl(KafkaCruiseControlConfig config, MetricRegistry dropwi _executor = new Executor(config, _time, dropwizardMetricRegistry, _anomalyDetectorManager); _loadMonitor = new LoadMonitor(config, _time, dropwizardMetricRegistry, KafkaMetricDef.commonMetricDef()); _goalOptimizerExecutor = Executors.newSingleThreadExecutor(new KafkaCruiseControlThreadFactory("GoalOptimizerExecutor", true, null)); - _goalOptimizer = new GoalOptimizer(config, _loadMonitor, _time, dropwizardMetricRegistry, _executor); + _goalOptimizer = new GoalOptimizer(config, _loadMonitor, _time, dropwizardMetricRegistry, _executor, _adminClient); } /** diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtils.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtils.java index abeedf742f97a..ba7a4da068201 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtils.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtils.java @@ -78,6 +78,8 @@ */ public class KafkaCruiseControlUtils { private static final Logger LOG = LoggerFactory.getLogger(KafkaCruiseControlUtils.class); + // Config to pass an Admin client to a pluggable component + public static final String ADMIN_CLIENT_CONFIG = "admin.client.object"; public static final double MAX_BALANCEDNESS_SCORE = 100.0; public static final int ZK_SESSION_TIMEOUT = 120000; public static final int ZK_CONNECTION_TIMEOUT = 120000; diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/GoalOptimizer.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/GoalOptimizer.java index 6745b7974ed8c..c4d8b667c3f34 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/GoalOptimizer.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/analyzer/GoalOptimizer.java @@ -24,7 +24,7 @@ import com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils; import com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner; import com.linkedin.kafka.cruisecontrol.servlet.response.stats.BrokerStats; -import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; @@ -40,12 +40,14 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.ADMIN_CLIENT_CONFIG; import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.balancednessCostByGoal; import static com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner.LoadMonitorTaskRunnerState.BOOTSTRAPPING; import static com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner.LoadMonitorTaskRunnerState.LOADING; @@ -92,7 +94,8 @@ public GoalOptimizer(KafkaCruiseControlConfig config, LoadMonitor loadMonitor, Time time, MetricRegistry dropwizardMetricRegistry, - Executor executor) { + Executor executor, + AdminClient adminClient) { _goalsByPriority = AnalyzerUtils.getGoalsByPriority(config); _defaultModelCompletenessRequirements = MonitorUtils.combineLoadRequirementOptions(_goalsByPriority); _requirementsWithAvailableValidWindows = new ModelCompletenessRequirements( @@ -121,7 +124,9 @@ public GoalOptimizer(KafkaCruiseControlConfig config, _priorityWeight = config.getDouble(AnalyzerConfig.GOAL_BALANCEDNESS_PRIORITY_WEIGHT_CONFIG); _strictnessWeight = config.getDouble(AnalyzerConfig.GOAL_BALANCEDNESS_STRICTNESS_WEIGHT_CONFIG); _allowCapacityEstimationOnProposalPrecompute = config.getBoolean(AnalyzerConfig.ALLOW_CAPACITY_ESTIMATION_ON_PROPOSAL_PRECOMPUTE_CONFIG); - Map overrideConfigs = Collections.singletonMap(KAFKA_CRUISE_CONTROL_CONFIG_OBJECT_CONFIG, config); + Map overrideConfigs = new HashMap<>(2); + overrideConfigs.put(KAFKA_CRUISE_CONTROL_CONFIG_OBJECT_CONFIG, config); + overrideConfigs.put(ADMIN_CLIENT_CONFIG, adminClient); _optimizationOptionsGenerator = config.getConfiguredInstance(AnalyzerConfig.OPTIMIZATION_OPTIONS_GENERATOR_CLASS_CONFIG, OptimizationOptionsGenerator.class, overrideConfigs); diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/GoalViolationDetector.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/GoalViolationDetector.java index 8d8e823d7f144..f623b036da525 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/GoalViolationDetector.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/detector/GoalViolationDetector.java @@ -34,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.ADMIN_CLIENT_CONFIG; import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.balancednessCostByGoal; import static com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils.MAX_BALANCEDNESS_SCORE; import static com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils.ANOMALY_DETECTION_TIME_MS_OBJECT_CONFIG; @@ -72,7 +73,9 @@ public GoalViolationDetector(Queue anomalies, KafkaCruiseControl kafkaC config.getDouble(AnalyzerConfig.GOAL_BALANCEDNESS_PRIORITY_WEIGHT_CONFIG), config.getDouble(AnalyzerConfig.GOAL_BALANCEDNESS_STRICTNESS_WEIGHT_CONFIG)); _balancednessScore = MAX_BALANCEDNESS_SCORE; - Map overrideConfigs = Collections.singletonMap(KAFKA_CRUISE_CONTROL_CONFIG_OBJECT_CONFIG, config); + Map overrideConfigs = new HashMap<>(2); + overrideConfigs.put(KAFKA_CRUISE_CONTROL_CONFIG_OBJECT_CONFIG, config); + overrideConfigs.put(ADMIN_CLIENT_CONFIG, _kafkaCruiseControl.adminClient()); _optimizationOptionsGenerator = config.getConfiguredInstance(AnalyzerConfig.OPTIMIZATION_OPTIONS_GENERATOR_CLASS_CONFIG, OptimizationOptionsGenerator.class, overrideConfigs); diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/GoalOptimizerTest.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/GoalOptimizerTest.java index 28cb19e8fe8a7..a727c85147cd1 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/GoalOptimizerTest.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/GoalOptimizerTest.java @@ -13,6 +13,7 @@ import com.linkedin.kafka.cruisecontrol.executor.Executor; import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor; import java.util.Properties; +import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.common.utils.SystemTime; import org.easymock.EasyMock; import org.junit.Test; @@ -29,8 +30,8 @@ public void testNoPreComputingThread() { props.setProperty(AnalyzerConfig.DEFAULT_GOALS_CONFIG, TestConstants.DEFAULT_GOALS_VALUES); KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(props); - GoalOptimizer goalOptimizer = new GoalOptimizer(config, EasyMock.mock(LoadMonitor.class), new SystemTime(), - new MetricRegistry(), EasyMock.mock(Executor.class)); + GoalOptimizer goalOptimizer = new GoalOptimizer(config, EasyMock.mock(LoadMonitor.class), new SystemTime(), new MetricRegistry(), + EasyMock.mock(Executor.class), EasyMock.mock(AdminClient.class)); // Should exit immediately. goalOptimizer.run(); } diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/OfflineProposalGenerator.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/OfflineProposalGenerator.java index 13578b57e96b9..d9b4ac7d58e37 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/OfflineProposalGenerator.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/OfflineProposalGenerator.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.Properties; import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.common.utils.SystemTime; import org.easymock.EasyMock; @@ -57,7 +58,8 @@ public static void main(String[] argv) throws Exception { null, new SystemTime(), new MetricRegistry(), - EasyMock.mock(Executor.class)); + EasyMock.mock(Executor.class), + EasyMock.mock(AdminClient.class)); start = System.currentTimeMillis(); OptimizerResult optimizerResult = goalOptimizer.optimizations(clusterModel, new OperationProgress()); end = System.currentTimeMillis(); diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/OptimizationVerifier.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/OptimizationVerifier.java index ecf7262943005..25aecf4177b5f 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/OptimizationVerifier.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/analyzer/OptimizationVerifier.java @@ -32,6 +32,7 @@ import com.linkedin.kafka.cruisecontrol.model.Replica; import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.SystemTime; import org.easymock.EasyMock; @@ -143,10 +144,11 @@ static boolean executeGoalsFor(BalancingConstraint constraint, null, new SystemTime(), new MetricRegistry(), - EasyMock.mock(Executor.class)); + EasyMock.mock(Executor.class), + EasyMock.mock(AdminClient.class)); - List goalsOfFirstPass = null; - OptimizerResult resultOfFirstPass = null; + List goalsOfFirstPass; + OptimizerResult resultOfFirstPass; List goalsOfSecondPass = null; OptimizerResult resultOfSecondPass = null;