diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java
index 2d65013e31e1e..b327f4ec8882c 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/dataframe/DataFrameAnalyticsConfig.java
@@ -41,7 +41,11 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
 
     public static final ByteSizeValue DEFAULT_MODEL_MEMORY_LIMIT = new ByteSizeValue(1, ByteSizeUnit.GB);
     public static final ByteSizeValue MIN_MODEL_MEMORY_LIMIT = new ByteSizeValue(1, ByteSizeUnit.MB);
-    public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(20, ByteSizeUnit.MB);
+    /**
+     * This includes the overhead of thread stacks and data structures that the program might use that
+     * are not instrumented. But it does NOT include the memory used by loading the executable code.
+     */
+    public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(5, ByteSizeUnit.MB);
 
     public static final ParseField ID = new ParseField("id");
     public static final ParseField DESCRIPTION = new ParseField("description");
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java
index 5a2b2314e8d53..f96d498b7e8a6 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/config/Job.java
@@ -86,7 +86,14 @@ public class Job extends AbstractDiffable<Job> implements Writeable, ToXContentO
     public static final ObjectParser<Builder, Void> STRICT_PARSER = createParser(false);
 
     public static final TimeValue MIN_BACKGROUND_PERSIST_INTERVAL = TimeValue.timeValueHours(1);
-    public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(100, ByteSizeUnit.MB);
+
+    /**
+     * This includes the overhead of thread stacks and data structures that the program might use that
+     * are not instrumented. (For the autodetect process categorization is not instrumented,
+     * and the normalize process is not instrumented at all.) But this overhead does NOT
+     * include the memory used by loading the executable code.
+     */
+    public static final ByteSizeValue PROCESS_MEMORY_OVERHEAD = new ByteSizeValue(10, ByteSizeUnit.MB);
 
     public static final long DEFAULT_MODEL_SNAPSHOT_RETENTION_DAYS = 1;
 
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
index 9d1fdc762ea80..277d7018e8d4e 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java
@@ -334,6 +334,12 @@ public Set getRoles() {
     public static final String MACHINE_MEMORY_NODE_ATTR = "ml.machine_memory";
     public static final Setting<Integer> CONCURRENT_JOB_ALLOCATIONS =
             Setting.intSetting("xpack.ml.node_concurrent_job_allocations", 2, 0, Property.Dynamic, Property.NodeScope);
+    /**
+     * The amount of memory needed to load the ML native code shared libraries. The assumption is that the first
+     * ML job to run on a given node will do this, and then subsequent ML jobs on the same node will reuse the
+     * same already-loaded code.
+     */
+    public static final ByteSizeValue NATIVE_EXECUTABLE_CODE_OVERHEAD = new ByteSizeValue(30, ByteSizeUnit.MB);
     // Values higher than 100% are allowed to accommodate use cases where swapping has been determined to be acceptable.
     // Anomaly detector jobs only use their full model memory during background persistence, and this is deliberately
     // staggered, so with large numbers of jobs few will generally be persisting state at the same time.
diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java
index aa97c13b21d69..79ddb58c2945e 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobNodeSelector.java
@@ -110,7 +110,7 @@ public PersistentTasksCustomMetaData.Assignment selectNode(int dynamicMaxOpenJob
                 continue;
             }
 
-            // Assuming the node is elligible at all, check loading
+            // Assuming the node is eligible at all, check loading
            CurrentLoad currentLoad = calculateCurrentLoadForNode(node, persistentTasks, allocateByMemory);
             allocateByMemory = currentLoad.allocateByMemory;
 
@@ -170,6 +170,11 @@ public PersistentTasksCustomMetaData.Assignment selectNode(int dynamicMaxOpenJob
                 long maxMlMemory = machineMemory * maxMachineMemoryPercent / 100;
                 Long estimatedMemoryFootprint = memoryTracker.getJobMemoryRequirement(taskName, jobId);
                 if (estimatedMemoryFootprint != null) {
+                    // If this will be the first job assigned to the node then it will need to
+                    // load the native code shared libraries, so add the overhead for this
+                    if (currentLoad.numberOfAssignedJobs == 0) {
+                        estimatedMemoryFootprint += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
+                    }
                     long availableMemory = maxMlMemory - currentLoad.assignedJobMemory;
                     if (estimatedMemoryFootprint > availableMemory) {
                         reason = "Not opening job [" + jobId + "] on node [" + nodeNameAndMlAttributes(node)
@@ -283,6 +288,11 @@ private CurrentLoad calculateCurrentLoadForNode(DiscoveryNode node, PersistentTa
                     }
                 }
             }
+            // if any jobs are running then the native code will be loaded, but shared between all jobs,
+            // so increase the total memory usage of the assigned jobs to account for this
+            if (result.numberOfAssignedJobs > 0) {
+                result.assignedJobMemory += MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes();
+            }
         }
 
         return result;
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java
index ae1a738a4b33a..ab65f8d8c7938 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/JobNodeSelectorTests.java
@@ -175,9 +175,12 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_maxCapacityMemoryLi
         int currentlyRunningJobsPerNode = randomIntBetween(1, 100);
         int maxRunningJobsPerNode = currentlyRunningJobsPerNode + 1;
         // Be careful if changing this - in order for the error message to be exactly as expected
-        // the value here must divide exactly into (JOB_MEMORY_REQUIREMENT.getBytes() * 100)
-        int maxMachineMemoryPercent = 40;
-        long machineMemory = currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes() * 100 / maxMachineMemoryPercent;
+        // the value here must divide exactly into both (JOB_MEMORY_REQUIREMENT.getBytes() * 100) and
+        // MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes()
+        int maxMachineMemoryPercent = 20;
+        long currentlyRunningJobMemory = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() +
+            currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes();
+        long machineMemory = currentlyRunningJobMemory * 100 / maxMachineMemoryPercent;
 
         Map<String, String> nodeAttr = new HashMap<>();
         nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode));
@@ -193,9 +196,33 @@ public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_maxCapacityMemoryLi
             jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
         assertNull(result.getExecutorNode());
         assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. "
-            + "Available memory for ML [" + (machineMemory * maxMachineMemoryPercent / 100) + "], memory required by existing jobs ["
-            + (JOB_MEMORY_REQUIREMENT.getBytes() * currentlyRunningJobsPerNode) + "], estimated memory required for this job ["
-            + JOB_MEMORY_REQUIREMENT.getBytes() + "]"));
+            + "Available memory for ML [" + currentlyRunningJobMemory + "], memory required by existing jobs ["
+            + currentlyRunningJobMemory + "], estimated memory required for this job [" + JOB_MEMORY_REQUIREMENT.getBytes() + "]"));
+    }
+
+    public void testSelectLeastLoadedMlNodeForAnomalyDetectorJob_firstJobTooBigMemoryLimiting() {
+        int numNodes = randomIntBetween(1, 10);
+        int maxRunningJobsPerNode = randomIntBetween(1, 100);
+        int maxMachineMemoryPercent = 20;
+        long firstJobTotalMemory = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + JOB_MEMORY_REQUIREMENT.getBytes();
+        long machineMemory = (firstJobTotalMemory - 1) * 100 / maxMachineMemoryPercent;
+
+        Map<String, String> nodeAttr = new HashMap<>();
+        nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode));
+        nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(machineMemory));
+
+        ClusterState.Builder cs = fillNodesWithRunningJobs(nodeAttr, numNodes, 0);
+
+        Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id1000", JOB_MEMORY_REQUIREMENT).build(new Date());
+
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), job.getId(), MlTasks.JOB_TASK_NAME, memoryTracker,
+            0, node -> TransportOpenJobAction.nodeFilter(node, job));
+        PersistentTasksCustomMetaData.Assignment result =
+            jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
+        assertNull(result.getExecutorNode());
+        assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. "
+            + "Available memory for ML [" + (firstJobTotalMemory - 1)
+            + "], memory required by existing jobs [0], estimated memory required for this job [" + firstJobTotalMemory + "]"));
     }
 
     public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_maxCapacityMemoryLimiting() {
@@ -203,9 +230,12 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_maxCapacityMemor
         int currentlyRunningJobsPerNode = randomIntBetween(1, 100);
         int maxRunningJobsPerNode = currentlyRunningJobsPerNode + 1;
         // Be careful if changing this - in order for the error message to be exactly as expected
-        // the value here must divide exactly into (JOB_MEMORY_REQUIREMENT.getBytes() * 100)
-        int maxMachineMemoryPercent = 40;
-        long machineMemory = currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes() * 100 / maxMachineMemoryPercent;
+        // the value here must divide exactly into both (JOB_MEMORY_REQUIREMENT.getBytes() * 100) and
+        // MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes()
+        int maxMachineMemoryPercent = 20;
+        long currentlyRunningJobMemory = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() +
+            currentlyRunningJobsPerNode * JOB_MEMORY_REQUIREMENT.getBytes();
+        long machineMemory = currentlyRunningJobMemory * 100 / maxMachineMemoryPercent;
 
         Map<String, String> nodeAttr = new HashMap<>();
         nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode));
@@ -222,9 +252,34 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_maxCapacityMemor
             jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
         assertNull(result.getExecutorNode());
         assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. "
-            + "Available memory for ML [" + (machineMemory * maxMachineMemoryPercent / 100) + "], memory required by existing jobs ["
-            + (JOB_MEMORY_REQUIREMENT.getBytes() * currentlyRunningJobsPerNode) + "], estimated memory required for this job ["
-            + JOB_MEMORY_REQUIREMENT.getBytes() + "]"));
+            + "Available memory for ML [" + currentlyRunningJobMemory + "], memory required by existing jobs ["
+            + currentlyRunningJobMemory + "], estimated memory required for this job [" + JOB_MEMORY_REQUIREMENT.getBytes() + "]"));
+    }
+
+    public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_firstJobTooBigMemoryLimiting() {
+        int numNodes = randomIntBetween(1, 10);
+        int maxRunningJobsPerNode = randomIntBetween(1, 100);
+        int maxMachineMemoryPercent = 20;
+        long firstJobTotalMemory = MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD.getBytes() + JOB_MEMORY_REQUIREMENT.getBytes();
+        long machineMemory = (firstJobTotalMemory - 1) * 100 / maxMachineMemoryPercent;
+
+        Map<String, String> nodeAttr = new HashMap<>();
+        nodeAttr.put(MachineLearning.MAX_OPEN_JOBS_NODE_ATTR, Integer.toString(maxRunningJobsPerNode));
+        nodeAttr.put(MachineLearning.MACHINE_MEMORY_NODE_ATTR, Long.toString(machineMemory));
+
+        ClusterState.Builder cs = fillNodesWithRunningJobs(nodeAttr, numNodes, 0);
+
+        String dataFrameAnalyticsId = "data_frame_analytics_id1000";
+
+        JobNodeSelector jobNodeSelector = new JobNodeSelector(cs.build(), dataFrameAnalyticsId,
+            MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME, memoryTracker, 0,
+            node -> TransportStartDataFrameAnalyticsAction.TaskExecutor.nodeFilter(node, dataFrameAnalyticsId));
+        PersistentTasksCustomMetaData.Assignment result =
+            jobNodeSelector.selectNode(maxRunningJobsPerNode, 2, maxMachineMemoryPercent, isMemoryTrackerRecentlyRefreshed);
+        assertNull(result.getExecutorNode());
+        assertThat(result.getExplanation(), containsString("because this node has insufficient available memory. "
+            + "Available memory for ML [" + (firstJobTotalMemory - 1)
+            + "], memory required by existing jobs [0], estimated memory required for this job [" + firstJobTotalMemory + "]"));
     }
 
     public void testSelectLeastLoadedMlNode_noMlNodes() {
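
Note (not part of the patch): the accounting rule the change introduces is that the first ML job assigned to a node must also fit the 30 MB native-code overhead, while a node already running at least one job carries that overhead once in its assigned-memory total. The sketch below is a minimal, self-contained illustration of that arithmetic under those assumptions; the class and method names are hypothetical and are not Elasticsearch APIs.

// Hypothetical stand-alone example, not Elasticsearch code.
public final class NativeCodeOverheadExample {

    // Mirrors MachineLearning.NATIVE_EXECUTABLE_CODE_OVERHEAD (30 MB) from the patch.
    private static final long NATIVE_CODE_OVERHEAD_BYTES = 30L * 1024 * 1024;

    // Memory already attributed to ML on a node: the sum of the running jobs' requirements,
    // plus the shared native-code overhead added once if any job is running
    // (analogous to calculateCurrentLoadForNode in the patch).
    static long assignedMemoryBytes(long[] runningJobRequirements) {
        long total = 0;
        for (long jobBytes : runningJobRequirements) {
            total += jobBytes;
        }
        if (runningJobRequirements.length > 0) {
            total += NATIVE_CODE_OVERHEAD_BYTES;
        }
        return total;
    }

    // Effective requirement of a candidate job: only the first job on an empty node pays the
    // native-code overhead, because later jobs reuse the loaded libraries (analogous to selectNode).
    static long effectiveRequirementBytes(long jobBytes, int numberOfAssignedJobs) {
        return numberOfAssignedJobs == 0 ? jobBytes + NATIVE_CODE_OVERHEAD_BYTES : jobBytes;
    }

    public static void main(String[] args) {
        long maxMlMemory = 400L * 1024 * 1024;   // assume 400 MB usable by ML on this node
        long[] runningJobs = {};                 // empty node, no jobs assigned yet
        long candidateJob = 380L * 1024 * 1024;  // a 380 MB job

        long available = maxMlMemory - assignedMemoryBytes(runningJobs);
        long required = effectiveRequirementBytes(candidateJob, runningJobs.length);
        // Prints fits=false: 380 MB + 30 MB overhead exceeds the 400 MB available on an empty node.
        System.out.println("available=" + available + " required=" + required + " fits=" + (required <= available));
    }
}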