From c02ef26ea1811d5a8259e08d6707fac9a668b4a9 Mon Sep 17 00:00:00 2001
From: Zachary Tong
Date: Wed, 28 Aug 2019 11:00:00 -0400
Subject: [PATCH 1/2] Block bulk requests until cluster state is recovered

If a bulk request hits a node that has not recovered the CS yet, index
templates (and their ingest pipelines) are not loaded yet. The actual
bulk request will wait/retry until the state is recovered, but the
resolution of index templates will yield nothing, which means
`default-pipeline` processors will be omitted and this can result in
incorrect documents being indexed.

This commit adds a blocking/retry mechanism so that index metadata
resolution waits until the CS has been recovered.
---
 .../ingest/common/IngestRestartIT.java        | 101 ++++++
 .../action/bulk/TransportBulkAction.java      | 295 +++++++++++-------
 2 files changed, 278 insertions(+), 118 deletions(-)

diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java
index 6c79c68df1df1..f406ccc965ce7 100644
--- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java
+++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/IngestRestartIT.java
@@ -19,13 +19,20 @@
 package org.elasticsearch.ingest.common;
 
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.script.MockScriptEngine;
 import org.elasticsearch.script.MockScriptPlugin;
 import org.elasticsearch.test.ESIntegTestCase;
@@ -35,6 +42,8 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -177,6 +186,98 @@ public void testPipelineWithScriptProcessorThatHasStoredScript() throws Exceptio
         assertThat(source.get("z"), equalTo(0));
     }
 
+    public void testDefaultPipelineBeforeCSRecovered() throws Exception {
+        internalCluster().startNode();
+
+        BytesReference pipeline = new BytesArray("{\n" +
+            "  \"processors\" : [\n" +
+            "    {\n" +
+            "      \"remove\": {\n" +
+            "        \"field\": \"_type\"\n" +
+            "      }\n" +
+            "    }" +
+            "  ]\n" +
+            "}");
+        client().admin().cluster().preparePutPipeline("test_pipeline", pipeline, XContentType.JSON).get();
+        client().admin().indices().preparePutTemplate("pipeline_template")
+            .setPatterns(Collections.singletonList("*"))
+            .setSettings(
+                "{\n" +
+                "  \"index\" : {\n" +
+                "    \"default_pipeline\" : \"test_pipeline\"" +
+                "  }\n" +
+                "}\n", XContentType.JSON).get();
+
+
+        internalCluster().fullRestart(new InternalTestCluster.RestartCallback() {
+            @Override
+            public Settings onNodeStopped(String nodeName) {
+                return Settings.builder().put(GatewayService.RECOVER_AFTER_NODES_SETTING.getKey(), "2").build();
+            }
+
+            @Override
+            public boolean validateClusterForming() {
+                return false;
+            }
+        });
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        // this one should fail
+        assertThat(expectThrows(ClusterBlockException.class, () -> client().prepareIndex("index", "foo", "fails")
+            .setSource("x", 1)
+            .setTimeout(TimeValue.timeValueMillis(100)) // 100ms, to fail quickly
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .get()).getMessage(), equalTo("blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];"));
+
+        // but this one should pass since it has a longer timeout
+        client().prepareIndex("index", "foo", "passes1")
+            .setSource("x", 2)
+            .setTimeout(TimeValue.timeValueSeconds(60)) // wait for second node to start
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .execute(new ActionListener<IndexResponse>() {
+                @Override
+                public void onResponse(IndexResponse indexResponse) {
+                    assertThat(indexResponse.status(), equalTo(RestStatus.CREATED));
+                    assertThat(indexResponse.getResult(), equalTo(DocWriteResponse.Result.CREATED));
+                    latch.countDown();
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    fail("Should not have failed with exception: " + e.getMessage());
+                }
+            });
+
+        // start a second node so the cluster state can be recovered
+        internalCluster()
+            .startNode(Settings.builder().put(GatewayService.RECOVER_AFTER_NODES_SETTING.getKey(), "1"));
+        ensureYellow("index");
+
+        assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+        client().prepareIndex("index", "bar", "passes2")
+            .setSource("x", 3)
+            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
+            .get();
+
+        client().admin().indices().prepareRefresh("index").get();
+
+        // note that the types are _doc not `foo` or `bar`
+        Map<String, Object> source = client().prepareGet("index", "_doc", "passes1").get().getSource();
+        assertThat(source.get("x"), equalTo(2));
+
+        source = client().prepareGet("index", "_doc", "passes2").get().getSource();
+        assertThat(source.get("x"), equalTo(3));
+
+        // and make sure the failed doc didn't get through
+        source = client().prepareGet("index", "foo", "fails").get().getSource();
+        assertNull(source);
+        source = client().prepareGet("index", "_doc", "fails").get().getSource();
+        assertNull(source);
+
+    }
+
     public void testWithDedicatedIngestNode() throws Exception {
         String node = internalCluster().startNode();
         String ingestNode = internalCluster().startNode(Settings.builder()
diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
index a2f105df7e9b7..a9da7d70066c4 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
@@ -147,149 +147,208 @@ public static IndexRequest getIndexWriteRequest(DocWriteRequest<?> docWriteRequest)
 
     @Override
     protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
-        final long startTime = relativeTime();
-        final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
-
-        boolean hasIndexRequestsWithPipelines = false;
-        final MetaData metaData = clusterService.state().getMetaData();
-        ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
-        for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
-            IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
-            if (indexRequest != null) {
-                // get pipeline from request
-                String pipeline = indexRequest.getPipeline();
-                if (pipeline == null) {
-                    // start to look for default pipeline via settings found in the index meta data
-                    IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
-                    // check the alias for the index request (this is how normal index requests are modeled)
-                    if (indexMetaData == null && indexRequest.index() != null) {
-                        AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
-                        if (indexOrAlias != null && indexOrAlias.isAlias()) {
-                            AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
-                            indexMetaData = alias.getWriteIndex();
-                        }
+        new BulkExecutor(task, bulkRequest, listener).run();
+    }
+
+    /**
+     * A runnable that will ensure the cluster state has been recovered enough to
+     * read index metadata and templates/pipelines. Will retry up to the bulk's timeout.
+     */
+    private final class BulkExecutor extends ActionRunnable<BulkResponse> {
+        private final ClusterStateObserver recoveredObserver;
+        private final BulkRequest bulkRequest;
+        private final Task task;
+        long startTime = -1;
+
+        BulkExecutor(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
+            super(listener);
+            this.recoveredObserver = new ClusterStateObserver(clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext());
+            this.bulkRequest = bulkRequest;
+            this.task = task;
+        }
+
+        @Override
+        protected void doRun() {
+            if (startTime == -1) {
+                startTime = relativeTime();
+            }
+            ClusterBlockException blockException = clusterService.state().blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
+            if (blockException != null) {
+                if (recoveredObserver.isTimedOut()) {
+                    // we are running as a last attempt after a timeout has happened. don't retry
+                    listener.onFailure(blockException);
+                    return;
+                }
+                recoveredObserver.waitForNextChange(new ClusterStateObserver.Listener() {
+                    @Override
+                    public void onNewClusterState(ClusterState state) {
+                        run();
                     }
-                    // check the alias for the action request (this is how upserts are modeled)
-                    if (indexMetaData == null && actionRequest.index() != null) {
-                        AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(actionRequest.index());
-                        if (indexOrAlias != null && indexOrAlias.isAlias()) {
-                            AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
-                            indexMetaData = alias.getWriteIndex();
-                        }
+
+                    @Override
+                    public void onClusterServiceClose() {
+                        listener.onFailure(new NodeClosedException(clusterService.localNode()));
                     }
-                    if (indexMetaData != null) {
-                        // Find the default pipeline if one is defined from and existing index.
-                        String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
-                        indexRequest.setPipeline(defaultPipeline);
-                        if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
-                            hasIndexRequestsWithPipelines = true;
+
+                    @Override
+                    public void onTimeout(TimeValue timeout) {
+                        // Try one more time...
+                        run();
+                    }
+                });
+                return;
+            }
+
+            // All good, begin preparing for the bulk request
+            prepForBulk();
+        }
+
+        private void prepForBulk() {
+            final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
+
+            boolean hasIndexRequestsWithPipelines = false;
+            final MetaData metaData = clusterService.state().getMetaData();
+            ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
+            for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
+                IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
+                if (indexRequest != null) {
+                    // get pipeline from request
+                    String pipeline = indexRequest.getPipeline();
+                    if (pipeline == null) {
+                        // start to look for default pipeline via settings found in the index meta data
+                        IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
+                        // check the alias for the index request (this is how normal index requests are modeled)
+                        if (indexMetaData == null && indexRequest.index() != null) {
+                            AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(indexRequest.index());
+                            if (indexOrAlias != null && indexOrAlias.isAlias()) {
+                                AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
+                                indexMetaData = alias.getWriteIndex();
+                            }
                         }
+                        // check the alias for the action request (this is how upserts are modeled)
+                        if (indexMetaData == null && actionRequest.index() != null) {
+                            AliasOrIndex indexOrAlias = metaData.getAliasAndIndexLookup().get(actionRequest.index());
+                            if (indexOrAlias != null && indexOrAlias.isAlias()) {
+                                AliasOrIndex.Alias alias = (AliasOrIndex.Alias) indexOrAlias;
+                                indexMetaData = alias.getWriteIndex();
                             }
                         }
+                        if (indexMetaData != null) {
+                            // Find the default pipeline if one is defined from an existing index.
+                            String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
+                            indexRequest.setPipeline(defaultPipeline);
+                            if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
+                                hasIndexRequestsWithPipelines = true;
+                            }
+                        } else if (indexRequest.index() != null) {
+                            // No index exists yet (and it is a valid request), so match index templates to look for a default pipeline
+                            List<IndexTemplateMetaData> templates
+                                = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
+                            assert (templates != null);
+                            String defaultPipeline = IngestService.NOOP_PIPELINE_NAME;
+                            // order of templates is highest order first, break if we find a default_pipeline
+                            for (IndexTemplateMetaData template : templates) {
+                                final Settings settings = template.settings();
+                                if (IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
+                                    defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
+                                    break;
+                                }
+                            }
+                            indexRequest.setPipeline(defaultPipeline);
+                            if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
+                                hasIndexRequestsWithPipelines = true;
+                            }
+                        }
+                    } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
+                        hasIndexRequestsWithPipelines = true;
                     }
                 }
-        }
-        if (hasIndexRequestsWithPipelines) {
-            // this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but
-            // also with IngestService.NOOP_PIPELINE_NAME on each request. This ensures that this on the second time through this method,
-            // this path is never taken.
-            try {
-                if (clusterService.localNode().isIngestNode()) {
-                    processBulkIndexIngestRequest(task, bulkRequest, listener);
-                } else {
-                    ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
+            }
+
+            if (hasIndexRequestsWithPipelines) {
+                // this method (doExecute) will be called again, but with the bulk requests updated from the ingest node processing but
+                // also with IngestService.NOOP_PIPELINE_NAME on each request. This ensures that on the second time through
+                // this method, this path is never taken.
+                try {
+                    if (clusterService.localNode().isIngestNode()) {
+                        processBulkIndexIngestRequest(task, bulkRequest, listener);
+                    } else {
+                        ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
+                    }
+                } catch (Exception e) {
+                    listener.onFailure(e);
                 }
-            } catch (Exception e) {
-                listener.onFailure(e);
+                return;
             }
-            return;
-        }
-        if (needToCheck()) {
-            // Attempt to create all the indices that we're going to need during the bulk before we start.
-            // Step 1: collect all the indices in the request
-            final Set<String> indices = bulkRequest.requests.stream()
+
+            if (needToCheck()) {
+                // Attempt to create all the indices that we're going to need during the bulk before we start.
+                // Step 1: collect all the indices in the request
+                final Set<String> indices = bulkRequest.requests.stream()
                     // delete requests should not attempt to create the index (if the index does not
                     // exists), unless an external versioning is used
-                .filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
+                    .filter(request -> request.opType() != DocWriteRequest.OpType.DELETE
                         || request.versionType() == VersionType.EXTERNAL
                         || request.versionType() == VersionType.EXTERNAL_GTE)
-                .map(DocWriteRequest::index)
-                .collect(Collectors.toSet());
-            /* Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create
-             * that we'll use when we try to run the requests. */
-            final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
-            Set<String> autoCreateIndices = new HashSet<>();
-            ClusterState state = clusterService.state();
-            for (String index : indices) {
-                boolean shouldAutoCreate;
-                try {
-                    shouldAutoCreate = shouldAutoCreate(index, state);
-                } catch (IndexNotFoundException e) {
-                    shouldAutoCreate = false;
-                    indicesThatCannotBeCreated.put(index, e);
-                }
-                if (shouldAutoCreate) {
-                    autoCreateIndices.add(index);
+                    .map(DocWriteRequest::index)
+                    .collect(Collectors.toSet());
+                /* Step 2: filter that to indices that don't exist and we can create. At the same time build a map of
+                   indices we can't create that we'll use when we try to run the requests. */
+                final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
+                Set<String> autoCreateIndices = new HashSet<>();
+                ClusterState state = clusterService.state();
+                for (String index : indices) {
+                    boolean shouldAutoCreate;
+                    try {
+                        shouldAutoCreate = shouldAutoCreate(index, state);
+                    } catch (IndexNotFoundException e) {
+                        shouldAutoCreate = false;
+                        indicesThatCannotBeCreated.put(index, e);
+                    }
+                    if (shouldAutoCreate) {
+                        autoCreateIndices.add(index);
+                    }
                 }
-            }
-            // Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.
-            if (autoCreateIndices.isEmpty()) {
-                executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
-            } else {
-                final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
-                for (String index : autoCreateIndices) {
-                    createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
-                        @Override
-                        public void onResponse(CreateIndexResponse result) {
-                            if (counter.decrementAndGet() == 0) {
-                                threadPool.executor(ThreadPool.Names.WRITE).execute(
-                                    () -> executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated));
+                // Step 3: create all the indices that are missing, if there are any missing.
+                // start the bulk after all the creates come back.
+                if (autoCreateIndices.isEmpty()) {
+                    executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
+                } else {
+                    final AtomicInteger counter = new AtomicInteger(autoCreateIndices.size());
+                    for (String index : autoCreateIndices) {
+                        createIndex(index, bulkRequest.timeout(), new ActionListener<CreateIndexResponse>() {
+                            @Override
+                            public void onResponse(CreateIndexResponse result) {
+                                if (counter.decrementAndGet() == 0) {
+                                    threadPool.executor(ThreadPool.Names.WRITE).execute(
+                                        () -> executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated));
+                                }
                             }
-                        @Override
-                        public void onFailure(Exception e) {
-                            if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) {
-                                // fail all requests involving this index, if create didn't work
-                                for (int i = 0; i < bulkRequest.requests.size(); i++) {
-                                    DocWriteRequest<?> request = bulkRequest.requests.get(i);
-                                    if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
-                                        bulkRequest.requests.set(i, null);
+                            @Override
+                            public void onFailure(Exception e) {
+                                if (!(ExceptionsHelper.unwrapCause(e) instanceof ResourceAlreadyExistsException)) {
+                                    // fail all requests involving this index, if create didn't work
+                                    for (int i = 0; i < bulkRequest.requests.size(); i++) {
+                                        DocWriteRequest<?> request = bulkRequest.requests.get(i);
+                                        if (request != null && setResponseFailureIfIndexMatches(responses, i, request, index, e)) {
+                                            bulkRequest.requests.set(i, null);
+                                        }
                                     }
                                 }
+                                if (counter.decrementAndGet() == 0) {
+                                    executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
+                                        inner.addSuppressed(e);
+                                        listener.onFailure(inner);
+                                    }), responses, indicesThatCannotBeCreated);
+                                }
                             }
-                            if (counter.decrementAndGet() == 0) {
-                                executeBulk(task, bulkRequest, startTime, ActionListener.wrap(listener::onResponse, inner -> {
-                                    inner.addSuppressed(e);
-                                    listener.onFailure(inner);
-                                }), responses, indicesThatCannotBeCreated);
-                            }
-                        }
-                    });
+                        });
+                    }
                 }
-            }
-        } else {
-            executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
+            } else {
+                executeBulk(task, bulkRequest, startTime, listener, responses, emptyMap());
+            }
         }
     }

From 676e941432a374e55000aecc2078ed1e9ee3281a Mon Sep 17 00:00:00 2001
From: Zachary Tong
Date: Thu, 29 Aug 2019 13:29:16 -0400
Subject: [PATCH 2/2] Address review comments

---
 .../action/bulk/TransportBulkAction.java | 34 ++++++++-----------
 1 file changed, 14 insertions(+), 20 deletions(-)

diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
index a9da7d70066c4..e9a7c5efe3fea 100644
--- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
+++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java
@@ -158,31 +158,27 @@ private final class BulkExecutor extends ActionRunnable<BulkResponse> {
         private final ClusterStateObserver recoveredObserver;
         private final BulkRequest bulkRequest;
         private final Task task;
-        long startTime = -1;
+        long startTime;
 
         BulkExecutor(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener) {
             super(listener);
             this.recoveredObserver = new ClusterStateObserver(clusterService, bulkRequest.timeout(), logger, threadPool.getThreadContext());
             this.bulkRequest = bulkRequest;
             this.task = task;
+            startTime = relativeTime();
         }
 
         @Override
         protected void doRun() {
-            if (startTime == -1) {
-                startTime = relativeTime();
-            }
-            ClusterBlockException blockException = clusterService.state().blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
+            ClusterState currentState = clusterService.state();
+            ClusterBlockException blockException = currentState.blocks().globalBlockedException(ClusterBlockLevel.WRITE);
+
             if (blockException != null) {
-                if (recoveredObserver.isTimedOut()) {
-                    // we are running as a last attempt after a timeout has happened. don't retry
-                    listener.onFailure(blockException);
-                    return;
-                }
                 recoveredObserver.waitForNextChange(new ClusterStateObserver.Listener() {
                     @Override
-                    public void onNewClusterState(ClusterState state) {
-                        run();
+                    public void onNewClusterState(ClusterState newState) {
+                        // predicate passed, begin preparing for the bulk
+                        prepForBulk(newState);
                     }
 
                     @Override
@@ -192,22 +188,21 @@ public void onClusterServiceClose() {
 
                     @Override
                     public void onTimeout(TimeValue timeout) {
-                        // Try one more time...
-                        run();
+                        listener.onFailure(blockException);
                     }
-                });
+                }, newState -> newState.blocks().global(ClusterBlockLevel.WRITE).isEmpty());
                 return;
             }
 
             // All good, begin preparing for the bulk request
-            prepForBulk();
+            prepForBulk(currentState);
         }
 
-        private void prepForBulk() {
+        private void prepForBulk(ClusterState clusterState) {
             final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
 
             boolean hasIndexRequestsWithPipelines = false;
-            final MetaData metaData = clusterService.state().getMetaData();
+            final MetaData metaData = clusterState.getMetaData();
             ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
             for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
                 IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
@@ -296,11 +291,10 @@ private void prepForBulk() {
                    indices we can't create that we'll use when we try to run the requests. */
                 final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
                 Set<String> autoCreateIndices = new HashSet<>();
-                ClusterState state = clusterService.state();
                 for (String index : indices) {
                     boolean shouldAutoCreate;
                     try {
-                        shouldAutoCreate = shouldAutoCreate(index, state);
+                        shouldAutoCreate = shouldAutoCreate(index, clusterState);
                     } catch (IndexNotFoundException e) {
                         shouldAutoCreate = false;
                         indicesThatCannotBeCreated.put(index, e);