From ae9cb9eb67adce46dffb60d8a96463603bba50af Mon Sep 17 00:00:00 2001
From: David Roberts
Date: Wed, 11 Aug 2021 13:43:37 +0100
Subject: [PATCH] [ML] Cap graceful shutdown time (#76349)

The node shutdown work done in #75188 did not impose any upper bound on
the time allowed for ML jobs to shut down gracefully. This change imposes
a cap of 10 minutes on shutdown time. In reality closing ML jobs shouldn't
take this long, but we don't want ML to stall the shutdown process forever
due to a bug, and require user intervention to recover.

Backport of #76342
---
 .../xpack/ml/MlLifeCycleService.java          | 40 ++++++++++--
 .../xpack/ml/MlLifeCycleServiceTests.java     | 62 ++++++++++++++++---
 2 files changed, 87 insertions(+), 15 deletions(-)

diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java
index 1e62f09cb44b4..18bcc78651e0e 100644
--- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java
+++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java
@@ -6,6 +6,8 @@
  */
 package org.elasticsearch.xpack.ml;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.component.LifecycleListener;
@@ -21,16 +23,31 @@
 import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
 
 import java.io.IOException;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.Collection;
 
 public class MlLifeCycleService {
 
+    /**
+     * Maximum time we'll wait for jobs to gracefully persist their state and stop their associated
+     * processes. We expect this to take a minute or two at most if all goes to plan. The longer
+     * timeout here is to avoid the need for user intervention if something doesn't work and the
+     * graceful shutdown gets stuck.
+     */
+    public static final Duration MAX_GRACEFUL_SHUTDOWN_TIME = Duration.of(10, ChronoUnit.MINUTES);
+
+    private static final Logger logger = LogManager.getLogger(MlLifeCycleService.class);
+
     private final Environment environment;
     private final ClusterService clusterService;
     private final DatafeedRunner datafeedRunner;
     private final AutodetectProcessManager autodetectProcessManager;
     private final DataFrameAnalyticsManager analyticsManager;
     private final MlMemoryTracker memoryTracker;
+    private volatile Instant shutdownStartTime;
 
     public MlLifeCycleService(Environment environment, ClusterService clusterService, DatafeedRunner datafeedRunner,
                               AutodetectProcessManager autodetectProcessManager, DataFrameAnalyticsManager analyticsManager,
@@ -84,16 +101,22 @@ public synchronized void stop() {
      * @return Has all active ML work vacated the specified node?
      */
     public boolean isNodeSafeToShutdown(String nodeId) {
-        return isNodeSafeToShutdown(nodeId, clusterService.state());
+        return isNodeSafeToShutdown(nodeId, clusterService.state(), shutdownStartTime, Clock.systemUTC());
     }
 
-    static boolean isNodeSafeToShutdown(String nodeId, ClusterState state) {
+    static boolean isNodeSafeToShutdown(String nodeId, ClusterState state, Instant shutdownStartTime, Clock clock) {
         // If we are in a mixed version cluster that doesn't support locally aborting persistent tasks then
         // we cannot perform graceful shutdown, so just revert to the behaviour of previous versions where
         // the node shutdown API didn't exist
         if (PersistentTasksService.isLocalAbortSupported(state) == false) {
             return true;
         }
+
+        // If the shutdown has taken too long then any remaining tasks will just be cut off when the node dies
+        if (shutdownStartTime != null && shutdownStartTime.isBefore(clock.instant().minus(MAX_GRACEFUL_SHUTDOWN_TIME))) {
+            return true;
+        }
+
         PersistentTasksCustomMetadata tasks = state.metadata().custom(PersistentTasksCustomMetadata.TYPE);
         // TODO: currently only considering anomaly detection jobs - could extend in the future
         // Ignore failed jobs - the persistent task still exists to remember the failure (because no
@@ -111,10 +134,10 @@ static boolean isNodeSafeToShutdown(String nodeId, ClusterState state) {
      * @param shutdownNodeIds IDs of all nodes being shut down.
      */
     public void signalGracefulShutdown(Collection<String> shutdownNodeIds) {
-        signalGracefulShutdown(clusterService.state(), shutdownNodeIds);
+        signalGracefulShutdown(clusterService.state(), shutdownNodeIds, Clock.systemUTC());
     }
 
-    void signalGracefulShutdown(ClusterState state, Collection<String> shutdownNodeIds) {
+    void signalGracefulShutdown(ClusterState state, Collection<String> shutdownNodeIds, Clock clock) {
         // If we are in a mixed version cluster that doesn't support locally aborting persistent tasks then
         // we cannot perform graceful shutdown, so just revert to the behaviour of previous versions where
         // the node shutdown API didn't exist
@@ -124,10 +147,17 @@ void signalGracefulShutdown(ClusterState state, Collection<String> shutdownNodeI
         }
 
         if (shutdownNodeIds.contains(state.nodes().getLocalNodeId())) {
-
+            if (shutdownStartTime == null) {
+                shutdownStartTime = Instant.now(clock);
+                logger.info("Starting node shutdown sequence for ML");
+            }
             datafeedRunner.vacateAllDatafeedsOnThisNode(
                 "previously assigned node [" + state.nodes().getLocalNode().getName() + "] is shutting down");
             autodetectProcessManager.vacateOpenJobsOnThisNode();
         }
     }
+
+    Instant getShutdownStartTime() {
+        return shutdownStartTime;
+    }
 }
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlLifeCycleServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlLifeCycleServiceTests.java
index 2095d99100b53..983e1ffedf842 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlLifeCycleServiceTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlLifeCycleServiceTests.java
@@ -40,13 +40,20 @@
 import org.junit.Before;
 
 import java.net.InetAddress;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
+import static org.elasticsearch.xpack.ml.MlLifeCycleService.MAX_GRACEFUL_SHUTDOWN_TIME;
+import static org.elasticsearch.xpack.ml.MlLifeCycleService.isNodeSafeToShutdown;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -94,10 +101,34 @@ public void testIsNodeSafeToShutdown() {
         Metadata metadata = Metadata.builder().putCustom(PersistentTasksCustomMetadata.TYPE, tasksBuilder.build()).build();
         ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(metadata).build();
 
-        assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-1", clusterState), is(false)); // has AD job
-        assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-2", clusterState), is(true)); // has DFA job
-        assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-3", clusterState), is(false)); // has snapshot upgrade
-        assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-4", clusterState), is(true)); // has no ML tasks
+        Instant shutdownStartTime = Instant.now();
+
+        // We might be asked if it's safe to shut down immediately after being asked to shut down
+        Clock clock = Clock.fixed(shutdownStartTime, ZoneId.systemDefault());
+        assertThat(isNodeSafeToShutdown("node-1", clusterState, shutdownStartTime, clock), is(false)); // has AD job
+        assertThat(isNodeSafeToShutdown("node-2", clusterState, shutdownStartTime, clock), is(true)); // has DFA job
+        assertThat(isNodeSafeToShutdown("node-3", clusterState, shutdownStartTime, clock), is(false)); // has snapshot upgrade
+        assertThat(isNodeSafeToShutdown("node-4", clusterState, shutdownStartTime, clock), is(true)); // has no ML tasks
+
+        // Results should also be the same if we're asked if it's safe to shut down before being asked to shut down
+        assertThat(isNodeSafeToShutdown("node-1", clusterState, null, Clock.systemUTC()), is(false)); // has AD job
+        assertThat(isNodeSafeToShutdown("node-2", clusterState, null, Clock.systemUTC()), is(true)); // has DFA job
+        assertThat(isNodeSafeToShutdown("node-3", clusterState, null, Clock.systemUTC()), is(false)); // has snapshot upgrade
+        assertThat(isNodeSafeToShutdown("node-4", clusterState, null, Clock.systemUTC()), is(true)); // has no ML tasks
+
+        // Results should still be the same 1 minute into shutdown
+        clock = Clock.fixed(clock.instant().plus(Duration.ofMinutes(1)), ZoneId.systemDefault());
+        assertThat(isNodeSafeToShutdown("node-1", clusterState, shutdownStartTime, clock), is(false)); // has AD job
+        assertThat(isNodeSafeToShutdown("node-2", clusterState, shutdownStartTime, clock), is(true)); // has DFA job
+        assertThat(isNodeSafeToShutdown("node-3", clusterState, shutdownStartTime, clock), is(false)); // has snapshot upgrade
+        assertThat(isNodeSafeToShutdown("node-4", clusterState, shutdownStartTime, clock), is(true)); // has no ML tasks
+
+        // After the timeout we should always report it's safe to shut down
+        clock = Clock.fixed(clock.instant().plus(MAX_GRACEFUL_SHUTDOWN_TIME), ZoneId.systemDefault());
+        assertThat(isNodeSafeToShutdown("node-1", clusterState, shutdownStartTime, clock), is(true)); // has AD job
+        assertThat(isNodeSafeToShutdown("node-2", clusterState, shutdownStartTime, clock), is(true)); // has DFA job
+        assertThat(isNodeSafeToShutdown("node-3", clusterState, shutdownStartTime, clock), is(true)); // has snapshot upgrade
+        assertThat(isNodeSafeToShutdown("node-4", clusterState, shutdownStartTime, clock), is(true)); // has no ML tasks
     }
 
     public void testIsNodeSafeToShutdownGivenFailedTasks() {
@@ -120,10 +151,13 @@ public void testIsNodeSafeToShutdownGivenFailedTasks() {
         Metadata metadata = Metadata.builder().putCustom(PersistentTasksCustomMetadata.TYPE, tasksBuilder.build()).build();
         ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(metadata).build();
 
-        assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-1", clusterState), is(true)); // has failed AD job
-        assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-2", clusterState), is(true)); // has failed DFA job
-        assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-3", clusterState), is(true)); // has failed snapshot upgrade
-        assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-4", clusterState), is(true)); // has no ML tasks
+        // For these tests it shouldn't matter when shutdown started or what the time is now, because it's always safe to shut down
+        Instant shutdownStartTime = randomFrom(Instant.now(), null);
+        Clock clock = Clock.fixed(randomFrom(Instant.now(), Instant.now().plus(Duration.ofDays(1))), ZoneId.systemDefault());
+        assertThat(isNodeSafeToShutdown("node-1", clusterState, shutdownStartTime, clock), is(true)); // has failed AD job
+        assertThat(isNodeSafeToShutdown("node-2", clusterState, shutdownStartTime, clock), is(true)); // has failed DFA job
+        assertThat(isNodeSafeToShutdown("node-3", clusterState, shutdownStartTime, clock), is(true)); // has failed snapshot upgrade
+        assertThat(isNodeSafeToShutdown("node-4", clusterState, shutdownStartTime, clock), is(true)); // has no ML tasks
     }
 
     public void testSignalGracefulShutdownIncludingLocalNode() {
@@ -145,11 +179,18 @@ public void testSignalGracefulShutdownIncludingLocalNode() {
         Collection<String> shutdownNodeIds =
             randomBoolean() ? Collections.singleton("node-2") : Arrays.asList("node-1", "node-2", "node-3");
 
-        mlLifeCycleService.signalGracefulShutdown(clusterState, shutdownNodeIds);
+        final Clock clock = Clock.fixed(Instant.now(), ZoneId.systemDefault());
+        mlLifeCycleService.signalGracefulShutdown(clusterState, shutdownNodeIds, clock);
+        assertThat(mlLifeCycleService.getShutdownStartTime(), is(clock.instant()));
 
         verify(datafeedRunner).vacateAllDatafeedsOnThisNode("previously assigned node [node-2-name] is shutting down");
         verify(autodetectProcessManager).vacateOpenJobsOnThisNode();
         verifyNoMoreInteractions(datafeedRunner, autodetectProcessManager, analyticsManager, memoryTracker);
+
+        // Calling the method again should not advance the start time
+        Clock advancedClock = Clock.fixed(clock.instant().plus(Duration.ofMinutes(1)), ZoneId.systemDefault());
+        mlLifeCycleService.signalGracefulShutdown(clusterState, shutdownNodeIds, advancedClock);
+        assertThat(mlLifeCycleService.getShutdownStartTime(), is(clock.instant()));
     }
 
     public void testSignalGracefulShutdownExcludingLocalNode() {
@@ -170,7 +211,8 @@ public void testSignalGracefulShutdownExcludingLocalNode() {
         Collection<String> shutdownNodeIds =
             randomBoolean() ? Collections.singleton("node-1") : Arrays.asList("node-1", "node-3");
 
-        mlLifeCycleService.signalGracefulShutdown(clusterState, shutdownNodeIds);
+        mlLifeCycleService.signalGracefulShutdown(clusterState, shutdownNodeIds, Clock.systemUTC());
+        assertThat(mlLifeCycleService.getShutdownStartTime(), nullValue());
 
         verifyNoMoreInteractions(datafeedRunner, autodetectProcessManager, analyticsManager, memoryTracker);
     }
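
The core of the change is easiest to see outside the diff: record the instant shutdown is first signalled, then treat any later check as "safe to shut down" once MAX_GRACEFUL_SHUTDOWN_TIME has elapsed, with a Clock passed in so tests can simulate the passage of time instead of sleeping. The sketch below is illustrative only and is not part of the patch; the class and method names (GracefulShutdownDeadline, signalShutdown, deadlinePassed) are hypothetical, and it depends only on java.time.

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

// Illustrative sketch only (hypothetical class, not part of the patch above):
// the "record a start instant, compare against an injected Clock" pattern.
final class GracefulShutdownDeadline {

    // Mirrors the patch's MAX_GRACEFUL_SHUTDOWN_TIME: a deliberately generous upper bound.
    private static final Duration MAX_GRACEFUL_SHUTDOWN_TIME = Duration.ofMinutes(10);

    // Set once on the first shutdown signal; repeated signals must not advance it.
    private volatile Instant shutdownStartTime;

    /** Records when shutdown was first requested, ignoring later duplicate signals. */
    void signalShutdown(Clock clock) {
        if (shutdownStartTime == null) {
            shutdownStartTime = Instant.now(clock);
        }
    }

    /** True once the graceful window has expired; false before shutdown has even started. */
    boolean deadlinePassed(Clock clock) {
        Instant start = shutdownStartTime;
        return start != null && start.isBefore(clock.instant().minus(MAX_GRACEFUL_SHUTDOWN_TIME));
    }

    public static void main(String[] args) {
        GracefulShutdownDeadline deadline = new GracefulShutdownDeadline();
        Clock start = Clock.fixed(Instant.now(), ZoneOffset.UTC);
        deadline.signalShutdown(start);
        // Immediately after the signal the deadline has not passed.
        System.out.println(deadline.deadlinePassed(start));   // false
        // Eleven simulated minutes later it has, so remaining work would be abandoned.
        System.out.println(deadline.deadlinePassed(Clock.offset(start, Duration.ofMinutes(11))));   // true
    }
}

Using Clock.fixed(...) and Clock.offset(...) in this way is the same idea the new unit tests rely on to check behaviour at the start of shutdown, one minute in, and past the 10 minute cap, without real waiting.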