[ML] Add setting to scale the processor count used in the model assignment planner (elastic#98296)

Adds the xpack.ml.allocated_processors_scale setting, which is used to scale
the value of the ml.allocated_processors_double node attribute. This setting
influences the number of model allocations that can fit on a node.
# Conflicts:
#	x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
#	x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingResourceTracker.java
#	x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlProcessors.java
davidkyle committed Aug 8, 2023
1 parent 2649050 commit a737f54
Showing 11 changed files with 187 additions and 53 deletions.
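To illustrate the effect described in the commit message, here is a minimal arithmetic sketch. All numbers are assumptions for illustration, not values from the commit: dividing the advertised processor count by the scale shrinks the core budget the assignment planner hands out.

public class ScaleEffectExample {
    public static void main(String[] args) {
        double allocatedProcessorsDouble = 16.0; // assumed ml.allocated_processors_double value
        int allocatedProcessorsScale = 2;        // assumed xpack.ml.allocated_processors_scale value
        int threadsPerAllocation = 2;            // assumed per-deployment setting

        double plannerCores = allocatedProcessorsDouble / allocatedProcessorsScale; // 8.0
        int allocationsThatFit = (int) (plannerCores / threadsPerAllocation);       // 4, down from 8
        System.out.println(allocationsThatFit);
    }
}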
5 changes: 5 additions & 0 deletions docs/changelog/98296.yaml
@@ -0,0 +1,5 @@
pr: 98296
summary: Add setting to scale the processor count used in the model assignment planner
area: Machine Learning
type: enhancement
issues: []
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
@@ -575,6 +575,27 @@ public void loadExtensions(ExtensionLoader loader) {
public static final String PRE_V_8_5_ALLOCATED_PROCESSORS_NODE_ATTR = "ml.allocated_processors";

public static final String ALLOCATED_PROCESSORS_NODE_ATTR = "ml.allocated_processors_double";

/**
 * For the NLP model assignment planner.
 * The {@link #ALLOCATED_PROCESSORS_NODE_ATTR} attribute may be
 * measured in hyper-threaded or virtual cores rather than in the
 * physical cores that the planner should consider.
 *
 * ALLOCATED_PROCESSORS_NODE_ATTR is divided by this setting:
 * the default value of 1 leaves the attribute unchanged, while a
 * value of 2 accounts for hyper-threaded cores with 2 threads per
 * core. Increasing this setting above 1 reduces the number of
 * model allocations that can be deployed on a node.
 */
public static final Setting<Integer> ALLOCATED_PROCESSORS_SCALE = Setting.intSetting(
"xpack.ml.allocated_processors_scale",
1,
1,
Property.OperatorDynamic,
Property.NodeScope
);

public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS = Setting.intSetting(
"xpack.ml.node_concurrent_job_allocations",
2,
@@ -752,6 +773,7 @@ public static boolean isMlNode(DiscoveryNode node) {
@Override
public List<Setting<?>> getSettings() {
return List.of(
ALLOCATED_PROCESSORS_SCALE,
MachineLearningField.AUTODETECT_PROCESS,
PROCESS_CONNECT_TIMEOUT,
CONCURRENT_JOB_ALLOCATIONS,
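A minimal sketch (not part of the commit) of reading the new setting through the Setting API; the Settings instance is hand-built for illustration, whereas on a real node the value comes from operator-provided node settings and, being OperatorDynamic, can be updated at runtime.

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.ml.MachineLearning;

public class ReadScaleSettingExample {
    public static void main(String[] args) {
        // Hand-built settings standing in for real node settings.
        Settings settings = Settings.builder().put("xpack.ml.allocated_processors_scale", 2).build();

        // Falls back to the default of 1 when the key is absent.
        int scale = MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(settings);
        System.out.println(scale); // 2
    }
}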
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java
@@ -94,7 +94,10 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
final MlAutoscalingContext mlContext = new MlAutoscalingContext(clusterState);
final NativeMemoryCapacity currentNativeMemoryCapacity = memoryDecider.currentScale(mlContext.mlNodes);
final MlMemoryAutoscalingCapacity currentMemoryCapacity = memoryDecider.capacityFromNativeMemory(currentNativeMemoryCapacity);
final MlProcessorAutoscalingCapacity currentProcessorCapacity = processorDecider.computeCurrentCapacity(mlContext.mlNodes);
final MlProcessorAutoscalingCapacity currentProcessorCapacity = processorDecider.computeCurrentCapacity(
mlContext.mlNodes,
configuration
);

final MlScalingReason.Builder reasonBuilder = MlScalingReason.builder(mlContext)
.setCurrentMlCapacity(
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDecider.java
@@ -259,7 +259,7 @@ public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeci
}
// We should keep this check here as well as in the processor decider while cloud is not
// reacting to processor autoscaling.
if (modelAssignmentsRequireMoreThanHalfCpu(mlContext.modelAssignments.values(), mlContext.mlNodes)) {
if (modelAssignmentsRequireMoreThanHalfCpu(mlContext.modelAssignments.values(), mlContext.mlNodes, configuration)) {
logger.debug("not down-scaling; model assignments require more than half of the ML tier's allocated processors");
return null;
}
@@ -825,11 +825,15 @@ static MlMemoryAutoscalingCapacity ensureScaleDown(
return newCapacity;
}

static boolean modelAssignmentsRequireMoreThanHalfCpu(Collection<TrainedModelAssignment> assignments, List<DiscoveryNode> mlNodes) {
static boolean modelAssignmentsRequireMoreThanHalfCpu(
Collection<TrainedModelAssignment> assignments,
List<DiscoveryNode> mlNodes,
Settings settings
) {
int totalRequiredProcessors = assignments.stream()
.mapToInt(t -> t.getTaskParams().getNumberOfAllocations() * t.getTaskParams().getThreadsPerAllocation())
.sum();
int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node).roundUp()).sum();
int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node, settings).roundUp()).sum();
return totalRequiredProcessors * 2 > totalMlProcessors;
}

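To make the guard above concrete, a standalone sketch of the same arithmetic with assumed deployment and node sizes (this is not code from the commit):

public class HalfCpuGuardExample {
    public static void main(String[] args) {
        // Assumed assignments: 4 allocations x 2 threads plus 2 allocations x 1 thread.
        int totalRequiredProcessors = 4 * 2 + 2 * 1; // 10
        // Assumed ML tier: two nodes contributing 8 scaled cores each.
        int totalMlProcessors = 8 + 8; // 16
        // Same comparison as modelAssignmentsRequireMoreThanHalfCpu.
        boolean moreThanHalf = totalRequiredProcessors * 2 > totalMlProcessors; // 20 > 16, so true
        System.out.println(moreThanHalf); // true means down-scaling is blocked
    }
}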
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlProcessorAutoscalingDecider.java
@@ -52,7 +52,7 @@ public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingD
).build();
}

final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes);
final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes, configuration);

final MlProcessorAutoscalingCapacity requiredCapacity = computeRequiredCapacity(trainedModelAssignmentMetadata).build();

@@ -64,7 +64,8 @@ public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingD

if (MlMemoryAutoscalingDecider.modelAssignmentsRequireMoreThanHalfCpu(
trainedModelAssignmentMetadata.allAssignments().values(),
mlContext.mlNodes
mlContext.mlNodes,
configuration
)) {
return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors())
.setReason("not scaling down as model assignments require more than half of the ML tier's allocated processors")
@@ -136,11 +137,11 @@ private MlProcessorAutoscalingCapacity.Builder computeRequiredCapacity(TrainedMo
);
}

MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNodes) {
MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNodes, Settings settings) {
Processors maxNodeProcessors = Processors.ZERO;
Processors tierProcessors = Processors.ZERO;
for (DiscoveryNode node : mlNodes) {
Processors nodeProcessors = MlProcessors.get(node);
Processors nodeProcessors = MlProcessors.get(node, settings);
if (nodeProcessors.compareTo(maxNodeProcessors) > 0) {
maxNodeProcessors = nodeProcessors;
}
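The loop in computeCurrentCapacity tracks the largest single node while, per the truncated tail of the hunk, also accumulating a tier-wide total. A rough sketch with assumed node sizes, using plain doubles to stand in for the Processors type:

import java.util.List;

public class CurrentCapacityExample {
    public static void main(String[] args) {
        // Assumed scaled processor counts for three ML nodes.
        List<Double> nodeProcessors = List.of(8.0, 4.0, 4.0);
        double maxNodeProcessors = 0.0; // stands in for Processors.ZERO
        double tierProcessors = 0.0;
        for (double p : nodeProcessors) {
            if (p > maxNodeProcessors) {
                maxNodeProcessors = p; // largest individual node
            }
            tierProcessors += p; // assumed tier total, summed across nodes
        }
        System.out.println(maxNodeProcessors + " / " + tierProcessors); // 8.0 / 16.0
    }
}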
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java
@@ -489,7 +489,7 @@ private TrainedModelAssignmentMetadata.Builder rebalanceAssignments(
nodeAvailabilityZoneMapper.buildMlNodesByAvailabilityZone(currentState),
modelToAdd
);
TrainedModelAssignmentMetadata.Builder rebalanced = rebalancer.rebalance();
TrainedModelAssignmentMetadata.Builder rebalanced = rebalancer.rebalance(clusterService.getSettings());
if (modelToAdd.isPresent()) {
checkModelIsFullyAllocatedIfScalingIsNotPossible(modelToAdd.get().getDeploymentId(), rebalanced, nodes);
}
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentRebalancer.java
@@ -13,6 +13,7 @@
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction;
import org.elasticsearch.xpack.core.ml.inference.assignment.Priority;
@@ -63,7 +64,7 @@ class TrainedModelAssignmentRebalancer {
this.deploymentToAdd = Objects.requireNonNull(deploymentToAdd);
}

TrainedModelAssignmentMetadata.Builder rebalance() throws Exception {
TrainedModelAssignmentMetadata.Builder rebalance(Settings settings) {
if (deploymentToAdd.isPresent() && currentMetadata.hasDeployment(deploymentToAdd.get().getDeploymentId())) {
throw new ResourceAlreadyExistsException(
"[{}] assignment for deployment with model [{}] already exists",
@@ -77,8 +78,8 @@ TrainedModelAssignmentMetadata.Builder rebalance() throws Exception {
return TrainedModelAssignmentMetadata.Builder.fromMetadata(currentMetadata);
}

AssignmentPlan assignmentPlan = computeAssignmentPlan();
return buildAssignmentsFromPlan(assignmentPlan);
AssignmentPlan assignmentPlan = computeAssignmentPlan(settings);
return buildAssignmentsFromPlan(assignmentPlan, settings);
}

private boolean areAllModelsSatisfiedAndNoOutdatedRoutingEntries() {
@@ -91,8 +92,8 @@ private boolean areAllModelsSatisfiedAndNoOutdatedRoutingEntries() {
return true;
}

AssignmentPlan computeAssignmentPlan() {
final Map<List<String>, List<AssignmentPlan.Node>> nodesByZone = createNodesByZoneMap();
AssignmentPlan computeAssignmentPlan(Settings settings) {
final Map<List<String>, List<AssignmentPlan.Node>> nodesByZone = createNodesByZoneMap(settings);
final Set<String> assignableNodeIds = nodesByZone.values()
.stream()
.flatMap(List::stream)
@@ -270,7 +271,7 @@ private Map<String, Integer> findFittingAssignments(
return fittingAssignments;
}

private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap() {
private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap(Settings settings) {
return mlNodesByZone.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> {
Collection<DiscoveryNode> discoveryNodes = e.getValue();
List<AssignmentPlan.Node> nodes = new ArrayList<>();
@@ -284,7 +285,7 @@ private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap() {
// We subtract native inference memory as the planner expects available memory for
// native inference including current assignments.
getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(load),
MlProcessors.get(discoveryNode).roundUp()
MlProcessors.get(discoveryNode, settings).roundUp()
)
);
} else {
@@ -304,7 +305,7 @@ private static long getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(
return load.getFreeMemoryExcludingPerNodeOverhead() - load.getAssignedNativeInferenceMemory();
}

private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(AssignmentPlan assignmentPlan) {
private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(AssignmentPlan assignmentPlan, Settings settings) {
TrainedModelAssignmentMetadata.Builder builder = TrainedModelAssignmentMetadata.Builder.empty();
for (AssignmentPlan.Deployment deployment : assignmentPlan.models()) {
TrainedModelAssignment existingAssignment = currentMetadata.getDeploymentAssignment(deployment.id());
@@ -342,7 +343,7 @@ private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(Assignme
}
assignmentBuilder.calculateAndSetAssignmentState();

explainAssignments(assignmentPlan, nodeLoads, deployment).ifPresent(assignmentBuilder::setReason);
explainAssignments(assignmentPlan, nodeLoads, deployment, settings).ifPresent(assignmentBuilder::setReason);
builder.addNewAssignment(deployment.id(), assignmentBuilder);
}
return builder;
@@ -351,7 +352,8 @@ private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(Assignme
private Optional<String> explainAssignments(
AssignmentPlan assignmentPlan,
Map<DiscoveryNode, NodeLoad> nodeLoads,
AssignmentPlan.Deployment deployment
AssignmentPlan.Deployment deployment,
Settings settings
) {
if (assignmentPlan.satisfiesAllocations(deployment)) {
return Optional.empty();
@@ -363,7 +365,7 @@ private Optional<String> explainAssignments(

Map<String, String> nodeToReason = new TreeMap<>();
for (Map.Entry<DiscoveryNode, NodeLoad> nodeAndLoad : nodeLoads.entrySet()) {
Optional<String> reason = explainAssignment(assignmentPlan, nodeAndLoad.getKey(), nodeAndLoad.getValue(), deployment);
Optional<String> reason = explainAssignment(assignmentPlan, nodeAndLoad.getKey(), nodeAndLoad.getValue(), deployment, settings);
reason.ifPresent(s -> nodeToReason.put(nodeAndLoad.getKey().getId(), s));
}

@@ -382,7 +384,8 @@ private Optional<String> explainAssignment(
AssignmentPlan assignmentPlan,
DiscoveryNode node,
NodeLoad load,
AssignmentPlan.Deployment deployment
AssignmentPlan.Deployment deployment,
Settings settings
) {
if (Strings.isNullOrEmpty(load.getError()) == false) {
return Optional.of(load.getError());
Expand All @@ -395,7 +398,7 @@ private Optional<String> explainAssignment(
// But we should also check if we managed to assign a model during the rebalance for which
// we check if the node has used up any of its allocated processors.
boolean isPerNodeOverheadAccountedFor = load.getNumAssignedJobsAndModels() > 0
|| assignmentPlan.getRemainingNodeCores(load.getNodeId()) < MlProcessors.get(node).roundUp();
|| assignmentPlan.getRemainingNodeCores(load.getNodeId()) < MlProcessors.get(node, settings).roundUp();
long requiredMemory = deployment.memoryBytes() + (isPerNodeOverheadAccountedFor
? 0
: MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes());
@@ -424,7 +427,7 @@ private Optional<String> explainAssignment(
"This node has insufficient allocated processors. Available processors [{}], free processors [{}], "
+ "processors required for each allocation of this model [{}]",
new Object[] {
MlProcessors.get(node).roundUp(),
MlProcessors.get(node, settings).roundUp(),
assignmentPlan.getRemainingNodeCores(node.getId()),
deployment.threadsPerAllocation() }
)
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/MlProcessors.java
@@ -9,14 +9,15 @@

import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.Processors;
import org.elasticsearch.xpack.ml.MachineLearning;

public final class MlProcessors {

private MlProcessors() {}

public static Processors get(DiscoveryNode node) {
public static Processors get(DiscoveryNode node, Settings settings) {
String allocatedProcessorsString = node.getVersion().onOrAfter(Version.V_8_5_0)
? node.getAttributes().get(MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR)
: node.getAttributes().get(MachineLearning.PRE_V_8_5_ALLOCATED_PROCESSORS_NODE_ATTR);
@@ -25,7 +26,19 @@ public static Processors get(DiscoveryNode node) {
}
try {
double processorsAsDouble = Double.parseDouble(allocatedProcessorsString);
return processorsAsDouble > 0 ? Processors.of(processorsAsDouble) : Processors.ZERO;
if (processorsAsDouble <= 0) {
return Processors.ZERO;
}

Integer scale = null;
if (settings != null) {
scale = MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(settings);
}
if (scale != null) {
processorsAsDouble = processorsAsDouble / scale;
}
return Processors.of(processorsAsDouble);

} catch (NumberFormatException e) {
assert e == null
: MachineLearning.ALLOCATED_PROCESSORS_NODE_ATTR
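Because callers convert the scaled value with roundUp(), a fractional result still occupies whole planner cores. A small sketch of that interaction with assumed inputs, using Math.ceil in place of Processors.roundUp():

public class ScaledProcessorsRoundingExample {
    public static void main(String[] args) {
        double allocatedProcessorsDouble = 8.0; // assumed ml.allocated_processors_double value
        int scale = 3;                          // assumed xpack.ml.allocated_processors_scale value

        double scaled = allocatedProcessorsDouble / scale; // 2.666...
        int plannerCores = (int) Math.ceil(scaled);        // 3
        System.out.println(plannerCores);
    }
}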
x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/autoscaling/MlMemoryAutoscalingDeciderTests.java
@@ -1079,7 +1079,8 @@ public void testCpuModelAssignmentRequirements() {
)
).build()
),
withMlNodes("ml_node_1", "ml_node_2")
withMlNodes("ml_node_1", "ml_node_2"),
Settings.EMPTY
)
);
assertTrue(
@@ -1110,7 +1111,8 @@
)
).build()
),
withMlNodes("ml_node_1", "ml_node_2")
withMlNodes("ml_node_1", "ml_node_2"),
Settings.EMPTY
)
);
assertFalse(
@@ -1141,7 +1143,8 @@
)
).build()
),
withMlNodes("ml_node_1", "ml_node_2", "ml_node_3", "ml_node_4")
withMlNodes("ml_node_1", "ml_node_2", "ml_node_3", "ml_node_4"),
Settings.EMPTY
)
);
}