From df51845e7e0b876c782face94fa61d73980f4608 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 21 Nov 2019 17:56:54 -0500 Subject: [PATCH 1/5] Replace required pipeline with final pipeline This commit enhances the required pipeline functionality by changing it so that default/request pipelines can also be executed, but the required pipeline is always executed last. This gives users the flexibility to execute their own indexing pipelines, but also ensure that any required pipelines are also executed. Since such pipelines are executed last, we change the name of required pipelines to final pipelines. --- docs/reference/index-modules.asciidoc | 12 +- docs/reference/ingest.asciidoc | 7 +- .../test/ingest/240_required_pipeline.yml | 25 +- qa/a.txt | 2 + .../action/bulk/TransportBulkAction.java | 83 ++--- .../action/index/IndexRequest.java | 30 ++ .../common/settings/IndexScopedSettings.java | 2 +- .../elasticsearch/index/IndexSettings.java | 57 +--- .../elasticsearch/ingest/IngestService.java | 99 ++++-- .../action/bulk/TransportBulkActionTests.java | 10 +- .../elasticsearch/index/FinalPipelineIT.java | 308 ++++++++++++++++++ .../index/RequiredPipelineIT.java | 133 -------- .../action/TransportResumeFollowAction.java | 2 +- 13 files changed, 465 insertions(+), 305 deletions(-) create mode 100644 qa/a.txt create mode 100644 server/src/test/java/org/elasticsearch/index/FinalPipelineIT.java delete mode 100644 server/src/test/java/org/elasticsearch/index/RequiredPipelineIT.java diff --git a/docs/reference/index-modules.asciidoc b/docs/reference/index-modules.asciidoc index 8ad469682fbf1..6798272ceed81 100644 --- a/docs/reference/index-modules.asciidoc +++ b/docs/reference/index-modules.asciidoc @@ -243,12 +243,12 @@ specific index module: overridden using the `pipeline` parameter. The special pipeline name `_none` indicates no ingest pipeline should be run. - `index.required_pipeline`:: - The required <> pipeline for this index. Index requests - will fail if the required pipeline is set and the pipeline does not exist. - The required pipeline can not be overridden with the `pipeline` parameter. A - default pipeline and a required pipeline can not both be set. The special - pipeline name `_none` indicates no ingest pipeline will run. + `index.final_pipeline`:: + The final <> pipeline for this index. Index requests + will fail if the final pipeline is set and the pipeline does not exist. + The final pipeline always runs after the request pipeline (if specified) and + the default pipeline (if it exists). The special pipeline name `_none` + indicates no ingest pipeline will run. [float] === Settings in other index modules diff --git a/docs/reference/ingest.asciidoc b/docs/reference/ingest.asciidoc index c5e7be1ed54a4..7f4e4d512e3e4 100644 --- a/docs/reference/ingest.asciidoc +++ b/docs/reference/ingest.asciidoc @@ -3,7 +3,7 @@ [partintro] -- -Use an ingest node to pre-process documents before the actual document indexing happens. +Use an ingest node to pre-process documents before the actual document indexing happens. The ingest node intercepts bulk and index requests, it applies transformations, and it then passes the documents back to the index or bulk APIs. @@ -23,7 +23,7 @@ another processor that renames a field. The <> then the configured pipelines. To use a pipeline, simply specify the `pipeline` parameter on an index or bulk request. This -way, the ingest node knows which pipeline to use. +way, the ingest node knows which pipeline to use. For example: Create a pipeline @@ -78,6 +78,9 @@ Responseļ¼š An index may also declare a <> that will be used in the absence of the `pipeline` parameter. +Finally, an index may also declare a <> +that will be executed after any request or default pipeline (if any). + See <> for more information about creating, adding, and deleting pipelines. -- diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/240_required_pipeline.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/240_required_pipeline.yml index 01553bcf40acc..87979b8c6319d 100644 --- a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/240_required_pipeline.yml +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/240_required_pipeline.yml @@ -6,7 +6,7 @@ teardown: ignore: 404 --- -"Test index with required pipeline": +"Test index with final pipeline": - do: ingest.put_pipeline: id: "my_pipeline" @@ -23,14 +23,14 @@ teardown: ] } - match: { acknowledged: true } - # required pipeline via index + # final pipeline via index - do: indices.create: index: test body: settings: index: - required_pipeline: "my_pipeline" + final_pipeline: "my_pipeline" aliases: test_alias: {} @@ -46,7 +46,7 @@ teardown: id: 1 - match: { _source.bytes_source_field: "1kb" } - match: { _source.bytes_target_field: 1024 } - # required pipeline via alias + # final pipeline via alias - do: index: index: test_alias @@ -59,7 +59,7 @@ teardown: id: 2 - match: { _source.bytes_source_field: "1kb" } - match: { _source.bytes_target_field: 1024 } - # required pipeline via upsert + # final pipeline via upsert - do: update: index: test @@ -75,7 +75,7 @@ teardown: id: 3 - match: { _source.bytes_source_field: "1kb" } - match: { _source.bytes_target_field: 1024 } - # required pipeline via scripted upsert + # final pipeline via scripted upsert - do: update: index: test @@ -92,7 +92,7 @@ teardown: id: 4 - match: { _source.bytes_source_field: "1kb" } - match: { _source.bytes_target_field: 1024 } - # required pipeline via doc_as_upsert + # final pipeline via doc_as_upsert - do: update: index: test @@ -106,7 +106,7 @@ teardown: id: 5 - match: { _source.bytes_source_field: "1kb" } - match: { _source.bytes_target_field: 1024 } - # required pipeline via bulk upsert + # final pipeline via bulk upsert # note - bulk scripted upsert's execute the pipeline before the script, so any data referenced by the pipeline # needs to be in the upsert, not the script - do: @@ -164,12 +164,3 @@ teardown: - match: { docs.5._source.bytes_source_field: "3kb" } - match: { docs.5._source.bytes_target_field: 3072 } - match: { docs.5._source.ran_script: true } - - # bad request, request pipeline can not be specified - - do: - catch: /illegal_argument_exception.*request pipeline \[pipeline\] can not override required pipeline \[my_pipeline\]/ - index: - index: test - id: 9 - pipeline: "pipeline" - body: {bytes_source_field: "1kb"} diff --git a/qa/a.txt b/qa/a.txt new file mode 100644 index 0000000000000..02cbf171d690b --- /dev/null +++ b/qa/a.txt @@ -0,0 +1,2 @@ +This commit adds coverage to the docs for some missing built-in shard +allocation attributes. diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index d42adcbd62ca4..41205d9dd3a81 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -276,9 +276,10 @@ static boolean resolveRequiredOrDefaultPipeline(DocWriteRequest originalReque if (indexRequest.isPipelineResolved() == false) { final String requestPipeline = indexRequest.getPipeline(); indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); - boolean requestCanOverridePipeline = true; - String requiredPipeline = null; - // start to look for default or required pipelines via settings found in the index meta data + indexRequest.setFinalPipeline(IngestService.NOOP_PIPELINE_NAME); + String defaultPipeline = null; + String finalPipeline = null; + // start to look for default or final pipelines via settings found in the index meta data IndexMetaData indexMetaData = metaData.indices().get(originalRequest.index()); // check the alias for the index request (this is how normal index requests are modeled) if (indexMetaData == null && indexRequest.index() != null) { @@ -298,64 +299,42 @@ static boolean resolveRequiredOrDefaultPipeline(DocWriteRequest originalReque } if (indexMetaData != null) { final Settings indexSettings = indexMetaData.getSettings(); - if (IndexSettings.REQUIRED_PIPELINE.exists(indexSettings)) { - // find the required pipeline if one is defined from an existing index - requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(indexSettings); - assert IndexSettings.DEFAULT_PIPELINE.get(indexSettings).equals(IngestService.NOOP_PIPELINE_NAME) : - IndexSettings.DEFAULT_PIPELINE.get(indexSettings); - indexRequest.setPipeline(requiredPipeline); - requestCanOverridePipeline = false; - } else { - // find the default pipeline if one is defined from an existing index - String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings); + if (IndexSettings.DEFAULT_PIPELINE.exists(indexSettings)) { + // find the default pipeline if one is defined from an existing index setting + defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings); indexRequest.setPipeline(defaultPipeline); } + if (IndexSettings.FINAL_PIPELINE.exists(indexSettings)) { + // find the final pipeline if one is defined from an existing index setting + finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexSettings); + indexRequest.setFinalPipeline(finalPipeline); + } } else if (indexRequest.index() != null) { - // the index does not exist yet (and is valid request), so match index templates to look for a default pipeline + // the index does not exist yet (and this is a valid request), so match index templates to look for pipelines List templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index()); assert (templates != null); - // order of templates are highest order first, we have to iterate through them all though - String defaultPipeline = null; - for (IndexTemplateMetaData template : templates) { + // order of templates are highest order first + for (final IndexTemplateMetaData template : templates) { final Settings settings = template.settings(); - if (requiredPipeline == null && IndexSettings.REQUIRED_PIPELINE.exists(settings)) { - requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(settings); - requestCanOverridePipeline = false; - // we can not break in case a lower-order template has a default pipeline that we need to reject - } else if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) { + if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) { defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings); - // we can not break in case a lower-order template has a required pipeline that we need to reject + // we can not break in case a lower-order template has a final pipeline that we need to collect + } + if (finalPipeline == null && IndexSettings.FINAL_PIPELINE.exists(settings)) { + finalPipeline = IndexSettings.FINAL_PIPELINE.get(settings); + // we can not break in case a lower-order template has a default pipeline that we need to collect + } + if (defaultPipeline != null && finalPipeline != null) { + // we can break if we have already collected a default and final pipeline + break; } } - if (requiredPipeline != null && defaultPipeline != null) { - // we can not have picked up a required and a default pipeline from applying templates - final String message = String.format( - Locale.ROOT, - "required pipeline [%s] and default pipeline [%s] can not both be set", - requiredPipeline, - defaultPipeline); - throw new IllegalArgumentException(message); - } - final String pipeline; - if (requiredPipeline != null) { - pipeline = requiredPipeline; - } else { - pipeline = Objects.requireNonNullElse(defaultPipeline, IngestService.NOOP_PIPELINE_NAME); - } - indexRequest.setPipeline(pipeline); + indexRequest.setPipeline(Objects.requireNonNullElse(defaultPipeline, IngestService.NOOP_PIPELINE_NAME)); + indexRequest.setFinalPipeline(Objects.requireNonNullElse(finalPipeline, IngestService.NOOP_PIPELINE_NAME)); } if (requestPipeline != null) { - if (requestCanOverridePipeline == false) { - final String message = String.format( - Locale.ROOT, - "request pipeline [%s] can not override required pipeline [%s]", - requestPipeline, - requiredPipeline); - throw new IllegalArgumentException(message); - } else { - indexRequest.setPipeline(requestPipeline); - } + indexRequest.setPipeline(requestPipeline); } /* @@ -371,8 +350,10 @@ static boolean resolveRequiredOrDefaultPipeline(DocWriteRequest originalReque indexRequest.isPipelineResolved(true); } - // Return whether this index request has a pipeline - return IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false; + + // return whether this index request has a pipeline + return IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false + || IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false; } boolean needToCheck() { diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 87866de8ab3eb..34c86626d2f7c 100644 --- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -98,6 +98,7 @@ public class IndexRequest extends ReplicatedWriteRequest implement private XContentType contentType; private String pipeline; + private String finalPipeline; private boolean isPipelineResolved; @@ -126,6 +127,9 @@ public IndexRequest(StreamInput in) throws IOException { version = in.readLong(); versionType = VersionType.fromValue(in.readByte()); pipeline = in.readOptionalString(); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + finalPipeline = in.readOptionalString(); + } if (in.getVersion().onOrAfter(Version.V_7_5_0)) { isPipelineResolved = in.readBoolean(); } @@ -202,6 +206,9 @@ public ActionRequestValidationException validate() { validationException = addValidationError("pipeline cannot be an empty string", validationException); } + if (finalPipeline != null && finalPipeline.isEmpty()) { + validationException = addValidationError("final pipeline cannot be an empty string", validationException); + } return validationException; } @@ -268,6 +275,26 @@ public String getPipeline() { return this.pipeline; } + /** + * Sets the final ingest pipeline to be executed before indexing the document. + * + * @param finalPipeline the name of the final pipeline + * @return this index request + */ + public IndexRequest setFinalPipeline(final String finalPipeline) { + this.finalPipeline = finalPipeline; + return this; + } + + /** + * Returns the final ingest pipeline to be executed before indexing the document. + * + * @return the name of the final pipeline + */ + public String getFinalPipeline() { + return this.finalPipeline; + } + /** * Sets if the pipeline for this request has been resolved by the coordinating node. * @@ -589,6 +616,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(version); out.writeByte(versionType.getValue()); out.writeOptionalString(pipeline); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeOptionalString(finalPipeline); + } if (out.getVersion().onOrAfter(Version.V_7_5_0)) { out.writeBoolean(isPipelineResolved); } diff --git a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index aad12323383d0..f83012f2db32b 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -161,7 +161,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { EngineConfig.INDEX_CODEC_SETTING, IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS, IndexSettings.DEFAULT_PIPELINE, - IndexSettings.REQUIRED_PIPELINE, + IndexSettings.FINAL_PIPELINE, MetaDataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING, // validate that built-in similarities don't get redefined diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index a4771c8666a1e..0c142da9b0481 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -305,67 +305,16 @@ public final class IndexSettings { new Setting<>("index.default_pipeline", IngestService.NOOP_PIPELINE_NAME, Function.identity(), - new DefaultPipelineValidator(), Property.Dynamic, Property.IndexScope); - public static final Setting REQUIRED_PIPELINE = - new Setting<>("index.required_pipeline", + public static final Setting FINAL_PIPELINE = + new Setting<>("index.final_pipeline", IngestService.NOOP_PIPELINE_NAME, Function.identity(), - new RequiredPipelineValidator(), Property.Dynamic, Property.IndexScope); - static class DefaultPipelineValidator implements Setting.Validator { - - @Override - public void validate(final String value) { - - } - - @Override - public void validate(final String value, final Map, Object> settings) { - final String requiredPipeline = (String) settings.get(IndexSettings.REQUIRED_PIPELINE); - if (value.equals(IngestService.NOOP_PIPELINE_NAME) == false - && requiredPipeline.equals(IngestService.NOOP_PIPELINE_NAME) == false) { - throw new IllegalArgumentException( - "index has a default pipeline [" + value + "] and a required pipeline [" + requiredPipeline + "]"); - } - } - - @Override - public Iterator> settings() { - final List> settings = List.of(REQUIRED_PIPELINE); - return settings.iterator(); - } - - } - - static class RequiredPipelineValidator implements Setting.Validator { - - @Override - public void validate(final String value) { - - } - - @Override - public void validate(final String value, final Map, Object> settings) { - final String defaultPipeline = (String) settings.get(IndexSettings.DEFAULT_PIPELINE); - if (value.equals(IngestService.NOOP_PIPELINE_NAME) && defaultPipeline.equals(IngestService.NOOP_PIPELINE_NAME) == false) { - throw new IllegalArgumentException( - "index has a required pipeline [" + value + "] and a default pipeline [" + defaultPipeline + "]"); - } - } - - @Override - public Iterator> settings() { - final List> settings = List.of(DEFAULT_PIPELINE); - return settings.iterator(); - } - - } - /** * Marks an index to be searched throttled. This means that never more than one shard of such an index will be searched concurrently */ @@ -613,7 +562,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter); scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength); scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline); - scopedSettings.addSettingsUpdateConsumer(REQUIRED_PIPELINE, this::setRequiredPipeline); + scopedSettings.addSettingsUpdateConsumer(FINAL_PIPELINE, this::setRequiredPipeline); scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations); scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THROTTLED, this::setSearchThrottled); scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING, this::setRetentionLeaseMillis); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index b17b530aca9f0..c8ef75d90a176 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -333,15 +333,15 @@ void validatePipeline(Map ingestInfos, PutPipelineReq public void executeBulkRequest(int numberOfActionRequests, Iterable> actionRequests, - BiConsumer itemFailureHandler, - BiConsumer completionHandler, - IntConsumer itemDroppedHandler) { + BiConsumer onFailure, + BiConsumer onCompletion, + IntConsumer onDropped) { threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { @Override public void onFailure(Exception e) { - completionHandler.accept(null, e); + onCompletion.accept(null, e); } @Override @@ -353,52 +353,81 @@ protected void doRun() { IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(actionRequest); if (indexRequest == null) { if (counter.decrementAndGet() == 0){ - completionHandler.accept(originalThread, null); + onCompletion.accept(originalThread, null); } assert counter.get() >= 0; continue; } - String pipelineId = indexRequest.getPipeline(); - if (NOOP_PIPELINE_NAME.equals(pipelineId)) { - if (counter.decrementAndGet() == 0){ - completionHandler.accept(originalThread, null); + + final String pipelineId = indexRequest.getPipeline(); + indexRequest.setPipeline(NOOP_PIPELINE_NAME); + final String finalPipelineId = indexRequest.getFinalPipeline(); + indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME); + final List pipelines; + if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false + && IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) { + pipelines = List.of(pipelineId, finalPipelineId); + } else if (IngestService.NOOP_PIPELINE_NAME.equals(pipelineId) == false ) { + pipelines = List.of(pipelineId); + } else if (IngestService.NOOP_PIPELINE_NAME.equals(finalPipelineId) == false) { + pipelines = List.of(finalPipelineId); + } else { + if (counter.decrementAndGet() == 0) { + onCompletion.accept(originalThread, null); } assert counter.get() >= 0; continue; } - final int slot = i; - try { - PipelineHolder holder = pipelines.get(pipelineId); - if (holder == null) { - throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); - } - Pipeline pipeline = holder.pipeline; - innerExecute(slot, indexRequest, pipeline, itemDroppedHandler, e -> { - if (e == null) { - // this shouldn't be needed here but we do it for consistency with index api - // which requires it to prevent double execution - indexRequest.setPipeline(NOOP_PIPELINE_NAME); - } else { - itemFailureHandler.accept(slot, e); - } + executePipelines(i, pipelines.iterator(), indexRequest, onDropped, onFailure, counter, onCompletion, originalThread); - if (counter.decrementAndGet() == 0){ - completionHandler.accept(originalThread, null); - } - assert counter.get() >= 0; - }); - } catch (Exception e) { - itemFailureHandler.accept(slot, e); - if (counter.decrementAndGet() == 0){ - completionHandler.accept(originalThread, null); + i++; + } + } + }); + } + + private void executePipelines( + final int slot, + final Iterator it, + final IndexRequest indexRequest, + final IntConsumer onDropped, + final BiConsumer onFailure, + final AtomicInteger counter, + final BiConsumer onCompletion, + final Thread originalThread + ) { + while (it.hasNext()) { + final String pipelineId = it.next(); + try { + PipelineHolder holder = pipelines.get(pipelineId); + if (holder == null) { + throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); + } + Pipeline pipeline = holder.pipeline; + innerExecute(slot, indexRequest, pipeline, onDropped, e -> { + if (e != null) { + onFailure.accept(slot, e); + } + + if (it.hasNext()) { + executePipelines(slot, it, indexRequest, onDropped, onFailure, counter, onCompletion, originalThread); + } else { + if (counter.decrementAndGet() == 0) { + onCompletion.accept(originalThread, null); } assert counter.get() >= 0; } - i++; + }); + } catch (Exception e) { + onFailure.accept(slot, e); + if (counter.decrementAndGet() == 0) { + onCompletion.accept(originalThread, null); } + assert counter.get() >= 0; + break; } - }); + } } public IngestStats stats() { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 7160146871a53..0cbe7c31a1ef4 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -201,7 +201,7 @@ public void testResolveRequiredOrDefaultPipelineDefaultPipeline() { public void testResolveRequiredOrDefaultPipelineRequiredPipeline() { IndexMetaData.Builder builder = IndexMetaData.builder("idx") - .settings(settings(Version.CURRENT).put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required-pipeline")) + .settings(settings(Version.CURRENT).put(IndexSettings.FINAL_PIPELINE.getKey(), "required-pipeline")) .numberOfShards(1) .numberOfReplicas(0) .putAlias(AliasMetaData.builder("alias").writeIndex(true).build()); @@ -224,7 +224,7 @@ public void testResolveRequiredOrDefaultPipelineRequiredPipeline() { // index name matches with ITMD: IndexTemplateMetaData.Builder templateBuilder = IndexTemplateMetaData.builder("name1") .patterns(List.of("id*")) - .settings(settings(Version.CURRENT).put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required-pipeline")); + .settings(settings(Version.CURRENT).put(IndexSettings.FINAL_PIPELINE.getKey(), "required-pipeline")); metaData = MetaData.builder().put(templateBuilder).build(); indexRequest = new IndexRequest("idx"); result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData); @@ -236,7 +236,7 @@ public void testResolveRequiredOrDefaultPipelineRequiredPipeline() { public void testResolveRequiredOrDefaultAndRequiredPipeline() { IndexTemplateMetaData.Builder builder1 = IndexTemplateMetaData.builder("name1") .patterns(List.of("i*")) - .settings(settings(Version.CURRENT).put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required-pipeline")); + .settings(settings(Version.CURRENT).put(IndexSettings.FINAL_PIPELINE.getKey(), "required-pipeline")); IndexTemplateMetaData.Builder builder2 = IndexTemplateMetaData.builder("name2") .patterns(List.of("id*")) .settings(settings(Version.CURRENT).put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default-pipeline")); @@ -287,7 +287,7 @@ public void testResolveRequiredOrDefaultPipelineRequestPipeline() { // request pipeline with required pipeline: { IndexMetaData.Builder builder = IndexMetaData.builder("idx") - .settings(settings(Version.CURRENT).put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required-pipeline")) + .settings(settings(Version.CURRENT).put(IndexSettings.FINAL_PIPELINE.getKey(), "required-pipeline")) .numberOfShards(1) .numberOfReplicas(0); MetaData metaData = MetaData.builder().put(builder).build(); @@ -301,7 +301,7 @@ public void testResolveRequiredOrDefaultPipelineRequestPipeline() { // request pipeline set to required pipeline: { IndexMetaData.Builder builder = IndexMetaData.builder("idx") - .settings(settings(Version.CURRENT).put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required-pipeline")) + .settings(settings(Version.CURRENT).put(IndexSettings.FINAL_PIPELINE.getKey(), "required-pipeline")) .numberOfShards(1) .numberOfReplicas(0); MetaData metaData = MetaData.builder().put(builder).build(); diff --git a/server/src/test/java/org/elasticsearch/index/FinalPipelineIT.java b/server/src/test/java/org/elasticsearch/index/FinalPipelineIT.java new file mode 100644 index 0000000000000..c179f2a3c7ff2 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/FinalPipelineIT.java @@ -0,0 +1,308 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index; + +import org.elasticsearch.action.get.GetRequestBuilder; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.ingest.DeletePipelineRequest; +import org.elasticsearch.action.ingest.GetPipelineRequest; +import org.elasticsearch.action.ingest.GetPipelineResponse; +import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.PipelineConfiguration; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.plugins.IngestPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.ResourceWatcherService; +import org.junit.After; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.hasToString; + +public class FinalPipelineIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return List.of(TestPlugin.class); + } + + @After + public void cleanUpPipelines() { + final GetPipelineResponse response = client().admin() + .cluster() + .getPipeline(new GetPipelineRequest("default_pipeline", "final_pipeline", "request_pipeline")) + .actionGet(); + for (final PipelineConfiguration pipeline : response.pipelines()) { + client().admin().cluster().deletePipeline(new DeletePipelineRequest(pipeline.getId())).actionGet(); + } + } + + public void testFinalPipeline() { + final Settings settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build(); + createIndex("index", settings); + + // this asserts that the final_pipeline was used, without us having to actually create the pipeline etc. + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> client().prepareIndex("index").setId("1").setSource(Map.of("field", "value")).get()); + assertThat(e, hasToString(containsString("pipeline with id [final_pipeline] does not exist"))); + } + + public void testRequestPipelineAndFinalPipeline() { + final BytesReference requestPipelineBody = new BytesArray("{\"processors\": [{\"request\": {}}]}"); + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest("request_pipeline", requestPipelineBody, XContentType.JSON)) + .actionGet(); + final BytesReference finalPipelineBody = new BytesArray("{\"processors\": [{\"final\": {\"exists\":\"request\"}}]}"); + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest("final_pipeline", finalPipelineBody, XContentType.JSON)) + .actionGet(); + final Settings settings = Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build(); + createIndex("index", settings); + final IndexRequestBuilder index = client().prepareIndex("index").setId("1"); + index.setSource(Map.of("field", "value")); + index.setPipeline("request_pipeline"); + index.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + final IndexResponse response = index.get(); + assertThat(response.status(), equalTo(RestStatus.CREATED)); + final GetRequestBuilder get = client().prepareGet("index", "1"); + final GetResponse getResponse = get.get(); + assertTrue(getResponse.isExists()); + final Map source = getResponse.getSourceAsMap(); + assertThat(source, hasKey("request")); + assertTrue((boolean) source.get("request")); + assertThat(source, hasKey("final")); + assertTrue((boolean) source.get("final")); + } + + public void testDefaultAndFinalPipeline() { + final BytesReference defaultPipelineBody = new BytesArray("{\"processors\": [{\"default\": {}}]}"); + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON)) + .actionGet(); + final BytesReference finalPipelineBody = new BytesArray("{\"processors\": [{\"final\": {\"exists\":\"default\"}}]}"); + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest("final_pipeline", finalPipelineBody, XContentType.JSON)) + .actionGet(); + final Settings settings = Settings.builder() + .put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline") + .put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline") + .build(); + createIndex("index", settings); + final IndexRequestBuilder index = client().prepareIndex("index").setId("1"); + index.setSource(Map.of("field", "value")); + index.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + final IndexResponse response = index.get(); + assertThat(response.status(), equalTo(RestStatus.CREATED)); + final GetRequestBuilder get = client().prepareGet("index", "1"); + final GetResponse getResponse = get.get(); + assertTrue(getResponse.isExists()); + final Map source = getResponse.getSourceAsMap(); + assertThat(source, hasKey("default")); + assertTrue((boolean) source.get("default")); + assertThat(source, hasKey("final")); + assertTrue((boolean) source.get("final")); + } + + public void testDefaultAndFinalPipelineFromTemplates() { + final BytesReference defaultPipelineBody = new BytesArray("{\"processors\": [{\"default\": {}}]}"); + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest("default_pipeline", defaultPipelineBody, XContentType.JSON)) + .actionGet(); + final BytesReference finalPipelineBody = new BytesArray("{\"processors\": [{\"final\": {\"exists\":\"default\"}}]}"); + client().admin() + .cluster() + .putPipeline(new PutPipelineRequest("final_pipeline", finalPipelineBody, XContentType.JSON)) + .actionGet(); + final int lowOrder = randomIntBetween(0, Integer.MAX_VALUE - 1); + final int highOrder = randomIntBetween(lowOrder + 1, Integer.MAX_VALUE); + final int finalPipelineOrder; + final int defaultPipelineOrder; + if (randomBoolean()) { + defaultPipelineOrder = lowOrder; + finalPipelineOrder = highOrder; + } else { + defaultPipelineOrder = highOrder; + finalPipelineOrder = lowOrder; + } + final Settings defaultPipelineSettings = + Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build(); + admin().indices() + .preparePutTemplate("default") + .setPatterns(List.of("index*")) + .setOrder(defaultPipelineOrder) + .setSettings(defaultPipelineSettings) + .get(); + final Settings finalPipelineSettings = + Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "final_pipeline").build(); + admin().indices() + .preparePutTemplate("final") + .setPatterns(List.of("index*")) + .setOrder(finalPipelineOrder) + .setSettings(finalPipelineSettings) + .get(); + final IndexRequestBuilder index = client().prepareIndex("index").setId("1"); + index.setSource(Map.of("field", "value")); + index.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + final IndexResponse response = index.get(); + assertThat(response.status(), equalTo(RestStatus.CREATED)); + final GetRequestBuilder get = client().prepareGet("index", "1"); + final GetResponse getResponse = get.get(); + assertTrue(getResponse.isExists()); + final Map source = getResponse.getSourceAsMap(); + assertThat(source, hasKey("default")); + assertTrue((boolean) source.get("default")); + assertThat(source, hasKey("final")); + assertTrue((boolean) source.get("final")); + } + + public void testHighOrderFinalPipelinePreferred() throws IOException { + final int lowOrder = randomIntBetween(0, Integer.MAX_VALUE - 1); + final int highOrder = randomIntBetween(lowOrder + 1, Integer.MAX_VALUE); + final Settings lowOrderFinalPipelineSettings = + Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "low_order_final_pipeline").build(); + admin().indices() + .preparePutTemplate("low_order") + .setPatterns(List.of("index*")) + .setOrder(lowOrder) + .setSettings(lowOrderFinalPipelineSettings) + .get(); + final Settings highOrderFinalPipelineSettings = + Settings.builder().put(IndexSettings.FINAL_PIPELINE.getKey(), "high_order_final_pipeline").build(); + admin().indices() + .preparePutTemplate("high_order") + .setPatterns(List.of("index*")) + .setOrder(highOrder) + .setSettings(highOrderFinalPipelineSettings) + .get(); + + // this asserts that the high_order_final_pipeline was selected, without us having to actually create the pipeline etc. + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> client().prepareIndex("index").setId("1").setSource(Map.of("field", "value")).get()); + assertThat(e, hasToString(containsString("pipeline with id [high_order_final_pipeline] does not exist"))); + } + + public static class TestPlugin extends Plugin implements IngestPlugin { + + @Override + public Collection createComponents( + final Client client, + final ClusterService clusterService, + final ThreadPool threadPool, + final ResourceWatcherService resourceWatcherService, + final ScriptService scriptService, + final NamedXContentRegistry xContentRegistry, + final Environment environment, + final NodeEnvironment nodeEnvironment, + final NamedWriteableRegistry namedWriteableRegistry) { + return List.of(); + } + + @Override + public Map getProcessors(Processor.Parameters parameters) { + return Map.of( + "default", + (factories, tag, config) -> + new AbstractProcessor(tag) { + + @Override + public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { + ingestDocument.setFieldValue("default", true); + return ingestDocument; + } + + @Override + public String getType() { + return "default"; + } + }, + "final", + (processorFactories, tag, config) -> { + final String exists = (String) config.remove("exists"); + return new AbstractProcessor(tag) { + @Override + public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { + // this asserts that this pipeline is the final pipeline executed + if (exists != null) { + if (ingestDocument.getSourceAndMetadata().containsKey(exists) == false) { + throw new AssertionError( + "expected document to contain [" + exists + "] but was [" + ingestDocument.getSourceAndMetadata()); + } + } + ingestDocument.setFieldValue("final", true); + return ingestDocument; + } + + @Override + public String getType() { + return "final"; + } + }; + }, + "request", + (processorFactories, tag, config) -> + new AbstractProcessor(tag) { + @Override + public IngestDocument execute(final IngestDocument ingestDocument) throws Exception { + ingestDocument.setFieldValue("request", true); + return ingestDocument; + } + + @Override + public String getType() { + return "request"; + } + } + ); + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/RequiredPipelineIT.java b/server/src/test/java/org/elasticsearch/index/RequiredPipelineIT.java deleted file mode 100644 index 9b7684393b024..0000000000000 --- a/server/src/test/java/org/elasticsearch/index/RequiredPipelineIT.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index; - -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.test.ESIntegTestCase; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.hasToString; - -public class RequiredPipelineIT extends ESIntegTestCase { - - public void testRequiredPipeline() { - final Settings settings = Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline").build(); - createIndex("index", settings); - - // this asserts that the required_pipeline was used, without us having to actually create the pipeline etc. - final IllegalArgumentException e = expectThrows( - IllegalArgumentException.class, - () -> client().prepareIndex("index").setId("1").setSource(Map.of("field", "value")).get()); - assertThat(e, hasToString(containsString("pipeline with id [required_pipeline] does not exist"))); - } - - public void testDefaultAndRequiredPipeline() { - final Settings settings = Settings.builder() - .put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline") - .put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline") - .build(); - final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> createIndex("index", settings)); - assertThat( - e, - hasToString(containsString("index has a default pipeline [default_pipeline] and a required pipeline [required_pipeline]"))); - } - - public void testDefaultAndRequiredPipelineFromTemplates() { - final int lowOrder = randomIntBetween(0, Integer.MAX_VALUE - 1); - final int highOrder = randomIntBetween(lowOrder + 1, Integer.MAX_VALUE); - final int requiredPipelineOrder; - final int defaultPipelineOrder; - if (randomBoolean()) { - defaultPipelineOrder = lowOrder; - requiredPipelineOrder = highOrder; - } else { - defaultPipelineOrder = highOrder; - requiredPipelineOrder = lowOrder; - } - final Settings defaultPipelineSettings = - Settings.builder().put(IndexSettings.DEFAULT_PIPELINE.getKey(), "default_pipeline").build(); - admin().indices() - .preparePutTemplate("default") - .setPatterns(List.of("index*")) - .setOrder(defaultPipelineOrder) - .setSettings(defaultPipelineSettings) - .get(); - final Settings requiredPipelineSettings = - Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline").build(); - admin().indices() - .preparePutTemplate("required") - .setPatterns(List.of("index*")) - .setOrder(requiredPipelineOrder) - .setSettings(requiredPipelineSettings) - .get(); - final IllegalArgumentException e = expectThrows( - IllegalArgumentException.class, - () -> client().prepareIndex("index").setId("1").setSource(Map.of("field", "value")).get()); - assertThat( - e, - hasToString(containsString( - "required pipeline [required_pipeline] and default pipeline [default_pipeline] can not both be set"))); - } - - public void testHighOrderRequiredPipelinePreferred() throws IOException { - final int lowOrder = randomIntBetween(0, Integer.MAX_VALUE - 1); - final int highOrder = randomIntBetween(lowOrder + 1, Integer.MAX_VALUE); - final Settings defaultPipelineSettings = - Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "low_order_required_pipeline").build(); - admin().indices() - .preparePutTemplate("default") - .setPatterns(List.of("index*")) - .setOrder(lowOrder) - .setSettings(defaultPipelineSettings) - .get(); - final Settings requiredPipelineSettings = - Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "high_order_required_pipeline").build(); - admin().indices() - .preparePutTemplate("required") - .setPatterns(List.of("index*")) - .setOrder(highOrder) - .setSettings(requiredPipelineSettings) - .get(); - - // this asserts that the high_order_required_pipeline was selected, without us having to actually create the pipeline etc. - final IllegalArgumentException e = expectThrows( - IllegalArgumentException.class, - () -> client().prepareIndex("index").setId("1").setSource(Map.of("field", "value")).get()); - assertThat(e, hasToString(containsString("pipeline with id [high_order_required_pipeline] does not exist"))); - } - - public void testRequiredPipelineAndRequestPipeline() { - final Settings settings = Settings.builder().put(IndexSettings.REQUIRED_PIPELINE.getKey(), "required_pipeline").build(); - createIndex("index", settings); - final IndexRequestBuilder builder = client().prepareIndex("index").setId("1"); - builder.setSource(Map.of("field", "value")); - builder.setPipeline("request_pipeline"); - final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, builder::get); - assertThat( - e, - hasToString(containsString("request pipeline [request_pipeline] can not override required pipeline [required_pipeline]"))); - } - -} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java index 21e16260c10db..61845545ae098 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java @@ -390,7 +390,7 @@ static String[] extractLeaderShardHistoryUUIDs(Map ccrIndexMetaD IndexSettings.MAX_TOKEN_COUNT_SETTING, IndexSettings.MAX_SLICES_PER_SCROLL, IndexSettings.DEFAULT_PIPELINE, - IndexSettings.REQUIRED_PIPELINE, + IndexSettings.FINAL_PIPELINE, IndexSettings.INDEX_SEARCH_THROTTLED, IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING, IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING, From 6502198147c0a14f76341117472ecbbb13621f9f Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 21 Nov 2019 19:18:03 -0500 Subject: [PATCH 2/5] Fix imports --- .../java/org/elasticsearch/action/bulk/TransportBulkAction.java | 1 - server/src/main/java/org/elasticsearch/index/IndexSettings.java | 2 -- 2 files changed, 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 41205d9dd3a81..66350657cf559 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -79,7 +79,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Set; diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 0c142da9b0481..a1994d65c8d73 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -35,10 +35,8 @@ import org.elasticsearch.node.Node; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; From 21064fe289bf4d9845f2e5c0f77244c0b4b5ab1d Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 22 Nov 2019 07:14:01 -0500 Subject: [PATCH 3/5] Remove accidental file --- qa/a.txt | 2 -- 1 file changed, 2 deletions(-) delete mode 100644 qa/a.txt diff --git a/qa/a.txt b/qa/a.txt deleted file mode 100644 index 02cbf171d690b..0000000000000 --- a/qa/a.txt +++ /dev/null @@ -1,2 +0,0 @@ -This commit adds coverage to the docs for some missing built-in shard -allocation attributes. From 38146018be4a79576177e041734a5c3a1d22526f Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 22 Nov 2019 07:23:23 -0500 Subject: [PATCH 4/5] Fix tests --- .../action/bulk/TransportBulkAction.java | 7 +- .../action/bulk/TransportBulkActionTests.java | 76 ++++++------------- 2 files changed, 27 insertions(+), 56 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 66350657cf559..f21ab304c49ec 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -162,7 +162,7 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener originalRequest, - IndexRequest indexRequest, - MetaData metaData) { - + static boolean resolvePipelines(final DocWriteRequest originalRequest, final IndexRequest indexRequest, final MetaData metaData) { if (indexRequest.isPipelineResolved() == false) { final String requestPipeline = indexRequest.getPipeline(); indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 0cbe7c31a1ef4..d32448f830609 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -90,7 +90,7 @@ void createIndex(String index, TimeValue timeout, ActionListener TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData)); - assertThat(e.getMessage(), - equalTo("required pipeline [required-pipeline] and default pipeline [default-pipeline] can not both be set")); - } - - public void testResolveRequiredOrDefaultPipelineRequestPipeline() { + public void testResolveRequestOrDefaultPipelineAndFinalPipeline() { // no pipeline: { MetaData metaData = MetaData.builder().build(); IndexRequest indexRequest = new IndexRequest("idx"); - boolean result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData); + boolean result = TransportBulkAction.resolvePipelines(indexRequest, indexRequest, metaData); assertThat(result, is(false)); assertThat(indexRequest.isPipelineResolved(), is(true)); assertThat(indexRequest.getPipeline(), equalTo(IngestService.NOOP_PIPELINE_NAME)); @@ -264,7 +251,7 @@ public void testResolveRequiredOrDefaultPipelineRequestPipeline() { { MetaData metaData = MetaData.builder().build(); IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline"); - boolean result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData); + boolean result = TransportBulkAction.resolvePipelines(indexRequest, indexRequest, metaData); assertThat(result, is(true)); assertThat(indexRequest.isPipelineResolved(), is(true)); assertThat(indexRequest.getPipeline(), equalTo("request-pipeline")); @@ -278,38 +265,25 @@ public void testResolveRequiredOrDefaultPipelineRequestPipeline() { .numberOfReplicas(0); MetaData metaData = MetaData.builder().put(builder).build(); IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline"); - boolean result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData); + boolean result = TransportBulkAction.resolvePipelines(indexRequest, indexRequest, metaData); assertThat(result, is(true)); assertThat(indexRequest.isPipelineResolved(), is(true)); assertThat(indexRequest.getPipeline(), equalTo("request-pipeline")); } - // request pipeline with required pipeline: + // request pipeline with final pipeline: { IndexMetaData.Builder builder = IndexMetaData.builder("idx") - .settings(settings(Version.CURRENT).put(IndexSettings.FINAL_PIPELINE.getKey(), "required-pipeline")) + .settings(settings(Version.CURRENT).put(IndexSettings.FINAL_PIPELINE.getKey(), "final-pipeline")) .numberOfShards(1) .numberOfReplicas(0); MetaData metaData = MetaData.builder().put(builder).build(); IndexRequest indexRequest = new IndexRequest("idx").setPipeline("request-pipeline"); - Exception e = expectThrows(IllegalArgumentException.class, - () -> TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData)); - assertThat(e.getMessage(), - equalTo("request pipeline [request-pipeline] can not override required pipeline [required-pipeline]")); - } - - // request pipeline set to required pipeline: - { - IndexMetaData.Builder builder = IndexMetaData.builder("idx") - .settings(settings(Version.CURRENT).put(IndexSettings.FINAL_PIPELINE.getKey(), "required-pipeline")) - .numberOfShards(1) - .numberOfReplicas(0); - MetaData metaData = MetaData.builder().put(builder).build(); - IndexRequest indexRequest = new IndexRequest("idx").setPipeline("required-pipeline").isPipelineResolved(true); - boolean result = TransportBulkAction.resolveRequiredOrDefaultPipeline(indexRequest, indexRequest, metaData); + boolean result = TransportBulkAction.resolvePipelines(indexRequest, indexRequest, metaData); assertThat(result, is(true)); assertThat(indexRequest.isPipelineResolved(), is(true)); - assertThat(indexRequest.getPipeline(), equalTo("required-pipeline")); + assertThat(indexRequest.getPipeline(), equalTo("request-pipeline")); + assertThat(indexRequest.getFinalPipeline(), equalTo("final-pipeline")); } } } From 07c6ca909ee700e487370b9b5ff82cfafd4b46be Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 22 Nov 2019 08:28:59 -0500 Subject: [PATCH 5/5] Fix tests --- .../ingest/IngestServiceTests.java | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index e27c1416f703e..93b1589617ea7 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -133,7 +133,8 @@ public void testExecuteIndexPipelineDoesNotExist() { when(threadPool.executor(anyString())).thenReturn(executorService); IngestService ingestService = new IngestService(mock(ClusterService.class), threadPool, null, null, null, Collections.singletonList(DUMMY_PLUGIN), client); - final IndexRequest indexRequest = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id"); + final IndexRequest indexRequest = + new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); final SetOnce failure = new SetOnce<>(); final BiConsumer failureHandler = (slot, e) -> { @@ -622,7 +623,7 @@ public String getType() { clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); final SetOnce failure = new SetOnce<>(); - final IndexRequest indexRequest = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline(id); + final IndexRequest indexRequest = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline(id).setFinalPipeline("_none"); final BiConsumer failureHandler = (slot, e) -> { assertThat(e.getCause(), instanceOf(IllegalStateException.class)); assertThat(e.getCause().getMessage(), equalTo("error")); @@ -651,10 +652,10 @@ public void testExecuteBulkPipelineDoesNotExist() { BulkRequest bulkRequest = new BulkRequest(); - IndexRequest indexRequest1 = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id"); + IndexRequest indexRequest1 = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); bulkRequest.add(indexRequest1); IndexRequest indexRequest2 = - new IndexRequest("_index").id("_id").source(Collections.emptyMap()).setPipeline("does_not_exist"); + new IndexRequest("_index").id("_id").source(Collections.emptyMap()).setPipeline("does_not_exist").setFinalPipeline("_none"); bulkRequest.add(indexRequest2); @SuppressWarnings("unchecked") BiConsumer failureHandler = mock(BiConsumer.class); @@ -689,7 +690,8 @@ public void testExecuteSuccess() { ClusterState previousClusterState = clusterState; clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - final IndexRequest indexRequest = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id"); + final IndexRequest indexRequest = + new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") @@ -707,7 +709,8 @@ public void testExecuteEmptyPipeline() throws Exception { ClusterState previousClusterState = clusterState; clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - final IndexRequest indexRequest = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id"); + final IndexRequest indexRequest = + new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") @@ -746,7 +749,8 @@ public void testExecutePropagateAllMetaDataUpdates() throws Exception { handler.accept(ingestDocument, null); return null; }).when(processor).execute(any(), any()); - final IndexRequest indexRequest = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id"); + final IndexRequest indexRequest = + new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") @@ -772,7 +776,8 @@ public void testExecuteFailure() throws Exception { ClusterState previousClusterState = clusterState; clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - final IndexRequest indexRequest = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id"); + final IndexRequest indexRequest = + new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); doThrow(new RuntimeException()) .when(processor) .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); @@ -816,7 +821,8 @@ public void testExecuteSuccessWithOnFailure() throws Exception { ClusterState previousClusterState = clusterState; clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - final IndexRequest indexRequest = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id"); + final IndexRequest indexRequest = + new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked") @@ -844,7 +850,8 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception { ClusterState previousClusterState = clusterState; clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - final IndexRequest indexRequest = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id"); + final IndexRequest indexRequest = + new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); doThrow(new RuntimeException()) .when(onFailureOnFailureProcessor) .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap()), any()); @@ -879,7 +886,7 @@ public void testBulkRequestExecutionWithFailures() throws Exception { request = new UpdateRequest("_index", "_id"); } } else { - IndexRequest indexRequest = new IndexRequest("_index").id("_id").setPipeline(pipelineId); + IndexRequest indexRequest = new IndexRequest("_index").id("_id").setPipeline(pipelineId).setFinalPipeline("_none"); indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); request = indexRequest; numIndexRequests++; @@ -932,7 +939,7 @@ public void testBulkRequestExecution() throws Exception { logger.info("Using [{}], not randomly determined default [{}]", xContentType, Requests.INDEX_CONTENT_TYPE); int numRequest = scaledRandomIntBetween(8, 64); for (int i = 0; i < numRequest; i++) { - IndexRequest indexRequest = new IndexRequest("_index").id("_id").setPipeline(pipelineId); + IndexRequest indexRequest = new IndexRequest("_index").id("_id").setPipeline(pipelineId).setFinalPipeline("_none"); indexRequest.source(xContentType, "field1", "value1"); bulkRequest.add(indexRequest); } @@ -1016,7 +1023,7 @@ public void testStats() throws Exception { @SuppressWarnings("unchecked") final BiConsumer completionHandler = mock(BiConsumer.class); final IndexRequest indexRequest = new IndexRequest("_index"); - indexRequest.setPipeline("_id1"); + indexRequest.setPipeline("_id1").setFinalPipeline("_none"); indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler, indexReq -> {}); final IngestStats afterFirstRequestStats = ingestService.stats(); @@ -1143,7 +1150,8 @@ public String getTag() { ClusterState previousClusterState = clusterState; clusterState = IngestService.innerPut(putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); - final IndexRequest indexRequest = new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id"); + final IndexRequest indexRequest = + new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none"); @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); @SuppressWarnings("unchecked")