From 92316f5391afcd27e189d0a20879ad795863e826 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Thu, 10 Aug 2023 08:34:51 +0100 Subject: [PATCH] [ML] Make the scale the processor count setting updatable (#98298) --- .../xpack/ml/MachineLearning.java | 5 +- .../TransportGetMlAutoscalingStats.java | 2 + .../MlAutoscalingDeciderService.java | 21 ++- .../MlAutoscalingResourceTracker.java | 5 +- .../MlMemoryAutoscalingDecider.java | 17 ++- .../MlProcessorAutoscalingDecider.java | 15 +- .../TrainedModelAssignmentClusterService.java | 13 +- .../TrainedModelAssignmentRebalancer.java | 36 ++--- .../xpack/ml/utils/MlProcessors.java | 11 +- .../MlAutoscalingDeciderServiceTests.java | 1 + .../MlMemoryAutoscalingDeciderTests.java | 10 +- .../MlProcessorAutoscalingDeciderTests.java | 31 +++- ...nedModelAssignmentClusterServiceTests.java | 3 +- ...TrainedModelAssignmentRebalancerTests.java | 141 +++++++++++------- .../xpack/ml/utils/MlProcessorsTests.java | 12 +- 15 files changed, 206 insertions(+), 117 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index d0ef386efbbf3..3f62dc10c6fa8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -596,10 +596,10 @@ public void loadExtensions(ExtensionLoader loader) { * allocations that can be deployed on a node. */ public static final Setting ALLOCATED_PROCESSORS_SCALE = Setting.intSetting( - "ml.allocated_processors_scale", + "xpack.ml.allocated_processors_scale", 1, 1, - Property.OperatorDynamic, + Property.Dynamic, Property.NodeScope ); @@ -782,6 +782,7 @@ public static boolean isMlNode(DiscoveryNode node) { @Override public List> getSettings() { return List.of( + ALLOCATED_PROCESSORS_SCALE, MachineLearningField.AUTODETECT_PROCESS, PROCESS_CONNECT_TIMEOUT, CONCURRENT_JOB_ALLOCATIONS, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetMlAutoscalingStats.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetMlAutoscalingStats.java index 16005b4bb7f43..c2acd97e53c95 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetMlAutoscalingStats.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetMlAutoscalingStats.java @@ -77,6 +77,7 @@ protected void masterOperation(Task task, Request request, ClusterState state, A if (mlMemoryTracker.isRecentlyRefreshed()) { MlAutoscalingResourceTracker.getMlAutoscalingStats( state, + clusterService.getClusterSettings(), parentTaskAssigningClient, request.timeout(), mlMemoryTracker, @@ -96,6 +97,7 @@ protected void masterOperation(Task task, Request request, ClusterState state, A ActionListener.wrap( ignored -> MlAutoscalingResourceTracker.getMlAutoscalingStats( state, + clusterService.getClusterSettings(), parentTaskAssigningClient, request.timeout(), mlMemoryTracker, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java index 3acec6b6ce39d..b006f5728fcd2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java @@ -20,6 +20,7 @@ import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext; import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult; import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderService; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.NodeLoadDetector; import org.elasticsearch.xpack.ml.process.MlMemoryTracker; @@ -43,6 +44,7 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L private final MlProcessorAutoscalingDecider processorDecider; private volatile boolean isMaster; + private volatile int allocatedProcessorsScale; public MlAutoscalingDeciderService( MlMemoryTracker memoryTracker, @@ -69,7 +71,15 @@ public MlAutoscalingDeciderService( scaleTimer ); this.processorDecider = new MlProcessorAutoscalingDecider(scaleTimer); + this.allocatedProcessorsScale = MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(settings); + clusterService.addLocalNodeMasterListener(this); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(MachineLearning.ALLOCATED_PROCESSORS_SCALE, this::setAllocatedProcessorsScale); + } + + void setAllocatedProcessorsScale(int scale) { + this.allocatedProcessorsScale = scale; } @Override @@ -96,7 +106,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider final MlMemoryAutoscalingCapacity currentMemoryCapacity = memoryDecider.capacityFromNativeMemory(currentNativeMemoryCapacity); final MlProcessorAutoscalingCapacity currentProcessorCapacity = processorDecider.computeCurrentCapacity( mlContext.mlNodes, - configuration + allocatedProcessorsScale ); final MlScalingReason.Builder reasonBuilder = MlScalingReason.builder(mlContext) @@ -123,7 +133,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider return downscaleToZero(configuration, context, currentNativeMemoryCapacity, reasonBuilder); } - MlMemoryAutoscalingCapacity memoryCapacity = memoryDecider.scale(configuration, context, mlContext); + MlMemoryAutoscalingCapacity memoryCapacity = memoryDecider.scale(configuration, context, mlContext, allocatedProcessorsScale); if (memoryCapacity.isUndetermined()) { // If we cannot determine memory capacity we shouldn't make any autoscaling decision // as it could lead to undesired capacity. For example, it could be that the processor decider decides @@ -134,7 +144,12 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider reasonBuilder.setSimpleReason(format("[memory_decider] %s", memoryCapacity.reason())).build() ); } - MlProcessorAutoscalingCapacity processorCapacity = processorDecider.scale(configuration, context, mlContext); + MlProcessorAutoscalingCapacity processorCapacity = processorDecider.scale( + configuration, + context, + mlContext, + allocatedProcessorsScale + ); reasonBuilder.setSimpleReason( format("[memory_decider] %s; [processor_decider] %s", memoryCapacity.reason(), processorCapacity.reason()) ); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTracker.java index 91fe33415d024..f1a7abce8e87e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTracker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTracker.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; @@ -65,6 +66,7 @@ private MlAutoscalingResourceTracker() {} public static void getMlAutoscalingStats( ClusterState clusterState, + ClusterSettings clusterSettings, Client client, TimeValue timeout, MlMemoryTracker mlMemoryTracker, @@ -82,7 +84,8 @@ public static void getMlAutoscalingStats( ? NativeMemoryCalculator.allowedBytesForMl(clusterState.nodes().get(mlNodes[0]), settings).orElse(0L) : 0L; int processorsAvailableFirstNode = mlNodes.length > 0 - ? MlProcessors.get(clusterState.nodes().get(mlNodes[0]), settings).roundDown() + ? MlProcessors.get(clusterState.nodes().get(mlNodes[0]), clusterSettings.get(MachineLearning.ALLOCATED_PROCESSORS_SCALE)) + .roundDown() : 0; // Todo: MAX_LOW_PRIORITY_MODELS_PER_NODE not checked yet diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java index 1fcfca5330d59..0ebea7956a661 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java @@ -118,7 +118,12 @@ void setMaxMlNodeSize(ByteSizeValue maxMlNodeSize) { } } - public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeciderContext context, MlAutoscalingContext mlContext) { + public MlMemoryAutoscalingCapacity scale( + Settings configuration, + AutoscalingDeciderContext context, + MlAutoscalingContext mlContext, + int allocatedProcessorsScale + ) { final ClusterState clusterState = context.state(); scaleTimer.lastScaleToScaleIntervalMillis() @@ -258,7 +263,11 @@ public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeci } // We should keep this check here as well as in the processor decider while cloud is not // reacting to processor autoscaling. - if (modelAssignmentsRequireMoreThanHalfCpu(mlContext.modelAssignments.values(), mlContext.mlNodes, configuration)) { + if (modelAssignmentsRequireMoreThanHalfCpu( + mlContext.modelAssignments.values(), + mlContext.mlNodes, + allocatedProcessorsScale + )) { logger.debug("not down-scaling; model assignments require more than half of the ML tier's allocated processors"); return null; } @@ -818,12 +827,12 @@ static MlMemoryAutoscalingCapacity ensureScaleDown( static boolean modelAssignmentsRequireMoreThanHalfCpu( Collection assignments, List mlNodes, - Settings settings + int allocatedProcessorsScale ) { int totalRequiredProcessors = assignments.stream() .mapToInt(t -> t.getTaskParams().getNumberOfAllocations() * t.getTaskParams().getThreadsPerAllocation()) .sum(); - int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node, settings).roundUp()).sum(); + int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node, allocatedProcessorsScale).roundUp()).sum(); return totalRequiredProcessors * 2 > totalMlProcessors; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java index 38ec86273fd86..9dc0604bb2e26 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java @@ -41,7 +41,12 @@ class MlProcessorAutoscalingDecider { this.scaleTimer = Objects.requireNonNull(scaleTimer); } - public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingDeciderContext context, MlAutoscalingContext mlContext) { + public MlProcessorAutoscalingCapacity scale( + Settings configuration, + AutoscalingDeciderContext context, + MlAutoscalingContext mlContext, + int allocatedProcessorsScale + ) { TrainedModelAssignmentMetadata trainedModelAssignmentMetadata = TrainedModelAssignmentMetadata.fromState(context.state()); if (hasUnsatisfiedDeployments(trainedModelAssignmentMetadata, mlContext.mlNodes)) { @@ -52,7 +57,7 @@ public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingD ).build(); } - final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes, configuration); + final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes, allocatedProcessorsScale); final MlProcessorAutoscalingCapacity requiredCapacity = computeRequiredCapacity(trainedModelAssignmentMetadata).build(); @@ -65,7 +70,7 @@ public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingD if (MlMemoryAutoscalingDecider.modelAssignmentsRequireMoreThanHalfCpu( trainedModelAssignmentMetadata.allAssignments().values(), mlContext.mlNodes, - configuration + allocatedProcessorsScale )) { return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors()) .setReason("not scaling down as model assignments require more than half of the ML tier's allocated processors") @@ -137,11 +142,11 @@ private MlProcessorAutoscalingCapacity.Builder computeRequiredCapacity(TrainedMo ); } - MlProcessorAutoscalingCapacity computeCurrentCapacity(List mlNodes, Settings settings) { + MlProcessorAutoscalingCapacity computeCurrentCapacity(List mlNodes, int allocatedProcessorsScale) { Processors maxNodeProcessors = Processors.ZERO; Processors tierProcessors = Processors.ZERO; for (DiscoveryNode node : mlNodes) { - Processors nodeProcessors = MlProcessors.get(node, settings); + Processors nodeProcessors = MlProcessors.get(node, allocatedProcessorsScale); if (nodeProcessors.compareTo(maxNodeProcessors) > 0) { maxNodeProcessors = nodeProcessors; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java index d02db1d652d35..80c2b5f7cf2ca 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java @@ -80,6 +80,7 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene private volatile int maxOpenJobs; protected volatile int maxLazyMLNodes; protected volatile long maxMLNodeSize; + protected volatile int allocatedProcessorsScale; public TrainedModelAssignmentClusterService( Settings settings, @@ -99,6 +100,7 @@ public TrainedModelAssignmentClusterService( this.maxOpenJobs = MachineLearning.MAX_OPEN_JOBS_PER_NODE.get(settings); this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings); this.maxMLNodeSize = MachineLearning.MAX_ML_NODE_SIZE.get(settings).getBytes(); + this.allocatedProcessorsScale = MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(settings); // Only nodes that can possibly be master nodes really need this service running if (DiscoveryNode.isMasterNode(settings)) { clusterService.addListener(this); @@ -109,6 +111,8 @@ public TrainedModelAssignmentClusterService( clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_OPEN_JOBS_PER_NODE, this::setMaxOpenJobs); clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes); clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_ML_NODE_SIZE, this::setMaxMLNodeSize); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(MachineLearning.ALLOCATED_PROCESSORS_SCALE, this::setAllocatedProcessorsScale); } } @@ -132,6 +136,10 @@ private void setMaxMLNodeSize(ByteSizeValue value) { this.maxMLNodeSize = value.getBytes(); } + private void setAllocatedProcessorsScale(int scale) { + this.allocatedProcessorsScale = scale; + } + @SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) { clusterService.submitUnbatchedStateUpdateTask(source, task); @@ -486,9 +494,10 @@ private TrainedModelAssignmentMetadata.Builder rebalanceAssignments( TrainedModelAssignmentMetadata.fromState(currentState), nodeLoads, nodeAvailabilityZoneMapper.buildMlNodesByAvailabilityZone(currentState), - modelToAdd + modelToAdd, + allocatedProcessorsScale ); - TrainedModelAssignmentMetadata.Builder rebalanced = rebalancer.rebalance(clusterService.getSettings()); + TrainedModelAssignmentMetadata.Builder rebalanced = rebalancer.rebalance(); if (modelToAdd.isPresent()) { checkModelIsFullyAllocatedIfScalingIsNotPossible(modelToAdd.get().getDeploymentId(), rebalanced, nodes); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java index fa8d1ad345759..f9e9ebed5acc4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java @@ -13,7 +13,6 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction; import org.elasticsearch.xpack.core.ml.inference.assignment.Priority; @@ -51,20 +50,23 @@ class TrainedModelAssignmentRebalancer { private final Map nodeLoads; private final Map, Collection> mlNodesByZone; private final Optional deploymentToAdd; + private final int allocatedProcessorsScale; TrainedModelAssignmentRebalancer( TrainedModelAssignmentMetadata currentMetadata, Map nodeLoads, Map, Collection> mlNodesByZone, - Optional deploymentToAdd + Optional deploymentToAdd, + int allocatedProcessorsScale ) { this.currentMetadata = Objects.requireNonNull(currentMetadata); this.nodeLoads = Objects.requireNonNull(nodeLoads); this.mlNodesByZone = Objects.requireNonNull(mlNodesByZone); this.deploymentToAdd = Objects.requireNonNull(deploymentToAdd); + this.allocatedProcessorsScale = allocatedProcessorsScale; } - TrainedModelAssignmentMetadata.Builder rebalance(Settings settings) { + TrainedModelAssignmentMetadata.Builder rebalance() { if (deploymentToAdd.isPresent() && currentMetadata.hasDeployment(deploymentToAdd.get().getDeploymentId())) { throw new ResourceAlreadyExistsException( "[{}] assignment for deployment with model [{}] already exists", @@ -78,8 +80,8 @@ TrainedModelAssignmentMetadata.Builder rebalance(Settings settings) { return TrainedModelAssignmentMetadata.Builder.fromMetadata(currentMetadata); } - AssignmentPlan assignmentPlan = computeAssignmentPlan(settings); - return buildAssignmentsFromPlan(assignmentPlan, settings); + AssignmentPlan assignmentPlan = computeAssignmentPlan(); + return buildAssignmentsFromPlan(assignmentPlan); } private boolean areAllModelsSatisfiedAndNoOutdatedRoutingEntries() { @@ -92,8 +94,8 @@ private boolean areAllModelsSatisfiedAndNoOutdatedRoutingEntries() { return true; } - AssignmentPlan computeAssignmentPlan(Settings settings) { - final Map, List> nodesByZone = createNodesByZoneMap(settings); + AssignmentPlan computeAssignmentPlan() { + final Map, List> nodesByZone = createNodesByZoneMap(); final Set assignableNodeIds = nodesByZone.values() .stream() .flatMap(List::stream) @@ -271,7 +273,7 @@ private Map findFittingAssignments( return fittingAssignments; } - private Map, List> createNodesByZoneMap(Settings settings) { + private Map, List> createNodesByZoneMap() { return mlNodesByZone.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> { Collection discoveryNodes = e.getValue(); List nodes = new ArrayList<>(); @@ -285,7 +287,7 @@ private Map, List> createNodesByZoneMap(Settin // We subtract native inference memory as the planner expects available memory for // native inference including current assignments. getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(load), - MlProcessors.get(discoveryNode, settings).roundUp() + MlProcessors.get(discoveryNode, allocatedProcessorsScale).roundUp() ) ); } else { @@ -305,7 +307,7 @@ private static long getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference( return load.getFreeMemoryExcludingPerNodeOverhead() - load.getAssignedNativeInferenceMemory(); } - private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(AssignmentPlan assignmentPlan, Settings settings) { + private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(AssignmentPlan assignmentPlan) { TrainedModelAssignmentMetadata.Builder builder = TrainedModelAssignmentMetadata.Builder.empty(); for (AssignmentPlan.Deployment deployment : assignmentPlan.models()) { TrainedModelAssignment existingAssignment = currentMetadata.getDeploymentAssignment(deployment.id()); @@ -343,7 +345,7 @@ private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(Assignme } assignmentBuilder.calculateAndSetAssignmentState(); - explainAssignments(assignmentPlan, nodeLoads, deployment, settings).ifPresent(assignmentBuilder::setReason); + explainAssignments(assignmentPlan, nodeLoads, deployment).ifPresent(assignmentBuilder::setReason); builder.addNewAssignment(deployment.id(), assignmentBuilder); } return builder; @@ -352,8 +354,7 @@ private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(Assignme private Optional explainAssignments( AssignmentPlan assignmentPlan, Map nodeLoads, - AssignmentPlan.Deployment deployment, - Settings settings + AssignmentPlan.Deployment deployment ) { if (assignmentPlan.satisfiesAllocations(deployment)) { return Optional.empty(); @@ -365,7 +366,7 @@ private Optional explainAssignments( Map nodeToReason = new TreeMap<>(); for (Map.Entry nodeAndLoad : nodeLoads.entrySet()) { - Optional reason = explainAssignment(assignmentPlan, nodeAndLoad.getKey(), nodeAndLoad.getValue(), deployment, settings); + Optional reason = explainAssignment(assignmentPlan, nodeAndLoad.getKey(), nodeAndLoad.getValue(), deployment); reason.ifPresent(s -> nodeToReason.put(nodeAndLoad.getKey().getId(), s)); } @@ -384,8 +385,7 @@ private Optional explainAssignment( AssignmentPlan assignmentPlan, DiscoveryNode node, NodeLoad load, - AssignmentPlan.Deployment deployment, - Settings settings + AssignmentPlan.Deployment deployment ) { if (Strings.isNullOrEmpty(load.getError()) == false) { return Optional.of(load.getError()); @@ -398,7 +398,7 @@ private Optional explainAssignment( // But we should also check if we managed to assign a model during the rebalance for which // we check if the node has used up any of its allocated processors. boolean isPerNodeOverheadAccountedFor = load.getNumAssignedJobsAndModels() > 0 - || assignmentPlan.getRemainingNodeCores(load.getNodeId()) < MlProcessors.get(node, settings).roundUp(); + || assignmentPlan.getRemainingNodeCores(load.getNodeId()) < MlProcessors.get(node, allocatedProcessorsScale).roundUp(); long requiredMemory = deployment.memoryBytes() + (isPerNodeOverheadAccountedFor ? 0 : MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes()); @@ -427,7 +427,7 @@ private Optional explainAssignment( "This node has insufficient allocated processors. Available processors [{}], free processors [{}], " + "processors required for each allocation of this model [{}]", new Object[] { - MlProcessors.get(node, settings).roundUp(), + MlProcessors.get(node, allocatedProcessorsScale).roundUp(), assignmentPlan.getRemainingNodeCores(node.getId()), deployment.threadsPerAllocation() } ) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlProcessors.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlProcessors.java index 1035a3b6f687b..1c45c6da2bcc7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlProcessors.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlProcessors.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.utils; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.Processors; import org.elasticsearch.xpack.ml.MachineLearning; @@ -16,7 +15,7 @@ public final class MlProcessors { private MlProcessors() {} - public static Processors get(DiscoveryNode node, Settings settings) { + public static Processors get(DiscoveryNode node, Integer allocatedProcessorScale) { // Try getting the most modern setting, and if that's null then instead get the older setting. (If both are null then return zero.) String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR); if (allocatedProcessorsString == null) { @@ -31,12 +30,8 @@ public static Processors get(DiscoveryNode node, Settings settings) { return Processors.ZERO; } - Integer scale = null; - if (settings != null) { - scale = MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(settings); - } - if (scale != null) { - processorsAsDouble = processorsAsDouble / scale; + if (allocatedProcessorScale != null) { + processorsAsDouble = processorsAsDouble / allocatedProcessorScale; } return Processors.of(processorsAsDouble); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderServiceTests.java index 78591f6fa5a8d..ee9389a251ede 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderServiceTests.java @@ -132,6 +132,7 @@ public void setup() { MachineLearning.MAX_OPEN_JOBS_PER_NODE, MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT, MachineLearning.MAX_ML_NODE_SIZE, + MachineLearning.ALLOCATED_PROCESSORS_SCALE, AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING ) ); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDeciderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDeciderTests.java index a87fd769ed04a..afad2b76f3d62 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDeciderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDeciderTests.java @@ -1080,7 +1080,7 @@ public void testCpuModelAssignmentRequirements() { ).build() ), withMlNodes("ml_node_1", "ml_node_2"), - Settings.EMPTY + 1 ) ); assertTrue( @@ -1112,7 +1112,7 @@ public void testCpuModelAssignmentRequirements() { ).build() ), withMlNodes("ml_node_1", "ml_node_2"), - Settings.EMPTY + 1 ) ); assertFalse( @@ -1144,7 +1144,7 @@ public void testCpuModelAssignmentRequirements() { ).build() ), withMlNodes("ml_node_1", "ml_node_2", "ml_node_3", "ml_node_4"), - Settings.EMPTY + 1 ) ); } @@ -1240,7 +1240,7 @@ public void testScale_WithNoScaleUpButWaitingJobs() { DeciderContext deciderContext = new DeciderContext(clusterState, autoscalingCapacity); MlAutoscalingContext mlAutoscalingContext = new MlAutoscalingContext(clusterState); - MlMemoryAutoscalingCapacity result = decider.scale(settings, deciderContext, mlAutoscalingContext); + MlMemoryAutoscalingCapacity result = decider.scale(settings, deciderContext, mlAutoscalingContext, 1); assertThat(result.reason(), containsString("but the number in the queue is less than the configured maximum allowed")); assertThat(result.nodeSize(), equalTo(ByteSizeValue.ofGb(1))); assertThat(result.tierSize(), equalTo(ByteSizeValue.ofGb(1))); @@ -1269,7 +1269,7 @@ public void testScale_WithNoMlNodesButWaitingAnalytics() { DeciderContext deciderContext = new DeciderContext(clusterState, AutoscalingCapacity.ZERO); MlAutoscalingContext mlAutoscalingContext = new MlAutoscalingContext(clusterState); - MlMemoryAutoscalingCapacity result = decider.scale(settings, deciderContext, mlAutoscalingContext); + MlMemoryAutoscalingCapacity result = decider.scale(settings, deciderContext, mlAutoscalingContext, 1); assertThat( result.reason(), containsString( diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDeciderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDeciderTests.java index dd769ff759637..7aa3714c6ff2f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDeciderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDeciderTests.java @@ -108,7 +108,8 @@ public void testScale_GivenCurrentCapacityIsUsedExactly() { MlProcessorAutoscalingCapacity capacity = decider.scale( Settings.EMPTY, newContext(clusterState), - new MlAutoscalingContext(clusterState) + new MlAutoscalingContext(clusterState), + 1 ); assertThat(capacity.nodeProcessors(), equalTo(Processors.of(7.8))); @@ -177,7 +178,8 @@ public void testScale_GivenUnsatisfiedDeployments() { MlProcessorAutoscalingCapacity capacity = decider.scale( Settings.EMPTY, newContext(clusterState), - new MlAutoscalingContext(clusterState) + new MlAutoscalingContext(clusterState), + 1 ); assertThat(capacity.nodeProcessors(), equalTo(Processors.of(8.0))); @@ -246,7 +248,8 @@ public void testScale_GivenUnsatisfiedDeploymentIsLowPriority_ShouldNotScaleUp() MlProcessorAutoscalingCapacity capacity = decider.scale( Settings.EMPTY, newContext(clusterState), - new MlAutoscalingContext(clusterState) + new MlAutoscalingContext(clusterState), + 1 ); assertThat(capacity.nodeProcessors(), equalTo(Processors.of(4.0))); @@ -313,7 +316,8 @@ public void testScale_GivenMoreThanHalfProcessorsAreUsed() { MlProcessorAutoscalingCapacity capacity = decider.scale( Settings.EMPTY, newContext(clusterState), - new MlAutoscalingContext(clusterState) + new MlAutoscalingContext(clusterState), + 1 ); assertThat(capacity.nodeProcessors(), equalTo(Processors.of(3.8))); @@ -322,6 +326,16 @@ public void testScale_GivenMoreThanHalfProcessorsAreUsed() { capacity.reason(), equalTo("not scaling down as model assignments require more than half of the ML tier's allocated processors") ); + + // test with allocated processor scaling + capacity = decider.scale(Settings.EMPTY, newContext(clusterState), new MlAutoscalingContext(clusterState), 2); + + assertThat(capacity.nodeProcessors(), equalTo(Processors.of(1.9))); + assertThat(capacity.tierProcessors(), equalTo(Processors.of(3.8))); + assertThat( + capacity.reason(), + equalTo("not scaling down as model assignments require more than half of the ML tier's allocated processors") + ); } public void testScale_GivenDownScalePossible_DelayNotSatisfied() { @@ -384,7 +398,8 @@ public void testScale_GivenDownScalePossible_DelayNotSatisfied() { MlProcessorAutoscalingCapacity capacity = decider.scale( Settings.EMPTY, newContext(clusterState), - new MlAutoscalingContext(clusterState) + new MlAutoscalingContext(clusterState), + 1 ); assertThat(capacity.nodeProcessors(), equalTo(Processors.of(7.9))); @@ -456,7 +471,8 @@ public void testScale_GivenDownScalePossible_DelaySatisfied() { MlProcessorAutoscalingCapacity capacity = decider.scale( Settings.EMPTY, newContext(clusterState), - new MlAutoscalingContext(clusterState) + new MlAutoscalingContext(clusterState), + 1 ); assertThat(capacity.nodeProcessors(), equalTo(Processors.of(2.0))); @@ -528,7 +544,8 @@ public void testScale_GivenLowPriorityDeploymentsOnly() { MlProcessorAutoscalingCapacity capacity = decider.scale( Settings.EMPTY, newContext(clusterState), - new MlAutoscalingContext(clusterState) + new MlAutoscalingContext(clusterState), + 1 ); assertThat(capacity.nodeProcessors(), equalTo(Processors.ZERO)); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterServiceTests.java index 053ad07cd444c..f32c7970001f3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterServiceTests.java @@ -98,7 +98,8 @@ public void setupObjects() { MachineLearning.USE_AUTO_MACHINE_MEMORY_PERCENT, MachineLearning.MAX_OPEN_JOBS_PER_NODE, MachineLearning.MAX_LAZY_ML_NODES, - MachineLearning.MAX_ML_NODE_SIZE + MachineLearning.MAX_ML_NODE_SIZE, + MachineLearning.ALLOCATED_PROCESSORS_SCALE ) ); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancerTests.java index 32d37bfb7a625..a563f4b9a5a66 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancerTests.java @@ -10,7 +10,6 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction; @@ -44,8 +43,9 @@ public void testRebalance_GivenNoAssignments() { TrainedModelAssignmentMetadata.Builder.empty().build(), Map.of(), Map.of(), - Optional.empty() - ).rebalance(Settings.EMPTY).build(); + Optional.empty(), + 1 + ).rebalance().build(); assertThat(result.allAssignments().isEmpty(), is(true)); } @@ -73,9 +73,13 @@ public void testRebalance_GivenAllAssignmentsAreSatisfied_ShouldMakeNoChanges() nodeLoads.put(buildNode("node-1", oneGbBytes, 4), NodeLoad.builder("node-1").setMaxMemory(oneGbBytes).build()); nodeLoads.put(buildNode("node-2", oneGbBytes, 4), NodeLoad.builder("node-2").setMaxMemory(oneGbBytes).build()); - TrainedModelAssignmentMetadata result = new TrainedModelAssignmentRebalancer(currentMetadata, nodeLoads, Map.of(), Optional.empty()) - .rebalance(Settings.EMPTY) - .build(); + TrainedModelAssignmentMetadata result = new TrainedModelAssignmentRebalancer( + currentMetadata, + nodeLoads, + Map.of(), + Optional.empty(), + 1 + ).rebalance().build(); assertThat(currentMetadata, equalTo(result)); } @@ -111,8 +115,9 @@ public void testRebalance_GivenAllAssignmentsAreSatisfied_GivenOutdatedRoutingEn currentMetadata, nodeLoads, Map.of(List.of(), List.of(node1, node2)), - Optional.empty() - ).rebalance(Settings.EMPTY).build(); + Optional.empty(), + 1 + ).rebalance().build(); assertThat(result.allAssignments(), is(aMapWithSize(2))); @@ -135,9 +140,7 @@ public void testRebalance_GivenModelToAddAlreadyExists() { .build(); expectThrows( ResourceAlreadyExistsException.class, - () -> new TrainedModelAssignmentRebalancer(currentMetadata, Map.of(), Map.of(), Optional.of(taskParams)).rebalance( - Settings.EMPTY - ) + () -> new TrainedModelAssignmentRebalancer(currentMetadata, Map.of(), Map.of(), Optional.of(taskParams), 1).rebalance() ); } @@ -150,8 +153,9 @@ public void testRebalance_GivenFirstModelToAdd_NoMLNodes() throws Exception { currentMetadata, Map.of(), Map.of(), - Optional.of(taskParams) - ).rebalance(Settings.EMPTY).build(); + Optional.of(taskParams), + 1 + ).rebalance().build(); TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId); assertThat(assignment, is(notNullValue())); @@ -176,8 +180,9 @@ public void testRebalance_GivenFirstModelToAdd_NotEnoughProcessors() throws Exce currentMetadata, nodeLoads, Map.of(List.of(), List.of(node)), - Optional.of(taskParams) - ).rebalance(Settings.EMPTY).build(); + Optional.of(taskParams), + 1 + ).rebalance().build(); TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId); assertThat(assignment, is(notNullValue())); @@ -211,8 +216,9 @@ public void testRebalance_GivenFirstModelToAdd_NotEnoughMemory() throws Exceptio currentMetadata, nodeLoads, Map.of(), - Optional.of(taskParams) - ).rebalance(Settings.EMPTY).build(); + Optional.of(taskParams), + 1 + ).rebalance().build(); TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId); assertThat(assignment, is(notNullValue())); @@ -246,8 +252,9 @@ public void testRebalance_GivenFirstModelToAdd_ErrorDetectingNodeLoad() throws E currentMetadata, nodeLoads, Map.of(), - Optional.of(taskParams) - ).rebalance(Settings.EMPTY).build(); + Optional.of(taskParams), + 1 + ).rebalance().build(); TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId); assertThat(assignment, is(notNullValue())); @@ -281,8 +288,9 @@ public void testRebalance_GivenProblemsOnMultipleNodes() throws Exception { currentMetadata, nodeLoads, Map.of(List.of(), List.of(node1, node2)), - Optional.of(taskParams) - ).rebalance(Settings.EMPTY).build(); + Optional.of(taskParams), + 1 + ).rebalance().build(); TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId); assertThat(assignment, is(notNullValue())); @@ -313,8 +321,9 @@ public void testRebalance_GivenFirstModelToAdd_FitsFully() throws Exception { currentMetadata, nodeLoads, Map.of(List.of(), List.of(node1)), - Optional.of(taskParams) - ).rebalance(Settings.EMPTY).build(); + Optional.of(taskParams), + 1 + ).rebalance().build(); TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId); assertThat(assignment, is(notNullValue())); @@ -351,8 +360,9 @@ public void testRebalance_GivenModelToAdd_AndPreviousAssignments_AndTwoNodes_All currentMetadata, nodeLoads, Map.of(List.of(), List.of(node1, node2)), - Optional.of(taskParams) - ).rebalance(Settings.EMPTY).build(); + Optional.of(taskParams), + 1 + ).rebalance().build(); assertThat(result.allAssignments(), is(aMapWithSize(2))); @@ -414,8 +424,9 @@ public void testRebalance_GivenPreviousAssignments_AndNewNode() throws Exception currentMetadata, nodeLoads, Map.of(List.of(), List.of(node1, node2, node3)), - Optional.empty() - ).rebalance(Settings.EMPTY).build(); + Optional.empty(), + 1 + ).rebalance().build(); assertThat(result.allAssignments(), is(aMapWithSize(2))); @@ -477,8 +488,9 @@ public void testRebalance_GivenPreviousAssignments_AndRemovedNode_AndRemainingNo currentMetadata, nodeLoads, Map.of(List.of(), List.of(node1)), - Optional.empty() - ).rebalance(Settings.EMPTY).build(); + Optional.empty(), + 1 + ).rebalance().build(); assertThat(result.allAssignments(), is(aMapWithSize(2))); @@ -546,8 +558,9 @@ public void testRebalance_GivenPreviousAssignments_AndRemovedNode_AndRemainingNo currentMetadata, nodeLoads, Map.of(List.of(), List.of(node1)), - Optional.empty() - ).rebalance(Settings.EMPTY).build(); + Optional.empty(), + 1 + ).rebalance().build(); assertThat(result.allAssignments(), is(aMapWithSize(2))); @@ -594,8 +607,9 @@ public void testRebalance_GivenFailedAssignment_RestartsAssignment() throws Exce currentMetadata, nodeLoads, Map.of(List.of(), List.of(node1)), - Optional.empty() - ).rebalance(Settings.EMPTY).build(); + Optional.empty(), + 1 + ).rebalance().build(); assertThat(result.allAssignments(), is(aMapWithSize(1))); @@ -627,8 +641,9 @@ public void testRebalance_GivenLowPriorityModelToAdd_OnlyModel_NotEnoughMemory() currentMetadata, nodeLoads, Map.of(), - Optional.of(taskParams) - ).rebalance(Settings.EMPTY).build(); + Optional.of(taskParams), + 1 + ).rebalance().build(); TrainedModelAssignment assignment = result.getDeploymentAssignment(deploymentId); assertThat(assignment, is(notNullValue())); @@ -672,8 +687,9 @@ public void testRebalance_GivenLowPriorityModelToAdd_NotEnoughMemoryNorProcessor currentMetadata, nodeLoads, Map.of(List.of("zone-1"), List.of(node1), List.of("zone-2"), List.of(node2)), - Optional.of(taskParams1) - ).rebalance(Settings.EMPTY).build(); + Optional.of(taskParams1), + 1 + ).rebalance().build(); TrainedModelAssignment assignment = result.getDeploymentAssignment(deployment1); assertThat(assignment, is(notNullValue())); @@ -710,8 +726,9 @@ public void testRebalance_GivenMixedPriorityModels_NotEnoughMemoryForLowPriority currentMetadata, nodeLoads, Map.of(List.of(), List.of(node1)), - Optional.empty() - ).rebalance(Settings.EMPTY).build(); + Optional.empty(), + 1 + ).rebalance().build(); { TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId1); @@ -762,8 +779,9 @@ public void testRebalance_GivenMixedPriorityModels_TwoZones_EachNodeCanHoldOneMo currentMetadata, nodeLoads, Map.of(List.of("zone-1"), List.of(node1), List.of("zone-2"), List.of(node2)), - Optional.empty() - ).rebalance(Settings.EMPTY).build(); + Optional.empty(), + 1 + ).rebalance().build(); List assignedNodes = new ArrayList<>(); @@ -815,8 +833,9 @@ public void testRebalance_GivenModelUsingAllCpu_FittingLowPriorityModelCanStart( currentMetadata, nodeLoads, Map.of(List.of(), List.of(node1)), - Optional.empty() - ).rebalance(Settings.EMPTY).build(); + Optional.empty(), + 1 + ).rebalance().build(); { TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId1); @@ -864,8 +883,9 @@ public void testRebalance_GivenMultipleLowPriorityModels_AndMultipleNodes() thro currentMetadata, nodeLoads, Map.of(List.of(), List.of(node1, node2)), - Optional.empty() - ).rebalance(Settings.EMPTY).build(); + Optional.empty(), + 1 + ).rebalance().build(); { TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId1); @@ -913,8 +933,9 @@ public void testRebalance_GivenNormalPriorityModelToLoad_EvictsLowPriorityModel( currentMetadata, nodeLoads, Map.of(List.of(), List.of(node1)), - Optional.of(taskParams2) - ).rebalance(Settings.EMPTY).build(); + Optional.of(taskParams2), + 1 + ).rebalance().build(); { TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId1); @@ -964,8 +985,9 @@ public void testRebalance_GivenNormalPriorityModelToLoad_AndLowPriorityModelCanS currentMetadata, nodeLoads, Map.of(List.of(), List.of(node1, node2)), - Optional.of(taskParams2) - ).rebalance(Settings.EMPTY).build(); + Optional.of(taskParams2), + 1 + ).rebalance().build(); { TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId1); @@ -1015,8 +1037,9 @@ public void testRebalance_GivenNormalPriorityModelToLoad_AndLowPriorityModelMust currentMetadata, nodeLoads, Map.of(List.of(), List.of(node1, node2)), - Optional.of(taskParams2) - ).rebalance(Settings.EMPTY).build(); + Optional.of(taskParams2), + 1 + ).rebalance().build(); { TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId1); @@ -1056,13 +1079,13 @@ public void testRebalance_GivenFirstModelToAdd_GivenScalingProcessorSetting() { // The deployment wants 4 threads, the node has 4 CPUs but with // the scaling setting(2) that is divided by 2. Now the model // assignment cannot be satisfied. - var settings = Settings.builder().put(MachineLearning.ALLOCATED_PROCESSORS_SCALE.getKey(), 2).build(); TrainedModelAssignmentMetadata result = new TrainedModelAssignmentRebalancer( currentMetadata, nodeLoads, Map.of(List.of(), List.of(node)), - Optional.of(taskParams) - ).rebalance(settings).build(); + Optional.of(taskParams), + 2 + ).rebalance().build(); TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId); assertThat(assignment, is(notNullValue())); @@ -1078,9 +1101,13 @@ public void testRebalance_GivenFirstModelToAdd_GivenScalingProcessorSetting() { ); // Without the scaling factor the assignment is satisfied. - result = new TrainedModelAssignmentRebalancer(currentMetadata, nodeLoads, Map.of(List.of(), List.of(node)), Optional.of(taskParams)) - .rebalance(Settings.EMPTY) - .build(); + result = new TrainedModelAssignmentRebalancer( + currentMetadata, + nodeLoads, + Map.of(List.of(), List.of(node)), + Optional.of(taskParams), + 1 + ).rebalance().build(); assignment = result.getDeploymentAssignment(modelId); assertThat(assignment.getReason().isPresent(), is(false)); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/MlProcessorsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/MlProcessorsTests.java index fa8c45b03b2b7..2ff3196dc87e9 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/MlProcessorsTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/MlProcessorsTests.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.ml.utils; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.ml.MachineLearning; @@ -20,14 +19,19 @@ public class MlProcessorsTests extends ESTestCase { public void testGet() { var node = DiscoveryNodeUtils.builder("foo").attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "8.0")).build(); - var processor = MlProcessors.get(node, Settings.EMPTY); + var processor = MlProcessors.get(node, 1); assertThat(processor.count(), equalTo(8.0)); } public void testGetWithScale() { var node = DiscoveryNodeUtils.builder("foo").attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "8.0")).build(); - var settings = Settings.builder().put(MachineLearning.ALLOCATED_PROCESSORS_SCALE.getKey(), 2).build(); - var processor = MlProcessors.get(node, settings); + var processor = MlProcessors.get(node, 2); assertThat(processor.count(), equalTo(4.0)); } + + public void testGetWithNull() { + var node = DiscoveryNodeUtils.builder("foo").attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "8.0")).build(); + var processor = MlProcessors.get(node, null); + assertThat(processor.count(), equalTo(8.0)); + } }