[ML] Cap graceful shutdown time (#76342)
The node shutdown work done in #75188 did not impose any
upper bound on the time allowed for ML jobs to shut down
gracefully.

This change imposes a cap of 10 minutes on shutdown time.
In reality closing ML jobs shouldn't take this long, but
we don't want a bug in ML to stall the shutdown process
forever and require user intervention to recover.
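
For reference, the new behaviour reduces to a single elapsed-time check against a
fixed Duration. The sketch below is a minimal, standalone illustration of that check;
the class and method names are hypothetical, and the real change is the one made to
MlLifeCycleService in the diff further down.

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

// Hypothetical, self-contained illustration of the capping logic: once shutdown has
// been underway for longer than the cap, the node is reported safe to shut down
// regardless of any ML tasks still running on it.
class GracefulShutdownCapSketch {

    // Same cap as MAX_GRACEFUL_SHUTDOWN_TIME in the diff below.
    static final Duration MAX_GRACEFUL_SHUTDOWN_TIME = Duration.ofMinutes(10);

    // A null shutdownStartTime means shutdown has not been signalled yet.
    static boolean gracePeriodExhausted(Instant shutdownStartTime, Clock clock) {
        return shutdownStartTime != null
            && shutdownStartTime.isBefore(clock.instant().minus(MAX_GRACEFUL_SHUTDOWN_TIME));
    }

    public static void main(String[] args) {
        Clock clock = Clock.systemUTC();
        System.out.println(gracePeriodExhausted(null, clock));            // false: shutdown not signalled
        System.out.println(gracePeriodExhausted(clock.instant(), clock)); // false: shutdown just started
        System.out.println(gracePeriodExhausted(
            clock.instant().minus(Duration.ofMinutes(11)), clock));       // true: cap exceeded
    }
}

Taking a Clock parameter rather than calling Instant.now() directly is what lets the
tests below pin time with Clock.fixed and step it forward deterministically.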
droberts195 authored Aug 11, 2021
1 parent 97d8a15 commit 8f5e457
Showing 2 changed files with 87 additions and 15 deletions.
MlLifeCycleService.java
@@ -6,6 +6,8 @@
*/
package org.elasticsearch.xpack.ml;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.LifecycleListener;
@@ -18,17 +20,32 @@
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;

import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Objects;

public class MlLifeCycleService {

/**
* Maximum time we'll wait for jobs to gracefully persist their state and stop their associated
* processes. We expect this to take a minute or two at most if all goes to plan. The longer
* timeout here is to avoid the need for user intervention if something doesn't work and the
* graceful shutdown gets stuck.
*/
public static final Duration MAX_GRACEFUL_SHUTDOWN_TIME = Duration.of(10, ChronoUnit.MINUTES);

private static final Logger logger = LogManager.getLogger(MlLifeCycleService.class);

private final ClusterService clusterService;
private final DatafeedRunner datafeedRunner;
private final MlController mlController;
private final AutodetectProcessManager autodetectProcessManager;
private final DataFrameAnalyticsManager analyticsManager;
private final MlMemoryTracker memoryTracker;
private volatile Instant shutdownStartTime;

MlLifeCycleService(ClusterService clusterService, DatafeedRunner datafeedRunner, MlController mlController,
AutodetectProcessManager autodetectProcessManager, DataFrameAnalyticsManager analyticsManager,
@@ -69,10 +86,16 @@ public synchronized void stop() {
* @return Has all active ML work vacated the specified node?
*/
public boolean isNodeSafeToShutdown(String nodeId) {
return isNodeSafeToShutdown(nodeId, clusterService.state());
return isNodeSafeToShutdown(nodeId, clusterService.state(), shutdownStartTime, Clock.systemUTC());
}

static boolean isNodeSafeToShutdown(String nodeId, ClusterState state) {
static boolean isNodeSafeToShutdown(String nodeId, ClusterState state, Instant shutdownStartTime, Clock clock) {

// If the shutdown has taken too long then any remaining tasks will just be cut off when the node dies
if (shutdownStartTime != null && shutdownStartTime.isBefore(clock.instant().minus(MAX_GRACEFUL_SHUTDOWN_TIME))) {
return true;
}

PersistentTasksCustomMetadata tasks = state.metadata().custom(PersistentTasksCustomMetadata.TYPE);
// TODO: currently only considering anomaly detection jobs - could extend in the future
// Ignore failed jobs - the persistent task still exists to remember the failure (because no
@@ -90,15 +113,22 @@ static boolean isNodeSafeToShutdown(String nodeId, ClusterState state) {
* @param shutdownNodeIds IDs of all nodes being shut down.
*/
public void signalGracefulShutdown(Collection<String> shutdownNodeIds) {
signalGracefulShutdown(clusterService.state(), shutdownNodeIds);
signalGracefulShutdown(clusterService.state(), shutdownNodeIds, Clock.systemUTC());
}

void signalGracefulShutdown(ClusterState state, Collection<String> shutdownNodeIds) {
void signalGracefulShutdown(ClusterState state, Collection<String> shutdownNodeIds, Clock clock) {
if (shutdownNodeIds.contains(state.nodes().getLocalNodeId())) {

if (shutdownStartTime == null) {
shutdownStartTime = Instant.now(clock);
logger.info("Starting node shutdown sequence for ML");
}
datafeedRunner.vacateAllDatafeedsOnThisNode(
"previously assigned node [" + state.nodes().getLocalNode().getName() + "] is shutting down");
autodetectProcessManager.vacateOpenJobsOnThisNode();
}
}

Instant getShutdownStartTime() {
return shutdownStartTime;
}
}
MlLifeCycleServiceTests.java
@@ -36,11 +36,18 @@
import org.junit.Before;

import java.net.InetAddress;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

import static org.elasticsearch.xpack.ml.MlLifeCycleService.MAX_GRACEFUL_SHUTDOWN_TIME;
import static org.elasticsearch.xpack.ml.MlLifeCycleService.isNodeSafeToShutdown;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@@ -82,10 +89,34 @@ public void testIsNodeSafeToShutdown() {
Metadata metadata = Metadata.builder().putCustom(PersistentTasksCustomMetadata.TYPE, tasksBuilder.build()).build();
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(metadata).build();

assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-1", clusterState), is(false)); // has AD job
assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-2", clusterState), is(true)); // has DFA job
assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-3", clusterState), is(false)); // has snapshot upgrade
assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-4", clusterState), is(true)); // has no ML tasks
Instant shutdownStartTime = Instant.now();

// We might be asked if it's safe to shut down immediately after being asked to shut down
Clock clock = Clock.fixed(shutdownStartTime, ZoneId.systemDefault());
assertThat(isNodeSafeToShutdown("node-1", clusterState, shutdownStartTime, clock), is(false)); // has AD job
assertThat(isNodeSafeToShutdown("node-2", clusterState, shutdownStartTime, clock), is(true)); // has DFA job
assertThat(isNodeSafeToShutdown("node-3", clusterState, shutdownStartTime, clock), is(false)); // has snapshot upgrade
assertThat(isNodeSafeToShutdown("node-4", clusterState, shutdownStartTime, clock), is(true)); // has no ML tasks

// Results should also be the same if we're asked if it's safe to shut down before being asked to shut down
assertThat(isNodeSafeToShutdown("node-1", clusterState, null, Clock.systemUTC()), is(false)); // has AD job
assertThat(isNodeSafeToShutdown("node-2", clusterState, null, Clock.systemUTC()), is(true)); // has DFA job
assertThat(isNodeSafeToShutdown("node-3", clusterState, null, Clock.systemUTC()), is(false)); // has snapshot upgrade
assertThat(isNodeSafeToShutdown("node-4", clusterState, null, Clock.systemUTC()), is(true)); // has no ML tasks

// Results should still be the same 1 minute into shutdown
clock = Clock.fixed(clock.instant().plus(Duration.ofMinutes(1)), ZoneId.systemDefault());
assertThat(isNodeSafeToShutdown("node-1", clusterState, shutdownStartTime, clock), is(false)); // has AD job
assertThat(isNodeSafeToShutdown("node-2", clusterState, shutdownStartTime, clock), is(true)); // has DFA job
assertThat(isNodeSafeToShutdown("node-3", clusterState, shutdownStartTime, clock), is(false)); // has snapshot upgrade
assertThat(isNodeSafeToShutdown("node-4", clusterState, shutdownStartTime, clock), is(true)); // has no ML tasks

// After the timeout we should always report it's safe to shut down
clock = Clock.fixed(clock.instant().plus(MAX_GRACEFUL_SHUTDOWN_TIME), ZoneId.systemDefault());
assertThat(isNodeSafeToShutdown("node-1", clusterState, shutdownStartTime, clock), is(true)); // has AD job
assertThat(isNodeSafeToShutdown("node-2", clusterState, shutdownStartTime, clock), is(true)); // has DFA job
assertThat(isNodeSafeToShutdown("node-3", clusterState, shutdownStartTime, clock), is(true)); // has snapshot upgrade
assertThat(isNodeSafeToShutdown("node-4", clusterState, shutdownStartTime, clock), is(true)); // has no ML tasks
}

public void testIsNodeSafeToShutdownGivenFailedTasks() {
@@ -108,10 +139,13 @@ public void testIsNodeSafeToShutdownGivenFailedTasks() {
Metadata metadata = Metadata.builder().putCustom(PersistentTasksCustomMetadata.TYPE, tasksBuilder.build()).build();
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(metadata).build();

assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-1", clusterState), is(true)); // has failed AD job
assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-2", clusterState), is(true)); // has failed DFA job
assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-3", clusterState), is(true)); // has failed snapshot upgrade
assertThat(MlLifeCycleService.isNodeSafeToShutdown("node-4", clusterState), is(true)); // has no ML tasks
// For these tests it shouldn't matter when shutdown started or what the time is now, because it's always safe to shut down
Instant shutdownStartTime = randomFrom(Instant.now(), null);
Clock clock = Clock.fixed(randomFrom(Instant.now(), Instant.now().plus(Duration.ofDays(1))), ZoneId.systemDefault());
assertThat(isNodeSafeToShutdown("node-1", clusterState, shutdownStartTime, clock), is(true)); // has failed AD job
assertThat(isNodeSafeToShutdown("node-2", clusterState, shutdownStartTime, clock), is(true)); // has failed DFA job
assertThat(isNodeSafeToShutdown("node-3", clusterState, shutdownStartTime, clock), is(true)); // has failed snapshot upgrade
assertThat(isNodeSafeToShutdown("node-4", clusterState, shutdownStartTime, clock), is(true)); // has no ML tasks
}

public void testSignalGracefulShutdownIncludingLocalNode() {
@@ -133,11 +167,18 @@ public void testSignalGracefulShutdownIncludingLocalNode() {
Collection<String> shutdownNodeIds =
randomBoolean() ? Collections.singleton("node-2") : Arrays.asList("node-1", "node-2", "node-3");

mlLifeCycleService.signalGracefulShutdown(clusterState, shutdownNodeIds);
final Clock clock = Clock.fixed(Instant.now(), ZoneId.systemDefault());
mlLifeCycleService.signalGracefulShutdown(clusterState, shutdownNodeIds, clock);
assertThat(mlLifeCycleService.getShutdownStartTime(), is(clock.instant()));

verify(datafeedRunner).vacateAllDatafeedsOnThisNode("previously assigned node [node-2-name] is shutting down");
verify(autodetectProcessManager).vacateOpenJobsOnThisNode();
verifyNoMoreInteractions(datafeedRunner, mlController, autodetectProcessManager, analyticsManager, memoryTracker);

// Calling the method again should not advance the start time
Clock advancedClock = Clock.fixed(clock.instant().plus(Duration.ofMinutes(1)), ZoneId.systemDefault());
mlLifeCycleService.signalGracefulShutdown(clusterState, shutdownNodeIds, advancedClock);
assertThat(mlLifeCycleService.getShutdownStartTime(), is(clock.instant()));
}

public void testSignalGracefulShutdownExcludingLocalNode() {
@@ -158,7 +199,8 @@ public void testSignalGracefulShutdownExcludingLocalNode() {
Collection<String> shutdownNodeIds =
randomBoolean() ? Collections.singleton("node-1") : Arrays.asList("node-1", "node-3");

mlLifeCycleService.signalGracefulShutdown(clusterState, shutdownNodeIds);
mlLifeCycleService.signalGracefulShutdown(clusterState, shutdownNodeIds, Clock.systemUTC());
assertThat(mlLifeCycleService.getShutdownStartTime(), nullValue());

verifyNoMoreInteractions(datafeedRunner, mlController, autodetectProcessManager, analyticsManager, memoryTracker);
}
