Block bulk request's index metadata retrieval until cluster state is recovered #46085
Conversation
If a bulk request hits a node that has not recovered the CS yet, index templates (and their ingest pipelines) are not loaded yet. The actual bulk request will wait/retry until the state is recovered, but the resolution of index templates will yield nothing, which means `default-pipeline` processors will be omitted and this can result in incorrect documents being indexed. This commit adds a blocking/retry mechanism so that index metadata resolution waits until the CS has been recovered.
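For readers skimming the thread, here is a minimal, hedged sketch of the kind of wait/retry described above, using the `ClusterStateObserver` API that appears in the review hunks below and the WRITE-level global block the reviewers converge on. The helper class and method names are illustrative only, not the actual `TransportBulkAction` change, and package locations (e.g. `TimeValue`) reflect the 7.x line this PR targets.

import java.util.function.Consumer;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;

// Hypothetical helper: not the PR's code, just the shape of the wait/retry it describes.
final class WaitForClusterRecoverySketch {

    /**
     * Runs onRecovered once the global WRITE block has cleared, retrying through the
     * observer until the given timeout; fails the listener on timeout or node close.
     */
    static void whenWriteUnblocked(ClusterService clusterService,
                                   ClusterStateObserver observer,
                                   TimeValue timeout,
                                   Consumer<ClusterState> onRecovered,
                                   ActionListener<?> listener) {
        ClusterState state = clusterService.state();
        final ClusterBlockException blocked =
            state.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
        if (blocked == null) {
            onRecovered.accept(state);          // nothing blocking writes, proceed immediately
            return;
        }
        observer.waitForNextChange(new ClusterStateObserver.Listener() {
            @Override
            public void onNewClusterState(ClusterState newState) {
                onRecovered.accept(newState);   // predicate below guarantees the block is gone
            }

            @Override
            public void onClusterServiceClose() {
                listener.onFailure(blocked);    // node shutting down: fail the bulk
            }

            @Override
            public void onTimeout(TimeValue t) {
                listener.onFailure(blocked);    // cluster never recovered within the timeout
            }
        }, newState -> newState.blocks().global(ClusterBlockLevel.WRITE).isEmpty(), timeout);
    }
}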
Pinging @elastic/es-distributed
if (startTime == -1) {
    startTime = relativeTime();
}
ClusterBlockException blockException = clusterService.state().blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
Is `METADATA_READ` the correct block to look for?
I think that `ClusterBlockLevel.WRITE` is more appropriate, since writing data is what we're trying to do.
Approach looks good. I left a few initial comments on my way out the door.
// so the cluster state can be recovered
internalCluster()
    .startNode(Settings.builder().put(GatewayService.RECOVER_AFTER_NODES_SETTING.getKey(), "1"));
I think this can just be `.startNode()` - the other node is the master, so this node doesn't do any state recovery.
So I thought so too, but if I didn't reset the recover_after, the test would time out because it never recovered. I tried to dig into why that was happening but couldn't figure it out... resetting recover_after seemed to be the only fix I could find :/
I tried applying the following patch on top of 676e941 and ran >100 iterations of the test with no failures:
diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java
index f406ccc965c..7ff9832e81a 100644
--- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java
+++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java
@@ -249,9 +249,7 @@ public class IngestRestartIT extends ESIntegTestCase {
}
});
- // so the cluster state can be recovered
- internalCluster()
- .startNode(Settings.builder().put(GatewayService.RECOVER_AFTER_NODES_SETTING.getKey(), "1"));
+ internalCluster().startNode();
ensureYellow("index");
assertTrue(latch.await(5, TimeUnit.SECONDS));
Maybe it was a different change. If this still fails for you then I'd like to investigate further, maybe on Zoom?
Thanks @DaveCTurner! Addressed review comments, the predicate definitely made it cleaner. I think everything is constructed correctly, but the global block methods/semantics confuse me a little so extra eyeballs there might be warranted :) I also moved the start time into the ctor because it was simpler, and should probably be run right after instantiation anyway. Edit: seems it's breaking some tests. Looking...
Looks like you're bitten by an excessively fake mock :(
I left a few more small points.
        listener.onFailure(blockException);
    }
}, newState -> newState.blocks().global(ClusterBlockLevel.WRITE).isEmpty());
return;
nit: I think `if ... else` is a bit clearer than `if ... return` :)
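For illustration, a hedged sketch of the suggested if/else shape; `waitForRecovery` and `prepForBulk` are stand-ins for the PR's actual methods, and only the block check mirrors the diff.

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;

// Hypothetical wrapper showing the control flow only.
final class ControlFlowSketch {
    void execute(ClusterState state) {
        if (state.blocks().global(ClusterBlockLevel.WRITE).isEmpty() == false) {
            waitForRecovery(state);   // cluster state not recovered yet: observe and retry
        } else {
            prepForBulk(state);       // no global WRITE block: proceed immediately
        }
    }

    private void waitForRecovery(ClusterState state) { /* observer wiring elided */ }

    private void prepForBulk(ClusterState state) { /* bulk preparation elided */ }
}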
recoveredObserver.waitForNextChange(new ClusterStateObserver.Listener() {
    @Override
    public void onNewClusterState(ClusterState newState) {
        // predicate passed, begin preparing for the bulk
Rather than a comment, we could assert `newState.blocks().global(ClusterBlockLevel.WRITE).isEmpty()`.
 * A runnable that will ensure the cluster state has been recovered enough to
 * read index metadata and templates/pipelines. Will retry up to the bulk's timeout
 */
private final class BulkExecutor extends ActionRunnable<BulkResponse> {
Do we need this class now that we aren't doing any retries ourself? I get that it wraps up the three parameters and the start time but 4 parameters to another method isn't so bad.
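For what it's worth, the flattened alternative hinted at above might look roughly like the following. The parameter names are assumptions based on the usual `TransportBulkAction.doExecute` signature (task, request, listener) plus the start time, not a quote from the PR.

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.tasks.Task;

// Hypothetical sketch: pass the three request parameters plus the start time
// straight to a helper method instead of wrapping them in a BulkExecutor class.
final class FlattenedExecuteSketch {
    void doInternalExecute(Task task,
                           BulkRequest bulkRequest,
                           ActionListener<BulkResponse> listener,
                           long relativeStartTimeNanos) {
        // ... resolve pipelines/templates and dispatch the bulk from here ...
    }
}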
@Override
protected void doRun() {
    ClusterState currentState = clusterService.state();
Technically I think we should get the state from `recoveredObserver.setAndGetObservedState()` to keep the state we observe in sync with the state that the `ClusterStateObserver` is observing. Alternatively, we could avoid constructing the `ClusterStateObserver` until we discover we need it, and give it the state we got from the cluster service - there's no need for it to be tracked in a field.
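A hedged sketch of the second suggestion, constructing the observer lazily from the state already in hand. The class and method names are illustrative, and the `ClusterStateObserver` constructor arguments reflect my understanding of the API of that era and may differ slightly.

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;

// Hypothetical sketch: only build the observer once we know we have to wait.
final class LazyObserverSketch {

    private static final Logger logger = LogManager.getLogger(LazyObserverSketch.class);

    void execute(ClusterService clusterService, ThreadContext threadContext, TimeValue timeout) {
        ClusterState state = clusterService.state();      // single read of the current state
        if (state.blocks().global(ClusterBlockLevel.WRITE).isEmpty()) {
            prepForBulk(state);                            // nothing blocked: no observer needed
            return;
        }
        // Seed the observer with the state we just looked at so the two stay in sync.
        ClusterStateObserver observer =
            new ClusterStateObserver(state, clusterService, timeout, logger, threadContext);
        // ... observer.waitForNextChange(...) as in the hunks above ...
    }

    private void prepForBulk(ClusterState state) { /* elided */ }
}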
@Override
public void onNewClusterState(ClusterState newState) {
    // predicate passed, begin preparing for the bulk
    prepForBulk(newState);
This is now sometimes on the cluster applier thread, which is probably not what we want. We should add this to the top of `prepForBulk`, and then make sure to call `prepForBulk` on the `WRITE` threadpool here:

assert ClusterApplierService.assertNotClusterStateUpdateThread("TransportBulkAction#prepForBulk");
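A hedged sketch of forking off the applier thread onto the WRITE pool, as suggested. The assertion and pool name come from the comment above; the surrounding class and method names are illustrative, not the PR's code.

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.threadpool.ThreadPool;

// Hypothetical sketch: hop off the cluster applier thread before doing bulk preparation.
final class ForkToWritePoolSketch {

    private final ThreadPool threadPool;

    ForkToWritePoolSketch(ThreadPool threadPool) {
        this.threadPool = threadPool;
    }

    // Called from the ClusterStateObserver listener once the predicate passes.
    void onNewClusterState(ClusterState newState) {
        threadPool.executor(ThreadPool.Names.WRITE).execute(() -> prepForBulk(newState));
    }

    private void prepForBulk(ClusterState state) {
        // guard against ever running this on the cluster state update/applier thread
        assert ClusterApplierService.assertNotClusterStateUpdateThread("TransportBulkAction#prepForBulk");
        // ... resolve index metadata / templates and dispatch the bulk ...
    }
}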
Urgh, this PR has languished. I just don't have time to dig in and solve these issues I'm afraid, given my relative lack of experience in this part of the code. When I started to fix the tests at the time, it became clear that the code needed a bit more adjusting than just fixing fragile mocks (e.g. to fix some of the mocks would have required pulling in half the world or even more excessive mocking). I'm going to close this for now and open a ticket about the issue so it isn't lost. I'll revisit this PR if I find some time but for now it's probably best to admit defeat :(
This PR makes the bulk index action wait for the cluster to recover before resolving index templates, so that ingest pipelines are correctly applied when the cluster is recovering. Resolves: elastic#49499 Supersedes: elastic#46085
If a bulk request hits a node that has not recovered the CS yet, index metadata will not be available. The actual bulk request will wait/retry until the CS is recovered (due to the retry mechanism in `BulkOperation`), but the resolution of index templates has no retry mechanism. This means `default-pipeline` processors will be omitted, and this can result in incorrect documents being indexed.

This commit adds a blocking/retry mechanism so that index metadata resolution waits until the CS has been recovered.
Questions/concerns
I'm assuming we want to retry if the CS isn't ready... is that a correct assumption?
This follows a similar pattern to `BulkOperation`/`executeBulk()`, since it seems the `TransportBulkAction` is reused (at least in the integ tests), so we need to store state somewhere other than on the action itself.

It does make `BulkOperation` somewhat redundant, since that also does cluster state checking. But I was hesitant to change more than was necessary. If desired I can try to meld the two together so there is less redundancy.

Not familiar with this code, so any/all suggestions welcome :)