From 8f5e45755ca045d1b9fa9afc0a7e8026fa8c1f65 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 11 Aug 2021 12:44:51 +0100 Subject: [PATCH] [ML] Cap graceful shutdown time (#76342) The node shutdown work done in #75188 did not impose any upper bound on the time allowed for ML jobs to shut down gracefully. This change imposes a cap of 10 minutes on shutdown time. In reality closing ML jobs shouldn't take this long, but we don't want ML to stall the shutdown process forever due to a bug, and require user intervention to recover. --- .../xpack/ml/MlLifeCycleService.java | 40 ++++++++++-- .../xpack/ml/MlLifeCycleServiceTests.java | 62 ++++++++++++++++--- 2 files changed, 87 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java index b55bf2a8dbc54..80a5d0890ecaa 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlLifeCycleService.java @@ -6,6 +6,8 @@ */ package org.elasticsearch.xpack.ml; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.LifecycleListener; @@ -18,17 +20,32 @@ import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import java.io.IOException; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Collection; import java.util.Objects; public class MlLifeCycleService { + /** + * Maximum time we'll wait for jobs to gracefully persist their state and stop their associated + * processes. We expect this to take a minute or two at most if all goes to plan. 
The longer + * timeout here is to avoid the need for user intervention if something doesn't work and the + * graceful shutdown gets stuck. + */ + public static final Duration MAX_GRACEFUL_SHUTDOWN_TIME = Duration.of(10, ChronoUnit.MINUTES); + + private static final Logger logger = LogManager.getLogger(MlLifeCycleService.class); + private final ClusterService clusterService; private final DatafeedRunner datafeedRunner; private final MlController mlController; private final AutodetectProcessManager autodetectProcessManager; private final DataFrameAnalyticsManager analyticsManager; private final MlMemoryTracker memoryTracker; + private volatile Instant shutdownStartTime; MlLifeCycleService(ClusterService clusterService, DatafeedRunner datafeedRunner, MlController mlController, AutodetectProcessManager autodetectProcessManager, DataFrameAnalyticsManager analyticsManager, @@ -69,10 +86,16 @@ public synchronized void stop() { * @return Has all active ML work vacated the specified node? */ public boolean isNodeSafeToShutdown(String nodeId) { - return isNodeSafeToShutdown(nodeId, clusterService.state()); + return isNodeSafeToShutdown(nodeId, clusterService.state(), shutdownStartTime, Clock.systemUTC()); } - static boolean isNodeSafeToShutdown(String nodeId, ClusterState state) { + static boolean isNodeSafeToShutdown(String nodeId, ClusterState state, Instant shutdownStartTime, Clock clock) { + + // If the shutdown has taken too long then any remaining tasks will just be cut off when the node dies + if (shutdownStartTime != null && shutdownStartTime.isBefore(clock.instant().minus(MAX_GRACEFUL_SHUTDOWN_TIME))) { + return true; + } + PersistentTasksCustomMetadata tasks = state.metadata().custom(PersistentTasksCustomMetadata.TYPE); // TODO: currently only considering anomaly detection jobs - could extend in the future // Ignore failed jobs - the persistent task still exists to remember the failure (because no @@ -90,15 +113,22 @@ static boolean isNodeSafeToShutdown(String 
nodeId, ClusterState state) { * @param shutdownNodeIds IDs of all nodes being shut down. */ public void signalGracefulShutdown(Collection shutdownNodeIds) { - signalGracefulShutdown(clusterService.state(), shutdownNodeIds); + signalGracefulShutdown(clusterService.state(), shutdownNodeIds, Clock.systemUTC()); } - void signalGracefulShutdown(ClusterState state, Collection shutdownNodeIds) { + void signalGracefulShutdown(ClusterState state, Collection shutdownNodeIds, Clock clock) { if (shutdownNodeIds.contains(state.nodes().getLocalNodeId())) { - + if (shutdownStartTime == null) { + shutdownStartTime = Instant.now(clock); + logger.info("Starting node shutdown sequence for ML"); + } datafeedRunner.vacateAllDatafeedsOnThisNode( "previously assigned node [" + state.nodes().getLocalNode().getName() + "] is shutting down"); autodetectProcessManager.vacateOpenJobsOnThisNode(); } } + + Instant getShutdownStartTime() { + return shutdownStartTime; + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlLifeCycleServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlLifeCycleServiceTests.java index e55522dd1030e..55ee77165c651 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlLifeCycleServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlLifeCycleServiceTests.java @@ -36,11 +36,18 @@ import org.junit.Before; import java.net.InetAddress; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import static org.elasticsearch.xpack.ml.MlLifeCycleService.MAX_GRACEFUL_SHUTDOWN_TIME; +import static org.elasticsearch.xpack.ml.MlLifeCycleService.isNodeSafeToShutdown; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static 
org.mockito.Mockito.verifyNoMoreInteractions; @@ -82,10 +89,34 @@ public void testIsNodeSafeToShutdown() { Metadata metadata = Metadata.builder().putCustom(PersistentTasksCustomMetadata.TYPE, tasksBuilder.build()).build(); ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(metadata).build(); - assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-1", clusterState), is(false)); // has AD job - assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-2", clusterState), is(true)); // has DFA job - assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-3", clusterState), is(false)); // has snapshot upgrade - assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-4", clusterState), is(true)); // has no ML tasks + Instant shutdownStartTime = Instant.now(); + + // We might be asked if it's safe to shut down immediately after being asked to shut down + Clock clock = Clock.fixed(shutdownStartTime, ZoneId.systemDefault()); + assertThat(isNodeSafeToShutdown("node-1", clusterState, shutdownStartTime, clock), is(false)); // has AD job + assertThat(isNodeSafeToShutdown("node-2", clusterState, shutdownStartTime, clock), is(true)); // has DFA job + assertThat(isNodeSafeToShutdown("node-3", clusterState, shutdownStartTime, clock), is(false)); // has snapshot upgrade + assertThat(isNodeSafeToShutdown("node-4", clusterState, shutdownStartTime, clock), is(true)); // has no ML tasks + + // Results should also be the same if we're asked if it's safe to shut down before being asked to shut down + assertThat(isNodeSafeToShutdown("node-1", clusterState, null, Clock.systemUTC()), is(false)); // has AD job + assertThat(isNodeSafeToShutdown("node-2", clusterState, null, Clock.systemUTC()), is(true)); // has DFA job + assertThat(isNodeSafeToShutdown("node-3", clusterState, null, Clock.systemUTC()), is(false)); // has snapshot upgrade + assertThat(isNodeSafeToShutdown("node-4", clusterState, null, Clock.systemUTC()), is(true)); // has no ML tasks + + // 
Results should still be the same 1 minute into shutdown + clock = Clock.fixed(clock.instant().plus(Duration.ofMinutes(1)), ZoneId.systemDefault()); + assertThat(isNodeSafeToShutdown("node-1", clusterState, shutdownStartTime, clock), is(false)); // has AD job + assertThat(isNodeSafeToShutdown("node-2", clusterState, shutdownStartTime, clock), is(true)); // has DFA job + assertThat(isNodeSafeToShutdown("node-3", clusterState, shutdownStartTime, clock), is(false)); // has snapshot upgrade + assertThat(isNodeSafeToShutdown("node-4", clusterState, shutdownStartTime, clock), is(true)); // has no ML tasks + + // After the timeout we should always report it's safe to shut down + clock = Clock.fixed(clock.instant().plus(MAX_GRACEFUL_SHUTDOWN_TIME), ZoneId.systemDefault()); + assertThat(isNodeSafeToShutdown("node-1", clusterState, shutdownStartTime, clock), is(true)); // has AD job + assertThat(isNodeSafeToShutdown("node-2", clusterState, shutdownStartTime, clock), is(true)); // has DFA job + assertThat(isNodeSafeToShutdown("node-3", clusterState, shutdownStartTime, clock), is(true)); // has snapshot upgrade + assertThat(isNodeSafeToShutdown("node-4", clusterState, shutdownStartTime, clock), is(true)); // has no ML tasks } public void testIsNodeSafeToShutdownGivenFailedTasks() { @@ -108,10 +139,13 @@ public void testIsNodeSafeToShutdownGivenFailedTasks() { Metadata metadata = Metadata.builder().putCustom(PersistentTasksCustomMetadata.TYPE, tasksBuilder.build()).build(); ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(metadata).build(); - assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-1", clusterState), is(true)); // has failed AD job - assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-2", clusterState), is(true)); // has failed DFA job - assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-3", clusterState), is(true)); // has failed snapshot upgrade - assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-4", 
clusterState), is(true)); // has no ML tasks + // For these tests it shouldn't matter when shutdown started or what the time is now, because it's always safe to shut down + Instant shutdownStartTime = randomFrom(Instant.now(), null); + Clock clock = Clock.fixed(randomFrom(Instant.now(), Instant.now().plus(Duration.ofDays(1))), ZoneId.systemDefault()); + assertThat(isNodeSafeToShutdown("node-1", clusterState, shutdownStartTime, clock), is(true)); // has failed AD job + assertThat(isNodeSafeToShutdown("node-2", clusterState, shutdownStartTime, clock), is(true)); // has failed DFA job + assertThat(isNodeSafeToShutdown("node-3", clusterState, shutdownStartTime, clock), is(true)); // has failed snapshot upgrade + assertThat(isNodeSafeToShutdown("node-4", clusterState, shutdownStartTime, clock), is(true)); // has no ML tasks } public void testSignalGracefulShutdownIncludingLocalNode() { @@ -133,11 +167,18 @@ public void testSignalGracefulShutdownIncludingLocalNode() { Collection shutdownNodeIds = randomBoolean() ? 
Collections.singleton("node-2") : Arrays.asList("node-1", "node-2", "node-3"); - mlLifeCycleService.signalGracefulShutdown(clusterState, shutdownNodeIds); + final Clock clock = Clock.fixed(Instant.now(), ZoneId.systemDefault()); + mlLifeCycleService.signalGracefulShutdown(clusterState, shutdownNodeIds, clock); + assertThat(mlLifeCycleService.getShutdownStartTime(), is(clock.instant())); verify(datafeedRunner).vacateAllDatafeedsOnThisNode("previously assigned node [node-2-name] is shutting down"); verify(autodetectProcessManager).vacateOpenJobsOnThisNode(); verifyNoMoreInteractions(datafeedRunner, mlController, autodetectProcessManager, analyticsManager, memoryTracker); + + // Calling the method again should not advance the start time + Clock advancedClock = Clock.fixed(clock.instant().plus(Duration.ofMinutes(1)), ZoneId.systemDefault()); + mlLifeCycleService.signalGracefulShutdown(clusterState, shutdownNodeIds, advancedClock); + assertThat(mlLifeCycleService.getShutdownStartTime(), is(clock.instant())); } public void testSignalGracefulShutdownExcludingLocalNode() { @@ -158,7 +199,8 @@ public void testSignalGracefulShutdownExcludingLocalNode() { Collection shutdownNodeIds = randomBoolean() ? Collections.singleton("node-1") : Arrays.asList("node-1", "node-3"); - mlLifeCycleService.signalGracefulShutdown(clusterState, shutdownNodeIds); + mlLifeCycleService.signalGracefulShutdown(clusterState, shutdownNodeIds, Clock.systemUTC()); + assertThat(mlLifeCycleService.getShutdownStartTime(), nullValue()); verifyNoMoreInteractions(datafeedRunner, mlController, autodetectProcessManager, analyticsManager, memoryTracker); }