From 37976435cf165b9d2ddc95b435afba9e8d470f84 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Tue, 11 Dec 2018 16:19:19 +0000 Subject: [PATCH 1/2] [ML] Adapt to periodic persistent task refresh If https://github.com/elastic/elasticsearch/pull/36069/files is merged then the approach for reallocating ML persistent tasks after refreshing job memory requirements can be simplified. This change begins the simplification process. --- .../xpack/core/ml/MlMetadata.java | 50 ++------------- .../xpack/ml/MlConfigMigrator.java | 3 +- .../ml/action/TransportOpenJobAction.java | 11 +--- .../xpack/ml/process/MlMemoryTracker.java | 63 +++---------------- .../xpack/ml/MlMetadataTests.java | 13 +--- .../integration/MlDistributedFailureIT.java | 1 + .../ml/process/MlMemoryTrackerTests.java | 45 +------------ 7 files changed, 19 insertions(+), 167 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java index f815efee1cf66..5ca281bd2e6a3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MlMetadata.java @@ -57,9 +57,8 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { public static final String TYPE = "ml"; private static final ParseField JOBS_FIELD = new ParseField("jobs"); private static final ParseField DATAFEEDS_FIELD = new ParseField("datafeeds"); - private static final ParseField LAST_MEMORY_REFRESH_VERSION_FIELD = new ParseField("last_memory_refresh_version"); - public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap(), null); + public static final MlMetadata EMPTY_METADATA = new MlMetadata(Collections.emptySortedMap(), Collections.emptySortedMap()); // This parser follows the pattern that metadata is parsed leniently (to allow for enhancements) public static final ObjectParser LENIENT_PARSER = new ObjectParser<>("ml_metadata", true, Builder::new); @@ -67,18 +66,15 @@ public class MlMetadata implements XPackPlugin.XPackMetaDataCustom { LENIENT_PARSER.declareObjectArray(Builder::putJobs, (p, c) -> Job.LENIENT_PARSER.apply(p, c).build(), JOBS_FIELD); LENIENT_PARSER.declareObjectArray(Builder::putDatafeeds, (p, c) -> DatafeedConfig.LENIENT_PARSER.apply(p, c).build(), DATAFEEDS_FIELD); - LENIENT_PARSER.declareLong(Builder::setLastMemoryRefreshVersion, LAST_MEMORY_REFRESH_VERSION_FIELD); } private final SortedMap jobs; private final SortedMap datafeeds; - private final Long lastMemoryRefreshVersion; private final GroupOrJobLookup groupOrJobLookup; - private MlMetadata(SortedMap jobs, SortedMap datafeeds, Long lastMemoryRefreshVersion) { + private MlMetadata(SortedMap jobs, SortedMap datafeeds) { this.jobs = Collections.unmodifiableSortedMap(jobs); this.datafeeds = Collections.unmodifiableSortedMap(datafeeds); - this.lastMemoryRefreshVersion = lastMemoryRefreshVersion; this.groupOrJobLookup = new GroupOrJobLookup(jobs.values()); } @@ -121,10 +117,6 @@ public Set expandDatafeedIds(String expression) { .expand(expression); } - public Long getLastMemoryRefreshVersion() { - return lastMemoryRefreshVersion; - } - @Override public Version getMinimalSupportedVersion() { return Version.V_5_4_0; @@ -158,11 +150,6 @@ public MlMetadata(StreamInput in) throws IOException { datafeeds.put(in.readString(), new DatafeedConfig(in)); } this.datafeeds = datafeeds; - if (in.getVersion().onOrAfter(Version.V_6_6_0)) { - lastMemoryRefreshVersion = in.readOptionalLong(); - } else { - lastMemoryRefreshVersion = null; - } this.groupOrJobLookup = new GroupOrJobLookup(jobs.values()); } @@ -170,9 +157,6 @@ public MlMetadata(StreamInput in) throws IOException { public void writeTo(StreamOutput out) throws IOException { writeMap(jobs, out); writeMap(datafeeds, out); - if (out.getVersion().onOrAfter(Version.V_6_6_0)) { - out.writeOptionalLong(lastMemoryRefreshVersion); - } } private static void writeMap(Map map, StreamOutput out) throws IOException { @@ -189,9 +173,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"), params); mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams); mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams); - if (lastMemoryRefreshVersion != null) { - builder.field(LAST_MEMORY_REFRESH_VERSION_FIELD.getPreferredName(), lastMemoryRefreshVersion); - } return builder; } @@ -208,12 +189,10 @@ public static class MlMetadataDiff implements NamedDiff { final Diff> jobs; final Diff> datafeeds; - final Long lastMemoryRefreshVersion; MlMetadataDiff(MlMetadata before, MlMetadata after) { this.jobs = DiffableUtils.diff(before.jobs, after.jobs, DiffableUtils.getStringKeySerializer()); this.datafeeds = DiffableUtils.diff(before.datafeeds, after.datafeeds, DiffableUtils.getStringKeySerializer()); - this.lastMemoryRefreshVersion = after.lastMemoryRefreshVersion; } public MlMetadataDiff(StreamInput in) throws IOException { @@ -221,11 +200,6 @@ public MlMetadataDiff(StreamInput in) throws IOException { MlMetadataDiff::readJobDiffFrom); this.datafeeds = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), DatafeedConfig::new, MlMetadataDiff::readDatafeedDiffFrom); - if (in.getVersion().onOrAfter(Version.V_6_6_0)) { - lastMemoryRefreshVersion = in.readOptionalLong(); - } else { - lastMemoryRefreshVersion = null; - } } /** @@ -237,17 +211,13 @@ public MlMetadataDiff(StreamInput in) throws IOException { public MetaData.Custom apply(MetaData.Custom part) { TreeMap newJobs = new TreeMap<>(jobs.apply(((MlMetadata) part).jobs)); TreeMap newDatafeeds = new TreeMap<>(datafeeds.apply(((MlMetadata) part).datafeeds)); - // lastMemoryRefreshVersion always comes from the diff - no need to merge with the old value - return new MlMetadata(newJobs, newDatafeeds, lastMemoryRefreshVersion); + return new MlMetadata(newJobs, newDatafeeds); } @Override public void writeTo(StreamOutput out) throws IOException { jobs.writeTo(out); datafeeds.writeTo(out); - if (out.getVersion().onOrAfter(Version.V_6_6_0)) { - out.writeOptionalLong(lastMemoryRefreshVersion); - } } @Override @@ -272,8 +242,7 @@ public boolean equals(Object o) { return false; MlMetadata that = (MlMetadata) o; return Objects.equals(jobs, that.jobs) && - Objects.equals(datafeeds, that.datafeeds) && - Objects.equals(lastMemoryRefreshVersion, that.lastMemoryRefreshVersion); + Objects.equals(datafeeds, that.datafeeds); } @Override @@ -283,14 +252,13 @@ public final String toString() { @Override public int hashCode() { - return Objects.hash(jobs, datafeeds, lastMemoryRefreshVersion); + return Objects.hash(jobs, datafeeds); } public static class Builder { private TreeMap jobs; private TreeMap datafeeds; - private Long lastMemoryRefreshVersion; public Builder() { jobs = new TreeMap<>(); @@ -304,7 +272,6 @@ public Builder(@Nullable MlMetadata previous) { } else { jobs = new TreeMap<>(previous.jobs); datafeeds = new TreeMap<>(previous.datafeeds); - lastMemoryRefreshVersion = previous.lastMemoryRefreshVersion; } } @@ -424,13 +391,8 @@ public Builder putDatafeeds(Collection datafeeds) { return this; } - public Builder setLastMemoryRefreshVersion(Long lastMemoryRefreshVersion) { - this.lastMemoryRefreshVersion = lastMemoryRefreshVersion; - return this; - } - public MlMetadata build() { - return new MlMetadata(jobs, datafeeds, lastMemoryRefreshVersion); + return new MlMetadata(jobs, datafeeds); } public void markJobAsDeleting(String jobId, PersistentTasksCustomMetaData tasks, boolean allowDeleteOpenJob) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index 22dc43b9326fb..4ca5f2b839c8b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -248,8 +248,7 @@ static RemovalResult removeJobsAndDatafeeds(List jobsToRemove, List { - if (acknowledged) { - logger.trace("Job memory requirement refresh request completed successfully"); - } else { - logger.warn("Job memory requirement refresh request completed but did not set time in cluster state"); - } - }, - e -> logger.error("Failed to refresh job memory requirements", e) - )); + boolean scheduledRefresh = memoryTracker.asyncRefresh(); if (scheduledRefresh) { String reason = "Not opening job [" + jobId + "] because job memory requirements are stale - refresh requested"; logger.debug(reason); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java index 2ea92d98391eb..d99a108c56fc0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java @@ -9,15 +9,9 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.master.AcknowledgedRequest; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.LocalNodeMasterListener; -import org.elasticsearch.cluster.ack.AckedRequest; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.threadpool.ThreadPool; @@ -44,22 +38,10 @@ * 1. For all open ML jobs (via {@link #asyncRefresh}) * 2. For all open ML jobs, plus one named ML job that is not open (via {@link #refreshJobMemoryAndAllOthers}) * 3. For one named ML job (via {@link #refreshJobMemory}) - * In all cases a listener informs the caller when the requested updates are complete. + * In cases 2 and 3 a listener informs the caller when the requested updates are complete. */ public class MlMemoryTracker implements LocalNodeMasterListener { - private static final AckedRequest ACKED_REQUEST = new AckedRequest() { - @Override - public TimeValue ackTimeout() { - return AcknowledgedRequest.DEFAULT_ACK_TIMEOUT; - } - - @Override - public TimeValue masterNodeTimeout() { - return AcknowledgedRequest.DEFAULT_ACK_TIMEOUT; - } - }; - private static final Duration RECENT_UPDATE_THRESHOLD = Duration.ofMinutes(1); private final Logger logger = LogManager.getLogger(MlMemoryTracker.class); @@ -107,6 +89,7 @@ public String executorName() { */ public boolean isRecentlyRefreshed() { Instant localLastUpdateTime = lastUpdateTime; + // TODO add on PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING once PR 36069 is merged return localLastUpdateTime != null && localLastUpdateTime.plus(RECENT_UPDATE_THRESHOLD).isAfter(Instant.now()); } @@ -149,24 +132,19 @@ public void removeJob(String jobId) { /** * Uses a separate thread to refresh the memory requirement for every ML job that has * a corresponding persistent task. This method only works on the master node. - * @param listener Will be called when the async refresh completes or fails. The - * boolean value indicates whether the cluster state was updated - * with the refresh completion time. (If it was then this will in - * cause the persistent tasks framework to check if any persistent - * tasks are awaiting allocation.) * @return true if the async refresh is scheduled, and false * if this is not possible for some reason. */ - public boolean asyncRefresh(ActionListener listener) { + public boolean asyncRefresh() { if (isMaster) { try { - ActionListener mlMetaUpdateListener = ActionListener.wrap( - aVoid -> recordUpdateTimeInClusterState(listener), - listener::onFailure + ActionListener listener = ActionListener.wrap( + aVoid -> logger.trace("Job memory requirement refresh request completed successfully"), + e -> logger.error("Failed to refresh job memory requirements", e) ); threadPool.executor(executorName()).execute( - () -> refresh(clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE), mlMetaUpdateListener)); + () -> refresh(clusterService.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE), listener)); return true; } catch (EsRejectedExecutionException e) { logger.debug("Couldn't schedule ML memory update - node might be shutting down", e); @@ -233,33 +211,6 @@ void refresh(PersistentTasksCustomMetaData persistentTasks, ActionListener } } - private void recordUpdateTimeInClusterState(ActionListener listener) { - - clusterService.submitStateUpdateTask("ml-memory-last-update-time", - new AckedClusterStateUpdateTask(ACKED_REQUEST, listener) { - @Override - protected Boolean newResponse(boolean acknowledged) { - return acknowledged; - } - - @Override - public ClusterState execute(ClusterState currentState) { - MlMetadata currentMlMetadata = MlMetadata.getMlMetadata(currentState); - MlMetadata.Builder builder = new MlMetadata.Builder(currentMlMetadata); - builder.setLastMemoryRefreshVersion(currentState.getVersion() + 1); - MlMetadata newMlMetadata = builder.build(); - if (newMlMetadata.equals(currentMlMetadata)) { - // Return same reference if nothing has changed - return currentState; - } else { - ClusterState.Builder newState = ClusterState.builder(currentState); - newState.metaData(MetaData.builder(currentState.getMetaData()).putCustom(MlMetadata.TYPE, newMlMetadata).build()); - return newState.build(); - } - } - }); - } - private void iterateMlJobTasks(Iterator> iterator, ActionListener refreshComplete) { if (iterator.hasNext()) { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java index 5fd50792ed787..cb43afed94280 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlMetadataTests.java @@ -69,9 +69,6 @@ protected MlMetadata createTestInstance() { builder.putJob(job, false); } } - if (randomBoolean()) { - builder.setLastMemoryRefreshVersion(randomNonNegativeLong()); - } return builder.build(); } @@ -441,9 +438,8 @@ protected MlMetadata mutateInstance(MlMetadata instance) { for (Map.Entry entry : datafeeds.entrySet()) { metadataBuilder.putDatafeed(entry.getValue(), Collections.emptyMap()); } - metadataBuilder.setLastMemoryRefreshVersion(instance.getLastMemoryRefreshVersion()); - switch (between(0, 2)) { + switch (between(0, 1)) { case 0: metadataBuilder.putJob(JobTests.createRandomizedJob(), true); break; @@ -463,13 +459,6 @@ protected MlMetadata mutateInstance(MlMetadata instance) { metadataBuilder.putJob(randomJob, false); metadataBuilder.putDatafeed(datafeedConfig, Collections.emptyMap()); break; - case 2: - if (instance.getLastMemoryRefreshVersion() == null) { - metadataBuilder.setLastMemoryRefreshVersion(randomNonNegativeLong()); - } else { - metadataBuilder.setLastMemoryRefreshVersion(null); - } - break; default: throw new AssertionError("Illegal randomisation branch"); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index 4b1ada80f0ef4..f1dde0482bf04 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -183,6 +183,7 @@ public void testCloseUnassignedJobAndDatafeed() throws Exception { } @TestLogging("org.elasticsearch.xpack.ml.action:TRACE,org.elasticsearch.xpack.ml.process:TRACE") + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/36069") public void testJobRelocationIsMemoryAware() throws Exception { internalCluster().ensureAtLeastNumDataNodes(1); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java index cbba7ffa04972..71b26487e4b3b 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java @@ -6,14 +6,11 @@ package org.elasticsearch.xpack.ml.process; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.MlTasks; import org.elasticsearch.xpack.core.ml.action.OpenJobAction; import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; @@ -28,8 +25,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.not; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.anyString; @@ -42,8 +37,6 @@ public class MlMemoryTrackerTests extends ESTestCase { - private ClusterService clusterService; - private ThreadPool threadPool; private JobManager jobManager; private JobResultsProvider jobResultsProvider; private MlMemoryTracker memoryTracker; @@ -51,8 +44,8 @@ public class MlMemoryTrackerTests extends ESTestCase { @Before public void setup() { - clusterService = mock(ClusterService.class); - threadPool = mock(ThreadPool.class); + ClusterService clusterService = mock(ClusterService.class); + ThreadPool threadPool = mock(ThreadPool.class); ExecutorService executorService = mock(ExecutorService.class); doAnswer(invocation -> { @SuppressWarnings("unchecked") @@ -154,40 +147,6 @@ public void testRefreshOne() { assertNull(memoryTracker.getJobMemoryRequirement(jobId)); } - @SuppressWarnings("unchecked") - public void testRecordUpdateTimeInClusterState() { - - boolean isMaster = randomBoolean(); - if (isMaster) { - memoryTracker.onMaster(); - } else { - memoryTracker.offMaster(); - } - - when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); - - AtomicReference updateVersion = new AtomicReference<>(); - - doAnswer(invocation -> { - AckedClusterStateUpdateTask task = (AckedClusterStateUpdateTask) invocation.getArguments()[1]; - ClusterState currentClusterState = ClusterState.EMPTY_STATE; - ClusterState newClusterState = task.execute(currentClusterState); - assertThat(currentClusterState, not(equalTo(newClusterState))); - MlMetadata newMlMetadata = MlMetadata.getMlMetadata(newClusterState); - updateVersion.set(newMlMetadata.getLastMemoryRefreshVersion()); - task.onAllNodesAcked(null); - return null; - }).when(clusterService).submitStateUpdateTask(anyString(), any(AckedClusterStateUpdateTask.class)); - - memoryTracker.asyncRefresh(ActionListener.wrap(ESTestCase::assertTrue, ESTestCase::assertNull)); - - if (isMaster) { - assertNotNull(updateVersion.get()); - } else { - assertNull(updateVersion.get()); - } - } - private PersistentTasksCustomMetaData.PersistentTask makeTestTask(String jobId) { return new PersistentTasksCustomMetaData.PersistentTask<>("job-" + jobId, MlTasks.JOB_TASK_NAME, new OpenJobAction.JobParams(jobId), 0, PersistentTasksCustomMetaData.INITIAL_ASSIGNMENT); From 0273a042fca9b2ab76b9154be04b2bd5df64cb77 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Fri, 14 Dec 2018 09:24:03 +0000 Subject: [PATCH 2/2] Remove AwaitsFix and implement TODO --- .../xpack/ml/MachineLearning.java | 2 +- .../xpack/ml/process/MlMemoryTracker.java | 19 +++++++++++++++---- .../integration/MlDistributedFailureIT.java | 1 - .../ml/process/MlMemoryTrackerTests.java | 9 ++++++++- 4 files changed, 24 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 49990d4ea9918..a7e45887a6fc8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -424,7 +424,7 @@ public Collection createComponents(Client client, ClusterService cluster this.datafeedManager.set(datafeedManager); MlLifeCycleService mlLifeCycleService = new MlLifeCycleService(environment, clusterService, datafeedManager, autodetectProcessManager); - MlMemoryTracker memoryTracker = new MlMemoryTracker(clusterService, threadPool, jobManager, jobResultsProvider); + MlMemoryTracker memoryTracker = new MlMemoryTracker(settings, clusterService, threadPool, jobManager, jobResultsProvider); this.memoryTracker.set(memoryTracker); // This object's constructor attaches to the license state, so there's no need to retain another reference to it diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java index d99a108c56fc0..6ee645ed60ad8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java @@ -11,8 +11,11 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.persistent.PersistentTasksClusterService; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ml.MlMetadata; @@ -54,14 +57,22 @@ public class MlMemoryTracker implements LocalNodeMasterListener { private final JobResultsProvider jobResultsProvider; private volatile boolean isMaster; private volatile Instant lastUpdateTime; + private volatile Duration reassignmentRecheckInterval; - public MlMemoryTracker(ClusterService clusterService, ThreadPool threadPool, JobManager jobManager, + public MlMemoryTracker(Settings settings, ClusterService clusterService, ThreadPool threadPool, JobManager jobManager, JobResultsProvider jobResultsProvider) { this.threadPool = threadPool; this.clusterService = clusterService; this.jobManager = jobManager; this.jobResultsProvider = jobResultsProvider; + setReassignmentRecheckInterval(PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING.get(settings)); clusterService.addLocalNodeMasterListener(this); + clusterService.getClusterSettings().addSettingsUpdateConsumer( + PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, this::setReassignmentRecheckInterval); + } + + private void setReassignmentRecheckInterval(TimeValue recheckInterval) { + reassignmentRecheckInterval = Duration.ofNanos(recheckInterval.getNanos()); } @Override @@ -85,12 +96,12 @@ public String executorName() { /** * Is the information in this object sufficiently up to date - * for valid allocation decisions to be made using it? + * for valid task assignment decisions to be made using it? */ public boolean isRecentlyRefreshed() { Instant localLastUpdateTime = lastUpdateTime; - // TODO add on PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING once PR 36069 is merged - return localLastUpdateTime != null && localLastUpdateTime.plus(RECENT_UPDATE_THRESHOLD).isAfter(Instant.now()); + return localLastUpdateTime != null && + localLastUpdateTime.plus(RECENT_UPDATE_THRESHOLD).plus(reassignmentRecheckInterval).isAfter(Instant.now()); } /** diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java index f1dde0482bf04..4b1ada80f0ef4 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java @@ -183,7 +183,6 @@ public void testCloseUnassignedJobAndDatafeed() throws Exception { } @TestLogging("org.elasticsearch.xpack.ml.action:TRACE,org.elasticsearch.xpack.ml.process:TRACE") - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/36069") public void testJobRelocationIsMemoryAware() throws Exception { internalCluster().ensureAtLeastNumDataNodes(1); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java index 71b26487e4b3b..197fa469bed7c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/MlMemoryTrackerTests.java @@ -7,7 +7,10 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.persistent.PersistentTasksClusterService; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -19,6 +22,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.junit.Before; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -44,7 +48,10 @@ public class MlMemoryTrackerTests extends ESTestCase { @Before public void setup() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, + Collections.singleton(PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING)); ClusterService clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); ThreadPool threadPool = mock(ThreadPool.class); ExecutorService executorService = mock(ExecutorService.class); doAnswer(invocation -> { @@ -56,7 +63,7 @@ public void setup() { when(threadPool.executor(anyString())).thenReturn(executorService); jobManager = mock(JobManager.class); jobResultsProvider = mock(JobResultsProvider.class); - memoryTracker = new MlMemoryTracker(clusterService, threadPool, jobManager, jobResultsProvider); + memoryTracker = new MlMemoryTracker(Settings.EMPTY, clusterService, threadPool, jobManager, jobResultsProvider); } public void testRefreshAll() {