-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Log reindexing failures #112676
Log reindexing failures #112676
Conversation
A couple of things to consider while reviewing:
|
Pinging @elastic/es-distributed (Team:Distributed) |
Hi @ankikuma, I've created a changelog YAML for you. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For a test I would create an IT
performing reindex on a specific node. And then wait for the task to appear in the tasks list. And then immediately stop the node. And check that the task still completes.
You would want to try it out without your shutdown wait code here and make sure you are indexing enough data that the test would (at least sometimes) fail without your shutdown wait you are adding here.
@@ -702,6 +704,52 @@ private void awaitSearchTasksComplete(TimeValue asyncSearchTimeout) { | |||
} | |||
} | |||
|
|||
private void awaitReindexTasksComplete(TimeValue asyncReindexTimeout) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we combine some code here? Something like:
private void awaitSearchTasksComplete(TimeValue asyncSearchTimeout) {
waitForTasks(asyncSearchTimeout, TransportSearchAction.NAME);
}
private void awaitReindexTasksComplete(TimeValue reindexTimeout) {
waitForTasks(reindexTimeout, ReindexAction.NAME);
}
private void waitForTasks(TimeValue timeout, String taskName) {
TaskManager taskManager = injector.getInstance(TransportService.class).getTaskManager();
long millisWaited = 0;
while (true) {
long searchTasksRemaining = taskManager.getTasks().values().stream().filter(task -> taskName.equals(task.getAction())).count();
if (searchTasksRemaining == 0) {
logger.debug("all search tasks complete");
return;
} else {
// Let the system work on those searches for a while. We're on a dedicated thread to manage app shutdown, so we
// literally just want to wait and not take up resources on this thread for now. Poll period chosen to allow short
// response times, but checking the tasks list is relatively expensive, and we don't want to waste CPU time we could
// be spending on finishing those searches.
final TimeValue pollPeriod = TimeValue.timeValueMillis(500);
millisWaited += pollPeriod.millis();
if (TimeValue.ZERO.equals(timeout) == false && millisWaited >= timeout.millis()) {
logger.warn(
format(
"timed out after waiting [%s] for [%d] " + taskName + " tasks to finish",
timeout.toString(),
searchTasksRemaining
)
);
return;
}
logger.debug(
format("waiting for [%s] " + taskName + " tasks to finish, next poll in [%s]", searchTasksRemaining, pollPeriod)
);
try {
Thread.sleep(pollPeriod.millis());
} catch (InterruptedException ex) {
logger.warn(
format(
"interrupted while waiting [%s] for [%d] " + taskName + " tasks to finish",
timeout.toString(),
searchTasksRemaining
)
);
return;
}
}
}
}
If it is difficult to get the reindex task to fail from restarting the node, you can always reach in and grab |
Thanks Tim, I will work on a test case. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, left a couple comments.
@@ -629,6 +630,7 @@ boolean isIncomplete() { | |||
|
|||
stopperRunner.accept("http-server-transport-stop", injector.getInstance(HttpServerTransport.class)::close); | |||
stopperRunner.accept("async-search-stop", () -> awaitSearchTasksComplete(maxTimeout)); | |||
stopperRunner.accept("reindex-stop", () -> awaitReindexTasksComplete(TimeValue.min(TimeValue.timeValueMinutes(1), maxTimeout))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let us make the timeout configurable separately from the max-timeout. I think 1 min could be too long by default, I'd go with something like 10secs and we can increase where we need it. The problem is that users do rolling restarts and this could prevent those from completing as fast as before - and the longer it takes to restart the more operations may need to be replayed to re-establish sync'ed replicas.
@@ -629,6 +630,7 @@ boolean isIncomplete() { | |||
|
|||
stopperRunner.accept("http-server-transport-stop", injector.getInstance(HttpServerTransport.class)::close); | |||
stopperRunner.accept("async-search-stop", () -> awaitSearchTasksComplete(maxTimeout)); | |||
stopperRunner.accept("reindex-stop", () -> awaitReindexTasksComplete(TimeValue.min(TimeValue.timeValueMinutes(1), maxTimeout))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maxTimeout=ZERO
seems to mean infinite, I wonder if the TimeValue.min
here should respect that?
Refresh to latest
Refresh to latest
Added a test. Please take a look @henningandersen @Tim-Brooks whenever you get a chance. I set the default timeout for the reindexing wait to be 10s. Please let me know if you think I should shorten it. |
/** | ||
* Extracted out the logic from {@link Node#prepareForClose()} to facilitate testing. | ||
*/ | ||
public class ShutdownFenceService { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know I was the one who created this name initially 🙃, but I'm not sure it is the best. Maybe let's change it to ShutdownService
.
And maybe add a javadoc that describes it in a line or two similar to what you have on the prepareForShutdown
method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, no problem. I will rename it to ShutdownPrepareService since we already have a PluginShutdownService and it might get confusing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly looks good to me. Just 1 comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, left a few comments.
assertHitCount(prepareSearch(INDEX).setSize(0).setTrackTotalHits(true), numDocs); | ||
} | ||
|
||
private void checkDestinationIndex(String dataNodeName, int numDocs) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I'd prefer to move this helper method after the test
method. Right now we have some helpers before, some after. We typically have the tests prior to the helper methods (at least when they are this large) to make it easy to spot the test methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Done.
} | ||
|
||
private void createReindexTaskAndShutdown(final String coordNodeName) throws Exception { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: looks like an extra newline here, can we remove it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still see it here?
...ndex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexNodeShutdownIT.java
Show resolved
Hide resolved
...ndex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexNodeShutdownIT.java
Show resolved
Hide resolved
private static TaskInfo findTask(String actionName, String nodeName) { | ||
ListTasksResponse tasks; | ||
long start = System.nanoTime(); | ||
do { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use assertBusy() instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
|
||
stopperRunner.accept("http-server-transport-stop", httpServerTransport::close); | ||
stopperRunner.accept("async-search-stop", () -> awaitSearchTasksComplete(maxTimeout, taskManager)); | ||
stopperRunner.accept("reindex-stop", () -> awaitReindexTasksComplete(reindexTimeout, taskManager)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this the only change in this class? The refactor makes it a bit harder to reason about the change. We can keep it as is, but it might be beneficial to do the refactor separately for future PRs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The refactor was done for testing purpose as explained above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I understand that. Still, is the line added (plus the method called) the only change here or has other changes sneaked in?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I see, I thought your question was about creating this new class. I only really added the awaitReindexTasksComplete() but in order to reuse the code being used by awaitSearchTasksComplete() I created a new API waitForTasks() to replace awaitSearchTasksComplete()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a few comments before realizing the changes were not pushed yet.
} | ||
|
||
private void createReindexTaskAndShutdown(final String coordNodeName) throws Exception { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still see it here?
...ndex/src/internalClusterTest/java/org/elasticsearch/index/reindex/ReindexNodeShutdownIT.java
Show resolved
Hide resolved
I just pushed the changes. I wanted to wait to see if you would suggest a major revamp of the test :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Left a number of smaller comments, but no need for another round.
}, 10, TimeUnit.SECONDS)); | ||
} | ||
|
||
private static void findTask(String actionName, String nodeName) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I'd call this waitForTask instead.
assertTrue("Number of documents in source and dest indexes does not match", waitUntil(() -> { | ||
final TotalHits totalHits = SearchResponseUtils.getTotalHits( | ||
client(dataNodeName).prepareSearch(DEST_INDEX).setSize(0).setTrackTotalHits(true) | ||
); | ||
return totalHits.relation() == TotalHits.Relation.EQUAL_TO && totalHits.value() == numDocs; | ||
}, 10, TimeUnit.SECONDS)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can be something like following instead:
assertBusy(() -> assertHitCount(prepareSearch(DEST_INDEX).setSize(0).setTrackTotalHits(true), numDocs));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Done.
for (TaskInfo taskInfo : tasks.getTasks()) { | ||
// Skip tasks with a parent because those are children of the task we want | ||
assertFalse(taskInfo.parentTaskId().isSet()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this does not really wait. As a sidenote I am unsure if we need to wait and also whether skipping child tasks is important. But I think this should be:
for (TaskInfo taskInfo : tasks.getTasks()) { | |
// Skip tasks with a parent because those are children of the task we want | |
assertFalse(taskInfo.parentTaskId().isSet()); | |
} | |
for (TaskInfo taskInfo : tasks.getTasks()) { | |
// Skip tasks with a parent because those are children of the task we want | |
if(taskInfo.parentTaskId().isSet()) == false) { | |
return; | |
} | |
fail("not found - yet"); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. Thanks for catching this, I fixed it and tested it out as well (by holding off submission of the reindexing task).
@@ -19,7 +19,7 @@ public class ReindexMetrics { | |||
private final LongHistogram reindexTimeSecsHistogram; | |||
|
|||
public ReindexMetrics(MeterRegistry meterRegistry) { | |||
this(meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "millis")); | |||
this(meterRegistry.registerLongHistogram(REINDEX_TIME_HISTOGRAM, "Time to reindex by search", "seconds")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While I think I agree to the change, I'd prefer to do this in a separate PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I will remove it.
Settings settings; | ||
HttpServerTransport httpServerTransport; | ||
TerminationHandler terminationHandler; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should make all fields as final and private as possible. I think these can be private final?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Thanks!
@@ -167,6 +156,12 @@ public class Node implements Closeable { | |||
Setting.Property.NodeScope | |||
); | |||
|
|||
public static final Setting<TimeValue> MAXIMUM_REINDEXING_TIMEOUT_SETTING = Setting.positiveTimeSetting( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we might as well move these two settings closer to their use, i.e., to ShutdownPrepareService
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure done.
} | ||
} | ||
|
||
private void waitForTasks(TimeValue timeout, String taskName, TaskManager taskManager) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we have differing naming for similar things in this class, I'd prefer to name this: awaitTasksComplete
, similar to the two other await
methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's a better name. Thanks.
Wait for reindexing tasks to finish during shutdown for an amount of time defined by settings. Also log the number of reindexing tasks still in flight after the wait.
Wait for reindexing tasks to finish during shutdown for an amount of time defined by settings. Also log the number of reindexing tasks still in flight after the wait.
Wait for reindexing tasks to finish during shutdown for an amount of time defined by settings. Also log the number of reindexing tasks still in flight after the wait.
Log number of reindex tasks in flight during a shutdown.
This PR aims to address ES-8850.
Also fixing a small labeling mistake for the reindex metrics.