Wait for cluster to recover before resolving index template (elastic#99797)

This PR makes the bulk index action wait for the cluster to recover before
resolving index templates, so that ingest pipelines are correctly
processed when the cluster is recovering.

Resolves: elastic#49499
Supersedes: elastic#46085
ywangd authored Sep 22, 2023
1 parent ed8ea1f commit dd1eb20
Showing 5 changed files with 146 additions and 1 deletion.
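Before the file-by-file diff, a condensed sketch of the new logic may help. This is a simplified rendering of the change, not the verbatim diff: it assumes the clusterService, threadPool and logger fields that TransportBulkAction already has (imports as in the hunk further down), and forkAndExecute stands in for the pre-existing fork to the write thread pool.

// Sketch only: condensed from the TransportBulkAction hunk shown further down in this commit.
private void ensureClusterStateThenForkAndExecute(
    Task task,
    BulkRequest bulkRequest,
    String executorName,
    ActionListener<BulkResponse> listener
) {
    final ClusterState initialState = clusterService.state();
    final ClusterBlockException blockException = initialState.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
    if (blockException == null) {
        forkAndExecute(task, bulkRequest, executorName, listener); // no global write block: proceed as before
        return;
    }
    if (blockException.retryable() == false) {
        listener.onFailure(blockException); // non-retryable block: fail immediately
        return;
    }
    // Retryable block (e.g. "state not recovered"): wait up to the request timeout for a cluster
    // state without a global WRITE block before resolving templates and default pipelines.
    new ClusterStateObserver(initialState, clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext())
        .waitForNextChange(new ClusterStateObserver.Listener() {
            @Override
            public void onNewClusterState(ClusterState state) {
                forkAndExecute(task, bulkRequest, executorName, listener);
            }

            @Override
            public void onClusterServiceClose() {
                listener.onFailure(new NodeClosedException(clusterService.localNode()));
            }

            @Override
            public void onTimeout(TimeValue timeout) {
                listener.onFailure(blockException); // report the original block on timeout
            }
        }, newState -> newState.blocks().hasGlobalBlockWithLevel(ClusterBlockLevel.WRITE) == false);
}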
5 changes: 5 additions & 0 deletions docs/changelog/99797.yaml
@@ -0,0 +1,5 @@
pr: 99797
summary: Wait for cluster to recover before resolving index template
area: CRUD
type: bug
issues: []
@@ -7,15 +7,22 @@
*/
package org.elasticsearch.ingest.common;

import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.ingest.IngestStats;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.MockScriptEngine;
import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.test.ESIntegTestCase;
@@ -24,6 +31,7 @@

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
@@ -262,4 +270,86 @@ public void testWithDedicatedIngestNode() throws Exception {
assertThat(source.get("y"), equalTo(0));
}

public void testDefaultPipelineWaitForClusterStateRecovered() throws Exception {
internalCluster().startNode();

final var pipeline = new BytesArray("""
{
"processors" : [
{
"set": {
"field": "value",
"value": 42
}
}
]
}""");
final TimeValue timeout = TimeValue.timeValueSeconds(10);
client().admin().cluster().preparePutPipeline("test_pipeline", pipeline, XContentType.JSON).get(timeout);
client().admin().indices().preparePutTemplate("pipeline_template").setPatterns(Collections.singletonList("*")).setSettings("""
{
"index" : {
"default_pipeline" : "test_pipeline"
}
}
""", XContentType.JSON).get(timeout);

internalCluster().fullRestart(new InternalTestCluster.RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) {
return Settings.builder().put(GatewayService.RECOVER_AFTER_DATA_NODES_SETTING.getKey(), "2").build();
}

@Override
public boolean validateClusterForming() {
return randomBoolean();
}
});

// this one should fail
assertThat(
expectThrows(
ClusterBlockException.class,
() -> client().prepareIndex("index")
.setId("fails")
.setSource("x", 1)
.setTimeout(TimeValue.timeValueMillis(100)) // 100ms, to fail quickly
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get(timeout)
).getMessage(),
equalTo("blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];")
);

// but this one should pass since it has a longer timeout
final PlainActionFuture<IndexResponse> future = new PlainActionFuture<>();
client().prepareIndex("index")
.setId("passes1")
.setSource("x", 2)
.setTimeout(TimeValue.timeValueSeconds(60)) // wait for the second node to start below
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.execute(future);

// start a second node so the cluster state can be recovered
internalCluster().startNode(Settings.builder().put(GatewayService.RECOVER_AFTER_DATA_NODES_SETTING.getKey(), "1"));
ensureYellow("index");

final IndexResponse indexResponse = future.actionGet(timeout);
assertThat(indexResponse.status(), equalTo(RestStatus.CREATED));
assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED));

client().prepareIndex("index").setId("passes2").setSource("x", 3).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();

// successfully indexed documents should have the value field set by the pipeline
Map<String, Object> source = client().prepareGet("index", "passes1").get(timeout).getSource();
assertThat(source.get("x"), equalTo(2));
assertThat(source.get("value"), equalTo(42));

source = client().prepareGet("index", "passes2").get(timeout).getSource();
assertThat(source.get("x"), equalTo(3));
assertThat(source.get("value"), equalTo(42));

// and make sure this failed doc didn't get through
source = client().prepareGet("index", "fails").get(timeout).getSource();
assertNull(source);
}
}
@@ -192,7 +192,7 @@ public static <Response extends ReplicationResponse & WriteResponse> ActionListe
@Override
protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
/*
* This is called on the Transport tread so we can check the indexing
* This is called on the Transport thread so we can check the indexing
* memory pressure *quickly* but we don't want to keep the transport
* thread busy. Then, as soon as we have the indexing pressure in we fork
* to one of the write thread pools. We do this because juggling the
@@ -212,6 +212,52 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
final Releasable releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE;
ensureClusterStateThenForkAndExecute(task, bulkRequest, executorName, releasingListener);
}

private void ensureClusterStateThenForkAndExecute(
Task task,
BulkRequest bulkRequest,
String executorName,
ActionListener<BulkResponse> releasingListener
) {
final ClusterState initialState = clusterService.state();
final ClusterBlockException blockException = initialState.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
if (blockException != null) {
if (false == blockException.retryable()) {
releasingListener.onFailure(blockException);
return;
}
logger.trace("cluster is blocked, waiting for it to recover", blockException);
final ClusterStateObserver clusterStateObserver = new ClusterStateObserver(
initialState,
clusterService,
bulkRequest.timeout(),
logger,
threadPool.getThreadContext()
);
clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
forkAndExecute(task, bulkRequest, executorName, releasingListener);
}

@Override
public void onClusterServiceClose() {
releasingListener.onFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
public void onTimeout(TimeValue timeout) {
releasingListener.onFailure(blockException);
}
}, newState -> false == newState.blocks().hasGlobalBlockWithLevel(ClusterBlockLevel.WRITE));
} else {
forkAndExecute(task, bulkRequest, executorName, releasingListener);
}
}

private void forkAndExecute(Task task, BulkRequest bulkRequest, String executorName, ActionListener<BulkResponse> releasingListener) {
threadPool.executor(Names.WRITE).execute(new ActionRunnable<>(releasingListener) {
@Override
protected void doRun() {
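One practical consequence, exercised by the integration test above: the wait is bounded by the request's own timeout, so callers choose how long to wait for recovery. A minimal, hypothetical client-side example follows (index name, id and field values are placeholders; the calls mirror those used in the test):

// Hypothetical usage sketch: while the cluster state is still recovering, the request no longer
// fails template/pipeline resolution immediately; it waits up to its timeout for the global
// write block to clear, then is processed with the default pipeline from the matching template.
client.prepareIndex("my-index")
    .setId("doc-1")
    .setSource("x", 1)
    .setTimeout(TimeValue.timeValueSeconds(60)) // bounds how long the coordinating node waits for recovery
    .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
    .get();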
@@ -16,6 +16,7 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -95,6 +96,7 @@ private void indicesThatCannotBeCreatedTestCase(
ClusterState state = mock(ClusterState.class);
when(state.getMetadata()).thenReturn(Metadata.EMPTY_METADATA);
when(state.metadata()).thenReturn(Metadata.EMPTY_METADATA);
when(state.blocks()).thenReturn(mock(ClusterBlocks.class));
when(clusterService.state()).thenReturn(state);
when(clusterService.getSettings()).thenReturn(Settings.EMPTY);

@@ -22,6 +22,7 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -218,6 +219,7 @@ public void setupAction() {
.build();
when(state.getMetadata()).thenReturn(metadata);
when(state.metadata()).thenReturn(metadata);
when(state.blocks()).thenReturn(mock(ClusterBlocks.class));
when(clusterService.state()).thenReturn(state);
doAnswer(invocation -> {
ClusterChangedEvent event = mock(ClusterChangedEvent.class);
