Log reindexing failures #112676

Merged Oct 23, 2024 (26 commits; this view shows changes from 25 commits)

Commits
dcd6d74  Log reindexing failures (ankikuma, Sep 9, 2024)
cfd0848  Add test (ankikuma, Oct 8, 2024)
ebcd40a  Address review comments (ankikuma, Oct 8, 2024)
c5100cb  Fix gradle errors (ankikuma, Oct 8, 2024)
58535f1  Add test (ankikuma, Oct 14, 2024)
4d9e12b  Merge remote-tracking branch 'upstream/main' into feature/ES8850 (ankikuma, Oct 14, 2024)
66cecaf  Add test (ankikuma, Oct 14, 2024)
519940e  Merge remote-tracking branch 'upstream/main' into feature/ES8850 (ankikuma, Oct 14, 2024)
c4c3207  Cleanup Node.java (ankikuma, Oct 14, 2024)
16ccf71  Commit test (ankikuma, Oct 14, 2024)
82904e9  Rewrite test (ankikuma, Oct 16, 2024)
43f312e  Merge remote-tracking branch 'upstream/main' into feature/ES8850 (ankikuma, Oct 16, 2024)
33f24db  Modify test (ankikuma, Oct 16, 2024)
102c7ca  Fix test (ankikuma, Oct 17, 2024)
d4edfc4  Adjust timeout defaults (ankikuma, Oct 18, 2024)
17d9279  Cleanup (ankikuma, Oct 18, 2024)
d47324e  Merge remote-tracking branch 'upstream/main' into feature/ES8850 (ankikuma, Oct 18, 2024)
f932ce1  Remove shutdown dependency from gradle (ankikuma, Oct 21, 2024)
e02f238  Remove taskManager.java (ankikuma, Oct 21, 2024)
9daddef  Refactor test (ankikuma, Oct 21, 2024)
5e28452  Merge remote-tracking branch 'upstream/main' into feature/ES8850 (ankikuma, Oct 22, 2024)
8d75d0b  Rename ShutdownFenceService to ShutdownCloseService (ankikuma, Oct 22, 2024)
6ac0989  Address Review comments (ankikuma, Oct 23, 2024)
5f907b8  code cleanup (ankikuma, Oct 23, 2024)
a66eed1  Merge remote-tracking branch 'upstream/main' into feature/ES8850 (ankikuma, Oct 23, 2024)
1d0f640  Review comments addressed (ankikuma, Oct 23, 2024)
New file: ReindexNodeShutdownIT.java
@@ -0,0 +1,145 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.reindex;

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.ShutdownPrepareService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.search.SearchResponseUtils;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.TransportService;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.elasticsearch.node.Node.MAXIMUM_REINDEXING_TIMEOUT_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;

/**
 * Tests that the wait added during shutdown allows a large reindexing task to complete.
 * The test works as follows:
 * 1. Start a large (reasonably long-running) reindexing request on the coordinator-only node.
 * 2. Check that the reindexing task appears on the coordinating node.
 * 3. With a 10s timeout value for MAXIMUM_REINDEXING_TIMEOUT_SETTING,
 *    wait for the reindexing task to complete before closing the node.
 * 4. Confirm that the reindexing task succeeds with the wait (it would fail without it).
 */
@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST)
public class ReindexNodeShutdownIT extends ESIntegTestCase {

protected static final String INDEX = "reindex-shutdown-index";
protected static final String DEST_INDEX = "dest-index";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(ReindexPlugin.class);
}

protected ReindexRequestBuilder reindex(String nodeName) {
return new ReindexRequestBuilder(internalCluster().client(nodeName));
}

public void testReindexWithShutdown() throws Exception {
final String masterNodeName = internalCluster().startMasterOnlyNode();
final String dataNodeName = internalCluster().startDataOnlyNode();

final Settings COORD_SETTINGS = Settings.builder()
.put(MAXIMUM_REINDEXING_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(10))
.build();
final String coordNodeName = internalCluster().startCoordinatingOnlyNode(COORD_SETTINGS);

ensureStableCluster(3);

int numDocs = 20000;
createIndex(numDocs);
createReindexTaskAndShutdown(coordNodeName);
checkDestinationIndex(dataNodeName, numDocs);
}

private void createIndex(int numDocs) {
// INDEX will be created on the dataNode
createIndex(INDEX);

logger.debug("setting up [{}] docs", numDocs);
indexRandom(
true,
false,
true,
IntStream.range(0, numDocs)
.mapToObj(i -> prepareIndex(INDEX).setId(String.valueOf(i)).setSource("n", i))
.collect(Collectors.toList())
);

// Check that all documents have been indexed and correctly counted
assertHitCount(prepareSearch(INDEX).setSize(0).setTrackTotalHits(true), numDocs);
}

private void createReindexTaskAndShutdown(final String coordNodeName) throws Exception {
AbstractBulkByScrollRequestBuilder<?, ?> builder = reindex(coordNodeName).source(INDEX).destination(DEST_INDEX);
AbstractBulkByScrollRequest<?> reindexRequest = builder.request();
ShutdownPrepareService shutdownPrepareService = internalCluster().getInstance(ShutdownPrepareService.class, coordNodeName);

TaskManager taskManager = internalCluster().getInstance(TransportService.class, coordNodeName).getTaskManager();

// Now execute the reindex action...
ActionListener<BulkByScrollResponse> reindexListener = new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
assertNull(bulkByScrollResponse.getReasonCancelled());
logger.debug(bulkByScrollResponse.toString());
}

@Override
public void onFailure(Exception e) {
logger.debug("Encounterd " + e.toString());
ankikuma marked this conversation as resolved.
Show resolved Hide resolved
fail(e, "Encounterd " + e.toString());
}
};
internalCluster().client(coordNodeName).execute(ReindexAction.INSTANCE, reindexRequest, reindexListener);

// Wait for the reindex task to appear in the tasks list, then immediately stop the coordinating node
findTask(ReindexAction.INSTANCE.name(), coordNodeName);
shutdownPrepareService.prepareForShutdown(taskManager);
internalCluster().stopNode(coordNodeName);
}

// Make sure all documents from the source index have been reindexed into the destination index
private void checkDestinationIndex(String dataNodeName, int numDocs) {
assertTrue(indexExists(DEST_INDEX));
flushAndRefresh(DEST_INDEX);
assertTrue("Number of documents in source and dest indexes does not match", waitUntil(() -> {
final TotalHits totalHits = SearchResponseUtils.getTotalHits(
client(dataNodeName).prepareSearch(DEST_INDEX).setSize(0).setTrackTotalHits(true)
);
return totalHits.relation() == TotalHits.Relation.EQUAL_TO && totalHits.value() == numDocs;
}, 10, TimeUnit.SECONDS));
Contributor: I think this can be something like the following instead:

    assertBusy(() -> assertHitCount(prepareSearch(DEST_INDEX).setSize(0).setTrackTotalHits(true), numDocs));

Contributor Author (ankikuma): Yes. Done.

}
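A plausible shape of the simplified check after this exchange, assuming the reviewer's assertBusy/assertHitCount suggestion was adopted as-is (a sketch, not the verbatim merged code):

    // Simplified checkDestinationIndex: assertBusy retries the hit-count assertion
    // until it passes or the default wait elapses. dataNodeName is kept only to
    // match the existing call site; the assertion goes through the default client.
    private void checkDestinationIndex(String dataNodeName, int numDocs) throws Exception {
        assertTrue(indexExists(DEST_INDEX));
        flushAndRefresh(DEST_INDEX);
        assertBusy(() -> assertHitCount(prepareSearch(DEST_INDEX).setSize(0).setTrackTotalHits(true), numDocs));
    }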

private static void findTask(String actionName, String nodeName) throws Exception {
Contributor: nit: I'd call this waitForTask instead.

assertBusy(() -> {
ListTasksResponse tasks = clusterAdmin().prepareListTasks(nodeName).setActions(actionName).setDetailed(true).get();
tasks.rethrowFailures("Find my task");
for (TaskInfo taskInfo : tasks.getTasks()) {
// Skip tasks with a parent because those are children of the task we want
assertFalse(taskInfo.parentTaskId().isSet());
}
Contributor: I think this does not really wait. As a sidenote, I am unsure if we need to wait, and also whether skipping child tasks is important. But I think this should be:

Suggested change:

    for (TaskInfo taskInfo : tasks.getTasks()) {
        // Skip tasks with a parent because those are children of the task we want
        if (taskInfo.parentTaskId().isSet() == false) {
            return;
        }
    }
    fail("not found - yet");

Contributor Author (ankikuma): You are right. Thanks for catching this; I fixed it and tested it out as well (by holding off submission of the reindexing task).

}, 10, TimeUnit.SECONDS);
}
}
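Combining the two threads above (the waitForTask rename and the corrected wait), the fixed helper plausibly reads as follows; this is a sketch under those assumptions, not the merged code:

    private static void waitForTask(String actionName, String nodeName) throws Exception {
        assertBusy(() -> {
            ListTasksResponse tasks = clusterAdmin().prepareListTasks(nodeName).setActions(actionName).setDetailed(true).get();
            tasks.rethrowFailures("Find my task");
            for (TaskInfo taskInfo : tasks.getTasks()) {
                // The submitted reindex task is the one without a parent; all of its
                // children carry a parent task id and are skipped.
                if (taskInfo.parentTaskId().isSet() == false) {
                    return;
                }
            }
            fail("reindex task not found yet"); // assertBusy retries until the timeout
        }, 10, TimeUnit.SECONDS);
    }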
ReindexMetrics.java
@@ -19,7 +19,7 @@ public class ReindexMetrics {
private final LongHistogram reindexTimeSecsHistogram;

public ReindexMetrics(MeterRegistry meterRegistry) {
this(meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "millis"));
this(meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "seconds"));
Contributor: While I think I agree with the change, I'd prefer to do this in a separate PR.

Contributor Author (ankikuma): ok, I will remove it.
}

private ReindexMetrics(LongHistogram reindexTimeSecsHistogram) {
ClusterSettings.java
@@ -456,6 +456,8 @@ public void apply(Settings value, Settings current, Settings previous) {
Environment.PATH_SHARED_DATA_SETTING,
NodeEnvironment.NODE_ID_SEED_SETTING,
Node.INITIAL_STATE_TIMEOUT_SETTING,
Node.MAXIMUM_SHUTDOWN_TIMEOUT_SETTING,
Node.MAXIMUM_REINDEXING_TIMEOUT_SETTING,
DiscoveryModule.DISCOVERY_TYPE_SETTING,
DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING,
DiscoveryModule.ELECTION_STRATEGY_SETTING,
119 changes: 9 additions & 110 deletions server/src/main/java/org/elasticsearch/node/Node.java
@@ -13,10 +13,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.bootstrap.BootstrapCheck;
import org.elasticsearch.bootstrap.BootstrapContext;
import org.elasticsearch.client.internal.Client;
@@ -82,7 +78,6 @@
import org.elasticsearch.snapshots.SnapshotShardsService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.tasks.TaskCancellationService;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.tasks.TaskResultsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterPortSettings;
@@ -106,18 +101,12 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import javax.net.ssl.SNIHostName;

import static org.elasticsearch.core.Strings.format;

/**
* A node represent a node within a cluster ({@code cluster.name}). The {@link #client()} can be used
* in order to use a {@link Client} to perform actions/operations against the cluster.
@@ -167,6 +156,12 @@ public class Node implements Closeable {
Setting.Property.NodeScope
);

public static final Setting<TimeValue> MAXIMUM_REINDEXING_TIMEOUT_SETTING = Setting.positiveTimeSetting(
Contributor: I think we might as well move these two settings closer to their use, i.e., to ShutdownPrepareService.

Contributor Author (ankikuma): Sure, done.

"node.maximum_reindexing_grace_period",
TimeValue.timeValueSeconds(10),
Setting.Property.NodeScope
);
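Following the review above, this declaration (and its MAXIMUM_SHUTDOWN_TIMEOUT_SETTING sibling) would move to ShutdownPrepareService; a minimal sketch of the relocated reindex setting, assuming the key and default stay unchanged:

    public class ShutdownPrepareService {
        // Grace period granted to in-flight reindex tasks during node shutdown;
        // same key and 10s default as the Node declaration above.
        public static final Setting<TimeValue> MAXIMUM_REINDEXING_TIMEOUT_SETTING = Setting.positiveTimeSetting(
            "node.maximum_reindexing_grace_period",
            TimeValue.timeValueSeconds(10),
            Setting.Property.NodeScope
        );
        // ... remainder of the service
    }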

private final Lifecycle lifecycle = new Lifecycle();

/**
Expand All @@ -187,6 +182,7 @@ public class Node implements Closeable {
private final LocalNodeFactory localNodeFactory;
private final NodeService nodeService;
private final TerminationHandler terminationHandler;

// for testing
final NamedWriteableRegistry namedWriteableRegistry;
final NamedXContentRegistry namedXContentRegistry;
@@ -606,105 +602,8 @@ public synchronized void close() throws IOException {
* logic should use Node Shutdown, see {@link org.elasticsearch.cluster.metadata.NodesShutdownMetadata}.
*/
public void prepareForClose() {
final var maxTimeout = MAXIMUM_SHUTDOWN_TIMEOUT_SETTING.get(this.settings());

record Stopper(String name, SubscribableListener<Void> listener) {
boolean isIncomplete() {
return listener().isDone() == false;
}
}

final var stoppers = new ArrayList<Stopper>();
final var allStoppersFuture = new PlainActionFuture<Void>();
try (var listeners = new RefCountingListener(allStoppersFuture)) {
final BiConsumer<String, Runnable> stopperRunner = (name, action) -> {
final var stopper = new Stopper(name, new SubscribableListener<>());
stoppers.add(stopper);
stopper.listener().addListener(listeners.acquire());
new Thread(() -> {
try {
action.run();
} catch (Exception ex) {
logger.warn("unexpected exception in shutdown task [" + stopper.name() + "]", ex);
} finally {
stopper.listener().onResponse(null);
}
}, stopper.name()).start();
};

stopperRunner.accept("http-server-transport-stop", injector.getInstance(HttpServerTransport.class)::close);
stopperRunner.accept("async-search-stop", () -> awaitSearchTasksComplete(maxTimeout));
if (terminationHandler != null) {
stopperRunner.accept("termination-handler-stop", terminationHandler::handleTermination);
}
}

final Supplier<String> incompleteStoppersDescriber = () -> stoppers.stream()
.filter(Stopper::isIncomplete)
.map(Stopper::name)
.collect(Collectors.joining(", ", "[", "]"));

try {
if (TimeValue.ZERO.equals(maxTimeout)) {
allStoppersFuture.get();
} else {
allStoppersFuture.get(maxTimeout.millis(), TimeUnit.MILLISECONDS);
}
} catch (ExecutionException e) {
assert false : e; // listeners are never completed exceptionally
logger.warn("failed during graceful shutdown tasks", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("interrupted while waiting for graceful shutdown tasks: " + incompleteStoppersDescriber.get(), e);
} catch (TimeoutException e) {
logger.warn("timed out while waiting for graceful shutdown tasks: " + incompleteStoppersDescriber.get());
}
}

private void awaitSearchTasksComplete(TimeValue asyncSearchTimeout) {
TaskManager taskManager = injector.getInstance(TransportService.class).getTaskManager();
long millisWaited = 0;
while (true) {
long searchTasksRemaining = taskManager.getTasks()
.values()
.stream()
.filter(task -> TransportSearchAction.TYPE.name().equals(task.getAction()))
.count();
if (searchTasksRemaining == 0) {
logger.debug("all search tasks complete");
return;
} else {
// Let the system work on those searches for a while. We're on a dedicated thread to manage app shutdown, so we
// literally just want to wait and not take up resources on this thread for now. Poll period chosen to allow short
// response times, but checking the tasks list is relatively expensive, and we don't want to waste CPU time we could
// be spending on finishing those searches.
final TimeValue pollPeriod = TimeValue.timeValueMillis(500);
millisWaited += pollPeriod.millis();
if (TimeValue.ZERO.equals(asyncSearchTimeout) == false && millisWaited >= asyncSearchTimeout.millis()) {
logger.warn(
format(
"timed out after waiting [%s] for [%d] search tasks to finish",
asyncSearchTimeout.toString(),
searchTasksRemaining
)
);
return;
}
logger.debug(format("waiting for [%s] search tasks to finish, next poll in [%s]", searchTasksRemaining, pollPeriod));
try {
Thread.sleep(pollPeriod.millis());
} catch (InterruptedException ex) {
logger.warn(
format(
"interrupted while waiting [%s] for [%d] search tasks to finish",
asyncSearchTimeout.toString(),
searchTasksRemaining
)
);
return;
}
}
}
injector.getInstance(ShutdownPrepareService.class)
.prepareForShutdown(injector.getInstance(TransportService.class).getTaskManager());
}
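The extracted ShutdownPrepareService itself is not part of this diff view. Judging from the removed code above and the new reindex grace period, its core plausibly generalizes awaitSearchTasksComplete to wait on any action name, invoked once for search tasks and once for reindex tasks; a sketch with assumed names:

    // Generalized wait, mirroring the removed awaitSearchTasksComplete: poll the
    // task manager until no tasks for the given action remain, or the grace
    // period runs out. A zero timeout means wait indefinitely, as before.
    private void awaitTasksComplete(TaskManager taskManager, String actionName, TimeValue timeout) {
        long millisWaited = 0;
        while (true) {
            long tasksRemaining = taskManager.getTasks()
                .values()
                .stream()
                .filter(task -> actionName.equals(task.getAction()))
                .count();
            if (tasksRemaining == 0) {
                return; // nothing left to wait for
            }
            // Coarse poll period: listing tasks is relatively expensive, and this
            // dedicated shutdown thread only needs to pace itself.
            final TimeValue pollPeriod = TimeValue.timeValueMillis(500);
            millisWaited += pollPeriod.millis();
            if (TimeValue.ZERO.equals(timeout) == false && millisWaited >= timeout.millis()) {
                return; // grace period exhausted; proceed with shutdown anyway
            }
            try {
                Thread.sleep(pollPeriod.millis());
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return; // interrupted: stop waiting and let shutdown continue
            }
        }
    }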

NodeConstruction.java
@@ -1099,6 +1099,8 @@ private void construct(
telemetryProvider.getTracer()
);

final ShutdownPrepareService shutdownPrepareService = new ShutdownPrepareService(settings, httpServerTransport, terminationHandler);

modules.add(
loadPersistentTasksService(
settingsModule,
@@ -1200,6 +1202,7 @@ private void construct(
b.bind(CompatibilityVersions.class).toInstance(compatibilityVersions);
b.bind(DataStreamAutoShardingService.class).toInstance(dataStreamAutoShardingService);
b.bind(FailureStoreMetrics.class).toInstance(failureStoreMetrics);
b.bind(ShutdownPrepareService.class).toInstance(shutdownPrepareService);
});

if (ReadinessService.enabled(environment)) {