Skip to content

Commit

Permalink
[ML] More accurate job memory overhead (#47516)
Browse files Browse the repository at this point in the history
When an ML job runs the memory required can be
broken down into:

1. Memory required to load the executable code
2. Instrumented model memory
3. Other memory used by the job's main process or
   ancillary processes that is not instrumented

Previously we added a simple fixed overhead to
account for 1 and 3. This was 100MB for anomaly
detection jobs (large because of the completely
uninstrumented categorization function and
normalize process), and 20MB for data frame
analytics jobs.

However, this was an oversimplification because
the executable code only needs to be loaded once
per machine.  Also the 100MB overhead for anomaly
detection jobs was probably too high in most cases
because categorization and normalization don't use
_that_ much memory.

This PR therefore changes the calculation of memory
requirements as follows:

1. A per-node overhead of 30MB for _only_ the first
   job of any type to be run on a given node - this
   is to account for loading the executable code
2. The established model memory (if applicable) or
   model memory limit of the job
3. A per-job overhead of 10MB for anomaly detection
   jobs and 5MB for data frame analytics jobs, to
   account for the uninstrumented memory usage

This change will enable more jobs to be run on the
same node.  It will be particularly beneficial when
there are a large number of small jobs.  It will
have less of an effect when there are a small number
of large jobs.
  • Loading branch information
droberts195 authored Oct 4, 2019
1 parent e036ac4 commit d683b20
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {

public static final ByteSizeValue DEFAULT_MODEL_MEMORY_LIMIT = new ByteSizeValue(1, ByteSizeUnit.GB);
public static final ByteSizeValue MIN_MODEL_MEMORY_LIMIT = new ByteSizeValue(1, ByteSizeUnit.MB);
public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(20, ByteSizeUnit.MB);
/**
* This includes the overhead of thread stacks and data structures that the program might use that
* are not instrumented. But it does NOT include the memory used by loading the executable code.
*/
public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(5, ByteSizeUnit.MB);

public static final ParseField ID = new ParseField("id");
public static final ParseField DESCRIPTION = new ParseField("description");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,14 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
public static final ObjectParser<Builder, Void> STRICT_PARSER = createParser(false);

public static final TimeValue MIN_BACKGROUND_PERSIST_INTERVAL = TimeValue.timeValueHours(1);
public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(100, ByteSizeUnit.MB);

/**
* This includes the overhead of thread stacks and data structures that the program might use that
* are not instrumented. (For the <code>autodetect</code> process categorization is not instrumented,
* and the <code>normalize</code> process is not instrumented at all.) But this overhead does NOT
* include the memory used by loading the executable code.
*/
public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(10, ByteSizeUnit.MB);

public static final long DEFAULT_MODEL_SNAPSHOT_RETENTION_DAYS = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,12 @@ public Set<DiscoveryNodeRole> getRoles() {
public static final String MACHINE_MEMORY_NODE_ATTR = "ml.machine_memory";
public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS =
Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope);
/**
* The amount of memory needed to load the ML native code shared libraries. The assumption is that the first
* ML job to run on a given node will do this, and then subsequent ML jobs on the same node will reuse the
* same already-loaded code.
*/
public static final ByteSizeValue NATIVE_EXECUTABLE_CODE_OVERHEAD = new ByteSizeValue(30, ByteSizeUnit.MB);
// Values higher than 100% are allowed to accommodate use cases where swapping has been determined to be acceptable.
// Anomaly detector jobs only use their full model memory during background persistence, and this is deliberately
// staggered, so with large numbers of jobs few will generally be persisting state at the same time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public PersistentTasksCustomMetaData.Assignment selectNode(int dynamicMaxOpenJob
continue;
}

// Assuming the node is elligible at all, check loading
// Assuming the node is eligible at all, check loading
CurrentLoad currentLoad = calculateCurrentLoadForNode(node, persistentTasks, allocateByMemory);
allocateByMemory = currentLoad.allocateByMemory;

Expand Down Expand Up @@ -170,6 +170,11 @@ public PersistentTasksCustomMetaData.Assignment selectNode(int dynamicMaxOpenJob
long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100;
Long estimatedMemoryFootprint = memoryTracker.getJobMemoryRequirement(taskName, jobId);
if (estimatedMemoryFootprint != null) {
// If this will be the first job assigned to the node then it will need to
// load the native code shared libraries, so add the overhead for this
if (currentLoad.numberOfAssignedJobs == 0) {
estimatedMemoryFootprint += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
}
long availableMemory = maxMlMemory - currentLoad.assignedJobMemory;
if (estimatedMemoryFootprint > availableMemory) {
reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
Expand Down Expand Up @@ -283,6 +288,11 @@ private CurrentLoad calculateCurrentLoadForNode(DiscoveryNode node, PersistentTa
}
}
}
// if any jobs are running then the native code will be loaded, but shared between all jobs,
// so increase the total memory usage of the assigned jobs to account for this
if (result.numberOfAssignedJobs > 0) {
result.assignedJobMemory += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
}
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,12 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_maxCapacityMemoryLi
int currentlyRunningJobsPerNode = randomIntBetween(1, 100);
int maxRunningJobsPerNode = currentlyRunningJobsPerNode + 1;
// Be careful if changing this - in order for the error message to be exactly as expected
// the value here must divide exactly into (JOB_MEMORY_REQUIREMENT.getBytes() * 100)
int maxMachineMemoryPercent = 40;
long machineMemory = currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes() * 100 / maxMachineMemoryPercent;
// the value here must divide exactly into both (JOB_MEMORY_REQUIREMENT.getBytes() * 100) and
// MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes()
int maxMachineMemoryPercent = 20;
long currentlyRunningJobMemory = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() +
currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes();
long machineMemory = currentlyRunningJobMemory * 100 / maxMachineMemoryPercent;

Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode));
Expand All @@ -193,19 +196,46 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_maxCapacityMemoryLi
jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
assertNull(result.getExecutorNode());
assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. "
+ "Available memory for ML [" + (machineMemory * maxMachineMemoryPercent / 100) + "], memory required by existing jobs ["
+ (JOB_MEMORY_REQUIREMENT.getBytes() * currentlyRunningJobsPerNode) + "], estimated memory required for this job ["
+ JOB_MEMORY_REQUIREMENT.getBytes() + "]"));
+ "Available memory for ML [" + currentlyRunningJobMemory + "], memory required by existing jobs ["
+ currentlyRunningJobMemory + "], estimated memory required for this job [" + JOB_MEMORY_REQUIREMENT.getBytes() + "]"));
}

public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_firstJobTooBigMemoryLimiting() {
    // The very first anomaly detection job on a node must also pay the native code
    // overhead, so size the node 1 byte too small for (overhead + job requirement)
    // and confirm the assignment is refused with the expected explanation.
    int nodeCount = randomIntBetween(1, 10);
    int jobsPerNodeLimit = randomIntBetween(1, 100);
    int maxMachineMemoryPercent = 20;
    long totalFirstJobBytes = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + JOB_MEMORY_REQUIREMENT.getBytes();
    // Machine memory chosen so the ML-usable fraction is exactly one byte short.
    long nodeMemoryBytes = (totalFirstJobBytes - 1) * 100 / maxMachineMemoryPercent;

    Map<String, String> attributes = new HashMap<>();
    attributes.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(jobsPerNodeLimit));
    attributes.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(nodeMemoryBytes));

    // Zero jobs currently assigned to any node - this job would be the first.
    ClusterState.Builder clusterStateBuilder = fillNodesWithRunningJobs(attributes, nodeCount, 0);

    Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date());

    JobNodeSelector selector = new JobNodeSelector(clusterStateBuilder.build(), job.getId(), MlTasks.JOB_TASK_NAME,
        memoryTracker, 0, node -> TransportOpenJobAction.nodeFilter(node, job));
    PersistentTasksCustomMetaData.Assignment assignment =
        selector.selectNode(jobsPerNodeLimit, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);

    assertNull(assignment.getExecutorNode());
    String expectedReason = "because this node has insufficient available memory. "
        + "Available memory for ML [" + (totalFirstJobBytes - 1)
        + "], memory required by existing jobs [0], estimated memory required for this job [" + totalFirstJobBytes + "]";
    assertThat(assignment.getExplanation(), containsString(expectedReason));
}

public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_maxCapacityMemoryLimiting() {
int numNodes = randomIntBetween(1, 10);
int currentlyRunningJobsPerNode = randomIntBetween(1, 100);
int maxRunningJobsPerNode = currentlyRunningJobsPerNode + 1;
// Be careful if changing this - in order for the error message to be exactly as expected
// the value here must divide exactly into (JOB_MEMORY_REQUIREMENT.getBytes() * 100)
int maxMachineMemoryPercent = 40;
long machineMemory = currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes() * 100 / maxMachineMemoryPercent;
// the value here must divide exactly into both (JOB_MEMORY_REQUIREMENT.getBytes() * 100) and
// MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes()
int maxMachineMemoryPercent = 20;
long currentlyRunningJobMemory = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() +
currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes();
long machineMemory = currentlyRunningJobMemory * 100 / maxMachineMemoryPercent;

Map<String, String> nodeAttr = new HashMap<>();
nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode));
Expand All @@ -222,9 +252,34 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_maxCapacityMemor
jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
assertNull(result.getExecutorNode());
assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. "
+ "Available memory for ML [" + (machineMemory * maxMachineMemoryPercent / 100) + "], memory required by existing jobs ["
+ (JOB_MEMORY_REQUIREMENT.getBytes() * currentlyRunningJobsPerNode) + "], estimated memory required for this job ["
+ JOB_MEMORY_REQUIREMENT.getBytes() + "]"));
+ "Available memory for ML [" + currentlyRunningJobMemory + "], memory required by existing jobs ["
+ currentlyRunningJobMemory + "], estimated memory required for this job [" + JOB_MEMORY_REQUIREMENT.getBytes() + "]"));
}

public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_firstJobTooBigMemoryLimiting() {
    // The first data frame analytics job on a node must also pay the native code
    // overhead, so size the node 1 byte too small for (overhead + job requirement)
    // and confirm the assignment is refused with the expected explanation.
    int nodeCount = randomIntBetween(1, 10);
    int jobsPerNodeLimit = randomIntBetween(1, 100);
    int maxMachineMemoryPercent = 20;
    long totalFirstJobBytes = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + JOB_MEMORY_REQUIREMENT.getBytes();
    // Machine memory chosen so the ML-usable fraction is exactly one byte short.
    long nodeMemoryBytes = (totalFirstJobBytes - 1) * 100 / maxMachineMemoryPercent;

    Map<String, String> attributes = new HashMap<>();
    attributes.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(jobsPerNodeLimit));
    attributes.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(nodeMemoryBytes));

    // Zero jobs currently assigned to any node - this job would be the first.
    ClusterState.Builder clusterStateBuilder = fillNodesWithRunningJobs(attributes, nodeCount, 0);

    String dataFrameAnalyticsId = "data_frame_analytics_id1000";

    JobNodeSelector selector = new JobNodeSelector(clusterStateBuilder.build(), dataFrameAnalyticsId,
        MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0,
        node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, dataFrameAnalyticsId));
    PersistentTasksCustomMetaData.Assignment assignment =
        selector.selectNode(jobsPerNodeLimit, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);

    assertNull(assignment.getExecutorNode());
    String expectedReason = "because this node has insufficient available memory. "
        + "Available memory for ML [" + (totalFirstJobBytes - 1)
        + "], memory required by existing jobs [0], estimated memory required for this job [" + totalFirstJobBytes + "]";
    assertThat(assignment.getExplanation(), containsString(expectedReason));
}

public void testSelectLeastLoadedMlNode_noMlNodes() {
Expand Down

0 comments on commit d683b20

Please sign in to comment.