Skip to content

Commit

Permalink
Fix ml autoscaling for zero allocations (#114982) (#114993)
Browse files Browse the repository at this point in the history
* Fix estimated memory usage for a model with zero allocations.

* Ignore number of threads of models with zero allocations in autoscaling decisions.

* Add some long overdue comments.

* Another estimateMemoryUsageBytes fix
  • Loading branch information
jan-elastic committed Oct 18, 2024
1 parent d096b58 commit cbf3dc4
Show file tree
Hide file tree
Showing 4 changed files with 232 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,9 @@ public String getDeploymentId() {
* @return the estimated memory (in bytes) required for the model deployment to run
*/
public long estimateMemoryUsageBytes() {
if (numberOfAllocations == 0) {
return 0;
}
// We already take into account 2x the model bytes. If the cache size is larger than the model bytes, then
// we need to take it into account when returning the estimate.
if (cacheSize != null && cacheSize.getBytes() > modelBytes) {
Expand Down Expand Up @@ -796,6 +799,9 @@ public static long estimateMemoryUsageBytes(
long perAllocationMemoryBytes,
int numberOfAllocations
) {
if (numberOfAllocations == 0) {
return 0;
}
// While loading the model in the process we need twice the model size.

// 1. If ELSER v1 or v2 then 2004MB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@

import static org.elasticsearch.core.Strings.format;

/**
* This handles ML autoscaling just for classic cloud.
* For serverless, see: {@link MlAutoscalingResourceTracker}.
*/
public final class MlAutoscalingDeciderService implements AutoscalingDeciderService, LocalNodeMasterListener {

private static final Logger logger = LogManager.getLogger(MlAutoscalingDeciderService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@
import static org.elasticsearch.xpack.ml.job.JobNodeSelector.AWAITING_LAZY_ASSIGNMENT;

/**
* backend for new kubernetes based autoscaler.
* This handles ML autoscaling just for serverless.
* For classic cloud, see: {@link MlAutoscalingDeciderService}.
*/
public final class MlAutoscalingResourceTracker {
private static final Logger logger = LogManager.getLogger(MlAutoscalingResourceTracker.class);
Expand Down Expand Up @@ -242,72 +243,72 @@ static void getMemoryAndProcessors(
final int numMissingProcessors = numMissingAllocations * numberOfThreadsPerAllocation;
int numExistingProcessorsToBeUsed = Math.min(numMissingProcessors, numberOfAvailableProcessors);

if (numberOfRequestedAllocations == 0) {
continue;
}
if (assignment.getNodeRoutingTable().isEmpty() == false
&& assignment.getNodeRoutingTable().values().stream().allMatch(r -> r.getState().consumesMemory() == false)) {
// Ignore states that don't consume memory, for example all allocations are failed or stopped
// if the node routing table is empty, then it will match the above condition, but it needs to be handled in the next branch
continue;
}

if (assignment.getNodeRoutingTable().isEmpty() == false) {
// if the routing table is non-empty, this is an existing model
existingModelMemoryBytes += estimatedMemoryUsage;
} else {
// only increase memory requirements for new models
extraPerNodeModelMemoryBytes += Math.max(extraPerNodeModelMemoryBytes, estimatedMemoryUsage);
extraModelMemoryInBytes += estimatedMemoryUsage;
}

if (assignment.getNodeRoutingTable().isEmpty() == false) {
// if the routing table is non-empty, this is an existing model
existingModelMemoryBytes += estimatedMemoryUsage;
} else {
// only increase memory requirements for new models
extraPerNodeModelMemoryBytes += Math.max(extraPerNodeModelMemoryBytes, estimatedMemoryUsage);
extraModelMemoryInBytes += estimatedMemoryUsage;
// if not low priority, check processor requirements.
if (Priority.LOW.equals(modelAssignment.getValue().getTaskParams().getPriority()) == false) {
if (numMissingProcessors > numberOfAvailableProcessors) {
// as assignments can be placed on different nodes, we only need numberOfThreadsPerAllocation here
extraProcessors += numMissingProcessors - numExistingProcessorsToBeUsed;
extraPerNodeProcessors = Math.max(extraPerNodeProcessors, 1); // if extra processors >0, we need at least 1
// extraPerNodeProcessors
}

// if not low priority, check processor requirements.
if (Priority.LOW.equals(modelAssignment.getValue().getTaskParams().getPriority()) == false) {
if (numMissingProcessors > numberOfAvailableProcessors) {
// as assignments can be placed on different nodes, we only need numberOfThreadsPerAllocation here
extraProcessors += numMissingProcessors - numExistingProcessorsToBeUsed;
extraPerNodeProcessors = Math.max(extraPerNodeProcessors, 1); // if extra processors >0, we need at least 1
// extraPerNodeProcessors
}
if (perNodeAvailableProcessors < numberOfThreadsPerAllocation) {
extraPerNodeProcessors = Math.max(extraPerNodeProcessors, numberOfThreadsPerAllocation);
}
numberOfAvailableProcessors -= numExistingProcessorsToBeUsed;
if (perNodeAvailableProcessors < numberOfThreadsPerAllocation) {
extraPerNodeProcessors = Math.max(extraPerNodeProcessors, numberOfThreadsPerAllocation);
}
numberOfAvailableProcessors -= numExistingProcessorsToBeUsed;
}

if (extraProcessors > 0 || extraPerNodeProcessors > 0 || extraModelMemoryInBytes > 0 || extraPerNodeModelMemoryBytes > 0) {
logger.info(
() -> format(
"trained model [%s] assigned to [%s], waiting for [%d] allocations to start due to missing hardware",
modelAssignment.getKey(),
Strings.arrayToCommaDelimitedString(modelAssignment.getValue().getStartedNodes()),
numMissingAllocations
)
);
}

if (extraProcessors > 0 || extraPerNodeProcessors > 0 || extraModelMemoryInBytes > 0 || extraPerNodeModelMemoryBytes > 0) {
logger.info(
() -> format(
"trained model [%s] assigned to [%s], waiting for [%d] allocations to start due to missing hardware",
modelAssignment.getKey(),
Strings.arrayToCommaDelimitedString(modelAssignment.getValue().getStartedNodes()),
numMissingAllocations
for (String node : modelAssignment.getValue().getNodeRoutingTable().keySet()) {
sumOfCurrentlyExistingAndUsedProcessors += modelAssignment.getValue().getNodeRoutingTable().get(node).getTargetAllocations()
* numberOfThreadsPerAllocation;

jobRequirementsByNode.computeIfAbsent(node, k -> new ArrayList<>())
.add(
MlJobRequirements.of(
estimatedMemoryUsage,
Priority.LOW.equals(modelAssignment.getValue().getTaskParams().getPriority())
? 0
: modelAssignment.getValue().getNodeRoutingTable().get(node).getTargetAllocations()
* numberOfThreadsPerAllocation
)
);
}

for (String node : modelAssignment.getValue().getNodeRoutingTable().keySet()) {
sumOfCurrentlyExistingAndUsedProcessors += modelAssignment.getValue()
.getNodeRoutingTable()
.get(node)
.getTargetAllocations() * numberOfThreadsPerAllocation;

jobRequirementsByNode.computeIfAbsent(node, k -> new ArrayList<>())
.add(
MlJobRequirements.of(
estimatedMemoryUsage,
Priority.LOW.equals(modelAssignment.getValue().getTaskParams().getPriority())
? 0
: modelAssignment.getValue().getNodeRoutingTable().get(node).getTargetAllocations()
* numberOfThreadsPerAllocation
)
);
}

// min(3, max(number of allocations over all deployed models)
// the minimum number of nodes is equal to the number of allocations, up to 3
// if the number of allocations is greater than 3, then wantedMinNodes is still 3
// in theory this should help availability for 2-3 allocations
// the planner should split over all available nodes
minNodes = Math.min(3, Math.max(minNodes, numberOfRequestedAllocations));
}

// min(3, max(number of allocations over all deployed models)
// the minimum number of nodes is equal to the number of allocations, up to 3
// if the number of allocations is greater than 3, then wantedMinNodes is still 3
// in theory this should help availability for 2-3 allocations
// the planner should split over all available nodes
minNodes = Math.min(3, Math.max(minNodes, numberOfRequestedAllocations));
}

// dummy autoscaling entity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction;
import org.elasticsearch.xpack.core.ml.autoscaling.MlAutoscalingStats;
import org.elasticsearch.xpack.core.ml.inference.assignment.AdaptiveAllocationsSettings;
import org.elasticsearch.xpack.core.ml.inference.assignment.AssignmentState;
import org.elasticsearch.xpack.core.ml.inference.assignment.Priority;
import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingInfo;
Expand Down Expand Up @@ -1800,6 +1801,172 @@ public void testGetMemoryAndProcessorsScaleDownNotPreventedByDummyEntityAsMemory
);
}

public void testGetMemoryAndProcessorsScaleDownForModelWithZeroAllocations() throws InterruptedException {
long memory = 1000000000;
Map<String, String> nodeAttr = Map.of(
MachineLearning.MACHINE_MEMORY_NODE_ATTR,
Long.toString(memory),
MachineLearning.MAX_JVM_SIZE_NODE_ATTR,
"400000000",
MachineLearning.ML_CONFIG_VERSION_NODE_ATTR,
"7.2.0",
MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR,
"2.0"
);

MlAutoscalingContext mlAutoscalingContext = new MlAutoscalingContext(
List.of(),
List.of(),
List.of(),
Map.of(
"model-with-zero-allocations",
TrainedModelAssignment.Builder.empty(
new StartTrainedModelDeploymentAction.TaskParams(
"model-with-zero-allocations",
"model-with-zero-allocations-deployment",
400,
0,
2,
100,
null,
Priority.NORMAL,
0L,
0L
),
new AdaptiveAllocationsSettings(true, 0, 4)
).build()
),
List.of(
DiscoveryNodeUtils.builder("ml-node-1")
.name("ml-node-name-1")
.address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
.attributes(nodeAttr)
.roles(Set.of(DiscoveryNodeRole.ML_ROLE))
.build()
),
PersistentTasksCustomMetadata.builder().build()
);
MlMemoryTracker mockTracker = mock(MlMemoryTracker.class);

this.<MlAutoscalingStats>assertAsync(
listener -> MlAutoscalingResourceTracker.getMemoryAndProcessors(
mlAutoscalingContext,
mockTracker,
Map.of("ml-node-1", memory),
600000000,
2,
MachineLearning.DEFAULT_MAX_OPEN_JOBS_PER_NODE,
MlDummyAutoscalingEntity.of(0, 0),
1,
listener
),
stats -> {
assertEquals(memory, stats.currentPerNodeMemoryBytes());
assertEquals(0, stats.currentTotalModelMemoryBytes());
assertEquals(0, stats.currentTotalProcessorsInUse());
assertEquals(1, stats.currentTotalNodes());
assertEquals(0, stats.wantedMinNodes());
assertEquals(0, stats.wantedExtraPerNodeNodeProcessors());
assertEquals(0, stats.wantedExtraProcessors());
assertEquals(0, stats.wantedExtraModelMemoryBytes());
assertEquals(0, stats.wantedExtraPerNodeMemoryBytes());
assertEquals(memory, stats.unwantedNodeMemoryBytesToRemove());
assertEquals(MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(), stats.currentPerNodeMemoryOverheadBytes());
}
);
}

public void testGetMemoryAndProcessorsIgnoreThreadsOfModelWithZeroAllocations() throws InterruptedException {
long memory = 1000000000;
Map<String, String> nodeAttr = Map.of(
MachineLearning.MACHINE_MEMORY_NODE_ATTR,
Long.toString(memory),
MachineLearning.MAX_JVM_SIZE_NODE_ATTR,
"400000000",
MachineLearning.ML_CONFIG_VERSION_NODE_ATTR,
"7.2.0",
MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR,
"2.0"
);

MlAutoscalingContext mlAutoscalingContext = new MlAutoscalingContext(
List.of(),
List.of(),
List.of(),
Map.of(
"model-with-one-allocation",
TrainedModelAssignment.Builder.empty(
new StartTrainedModelDeploymentAction.TaskParams(
"model-with-one-allocation",
"model-with-one-allocation-deployment",
400,
1,
2,
100,
null,
Priority.NORMAL,
0L,
0L
),
null
).addRoutingEntry("ml-node-1", new RoutingInfo(1, 1, RoutingState.STARTED, "")).build(),
"model-with-zero-allocations",
TrainedModelAssignment.Builder.empty(
new StartTrainedModelDeploymentAction.TaskParams(
"model-with-zero-allocations",
"model-with-zero-allocations-deployment",
400,
0,
4,
100,
null,
Priority.NORMAL,
0L,
0L
),
new AdaptiveAllocationsSettings(true, 0, 4)
).build()
),
List.of(
DiscoveryNodeUtils.builder("ml-node-1")
.name("ml-node-name-1")
.address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
.attributes(nodeAttr)
.roles(Set.of(DiscoveryNodeRole.ML_ROLE))
.build()
),
PersistentTasksCustomMetadata.builder().build()
);
MlMemoryTracker mockTracker = mock(MlMemoryTracker.class);

this.<MlAutoscalingStats>assertAsync(
listener -> MlAutoscalingResourceTracker.getMemoryAndProcessors(
mlAutoscalingContext,
mockTracker,
Map.of("ml-node-1", memory),
600000000,
2,
MachineLearning.DEFAULT_MAX_OPEN_JOBS_PER_NODE,
MlDummyAutoscalingEntity.of(0, 0),
1,
listener
),
stats -> {
assertEquals(memory, stats.currentPerNodeMemoryBytes());
assertEquals(251659040, stats.currentTotalModelMemoryBytes());
assertEquals(2, stats.currentTotalProcessorsInUse());
assertEquals(1, stats.currentTotalNodes());
assertEquals(1, stats.wantedMinNodes());
assertEquals(0, stats.wantedExtraPerNodeNodeProcessors());
assertEquals(0, stats.wantedExtraProcessors());
assertEquals(0, stats.wantedExtraModelMemoryBytes());
assertEquals(0, stats.wantedExtraPerNodeMemoryBytes());
assertEquals(0, stats.unwantedNodeMemoryBytesToRemove());
assertEquals(MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(), stats.currentPerNodeMemoryOverheadBytes());
}
);
}

private <T> void assertAsync(Consumer<ActionListener<T>> function, Consumer<T> furtherTests) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean listenerCalled = new AtomicBoolean(false);
Expand Down

0 comments on commit cbf3dc4

Please sign in to comment.