Log reindexing failures (elastic#112676)
Wait for reindexing tasks to finish during shutdown for an amount of time defined by settings. Also log the number of reindexing tasks still in flight after the wait.
ankikuma authored and jfreden committed Nov 4, 2024
1 parent ecce69c commit e820e9f
Showing 5 changed files with 332 additions and 116 deletions.
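The wait durations described in the commit message are controlled by two node-scope settings that this commit registers in the built-in cluster settings (second file below) and moves into the new ShutdownPrepareService class. As a minimal sketch, this is how they might be overridden when building node settings, following the same pattern the integration test below uses for the reindexing timeout; the concrete timeout values here are purely illustrative and are not taken from the commit:

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.ShutdownPrepareService;

Settings nodeSettings = Settings.builder()
    // grace period for in-flight search tasks (previously declared as node.maximum_shutdown_grace_period on Node)
    .put(ShutdownPrepareService.MAXIMUM_SHUTDOWN_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(30))
    // grace period for in-flight reindexing tasks, introduced by this commit
    .put(ShutdownPrepareService.MAXIMUM_REINDEXING_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(10))
    .build();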
@@ -0,0 +1,139 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.reindex;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.ShutdownPrepareService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.TransportService;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.elasticsearch.node.ShutdownPrepareService.MAXIMUM_REINDEXING_TIMEOUT_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;

/**
* Test that the wait added during shutdown allows a large reindexing task to complete.
* The test works as follows:
* 1. Start a large (reasonably long-running) reindexing request on the coordinator-only node.
* 2. Check that the reindexing task appears on the coordinating node.
* 3. With a 10s timeout value for MAXIMUM_REINDEXING_TIMEOUT_SETTING,
* wait for the reindexing task to complete before closing the node.
* 4. Confirm that the reindexing task succeeds with the wait (it would fail without it).
*/
@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST)
public class ReindexNodeShutdownIT extends ESIntegTestCase {

protected static final String INDEX = "reindex-shutdown-index";
protected static final String DEST_INDEX = "dest-index";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(ReindexPlugin.class);
}

protected ReindexRequestBuilder reindex(String nodeName) {
return new ReindexRequestBuilder(internalCluster().client(nodeName));
}

public void testReindexWithShutdown() throws Exception {
final String masterNodeName = internalCluster().startMasterOnlyNode();
final String dataNodeName = internalCluster().startDataOnlyNode();

final Settings COORD_SETTINGS = Settings.builder()
.put(MAXIMUM_REINDEXING_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(10))
.build();
final String coordNodeName = internalCluster().startCoordinatingOnlyNode(COORD_SETTINGS);

ensureStableCluster(3);

int numDocs = 20000;
createIndex(numDocs);
createReindexTaskAndShutdown(coordNodeName);
checkDestinationIndex(dataNodeName, numDocs);
}

private void createIndex(int numDocs) {
// INDEX will be created on the dataNode
createIndex(INDEX);

logger.debug("setting up [{}] docs", numDocs);
indexRandom(
true,
false,
true,
IntStream.range(0, numDocs)
.mapToObj(i -> prepareIndex(INDEX).setId(String.valueOf(i)).setSource("n", i))
.collect(Collectors.toList())
);

// Check that all documents have been indexed and are correctly counted
assertHitCount(prepareSearch(INDEX).setSize(0).setTrackTotalHits(true), numDocs);
}

private void createReindexTaskAndShutdown(final String coordNodeName) throws Exception {
AbstractBulkByScrollRequestBuilder<?, ?> builder = reindex(coordNodeName).source(INDEX).destination(DEST_INDEX);
AbstractBulkByScrollRequest<?> reindexRequest = builder.request();
ShutdownPrepareService shutdownPrepareService = internalCluster().getInstance(ShutdownPrepareService.class, coordNodeName);

TaskManager taskManager = internalCluster().getInstance(TransportService.class, coordNodeName).getTaskManager();

// Now execute the reindex action...
ActionListener<BulkByScrollResponse> reindexListener = new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
assertNull(bulkByScrollResponse.getReasonCancelled());
logger.debug(bulkByScrollResponse.toString());
}

@Override
public void onFailure(Exception e) {
logger.debug("Encounterd " + e.toString());
fail(e, "Encounterd " + e.toString());
}
};
internalCluster().client(coordNodeName).execute(ReindexAction.INSTANCE, reindexRequest, reindexListener);

// Wait for the reindex task to appear in the task list, then immediately stop the coordinating node
waitForTask(ReindexAction.INSTANCE.name(), coordNodeName);
shutdownPrepareService.prepareForShutdown(taskManager);
internalCluster().stopNode(coordNodeName);
}

// Make sure all documents from the source index have been reindexed into the destination index
private void checkDestinationIndex(String dataNodeName, int numDocs) throws Exception {
assertTrue(indexExists(DEST_INDEX));
flushAndRefresh(DEST_INDEX);
assertBusy(() -> { assertHitCount(prepareSearch(DEST_INDEX).setSize(0).setTrackTotalHits(true), numDocs); });
}

private static void waitForTask(String actionName, String nodeName) throws Exception {
assertBusy(() -> {
ListTasksResponse tasks = clusterAdmin().prepareListTasks(nodeName).setActions(actionName).setDetailed(true).get();
tasks.rethrowFailures("Find my task");
for (TaskInfo taskInfo : tasks.getTasks()) {
// Skip tasks with a parent because those are children of the task we want; return once the parent reindex task is found
if (taskInfo.parentTaskId().isSet() == false) return;
}
fail("Couldn't find task after waiting, tasks=" + tasks.getTasks());
}, 10, TimeUnit.SECONDS);
}
}
@@ -108,6 +108,7 @@
import org.elasticsearch.monitor.process.ProcessService;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeRoleSettings;
import org.elasticsearch.node.ShutdownPrepareService;
import org.elasticsearch.persistent.PersistentTasksClusterService;
import org.elasticsearch.persistent.decider.EnableAssignmentDecider;
import org.elasticsearch.plugins.PluginsService;
@@ -456,6 +457,8 @@ public void apply(Settings value, Settings current, Settings previous) {
Environment.PATH_SHARED_DATA_SETTING,
NodeEnvironment.NODE_ID_SEED_SETTING,
Node.INITIAL_STATE_TIMEOUT_SETTING,
ShutdownPrepareService.MAXIMUM_SHUTDOWN_TIMEOUT_SETTING,
ShutdownPrepareService.MAXIMUM_REINDEXING_TIMEOUT_SETTING,
DiscoveryModule.DISCOVERY_TYPE_SETTING,
DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING,
DiscoveryModule.ELECTION_STRATEGY_SETTING,
119 changes: 3 additions & 116 deletions server/src/main/java/org/elasticsearch/node/Node.java
@@ -13,10 +13,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.bootstrap.BootstrapCheck;
import org.elasticsearch.bootstrap.BootstrapContext;
import org.elasticsearch.client.internal.Client;
@@ -82,7 +78,6 @@
import org.elasticsearch.snapshots.SnapshotShardsService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.tasks.TaskCancellationService;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.tasks.TaskResultsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterPortSettings;
@@ -106,18 +101,12 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import javax.net.ssl.SNIHostName;

import static org.elasticsearch.core.Strings.format;

/**
* A node represent a node within a cluster ({@code cluster.name}). The {@link #client()} can be used
* in order to use a {@link Client} to perform actions/operations against the cluster.
@@ -161,12 +150,6 @@ public class Node implements Closeable {
Property.NodeScope
);

public static final Setting<TimeValue> MAXIMUM_SHUTDOWN_TIMEOUT_SETTING = Setting.positiveTimeSetting(
"node.maximum_shutdown_grace_period",
TimeValue.ZERO,
Setting.Property.NodeScope
);

private final Lifecycle lifecycle = new Lifecycle();

/**
@@ -187,6 +170,7 @@ public class Node implements Closeable {
private final LocalNodeFactory localNodeFactory;
private final NodeService nodeService;
private final TerminationHandler terminationHandler;

// for testing
final NamedWriteableRegistry namedWriteableRegistry;
final NamedXContentRegistry namedXContentRegistry;
@@ -606,105 +590,8 @@ public synchronized void close() throws IOException {
* logic should use Node Shutdown, see {@link org.elasticsearch.cluster.metadata.NodesShutdownMetadata}.
*/
public void prepareForClose() {
final var maxTimeout = MAXIMUM_SHUTDOWN_TIMEOUT_SETTING.get(this.settings());

record Stopper(String name, SubscribableListener<Void> listener) {
boolean isIncomplete() {
return listener().isDone() == false;
}
}

final var stoppers = new ArrayList<Stopper>();
final var allStoppersFuture = new PlainActionFuture<Void>();
try (var listeners = new RefCountingListener(allStoppersFuture)) {
final BiConsumer<String, Runnable> stopperRunner = (name, action) -> {
final var stopper = new Stopper(name, new SubscribableListener<>());
stoppers.add(stopper);
stopper.listener().addListener(listeners.acquire());
new Thread(() -> {
try {
action.run();
} catch (Exception ex) {
logger.warn("unexpected exception in shutdown task [" + stopper.name() + "]", ex);
} finally {
stopper.listener().onResponse(null);
}
}, stopper.name()).start();
};

stopperRunner.accept("http-server-transport-stop", injector.getInstance(HttpServerTransport.class)::close);
stopperRunner.accept("async-search-stop", () -> awaitSearchTasksComplete(maxTimeout));
if (terminationHandler != null) {
stopperRunner.accept("termination-handler-stop", terminationHandler::handleTermination);
}
}

final Supplier<String> incompleteStoppersDescriber = () -> stoppers.stream()
.filter(Stopper::isIncomplete)
.map(Stopper::name)
.collect(Collectors.joining(", ", "[", "]"));

try {
if (TimeValue.ZERO.equals(maxTimeout)) {
allStoppersFuture.get();
} else {
allStoppersFuture.get(maxTimeout.millis(), TimeUnit.MILLISECONDS);
}
} catch (ExecutionException e) {
assert false : e; // listeners are never completed exceptionally
logger.warn("failed during graceful shutdown tasks", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("interrupted while waiting for graceful shutdown tasks: " + incompleteStoppersDescriber.get(), e);
} catch (TimeoutException e) {
logger.warn("timed out while waiting for graceful shutdown tasks: " + incompleteStoppersDescriber.get());
}
}

private void awaitSearchTasksComplete(TimeValue asyncSearchTimeout) {
TaskManager taskManager = injector.getInstance(TransportService.class).getTaskManager();
long millisWaited = 0;
while (true) {
long searchTasksRemaining = taskManager.getTasks()
.values()
.stream()
.filter(task -> TransportSearchAction.TYPE.name().equals(task.getAction()))
.count();
if (searchTasksRemaining == 0) {
logger.debug("all search tasks complete");
return;
} else {
// Let the system work on those searches for a while. We're on a dedicated thread to manage app shutdown, so we
// literally just want to wait and not take up resources on this thread for now. Poll period chosen to allow short
// response times, but checking the tasks list is relatively expensive, and we don't want to waste CPU time we could
// be spending on finishing those searches.
final TimeValue pollPeriod = TimeValue.timeValueMillis(500);
millisWaited += pollPeriod.millis();
if (TimeValue.ZERO.equals(asyncSearchTimeout) == false && millisWaited >= asyncSearchTimeout.millis()) {
logger.warn(
format(
"timed out after waiting [%s] for [%d] search tasks to finish",
asyncSearchTimeout.toString(),
searchTasksRemaining
)
);
return;
}
logger.debug(format("waiting for [%s] search tasks to finish, next poll in [%s]", searchTasksRemaining, pollPeriod));
try {
Thread.sleep(pollPeriod.millis());
} catch (InterruptedException ex) {
logger.warn(
format(
"interrupted while waiting [%s] for [%d] search tasks to finish",
asyncSearchTimeout.toString(),
searchTasksRemaining
)
);
return;
}
}
}
injector.getInstance(ShutdownPrepareService.class)
.prepareForShutdown(injector.getInstance(TransportService.class).getTaskManager());
}

/**
@@ -1099,6 +1099,8 @@ private void construct(
telemetryProvider.getTracer()
);

final ShutdownPrepareService shutdownPrepareService = new ShutdownPrepareService(settings, httpServerTransport, terminationHandler);

modules.add(
loadPersistentTasksService(
settingsModule,
@@ -1200,6 +1202,7 @@ private void construct(
b.bind(CompatibilityVersions.class).toInstance(compatibilityVersions);
b.bind(DataStreamAutoShardingService.class).toInstance(dataStreamAutoShardingService);
b.bind(FailureStoreMetrics.class).toInstance(failureStoreMetrics);
b.bind(ShutdownPrepareService.class).toInstance(shutdownPrepareService);
});

if (ReadinessService.enabled(environment)) {
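The fifth changed file, the new ShutdownPrepareService class itself, is not rendered in this excerpt. Below is a rough sketch of what it likely contains, reassembled from the commit message, the logic deleted from Node.prepareForClose() above, and the constructor call visible in NodeConstruction. The "node.maximum_reindexing_grace_period" key, the default values, the simplified constructor, the awaitTasksComplete() helper, and the exact log messages are assumptions of this sketch, not the committed code.

package org.elasticsearch.node;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.TaskManager;

public class ShutdownPrepareService {

    private static final Logger logger = LogManager.getLogger(ShutdownPrepareService.class);

    // Moved here from Node.java; key and default taken from the declaration deleted above.
    public static final Setting<TimeValue> MAXIMUM_SHUTDOWN_TIMEOUT_SETTING = Setting.positiveTimeSetting(
        "node.maximum_shutdown_grace_period",
        TimeValue.ZERO,
        Setting.Property.NodeScope
    );

    // New in this commit; the key and default below are placeholders, not the committed values.
    public static final Setting<TimeValue> MAXIMUM_REINDEXING_TIMEOUT_SETTING = Setting.positiveTimeSetting(
        "node.maximum_reindexing_grace_period",
        TimeValue.timeValueSeconds(10),
        Setting.Property.NodeScope
    );

    private final Settings settings;

    // The committed constructor also receives the HttpServerTransport and TerminationHandler
    // (see NodeConstruction above), presumably to run the stopper threads removed from Node.
    public ShutdownPrepareService(Settings settings) {
        this.settings = settings;
    }

    /**
     * Waits for in-flight search and reindexing tasks to finish, up to the configured grace
     * periods, and logs how many tasks were still running if the wait timed out.
     */
    public void prepareForShutdown(TaskManager taskManager) {
        awaitTasksComplete(taskManager, TransportSearchAction.TYPE.name(), MAXIMUM_SHUTDOWN_TIMEOUT_SETTING.get(settings));
        // The server module cannot reference the ReindexAction class from the reindex module,
        // so tasks are matched by action name; "indices:data/write/reindex" is the reindex action name.
        awaitTasksComplete(taskManager, "indices:data/write/reindex", MAXIMUM_REINDEXING_TIMEOUT_SETTING.get(settings));
    }

    // Generalization of the awaitSearchTasksComplete() polling loop deleted from Node.java above.
    private void awaitTasksComplete(TaskManager taskManager, String actionName, TimeValue timeout) {
        final TimeValue pollPeriod = TimeValue.timeValueMillis(500);
        long millisWaited = 0;
        while (true) {
            long tasksRemaining = taskManager.getTasks()
                .values()
                .stream()
                .filter(task -> actionName.equals(task.getAction()))
                .count();
            if (tasksRemaining == 0) {
                logger.debug("all [{}] tasks complete", actionName);
                return;
            }
            millisWaited += pollPeriod.millis();
            if (TimeValue.ZERO.equals(timeout) == false && millisWaited >= timeout.millis()) {
                // This is the logging the commit message describes: report tasks still in flight after the wait.
                logger.warn("timed out after [{}] waiting for [{}] [{}] tasks to finish", timeout, tasksRemaining, actionName);
                return;
            }
            try {
                Thread.sleep(pollPeriod.millis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.warn("interrupted while waiting for [{}] [{}] tasks to finish", tasksRemaining, actionName);
                return;
            }
        }
    }
}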
