
[ML] disallow autoscaling downscaling in two trained model assignment scenarios #88623

Merged
@@ -32,6 +32,7 @@
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction.DatafeedParams;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.inference.assignment.AllocationStatus;
import org.elasticsearch.xpack.core.ml.inference.assignment.AssignmentState;
import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits;
@@ -409,6 +410,17 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
.filter(e -> e.getValue().getAssignmentState().equals(AssignmentState.STARTING) && e.getValue().getNodeRoutingTable().isEmpty())
.map(Map.Entry::getKey)
.toList();
final List<String> notFullyAllocatedModels = modelAssignments.entrySet()
Contributor comment:
> Point 2 is a placeholder. Fix 1 will be a requirement even in the future with vCPU autoscaling.

It's true we'll need something that achieves the same as fix 1, but since our current autoscaling decider is already incredibly complex, it would be nice to turn it into an ML memory autoscaling decider and have a separate ML CPU autoscaling decider. If we do that, this logic will live in the new CPU autoscaling decider.

So please add a TODO noting that this is a CPU-based condition and that it should move to the CPU autoscaling decider when it's written.

.stream()
.filter(
e -> e.getValue()
.calculateAllocationStatus()
.map(AllocationStatus::calculateState)
.orElse(AllocationStatus.State.FULLY_ALLOCATED)
.equals(AllocationStatus.State.FULLY_ALLOCATED) == false
)
.map(Map.Entry::getKey)
.toList();
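
The review comment above asks for a TODO on this CPU-based condition. As a rough sketch only (the TODO wording is hypothetical and not part of this PR; the statement itself is copied from the diff above), it could read:

// TODO: this is a CPU-based condition; it should move to a dedicated ML CPU autoscaling
// decider when that decider is written, leaving this class to memory-based decisions.
final List<String> notFullyAllocatedModels = modelAssignments.entrySet()
    .stream()
    .filter(
        e -> e.getValue()
            .calculateAllocationStatus()
            .map(AllocationStatus::calculateState)
            .orElse(AllocationStatus.State.FULLY_ALLOCATED)
            .equals(AllocationStatus.State.FULLY_ALLOCATED) == false
    )
    .map(Map.Entry::getKey)
    .toList();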

final int numAnalyticsJobsInQueue = NUM_ANALYTICS_JOBS_IN_QUEUE.get(configuration);
final int numAnomalyJobsInQueue = NUM_ANOMALY_JOBS_IN_QUEUE.get(configuration);
@@ -543,7 +555,8 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider

if (waitingAnalyticsJobs.isEmpty() == false
|| waitingSnapshotUpgrades.isEmpty() == false
|| waitingAnomalyJobs.isEmpty() == false) {
|| waitingAnomalyJobs.isEmpty() == false
|| notFullyAllocatedModels.isEmpty() == false) {
// We don't want to continue to consider a scale down if there are now waiting jobs
resetScaleDownCoolDown();
return new AutoscalingDeciderResult(
@@ -553,11 +566,13 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
Locale.ROOT,
"Passing currently perceived capacity as there are [%d] model snapshot upgrades, "
+ "[%d] analytics and [%d] anomaly detection jobs in the queue, "
+ " [%d] trained models not fully-allocated, "
Contributor comment — suggested change (drop the stray leading space in the format string):

+ " [%d] trained models not fully-allocated, "    // current
+ "[%d] trained models not fully-allocated, "     // suggested

+ "but the number in the queue is less than the configured maximum allowed "
+ "or the queued jobs will eventually be assignable at the current size.",
waitingSnapshotUpgrades.size(),
waitingAnalyticsJobs.size(),
waitingAnomalyJobs.size()
waitingAnomalyJobs.size(),
notFullyAllocatedModels.size()
)
).build()
);
@@ -654,6 +669,9 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
if (capacity == null) {
return null;
}
if (modelAssignmentsRequireMoreThanHalfCpu(modelAssignments.values(), mlNodes)) {
return null;
droberts195 (Contributor) commented on Jul 20, 2022:

Please add a debug message here so that if this ever blocks a cluster from scaling down and we need to confirm this is what's really happening, we can ask to switch on debug logging for this class.

Also, please add another TODO here saying this condition should move to the CPU autoscaling decider when it's written.

}
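
A rough sketch of the requested debug message and TODO (hypothetical wording, not part of this PR; it assumes the class already has a `logger` field, which this diff does not show):

// TODO: this is a CPU-based condition; it should move to the ML CPU autoscaling decider when that decider is written.
if (modelAssignmentsRequireMoreThanHalfCpu(modelAssignments.values(), mlNodes)) {
    // Record why the scale down was blocked so support can confirm it via debug logging.
    logger.debug("not scaling down because trained model assignments require more than half of the ML tier's allocated processors");
    return null;
}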
return new AutoscalingDeciderResult(capacity, result.reason());
});
if (maybeScaleDown.isPresent()) {
@@ -744,6 +762,26 @@ static AutoscalingCapacity ensureScaleDown(AutoscalingCapacity scaleDownResult,
return newCapacity;
}

static boolean modelAssignmentsRequireMoreThanHalfCpu(Collection<TrainedModelAssignment> assignments, List<DiscoveryNode> mlNodes) {
int totalRequiredProcessors = assignments.stream()
.mapToInt(t -> t.getTaskParams().getNumberOfAllocations() * t.getTaskParams().getThreadsPerAllocation())
.sum();
int totalMlProcessors = mlNodes.stream().mapToInt(node -> {
String allocatedProcessorsString = node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR);
try {
return Integer.parseInt(allocatedProcessorsString);
} catch (NumberFormatException e) {
assert e == null
: MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR
+ " should parse because we set it internally: invalid value was ["
+ allocatedProcessorsString
+ "]";
return 0;
}
}).sum();
return totalRequiredProcessors * 2 > totalMlProcessors;
}

// This doesn't allow any jobs to wait in the queue, this is because in a "normal" scaling event, we also verify if a job
// can eventually start, and given the current cluster, no job can eventually start.
AutoscalingDeciderResult scaleUpFromZero(
Second file in the diff (the decider's test class):

@@ -31,8 +31,10 @@
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.inference.assignment.TrainedModelAssignment;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
@@ -123,6 +125,7 @@ private static long mlOnlyNodeJvmBytes(long systemMemoryBytes) {
private static final long TEST_NODE_SIZE = ByteSizeValue.ofGb(20).getBytes();
private static final long ML_MEMORY_FOR_TEST_NODE_SIZE = NativeMemoryCalculator.allowedBytesForMl(TEST_NODE_SIZE, 0, true);
private static final long TEST_JVM_SIZE = mlOnlyNodeJvmBytes(TEST_NODE_SIZE);
private static final int TEST_ALLOCATED_PROCESSORS = 2;
private static final long TEST_JOB_SIZE = ByteSizeValue.ofMb(200).getBytes();
private static final long PER_NODE_OVERHEAD = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();

@@ -1182,6 +1185,48 @@ public void testScaleDown() {
}
}

public void testCpuModelAssignmentRequirements() {
assertTrue(
MlAutoscalingDeciderService.modelAssignmentsRequireMoreThanHalfCpu(
List.of(
TrainedModelAssignment.Builder.empty(
new StartTrainedModelDeploymentAction.TaskParams("model1", TEST_JOB_SIZE, 3, 2, 100, null)
).build(),
TrainedModelAssignment.Builder.empty(
new StartTrainedModelDeploymentAction.TaskParams("model1", TEST_JOB_SIZE, 1, 1, 100, null)
).build()
),
withMlNodes("ml_node_1", "ml_node_2")
)
);
assertTrue(
MlAutoscalingDeciderService.modelAssignmentsRequireMoreThanHalfCpu(
List.of(
TrainedModelAssignment.Builder.empty(
new StartTrainedModelDeploymentAction.TaskParams("model1", TEST_JOB_SIZE, 3, 1, 100, null)
).build(),
TrainedModelAssignment.Builder.empty(
new StartTrainedModelDeploymentAction.TaskParams("model1", TEST_JOB_SIZE, 1, 1, 100, null)
).build()
),
withMlNodes("ml_node_1", "ml_node_2")
)
);
assertFalse(
MlAutoscalingDeciderService.modelAssignmentsRequireMoreThanHalfCpu(
List.of(
TrainedModelAssignment.Builder.empty(
new StartTrainedModelDeploymentAction.TaskParams("model1", TEST_JOB_SIZE, 3, 1, 100, null)
).build(),
TrainedModelAssignment.Builder.empty(
new StartTrainedModelDeploymentAction.TaskParams("model1", TEST_JOB_SIZE, 1, 1, 100, null)
).build()
),
withMlNodes("ml_node_1", "ml_node_2", "ml_node_3", "ml_node_4")
)
);
}

public void testEnsureScaleDown() {
assertThat(
MlAutoscalingDeciderService.ensureScaleDown(
@@ -1394,7 +1439,9 @@ private static List<DiscoveryNode> withMlNodes(String... nodeName) {
MachineLearning.MACHINE_MEMORY_NODE_ATTR,
String.valueOf(TEST_NODE_SIZE),
MachineLearning.MAX_JVM_SIZE_NODE_ATTR,
String.valueOf(TEST_JVM_SIZE)
String.valueOf(TEST_JVM_SIZE),
MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR,
String.valueOf(TEST_ALLOCATED_PROCESSORS)
),
Set.of(DiscoveryNodeRole.ML_ROLE),
Version.CURRENT