Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport][ML] Add setting to scale the processor count used in the model assignment planner #98299

Merged
merged 2 commits into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/98296.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 98296
summary: Add setting to scale the processor count used in the model assignment planner
area: Machine Learning
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,27 @@ public void loadExtensions(ExtensionLoader loader) {
public static final String PRE_V_8_5_ALLOCATED_PROCESSORS_NODE_ATTR = "ml.allocated_processors";

public static final String ALLOCATED_PROCESSORS_NODE_ATTR = "ml.allocated_processors_double";

/**
* For the NLP model assignment planner.
* The {@link #ALLOCATED_PROCESSORS_NODE_ATTR} attribute may be
* measured in hyper-threaded or virtual cores when the user
* would like the planner to consider logical cores.
*
* ALLOCATED_PROCESSORS_NODE_ATTR is divided by this setting,
* the default value of 1 means the attribute is unchanged, a value
* of 2 accounts for hyper-threaded cores with 2 threads per core.
* Increasing this setting above 1 reduces the number of model
* allocations that can be deployed on a node.
*/
public static final Setting<Integer> ALLOCATED_PROCESSORS_SCALE = Setting.intSetting(
"xpack.ml.allocated_processors_scale",
1,
1,
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS = Setting.intSetting(
"xpack.ml.node_concurrent_job_allocations",
2,
Expand Down Expand Up @@ -752,6 +773,7 @@ public static boolean isMlNode(DiscoveryNode node) {
@Override
public List<Setting<?>> getSettings() {
return List.of(
ALLOCATED_PROCESSORS_SCALE,
MachineLearningField.AUTODETECT_PROCESS,
PROCESS_CONNECT_TIMEOUT,
CONCURRENT_JOB_ALLOCATIONS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
final MlAutoscalingContext mlContext = new MlAutoscalingContext(clusterState);
final NativeMemoryCapacity currentNativeMemoryCapacity = memoryDecider.currentScale(mlContext.mlNodes);
final MlMemoryAutoscalingCapacity currentMemoryCapacity = memoryDecider.capacityFromNativeMemory(currentNativeMemoryCapacity);
final MlProcessorAutoscalingCapacity currentProcessorCapacity = processorDecider.computeCurrentCapacity(mlContext.mlNodes);
final MlProcessorAutoscalingCapacity currentProcessorCapacity = processorDecider.computeCurrentCapacity(
mlContext.mlNodes,
configuration
);

final MlScalingReason.Builder reasonBuilder = MlScalingReason.builder(mlContext)
.setCurrentMlCapacity(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeci
}
// We should keep this check here as well as in the processor decider while cloud is not
// reacting to processor autoscaling.
if (modelAssignmentsRequireMoreThanHalfCpu(mlContext.modelAssignments.values(), mlContext.mlNodes)) {
if (modelAssignmentsRequireMoreThanHalfCpu(mlContext.modelAssignments.values(), mlContext.mlNodes, configuration)) {
logger.debug("not down-scaling; model assignments require more than half of the ML tier's allocated processors");
return null;
}
Expand Down Expand Up @@ -825,11 +825,15 @@ static MlMemoryAutoscalingCapacity ensureScaleDown(
return newCapacity;
}

static boolean modelAssignmentsRequireMoreThanHalfCpu(Collection<TrainedModelAssignment> assignments, List<DiscoveryNode> mlNodes) {
static boolean modelAssignmentsRequireMoreThanHalfCpu(
Collection<TrainedModelAssignment> assignments,
List<DiscoveryNode> mlNodes,
Settings settings
) {
int totalRequiredProcessors = assignments.stream()
.mapToInt(t -> t.getTaskParams().getNumberOfAllocations() * t.getTaskParams().getThreadsPerAllocation())
.sum();
int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node).roundUp()).sum();
int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node, settings).roundUp()).sum();
return totalRequiredProcessors * 2 > totalMlProcessors;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingD
).build();
}

final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes);
final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes, configuration);

final MlProcessorAutoscalingCapacity requiredCapacity = computeRequiredCapacity(trainedModelAssignmentMetadata).build();

Expand All @@ -64,7 +64,8 @@ public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingD

if (MlMemoryAutoscalingDecider.modelAssignmentsRequireMoreThanHalfCpu(
trainedModelAssignmentMetadata.allAssignments().values(),
mlContext.mlNodes
mlContext.mlNodes,
configuration
)) {
return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors())
.setReason("not scaling down as model assignments require more than half of the ML tier's allocated processors")
Expand Down Expand Up @@ -136,11 +137,11 @@ private MlProcessorAutoscalingCapacity.Builder computeRequiredCapacity(TrainedMo
);
}

MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNodes) {
MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNodes, Settings settings) {
Processors maxNodeProcessors = Processors.ZERO;
Processors tierProcessors = Processors.ZERO;
for (DiscoveryNode node : mlNodes) {
Processors nodeProcessors = MlProcessors.get(node);
Processors nodeProcessors = MlProcessors.get(node, settings);
if (nodeProcessors.compareTo(maxNodeProcessors) > 0) {
maxNodeProcessors = nodeProcessors;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ private TrainedModelAssignmentMetadata.Builder rebalanceAssignments(
nodeAvailabilityZoneMapper.buildMlNodesByAvailabilityZone(currentState),
modelToAdd
);
TrainedModelAssignmentMetadata.Builder rebalanced = rebalancer.rebalance();
TrainedModelAssignmentMetadata.Builder rebalanced = rebalancer.rebalance(clusterService.getSettings());
if (modelToAdd.isPresent()) {
checkModelIsFullyAllocatedIfScalingIsNotPossible(modelToAdd.get().getDeploymentId(), rebalanced, nodes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction;
import org.elasticsearch.xpack.core.ml.inference.assignment.Priority;
Expand Down Expand Up @@ -63,7 +64,7 @@ class TrainedModelAssignmentRebalancer {
this.deploymentToAdd = Objects.requireNonNull(deploymentToAdd);
}

TrainedModelAssignmentMetadata.Builder rebalance() throws Exception {
TrainedModelAssignmentMetadata.Builder rebalance(Settings settings) {
if (deploymentToAdd.isPresent() && currentMetadata.hasDeployment(deploymentToAdd.get().getDeploymentId())) {
throw new ResourceAlreadyExistsException(
"[{}] assignment for deployment with model [{}] already exists",
Expand All @@ -77,8 +78,8 @@ TrainedModelAssignmentMetadata.Builder rebalance() throws Exception {
return TrainedModelAssignmentMetadata.Builder.fromMetadata(currentMetadata);
}

AssignmentPlan assignmentPlan = computeAssignmentPlan();
return buildAssignmentsFromPlan(assignmentPlan);
AssignmentPlan assignmentPlan = computeAssignmentPlan(settings);
return buildAssignmentsFromPlan(assignmentPlan, settings);
}

private boolean areAllModelsSatisfiedAndNoOutdatedRoutingEntries() {
Expand All @@ -91,8 +92,8 @@ private boolean areAllModelsSatisfiedAndNoOutdatedRoutingEntries() {
return true;
}

AssignmentPlan computeAssignmentPlan() {
final Map<List<String>, List<AssignmentPlan.Node>> nodesByZone = createNodesByZoneMap();
AssignmentPlan computeAssignmentPlan(Settings settings) {
final Map<List<String>, List<AssignmentPlan.Node>> nodesByZone = createNodesByZoneMap(settings);
final Set<String> assignableNodeIds = nodesByZone.values()
.stream()
.flatMap(List::stream)
Expand Down Expand Up @@ -270,7 +271,7 @@ private Map<String, Integer> findFittingAssignments(
return fittingAssignments;
}

private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap() {
private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap(Settings settings) {
return mlNodesByZone.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> {
Collection<DiscoveryNode> discoveryNodes = e.getValue();
List<AssignmentPlan.Node> nodes = new ArrayList<>();
Expand All @@ -284,7 +285,7 @@ private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap() {
// We subtract native inference memory as the planner expects available memory for
// native inference including current assignments.
getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(load),
MlProcessors.get(discoveryNode).roundUp()
MlProcessors.get(discoveryNode, settings).roundUp()
)
);
} else {
Expand All @@ -304,7 +305,7 @@ private static long getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(
return load.getFreeMemoryExcludingPerNodeOverhead() - load.getAssignedNativeInferenceMemory();
}

private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(AssignmentPlan assignmentPlan) {
private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(AssignmentPlan assignmentPlan, Settings settings) {
TrainedModelAssignmentMetadata.Builder builder = TrainedModelAssignmentMetadata.Builder.empty();
for (AssignmentPlan.Deployment deployment : assignmentPlan.models()) {
TrainedModelAssignment existingAssignment = currentMetadata.getDeploymentAssignment(deployment.id());
Expand Down Expand Up @@ -342,7 +343,7 @@ private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(Assignme
}
assignmentBuilder.calculateAndSetAssignmentState();

explainAssignments(assignmentPlan, nodeLoads, deployment).ifPresent(assignmentBuilder::setReason);
explainAssignments(assignmentPlan, nodeLoads, deployment, settings).ifPresent(assignmentBuilder::setReason);
builder.addNewAssignment(deployment.id(), assignmentBuilder);
}
return builder;
Expand All @@ -351,7 +352,8 @@ private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(Assignme
private Optional<String> explainAssignments(
AssignmentPlan assignmentPlan,
Map<DiscoveryNode, NodeLoad> nodeLoads,
AssignmentPlan.Deployment deployment
AssignmentPlan.Deployment deployment,
Settings settings
) {
if (assignmentPlan.satisfiesAllocations(deployment)) {
return Optional.empty();
Expand All @@ -363,7 +365,7 @@ private Optional<String> explainAssignments(

Map<String, String> nodeToReason = new TreeMap<>();
for (Map.Entry<DiscoveryNode, NodeLoad> nodeAndLoad : nodeLoads.entrySet()) {
Optional<String> reason = explainAssignment(assignmentPlan, nodeAndLoad.getKey(), nodeAndLoad.getValue(), deployment);
Optional<String> reason = explainAssignment(assignmentPlan, nodeAndLoad.getKey(), nodeAndLoad.getValue(), deployment, settings);
reason.ifPresent(s -> nodeToReason.put(nodeAndLoad.getKey().getId(), s));
}

Expand All @@ -382,7 +384,8 @@ private Optional<String> explainAssignment(
AssignmentPlan assignmentPlan,
DiscoveryNode node,
NodeLoad load,
AssignmentPlan.Deployment deployment
AssignmentPlan.Deployment deployment,
Settings settings
) {
if (Strings.isNullOrEmpty(load.getError()) == false) {
return Optional.of(load.getError());
Expand All @@ -395,7 +398,7 @@ private Optional<String> explainAssignment(
// But we should also check if we managed to assign a model during the rebalance for which
// we check if the node has used up any of its allocated processors.
boolean isPerNodeOverheadAccountedFor = load.getNumAssignedJobsAndModels() > 0
|| assignmentPlan.getRemainingNodeCores(load.getNodeId()) < MlProcessors.get(node).roundUp();
|| assignmentPlan.getRemainingNodeCores(load.getNodeId()) < MlProcessors.get(node, settings).roundUp();
long requiredMemory = deployment.memoryBytes() + (isPerNodeOverheadAccountedFor
? 0
: MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes());
Expand Down Expand Up @@ -424,7 +427,7 @@ private Optional<String> explainAssignment(
"This node has insufficient allocated processors. Available processors [{}], free processors [{}], "
+ "processors required for each allocation of this model [{}]",
new Object[] {
MlProcessors.get(node).roundUp(),
MlProcessors.get(node, settings).roundUp(),
assignmentPlan.getRemainingNodeCores(node.getId()),
deployment.threadsPerAllocation() }
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@

import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.Processors;
import org.elasticsearch.xpack.ml.MachineLearning;

public final class MlProcessors {

private MlProcessors() {}

public static Processors get(DiscoveryNode node) {
public static Processors get(DiscoveryNode node, Settings settings) {
String allocatedProcessorsString = node.getVersion().onOrAfter(Version.V_8_5_0)
? node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR)
: node.getAttributes().get(MachineLearning.PRE_V_8_5_ALLOCATED_PROCESSORS_NODE_ATTR);
Expand All @@ -25,7 +26,19 @@ public static Processors get(DiscoveryNode node) {
}
try {
double processorsAsDouble = Double.parseDouble(allocatedProcessorsString);
return processorsAsDouble > 0 ? Processors.of(processorsAsDouble) : Processors.ZERO;
if (processorsAsDouble <= 0) {
return Processors.ZERO;
}

Integer scale = null;
if (settings != null) {
scale = MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(settings);
}
if (scale != null) {
processorsAsDouble = processorsAsDouble / scale;
}
return Processors.of(processorsAsDouble);

} catch (NumberFormatException e) {
assert e == null
: MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1079,7 +1079,8 @@ public void testCpuModelAssignmentRequirements() {
)
).build()
),
withMlNodes("ml_node_1", "ml_node_2")
withMlNodes("ml_node_1", "ml_node_2"),
Settings.EMPTY
)
);
assertTrue(
Expand Down Expand Up @@ -1110,7 +1111,8 @@ public void testCpuModelAssignmentRequirements() {
)
).build()
),
withMlNodes("ml_node_1", "ml_node_2")
withMlNodes("ml_node_1", "ml_node_2"),
Settings.EMPTY
)
);
assertFalse(
Expand Down Expand Up @@ -1141,7 +1143,8 @@ public void testCpuModelAssignmentRequirements() {
)
).build()
),
withMlNodes("ml_node_1", "ml_node_2", "ml_node_3", "ml_node_4")
withMlNodes("ml_node_1", "ml_node_2", "ml_node_3", "ml_node_4"),
Settings.EMPTY
)
);
}
Expand Down
Loading