diff --git a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexNodeShutdownIT.java b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexNodeShutdownIT.java
new file mode 100644
index 000000000000..4a001bb2d096
--- /dev/null
+++ b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexNodeShutdownIT.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.reindex;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.node.ShutdownPrepareService;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.reindex.ReindexPlugin;
+import org.elasticsearch.tasks.TaskInfo;
+import org.elasticsearch.tasks.TaskManager;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.transport.TransportService;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.elasticsearch.node.ShutdownPrepareService.MAXIMUM_REINDEXING_TIMEOUT_SETTING;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
+
+/**
+ * Test that a wait added during shutdown is necessary for a large reindexing task to complete.
+ * The test works as follows:
+ * 1. Start a large (reasonably long running) reindexing request on the coordinator-only node.
+ * 2. Check that the reindexing task appears on the coordinating node
+ * 3. With a 10s timeout value for MAXIMUM_REINDEXING_TIMEOUT_SETTING,
+ *    wait for the reindexing task to complete before closing the node
+ * 4. Confirm that the reindexing task succeeds with the wait (it will fail without it)
+ */
+@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST)
+public class ReindexNodeShutdownIT extends ESIntegTestCase {
+
+    protected static final String INDEX = "reindex-shutdown-index";
+    protected static final String DEST_INDEX = "dest-index";
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return Arrays.asList(ReindexPlugin.class);
+    }
+
+    /** Builds a reindex request that is sent through a client of the named node. */
+    protected ReindexRequestBuilder reindex(String nodeName) {
+        return new ReindexRequestBuilder(internalCluster().client(nodeName));
+    }
+
+    /**
+     * Starts a master-only, a data-only and a coordinating-only node, launches a reindex through the
+     * coordinating node, shuts that node down and verifies every document reached the destination index.
+     */
+    public void testReindexWithShutdown() throws Exception {
+        final String masterNodeName = internalCluster().startMasterOnlyNode();
+        final String dataNodeName = internalCluster().startDataOnlyNode();
+
+        // Explicitly configure the reindexing grace period on the coordinating node that will be shut down
+        final Settings coordSettings = Settings.builder()
+            .put(MAXIMUM_REINDEXING_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(10))
+            .build();
+        final String coordNodeName = internalCluster().startCoordinatingOnlyNode(coordSettings);
+
+        ensureStableCluster(3);
+
+        int numDocs = 20000;
+        createIndex(numDocs);
+        createReindexTaskAndShutdown(coordNodeName);
+        checkDestinationIndex(dataNodeName, numDocs);
+    }
+
+    /** Creates the source index and fills it with {@code numDocs} documents whose ids are {@code 0..numDocs-1}. */
+    private void createIndex(int numDocs) {
+        // INDEX will be created on the dataNode
+        createIndex(INDEX);
+
+        logger.debug("setting up [{}] docs", numDocs);
+        indexRandom(
+            true,
+            false,
+            true,
+            IntStream.range(0, numDocs)
+                .mapToObj(i -> prepareIndex(INDEX).setId(String.valueOf(i)).setSource("n", i))
+                .collect(Collectors.toList())
+        );
+
+        // Checks that all the documents have been indexed and correctly counted
+        assertHitCount(prepareSearch(INDEX).setSize(0).setTrackTotalHits(true), numDocs);
+    }
+
+    /**
+     * Submits the reindex request via the coordinating node, waits until its task is listed there, then runs
+     * the shutdown preparation and stops the node.
+     */
+    private void createReindexTaskAndShutdown(final String coordNodeName) throws Exception {
+        AbstractBulkByScrollRequestBuilder<?, ?> builder = reindex(coordNodeName).source(INDEX).destination(DEST_INDEX);
+        AbstractBulkByScrollRequest<?> reindexRequest = builder.request();
+        ShutdownPrepareService shutdownPrepareService = internalCluster().getInstance(ShutdownPrepareService.class, coordNodeName);
+
+        TaskManager taskManager = internalCluster().getInstance(TransportService.class, coordNodeName).getTaskManager();
+
+        // Now execute the reindex action...
+        ActionListener<BulkByScrollResponse> reindexListener = new ActionListener<BulkByScrollResponse>() {
+            @Override
+            public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
+                assertNull(bulkByScrollResponse.getReasonCancelled());
+                logger.debug(bulkByScrollResponse.toString());
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                logger.debug("Encountered " + e.toString());
+                fail(e, "Encountered " + e.toString());
+            }
+        };
+        internalCluster().client(coordNodeName).execute(ReindexAction.INSTANCE, reindexRequest, reindexListener);
+
+        // Check for reindex task to appear in the tasks list and immediately stop coordinating node
+        waitForTask(ReindexAction.INSTANCE.name(), coordNodeName);
+        shutdownPrepareService.prepareForShutdown(taskManager);
+        internalCluster().stopNode(coordNodeName);
+    }
+
+    // Make sure all documents from the source index have been reindexed into the destination index
+    private void checkDestinationIndex(String dataNodeName, int numDocs) throws Exception {
+        assertTrue(indexExists(DEST_INDEX));
+        flushAndRefresh(DEST_INDEX);
+        assertBusy(() -> { assertHitCount(prepareSearch(DEST_INDEX).setSize(0).setTrackTotalHits(true), numDocs); });
+    }
+
+    /**
+     * Polls the task list of {@code nodeName} for up to 10 seconds until a task running {@code actionName}
+     * without a parent task is present.
+     */
+    private static void waitForTask(String actionName, String nodeName) throws Exception {
+        assertBusy(() -> {
+            ListTasksResponse tasks = clusterAdmin().prepareListTasks(nodeName).setActions(actionName).setDetailed(true).get();
+            tasks.rethrowFailures("Find my task");
+            for (TaskInfo taskInfo : tasks.getTasks()) {
+                // Skip tasks with a parent because those are children of the task we want
+                if (taskInfo.parentTaskId().isSet() == false) return;
+            }
+            fail("Couldn't find task after waiting, tasks=" + tasks.getTasks());
+        }, 10, TimeUnit.SECONDS);
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
index 8cbacccb915a..7bb78eabc872 100644
--- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
@@ -108,6 +108,7 @@
 import org.elasticsearch.monitor.process.ProcessService;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.node.NodeRoleSettings;
+import org.elasticsearch.node.ShutdownPrepareService;
 import org.elasticsearch.persistent.PersistentTasksClusterService;
 import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
 import org.elasticsearch.plugins.PluginsService;
@@ -456,6 +457,8 @@ public void apply(Settings value, Settings current, Settings previous) {
         Environment.PATH_SHARED_DATA_SETTING,
         NodeEnvironment.NODE_ID_SEED_SETTING,
         Node.INITIAL_STATE_TIMEOUT_SETTING,
+        ShutdownPrepareService.MAXIMUM_SHUTDOWN_TIMEOUT_SETTING,
+        ShutdownPrepareService.MAXIMUM_REINDEXING_TIMEOUT_SETTING,
         DiscoveryModule.DISCOVERY_TYPE_SETTING,
         DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING,
         DiscoveryModule.ELECTION_STRATEGY_SETTING,
diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java
index 32a65302922a..e30f76fdd941 100644
--- a/server/src/main/java/org/elasticsearch/node/Node.java
+++ b/server/src/main/java/org/elasticsearch/node/Node.java
@@ -13,10 +13,6 @@
 import org.apache.logging.log4j.Logger;
 import
org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchTimeoutException; -import org.elasticsearch.action.search.TransportSearchAction; -import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.action.support.RefCountingListener; -import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.bootstrap.BootstrapCheck; import org.elasticsearch.bootstrap.BootstrapContext; import org.elasticsearch.client.internal.Client; @@ -82,7 +78,6 @@ import org.elasticsearch.snapshots.SnapshotShardsService; import org.elasticsearch.snapshots.SnapshotsService; import org.elasticsearch.tasks.TaskCancellationService; -import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.tasks.TaskResultsService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterPortSettings; @@ -106,18 +101,12 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; import javax.net.ssl.SNIHostName; -import static org.elasticsearch.core.Strings.format; - /** * A node represent a node within a cluster ({@code cluster.name}). The {@link #client()} can be used * in order to use a {@link Client} to perform actions/operations against the cluster. 
@@ -161,12 +150,6 @@ public class Node implements Closeable { Property.NodeScope ); - public static final Setting MAXIMUM_SHUTDOWN_TIMEOUT_SETTING = Setting.positiveTimeSetting( - "node.maximum_shutdown_grace_period", - TimeValue.ZERO, - Setting.Property.NodeScope - ); - private final Lifecycle lifecycle = new Lifecycle(); /** @@ -187,6 +170,7 @@ public class Node implements Closeable { private final LocalNodeFactory localNodeFactory; private final NodeService nodeService; private final TerminationHandler terminationHandler; + // for testing final NamedWriteableRegistry namedWriteableRegistry; final NamedXContentRegistry namedXContentRegistry; @@ -606,105 +590,8 @@ public synchronized void close() throws IOException { * logic should use Node Shutdown, see {@link org.elasticsearch.cluster.metadata.NodesShutdownMetadata}. */ public void prepareForClose() { - final var maxTimeout = MAXIMUM_SHUTDOWN_TIMEOUT_SETTING.get(this.settings()); - - record Stopper(String name, SubscribableListener listener) { - boolean isIncomplete() { - return listener().isDone() == false; - } - } - - final var stoppers = new ArrayList(); - final var allStoppersFuture = new PlainActionFuture(); - try (var listeners = new RefCountingListener(allStoppersFuture)) { - final BiConsumer stopperRunner = (name, action) -> { - final var stopper = new Stopper(name, new SubscribableListener<>()); - stoppers.add(stopper); - stopper.listener().addListener(listeners.acquire()); - new Thread(() -> { - try { - action.run(); - } catch (Exception ex) { - logger.warn("unexpected exception in shutdown task [" + stopper.name() + "]", ex); - } finally { - stopper.listener().onResponse(null); - } - }, stopper.name()).start(); - }; - - stopperRunner.accept("http-server-transport-stop", injector.getInstance(HttpServerTransport.class)::close); - stopperRunner.accept("async-search-stop", () -> awaitSearchTasksComplete(maxTimeout)); - if (terminationHandler != null) { - stopperRunner.accept("termination-handler-stop", 
terminationHandler::handleTermination); - } - } - - final Supplier incompleteStoppersDescriber = () -> stoppers.stream() - .filter(Stopper::isIncomplete) - .map(Stopper::name) - .collect(Collectors.joining(", ", "[", "]")); - - try { - if (TimeValue.ZERO.equals(maxTimeout)) { - allStoppersFuture.get(); - } else { - allStoppersFuture.get(maxTimeout.millis(), TimeUnit.MILLISECONDS); - } - } catch (ExecutionException e) { - assert false : e; // listeners are never completed exceptionally - logger.warn("failed during graceful shutdown tasks", e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.warn("interrupted while waiting for graceful shutdown tasks: " + incompleteStoppersDescriber.get(), e); - } catch (TimeoutException e) { - logger.warn("timed out while waiting for graceful shutdown tasks: " + incompleteStoppersDescriber.get()); - } - } - - private void awaitSearchTasksComplete(TimeValue asyncSearchTimeout) { - TaskManager taskManager = injector.getInstance(TransportService.class).getTaskManager(); - long millisWaited = 0; - while (true) { - long searchTasksRemaining = taskManager.getTasks() - .values() - .stream() - .filter(task -> TransportSearchAction.TYPE.name().equals(task.getAction())) - .count(); - if (searchTasksRemaining == 0) { - logger.debug("all search tasks complete"); - return; - } else { - // Let the system work on those searches for a while. We're on a dedicated thread to manage app shutdown, so we - // literally just want to wait and not take up resources on this thread for now. Poll period chosen to allow short - // response times, but checking the tasks list is relatively expensive, and we don't want to waste CPU time we could - // be spending on finishing those searches. 
- final TimeValue pollPeriod = TimeValue.timeValueMillis(500); - millisWaited += pollPeriod.millis(); - if (TimeValue.ZERO.equals(asyncSearchTimeout) == false && millisWaited >= asyncSearchTimeout.millis()) { - logger.warn( - format( - "timed out after waiting [%s] for [%d] search tasks to finish", - asyncSearchTimeout.toString(), - searchTasksRemaining - ) - ); - return; - } - logger.debug(format("waiting for [%s] search tasks to finish, next poll in [%s]", searchTasksRemaining, pollPeriod)); - try { - Thread.sleep(pollPeriod.millis()); - } catch (InterruptedException ex) { - logger.warn( - format( - "interrupted while waiting [%s] for [%d] search tasks to finish", - asyncSearchTimeout.toString(), - searchTasksRemaining - ) - ); - return; - } - } - } + injector.getInstance(ShutdownPrepareService.class) + .prepareForShutdown(injector.getInstance(TransportService.class).getTaskManager()); } /** diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 8e6648632957..7e3991c1df1f 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -1099,6 +1099,8 @@ private void construct( telemetryProvider.getTracer() ); + final ShutdownPrepareService shutdownPrepareService = new ShutdownPrepareService(settings, httpServerTransport, terminationHandler); + modules.add( loadPersistentTasksService( settingsModule, @@ -1200,6 +1202,7 @@ private void construct( b.bind(CompatibilityVersions.class).toInstance(compatibilityVersions); b.bind(DataStreamAutoShardingService.class).toInstance(dataStreamAutoShardingService); b.bind(FailureStoreMetrics.class).toInstance(failureStoreMetrics); + b.bind(ShutdownPrepareService.class).toInstance(shutdownPrepareService); }); if (ReadinessService.enabled(environment)) { diff --git a/server/src/main/java/org/elasticsearch/node/ShutdownPrepareService.java 
b/server/src/main/java/org/elasticsearch/node/ShutdownPrepareService.java
new file mode 100644
index 000000000000..ab9537053f45
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/node/ShutdownPrepareService.java
@@ -0,0 +1,201 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.node;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.search.TransportSearchAction;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.RefCountingListener;
+import org.elasticsearch.action.support.SubscribableListener;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.http.HttpServerTransport;
+import org.elasticsearch.index.reindex.ReindexAction;
+import org.elasticsearch.node.internal.TerminationHandler;
+import org.elasticsearch.tasks.TaskManager;
+
+import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.core.Strings.format;
+
+/**
+ * This class was created to extract out the logic from {@link Node#prepareForClose()} to facilitate testing.
+ * <p>
+ * Invokes hooks to prepare this node to be closed. This should be called when Elasticsearch receives a request to shut down
+ * gracefully from the underlying operating system, before system resources are closed.
+ * <p>
+ * Note that this class is part of infrastructure to react to signals from the operating system - most graceful shutdown
+ * logic should use Node Shutdown, see {@link org.elasticsearch.cluster.metadata.NodesShutdownMetadata}.
+ */
+public class ShutdownPrepareService {
+
+    private static final Logger logger = LogManager.getLogger(ShutdownPrepareService.class);
+    private final Settings settings;
+    private final HttpServerTransport httpServerTransport;
+    private final TerminationHandler terminationHandler;
+    private volatile boolean hasBeenShutdown = false;
+
+    /**
+     * @param settings node settings from which the grace periods are read
+     * @param httpServerTransport HTTP transport that is closed as part of the shutdown preparation
+     * @param terminationHandler handler invoked during the shutdown preparation; skipped when {@code null}
+     */
+    public ShutdownPrepareService(Settings settings, HttpServerTransport httpServerTransport, TerminationHandler terminationHandler) {
+        this.settings = settings;
+        this.httpServerTransport = httpServerTransport;
+        this.terminationHandler = terminationHandler;
+    }
+
+    /**
+     * Upper bound on how long {@link #prepareForShutdown} blocks and on the wait for search tasks.
+     * The default, {@link TimeValue#ZERO}, waits without a time limit.
+     */
+    public static final Setting<TimeValue> MAXIMUM_SHUTDOWN_TIMEOUT_SETTING = Setting.positiveTimeSetting(
+        "node.maximum_shutdown_grace_period",
+        TimeValue.ZERO,
+        Setting.Property.NodeScope
+    );
+
+    /** Grace period granted to running reindex tasks during shutdown preparation; defaults to 10 seconds. */
+    public static final Setting<TimeValue> MAXIMUM_REINDEXING_TIMEOUT_SETTING = Setting.positiveTimeSetting(
+        "node.maximum_reindexing_grace_period",
+        TimeValue.timeValueSeconds(10),
+        Setting.Property.NodeScope
+    );
+
+    /**
+     * Invokes hooks to prepare this node to be closed. This should be called when Elasticsearch receives a request to shut down
+     * gracefully from the underlying operating system, before system resources are closed. This method will block
+     * until the node is ready to shut down.
+     * <p>
+     * Note that this class is part of infrastructure to react to signals from the operating system - most graceful shutdown
+     * logic should use Node Shutdown, see {@link org.elasticsearch.cluster.metadata.NodesShutdownMetadata}.
+     *
+     * @param taskManager task manager whose task list is polled for outstanding search and reindex tasks
+     */
+    public void prepareForShutdown(TaskManager taskManager) {
+        assert hasBeenShutdown == false;
+        hasBeenShutdown = true;
+        final var maxTimeout = MAXIMUM_SHUTDOWN_TIMEOUT_SETTING.get(settings);
+        final var reindexTimeout = MAXIMUM_REINDEXING_TIMEOUT_SETTING.get(settings);
+
+        record Stopper(String name, SubscribableListener<Void> listener) {
+            boolean isIncomplete() {
+                return listener().isDone() == false;
+            }
+        }
+
+        final var stoppers = new ArrayList<Stopper>();
+        final var allStoppersFuture = new PlainActionFuture<Void>();
+        try (var listeners = new RefCountingListener(allStoppersFuture)) {
+            final BiConsumer<String, Runnable> stopperRunner = (name, action) -> {
+                final var stopper = new Stopper(name, new SubscribableListener<>());
+                stoppers.add(stopper);
+                stopper.listener().addListener(listeners.acquire());
+                new Thread(() -> {
+                    try {
+                        action.run();
+                    } catch (Exception ex) {
+                        logger.warn("unexpected exception in shutdown task [" + stopper.name() + "]", ex);
+                    } finally {
+                        stopper.listener().onResponse(null);
+                    }
+                }, stopper.name()).start();
+            };
+
+            stopperRunner.accept("http-server-transport-stop", httpServerTransport::close);
+            stopperRunner.accept("async-search-stop", () -> awaitSearchTasksComplete(maxTimeout, taskManager));
+            stopperRunner.accept("reindex-stop", () -> awaitReindexTasksComplete(reindexTimeout, taskManager));
+            if (terminationHandler != null) {
+                stopperRunner.accept("termination-handler-stop", terminationHandler::handleTermination);
+            }
+        }
+
+        final Supplier<String> incompleteStoppersDescriber = () -> stoppers.stream()
+            .filter(Stopper::isIncomplete)
+            .map(Stopper::name)
+            .collect(Collectors.joining(", ", "[", "]"));
+
+        try {
+            if (TimeValue.ZERO.equals(maxTimeout)) {
+                allStoppersFuture.get();
+            } else {
+                // This bound applies to every stopper, so it also caps the reindex grace period
+                allStoppersFuture.get(maxTimeout.millis(), TimeUnit.MILLISECONDS);
+            }
+        } catch (ExecutionException e) {
+            assert false : e; // listeners are never completed exceptionally
+            logger.warn("failed during graceful shutdown tasks", e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.warn("interrupted while waiting for graceful shutdown tasks: " + incompleteStoppersDescriber.get(), e);
+        } catch (TimeoutException e) {
+            logger.warn("timed out while waiting for graceful shutdown tasks: " + incompleteStoppersDescriber.get());
+        }
+    }
+
+    /**
+     * Polls the task manager every 500ms until no task with the given action name remains, the timeout elapses,
+     * or the calling thread is interrupted.
+     *
+     * @param timeout maximum time to wait; {@link TimeValue#ZERO} disables the time limit
+     * @param taskName action name of the tasks to wait for
+     * @param taskManager task manager whose current tasks are inspected
+     */
+    private void awaitTasksComplete(TimeValue timeout, String taskName, TaskManager taskManager) {
+        long millisWaited = 0;
+        while (true) {
+            long tasksRemaining = taskManager.getTasks().values().stream().filter(task -> taskName.equals(task.getAction())).count();
+            if (tasksRemaining == 0) {
+                logger.debug("all {} tasks complete", taskName);
+                return;
+            } else {
+                // Let the system work on those tasks for a while. We're on a dedicated thread to manage app shutdown, so we
+                // literally just want to wait and not take up resources on this thread for now. Poll period chosen to allow short
+                // response times, but checking the tasks list is relatively expensive, and we don't want to waste CPU time we could
+                // be spending on finishing those tasks.
+                final TimeValue pollPeriod = TimeValue.timeValueMillis(500);
+                millisWaited += pollPeriod.millis();
+                if (TimeValue.ZERO.equals(timeout) == false && millisWaited >= timeout.millis()) {
+                    logger.warn(format("timed out after waiting [%s] for [%d] %s tasks to finish", timeout, tasksRemaining, taskName));
+                    return;
+                }
+                logger.debug("waiting for [{}] {} tasks to finish, next poll in [{}]", tasksRemaining, taskName, pollPeriod);
+                try {
+                    Thread.sleep(pollPeriod.millis());
+                } catch (InterruptedException ex) {
+                    // Restore the interrupt status so code further up the stack can observe it
+                    Thread.currentThread().interrupt();
+                    logger.warn(format("interrupted while waiting [%s] for [%d] %s tasks to finish", timeout, tasksRemaining, taskName));
+                    return;
+                }
+            }
+        }
+    }
+
+    /** Waits for outstanding search tasks, see {@link #awaitTasksComplete}. */
+    private void awaitSearchTasksComplete(TimeValue asyncSearchTimeout, TaskManager taskManager) {
+        awaitTasksComplete(asyncSearchTimeout, TransportSearchAction.NAME, taskManager);
+    }
+
+    /** Waits for outstanding reindex tasks, see {@link #awaitTasksComplete}. */
+    private void awaitReindexTasksComplete(TimeValue asyncReindexTimeout, TaskManager taskManager) {
+        awaitTasksComplete(asyncReindexTimeout, ReindexAction.NAME, taskManager);
+    }
+
+}