Log reindexing failures #112676

Merged: 26 commits merged on Oct 23, 2024. The diff below shows changes from 22 of the 26 commits.

Commits (26)
dcd6d74  Log reindexing failures (ankikuma, Sep 9, 2024)
cfd0848  Add test (ankikuma, Oct 8, 2024)
ebcd40a  Address review comments (ankikuma, Oct 8, 2024)
c5100cb  Fix gradle errors (ankikuma, Oct 8, 2024)
58535f1  Add test (ankikuma, Oct 14, 2024)
4d9e12b  Merge remote-tracking branch 'upstream/main' into feature/ES8850 (ankikuma, Oct 14, 2024)
66cecaf  Add test (ankikuma, Oct 14, 2024)
519940e  Merge remote-tracking branch 'upstream/main' into feature/ES8850 (ankikuma, Oct 14, 2024)
c4c3207  Cleanup Node.java (ankikuma, Oct 14, 2024)
16ccf71  Commit test (ankikuma, Oct 14, 2024)
82904e9  Rewrite test (ankikuma, Oct 16, 2024)
43f312e  Merge remote-tracking branch 'upstream/main' into feature/ES8850 (ankikuma, Oct 16, 2024)
33f24db  Modify test (ankikuma, Oct 16, 2024)
102c7ca  Fix test (ankikuma, Oct 17, 2024)
d4edfc4  Adjust timeout defaults (ankikuma, Oct 18, 2024)
17d9279  Cleanup (ankikuma, Oct 18, 2024)
d47324e  Merge remote-tracking branch 'upstream/main' into feature/ES8850 (ankikuma, Oct 18, 2024)
f932ce1  Remove shutdown dependency from gradle (ankikuma, Oct 21, 2024)
e02f238  Remove taskManager.java (ankikuma, Oct 21, 2024)
9daddef  Refactor test (ankikuma, Oct 21, 2024)
5e28452  Merge remote-tracking branch 'upstream/main' into feature/ES8850 (ankikuma, Oct 22, 2024)
8d75d0b  Rename ShutdownFenceService to ShutdownCloseService (ankikuma, Oct 22, 2024)
6ac0989  Address Review comments (ankikuma, Oct 23, 2024)
5f907b8  code cleanup (ankikuma, Oct 23, 2024)
a66eed1  Merge remote-tracking branch 'upstream/main' into feature/ES8850 (ankikuma, Oct 23, 2024)
1d0f640  Review comments addressed (ankikuma, Oct 23, 2024)
New file: ReindexNodeShutdownIT.java
@@ -0,0 +1,149 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.reindex;

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.ShutdownPrepareService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.search.SearchResponseUtils;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.TransportService;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.elasticsearch.node.Node.MAXIMUM_REINDEXING_TIMEOUT_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;

/**
* Tests that the wait added during shutdown allows a large reindexing task to complete.
* The test works as follows:
* 1. Start a large reindexing request on the coordinator-only node.
* 2. Check that the reindexing task appears on the coordinating node.
* 3. With a 10s value for MAXIMUM_REINDEXING_TIMEOUT_SETTING, wait for the reindexing task
* to complete before closing the node.
* 4. Confirm that the reindexing task succeeds because of the wait (it would fail without it).
*/
@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST)
public class ReindexNodeShutdownIT extends ESIntegTestCase {

protected static final String INDEX = "reindex-shutdown-index";
protected static final String DEST_INDEX = "dest-index";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(ReindexPlugin.class);
}

protected ReindexRequestBuilder reindex(String nodeName) {
return new ReindexRequestBuilder(internalCluster().client(nodeName));
}

private void createIndex(int numDocs) {
// INDEX will be created on the dataNode
createIndex(INDEX);

logger.debug("setting up [{}] docs", numDocs);
indexRandom(
true,
false,
true,
IntStream.range(0, numDocs)
.mapToObj(i -> prepareIndex(INDEX).setId(String.valueOf(i)).setSource("n", i))
.collect(Collectors.toList())
);

// Check that all documents have been indexed and counted correctly
assertHitCount(prepareSearch(INDEX).setSize(0).setTrackTotalHits(true), numDocs);
}

private void checkDestinationIndex(String dataNodeName, int numDocs) {
Review comment (Contributor): nit: I'd prefer to move this helper method after the test method. Right now we have some helpers before, some after. We typically have the tests prior to the helper methods (at least when they are this large) to make it easy to spot the test methods.
Author reply (ankikuma): Thanks. Done.

assertTrue(indexExists(DEST_INDEX));
flushAndRefresh(DEST_INDEX);
assertTrue("Number of documents in source and dest indexes does not match", waitUntil(() -> {
final TotalHits totalHits = SearchResponseUtils.getTotalHits(
client(dataNodeName).prepareSearch(DEST_INDEX).setSize(0).setTrackTotalHits(true)
);
return totalHits.relation() == TotalHits.Relation.EQUAL_TO && totalHits.value() == numDocs;
}, 10, TimeUnit.SECONDS));
}

public void testReindexWithShutdown() throws Exception {
final String masterNodeName = internalCluster().startMasterOnlyNode();
final String dataNodeName = internalCluster().startDataOnlyNode();

final Settings coordSettings = Settings.builder()
.put(MAXIMUM_REINDEXING_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(10))
.build();
final String coordNodeName = internalCluster().startCoordinatingOnlyNode(coordSettings);

ensureStableCluster(3);

int numDocs = 20000;
createIndex(numDocs);
createReindexTaskAndShutdown(coordNodeName);
checkDestinationIndex(dataNodeName, numDocs);
}

private void createReindexTaskAndShutdown(final String coordNodeName) throws Exception {

Review comment (Contributor): nit: looks like an extra newline here, can we remove it?
Author reply (ankikuma): Done.
Review comment (Contributor): I still see it here?

AbstractBulkByScrollRequestBuilder<?, ?> builder = reindex(coordNodeName).source(INDEX).destination(DEST_INDEX);
AbstractBulkByScrollRequest<?> reindexRequest = builder.request();
ShutdownPrepareService shutdownPrepareService = internalCluster().getInstance(ShutdownPrepareService.class, coordNodeName);

TaskManager taskManager = internalCluster().getInstance(TransportService.class, coordNodeName).getTaskManager();

// Now execute the reindex action...
ActionListener<BulkByScrollResponse> reindexListener = new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
assertNull(bulkByScrollResponse.getReasonCancelled());
logger.debug(bulkByScrollResponse.toString());
}

@Override
public void onFailure(Exception e) {
logger.debug("Encountered " + e.toString());
ankikuma marked this conversation as resolved.
}
};
internalCluster().client(coordNodeName).execute(ReindexAction.INSTANCE, reindexRequest, reindexListener);

// Wait for the reindex task to appear in the task list, then immediately stop the coordinating node
TaskInfo mainTask = findTask(ReindexAction.INSTANCE.name(), coordNodeName);
shutdownPrepareService.prepareForShutdown(taskManager);
henningandersen marked this conversation as resolved.
internalCluster().stopNode(coordNodeName);
}

private static TaskInfo findTask(String actionName, String nodeName) {
ListTasksResponse tasks;
long start = System.nanoTime();
do {
Review comment (Contributor): Can we use assertBusy() instead?
Author reply (ankikuma): Sure.
[A sketch of an assertBusy()-based variant of findTask() appears after this file's diff.]

tasks = clusterAdmin().prepareListTasks(nodeName).setActions(actionName).setDetailed(true).get();
tasks.rethrowFailures("Find my task");
for (TaskInfo taskInfo : tasks.getTasks()) {
// Skip tasks with a parent because those are children of the task we want
if (false == taskInfo.parentTaskId().isSet()) {
return taskInfo;
}
}
} while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));
throw new AssertionError("Couldn't find task after waiting tasks=" + tasks.getTasks());
}
}
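The review thread above asks whether findTask() could use assertBusy() rather than a hand-rolled polling loop, and the author agreed; the loop is still present as of this commit. Below is a minimal sketch of what such a rewrite might look like. It is illustrative only: the helper name findTaskWithAssertBusy, the use of AtomicReference, and the exact assertions are assumptions, while assertBusy(), clusterAdmin(), and the task-listing calls are the same test utilities the file already uses.

// Hypothetical assertBusy()-based variant of findTask(); not part of this diff.
// Requires an additional import: java.util.concurrent.atomic.AtomicReference.
private static TaskInfo findTaskWithAssertBusy(String actionName, String nodeName) throws Exception {
    final AtomicReference<TaskInfo> result = new AtomicReference<>();
    // assertBusy retries the block until it stops throwing AssertionError or 10 seconds pass.
    assertBusy(() -> {
        ListTasksResponse tasks = clusterAdmin().prepareListTasks(nodeName).setActions(actionName).setDetailed(true).get();
        tasks.rethrowFailures("Find my task");
        // Skip tasks with a parent because those are children of the task we want
        TaskInfo mainTask = tasks.getTasks()
            .stream()
            .filter(taskInfo -> taskInfo.parentTaskId().isSet() == false)
            .findFirst()
            .orElse(null);
        assertNotNull("Couldn't find task for action [" + actionName + "]", mainTask);
        result.set(mainTask);
    }, 10, TimeUnit.SECONDS);
    return result.get();
}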
ReindexMetrics.java
@@ -19,7 +19,7 @@ public class ReindexMetrics {
private final LongHistogram reindexTimeSecsHistogram;

public ReindexMetrics(MeterRegistry meterRegistry) {
this(meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "millis"));
this(meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "seconds"));
Review comment (Contributor): While I think I agree to the change, I'd prefer to do this in a separate PR.
Author reply (ankikuma): ok, I will remove it.

}

private ReindexMetrics(LongHistogram reindexTimeSecsHistogram) {
@@ -456,6 +456,8 @@ public void apply(Settings value, Settings current, Settings previous) {
Environment.PATH_SHARED_DATA_SETTING,
NodeEnvironment.NODE_ID_SEED_SETTING,
Node.INITIAL_STATE_TIMEOUT_SETTING,
Node.MAXIMUM_SHUTDOWN_TIMEOUT_SETTING,
Node.MAXIMUM_REINDEXING_TIMEOUT_SETTING,
DiscoveryModule.DISCOVERY_TYPE_SETTING,
DiscoveryModule.DISCOVERY_SEED_PROVIDERS_SETTING,
DiscoveryModule.ELECTION_STRATEGY_SETTING,
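Registering the two settings above makes the shutdown grace periods configurable per node. As a small assumed usage sketch (not part of the diff, and mirroring what the integration test's coordinating-node settings do), a node could raise the reindexing grace period like this:

// Hypothetical example: configure a longer reindexing grace period for a node.
Settings nodeSettings = Settings.builder()
    .put(Node.MAXIMUM_REINDEXING_TIMEOUT_SETTING.getKey(), TimeValue.timeValueSeconds(30))
    .build();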
119 changes: 9 additions & 110 deletions server/src/main/java/org/elasticsearch/node/Node.java
@@ -13,10 +13,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.bootstrap.BootstrapCheck;
import org.elasticsearch.bootstrap.BootstrapContext;
import org.elasticsearch.client.internal.Client;
@@ -82,7 +78,6 @@
import org.elasticsearch.snapshots.SnapshotShardsService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.tasks.TaskCancellationService;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.tasks.TaskResultsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterPortSettings;
@@ -106,18 +101,12 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import javax.net.ssl.SNIHostName;

import static org.elasticsearch.core.Strings.format;

/**
* A node represent a node within a cluster ({@code cluster.name}). The {@link #client()} can be used
* in order to use a {@link Client} to perform actions/operations against the cluster.
@@ -167,6 +156,12 @@ public class Node implements Closeable {
Setting.Property.NodeScope
);

public static final Setting<TimeValue> MAXIMUM_REINDEXING_TIMEOUT_SETTING = Setting.positiveTimeSetting(
Review comment (Contributor): I think we might as well move these two settings closer to their use, i.e., to ShutdownPrepareService.
Author reply (ankikuma): Sure, done.

"node.maximum_reindexing_grace_period",
TimeValue.timeValueSeconds(10),
Setting.Property.NodeScope
);

private final Lifecycle lifecycle = new Lifecycle();

/**
@@ -187,6 +182,7 @@ public class Node implements Closeable {
private final LocalNodeFactory localNodeFactory;
private final NodeService nodeService;
private final TerminationHandler terminationHandler;

// for testing
final NamedWriteableRegistry namedWriteableRegistry;
final NamedXContentRegistry namedXContentRegistry;
@@ -606,105 +602,8 @@ public synchronized void close() throws IOException {
* logic should use Node Shutdown, see {@link org.elasticsearch.cluster.metadata.NodesShutdownMetadata}.
*/
public void prepareForClose() {
final var maxTimeout = MAXIMUM_SHUTDOWN_TIMEOUT_SETTING.get(this.settings());

record Stopper(String name, SubscribableListener<Void> listener) {
boolean isIncomplete() {
return listener().isDone() == false;
}
}

final var stoppers = new ArrayList<Stopper>();
final var allStoppersFuture = new PlainActionFuture<Void>();
try (var listeners = new RefCountingListener(allStoppersFuture)) {
final BiConsumer<String, Runnable> stopperRunner = (name, action) -> {
final var stopper = new Stopper(name, new SubscribableListener<>());
stoppers.add(stopper);
stopper.listener().addListener(listeners.acquire());
new Thread(() -> {
try {
action.run();
} catch (Exception ex) {
logger.warn("unexpected exception in shutdown task [" + stopper.name() + "]", ex);
} finally {
stopper.listener().onResponse(null);
}
}, stopper.name()).start();
};

stopperRunner.accept("http-server-transport-stop", injector.getInstance(HttpServerTransport.class)::close);
stopperRunner.accept("async-search-stop", () -> awaitSearchTasksComplete(maxTimeout));
if (terminationHandler != null) {
stopperRunner.accept("termination-handler-stop", terminationHandler::handleTermination);
}
}

final Supplier<String> incompleteStoppersDescriber = () -> stoppers.stream()
.filter(Stopper::isIncomplete)
.map(Stopper::name)
.collect(Collectors.joining(", ", "[", "]"));

try {
if (TimeValue.ZERO.equals(maxTimeout)) {
allStoppersFuture.get();
} else {
allStoppersFuture.get(maxTimeout.millis(), TimeUnit.MILLISECONDS);
}
} catch (ExecutionException e) {
assert false : e; // listeners are never completed exceptionally
logger.warn("failed during graceful shutdown tasks", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("interrupted while waiting for graceful shutdown tasks: " + incompleteStoppersDescriber.get(), e);
} catch (TimeoutException e) {
logger.warn("timed out while waiting for graceful shutdown tasks: " + incompleteStoppersDescriber.get());
}
}

private void awaitSearchTasksComplete(TimeValue asyncSearchTimeout) {
TaskManager taskManager = injector.getInstance(TransportService.class).getTaskManager();
long millisWaited = 0;
while (true) {
long searchTasksRemaining = taskManager.getTasks()
.values()
.stream()
.filter(task -> TransportSearchAction.TYPE.name().equals(task.getAction()))
.count();
if (searchTasksRemaining == 0) {
logger.debug("all search tasks complete");
return;
} else {
// Let the system work on those searches for a while. We're on a dedicated thread to manage app shutdown, so we
// literally just want to wait and not take up resources on this thread for now. Poll period chosen to allow short
// response times, but checking the tasks list is relatively expensive, and we don't want to waste CPU time we could
// be spending on finishing those searches.
final TimeValue pollPeriod = TimeValue.timeValueMillis(500);
millisWaited += pollPeriod.millis();
if (TimeValue.ZERO.equals(asyncSearchTimeout) == false && millisWaited >= asyncSearchTimeout.millis()) {
logger.warn(
format(
"timed out after waiting [%s] for [%d] search tasks to finish",
asyncSearchTimeout.toString(),
searchTasksRemaining
)
);
return;
}
logger.debug(format("waiting for [%s] search tasks to finish, next poll in [%s]", searchTasksRemaining, pollPeriod));
try {
Thread.sleep(pollPeriod.millis());
} catch (InterruptedException ex) {
logger.warn(
format(
"interrupted while waiting [%s] for [%d] search tasks to finish",
asyncSearchTimeout.toString(),
searchTasksRemaining
)
);
return;
}
}
}
injector.getInstance(ShutdownPrepareService.class)
.prepareForShutdown(injector.getInstance(TransportService.class).getTaskManager());
}

/**
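The prepareForClose() body above now delegates to a new ShutdownPrepareService, which this excerpt does not show. The sketch below is an assumed illustration of the idea only, derived from the awaitSearchTasksComplete() logic removed from Node.java: the same polling loop, generalized so it can also wait for reindex tasks under the new node.maximum_reindexing_grace_period setting. Method names other than prepareForShutdown(TaskManager), the logger field, and the call sites are guesses, not the PR's actual implementation.

// Sketch of the task-waiting logic assumed to live in ShutdownPrepareService.
// Assumed imports: org.elasticsearch.core.TimeValue, org.elasticsearch.tasks.TaskManager.
private void awaitTasksComplete(TaskManager taskManager, String actionName, TimeValue timeout) {
    long millisWaited = 0;
    while (true) {
        long tasksRemaining = taskManager.getTasks()
            .values()
            .stream()
            .filter(task -> actionName.equals(task.getAction()))
            .count();
        if (tasksRemaining == 0) {
            logger.debug("all [{}] tasks complete", actionName);
            return;
        }
        // Poll every 500ms, as the removed Node.java code did, so the dedicated shutdown
        // thread stays idle while in-flight tasks finish.
        final TimeValue pollPeriod = TimeValue.timeValueMillis(500);
        millisWaited += pollPeriod.millis();
        if (TimeValue.ZERO.equals(timeout) == false && millisWaited >= timeout.millis()) {
            logger.warn("timed out after [{}] waiting for [{}] [{}] tasks to finish", timeout, tasksRemaining, actionName);
            return;
        }
        try {
            Thread.sleep(pollPeriod.millis());
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return;
        }
    }
}

// prepareForShutdown(TaskManager) could then wait for both kinds of tasks, for example:
// awaitTasksComplete(taskManager, TransportSearchAction.TYPE.name(), maxShutdownGracePeriod);
// awaitTasksComplete(taskManager, ReindexAction.INSTANCE.name(), maxReindexingGracePeriod);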
@@ -1099,6 +1099,8 @@ private void construct(
telemetryProvider.getTracer()
);

final ShutdownPrepareService shutdownPrepareService = new ShutdownPrepareService(settings, httpServerTransport, terminationHandler);

modules.add(
loadPersistentTasksService(
settingsModule,
@@ -1200,6 +1202,7 @@ private void construct(
b.bind(CompatibilityVersions.class).toInstance(compatibilityVersions);
b.bind(DataStreamAutoShardingService.class).toInstance(dataStreamAutoShardingService);
b.bind(FailureStoreMetrics.class).toInstance(failureStoreMetrics);
b.bind(ShutdownPrepareService.class).toInstance(shutdownPrepareService);
});

if (ReadinessService.enabled(environment)) {