
Wait for cluster to recover before resolving index template #99797

Merged (8 commits) on Sep 22, 2023.

Changes from 5 commits
5 changes: 5 additions & 0 deletions docs/changelog/99797.yaml
@@ -0,0 +1,5 @@
pr: 99797
summary: Wait for cluster to recover before resolving index template
area: CRUD
type: bug
issues: []
@@ -7,15 +7,22 @@
*/
package org.elasticsearch.ingest.common;

import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.ingest.IngestStats;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.MockScriptEngine;
import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.test.ESIntegTestCase;
@@ -24,6 +31,7 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
@@ -262,4 +270,86 @@ public void testWithDedicatedIngestNode() throws Exception {
assertThat(source.get("y"), equalTo(0));
}

public void testDefaultPipelineWaitForClusterStateRecovered() throws Exception {
internalCluster().startNode();

final var pipeline = new BytesArray("""
{
"processors" : [
{
"set": {
"field": "value",
"value": 42
}
}
]
}""");
final TimeValue timeout = TimeValue.timeValueSeconds(10);
client().admin().cluster().preparePutPipeline("test_pipeline", pipeline, XContentType.JSON).get(timeout);
client().admin().indices().preparePutTemplate("pipeline_template").setPatterns(Collections.singletonList("*")).setSettings("""
{
"index" : {
"default_pipeline" : "test_pipeline"
}
}
""", XContentType.JSON).get(timeout);

internalCluster().fullRestart(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) {
return Settings.builder().put(GatewayService.RECOVER_AFTER_DATA_NODES_SETTING.getKey(), "2").build();
}

@Override
public boolean validateClusterForming() {
return randomBoolean();
}
});

// this one should fail
assertThat(
expectThrows(
ClusterBlockException.class,
() -> client().prepareIndex("index")
.setId("fails")
.setSource("x", 1)
.setTimeout(TimeValue.timeValueMillis(100)) // 100ms, to fail quickly
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get(timeout)
).getMessage(),
equalTo("blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];")
);

// but this one should pass since it has a longer timeout
final PlainActionFuture<IndexResponse> future = new PlainActionFuture<>();
client().prepareIndex("index")
.setId("passes1")
.setSource("x", 2)
.setTimeout(TimeValue.timeValueSeconds(60)) // long enough for the second node to start below
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.execute(future);

// start a second data node so that the cluster state can be recovered
internalCluster().startNode(Settings.builder().put(GatewayService.RECOVER_AFTER_DATA_NODES_SETTING.getKey(), "1"));
ensureYellow("index");

final IndexResponse indexResponse = future.actionGet(timeout);
assertThat(indexResponse.status(), equalTo(RestStatus.CREATED));
assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED));

client().prepareIndex("index").setId("passes2").setSource("x", 3).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

// successfully indexed documents should have the value field set by the pipeline
Map<String, Object> source = client().prepareGet("index", "passes1").get(timeout).getSource();
assertThat(source.get("x"), equalTo(2));
assertThat(source.get("value"), equalTo(42));

source = client().prepareGet("index", "passes2").get(timeout).getSource();
assertThat(source.get("x"), equalTo(3));
assertThat(source.get("value"), equalTo(42));

// and make sure this failed doc didn't get through
source = client().prepareGet("index", "fails").get(timeout).getSource();
assertNull(source);
}
}
@@ -190,9 +190,9 @@ public static <Response extends ReplicationResponse & WriteResponse> ActionListe
}

     @Override
-    protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
+    protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> originalListener) {
         /*
-         * This is called on the Transport tread so we can check the indexing
+         * This is called on the Transport thread so we can check the indexing
          * memory pressure *quickly* but we don't want to keep the transport
          * thread busy. Then, as soon as we have the indexing pressure in we fork
          * to one of the write thread pools. We do this because juggling the
@@ -206,18 +206,58 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
          * We *could* detect these cases and only fork then, but that is complex
          * to get right and the fork is fairly low overhead.
          */
+        final ClusterState initialState = clusterService.state();
         final int indexingOps = bulkRequest.numberOfActions();
         final long indexingBytes = bulkRequest.ramBytesUsed();
-        final boolean isOnlySystem = isOnlySystem(bulkRequest, clusterService.state().metadata().getIndicesLookup(), systemIndices);
-        final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
-        final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
+        final boolean isOnlySystem = isOnlySystem(bulkRequest, initialState.metadata().getIndicesLookup(), systemIndices);
         final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
-        threadPool.executor(Names.WRITE).execute(new ActionRunnable<>(releasingListener) {
-            @Override
-            protected void doRun() {
-                doInternalExecute(task, bulkRequest, executorName, releasingListener);
+        final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
+        // We should use the releasingListener from here onwards

Contributor: A nice way to enforce this is to extract the rest of this method out into another method which receives only the wrapped listener in its arguments.

Member (author): Yeah, I thought about that but somehow decided against it, for no particularly good reason. I have now split it into a separate method.
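A minimal, self-contained sketch of the extraction being suggested here, in plain Java rather than the real Elasticsearch types (the class and method names are illustrative only, not the PR's actual code):

import java.util.function.Consumer;

class WrappedListenerScope {
    // Entry point: wrap the raw listener, then immediately hand off to a
    // method that only receives the wrapped one. The raw listener simply
    // is not in scope below, so it cannot be used by mistake.
    void execute(Consumer<String> rawListener) {
        Consumer<String> releasingListener = response -> {
            release();                    // always run the cleanup step first
            rawListener.accept(response); // then notify the caller
        };
        executeWithWrappedListener(releasingListener);
    }

    // Only the wrapped listener is visible from here onwards.
    private void executeWithWrappedListener(Consumer<String> releasingListener) {
        releasingListener.accept("done");
    }

    private void release() {
        // stand-in for closing the indexing-pressure Releasable
    }
}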

+        final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(originalListener, releasable::close);

Contributor: I have a slight preference for leaving these lines in the same order to simplify the diff, unless I'm missing something about the new order being required?

Member (author): It was accidental. Should be reverted by now.


+        final ClusterBlockException blockException = initialState.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
+        if (blockException != null) {
+            if (false == blockException.retryable()) {
+                releasingListener.onFailure(blockException);
+                return;
             }
-        });
logger.trace("cluster is blocked, waiting for it to recover", blockException);
final ClusterStateObserver clusterStateObserver = new ClusterStateObserver(
initialState,
clusterService,
bulkRequest.timeout(),
logger,
threadPool.getThreadContext()
);
clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
threadPool.executor(Names.WRITE).execute(new ActionRunnable<>(releasingListener) {
@Override
protected void doRun() {
doInternalExecute(task, bulkRequest, executorName, releasingListener);
}
});
}

@Override
public void onClusterServiceClose() {
releasingListener.onFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
public void onTimeout(TimeValue timeout) {
releasingListener.onFailure(blockException);
}
}, newState -> false == newState.blocks().hasGlobalBlockWithLevel(ClusterBlockLevel.WRITE));
} else {
threadPool.executor(Names.WRITE).execute(new ActionRunnable<>(releasingListener) {
@Override
protected void doRun() {
doInternalExecute(task, bulkRequest, executorName, releasingListener);
}
});

Contributor: This is the same as the onNewClusterState implementation; can we find a way to reduce the duplication?

Also, I wonder why we're forking to WRITE here even though we know it might be a low-latency system-indices-only write. Not a new problem, but something to think about in future.

Member (author): It is now split into a separate method. I didn't do that originally because it felt too small for a method, and the duplication didn't feel too big. It would also be possible to wrap the code in a runnable so that it can be called in both places, but it feels wasteful to create an extra object. Please let me know if you have a preference.

As for forking to WRITE even for a low-latency system-indices-only write: yeah, I noticed it as well when I duplicated the code. I think it is a bug, but I would rather address it separately.

+        }
     }

protected void doInternalExecute(Task task, BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> listener) {
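A minimal sketch of the deduplication discussed in the review thread above, again in plain Java with illustrative names (the PR's actual extracted method may look different):

import java.util.concurrent.Executor;

class ForkDeduplication {
    // Stand-in for the WRITE thread pool; the real code would use
    // threadPool.executor(Names.WRITE).
    private final Executor writeExecutor = Runnable::run;

    // Single place that forks to the write pool; both the unblocked path
    // and the observer's onNewClusterState callback call this method
    // instead of each building their own runnable.
    private void forkAndExecute(Runnable bulkWork) {
        writeExecutor.execute(bulkWork);
    }

    void onNoClusterBlock() {
        forkAndExecute(() -> System.out.println("executing bulk request"));
    }

    void onNewClusterState() {
        forkAndExecute(() -> System.out.println("executing bulk request after recovery"));
    }
}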
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -95,6 +96,7 @@ private void indicesThatCannotBeCreatedTestCase(
ClusterState state = mock(ClusterState.class);
when(state.getMetadata()).thenReturn(Metadata.EMPTY_METADATA);
when(state.metadata()).thenReturn(Metadata.EMPTY_METADATA);
when(state.blocks()).thenReturn(mock(ClusterBlocks.class));
when(clusterService.state()).thenReturn(state);
when(clusterService.getSettings()).thenReturn(Settings.EMPTY);

@@ -22,6 +22,7 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -218,6 +219,7 @@ public void setupAction() {
.build();
when(state.getMetadata()).thenReturn(metadata);
when(state.metadata()).thenReturn(metadata);
when(state.blocks()).thenReturn(mock(ClusterBlocks.class));
when(clusterService.state()).thenReturn(state);
doAnswer(invocation -> {
ClusterChangedEvent event = mock(ClusterChangedEvent.class);