[ML] Make the processor count scale setting updatable (#98298)
davidkyle authored Aug 10, 2023
1 parent 6269744 commit 92316f5
Showing 15 changed files with 206 additions and 117 deletions.
@@ -596,10 +596,10 @@ public void loadExtensions(ExtensionLoader loader) {
* allocations that can be deployed on a node.
*/
public static final Setting<Integer> ALLOCATED_PROCESSORS_SCALE = Setting.intSetting(
"ml.allocated_processors_scale",
"xpack.ml.allocated_processors_scale",
1,
1,
Property.OperatorDynamic,
Property.Dynamic,
Property.NodeScope
);

@@ -782,6 +782,7 @@ public static boolean isMlNode(DiscoveryNode node) {
@Override
public List<Setting<?>> getSettings() {
return List.of(
ALLOCATED_PROCESSORS_SCALE,
MachineLearningField.AUTODETECT_PROCESS,
PROCESS_CONNECT_TIMEOUT,
CONCURRENT_JOB_ALLOCATIONS,
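
Per the hunk above (GitHub shows each removed line directly above its replacement), the setting key changes from ml.allocated_processors_scale to xpack.ml.allocated_processors_scale and Property.OperatorDynamic is swapped for Property.Dynamic, so the value can now be changed at runtime through the cluster settings API without the operator-only restriction; the setting is also added to the list returned by getSettings() so updates are validated and applied. A minimal sketch of how a plugin declares and exposes such a dynamic node-scope setting (the enclosing plugin class below is hypothetical):

    import java.util.List;

    import org.elasticsearch.common.settings.Setting;
    import org.elasticsearch.common.settings.Setting.Property;

    // Hypothetical, trimmed-down plugin illustrating the declaration pattern used above.
    public class ExamplePlugin {

        // Property.Dynamic lets the value be updated at runtime via the cluster
        // settings API; Property.NodeScope makes it a node-level (not index) setting.
        public static final Setting<Integer> ALLOCATED_PROCESSORS_SCALE = Setting.intSetting(
            "xpack.ml.allocated_processors_scale",
            1, // default
            1, // minimum
            Property.Dynamic,
            Property.NodeScope
        );

        // Settings returned here are registered with the cluster, which is what
        // allows updates to be accepted and pushed to consumers.
        public List<Setting<?>> getSettings() {
            return List.of(ALLOCATED_PROCESSORS_SCALE);
        }
    }
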
@@ -77,6 +77,7 @@ protected void masterOperation(Task task, Request request, ClusterState state, A
if (mlMemoryTracker.isRecentlyRefreshed()) {
MlAutoscalingResourceTracker.getMlAutoscalingStats(
state,
clusterService.getClusterSettings(),
parentTaskAssigningClient,
request.timeout(),
mlMemoryTracker,
@@ -96,6 +97,7 @@ protected void masterOperation(Task task, Request request, ClusterState state, A
ActionListener.wrap(
ignored -> MlAutoscalingResourceTracker.getMlAutoscalingStats(
state,
clusterService.getClusterSettings(),
parentTaskAssigningClient,
request.timeout(),
mlMemoryTracker,
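
The two call sites above now pass clusterService.getClusterSettings() into MlAutoscalingResourceTracker.getMlAutoscalingStats(...): a dynamic value has to be read from ClusterSettings, which tracks runtime updates, rather than from the node's startup Settings. A sketch of the lookup the tracker performs (the helper class and method name are made up for illustration):

    import org.elasticsearch.common.settings.ClusterSettings;
    import org.elasticsearch.xpack.ml.MachineLearning;

    // Hypothetical helper class for illustration only.
    final class ScaleLookup {
        // Returns the current value of the dynamic setting, reflecting any update
        // applied since the node started (Settings would only hold the startup value).
        static int currentAllocatedProcessorsScale(ClusterSettings clusterSettings) {
            return clusterSettings.get(MachineLearning.ALLOCATED_PROCESSORS_SCALE);
        }
    }
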
@@ -20,6 +20,7 @@
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderContext;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderResult;
import org.elasticsearch.xpack.autoscaling.capacity.AutoscalingDeciderService;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.NodeLoadDetector;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;

@@ -43,6 +44,7 @@ public class MlAutoscalingDeciderService implements AutoscalingDeciderService, L
private final MlProcessorAutoscalingDecider processorDecider;

private volatile boolean isMaster;
private volatile int allocatedProcessorsScale;

public MlAutoscalingDeciderService(
MlMemoryTracker memoryTracker,
@@ -69,7 +71,15 @@ public MlAutoscalingDeciderService(
scaleTimer
);
this.processorDecider = new MlProcessorAutoscalingDecider(scaleTimer);
this.allocatedProcessorsScale = MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(settings);

clusterService.addLocalNodeMasterListener(this);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.ALLOCATED_PROCESSORS_SCALE, this::setAllocatedProcessorsScale);
}

void setAllocatedProcessorsScale(int scale) {
this.allocatedProcessorsScale = scale;
}

@Override
@@ -96,7 +106,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
final MlMemoryAutoscalingCapacity currentMemoryCapacity = memoryDecider.capacityFromNativeMemory(currentNativeMemoryCapacity);
final MlProcessorAutoscalingCapacity currentProcessorCapacity = processorDecider.computeCurrentCapacity(
mlContext.mlNodes,
configuration
allocatedProcessorsScale
);

final MlScalingReason.Builder reasonBuilder = MlScalingReason.builder(mlContext)
@@ -123,7 +133,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
return downscaleToZero(configuration, context, currentNativeMemoryCapacity, reasonBuilder);
}

MlMemoryAutoscalingCapacity memoryCapacity = memoryDecider.scale(configuration, context, mlContext);
MlMemoryAutoscalingCapacity memoryCapacity = memoryDecider.scale(configuration, context, mlContext, allocatedProcessorsScale);
if (memoryCapacity.isUndetermined()) {
// If we cannot determine memory capacity we shouldn't make any autoscaling decision
// as it could lead to undesired capacity. For example, it could be that the processor decider decides
@@ -134,7 +144,12 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider
reasonBuilder.setSimpleReason(format("[memory_decider] %s", memoryCapacity.reason())).build()
);
}
MlProcessorAutoscalingCapacity processorCapacity = processorDecider.scale(configuration, context, mlContext);
MlProcessorAutoscalingCapacity processorCapacity = processorDecider.scale(
configuration,
context,
mlContext,
allocatedProcessorsScale
);
reasonBuilder.setSimpleReason(
format("[memory_decider] %s; [processor_decider] %s", memoryCapacity.reason(), processorCapacity.reason())
);
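
MlAutoscalingDeciderService now keeps the scale in a volatile field, seeds it from the node Settings, and registers a settings-update consumer so the deciders see changes without a restart; the memory and processor deciders then receive the int value instead of a Settings object. A condensed sketch of that wiring (the class name below is hypothetical, the Elasticsearch calls are the ones visible in the diff):

    import org.elasticsearch.cluster.service.ClusterService;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.xpack.ml.MachineLearning;

    // Hypothetical component demonstrating the volatile-field + update-consumer pattern.
    class ScaleAwareDecider {

        // volatile: written by the settings applier thread, read on decider threads.
        private volatile int allocatedProcessorsScale;

        ScaleAwareDecider(Settings settings, ClusterService clusterService) {
            // Seed from the startup settings, then subscribe to runtime updates.
            this.allocatedProcessorsScale = MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(settings);
            clusterService.getClusterSettings()
                .addSettingsUpdateConsumer(MachineLearning.ALLOCATED_PROCESSORS_SCALE, this::setAllocatedProcessorsScale);
        }

        void setAllocatedProcessorsScale(int scale) {
            this.allocatedProcessorsScale = scale;
        }

        int allocatedProcessorsScale() {
            return allocatedProcessorsScale;
        }
    }
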
@@ -16,6 +16,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
@@ -65,6 +66,7 @@ private MlAutoscalingResourceTracker() {}

public static void getMlAutoscalingStats(
ClusterState clusterState,
ClusterSettings clusterSettings,
Client client,
TimeValue timeout,
MlMemoryTracker mlMemoryTracker,
@@ -82,7 +84,8 @@ public static void getMlAutoscalingStats(
? NativeMemoryCalculator.allowedBytesForMl(clusterState.nodes().get(mlNodes[0]), settings).orElse(0L)
: 0L;
int processorsAvailableFirstNode = mlNodes.length > 0
? MlProcessors.get(clusterState.nodes().get(mlNodes[0]), settings).roundDown()
? MlProcessors.get(clusterState.nodes().get(mlNodes[0]), clusterSettings.get(MachineLearning.ALLOCATED_PROCESSORS_SCALE))
.roundDown()
: 0;

// Todo: MAX_LOW_PRIORITY_MODELS_PER_NODE not checked yet
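
MlAutoscalingResourceTracker now resolves the scale from ClusterSettings and hands it to MlProcessors.get(node, scale) when computing the processors available on the first ML node. MlProcessors itself is not part of this diff; the self-contained example below assumes the scale is applied as a divisor on a node's allocated processor count (for example, to plan with physical cores rather than hyperthreads):

    // Self-contained illustration; the real arithmetic lives in MlProcessors.get,
    // which is not shown in this diff. Assumption: the configured scale divides the
    // node's allocated processor count before ML capacity is derived from it.
    public class ProcessorScaleExample {

        static double effectiveMlProcessors(double nodeAllocatedProcessors, int allocatedProcessorsScale) {
            return nodeAllocatedProcessors / allocatedProcessorsScale;
        }

        public static void main(String[] args) {
            // A node with 16 allocated processors and the default scale of 1.
            System.out.println(effectiveMlProcessors(16, 1)); // 16.0
            // Raising the scale to 2 at runtime halves the processors ML plans with.
            System.out.println(effectiveMlProcessors(16, 2)); // 8.0
        }
    }
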
@@ -118,7 +118,12 @@ void setMaxMlNodeSize(ByteSizeValue maxMlNodeSize) {
}
}

public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeciderContext context, MlAutoscalingContext mlContext) {
public MlMemoryAutoscalingCapacity scale(
Settings configuration,
AutoscalingDeciderContext context,
MlAutoscalingContext mlContext,
int allocatedProcessorsScale
) {
final ClusterState clusterState = context.state();

scaleTimer.lastScaleToScaleIntervalMillis()
@@ -258,7 +263,11 @@ public MlMemoryAutoscalingCapacity scale(Settings configuration, AutoscalingDeci
}
// We should keep this check here as well as in the processor decider while cloud is not
// reacting to processor autoscaling.
if (modelAssignmentsRequireMoreThanHalfCpu(mlContext.modelAssignments.values(), mlContext.mlNodes, configuration)) {
if (modelAssignmentsRequireMoreThanHalfCpu(
mlContext.modelAssignments.values(),
mlContext.mlNodes,
allocatedProcessorsScale
)) {
logger.debug("not down-scaling; model assignments require more than half of the ML tier's allocated processors");
return null;
}
@@ -818,12 +827,12 @@ static MlMemoryAutoscalingCapacity ensureScaleDown(
static boolean modelAssignmentsRequireMoreThanHalfCpu(
Collection<TrainedModelAssignment> assignments,
List<DiscoveryNode> mlNodes,
Settings settings
int allocatedProcessorsScale
) {
int totalRequiredProcessors = assignments.stream()
.mapToInt(t -> t.getTaskParams().getNumberOfAllocations() * t.getTaskParams().getThreadsPerAllocation())
.sum();
int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node, settings).roundUp()).sum();
int totalMlProcessors = mlNodes.stream().mapToInt(node -> MlProcessors.get(node, allocatedProcessorsScale).roundUp()).sum();
return totalRequiredProcessors * 2 > totalMlProcessors;
}

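modelAssignmentsRequireMoreThanHalfCpu now takes the scale directly, since it only needs it to size each node via MlProcessors.get. The check itself is simple: the processors required by all trained model assignments (allocations times threads per allocation) must not exceed half of the ML tier's processors, otherwise scale-down is vetoed. A self-contained worked example of that comparison:

    import java.util.List;

    // Standalone re-implementation of the comparison in modelAssignmentsRequireMoreThanHalfCpu,
    // written against a plain record so it runs without Elasticsearch classes.
    public class HalfCpuCheckExample {

        record Assignment(int numberOfAllocations, int threadsPerAllocation) {}

        static boolean requireMoreThanHalfCpu(List<Assignment> assignments, int totalMlProcessors) {
            int totalRequiredProcessors = assignments.stream()
                .mapToInt(a -> a.numberOfAllocations() * a.threadsPerAllocation())
                .sum();
            // Equivalent to: required > totalMlProcessors / 2, without integer-division loss.
            return totalRequiredProcessors * 2 > totalMlProcessors;
        }

        public static void main(String[] args) {
            // 4 allocations x 2 threads + 1 x 1 = 9 processors required.
            // On a 16-processor ML tier: 9 * 2 > 16, so down-scaling would be blocked.
            List<Assignment> assignments = List.of(new Assignment(4, 2), new Assignment(1, 1));
            System.out.println(requireMoreThanHalfCpu(assignments, 16)); // true
        }
    }
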
@@ -41,7 +41,12 @@ class MlProcessorAutoscalingDecider {
this.scaleTimer = Objects.requireNonNull(scaleTimer);
}

public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingDeciderContext context, MlAutoscalingContext mlContext) {
public MlProcessorAutoscalingCapacity scale(
Settings configuration,
AutoscalingDeciderContext context,
MlAutoscalingContext mlContext,
int allocatedProcessorsScale
) {
TrainedModelAssignmentMetadata trainedModelAssignmentMetadata = TrainedModelAssignmentMetadata.fromState(context.state());

if (hasUnsatisfiedDeployments(trainedModelAssignmentMetadata, mlContext.mlNodes)) {
@@ -52,7 +57,7 @@ public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingD
).build();
}

final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes, configuration);
final MlProcessorAutoscalingCapacity currentCapacity = computeCurrentCapacity(mlContext.mlNodes, allocatedProcessorsScale);

final MlProcessorAutoscalingCapacity requiredCapacity = computeRequiredCapacity(trainedModelAssignmentMetadata).build();

@@ -65,7 +70,7 @@ public MlProcessorAutoscalingCapacity scale(Settings configuration, AutoscalingD
if (MlMemoryAutoscalingDecider.modelAssignmentsRequireMoreThanHalfCpu(
trainedModelAssignmentMetadata.allAssignments().values(),
mlContext.mlNodes,
configuration
allocatedProcessorsScale
)) {
return MlProcessorAutoscalingCapacity.builder(currentCapacity.nodeProcessors(), currentCapacity.tierProcessors())
.setReason("not scaling down as model assignments require more than half of the ML tier's allocated processors")
@@ -137,11 +142,11 @@ private MlProcessorAutoscalingCapacity.Builder computeRequiredCapacity(TrainedMo
);
}

MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNodes, Settings settings) {
MlProcessorAutoscalingCapacity computeCurrentCapacity(List<DiscoveryNode> mlNodes, int allocatedProcessorsScale) {
Processors maxNodeProcessors = Processors.ZERO;
Processors tierProcessors = Processors.ZERO;
for (DiscoveryNode node : mlNodes) {
Processors nodeProcessors = MlProcessors.get(node, settings);
Processors nodeProcessors = MlProcessors.get(node, allocatedProcessorsScale);
if (nodeProcessors.compareTo(maxNodeProcessors) > 0) {
maxNodeProcessors = nodeProcessors;
}
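
computeCurrentCapacity likewise takes the scale instead of Settings: it derives each node's processors via MlProcessors.get(node, allocatedProcessorsScale), keeps the largest value as the node-level capacity and, in the lines collapsed above, accumulates the tier-level total. A standalone sketch of that shape, assuming the tier capacity is the sum over all ML nodes:

    import java.util.List;

    // Standalone sketch; Processors/DiscoveryNode are replaced by plain doubles.
    // Assumption: tier capacity is the sum of per-node processors (that accumulation
    // happens in lines that are collapsed in this hunk).
    public class CurrentCapacityExample {

        record Capacity(double nodeProcessors, double tierProcessors) {}

        static Capacity computeCurrentCapacity(List<Double> perNodeProcessors) {
            double maxNode = 0;
            double tier = 0;
            for (double nodeProcessors : perNodeProcessors) {
                if (nodeProcessors > maxNode) {
                    maxNode = nodeProcessors; // largest single node drives node-level capacity
                }
                tier += nodeProcessors; // tier capacity aggregates every ML node
            }
            return new Capacity(maxNode, tier);
        }

        public static void main(String[] args) {
            // Three ML nodes exposing 8, 8 and 4 processors after scaling.
            System.out.println(computeCurrentCapacity(List.of(8.0, 8.0, 4.0)));
            // -> Capacity[nodeProcessors=8.0, tierProcessors=20.0]
        }
    }
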
@@ -80,6 +80,7 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene
private volatile int maxOpenJobs;
protected volatile int maxLazyMLNodes;
protected volatile long maxMLNodeSize;
protected volatile int allocatedProcessorsScale;

public TrainedModelAssignmentClusterService(
Settings settings,
@@ -99,6 +100,7 @@ public TrainedModelAssignmentClusterService(
this.maxOpenJobs = MachineLearning.MAX_OPEN_JOBS_PER_NODE.get(settings);
this.maxLazyMLNodes = MachineLearning.MAX_LAZY_ML_NODES.get(settings);
this.maxMLNodeSize = MachineLearning.MAX_ML_NODE_SIZE.get(settings).getBytes();
this.allocatedProcessorsScale = MachineLearning.ALLOCATED_PROCESSORS_SCALE.get(settings);
// Only nodes that can possibly be master nodes really need this service running
if (DiscoveryNode.isMasterNode(settings)) {
clusterService.addListener(this);
@@ -109,6 +111,8 @@
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_OPEN_JOBS_PER_NODE, this::setMaxOpenJobs);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_LAZY_ML_NODES, this::setMaxLazyMLNodes);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MachineLearning.MAX_ML_NODE_SIZE, this::setMaxMLNodeSize);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.ALLOCATED_PROCESSORS_SCALE, this::setAllocatedProcessorsScale);
}
}

@@ -132,6 +136,10 @@ private void setMaxMLNodeSize(ByteSizeValue value) {
this.maxMLNodeSize = value.getBytes();
}

private void setAllocatedProcessorsScale(int scale) {
this.allocatedProcessorsScale = scale;
}

@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
clusterService.submitUnbatchedStateUpdateTask(source, task);
@@ -486,9 +494,10 @@ private TrainedModelAssignmentMetadata.Builder rebalanceAssignments(
TrainedModelAssignmentMetadata.fromState(currentState),
nodeLoads,
nodeAvailabilityZoneMapper.buildMlNodesByAvailabilityZone(currentState),
modelToAdd
modelToAdd,
allocatedProcessorsScale
);
TrainedModelAssignmentMetadata.Builder rebalanced = rebalancer.rebalance(clusterService.getSettings());
TrainedModelAssignmentMetadata.Builder rebalanced = rebalancer.rebalance();
if (modelToAdd.isPresent()) {
checkModelIsFullyAllocatedIfScalingIsNotPossible(modelToAdd.get().getDeploymentId(), rebalanced, nodes);
}
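
Finally, TrainedModelAssignmentClusterService caches the scale in the same volatile-field/consumer fashion and now passes it to the rebalancer's constructor, so rebalance() no longer needs a Settings argument and the whole rebalance computation uses one consistent value even if the setting changes mid-flight. A hypothetical sketch of that constructor-injection refactor (the real TrainedModelAssignmentRebalancer internals are not part of this diff):

    // Hypothetical sketch; the planning logic is reduced to a placeholder.
    final class RebalancerSketch {

        private final int allocatedProcessorsScale;

        // The dynamic value is captured once per rebalance round, so the whole plan
        // is computed against a single consistent scale even if the setting changes
        // while the plan is being built.
        RebalancerSketch(int allocatedProcessorsScale) {
            this.allocatedProcessorsScale = allocatedProcessorsScale;
        }

        String rebalance() {
            // Placeholder for the real assignment planning, which would size each ML
            // node with MlProcessors.get(node, allocatedProcessorsScale).
            return "planned with allocatedProcessorsScale=" + allocatedProcessorsScale;
        }
    }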