Wait for cluster to recover before resolving index template #99797

Merged 8 commits on Sep 22, 2023

Changes from 2 commits
5 changes: 5 additions & 0 deletions docs/changelog/99797.yaml
@@ -0,0 +1,5 @@
pr: 99797
summary: Wait for cluster to recover before resolving index template
area: CRUD
type: bug
issues: []
@@ -7,15 +7,22 @@
*/
package org.elasticsearch.ingest.common;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.ingest.IngestStats;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.MockScriptEngine;
import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.test.ESIntegTestCase;
@@ -24,8 +31,11 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

@@ -262,4 +272,95 @@ public void testWithDedicatedIngestNode() throws Exception {
assertThat(source.get("y"), equalTo(0));
}

public void testDefaultPipelineWaitForClusterStateRecovered() throws Exception {
internalCluster().startNode();

final var pipeline = new BytesArray("""
{
"processors" : [
{
"set": {
"field": "value",
"value": 42
}
}
]
}""");
client().admin().cluster().preparePutPipeline("test_pipeline", pipeline, XContentType.JSON).get();
client().admin().indices().preparePutTemplate("pipeline_template").setPatterns(Collections.singletonList("*")).setSettings("""
{
"index" : {
"default_pipeline" : "test_pipeline"
}
}
""", XContentType.JSON).get();

internalCluster().fullRestart(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) {
return Settings.builder().put(GatewayService.RECOVER_AFTER_DATA_NODES_SETTING.getKey(), "2").build();
}

@Override
public boolean validateClusterForming() {
return false;
Contributor:
Hmm I think it's not necessary to skip this validation, it's only checking that the master is elected. At least, the test should work whether we wait like that or not, so maybe return randomBoolean() to indicate we're not relying on it.

Member Author:
That's good to know. Thanks. Updated as suggested.

}
});
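Per the exchange above, a minimal sketch of what the updated callback looks like (assuming only the surrounding test class; randomBoolean() comes from the ESTestCase hierarchy):

        internalCluster().fullRestart(new InternalTestCluster.RestartCallback() {
            @Override
            public Settings onNodeStopped(String nodeName) {
                // require two data nodes so the restarted single-node cluster stays unrecovered
                return Settings.builder().put(GatewayService.RECOVER_AFTER_DATA_NODES_SETTING.getKey(), "2").build();
            }

            @Override
            public boolean validateClusterForming() {
                // the test must pass either way, so choose randomly to show we don't rely on this wait
                return randomBoolean();
            }
        });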

// this one should fail
assertThat(
expectThrows(
ClusterBlockException.class,
() -> client().prepareIndex("index")
.setId("fails")
.setSource("x", 1)
.setTimeout(TimeValue.timeValueMillis(100)) // 100ms, to fail quickly
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get()
).getMessage(),
equalTo("blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];")
);

final var latch = new CountDownLatch(1);
// but this one should pass since it has a longer timeout
client().prepareIndex("index")
.setId("passes1")
.setSource("x", 2)
.setTimeout(TimeValue.timeValueSeconds(60)) // wait for the second node to start (below)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.execute(new ActionListener<>() {
@Override
public void onResponse(IndexResponse indexResponse) {
assertThat(indexResponse.status(), equalTo(RestStatus.CREATED));
assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED));
latch.countDown();
}

@Override
public void onFailure(Exception e) {
fail("Should not have failed with exception: " + e.getMessage());
}
});

// so the cluster state can be recovered
internalCluster().startNode(Settings.builder().put(GatewayService.RECOVER_AFTER_DATA_NODES_SETTING.getKey(), "1"));
ensureYellow("index");
assertTrue(latch.await(5, TimeUnit.SECONDS));
Contributor:
5s might be a little short, suggest using safeAwait:

Suggested change
-        assertTrue(latch.await(5, TimeUnit.SECONDS));
+        safeAwait(latch);

However, personally I think it'd be even better to use a PlainActionFuture as the listener so that any exception is immediately thrown back on the main test thread rather than having to wait for the timeout on the latch.

Member Author:
I agree. Using PlainActionFuture is a better and more explicit choice. Updated.
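A minimal sketch of the PlainActionFuture variant (assuming org.elasticsearch.action.support.PlainActionFuture; this mirrors the suggestion, not necessarily the exact follow-up commit):

        // PlainActionFuture implements ActionListener, so it can stand in for the anonymous listener
        final PlainActionFuture<IndexResponse> future = new PlainActionFuture<>();
        client().prepareIndex("index")
            .setId("passes1")
            .setSource("x", 2)
            .setTimeout(TimeValue.timeValueSeconds(60))
            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
            .execute(future);

        // ... start the second node so the cluster state recovers ...

        // any listener failure is rethrown here, on the main test thread
        final IndexResponse indexResponse = future.actionGet(TimeValue.timeValueSeconds(10));
        assertThat(indexResponse.status(), equalTo(RestStatus.CREATED));
        assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED));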


client().prepareIndex("index").setId("passes2").setSource("x", 3).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
client().admin().indices().prepareRefresh("index").get();
Contributor:
This refresh shouldn't be necessary I think, we set ?refresh=true on all the preceding indexing.


// successfully indexed documents should have the value field set by the pipeline
Map<String, Object> source = client().prepareGet("index", "passes1").get().getSource();
assertThat(source.get("x"), equalTo(2));
assertThat(source.get("value"), equalTo(42));

source = client().prepareGet("index", "passes2").get().getSource();
assertThat(source.get("x"), equalTo(3));
assertThat(source.get("value"), equalTo(42));

// and make sure this failed doc didn't get through
source = client().prepareGet("index", "fails").get().getSource();
Contributor:
I would prefer all of these .get() calls to have a timeout (10s?) too.

Member Author:
👍 Updated all the places with a 10 second timeout.
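For illustration, one of these calls with the 10-second timeout applied might look like this (a sketch; ActionRequestBuilder.get(TimeValue) blocks with a bounded wait):

        Map<String, Object> source = client().prepareGet("index", "passes1")
            .get(TimeValue.timeValueSeconds(10)) // bounded wait instead of blocking indefinitely
            .getSource();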

assertNull(source);
}
}
@@ -191,9 +191,47 @@ public static <Response extends ReplicationResponse & WriteResponse> ActionListe

@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
final long startTime = relativeTime();
final ClusterState initialState = clusterService.state();
final ClusterBlockException blockException = initialState.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
if (blockException != null) {
if (false == blockException.retryable()) {
listener.onFailure(blockException);
return;
}
logger.trace("cluster is blocked, waiting for it to recover", blockException);
final ClusterStateObserver clusterStateObserver = new ClusterStateObserver(
initialState,
clusterService,
bulkRequest.timeout(),
logger,
threadPool.getThreadContext()
);
clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
doExecuteOnWriteThreadPool(task, bulkRequest, startTime, listener);
}

@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
public void onTimeout(TimeValue timeout) {
listener.onFailure(blockException);
}
}, newState -> false == newState.blocks().hasGlobalBlockWithLevel(ClusterBlockLevel.WRITE));
} else {
doExecuteOnWriteThreadPool(task, bulkRequest, startTime, listener);
}
}

private void doExecuteOnWriteThreadPool(Task task, BulkRequest bulkRequest, long startTime, ActionListener<BulkResponse> listener) {
/*
- * This is called on the Transport tread so we can check the indexing
- * memory pressure *quickly* but we don't want to keep the transport
+ * This is called on the Transport thread and sometimes on the cluster state applier thread,
+ * so we can check the indexing memory pressure *quickly* but we don't want to keep the transport
* thread busy. Then, as soon as we have the indexing pressure in we fork
* to one of the write thread pools. We do this because juggling the
* bulk request can get expensive for a few reasons:
@@ -209,19 +247,24 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
final int indexingOps = bulkRequest.numberOfActions();
final long indexingBytes = bulkRequest.ramBytesUsed();
final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
- final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
Contributor:
Ah wait, sorry, we need to do this check first, on the transport worker, before we start waiting for cluster states. Otherwise we might just pile up far too much work in memory waiting for the cluster recovery.

But that is a little tricky because we need to compute isOnlySystem to call this, and that needs the cluster to be recovered. I suggest we conservatively assume isOnlySystem == false if the cluster is not yet recovered.

Member Author:
Sorry I don't quite follow. It seems to me that when the code reaches here, the cluster state is either recovered successfully or we had no need to wait for recovery at all. So there is no more waiting after this line?

Contributor:
Yes that's right, but it's the waiting before this line that's a problem. Before we run this line we have the bulk request in memory but we aren't tracking it in the indexing pressure subsystem. If we receive lots of indexing requests while the cluster is recovering then we will try to hold them all in memory, rejecting none of them, and will eventually just OOM.

Member Author:
Makes sense. Thanks a lot for the explanation! Updated in 812154a

Member Author:
Btw, I didn't change how isOnlySystem is computed since the method is written with the assumption that the index may not be available yet (see here). Also, if the metadata is not available, the ultimate default is to return false (when the SystemIndices also does not know the index).

Contributor:
I had some concerns about whether clusterState.metadata() could be null, or clusterState.metadata().getIndicesLookup(), but I did some checking and I think this is ok.

Member Author:
Thanks for checking! I didn't think that was possible.
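In sketch form, the ordering requested above (hypothetical: waitForClusterRecovery is an illustrative stand-in for the ClusterStateObserver logic earlier in this diff, and this is not the exact content of 812154a):

    @Override
    protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
        final long startTime = relativeTime();
        // 1. Track the request in the indexing-pressure subsystem immediately, on the
        //    transport worker, so requests queued behind an unrecovered cluster can be rejected.
        final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
        final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(
            bulkRequest.numberOfActions(),
            bulkRequest.ramBytesUsed(),
            isOnlySystem
        );
        final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
        // 2. Only then wait, if needed, for the write block to clear before forking to the write pool.
        waitForClusterRecovery(bulkRequest, releasingListener,
            () -> doExecuteOnWriteThreadPool(task, bulkRequest, startTime, releasingListener));
    }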

final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
+ final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
threadPool.executor(Names.WRITE).execute(new ActionRunnable<>(releasingListener) {
@Override
protected void doRun() {
-                doInternalExecute(task, bulkRequest, executorName, releasingListener);
+                doInternalExecute(task, bulkRequest, executorName, startTime, releasingListener);
}
});
}

-    protected void doInternalExecute(Task task, BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> listener) {
-        final long startTime = relativeTime();
Contributor:
I think preserving the original start time is a good change but it seems unrelated to the rest of this PR, and deserves a test of its own too. Could you separate that out into a different PR?

Member Author:
Thanks for catching it. This is a bigger change than I initially thought because today the startTime is reset after ingestService completes the processing. My previous change would have made it include the ingestService processing time (but not the request forward time). I have reverted this change. A follow-up is better. Thanks!

+    protected void doInternalExecute(
+        Task task,
+        BulkRequest bulkRequest,
+        String executorName,
+        long startTime,
+        ActionListener<BulkResponse> listener
+    ) {
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());

boolean hasIndexRequestsWithPipelines = false;
@@ -256,7 +299,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
assert arePipelinesResolved : bulkRequest;
}
if (clusterService.localNode().isIngestNode()) {
-            processBulkIndexIngestRequest(task, bulkRequest, executorName, l);
+            processBulkIndexIngestRequest(task, bulkRequest, executorName, startTime, l);
} else {
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, l);
}
@@ -759,6 +802,7 @@ private void processBulkIndexIngestRequest(
Task task,
BulkRequest original,
String executorName,
long startTime,
ActionListener<BulkResponse> listener
) {
final long ingestStartTimeInNanos = System.nanoTime();
@@ -788,7 +832,7 @@ private void processBulkIndexIngestRequest(
ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
@Override
protected void doRun() {
-                doInternalExecute(task, bulkRequest, executorName, actionListener);
+                doInternalExecute(task, bulkRequest, executorName, startTime, actionListener);
}

@Override