From dd1eb2056b6b2b91a32e9add3f0f97c54f7f4dbb Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Fri, 22 Sep 2023 23:19:28 +1000 Subject: [PATCH] Wait for cluster to recover before resolving index template (#99797) This PR makes bulk index action wait for cluster to recover before resolving index templates so that ingest pipelines are correctly processed when the cluster is recovering. Resolves: #49499 Supersedes: #46085 --- docs/changelog/99797.yaml | 5 ++ .../ingest/common/IngestRestartIT.java | 90 +++++++++++++++++++ .../action/bulk/TransportBulkAction.java | 48 +++++++++- ...ActionIndicesThatCannotBeCreatedTests.java | 2 + .../bulk/TransportBulkActionIngestTests.java | 2 + 5 files changed, 146 insertions(+), 1 deletion(-) create mode 100644 docs/changelog/99797.yaml diff --git a/docs/changelog/99797.yaml b/docs/changelog/99797.yaml new file mode 100644 index 0000000000000..e46d4501291b5 --- /dev/null +++ b/docs/changelog/99797.yaml @@ -0,0 +1,5 @@ +pr: 99797 +summary: Wait for cluster to recover before resolving index template +area: CRUD +type: bug +issues: [] diff --git a/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java b/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java index acd0ed59da137..f2cebfc2569d7 100644 --- a/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java +++ b/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java @@ -7,15 +7,22 @@ */ package org.elasticsearch.ingest.common; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.cluster.block.ClusterBlockException; import 
org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Strings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.ingest.IngestStats; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.MockScriptEngine; import org.elasticsearch.script.MockScriptPlugin; import org.elasticsearch.test.ESIntegTestCase; @@ -24,6 +31,7 @@ import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.function.Consumer; @@ -262,4 +270,86 @@ public void testWithDedicatedIngestNode() throws Exception { assertThat(source.get("y"), equalTo(0)); } + public void testDefaultPipelineWaitForClusterStateRecovered() throws Exception { + internalCluster().startNode(); + + final var pipeline = new BytesArray(""" + { + "processors" : [ + { + "set": { + "field": "value", + "value": 42 + } + } + ] + }"""); + final TimeValue timeout = TimeValue.timeValueSeconds(10); + client().admin().cluster().preparePutPipeline("test_pipeline", pipeline, XContentType.JSON).get(timeout); + client().admin().indices().preparePutTemplate("pipeline_template").setPatterns(Collections.singletonList("*")).setSettings(""" + { + "index" : { + "default_pipeline" : "test_pipeline" + } + } + """, XContentType.JSON).get(timeout); + + internalCluster().fullRestart(new InternalTestCluster.RestartCallback() { + @Override + public Settings onNodeStopped(String nodeName) { + return Settings.builder().put(GatewayService.RECOVER_AFTER_DATA_NODES_SETTING.getKey(), "2").build(); + } + + @Override + public boolean validateClusterForming() { + return randomBoolean(); + } + }); + + // this one should fail + assertThat( + expectThrows( + 
ClusterBlockException.class, + () -> client().prepareIndex("index") + .setId("fails") + .setSource("x", 1) + .setTimeout(TimeValue.timeValueMillis(100)) // 100ms, to fail quickly + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(timeout) + ).getMessage(), + equalTo("blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];") + ); + + // but this one should pass since it has a longer timeout + final PlainActionFuture future = new PlainActionFuture<>(); + client().prepareIndex("index") + .setId("passes1") + .setSource("x", 2) + .setTimeout(TimeValue.timeValueSeconds(60)) // wait for second node to start in below + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .execute(future); + + // so the cluster state can be recovered + internalCluster().startNode(Settings.builder().put(GatewayService.RECOVER_AFTER_DATA_NODES_SETTING.getKey(), "1")); + ensureYellow("index"); + + final IndexResponse indexResponse = future.actionGet(timeout); + assertThat(indexResponse.status(), equalTo(RestStatus.CREATED)); + assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED)); + + client().prepareIndex("index").setId("passes2").setSource("x", 3).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + + // successfully indexed documents should have the value field set by the pipeline + Map source = client().prepareGet("index", "passes1").get(timeout).getSource(); + assertThat(source.get("x"), equalTo(2)); + assertThat(source.get("value"), equalTo(42)); + + source = client().prepareGet("index", "passes2").get(timeout).getSource(); + assertThat(source.get("x"), equalTo(3)); + assertThat(source.get("value"), equalTo(42)); + + // and make sure this failed doc didn't get through + source = client().prepareGet("index", "fails").get(timeout).getSource(); + assertNull(source); + } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java 
b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 6635323a2ef2c..13d10be86bd68 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -192,7 +192,7 @@ public static ActionListe @Override protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener listener) { /* - * This is called on the Transport tread so we can check the indexing + * This is called on the Transport thread so we can check the indexing * memory pressure *quickly* but we don't want to keep the transport * thread busy. Then, as soon as we have the indexing pressure in we fork * to one of the write thread pools. We do this because juggling the @@ -212,6 +212,52 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); final String executorName = isOnlySystem ? Names.SYSTEM_WRITE : Names.WRITE; + ensureClusterStateThenForkAndExecute(task, bulkRequest, executorName, releasingListener); + } + + private void ensureClusterStateThenForkAndExecute( + Task task, + BulkRequest bulkRequest, + String executorName, + ActionListener releasingListener + ) { + final ClusterState initialState = clusterService.state(); + final ClusterBlockException blockException = initialState.blocks().globalBlockedException(ClusterBlockLevel.WRITE); + if (blockException != null) { + if (false == blockException.retryable()) { + releasingListener.onFailure(blockException); + return; + } + logger.trace("cluster is blocked, waiting for it to recover", blockException); + final ClusterStateObserver clusterStateObserver = new ClusterStateObserver( + initialState, + clusterService, + bulkRequest.timeout(), + logger, + threadPool.getThreadContext() + ); + clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void 
onNewClusterState(ClusterState state) { + forkAndExecute(task, bulkRequest, executorName, releasingListener); + } + + @Override + public void onClusterServiceClose() { + releasingListener.onFailure(new NodeClosedException(clusterService.localNode())); + } + + @Override + public void onTimeout(TimeValue timeout) { + releasingListener.onFailure(blockException); + } + }, newState -> false == newState.blocks().hasGlobalBlockWithLevel(ClusterBlockLevel.WRITE)); + } else { + forkAndExecute(task, bulkRequest, executorName, releasingListener); + } + } + + private void forkAndExecute(Task task, BulkRequest bulkRequest, String executorName, ActionListener releasingListener) { threadPool.executor(Names.WRITE).execute(new ActionRunnable<>(releasingListener) { @Override protected void doRun() { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index 1b0c24664be31..618a33821c156 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -95,6 +96,7 @@ private void indicesThatCannotBeCreatedTestCase( ClusterState state = mock(ClusterState.class); when(state.getMetadata()).thenReturn(Metadata.EMPTY_METADATA); when(state.metadata()).thenReturn(Metadata.EMPTY_METADATA); + when(state.blocks()).thenReturn(mock(ClusterBlocks.class)); 
when(clusterService.state()).thenReturn(state); when(clusterService.getSettings()).thenReturn(Settings.EMPTY); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 86146e41f87e1..0168eb0488a5b 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; +import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.AliasMetadata; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -218,6 +219,7 @@ public void setupAction() { .build(); when(state.getMetadata()).thenReturn(metadata); when(state.metadata()).thenReturn(metadata); + when(state.blocks()).thenReturn(mock(ClusterBlocks.class)); when(clusterService.state()).thenReturn(state); doAnswer(invocation -> { ClusterChangedEvent event = mock(ClusterChangedEvent.class);