From d683b2060b1775a517778d7ac28adbe84d524336 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Fri, 4 Oct 2019 09:16:56 +0100 Subject: [PATCH] [ML] More accurate job memory overhead (#47516) When an ML job runs the memory required can be broken down into: 1. Memory required to load the executable code 2. Instrumented model memory 3. Other memory used by the job's main process or ancilliary processes that is not instrumented Previously we added a simple fixed overhead to account for 1 and 3. This was 100MB for anomaly detection jobs (large because of the completely uninstrumented categorization function and normalize process), and 20MB for data frame analytics jobs. However, this was an oversimplification because the executable code only needs to be loaded once per machine. Also the 100MB overhead for anomaly detection jobs was probably too high in most cases because categorization and normalization don't use _that_ much memory. This PR therefore changes the calculation of memory requirements as follows: 1. A per-node overhead of 30MB for _only_ the first job of any type to be run on a given node - this is to account for loading the executable code 2. The established model memory (if applicable) or model memory limit of the job 3. A per-job overhead of 10MB for anomaly detection jobs and 5MB for data frame analytics jobs, to account for the uninstrumented memory usage This change will enable more jobs to be run on the same node. It will be particularly beneficial when there are a large number of small jobs. It will have less of an effect when there are a small number of large jobs. --- .../dataframe/DataFrameAnalyticsConfig.java | 6 +- .../xpack/core/ml/job/config/Job.java | 9 ++- .../xpack/ml/MachineLearning.java | 6 ++ .../xpack/ml/job/JobNodeSelector.java | 12 ++- .../xpack/ml/job/JobNodeSelectorTests.java | 79 ++++++++++++++++--- 5 files changed, 97 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java index 2d65013e31e1e..b327f4ec8882c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java @@ -41,7 +41,11 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable { public static final ByteSizeValue DEFAULT_MODEL_MEMORY_LIMIT = new ByteSizeValue(1, ByteSizeUnit.GB); public static final ByteSizeValue MIN_MODEL_MEMORY_LIMIT = new ByteSizeValue(1, ByteSizeUnit.MB); - public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(20, ByteSizeUnit.MB); + /** + * This includes the overhead of thread stacks and data structures that the program might use that + * are not instrumented. But it does NOT include the memory used by loading the executable code. + */ + public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(5, ByteSizeUnit.MB); public static final ParseField ID = new ParseField("id"); public static final ParseField DESCRIPTION = new ParseField("description"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java index 5a2b2314e8d53..f96d498b7e8a6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java @@ -86,7 +86,14 @@ public class Job extends AbstractDiffable implements Writeable, ToXContentO public static final ObjectParser STRICT_PARSER = createParser(false); public static final TimeValue MIN_BACKGROUND_PERSIST_INTERVAL = TimeValue.timeValueHours(1); - public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(100, ByteSizeUnit.MB); + + /** + * This includes the overhead of thread stacks and data structures that the program might use that + * are not instrumented. (For the autodetect process categorization is not instrumented, + * and the normalize process is not instrumented at all.) But this overhead does NOT + * include the memory used by loading the executable code. + */ + public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(10, ByteSizeUnit.MB); public static final long DEFAULT_MODEL_SNAPSHOT_RETENTION_DAYS = 1; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 9d1fdc762ea80..277d7018e8d4e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -334,6 +334,12 @@ public Set getRoles() { public static final String MACHINE_MEMORY_NODE_ATTR = "ml.machine_memory"; public static final Setting CONCURRENT_JOB_ALLOCATIONS = Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope); + /** + * The amount of memory needed to load the ML native code shared libraries. The assumption is that the first + * ML job to run on a given node will do this, and then subsequent ML jobs on the same node will reuse the + * same already-loaded code. + */ + public static final ByteSizeValue NATIVE_EXECUTABLE_CODE_OVERHEAD = new ByteSizeValue(30, ByteSizeUnit.MB); // Values higher than 100% are allowed to accommodate use cases where swapping has been determined to be acceptable. // Anomaly detector jobs only use their full model memory during background persistence, and this is deliberately // staggered, so with large numbers of jobs few will generally be persisting state at the same time. diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java index aa97c13b21d69..79ddb58c2945e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java @@ -110,7 +110,7 @@ public PersistentTasksCustomMetaData.Assignment selectNode(int dynamicMaxOpenJob continue; } - // Assuming the node is elligible at all, check loading + // Assuming the node is eligible at all, check loading CurrentLoad currentLoad = calculateCurrentLoadForNode(node, persistentTasks, allocateByMemory); allocateByMemory = currentLoad.allocateByMemory; @@ -170,6 +170,11 @@ public PersistentTasksCustomMetaData.Assignment selectNode(int dynamicMaxOpenJob long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100; Long estimatedMemoryFootprint = memoryTracker.getJobMemoryRequirement(taskName, jobId); if (estimatedMemoryFootprint != null) { + // If this will be the first job assigned to the node then it will need to + // load the native code shared libraries, so add the overhead for this + if (currentLoad.numberOfAssignedJobs == 0) { + estimatedMemoryFootprint += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(); + } long availableMemory = maxMlMemory - currentLoad.assignedJobMemory; if (estimatedMemoryFootprint > availableMemory) { reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node) @@ -283,6 +288,11 @@ private CurrentLoad calculateCurrentLoadForNode(DiscoveryNode node, PersistentTa } } } + // if any jobs are running then the native code will be loaded, but shared between all jobs, + // so increase the total memory usage of the assigned jobs to account for this + if (result.numberOfAssignedJobs > 0) { + result.assignedJobMemory += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes(); + } } return result; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java index ae1a738a4b33a..ab65f8d8c7938 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java @@ -175,9 +175,12 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_maxCapacityMemoryLi int currentlyRunningJobsPerNode = randomIntBetween(1, 100); int maxRunningJobsPerNode = currentlyRunningJobsPerNode + 1; // Be careful if changing this - in order for the error message to be exactly as expected - // the value here must divide exactly into (JOB_MEMORY_REQUIREMENT.getBytes() * 100) - int maxMachineMemoryPercent = 40; - long machineMemory = currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes() * 100 / maxMachineMemoryPercent; + // the value here must divide exactly into both (JOB_MEMORY_REQUIREMENT.getBytes() * 100) and + // MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + int maxMachineMemoryPercent = 20; + long currentlyRunningJobMemory = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + + currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes(); + long machineMemory = currentlyRunningJobMemory * 100 / maxMachineMemoryPercent; Map nodeAttr = new HashMap<>(); nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode)); @@ -193,9 +196,33 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_maxCapacityMemoryLi jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. " - + "Available memory for ML [" + (machineMemory * maxMachineMemoryPercent / 100) + "], memory required by existing jobs [" - + (JOB_MEMORY_REQUIREMENT.getBytes() * currentlyRunningJobsPerNode) + "], estimated memory required for this job [" - + JOB_MEMORY_REQUIREMENT.getBytes() + "]")); + + "Available memory for ML [" + currentlyRunningJobMemory + "], memory required by existing jobs [" + + currentlyRunningJobMemory + "], estimated memory required for this job [" + JOB_MEMORY_REQUIREMENT.getBytes() + "]")); + } + + public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_firstJobTooBigMemoryLimiting() { + int numNodes = randomIntBetween(1, 10); + int maxRunningJobsPerNode = randomIntBetween(1, 100); + int maxMachineMemoryPercent = 20; + long firstJobTotalMemory = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + JOB_MEMORY_REQUIREMENT.getBytes(); + long machineMemory = (firstJobTotalMemory - 1) * 100 / maxMachineMemoryPercent; + + Map nodeAttr = new HashMap<>(); + nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode)); + nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(machineMemory)); + + ClusterState.Builder cs = fillNodesWithRunningJobs(nodeAttr, numNodes, 0); + + Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date()); + + JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), job.getId(), MlTasks.JOB_TASK_NAME, memoryTracker, + 0, node -> TransportOpenJobAction.nodeFilter(node, job)); + PersistentTasksCustomMetaData.Assignment result = + jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed); + assertNull(result.getExecutorNode()); + assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. " + + "Available memory for ML [" + (firstJobTotalMemory - 1) + + "], memory required by existing jobs [0], estimated memory required for this job [" + firstJobTotalMemory + "]")); } public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_maxCapacityMemoryLimiting() { @@ -203,9 +230,12 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_maxCapacityMemor int currentlyRunningJobsPerNode = randomIntBetween(1, 100); int maxRunningJobsPerNode = currentlyRunningJobsPerNode + 1; // Be careful if changing this - in order for the error message to be exactly as expected - // the value here must divide exactly into (JOB_MEMORY_REQUIREMENT.getBytes() * 100) - int maxMachineMemoryPercent = 40; - long machineMemory = currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes() * 100 / maxMachineMemoryPercent; + // the value here must divide exactly into both (JOB_MEMORY_REQUIREMENT.getBytes() * 100) and + // MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + int maxMachineMemoryPercent = 20; + long currentlyRunningJobMemory = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + + currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes(); + long machineMemory = currentlyRunningJobMemory * 100 / maxMachineMemoryPercent; Map nodeAttr = new HashMap<>(); nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode)); @@ -222,9 +252,34 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_maxCapacityMemor jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed); assertNull(result.getExecutorNode()); assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. " - + "Available memory for ML [" + (machineMemory * maxMachineMemoryPercent / 100) + "], memory required by existing jobs [" - + (JOB_MEMORY_REQUIREMENT.getBytes() * currentlyRunningJobsPerNode) + "], estimated memory required for this job [" - + JOB_MEMORY_REQUIREMENT.getBytes() + "]")); + + "Available memory for ML [" + currentlyRunningJobMemory + "], memory required by existing jobs [" + + currentlyRunningJobMemory + "], estimated memory required for this job [" + JOB_MEMORY_REQUIREMENT.getBytes() + "]")); + } + + public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_firstJobTooBigMemoryLimiting() { + int numNodes = randomIntBetween(1, 10); + int maxRunningJobsPerNode = randomIntBetween(1, 100); + int maxMachineMemoryPercent = 20; + long firstJobTotalMemory = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + JOB_MEMORY_REQUIREMENT.getBytes(); + long machineMemory = (firstJobTotalMemory - 1) * 100 / maxMachineMemoryPercent; + + Map nodeAttr = new HashMap<>(); + nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode)); + nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(machineMemory)); + + ClusterState.Builder cs = fillNodesWithRunningJobs(nodeAttr, numNodes, 0); + + String dataFrameAnalyticsId = "data_frame_analytics_id1000"; + + JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), dataFrameAnalyticsId, + MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0, + node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, dataFrameAnalyticsId)); + PersistentTasksCustomMetaData.Assignment result = + jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed); + assertNull(result.getExecutorNode()); + assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. " + + "Available memory for ML [" + (firstJobTotalMemory - 1) + + "], memory required by existing jobs [0], estimated memory required for this job [" + firstJobTotalMemory + "]")); } public void testSelectLeastLoadedMlNode_noMlNodes() {