From 97bfae96b5dad30a053f7afeec2209956a19d779 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Wed, 23 Jun 2021 10:48:56 -0400 Subject: [PATCH 1/6] [ML] clear job size estimate cache when feature is reset --- .../xpack/ml/MachineLearning.java | 33 +++++++++++++--- .../xpack/ml/process/MlMemoryTracker.java | 38 ++++++++++++++++++- 2 files changed, 63 insertions(+), 8 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 28bf1afc53c5a..04104d49d3f10 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -1312,9 +1312,12 @@ public String getFeatureDescription() { public void cleanUpFeature( ClusterService clusterService, Client client, - ActionListener finalListener) { + ActionListener finalListener + ) { logger.info("Starting machine learning feature reset"); + final Map results = new ConcurrentHashMap<>(); + ActionListener unsetResetModeListener = ActionListener.wrap( success -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(true), ActionListener.wrap( resetSuccess -> finalListener.onResponse(success), @@ -1337,25 +1340,43 @@ public void cleanUpFeature( ) ); - Map results = new ConcurrentHashMap<>(); + ActionListener cleanedUpIndicesListener = ActionListener.wrap( + success -> { + if (memoryTracker.get() != null) { + memoryTracker.get().awaitAndClear(ActionListener.wrap( + cacheCleared -> unsetResetModeListener.onResponse(success), + clearFailed -> { + logger.error("failed to clear memory tracker cache via machine learning reset feature API", clearFailed); + unsetResetModeListener.onResponse(success); + } + )); + return; + } + unsetResetModeListener.onResponse(success); + }, + failure -> { + logger.error("failed to clear .ml-* indices via reset feature API", failure); + unsetResetModeListener.onFailure(failure); + } + ); ActionListener afterWaitingForTasks = ActionListener.wrap( listTasksResponse -> { listTasksResponse.rethrowFailures("Waiting for indexing requests for .ml-* indices"); if (results.values().stream().allMatch(b -> b)) { - // Call into the original listener to clean up the indices - SystemIndexPlugin.super.cleanUpFeature(clusterService, client, unsetResetModeListener); + // Call into the original listener to clean up the indices and then clear ml memory cache + SystemIndexPlugin.super.cleanUpFeature(clusterService, client, cleanedUpIndicesListener); } else { final List failedComponents = results.entrySet().stream() .filter(result -> result.getValue() == false) .map(Map.Entry::getKey) .collect(Collectors.toList()); - unsetResetModeListener.onFailure( + cleanedUpIndicesListener.onFailure( new RuntimeException("Some machine learning components failed to reset: " + failedComponents) ); } }, - unsetResetModeListener::onFailure + cleanedUpIndicesListener::onFailure ); ActionListener afterDataframesStopped = ActionListener.wrap(dataFrameStopResponse -> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java index 195959ae7b2af..e30858b568233 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java +++ 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java @@ -42,6 +42,7 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; /** @@ -72,6 +73,7 @@ public class MlMemoryTracker implements LocalNodeMasterListener { private final JobResultsProvider jobResultsProvider; private final DataFrameAnalyticsConfigProvider configProvider; private final Phaser stopPhaser; + private volatile AtomicInteger phase = new AtomicInteger(0); private volatile boolean isMaster; private volatile Instant lastUpdateTime; private volatile Duration reassignmentRecheckInterval; @@ -115,6 +117,37 @@ public void onMaster() { public void offMaster() { isMaster = false; logger.trace("ML memory tracker off master"); + clear(); + } + + public void awaitAndClear(ActionListener listener) { + // We never terminate the phaser + assert stopPhaser.isTerminated() == false; + // If there are no registered parties or no unarrived parties then there is a flaw + // in the register/arrive/unregister logic in another method that uses the phaser + assert stopPhaser.getRegisteredParties() > 0; + assert stopPhaser.getUnarrivedParties() > 0; + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute( + () -> { + try { + // We await all current refreshes to complete, this increments the "current phase" and prevents + // further interaction while we clear contents + int newPhase = stopPhaser.arriveAndAwaitAdvance(); + assert newPhase > 0; + clear(); + phase.incrementAndGet(); + listener.onResponse(null); + } catch (Exception e) { + logger.warn("failed to wait for all refresh requests to complete", e); + listener.onFailure(e); + } + } + ); + + } + + private void clear() { + logger.trace("clearing ML Memory tracker contents"); for (Map memoryRequirementByJob : memoryRequirementByTaskName.values()) { memoryRequirementByJob.clear(); } @@ -401,8 +434,9 @@ public void refreshAnomalyDetectorJobMemory(String jobId, ActionListener l } // The phaser prevents searches being started after the memory tracker's stop() method has returned - if (stopPhaser.register() != 0) { - // Phases above 0 mean we've been stopped, so don't do any operations that involve external interaction + // Note: `phase` is incremented if cache is reset via the feature reset API + if (stopPhaser.register() != phase.get()) { + // Phases above not equal to `phase` mean we've been stopped, so don't do any operations that involve external interaction stopPhaser.arriveAndDeregister(); listener.onFailure(new EsRejectedExecutionException("Couldn't run ML memory update - node is shutting down")); return; From 3061baac98d925f87695512a3189091b17d2fb92 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Wed, 23 Jun 2021 13:38:25 -0400 Subject: [PATCH 2/6] addressing PR comments and upping logging on failed test --- .../xpack/ml/integration/AutoscalingIT.java | 24 +++++++++ .../xpack/ml/MachineLearning.java | 49 ++++++++----------- .../xpack/ml/process/MlMemoryTracker.java | 9 +++- 3 files changed, 53 insertions(+), 29 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/AutoscalingIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/AutoscalingIT.java index 4a5e853e1ab14..37126b78587aa 100644 --- 
a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/AutoscalingIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/AutoscalingIT.java @@ -24,6 +24,8 @@ import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingDeciderService; import org.elasticsearch.xpack.ml.autoscaling.NativeMemoryCapacity; +import org.junit.After; +import org.junit.Before; import java.util.Arrays; import java.util.Collections; @@ -44,6 +46,28 @@ public class AutoscalingIT extends MlNativeAutodetectIntegTestCase { private static final long NATIVE_PROCESS_OVERHEAD_MB = 30; private static final long BASELINE_OVERHEAD_MB = BASIC_REQUIREMENT_MB + NATIVE_PROCESS_OVERHEAD_MB; + @Before + public void setLoggingLevel() { + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder() + .put("logger.org.elasticsearch.xpack.ml.process.MlMemoryTracker", "TRACE") + .build() + ).get(); + } + + @After + public void unsetLoggingLevel() { + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder() + .putNull("logger.org.elasticsearch.xpack.ml.process.MlMemoryTracker") + .build() + ).get(); + } + // This test assumes that xpack.ml.max_machine_memory_percent is 30 // and that xpack.ml.use_auto_machine_memory_percent is false public void testMLAutoscalingCapacity() { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 04104d49d3f10..a7e8dc9a07ec3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -1331,32 +1331,15 @@ public void cleanUpFeature( ); }) ), - failure -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(false), ActionListener.wrap( - resetSuccess -> finalListener.onFailure(failure), - resetFailure -> { - logger.error("failed to disable reset mode after state clean up failure", resetFailure); - finalListener.onFailure(failure); - }) - ) - ); - - ActionListener cleanedUpIndicesListener = ActionListener.wrap( - success -> { - if (memoryTracker.get() != null) { - memoryTracker.get().awaitAndClear(ActionListener.wrap( - cacheCleared -> unsetResetModeListener.onResponse(success), - clearFailed -> { - logger.error("failed to clear memory tracker cache via machine learning reset feature API", clearFailed); - unsetResetModeListener.onResponse(success); - } - )); - return; - } - unsetResetModeListener.onResponse(success); - }, failure -> { - logger.error("failed to clear .ml-* indices via reset feature API", failure); - unsetResetModeListener.onFailure(failure); + logger.error("failed to reset machine learning", failure); + client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(false), ActionListener.wrap( + resetSuccess -> finalListener.onFailure(failure), + resetFailure -> { + logger.error("failed to disable reset mode after state clean up failure", resetFailure); + finalListener.onFailure(failure); + }) + ); } ); @@ -1364,19 +1347,29 @@ public void cleanUpFeature( listTasksResponse -> { listTasksResponse.rethrowFailures("Waiting for indexing requests for .ml-* indices"); if (results.values().stream().allMatch(b -> b)) { + if (memoryTracker.get() != null) { + 
memoryTracker.get().awaitAndClear(ActionListener.wrap( + cacheCleared -> SystemIndexPlugin.super.cleanUpFeature(clusterService, client, unsetResetModeListener), + clearFailed -> { + logger.error("failed to clear memory tracker cache via machine learning reset feature API", clearFailed); + SystemIndexPlugin.super.cleanUpFeature(clusterService, client, unsetResetModeListener); + } + )); + return; + } // Call into the original listener to clean up the indices and then clear ml memory cache - SystemIndexPlugin.super.cleanUpFeature(clusterService, client, cleanedUpIndicesListener); + SystemIndexPlugin.super.cleanUpFeature(clusterService, client, unsetResetModeListener); } else { final List failedComponents = results.entrySet().stream() .filter(result -> result.getValue() == false) .map(Map.Entry::getKey) .collect(Collectors.toList()); - cleanedUpIndicesListener.onFailure( + unsetResetModeListener.onFailure( new RuntimeException("Some machine learning components failed to reset: " + failedComponents) ); } }, - cleanedUpIndicesListener::onFailure + unsetResetModeListener::onFailure ); ActionListener afterDataframesStopped = ActionListener.wrap(dataFrameStopResponse -> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java index e30858b568233..5273c6aebbb92 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.LocalNodeMasterListener; @@ -122,6 +123,7 @@ public void offMaster() { public void awaitAndClear(ActionListener listener) { // We never terminate the phaser + logger.trace("awaiting and clearing memory tracker"); assert stopPhaser.isTerminated() == false; // If there are no registered parties or no unarrived parties then there is a flaw // in the register/arrive/unregister logic in another method that uses the phaser @@ -135,6 +137,7 @@ public void awaitAndClear(ActionListener listener) { int newPhase = stopPhaser.arriveAndAwaitAdvance(); assert newPhase > 0; clear(); + logger.trace("completed awaiting and clearing memory tracker"); phase.incrementAndGet(); listener.onResponse(null); } catch (Exception e) { @@ -435,9 +438,13 @@ public void refreshAnomalyDetectorJobMemory(String jobId, ActionListener l // The phaser prevents searches being started after the memory tracker's stop() method has returned // Note: `phase` is incremented if cache is reset via the feature reset API - if (stopPhaser.register() != phase.get()) { + int localPhase = phase.get(); + if (stopPhaser.register() != localPhase) { // Phases above not equal to `phase` mean we've been stopped, so don't do any operations that involve external interaction stopPhaser.arriveAndDeregister(); + logger.trace( + () -> new ParameterizedMessage("[{}] not refreshing anomaly detector memory as node is shutting down", jobId) + ); listener.onFailure(new EsRejectedExecutionException("Couldn't run ML memory update - node is shutting down")); return; } From 0cfe27e2f108efaf3e604722ab45d102f66b92b7 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> 
Date: Wed, 23 Jun 2021 14:45:03 -0400 Subject: [PATCH 3/6] ---remove this commit--- --- .../elasticsearch/xpack/ml/integration/AutoscalingIT.java | 2 ++ .../elasticsearch/xpack/ml/process/MlMemoryTracker.java | 8 ++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/AutoscalingIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/AutoscalingIT.java index 37126b78587aa..5478d41e639bc 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/AutoscalingIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/AutoscalingIT.java @@ -52,6 +52,7 @@ public void setLoggingLevel() { .cluster() .prepareUpdateSettings() .setTransientSettings(Settings.builder() + .put("logger.org.elasticsearch.xpack.ml", "DEBUG") .put("logger.org.elasticsearch.xpack.ml.process.MlMemoryTracker", "TRACE") .build() ).get(); @@ -63,6 +64,7 @@ public void unsetLoggingLevel() { .cluster() .prepareUpdateSettings() .setTransientSettings(Settings.builder() + .putNull("logger.org.elasticsearch.xpack.ml") .putNull("logger.org.elasticsearch.xpack.ml.process.MlMemoryTracker") .build() ).get(); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java index 5273c6aebbb92..0e650c4d90a31 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java @@ -123,7 +123,7 @@ public void offMaster() { public void awaitAndClear(ActionListener listener) { // We never terminate the phaser - logger.trace("awaiting and clearing memory tracker"); + logger.info("awaiting and clearing memory tracker"); assert stopPhaser.isTerminated() == false; // If there are no registered parties or no unarrived parties then there is a flaw // in the register/arrive/unregister logic in another method that uses the phaser @@ -137,8 +137,8 @@ public void awaitAndClear(ActionListener listener) { int newPhase = stopPhaser.arriveAndAwaitAdvance(); assert newPhase > 0; clear(); - logger.trace("completed awaiting and clearing memory tracker"); - phase.incrementAndGet(); + int anotherNewPhase = phase.incrementAndGet(); + logger.info("completed awaiting and clearing memory tracker new phase [{}] and [{}]", newPhase, anotherNewPhase); listener.onResponse(null); } catch (Exception e) { logger.warn("failed to wait for all refresh requests to complete", e); @@ -442,7 +442,7 @@ public void refreshAnomalyDetectorJobMemory(String jobId, ActionListener l if (stopPhaser.register() != localPhase) { // Phases above not equal to `phase` mean we've been stopped, so don't do any operations that involve external interaction stopPhaser.arriveAndDeregister(); - logger.trace( + logger.info( () -> new ParameterizedMessage("[{}] not refreshing anomaly detector memory as node is shutting down", jobId) ); listener.onFailure(new EsRejectedExecutionException("Couldn't run ML memory update - node is shutting down")); From b9f262f8c6fce6900867a411c9c4ded2613b4c74 Mon Sep 17 00:00:00 2001 From: Benjamin Trent <4357155+benwtrent@users.noreply.github.com> Date: Wed, 23 Jun 2021 16:45:28 -0400 Subject: [PATCH 4/6] fixing test and autoscaling 
logic --- .../xpack/ml/integration/AutoscalingIT.java | 26 ---------- .../MlAutoscalingDeciderService.java | 47 ++++++++++++++++--- .../xpack/ml/process/MlMemoryTracker.java | 10 ++-- 3 files changed, 46 insertions(+), 37 deletions(-) diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/AutoscalingIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/AutoscalingIT.java index 5478d41e639bc..4a5e853e1ab14 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/AutoscalingIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/AutoscalingIT.java @@ -24,8 +24,6 @@ import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.autoscaling.MlAutoscalingDeciderService; import org.elasticsearch.xpack.ml.autoscaling.NativeMemoryCapacity; -import org.junit.After; -import org.junit.Before; import java.util.Arrays; import java.util.Collections; @@ -46,30 +44,6 @@ public class AutoscalingIT extends MlNativeAutodetectIntegTestCase { private static final long NATIVE_PROCESS_OVERHEAD_MB = 30; private static final long BASELINE_OVERHEAD_MB = BASIC_REQUIREMENT_MB + NATIVE_PROCESS_OVERHEAD_MB; - @Before - public void setLoggingLevel() { - client().admin() - .cluster() - .prepareUpdateSettings() - .setTransientSettings(Settings.builder() - .put("logger.org.elasticsearch.xpack.ml", "DEBUG") - .put("logger.org.elasticsearch.xpack.ml.process.MlMemoryTracker", "TRACE") - .build() - ).get(); - } - - @After - public void unsetLoggingLevel() { - client().admin() - .cluster() - .prepareUpdateSettings() - .setTransientSettings(Settings.builder() - .putNull("logger.org.elasticsearch.xpack.ml") - .putNull("logger.org.elasticsearch.xpack.ml.process.MlMemoryTracker") - .build() - ).get(); - } - // This test assumes that xpack.ml.max_machine_memory_percent is 30 // and that xpack.ml.use_auto_machine_memory_percent is false public void testMLAutoscalingCapacity() { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java index 6cbf29e5b542e..991d8851f4009 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java @@ -372,6 +372,36 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider return scaleUpFromZero(waitingAnomalyJobs, waitingAnalyticsJobs, reasonBuilder); } + // We don't need to check anything as there are no tasks + // This is a quick path to downscale. + // simply return `0` for scale down if delay is satisfied + if (anomalyDetectionTasks.isEmpty() && dataframeAnalyticsTasks.isEmpty()) { + long msLeftToScale = msLeftToDownScale(configuration); + if (msLeftToScale > 0) { + return new AutoscalingDeciderResult( + context.currentCapacity(), + reasonBuilder + .setSimpleReason( + String.format( + Locale.ROOT, + "Passing currently perceived capacity as down scale delay has not be satisfied; configured delay [%s]" + + "last detected scale down event [%s]. 
Will request scale down in approximately [%s]", + DOWN_SCALE_DELAY.get(configuration).getStringRep(), + XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(scaleDownDetected), + TimeValue.timeValueMillis(msLeftToScale).getStringRep() + ) + ) + .build()); + } + return new AutoscalingDeciderResult( + AutoscalingCapacity.ZERO, + reasonBuilder + .setRequiredCapacity(AutoscalingCapacity.ZERO) + .setSimpleReason("Requesting scale down as tier and/or node size could be smaller") + .build() + ); + } + if (mlMemoryTracker.isRecentlyRefreshed(memoryTrackingStale) == false) { logger.debug(() -> new ParameterizedMessage( "view of job memory is stale given duration [{}]. Not attempting to make scaling decision", @@ -521,15 +551,11 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider } } - final long now = timeSupplier.get(); - if (newScaleDownCheck()) { - scaleDownDetected = now; - } - TimeValue downScaleDelay = DOWN_SCALE_DELAY.get(configuration); - long msLeftToScale = downScaleDelay.millis() - (now - scaleDownDetected); + long msLeftToScale = msLeftToDownScale(configuration); if (msLeftToScale <= 0) { return scaleDownDecision.get(); } + TimeValue downScaleDelay = DOWN_SCALE_DELAY.get(configuration); logger.debug(() -> new ParameterizedMessage( "not scaling down as the current scale down delay [{}] is not satisfied." + " The last time scale down was detected [{}]. Calculated scaled down capacity [{}] ", @@ -835,6 +861,15 @@ Optional checkForScaleDown(List nodeLoads, return Optional.empty(); } + private long msLeftToDownScale(Settings configuration) { + final long now = timeSupplier.get(); + if (newScaleDownCheck()) { + scaleDownDetected = now; + } + TimeValue downScaleDelay = DOWN_SCALE_DELAY.get(configuration); + return downScaleDelay.millis() - (now - scaleDownDetected); + } + @Override public String name() { return NAME; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java index 0e650c4d90a31..bca789bc3d72b 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java @@ -123,7 +123,7 @@ public void offMaster() { public void awaitAndClear(ActionListener listener) { // We never terminate the phaser - logger.info("awaiting and clearing memory tracker"); + logger.trace("awaiting and clearing memory tracker"); assert stopPhaser.isTerminated() == false; // If there are no registered parties or no unarrived parties then there is a flaw // in the register/arrive/unregister logic in another method that uses the phaser @@ -137,8 +137,8 @@ public void awaitAndClear(ActionListener listener) { int newPhase = stopPhaser.arriveAndAwaitAdvance(); assert newPhase > 0; clear(); - int anotherNewPhase = phase.incrementAndGet(); - logger.info("completed awaiting and clearing memory tracker new phase [{}] and [{}]", newPhase, anotherNewPhase); + phase.incrementAndGet(); + logger.trace("completed awaiting and clearing memory tracker"); listener.onResponse(null); } catch (Exception e) { logger.warn("failed to wait for all refresh requests to complete", e); @@ -361,6 +361,7 @@ void refresh(PersistentTasksCustomMetadata persistentTasks, ActionListener for (ActionListener listener : fullRefreshCompletionListeners) { listener.onFailure(e); } + logger.warn("ML memory tracker last update failed and listeners called", e); // 
It's critical that we empty out the current listener list on // error otherwise subsequent retries to refresh will be ignored fullRefreshCompletionListeners.clear(); @@ -438,8 +439,7 @@ public void refreshAnomalyDetectorJobMemory(String jobId, ActionListener l // The phaser prevents searches being started after the memory tracker's stop() method has returned // Note: `phase` is incremented if cache is reset via the feature reset API - int localPhase = phase.get(); - if (stopPhaser.register() != localPhase) { + if (stopPhaser.register() != phase.get()) { // Phases above not equal to `phase` mean we've been stopped, so don't do any operations that involve external interaction stopPhaser.arriveAndDeregister(); logger.info( From 277e404a0f4a3ba41cdb3cd2cc2e23117f4ae15a Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Thu, 24 Jun 2021 07:19:38 -0400 Subject: [PATCH 5/6] Update x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java Co-authored-by: David Kyle --- .../xpack/ml/autoscaling/MlAutoscalingDeciderService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java index 991d8851f4009..9713837d1f103 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java @@ -384,7 +384,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider .setSimpleReason( String.format( Locale.ROOT, - "Passing currently perceived capacity as down scale delay has not be satisfied; configured delay [%s]" + "Passing currently perceived capacity as down scale delay has not been satisfied; configured delay [%s]" + "last detected scale down event [%s]. Will request scale down in approximately [%s]", DOWN_SCALE_DELAY.get(configuration).getStringRep(), XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(scaleDownDetected), From ea208e63b4d7a41b5d3a051b87d00a8ac3c8c6f1 Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Thu, 24 Jun 2021 07:20:38 -0400 Subject: [PATCH 6/6] Update MlAutoscalingDeciderService.java --- .../xpack/ml/autoscaling/MlAutoscalingDeciderService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java index 9713837d1f103..96fa7935b24d6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/autoscaling/MlAutoscalingDeciderService.java @@ -568,7 +568,7 @@ public AutoscalingDeciderResult scale(Settings configuration, AutoscalingDecider .setSimpleReason( String.format( Locale.ROOT, - "Passing currently perceived capacity as down scale delay has not be satisfied; configured delay [%s]" + "Passing currently perceived capacity as down scale delay has not been satisfied; configured delay [%s]" + "last detected scale down event [%s]. Will request scale down in approximately [%s]", downScaleDelay.getStringRep(), XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(scaleDownDetected),
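
The coordination pattern this series introduces in MlMemoryTracker (a Phaser gating in-flight refreshes, plus an AtomicInteger `phase` that is only bumped when the cache is cleared deliberately) is compact but easy to misread, so a minimal standalone sketch of the handshake follows. The class and member names (PhaseGatedCache, expectedPhase, memoryByJob) are illustrative assumptions rather than code from the patch. A refresher registers with the phaser and bails out if the phaser's current phase no longer matches the expected counter; a reset waits for every in-flight refresher to arrive, clears the cache, then bumps the counter so later refreshes are accepted again. In the real tracker a shutdown stop advances the phaser without bumping `phase`, which is why refreshes stay rejected once stop() has returned.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Phaser;
    import java.util.concurrent.atomic.AtomicInteger;

    // Standalone illustration of the register/arrive/advance handshake; not part of MlMemoryTracker.
    public class PhaseGatedCache {

        private final Phaser stopPhaser = new Phaser(1);          // one party owned by the cache itself
        private final AtomicInteger expectedPhase = new AtomicInteger(0);
        private final Map<String, Long> memoryByJob = new ConcurrentHashMap<>();

        // Analogue of refreshAnomalyDetectorJobMemory: only does work while the phase matches.
        public boolean refresh(String jobId, long bytes) {
            if (stopPhaser.register() != expectedPhase.get()) {
                // The phaser advanced without the counter: treat as stopped, touch nothing external.
                stopPhaser.arriveAndDeregister();
                return false;
            }
            try {
                memoryByJob.put(jobId, bytes);
                return true;
            } finally {
                // Always deregister so a concurrent awaitAndClear() is never blocked by a finished refresher.
                stopPhaser.arriveAndDeregister();
            }
        }

        // Analogue of awaitAndClear: wait out in-flight refreshers, clear, then re-enable refreshes.
        public void awaitAndClear() {
            // Blocks until every currently registered refresher has arrived, then advances the phase.
            int newPhase = stopPhaser.arriveAndAwaitAdvance();
            memoryByJob.clear();
            // Bump the expected counter so it stays in step with the phaser and refreshes work again.
            if (expectedPhase.incrementAndGet() != newPhase) {
                throw new IllegalStateException("phase counter out of step with phaser");
            }
        }

        public static void main(String[] args) {
            PhaseGatedCache cache = new PhaseGatedCache();
            System.out.println(cache.refresh("job-1", 1024L));  // true: phaser and counter both at 0
            cache.awaitAndClear();                               // advances both to 1
            System.out.println(cache.refresh("job-2", 2048L));  // true again: still in step after the reset
        }
    }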