diff --git a/docs/changelog/98296.yaml b/docs/changelog/98296.yaml new file mode 100644 index 0000000000000..94c5fb0e8ab4b --- /dev/null +++ b/docs/changelog/98296.yaml @@ -0,0 +1,5 @@ +pr: 98296 +summary: Add setting to scale the processor count used in the model assignment planner +area: Machine Learning +type: enhancement +issues: [] diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 19bca24ba4881..b26106178d3a4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -575,6 +575,27 @@ public void loadExtensions(ExtensionLoader loader) { public static final String PRE_V_8_5_ALLOCATED_PROCESSORS_NODE_ATTR = "ml.allocated_processors"; public static final String ALLOCATED_PROCESSORS_NODE_ATTR = "ml.allocated_processors_double"; + + /** + * For the NLP model assignment planner. + * The {@link #ALLOCATED_PROCESSORS_NODE_ATTR} attribute may be + * measured in hyper-threaded or virtual cores when the user + * would like the planner to consider logical cores. + * + * ALLOCATED_PROCESSORS_NODE_ATTR is divided by this setting, + * the default value of 1 means the attribute is unchanged, a value + * of 2 accounts for hyper-threaded cores with 2 threads per core. + * Increasing this setting above 1 reduces the number of model + * allocations that can be deployed on a node. + */ + public static final Setting ALLOCATED_PROCESSORS_SCALE = Setting.intSetting( + "xpack.ml.allocated_processors_scale", + 1, + 1, + Property.OperatorDynamic, + Property.NodeScope + ); + public static final Setting CONCURRENT_JOB_ALLOCATIONS = Setting.intSetting( "xpack.ml.node_concurrent_job_allocations", 2, @@ -752,6 +773,7 @@ public static boolean isMlNode(DiscoveryNode node) { @Override public List> getSettings() { return List.of( + ALLOCATED_PROCESSORS_SCALE, MachineLearningField.AUTODETECT_PROCESS, PROCESS_CONNECT_TIMEOUT, CONCURRENT_JOB_ALLOCATIONS, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java index 9fc09cf8c4e43..3acec6b6ce39d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java @@ -94,7 +94,10 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider final MlAutoscalingContext mlContext = new MlAutoscalingContext(clusterState); final NativeMemoryCapacity currentNativeMemoryCapacity = memoryDecider.currentScale(mlContext.mlNodes); final MlMemoryAutoscalingCapacity currentMemoryCapacity = memoryDecider.capacityFromNativeMemory(currentNativeMemoryCapacity); - final MlProcessorAutoscalingCapacity currentProcessorCapacity = processorDecider.computeCurrentCapacity(mlContext.mlNodes); + final MlProcessorAutoscalingCapacity currentProcessorCapacity = processorDecider.computeCurrentCapacity( + mlContext.mlNodes, + configuration + ); final MlScalingReason.Builder reasonBuilder = MlScalingReason.builder(mlContext) .setCurrentMlCapacity( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java index 5a5c89c985f01..ef0b69178768b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java @@ -259,7 +259,7 @@ public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeci } // We should keep this check here as well as in the processor decider while cloud is not // reacting to processor autoscaling. - if (modelAssignmentsRequireMoreThanHalfCpu(mlContext.modelAssignments.values(), mlContext.mlNodes)) { + if (modelAssignmentsRequireMoreThanHalfCpu(mlContext.modelAssignments.values(), mlContext.mlNodes, configuration)) { logger.debug("not down-scaling; model assignments require more than half of the ML tier's allocated processors"); return null; } @@ -825,11 +825,15 @@ static MlMemoryAutoscalingCapacity ensureScaleDown( return newCapacity; } - static boolean modelAssignmentsRequireMoreThanHalfCpu(Collection assignments, List mlNodes) { + static boolean modelAssignmentsRequireMoreThanHalfCpu( + Collection assignments, + List mlNodes, + Settings settings + ) { int totalRequiredProcessors = assignments.stream() .mapToInt(t -> t.getTaskParams().getNumberOfAllocations() * t.getTaskParams().getThreadsPerAllocation()) .sum(); - int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node).roundUp()).sum(); + int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node, settings).roundUp()).sum(); return totalRequiredProcessors * 2 > totalMlProcessors; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java index 52be2dedf850d..38ec86273fd86 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java @@ -52,7 +52,7 @@ public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingD ).build(); } - final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes); + final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes, configuration); final MlProcessorAutoscalingCapacity requiredCapacity = computeRequiredCapacity(trainedModelAssignmentMetadata).build(); @@ -64,7 +64,8 @@ public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingD if (MlMemoryAutoscalingDecider.modelAssignmentsRequireMoreThanHalfCpu( trainedModelAssignmentMetadata.allAssignments().values(), - mlContext.mlNodes + mlContext.mlNodes, + configuration )) { return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors()) .setReason("not scaling down as model assignments require more than half of the ML tier's allocated processors") @@ -136,11 +137,11 @@ private MlProcessorAutoscalingCapacity.Builder computeRequiredCapacity(TrainedMo ); } - MlProcessorAutoscalingCapacity computeCurrentCapacity(List mlNodes) { + MlProcessorAutoscalingCapacity computeCurrentCapacity(List mlNodes, Settings settings) { Processors maxNodeProcessors = Processors.ZERO; Processors tierProcessors = Processors.ZERO; for (DiscoveryNode node : mlNodes) { - Processors nodeProcessors = MlProcessors.get(node); + Processors nodeProcessors = MlProcessors.get(node, settings); if (nodeProcessors.compareTo(maxNodeProcessors) > 0) { maxNodeProcessors = nodeProcessors; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java index 6f28570aa7339..41d9718bea9cf 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java @@ -489,7 +489,7 @@ private TrainedModelAssignmentMetadata.Builder rebalanceAssignments( nodeAvailabilityZoneMapper.buildMlNodesByAvailabilityZone(currentState), modelToAdd ); - TrainedModelAssignmentMetadata.Builder rebalanced = rebalancer.rebalance(); + TrainedModelAssignmentMetadata.Builder rebalanced = rebalancer.rebalance(clusterService.getSettings()); if (modelToAdd.isPresent()) { checkModelIsFullyAllocatedIfScalingIsNotPossible(modelToAdd.get().getDeploymentId(), rebalanced, nodes); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java index fc1a95e95c631..fa8d1ad345759 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java @@ -13,6 +13,7 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction; import org.elasticsearch.xpack.core.ml.inference.assignment.Priority; @@ -63,7 +64,7 @@ class TrainedModelAssignmentRebalancer { this.deploymentToAdd = Objects.requireNonNull(deploymentToAdd); } - TrainedModelAssignmentMetadata.Builder rebalance() throws Exception { + TrainedModelAssignmentMetadata.Builder rebalance(Settings settings) { if (deploymentToAdd.isPresent() && currentMetadata.hasDeployment(deploymentToAdd.get().getDeploymentId())) { throw new ResourceAlreadyExistsException( "[{}] assignment for deployment with model [{}] already exists", @@ -77,8 +78,8 @@ TrainedModelAssignmentMetadata.Builder rebalance() throws Exception { return TrainedModelAssignmentMetadata.Builder.fromMetadata(currentMetadata); } - AssignmentPlan assignmentPlan = computeAssignmentPlan(); - return buildAssignmentsFromPlan(assignmentPlan); + AssignmentPlan assignmentPlan = computeAssignmentPlan(settings); + return buildAssignmentsFromPlan(assignmentPlan, settings); } private boolean areAllModelsSatisfiedAndNoOutdatedRoutingEntries() { @@ -91,8 +92,8 @@ private boolean areAllModelsSatisfiedAndNoOutdatedRoutingEntries() { return true; } - AssignmentPlan computeAssignmentPlan() { - final Map, List> nodesByZone = createNodesByZoneMap(); + AssignmentPlan computeAssignmentPlan(Settings settings) { + final Map, List> nodesByZone = createNodesByZoneMap(settings); final Set assignableNodeIds = nodesByZone.values() .stream() .flatMap(List::stream) @@ -270,7 +271,7 @@ private Map findFittingAssignments( return fittingAssignments; } - private Map, List> createNodesByZoneMap() { + private Map, List> createNodesByZoneMap(Settings settings) { return mlNodesByZone.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> { Collection discoveryNodes = e.getValue(); List nodes = new ArrayList<>(); @@ -284,7 +285,7 @@ private Map, List> createNodesByZoneMap() { // We subtract native inference memory as the planner expects available memory for // native inference including current assignments. getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(load), - MlProcessors.get(discoveryNode).roundUp() + MlProcessors.get(discoveryNode, settings).roundUp() ) ); } else { @@ -304,7 +305,7 @@ private static long getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference( return load.getFreeMemoryExcludingPerNodeOverhead() - load.getAssignedNativeInferenceMemory(); } - private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(AssignmentPlan assignmentPlan) { + private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(AssignmentPlan assignmentPlan, Settings settings) { TrainedModelAssignmentMetadata.Builder builder = TrainedModelAssignmentMetadata.Builder.empty(); for (AssignmentPlan.Deployment deployment : assignmentPlan.models()) { TrainedModelAssignment existingAssignment = currentMetadata.getDeploymentAssignment(deployment.id()); @@ -342,7 +343,7 @@ private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(Assignme } assignmentBuilder.calculateAndSetAssignmentState(); - explainAssignments(assignmentPlan, nodeLoads, deployment).ifPresent(assignmentBuilder::setReason); + explainAssignments(assignmentPlan, nodeLoads, deployment, settings).ifPresent(assignmentBuilder::setReason); builder.addNewAssignment(deployment.id(), assignmentBuilder); } return builder; @@ -351,7 +352,8 @@ private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(Assignme private Optional explainAssignments( AssignmentPlan assignmentPlan, Map nodeLoads, - AssignmentPlan.Deployment deployment + AssignmentPlan.Deployment deployment, + Settings settings ) { if (assignmentPlan.satisfiesAllocations(deployment)) { return Optional.empty(); @@ -363,7 +365,7 @@ private Optional explainAssignments( Map nodeToReason = new TreeMap<>(); for (Map.Entry nodeAndLoad : nodeLoads.entrySet()) { - Optional reason = explainAssignment(assignmentPlan, nodeAndLoad.getKey(), nodeAndLoad.getValue(), deployment); + Optional reason = explainAssignment(assignmentPlan, nodeAndLoad.getKey(), nodeAndLoad.getValue(), deployment, settings); reason.ifPresent(s -> nodeToReason.put(nodeAndLoad.getKey().getId(), s)); } @@ -382,7 +384,8 @@ private Optional explainAssignment( AssignmentPlan assignmentPlan, DiscoveryNode node, NodeLoad load, - AssignmentPlan.Deployment deployment + AssignmentPlan.Deployment deployment, + Settings settings ) { if (Strings.isNullOrEmpty(load.getError()) == false) { return Optional.of(load.getError()); @@ -395,7 +398,7 @@ private Optional explainAssignment( // But we should also check if we managed to assign a model during the rebalance for which // we check if the node has used up any of its allocated processors. boolean isPerNodeOverheadAccountedFor = load.getNumAssignedJobsAndModels() > 0 - || assignmentPlan.getRemainingNodeCores(load.getNodeId()) < MlProcessors.get(node).roundUp(); + || assignmentPlan.getRemainingNodeCores(load.getNodeId()) < MlProcessors.get(node, settings).roundUp(); long requiredMemory = deployment.memoryBytes() + (isPerNodeOverheadAccountedFor ? 0 : MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes()); @@ -424,7 +427,7 @@ private Optional explainAssignment( "This node has insufficient allocated processors. Available processors [{}], free processors [{}], " + "processors required for each allocation of this model [{}]", new Object[] { - MlProcessors.get(node).roundUp(), + MlProcessors.get(node, settings).roundUp(), assignmentPlan.getRemainingNodeCores(node.getId()), deployment.threadsPerAllocation() } ) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlProcessors.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlProcessors.java index 4228e05a95698..3dea4fad0020f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlProcessors.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlProcessors.java @@ -9,6 +9,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.Processors; import org.elasticsearch.xpack.ml.MachineLearning; @@ -16,7 +17,7 @@ public final class MlProcessors { private MlProcessors() {} - public static Processors get(DiscoveryNode node) { + public static Processors get(DiscoveryNode node, Settings settings) { String allocatedProcessorsString = node.getVersion().onOrAfter(Version.V_8_5_0) ? node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR) : node.getAttributes().get(MachineLearning.PRE_V_8_5_ALLOCATED_PROCESSORS_NODE_ATTR); @@ -25,7 +26,19 @@ public static Processors get(DiscoveryNode node) { } try { double processorsAsDouble = Double.parseDouble(allocatedProcessorsString); - return processorsAsDouble > 0 ? Processors.of(processorsAsDouble) : Processors.ZERO; + if (processorsAsDouble <= 0) { + return Processors.ZERO; + } + + Integer scale = null; + if (settings != null) { + scale = MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(settings); + } + if (scale != null) { + processorsAsDouble = processorsAsDouble / scale; + } + return Processors.of(processorsAsDouble); + } catch (NumberFormatException e) { assert e == null : MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDeciderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDeciderTests.java index cd9ed91b345ac..88a09a02556fd 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDeciderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDeciderTests.java @@ -1079,7 +1079,8 @@ public void testCpuModelAssignmentRequirements() { ) ).build() ), - withMlNodes("ml_node_1", "ml_node_2") + withMlNodes("ml_node_1", "ml_node_2"), + Settings.EMPTY ) ); assertTrue( @@ -1110,7 +1111,8 @@ public void testCpuModelAssignmentRequirements() { ) ).build() ), - withMlNodes("ml_node_1", "ml_node_2") + withMlNodes("ml_node_1", "ml_node_2"), + Settings.EMPTY ) ); assertFalse( @@ -1141,7 +1143,8 @@ public void testCpuModelAssignmentRequirements() { ) ).build() ), - withMlNodes("ml_node_1", "ml_node_2", "ml_node_3", "ml_node_4") + withMlNodes("ml_node_1", "ml_node_2", "ml_node_3", "ml_node_4"), + Settings.EMPTY ) ); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancerTests.java index 9919a81947f34..32d37bfb7a625 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancerTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction; @@ -38,13 +39,13 @@ public class TrainedModelAssignmentRebalancerTests extends ESTestCase { - public void testRebalance_GivenNoAssignments() throws Exception { + public void testRebalance_GivenNoAssignments() { TrainedModelAssignmentMetadata result = new TrainedModelAssignmentRebalancer( TrainedModelAssignmentMetadata.Builder.empty().build(), Map.of(), Map.of(), Optional.empty() - ).rebalance().build(); + ).rebalance(Settings.EMPTY).build(); assertThat(result.allAssignments().isEmpty(), is(true)); } @@ -73,7 +74,7 @@ public void testRebalance_GivenAllAssignmentsAreSatisfied_ShouldMakeNoChanges() nodeLoads.put(buildNode("node-2", oneGbBytes, 4), NodeLoad.builder("node-2").setMaxMemory(oneGbBytes).build()); TrainedModelAssignmentMetadata result = new TrainedModelAssignmentRebalancer(currentMetadata, nodeLoads, Map.of(), Optional.empty()) - .rebalance() + .rebalance(Settings.EMPTY) .build(); assertThat(currentMetadata, equalTo(result)); @@ -111,7 +112,7 @@ public void testRebalance_GivenAllAssignmentsAreSatisfied_GivenOutdatedRoutingEn nodeLoads, Map.of(List.of(), List.of(node1, node2)), Optional.empty() - ).rebalance().build(); + ).rebalance(Settings.EMPTY).build(); assertThat(result.allAssignments(), is(aMapWithSize(2))); @@ -134,7 +135,9 @@ public void testRebalance_GivenModelToAddAlreadyExists() { .build(); expectThrows( ResourceAlreadyExistsException.class, - () -> new TrainedModelAssignmentRebalancer(currentMetadata, Map.of(), Map.of(), Optional.of(taskParams)).rebalance() + () -> new TrainedModelAssignmentRebalancer(currentMetadata, Map.of(), Map.of(), Optional.of(taskParams)).rebalance( + Settings.EMPTY + ) ); } @@ -148,7 +151,7 @@ public void testRebalance_GivenFirstModelToAdd_NoMLNodes() throws Exception { Map.of(), Map.of(), Optional.of(taskParams) - ).rebalance().build(); + ).rebalance(Settings.EMPTY).build(); TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId); assertThat(assignment, is(notNullValue())); @@ -174,7 +177,7 @@ public void testRebalance_GivenFirstModelToAdd_NotEnoughProcessors() throws Exce nodeLoads, Map.of(List.of(), List.of(node)), Optional.of(taskParams) - ).rebalance().build(); + ).rebalance(Settings.EMPTY).build(); TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId); assertThat(assignment, is(notNullValue())); @@ -209,7 +212,7 @@ public void testRebalance_GivenFirstModelToAdd_NotEnoughMemory() throws Exceptio nodeLoads, Map.of(), Optional.of(taskParams) - ).rebalance().build(); + ).rebalance(Settings.EMPTY).build(); TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId); assertThat(assignment, is(notNullValue())); @@ -244,7 +247,7 @@ public void testRebalance_GivenFirstModelToAdd_ErrorDetectingNodeLoad() throws E nodeLoads, Map.of(), Optional.of(taskParams) - ).rebalance().build(); + ).rebalance(Settings.EMPTY).build(); TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId); assertThat(assignment, is(notNullValue())); @@ -279,7 +282,7 @@ public void testRebalance_GivenProblemsOnMultipleNodes() throws Exception { nodeLoads, Map.of(List.of(), List.of(node1, node2)), Optional.of(taskParams) - ).rebalance().build(); + ).rebalance(Settings.EMPTY).build(); TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId); assertThat(assignment, is(notNullValue())); @@ -311,7 +314,7 @@ public void testRebalance_GivenFirstModelToAdd_FitsFully() throws Exception { nodeLoads, Map.of(List.of(), List.of(node1)), Optional.of(taskParams) - ).rebalance().build(); + ).rebalance(Settings.EMPTY).build(); TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId); assertThat(assignment, is(notNullValue())); @@ -349,7 +352,7 @@ public void testRebalance_GivenModelToAdd_AndPreviousAssignments_AndTwoNodes_All nodeLoads, Map.of(List.of(), List.of(node1, node2)), Optional.of(taskParams) - ).rebalance().build(); + ).rebalance(Settings.EMPTY).build(); assertThat(result.allAssignments(), is(aMapWithSize(2))); @@ -412,7 +415,7 @@ public void testRebalance_GivenPreviousAssignments_AndNewNode() throws Exception nodeLoads, Map.of(List.of(), List.of(node1, node2, node3)), Optional.empty() - ).rebalance().build(); + ).rebalance(Settings.EMPTY).build(); assertThat(result.allAssignments(), is(aMapWithSize(2))); @@ -475,7 +478,7 @@ public void testRebalance_GivenPreviousAssignments_AndRemovedNode_AndRemainingNo nodeLoads, Map.of(List.of(), List.of(node1)), Optional.empty() - ).rebalance().build(); + ).rebalance(Settings.EMPTY).build(); assertThat(result.allAssignments(), is(aMapWithSize(2))); @@ -544,7 +547,7 @@ public void testRebalance_GivenPreviousAssignments_AndRemovedNode_AndRemainingNo nodeLoads, Map.of(List.of(), List.of(node1)), Optional.empty() - ).rebalance().build(); + ).rebalance(Settings.EMPTY).build(); assertThat(result.allAssignments(), is(aMapWithSize(2))); @@ -592,7 +595,7 @@ public void testRebalance_GivenFailedAssignment_RestartsAssignment() throws Exce nodeLoads, Map.of(List.of(), List.of(node1)), Optional.empty() - ).rebalance().build(); + ).rebalance(Settings.EMPTY).build(); assertThat(result.allAssignments(), is(aMapWithSize(1))); @@ -625,7 +628,7 @@ public void testRebalance_GivenLowPriorityModelToAdd_OnlyModel_NotEnoughMemory() nodeLoads, Map.of(), Optional.of(taskParams) - ).rebalance().build(); + ).rebalance(Settings.EMPTY).build(); TrainedModelAssignment assignment = result.getDeploymentAssignment(deploymentId); assertThat(assignment, is(notNullValue())); @@ -670,7 +673,7 @@ public void testRebalance_GivenLowPriorityModelToAdd_NotEnoughMemoryNorProcessor nodeLoads, Map.of(List.of("zone-1"), List.of(node1), List.of("zone-2"), List.of(node2)), Optional.of(taskParams1) - ).rebalance().build(); + ).rebalance(Settings.EMPTY).build(); TrainedModelAssignment assignment = result.getDeploymentAssignment(deployment1); assertThat(assignment, is(notNullValue())); @@ -708,7 +711,7 @@ public void testRebalance_GivenMixedPriorityModels_NotEnoughMemoryForLowPriority nodeLoads, Map.of(List.of(), List.of(node1)), Optional.empty() - ).rebalance().build(); + ).rebalance(Settings.EMPTY).build(); { TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId1); @@ -760,7 +763,7 @@ public void testRebalance_GivenMixedPriorityModels_TwoZones_EachNodeCanHoldOneMo nodeLoads, Map.of(List.of("zone-1"), List.of(node1), List.of("zone-2"), List.of(node2)), Optional.empty() - ).rebalance().build(); + ).rebalance(Settings.EMPTY).build(); List assignedNodes = new ArrayList<>(); @@ -813,7 +816,7 @@ public void testRebalance_GivenModelUsingAllCpu_FittingLowPriorityModelCanStart( nodeLoads, Map.of(List.of(), List.of(node1)), Optional.empty() - ).rebalance().build(); + ).rebalance(Settings.EMPTY).build(); { TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId1); @@ -862,7 +865,7 @@ public void testRebalance_GivenMultipleLowPriorityModels_AndMultipleNodes() thro nodeLoads, Map.of(List.of(), List.of(node1, node2)), Optional.empty() - ).rebalance().build(); + ).rebalance(Settings.EMPTY).build(); { TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId1); @@ -911,7 +914,7 @@ public void testRebalance_GivenNormalPriorityModelToLoad_EvictsLowPriorityModel( nodeLoads, Map.of(List.of(), List.of(node1)), Optional.of(taskParams2) - ).rebalance().build(); + ).rebalance(Settings.EMPTY).build(); { TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId1); @@ -962,7 +965,7 @@ public void testRebalance_GivenNormalPriorityModelToLoad_AndLowPriorityModelCanS nodeLoads, Map.of(List.of(), List.of(node1, node2)), Optional.of(taskParams2) - ).rebalance().build(); + ).rebalance(Settings.EMPTY).build(); { TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId1); @@ -1013,7 +1016,7 @@ public void testRebalance_GivenNormalPriorityModelToLoad_AndLowPriorityModelMust nodeLoads, Map.of(List.of(), List.of(node1, node2)), Optional.of(taskParams2) - ).rebalance().build(); + ).rebalance(Settings.EMPTY).build(); { TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId1); @@ -1039,6 +1042,50 @@ public void testRebalance_GivenNormalPriorityModelToLoad_AndLowPriorityModelMust } } + public void testRebalance_GivenFirstModelToAdd_GivenScalingProcessorSetting() { + long nodeMemoryBytes = ByteSizeValue.ofGb(1).getBytes(); + DiscoveryNode node = buildNode("node-1", nodeMemoryBytes, 4); + + String modelId = "model-to-add"; + StartTrainedModelDeploymentAction.TaskParams taskParams = normalPriorityParams(modelId, modelId, 1024L, 1, 4); + TrainedModelAssignmentMetadata currentMetadata = TrainedModelAssignmentMetadata.Builder.empty().build(); + Map nodeLoads = new HashMap<>(); + + nodeLoads.put(node, NodeLoad.builder("node-1").setMaxMemory(nodeMemoryBytes).build()); + + // The deployment wants 4 threads, the node has 4 CPUs but with + // the scaling setting(2) that is divided by 2. Now the model + // assignment cannot be satisfied. + var settings = Settings.builder().put(MachineLearning.ALLOCATED_PROCESSORS_SCALE.getKey(), 2).build(); + TrainedModelAssignmentMetadata result = new TrainedModelAssignmentRebalancer( + currentMetadata, + nodeLoads, + Map.of(List.of(), List.of(node)), + Optional.of(taskParams) + ).rebalance(settings).build(); + + TrainedModelAssignment assignment = result.getDeploymentAssignment(modelId); + assertThat(assignment, is(notNullValue())); + assertThat(assignment.getAssignmentState(), equalTo(AssignmentState.STARTING)); + assertThat(assignment.getNodeRoutingTable(), is(anEmptyMap())); + assertThat(assignment.getReason().isPresent(), is(true)); + assertThat( + assignment.getReason().get(), + equalTo( + "Could not assign (more) allocations on node [node-1]. Reason: This node has insufficient allocated processors. " + + "Available processors [2], free processors [2], processors required for each allocation of this model [4]" + ) + ); + + // Without the scaling factor the assignment is satisfied. + result = new TrainedModelAssignmentRebalancer(currentMetadata, nodeLoads, Map.of(List.of(), List.of(node)), Optional.of(taskParams)) + .rebalance(Settings.EMPTY) + .build(); + + assignment = result.getDeploymentAssignment(modelId); + assertThat(assignment.getReason().isPresent(), is(false)); + } + private static StartTrainedModelDeploymentAction.TaskParams lowPriorityParams(String deploymentId, long modelSize) { return lowPriorityParams(deploymentId, deploymentId, modelSize); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/MlProcessorsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/MlProcessorsTests.java new file mode 100644 index 0000000000000..fa8c45b03b2b7 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/MlProcessorsTests.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ml.utils; + +import org.elasticsearch.cluster.node.DiscoveryNodeUtils; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.MachineLearning; + +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class MlProcessorsTests extends ESTestCase { + + public void testGet() { + var node = DiscoveryNodeUtils.builder("foo").attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "8.0")).build(); + var processor = MlProcessors.get(node, Settings.EMPTY); + assertThat(processor.count(), equalTo(8.0)); + } + + public void testGetWithScale() { + var node = DiscoveryNodeUtils.builder("foo").attributes(Map.of(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR, "8.0")).build(); + var settings = Settings.builder().put(MachineLearning.ALLOCATED_PROCESSORS_SCALE.getKey(), 2).build(); + var processor = MlProcessors.get(node, settings); + assertThat(processor.count(), equalTo(4.0)); + } +}