[ML] Make the "scale the processor count" setting updatable #98305

Merged · 1 commit · Aug 9, 2023
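In short, the value of MachineLearning.ALLOCATED_PROCESSORS_SCALE is no longer re-read from Settings at each call site. Instead, MlAutoscalingDeciderService and TrainedModelAssignmentClusterService each hold it in a volatile allocatedProcessorsScale field kept current via ClusterSettings#addSettingsUpdateConsumer, and the value is threaded through MlMemoryAutoscalingDecider, MlProcessorAutoscalingDecider and TrainedModelAssignmentRebalancer as an int parameter. The net effect is that the processor-scale setting can be changed on a live cluster and is picked up by ML autoscaling and model assignment without a restart.

A minimal sketch of such a runtime update follows; the setting key ("xpack.ml.allocated_processors_scale") and the value are assumptions, since neither appears in this diff.

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.settings.Settings;

// Hypothetical helper: bumps the (assumed) processor-scale key on a running cluster.
static void updateProcessorScale(Client client, int newScale) {
    ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
    request.persistentSettings(Settings.builder().put("xpack.ml.allocated_processors_scale", newScale).build());
    // Dynamic settings take effect as soon as the master applies them, no node restart needed.
    client.admin().cluster().updateSettings(request, ActionListener.noop());
}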
@@ -20,6 +20,7 @@
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.NodeLoadDetector;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;

@@ -43,6 +44,7 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L
private final MlProcessorAutoscalingDecider processorDecider;

private volatile boolean isMaster;
private volatile int allocatedProcessorsScale;

public MlAutoscalingDeciderService(
MlMemoryTracker memoryTracker,
@@ -69,7 +71,15 @@ public MlAutoscalingDeciderService(
scaleTimer
);
this.processorDecider = new MlProcessorAutoscalingDecider(scaleTimer);
this.allocatedProcessorsScale = MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(settings);

clusterService.addLocalNodeMasterListener(this);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.ALLOCATED_PROCESSORS_SCALE, this::setAllocatedProcessorsScale);
}

void setAllocatedProcessorsScale(int scale) {
this.allocatedProcessorsScale = scale;
}

@Override
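The consumer registered just above only works if ALLOCATED_PROCESSORS_SCALE is declared as a dynamic setting and registered with the cluster settings; that declaration lives in MachineLearning and is not part of this diff. A purely illustrative sketch of such a declaration, with the key, default and minimum assumed rather than copied from the source:

import org.elasticsearch.common.settings.Setting;

// Illustrative only -- key, default and bounds are assumptions.
public static final Setting<Integer> ALLOCATED_PROCESSORS_SCALE = Setting.intSetting(
    "xpack.ml.allocated_processors_scale", // assumed key
    1,                                     // assumed default
    1,                                     // assumed minimum
    Setting.Property.Dynamic,              // required before addSettingsUpdateConsumer accepts it
    Setting.Property.NodeScope
);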
@@ -96,7 +106,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
final MlMemoryAutoscalingCapacity currentMemoryCapacity = memoryDecider.capacityFromNativeMemory(currentNativeMemoryCapacity);
final MlProcessorAutoscalingCapacity currentProcessorCapacity = processorDecider.computeCurrentCapacity(
mlContext.mlNodes,
configuration
allocatedProcessorsScale
);

final MlScalingReason.Builder reasonBuilder = MlScalingReason.builder(mlContext)
@@ -123,7 +133,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
return downscaleToZero(configuration, context, currentNativeMemoryCapacity, reasonBuilder);
}

MlMemoryAutoscalingCapacity memoryCapacity = memoryDecider.scale(configuration, context, mlContext);
MlMemoryAutoscalingCapacity memoryCapacity = memoryDecider.scale(configuration, context, mlContext, allocatedProcessorsScale);
if (memoryCapacity.isUndetermined()) {
// If we cannot determine memory capacity we shouldn't make any autoscaling decision
// as it could lead to undesired capacity. For example, it could be that the processor decider decides
@@ -134,7 +144,12 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
reasonBuilder.setSimpleReason(format("[memory_decider] %s", memoryCapacity.reason())).build()
);
}
MlProcessorAutoscalingCapacity processorCapacity = processorDecider.scale(configuration, context, mlContext);
MlProcessorAutoscalingCapacity processorCapacity = processorDecider.scale(
configuration,
context,
mlContext,
allocatedProcessorsScale
);
reasonBuilder.setSimpleReason(
format("[memory_decider] %s; [processor_decider] %s", memoryCapacity.reason(), processorCapacity.reason())
);
@@ -119,7 +119,12 @@ void setMaxMlNodeSize(ByteSizeValue maxMlNodeSize) {
}
}

public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeciderContext context, MlAutoscalingContext mlContext) {
public MlMemoryAutoscalingCapacity scale(
Settings configuration,
AutoscalingDeciderContext context,
MlAutoscalingContext mlContext,
int allocatedProcessorsScale
) {
final ClusterState clusterState = context.state();

scaleTimer.lastScaleToScaleIntervalMillis()
@@ -259,7 +264,11 @@ public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeci
}
// We should keep this check here as well as in the processor decider while cloud is not
// reacting to processor autoscaling.
if (modelAssignmentsRequireMoreThanHalfCpu(mlContext.modelAssignments.values(), mlContext.mlNodes, configuration)) {
if (modelAssignmentsRequireMoreThanHalfCpu(
mlContext.modelAssignments.values(),
mlContext.mlNodes,
allocatedProcessorsScale
)) {
logger.debug("not down-scaling; model assignments require more than half of the ML tier's allocated processors");
return null;
}
@@ -828,12 +837,12 @@ static MlMemoryAutoscalingCapacity ensureScaleDown(
static boolean modelAssignmentsRequireMoreThanHalfCpu(
Collection<TrainedModelAssignment> assignments,
List<DiscoveryNode> mlNodes,
Settings settings
int allocatedProcessorsScale
) {
int totalRequiredProcessors = assignments.stream()
.mapToInt(t -> t.getTaskParams().getNumberOfAllocations() * t.getTaskParams().getThreadsPerAllocation())
.sum();
int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node, settings).roundUp()).sum();
int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node, allocatedProcessorsScale).roundUp()).sum();
return totalRequiredProcessors * 2 > totalMlProcessors;
}
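For intuition, a worked instance of the check above under assumed numbers. It also assumes the scale divides a node's advertised processor count (for example, a scale of 2 to discount hyperthreading); that behaviour lives in MlProcessors and is not shown in this diff.

// Two deployments: 2 allocations x 2 threads plus 1 allocation x 1 thread.
int totalRequiredProcessors = 2 * 2 + 1 * 1;                                // 5
// One ML node advertising 16 processors with allocatedProcessorsScale = 2 -> 8 usable (assumption).
int totalMlProcessors = 8;
boolean blocksScaleDown = totalRequiredProcessors * 2 > totalMlProcessors;  // 10 > 8 -> true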

@@ -41,7 +41,12 @@ class MlProcessorAutoscalingDecider {
this.scaleTimer = Objects.requireNonNull(scaleTimer);
}

public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingDeciderContext context, MlAutoscalingContext mlContext) {
public MlProcessorAutoscalingCapacity scale(
Settings configuration,
AutoscalingDeciderContext context,
MlAutoscalingContext mlContext,
int allocatedProcessorsScale
) {
TrainedModelAssignmentMetadata trainedModelAssignmentMetadata = TrainedModelAssignmentMetadata.fromState(context.state());

if (hasUnsatisfiedDeployments(trainedModelAssignmentMetadata, mlContext.mlNodes)) {
@@ -52,7 +57,7 @@ public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingD
).build();
}

final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes, configuration);
final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes, allocatedProcessorsScale);

final MlProcessorAutoscalingCapacity requiredCapacity = computeRequiredCapacity(trainedModelAssignmentMetadata).build();

@@ -65,7 +70,7 @@ public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingD
if (MlMemoryAutoscalingDecider.modelAssignmentsRequireMoreThanHalfCpu(
trainedModelAssignmentMetadata.allAssignments().values(),
mlContext.mlNodes,
configuration
allocatedProcessorsScale
)) {
return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors())
.setReason("not scaling down as model assignments require more than half of the ML tier's allocated processors")
@@ -137,11 +142,11 @@ private MlProcessorAutoscalingCapacity.Builder computeRequiredCapacity(TrainedMo
);
}

MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNodes, Settings settings) {
MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNodes, int allocatedProcessorsScale) {
Processors maxNodeProcessors = Processors.ZERO;
Processors tierProcessors = Processors.ZERO;
for (DiscoveryNode node : mlNodes) {
Processors nodeProcessors = MlProcessors.get(node, settings);
Processors nodeProcessors = MlProcessors.get(node, allocatedProcessorsScale);
if (nodeProcessors.compareTo(maxNodeProcessors) > 0) {
maxNodeProcessors = nodeProcessors;
}
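A toy illustration of what computeCurrentCapacity derives from two hypothetical nodes, again assuming MlProcessors.get divides each node's advertised processor count by the scale (8 and 4 advertised processors with a scale of 2 would give 4.0 and 2.0):

import org.elasticsearch.common.unit.Processors;

Processors nodeA = Processors.of(4.0);
Processors nodeB = Processors.of(2.0);
Processors maxNodeProcessors = nodeA.compareTo(nodeB) > 0 ? nodeA : nodeB; // 4.0 -> largest single node
Processors tierProcessors = nodeA.plus(nodeB);                             // 6.0 -> whole ML tier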
@@ -80,6 +80,7 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene
private volatile int maxOpenJobs;
protected volatile int maxLazyMLNodes;
protected volatile long maxMLNodeSize;
protected volatile int allocatedProcessorsScale;

public TrainedModelAssignmentClusterService(
Settings settings,
@@ -99,6 +100,7 @@ public TrainedModelAssignmentClusterService(
this.maxOpenJobs = MachineLearning.MAX_OPEN_JOBS_PER_NODE.get(settings);
this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
this.maxMLNodeSize = MachineLearning.MAX_ML_NODE_SIZE.get(settings).getBytes();
this.allocatedProcessorsScale = MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(settings);
// Only nodes that can possibly be master nodes really need this service running
if (DiscoveryNode.isMasterNode(settings)) {
clusterService.addListener(this);
@@ -109,6 +111,8 @@ public TrainedModelAssignmentClusterService(
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_OPEN_JOBS_PER_NODE, this::setMaxOpenJobs);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_ML_NODE_SIZE, this::setMaxMLNodeSize);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.ALLOCATED_PROCESSORS_SCALE, this::setAllocatedProcessorsScale);
}
}
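Both this class and MlAutoscalingDeciderService follow the same pattern: a volatile field refreshed through a cluster-settings update consumer. A hedged, test-style sketch of how such a consumer reacts to an update in isolation; the registration set and key name are assumptions, not code from this PR:

import java.util.Set;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.ml.MachineLearning;

// Illustrative fragment only.
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, Set.<Setting<?>>of(MachineLearning.ALLOCATED_PROCESSORS_SCALE));
clusterSettings.addSettingsUpdateConsumer(MachineLearning.ALLOCATED_PROCESSORS_SCALE, scale -> {
    // in production this is where setAllocatedProcessorsScale(scale) runs
});
// Applying new settings invokes the consumer with the updated value
// ("xpack.ml.allocated_processors_scale" is the assumed key).
clusterSettings.applySettings(Settings.builder().put("xpack.ml.allocated_processors_scale", 2).build());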

@@ -132,6 +136,10 @@ private void setMaxMLNodeSize(ByteSizeValue value) {
this.maxMLNodeSize = value.getBytes();
}

private void setAllocatedProcessorsScale(int scale) {
this.allocatedProcessorsScale = scale;
}

@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
clusterService.submitUnbatchedStateUpdateTask(source, task);
@@ -487,9 +495,10 @@ private TrainedModelAssignmentMetadata.Builder rebalanceAssignments(
TrainedModelAssignmentMetadata.fromState(currentState),
nodeLoads,
nodeAvailabilityZoneMapper.buildMlNodesByAvailabilityZone(currentState),
modelToAdd
modelToAdd,
allocatedProcessorsScale
);
TrainedModelAssignmentMetadata.Builder rebalanced = rebalancer.rebalance(clusterService.getSettings());
TrainedModelAssignmentMetadata.Builder rebalanced = rebalancer.rebalance();
if (modelToAdd.isPresent()) {
checkModelIsFullyAllocatedIfScalingIsNotPossible(modelToAdd.get().getDeploymentId(), rebalanced, nodes);
}
@@ -13,7 +13,6 @@
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction;
import org.elasticsearch.xpack.core.ml.inference.assignment.Priority;
@@ -51,20 +50,23 @@ class TrainedModelAssignmentRebalancer {
private final Map<DiscoveryNode, NodeLoad> nodeLoads;
private final Map<List<String>, Collection<DiscoveryNode>> mlNodesByZone;
private final Optional<StartTrainedModelDeploymentAction.TaskParams> deploymentToAdd;
private final int allocatedProcessorsScale;

TrainedModelAssignmentRebalancer(
TrainedModelAssignmentMetadata currentMetadata,
Map<DiscoveryNode, NodeLoad> nodeLoads,
Map<List<String>, Collection<DiscoveryNode>> mlNodesByZone,
Optional<StartTrainedModelDeploymentAction.TaskParams> deploymentToAdd
Optional<StartTrainedModelDeploymentAction.TaskParams> deploymentToAdd,
int allocatedProcessorsScale
) {
this.currentMetadata = Objects.requireNonNull(currentMetadata);
this.nodeLoads = Objects.requireNonNull(nodeLoads);
this.mlNodesByZone = Objects.requireNonNull(mlNodesByZone);
this.deploymentToAdd = Objects.requireNonNull(deploymentToAdd);
this.allocatedProcessorsScale = allocatedProcessorsScale;
}

TrainedModelAssignmentMetadata.Builder rebalance(Settings settings) {
TrainedModelAssignmentMetadata.Builder rebalance() {
if (deploymentToAdd.isPresent() && currentMetadata.hasDeployment(deploymentToAdd.get().getDeploymentId())) {
throw new ResourceAlreadyExistsException(
"[{}] assignment for deployment with model [{}] already exists",
@@ -78,8 +80,8 @@ TrainedModelAssignmentMetadata.Builder rebalance(Settings settings) {
return TrainedModelAssignmentMetadata.Builder.fromMetadata(currentMetadata);
}

AssignmentPlan assignmentPlan = computeAssignmentPlan(settings);
return buildAssignmentsFromPlan(assignmentPlan, settings);
AssignmentPlan assignmentPlan = computeAssignmentPlan();
return buildAssignmentsFromPlan(assignmentPlan);
}

private boolean areAllModelsSatisfiedAndNoOutdatedRoutingEntries() {
@@ -92,8 +94,8 @@ private boolean areAllModelsSatisfiedAndNoOutdatedRoutingEntries() {
return true;
}

AssignmentPlan computeAssignmentPlan(Settings settings) {
final Map<List<String>, List<AssignmentPlan.Node>> nodesByZone = createNodesByZoneMap(settings);
AssignmentPlan computeAssignmentPlan() {
final Map<List<String>, List<AssignmentPlan.Node>> nodesByZone = createNodesByZoneMap();
final Set<String> assignableNodeIds = nodesByZone.values()
.stream()
.flatMap(List::stream)
@@ -271,7 +273,7 @@ private Map<String, Integer> findFittingAssignments(
return fittingAssignments;
}

private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap(Settings settings) {
private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap() {
return mlNodesByZone.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> {
Collection<DiscoveryNode> discoveryNodes = e.getValue();
List<AssignmentPlan.Node> nodes = new ArrayList<>();
@@ -285,7 +287,7 @@ private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap(Settin
// We subtract native inference memory as the planner expects available memory for
// native inference including current assignments.
getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(load),
MlProcessors.get(discoveryNode, settings).roundUp()
MlProcessors.get(discoveryNode, allocatedProcessorsScale).roundUp()
)
);
} else {
@@ -305,7 +307,7 @@ private static long getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(
return load.getFreeMemoryExcludingPerNodeOverhead() - load.getAssignedNativeInferenceMemory();
}

private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(AssignmentPlan assignmentPlan, Settings settings) {
private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(AssignmentPlan assignmentPlan) {
TrainedModelAssignmentMetadata.Builder builder = TrainedModelAssignmentMetadata.Builder.empty();
for (AssignmentPlan.Deployment deployment : assignmentPlan.models()) {
TrainedModelAssignment existingAssignment = currentMetadata.getDeploymentAssignment(deployment.id());
@@ -343,7 +345,7 @@ private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(Assignme
}
assignmentBuilder.calculateAndSetAssignmentState();

explainAssignments(assignmentPlan, nodeLoads, deployment, settings).ifPresent(assignmentBuilder::setReason);
explainAssignments(assignmentPlan, nodeLoads, deployment).ifPresent(assignmentBuilder::setReason);
builder.addNewAssignment(deployment.id(), assignmentBuilder);
}
return builder;
@@ -352,8 +354,7 @@ private TrainedModelAssignmentMetadata.Builder buildAssignmentsFromPlan(Assignme
private Optional<String> explainAssignments(
AssignmentPlan assignmentPlan,
Map<DiscoveryNode, NodeLoad> nodeLoads,
AssignmentPlan.Deployment deployment,
Settings settings
AssignmentPlan.Deployment deployment
) {
if (assignmentPlan.satisfiesAllocations(deployment)) {
return Optional.empty();
@@ -365,7 +366,7 @@ private Optional<String> explainAssignment(

Map<String, String> nodeToReason = new TreeMap<>();
for (Map.Entry<DiscoveryNode, NodeLoad> nodeAndLoad : nodeLoads.entrySet()) {
Optional<String> reason = explainAssignment(assignmentPlan, nodeAndLoad.getKey(), nodeAndLoad.getValue(), deployment, settings);
Optional<String> reason = explainAssignment(assignmentPlan, nodeAndLoad.getKey(), nodeAndLoad.getValue(), deployment);
reason.ifPresent(s -> nodeToReason.put(nodeAndLoad.getKey().getId(), s));
}

@@ -384,8 +385,7 @@ private Optional<String> explainAssignment(
AssignmentPlan assignmentPlan,
DiscoveryNode node,
NodeLoad load,
AssignmentPlan.Deployment deployment,
Settings settings
AssignmentPlan.Deployment deployment
) {
if (Strings.isNullOrEmpty(load.getError()) == false) {
return Optional.of(load.getError());
@@ -398,7 +398,7 @@ private Optional<String> explainAssignment(
// But we should also check if we managed to assign a model during the rebalance for which
// we check if the node has used up any of its allocated processors.
boolean isPerNodeOverheadAccountedFor = load.getNumAssignedJobsAndModels() > 0
|| assignmentPlan.getRemainingNodeCores(load.getNodeId()) < MlProcessors.get(node, settings).roundUp();
|| assignmentPlan.getRemainingNodeCores(load.getNodeId()) < MlProcessors.get(node, allocatedProcessorsScale).roundUp();
long requiredMemory = deployment.memoryBytes() + (isPerNodeOverheadAccountedFor
? 0
: MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes());
@@ -427,7 +427,7 @@ "This node has insufficient allocated processors. Available processors [{}], free processors [{}], "
"This node has insufficient allocated processors. Available processors [{}], free processors [{}], "
+ "processors required for each allocation of this model [{}]",
new Object[] {
MlProcessors.get(node, settings).roundUp(),
MlProcessors.get(node, allocatedProcessorsScale).roundUp(),
assignmentPlan.getRemainingNodeCores(node.getId()),
deployment.threadsPerAllocation() }
)