Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ml autoscaling for zero allocations #114982

Merged
merged 4 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,9 @@ public String getDeploymentId() {
* @return the estimated memory (in bytes) required for the model deployment to run
*/
public long estimateMemoryUsageBytes() {
if (numberOfAllocations == 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is on TaskParams - StartTrainedModelDeploymentAction.TaskParams. estimateMemoryUsageBytes()

There is another public method StartTrainedModelDeploymentAction.estimateMemoryUsageBytes() on line 792 that needs this check.

If StartTrainedModelDeploymentAction.estimateMemoryUsageBytes() can return - then line 635 + (cacheSize.getBytes() - modelBytes); needs a Max.(0,...) to ensure the return is non-negative

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed StartTrainedModelDeploymentAction.estimateMemoryUsageBytes

I don't think the Max is necessary. StartTrainedModelDeploymentAction.estimateMemoryUsageBytes returns 0 only if the number of allocations is 0, in which case the TaskParams.estimateMemoryUsageBytes already returns 0.

return 0;
}
// We already take into account 2x the model bytes. If the cache size is larger than the model bytes, then
// we need to take it into account when returning the estimate.
if (cacheSize != null && cacheSize.getBytes() > modelBytes) {
Expand Down Expand Up @@ -796,6 +799,9 @@ public static long estimateMemoryUsageBytes(
long perAllocationMemoryBytes,
int numberOfAllocations
) {
if (numberOfAllocations == 0) {
return 0;
}
// While loading the model in the process we need twice the model size.

// 1. If ELSER v1 or v2 then 2004MB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@

import static org.elasticsearch.core.Strings.format;

/**
* This handles ML autoscaling just for classic cloud.
* For serverless, see: {@link MlAutoscalingResourceTracker}.
*/
public final class MlAutoscalingDeciderService implements AutoscalingDeciderService, LocalNodeMasterListener {

private static final Logger logger = LogManager.getLogger(MlAutoscalingDeciderService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@
import static org.elasticsearch.xpack.ml.job.JobNodeSelector.AWAITING_LAZY_ASSIGNMENT;

/**
* backend for new kubernetes based autoscaler.
* This handles ML autoscaling just for serverless.
* For classic cloud, see: {@link MlAutoscalingDeciderService}.
*/
public final class MlAutoscalingResourceTracker {
private static final Logger logger = LogManager.getLogger(MlAutoscalingResourceTracker.class);
Expand Down Expand Up @@ -242,72 +243,72 @@ static void getMemoryAndProcessors(
final int numMissingProcessors = numMissingAllocations * numberOfThreadsPerAllocation;
int numExistingProcessorsToBeUsed = Math.min(numMissingProcessors, numberOfAvailableProcessors);

if (numberOfRequestedAllocations == 0) {
continue;
}
if (assignment.getNodeRoutingTable().isEmpty() == false
&& assignment.getNodeRoutingTable().values().stream().allMatch(r -> r.getState().consumesMemory() == false)) {
// Ignore states that don't consume memory, for example all allocations are failed or stopped
// if the node routing table is empty, then it will match the above condition, but it needs to be handled in the next branch
continue;
}

if (assignment.getNodeRoutingTable().isEmpty() == false) {
Copy link
Contributor Author

@jan-elastic jan-elastic Oct 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything down here in this file is just indentation.

If you want to review it, I'd recommend settings -> hide whitespace

// if the routing table is non-empty, this is an existing model
existingModelMemoryBytes += estimatedMemoryUsage;
} else {
// only increase memory requirements for new models
extraPerNodeModelMemoryBytes += Math.max(extraPerNodeModelMemoryBytes, estimatedMemoryUsage);
extraModelMemoryInBytes += estimatedMemoryUsage;
}

if (assignment.getNodeRoutingTable().isEmpty() == false) {
// if the routing table is non-empty, this is an existing model
existingModelMemoryBytes += estimatedMemoryUsage;
} else {
// only increase memory requirements for new models
extraPerNodeModelMemoryBytes += Math.max(extraPerNodeModelMemoryBytes, estimatedMemoryUsage);
extraModelMemoryInBytes += estimatedMemoryUsage;
// if not low priority, check processor requirements.
if (Priority.LOW.equals(modelAssignment.getValue().getTaskParams().getPriority()) == false) {
if (numMissingProcessors > numberOfAvailableProcessors) {
// as assignments can be placed on different nodes, we only need numberOfThreadsPerAllocation here
extraProcessors += numMissingProcessors - numExistingProcessorsToBeUsed;
extraPerNodeProcessors = Math.max(extraPerNodeProcessors, 1); // if extra processors >0, we need at least 1
// extraPerNodeProcessors
}

// if not low priority, check processor requirements.
if (Priority.LOW.equals(modelAssignment.getValue().getTaskParams().getPriority()) == false) {
if (numMissingProcessors > numberOfAvailableProcessors) {
// as assignments can be placed on different nodes, we only need numberOfThreadsPerAllocation here
extraProcessors += numMissingProcessors - numExistingProcessorsToBeUsed;
extraPerNodeProcessors = Math.max(extraPerNodeProcessors, 1); // if extra processors >0, we need at least 1
// extraPerNodeProcessors
}
if (perNodeAvailableProcessors < numberOfThreadsPerAllocation) {
extraPerNodeProcessors = Math.max(extraPerNodeProcessors, numberOfThreadsPerAllocation);
}
numberOfAvailableProcessors -= numExistingProcessorsToBeUsed;
if (perNodeAvailableProcessors < numberOfThreadsPerAllocation) {
extraPerNodeProcessors = Math.max(extraPerNodeProcessors, numberOfThreadsPerAllocation);
}
numberOfAvailableProcessors -= numExistingProcessorsToBeUsed;
}

if (extraProcessors > 0 || extraPerNodeProcessors > 0 || extraModelMemoryInBytes > 0 || extraPerNodeModelMemoryBytes > 0) {
logger.info(
() -> format(
"trained model [%s] assigned to [%s], waiting for [%d] allocations to start due to missing hardware",
modelAssignment.getKey(),
Strings.arrayToCommaDelimitedString(modelAssignment.getValue().getStartedNodes()),
numMissingAllocations
)
);
}

if (extraProcessors > 0 || extraPerNodeProcessors > 0 || extraModelMemoryInBytes > 0 || extraPerNodeModelMemoryBytes > 0) {
logger.info(
() -> format(
"trained model [%s] assigned to [%s], waiting for [%d] allocations to start due to missing hardware",
modelAssignment.getKey(),
Strings.arrayToCommaDelimitedString(modelAssignment.getValue().getStartedNodes()),
numMissingAllocations
for (String node : modelAssignment.getValue().getNodeRoutingTable().keySet()) {
sumOfCurrentlyExistingAndUsedProcessors += modelAssignment.getValue().getNodeRoutingTable().get(node).getTargetAllocations()
* numberOfThreadsPerAllocation;

jobRequirementsByNode.computeIfAbsent(node, k -> new ArrayList<>())
.add(
MlJobRequirements.of(
estimatedMemoryUsage,
Priority.LOW.equals(modelAssignment.getValue().getTaskParams().getPriority())
? 0
: modelAssignment.getValue().getNodeRoutingTable().get(node).getTargetAllocations()
* numberOfThreadsPerAllocation
)
);
}

for (String node : modelAssignment.getValue().getNodeRoutingTable().keySet()) {
sumOfCurrentlyExistingAndUsedProcessors += modelAssignment.getValue()
.getNodeRoutingTable()
.get(node)
.getTargetAllocations() * numberOfThreadsPerAllocation;

jobRequirementsByNode.computeIfAbsent(node, k -> new ArrayList<>())
.add(
MlJobRequirements.of(
estimatedMemoryUsage,
Priority.LOW.equals(modelAssignment.getValue().getTaskParams().getPriority())
? 0
: modelAssignment.getValue().getNodeRoutingTable().get(node).getTargetAllocations()
* numberOfThreadsPerAllocation
)
);
}

// min(3, max(number of allocations over all deployed models)
// the minimum number of nodes is equal to the number of allocations, up to 3
// if the number of allocations is greater than 3, then wantedMinNodes is still 3
// in theory this should help availability for 2-3 allocations
// the planner should split over all available nodes
minNodes = Math.min(3, Math.max(minNodes, numberOfRequestedAllocations));
}

// min(3, max(number of allocations over all deployed models)
// the minimum number of nodes is equal to the number of allocations, up to 3
// if the number of allocations is greater than 3, then wantedMinNodes is still 3
// in theory this should help availability for 2-3 allocations
// the planner should split over all available nodes
minNodes = Math.min(3, Math.max(minNodes, numberOfRequestedAllocations));
}

// dummy autoscaling entity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction;
import org.elasticsearch.xpack.core.ml.autoscaling.MlAutoscalingStats;
import org.elasticsearch.xpack.core.ml.inference.assignment.AdaptiveAllocationsSettings;
import org.elasticsearch.xpack.core.ml.inference.assignment.AssignmentState;
import org.elasticsearch.xpack.core.ml.inference.assignment.Priority;
import org.elasticsearch.xpack.core.ml.inference.assignment.RoutingInfo;
Expand Down Expand Up @@ -1800,6 +1801,172 @@ public void testGetMemoryAndProcessorsScaleDownNotPreventedByDummyEntityAsMemory
);
}

public void testGetMemoryAndProcessorsScaleDownForModelWithZeroAllocations() throws InterruptedException {
long memory = 1000000000;
Map<String, String> nodeAttr = Map.of(
MachineLearning.MACHINE_MEMORY_NODE_ATTR,
Long.toString(memory),
MachineLearning.MAX_JVM_SIZE_NODE_ATTR,
"400000000",
MachineLearning.ML_CONFIG_VERSION_NODE_ATTR,
"7.2.0",
MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR,
"2.0"
);

MlAutoscalingContext mlAutoscalingContext = new MlAutoscalingContext(
List.of(),
List.of(),
List.of(),
Map.of(
"model-with-zero-allocations",
TrainedModelAssignment.Builder.empty(
new StartTrainedModelDeploymentAction.TaskParams(
"model-with-zero-allocations",
"model-with-zero-allocations-deployment",
400,
0,
2,
100,
null,
Priority.NORMAL,
0L,
0L
),
new AdaptiveAllocationsSettings(true, 0, 4)
).build()
),
List.of(
DiscoveryNodeUtils.builder("ml-node-1")
.name("ml-node-name-1")
.address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
.attributes(nodeAttr)
.roles(Set.of(DiscoveryNodeRole.ML_ROLE))
.build()
),
PersistentTasksCustomMetadata.builder().build()
);
MlMemoryTracker mockTracker = mock(MlMemoryTracker.class);

this.<MlAutoscalingStats>assertAsync(
listener -> MlAutoscalingResourceTracker.getMemoryAndProcessors(
mlAutoscalingContext,
mockTracker,
Map.of("ml-node-1", memory),
600000000,
2,
MachineLearning.DEFAULT_MAX_OPEN_JOBS_PER_NODE,
MlDummyAutoscalingEntity.of(0, 0),
1,
listener
),
stats -> {
assertEquals(memory, stats.currentPerNodeMemoryBytes());
assertEquals(0, stats.currentTotalModelMemoryBytes());
assertEquals(0, stats.currentTotalProcessorsInUse());
assertEquals(1, stats.currentTotalNodes());
assertEquals(0, stats.wantedMinNodes());
assertEquals(0, stats.wantedExtraPerNodeNodeProcessors());
assertEquals(0, stats.wantedExtraProcessors());
assertEquals(0, stats.wantedExtraModelMemoryBytes());
assertEquals(0, stats.wantedExtraPerNodeMemoryBytes());
assertEquals(memory, stats.unwantedNodeMemoryBytesToRemove());
assertEquals(MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(), stats.currentPerNodeMemoryOverheadBytes());
}
);
}

public void testGetMemoryAndProcessorsIgnoreThreadsOfModelWithZeroAllocations() throws InterruptedException {
long memory = 1000000000;
Map<String, String> nodeAttr = Map.of(
MachineLearning.MACHINE_MEMORY_NODE_ATTR,
Long.toString(memory),
MachineLearning.MAX_JVM_SIZE_NODE_ATTR,
"400000000",
MachineLearning.ML_CONFIG_VERSION_NODE_ATTR,
"7.2.0",
MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR,
"2.0"
);

MlAutoscalingContext mlAutoscalingContext = new MlAutoscalingContext(
List.of(),
List.of(),
List.of(),
Map.of(
"model-with-one-allocation",
TrainedModelAssignment.Builder.empty(
new StartTrainedModelDeploymentAction.TaskParams(
"model-with-one-allocation",
"model-with-one-allocation-deployment",
400,
1,
2,
100,
null,
Priority.NORMAL,
0L,
0L
),
null
).addRoutingEntry("ml-node-1", new RoutingInfo(1, 1, RoutingState.STARTED, "")).build(),
"model-with-zero-allocations",
TrainedModelAssignment.Builder.empty(
new StartTrainedModelDeploymentAction.TaskParams(
"model-with-zero-allocations",
"model-with-zero-allocations-deployment",
400,
0,
4,
100,
null,
Priority.NORMAL,
0L,
0L
),
new AdaptiveAllocationsSettings(true, 0, 4)
).build()
),
List.of(
DiscoveryNodeUtils.builder("ml-node-1")
.name("ml-node-name-1")
.address(new TransportAddress(InetAddress.getLoopbackAddress(), 9300))
.attributes(nodeAttr)
.roles(Set.of(DiscoveryNodeRole.ML_ROLE))
.build()
),
PersistentTasksCustomMetadata.builder().build()
);
MlMemoryTracker mockTracker = mock(MlMemoryTracker.class);

this.<MlAutoscalingStats>assertAsync(
listener -> MlAutoscalingResourceTracker.getMemoryAndProcessors(
mlAutoscalingContext,
mockTracker,
Map.of("ml-node-1", memory),
600000000,
2,
MachineLearning.DEFAULT_MAX_OPEN_JOBS_PER_NODE,
MlDummyAutoscalingEntity.of(0, 0),
1,
listener
),
stats -> {
assertEquals(memory, stats.currentPerNodeMemoryBytes());
assertEquals(251659040, stats.currentTotalModelMemoryBytes());
assertEquals(2, stats.currentTotalProcessorsInUse());
assertEquals(1, stats.currentTotalNodes());
assertEquals(1, stats.wantedMinNodes());
assertEquals(0, stats.wantedExtraPerNodeNodeProcessors());
assertEquals(0, stats.wantedExtraProcessors());
assertEquals(0, stats.wantedExtraModelMemoryBytes());
assertEquals(0, stats.wantedExtraPerNodeMemoryBytes());
assertEquals(0, stats.unwantedNodeMemoryBytesToRemove());
assertEquals(MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(), stats.currentPerNodeMemoryOverheadBytes());
}
);
}

private <T> void assertAsync(Consumer<ActionListener<T>> function, Consumer<T> furtherTests) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean listenerCalled = new AtomicBoolean(false);
Expand Down
Loading