From cbf3dc48a5ae4912d7a73895535b2e3cb607df65 Mon Sep 17 00:00:00 2001
From: Jan Kuipers <148754765+jan-elastic@users.noreply.github.com>
Date: Thu, 17 Oct 2024 15:10:05 +0200
Subject: [PATCH] Fix ml autoscaling for zero allocations (#114982) (#114993)

* Fix estimated memory usage for a model with zero allocations.

* Ignore number of threads of models with zero allocations in autoscaling decisions.

* Add some long overdue comments.

* Another estimateMemoryUsageBytes fix
---
 .../StartTrainedModelDeploymentAction.java |   6 +
 .../MlAutoscalingDeciderService.java       |   4 +
 .../MlAutoscalingResourceTracker.java      | 109 ++++++------
 .../MlAutoscalingResourceTrackerTests.java | 167 ++++++++++++++++++
 4 files changed, 232 insertions(+), 54 deletions(-)

diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartTrainedModelDeploymentAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartTrainedModelDeploymentAction.java
index 34ebdcb7f9f9f..ca789fee7b744 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartTrainedModelDeploymentAction.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/StartTrainedModelDeploymentAction.java
@@ -623,6 +623,9 @@ public String getDeploymentId() {
          * @return the estimated memory (in bytes) required for the model deployment to run
          */
         public long estimateMemoryUsageBytes() {
+            if (numberOfAllocations == 0) {
+                return 0;
+            }
             // We already take into account 2x the model bytes. If the cache size is larger than the model bytes, then
             // we need to take it into account when returning the estimate.
             if (cacheSize != null && cacheSize.getBytes() > modelBytes) {
@@ -796,6 +799,9 @@ public static long estimateMemoryUsageBytes(
         long perAllocationMemoryBytes,
         int numberOfAllocations
     ) {
+        if (numberOfAllocations == 0) {
+            return 0;
+        }
         // While loading the model in the process we need twice the model size.
 
         // 1. If ELSER v1 or v2 then 2004MB
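
Both estimateMemoryUsageBytes variants now short-circuit: a deployment scaled to zero allocations loads nothing into the native process, so it should cost nothing. Previously the fixed per-deployment overhead and twice the model bytes were still charged, which kept autoscaling from releasing capacity. A minimal sketch of the guarded shape, assuming a simplified formula (the 240 MB overhead constant and the parameter list are illustrative stand-ins, not the exact upstream code):

    final class MemoryEstimateSketch {
        // Illustrative fixed per-deployment overhead; the real value lives in TaskParams.
        private static final long PER_DEPLOYMENT_OVERHEAD_BYTES = 240L * 1024 * 1024;

        static long estimateMemoryUsageBytes(long modelBytes, long perAllocationMemoryBytes, int numberOfAllocations) {
            if (numberOfAllocations == 0) {
                return 0; // nothing is loaded into the native process, so charge nothing
            }
            // Loading needs roughly twice the model size, plus per-allocation working memory.
            return PER_DEPLOYMENT_OVERHEAD_BYTES + 2 * modelBytes + perAllocationMemoryBytes * numberOfAllocations;
        }
    }
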
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java
index 18d974473251b..fee3d729f8dfd 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java
@@ -30,6 +30,10 @@
 
 import static org.elasticsearch.core.Strings.format;
 
+/**
+ * This handles ML autoscaling just for classic cloud.
+ * For serverless, see: {@link MlAutoscalingResourceTracker}.
+ */
 public final class MlAutoscalingDeciderService implements AutoscalingDeciderService, LocalNodeMasterListener {
 
     private static final Logger logger = LogManager.getLogger(MlAutoscalingDeciderService.class);
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTracker.java
index 9a9fbfa0340a9..6f14e2649a394 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTracker.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTracker.java
@@ -48,7 +48,8 @@
 import static org.elasticsearch.xpack.ml.job.JobNodeSelector.AWAITING_LAZY_ASSIGNMENT;
 
 /**
- * backend for new kubernetes based autoscaler.
+ * This handles ML autoscaling just for serverless.
+ * For classic cloud, see: {@link MlAutoscalingDeciderService}.
  */
 public final class MlAutoscalingResourceTracker {
     private static final Logger logger = LogManager.getLogger(MlAutoscalingResourceTracker.class);
@@ -242,72 +243,72 @@ static void getMemoryAndProcessors(
             final int numMissingProcessors = numMissingAllocations * numberOfThreadsPerAllocation;
             int numExistingProcessorsToBeUsed = Math.min(numMissingProcessors, numberOfAvailableProcessors);
 
+            if (numberOfRequestedAllocations == 0) {
+                continue;
+            }
             if (assignment.getNodeRoutingTable().isEmpty() == false
                 && assignment.getNodeRoutingTable().values().stream().allMatch(r -> r.getState().consumesMemory() == false)) {
                 // Ignore states that don't consume memory, for example all allocations are failed or stopped
                 // if the node routing table is empty, then it will match the above condition, but it needs to be handled in the next branch
                 continue;
+            }
+
+            if (assignment.getNodeRoutingTable().isEmpty() == false) {
+                // if the routing table is non-empty, this is an existing model
+                existingModelMemoryBytes += estimatedMemoryUsage;
             } else {
+                // only increase memory requirements for new models
+                extraPerNodeModelMemoryBytes += Math.max(extraPerNodeModelMemoryBytes, estimatedMemoryUsage);
+                extraModelMemoryInBytes += estimatedMemoryUsage;
+            }
 
-                if (assignment.getNodeRoutingTable().isEmpty() == false) {
-                    // if the routing table is non-empty, this is an existing model
-                    existingModelMemoryBytes += estimatedMemoryUsage;
-                } else {
-                    // only increase memory requirements for new models
-                    extraPerNodeModelMemoryBytes += Math.max(extraPerNodeModelMemoryBytes, estimatedMemoryUsage);
-                    extraModelMemoryInBytes += estimatedMemoryUsage;
+            // if not low priority, check processor requirements.
+            if (Priority.LOW.equals(modelAssignment.getValue().getTaskParams().getPriority()) == false) {
+                if (numMissingProcessors > numberOfAvailableProcessors) {
+                    // as assignments can be placed on different nodes, we only need numberOfThreadsPerAllocation here
+                    extraProcessors += numMissingProcessors - numExistingProcessorsToBeUsed;
+                    extraPerNodeProcessors = Math.max(extraPerNodeProcessors, 1); // if extra processors >0, we need at least 1
+                                                                                  // extraPerNodeProcessors
                 }
-
-                // if not low priority, check processor requirements.
-                if (Priority.LOW.equals(modelAssignment.getValue().getTaskParams().getPriority()) == false) {
-                    if (numMissingProcessors > numberOfAvailableProcessors) {
-                        // as assignments can be placed on different nodes, we only need numberOfThreadsPerAllocation here
-                        extraProcessors += numMissingProcessors - numExistingProcessorsToBeUsed;
-                        extraPerNodeProcessors = Math.max(extraPerNodeProcessors, 1); // if extra processors >0, we need at least 1
-                                                                                      // extraPerNodeProcessors
-                    }
-                    if (perNodeAvailableProcessors < numberOfThreadsPerAllocation) {
-                        extraPerNodeProcessors = Math.max(extraPerNodeProcessors, numberOfThreadsPerAllocation);
-                    }
-                    numberOfAvailableProcessors -= numExistingProcessorsToBeUsed;
+                if (perNodeAvailableProcessors < numberOfThreadsPerAllocation) {
+                    extraPerNodeProcessors = Math.max(extraPerNodeProcessors, numberOfThreadsPerAllocation);
                 }
+                numberOfAvailableProcessors -= numExistingProcessorsToBeUsed;
+            }
+
+            if (extraProcessors > 0 || extraPerNodeProcessors > 0 || extraModelMemoryInBytes > 0 || extraPerNodeModelMemoryBytes > 0) {
+                logger.info(
+                    () -> format(
+                        "trained model [%s] assigned to [%s], waiting for [%d] allocations to start due to missing hardware",
+                        modelAssignment.getKey(),
+                        Strings.arrayToCommaDelimitedString(modelAssignment.getValue().getStartedNodes()),
+                        numMissingAllocations
+                    )
+                );
+            }
 
-                if (extraProcessors > 0 || extraPerNodeProcessors > 0 || extraModelMemoryInBytes > 0 || extraPerNodeModelMemoryBytes > 0) {
-                    logger.info(
-                        () -> format(
-                            "trained model [%s] assigned to [%s], waiting for [%d] allocations to start due to missing hardware",
-                            modelAssignment.getKey(),
-                            Strings.arrayToCommaDelimitedString(modelAssignment.getValue().getStartedNodes()),
-                            numMissingAllocations
+            for (String node : modelAssignment.getValue().getNodeRoutingTable().keySet()) {
+                sumOfCurrentlyExistingAndUsedProcessors += modelAssignment.getValue().getNodeRoutingTable().get(node).getTargetAllocations()
+                    * numberOfThreadsPerAllocation;
+
+                jobRequirementsByNode.computeIfAbsent(node, k -> new ArrayList<>())
+                    .add(
+                        MlJobRequirements.of(
+                            estimatedMemoryUsage,
+                            Priority.LOW.equals(modelAssignment.getValue().getTaskParams().getPriority())
+                                ? 0
+                                : modelAssignment.getValue().getNodeRoutingTable().get(node).getTargetAllocations()
+                                    * numberOfThreadsPerAllocation
                         )
                     );
-                }
-
-                for (String node : modelAssignment.getValue().getNodeRoutingTable().keySet()) {
-                    sumOfCurrentlyExistingAndUsedProcessors += modelAssignment.getValue()
-                        .getNodeRoutingTable()
-                        .get(node)
-                        .getTargetAllocations() * numberOfThreadsPerAllocation;
-
-                    jobRequirementsByNode.computeIfAbsent(node, k -> new ArrayList<>())
-                        .add(
-                            MlJobRequirements.of(
-                                estimatedMemoryUsage,
-                                Priority.LOW.equals(modelAssignment.getValue().getTaskParams().getPriority())
-                                    ? 0
-                                    : modelAssignment.getValue().getNodeRoutingTable().get(node).getTargetAllocations()
-                                        * numberOfThreadsPerAllocation
-                            )
-                        );
-                }
-
-                // min(3, max(number of allocations over all deployed models)
-                // the minimum number of nodes is equal to the number of allocations, up to 3
-                // if the number of allocations is greater than 3, then wantedMinNodes is still 3
-                // in theory this should help availability for 2-3 allocations
-                // the planner should split over all available nodes
-                minNodes = Math.min(3, Math.max(minNodes, numberOfRequestedAllocations));
             }
+
+            // min(3, max(number of allocations over all deployed models)
+            // the minimum number of nodes is equal to the number of allocations, up to 3
+            // if the number of allocations is greater than 3, then wantedMinNodes is still 3
+            // in theory this should help availability for 2-3 allocations
+            // the planner should split over all available nodes
+            minNodes = Math.min(3, Math.max(minNodes, numberOfRequestedAllocations));
         }
 
         // dummy autoscaling entity
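
The hunk above flattens a nested if/else into early continue guards, so each skip case reads top to bottom: zero requested allocations first, then assignments whose routes no longer consume memory. A compilable sketch of that control flow, with an illustrative record standing in for TrainedModelAssignment (these names are not the upstream API):

    import java.util.List;

    final class ZeroAllocationSkipSketch {
        // Illustrative stand-in for a trained-model assignment.
        record Assignment(int requestedAllocations, boolean hasRoutes, boolean anyRouteConsumesMemory, long estimatedMemoryBytes) {}

        static long totalModelMemoryBytes(List<Assignment> assignments) {
            long total = 0;
            for (Assignment a : assignments) {
                if (a.requestedAllocations() == 0) {
                    continue; // scaled to zero: contributes neither memory nor threads
                }
                if (a.hasRoutes() && a.anyRouteConsumesMemory() == false) {
                    continue; // all routes failed or stopped: nothing to account for
                }
                total += a.estimatedMemoryBytes();
            }
            return total;
        }
    }

Because the zero-allocation case exits before the minNodes update at the bottom of the loop, such deployments no longer force a minimum node count either.
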
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTrackerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTrackerTests.java
index 3674dda3934bd..729bb708cc46f 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTrackerTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTrackerTests.java
@@ -22,6 +22,7 @@
 import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
 import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction;
 import org.elasticsearch.xpack.core.ml.autoscaling.MlAutoscalingStats;
+import org.elasticsearch.xpack.core.ml.inference.assignment.AdaptiveAllocationsSettings;
 import org.elasticsearch.xpack.core.ml.inference.assignment.AssignmentState;
 import org.elasticsearch.xpack.core.ml.inference.assignment.Priority;
 import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingInfo;
@@ -1800,6 +1801,172 @@ public void testGetMemoryAndProcessorsScaleDownNotPreventedByDummyEntityAsMemory
         );
     }
 
+    public void testGetMemoryAndProcessorsScaleDownForModelWithZeroAllocations() throws InterruptedException {
+        long memory = 1000000000;
+        Map<String, String> nodeAttr = Map.of(
+            MachineLearning.MACHINE_MEMORY_NODE_ATTR,
+            Long.toString(memory),
+            MachineLearning.MAX_JVM_SIZE_NODE_ATTR,
+            "400000000",
+            MachineLearning.ML_CONFIG_VERSION_NODE_ATTR,
+            "7.2.0",
+            MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR,
+            "2.0"
+        );
+
+        MlAutoscalingContext mlAutoscalingContext = new MlAutoscalingContext(
+            List.of(),
+            List.of(),
+            List.of(),
+            Map.of(
+                "model-with-zero-allocations",
+                TrainedModelAssignment.Builder.empty(
+                    new StartTrainedModelDeploymentAction.TaskParams(
+                        "model-with-zero-allocations",
+                        "model-with-zero-allocations-deployment",
+                        400,
+                        0,
+                        2,
+                        100,
+                        null,
+                        Priority.NORMAL,
+                        0L,
+                        0L
+                    ),
+                    new AdaptiveAllocationsSettings(true, 0, 4)
+                ).build()
+            ),
+            List.of(
+                DiscoveryNodeUtils.builder("ml-node-1")
+                    .name("ml-node-name-1")
+                    .address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
+                    .attributes(nodeAttr)
+                    .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
+                    .build()
+            ),
+            PersistentTasksCustomMetadata.builder().build()
+        );
+        MlMemoryTracker mockTracker = mock(MlMemoryTracker.class);
+
+        this.assertAsync(
+            listener -> MlAutoscalingResourceTracker.getMemoryAndProcessors(
+                mlAutoscalingContext,
+                mockTracker,
+                Map.of("ml-node-1", memory),
+                600000000,
+                2,
+                MachineLearning.DEFAULT_MAX_OPEN_JOBS_PER_NODE,
+                MlDummyAutoscalingEntity.of(0, 0),
+                1,
+                listener
+            ),
+            stats -> {
+                assertEquals(memory, stats.currentPerNodeMemoryBytes());
+                assertEquals(0, stats.currentTotalModelMemoryBytes());
+                assertEquals(0, stats.currentTotalProcessorsInUse());
+                assertEquals(1, stats.currentTotalNodes());
+                assertEquals(0, stats.wantedMinNodes());
+                assertEquals(0, stats.wantedExtraPerNodeNodeProcessors());
+                assertEquals(0, stats.wantedExtraProcessors());
+                assertEquals(0, stats.wantedExtraModelMemoryBytes());
+                assertEquals(0, stats.wantedExtraPerNodeMemoryBytes());
+                assertEquals(memory, stats.unwantedNodeMemoryBytesToRemove());
+                assertEquals(MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(), stats.currentPerNodeMemoryOverheadBytes());
+            }
+        );
+    }
+
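The AdaptiveAllocationsSettings argument is what makes a zero-allocation deployment legal in this test. Assuming the constructor takes (enabled, minNumberOfAllocations, maxNumberOfAllocations), which is how the test appears to use it:

    // Assumed (enabled, minNumberOfAllocations, maxNumberOfAllocations) order:
    // a floor of 0 means the deployment may be scaled down to zero allocations,
    // which is exactly the state the production guards above must not charge for.
    AdaptiveAllocationsSettings settings = new AdaptiveAllocationsSettings(true, 0, 4);

With a floor of zero, adaptive allocations may park the deployment entirely, and the assertions then expect the node to carry no model memory, no threads in use, and to be fully reclaimable.
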
+    public void testGetMemoryAndProcessorsIgnoreThreadsOfModelWithZeroAllocations() throws InterruptedException {
+        long memory = 1000000000;
+        Map<String, String> nodeAttr = Map.of(
+            MachineLearning.MACHINE_MEMORY_NODE_ATTR,
+            Long.toString(memory),
+            MachineLearning.MAX_JVM_SIZE_NODE_ATTR,
+            "400000000",
+            MachineLearning.ML_CONFIG_VERSION_NODE_ATTR,
+            "7.2.0",
+            MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR,
+            "2.0"
+        );
+
+        MlAutoscalingContext mlAutoscalingContext = new MlAutoscalingContext(
+            List.of(),
+            List.of(),
+            List.of(),
+            Map.of(
+                "model-with-one-allocation",
+                TrainedModelAssignment.Builder.empty(
+                    new StartTrainedModelDeploymentAction.TaskParams(
+                        "model-with-one-allocation",
+                        "model-with-one-allocation-deployment",
+                        400,
+                        1,
+                        2,
+                        100,
+                        null,
+                        Priority.NORMAL,
+                        0L,
+                        0L
+                    ),
+                    null
+                ).addRoutingEntry("ml-node-1", new RoutingInfo(1, 1, RoutingState.STARTED, "")).build(),
+                "model-with-zero-allocations",
+                TrainedModelAssignment.Builder.empty(
+                    new StartTrainedModelDeploymentAction.TaskParams(
+                        "model-with-zero-allocations",
+                        "model-with-zero-allocations-deployment",
+                        400,
+                        0,
+                        4,
+                        100,
+                        null,
+                        Priority.NORMAL,
+                        0L,
+                        0L
+                    ),
+                    new AdaptiveAllocationsSettings(true, 0, 4)
+                ).build()
+            ),
+            List.of(
+                DiscoveryNodeUtils.builder("ml-node-1")
+                    .name("ml-node-name-1")
+                    .address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
+                    .attributes(nodeAttr)
+                    .roles(Set.of(DiscoveryNodeRole.ML_ROLE))
+                    .build()
+            ),
+            PersistentTasksCustomMetadata.builder().build()
+        );
+        MlMemoryTracker mockTracker = mock(MlMemoryTracker.class);
+
+        this.assertAsync(
+            listener -> MlAutoscalingResourceTracker.getMemoryAndProcessors(
+                mlAutoscalingContext,
+                mockTracker,
+                Map.of("ml-node-1", memory),
+                600000000,
+                2,
+                MachineLearning.DEFAULT_MAX_OPEN_JOBS_PER_NODE,
+                MlDummyAutoscalingEntity.of(0, 0),
+                1,
+                listener
+            ),
+            stats -> {
+                assertEquals(memory, stats.currentPerNodeMemoryBytes());
+                assertEquals(251659040, stats.currentTotalModelMemoryBytes());
+                assertEquals(2, stats.currentTotalProcessorsInUse());
+                assertEquals(1, stats.currentTotalNodes());
+                assertEquals(1, stats.wantedMinNodes());
+                assertEquals(0, stats.wantedExtraPerNodeNodeProcessors());
+                assertEquals(0, stats.wantedExtraProcessors());
+                assertEquals(0, stats.wantedExtraModelMemoryBytes());
+                assertEquals(0, stats.wantedExtraPerNodeMemoryBytes());
+                assertEquals(0, stats.unwantedNodeMemoryBytesToRemove());
+                assertEquals(MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(), stats.currentPerNodeMemoryOverheadBytes());
+            }
+        );
+    }
+
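The expected numbers can be checked by hand. Only the one-allocation model is charged; the 240 MB fixed per-deployment overhead below is an assumption consistent with the asserted figure rather than something spelled out in this diff:

    long perDeploymentOverheadBytes = 240L * 1024 * 1024;                    // 251658240 (assumed constant)
    long modelBytes = 400;                                                   // from the one-allocation TaskParams
    long expectedModelMemory = perDeploymentOverheadBytes + 2 * modelBytes;  // = 251659040
    // Threads: 1 allocation x 2 threads = 2 processors in use; the zero-allocation
    // model's 4 threads per allocation no longer count toward wanted processors.
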
     private <T> void assertAsync(Consumer<ActionListener<T>> function, Consumer<T> furtherTests) throws InterruptedException {
         CountDownLatch latch = new CountDownLatch(1);
         AtomicBoolean listenerCalled = new AtomicBoolean(false);
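
The patch excerpt is cut off at this point; the remainder of assertAsync is not shown. For context, a sketch of the usual shape of such a latch-based helper (an illustration consistent with the visible fragment, not the file's actual code):

    // Sketch of a typical latch-based async assertion helper.
    private <T> void assertAsync(Consumer<ActionListener<T>> function, Consumer<T> furtherTests) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicBoolean listenerCalled = new AtomicBoolean(false);
        ActionListener<T> listener = ActionListener.wrap(result -> {
            try {
                furtherTests.accept(result);  // run the assertions on the async result
                listenerCalled.set(true);     // record that the success path ran
            } finally {
                latch.countDown();            // always release the waiting test thread
            }
        }, e -> latch.countDown());           // release on failure too; the flags below catch it
        function.accept(listener);            // invoke the code under test
        assertTrue("listener was not called in time", latch.await(10, TimeUnit.SECONDS));
        assertTrue("success path did not run", listenerCalled.get());
    }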