From c6ff3d9637a77137efce9f822bbaba77849ed36f Mon Sep 17 00:00:00 2001 From: Matt Culbreth Date: Mon, 30 Jan 2023 18:08:27 -0500 Subject: [PATCH 1/4] Check for nested pipeline configs in Simulate and block --- .../ingest/SimulatePipelineRequest.java | 10 ++++++ .../SimulatePipelineRequestParsingTests.java | 35 +++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java index c5395118e5a5f..40910dbbe778e 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java @@ -36,6 +36,8 @@ import java.util.Objects; public class SimulatePipelineRequest extends ActionRequest implements ToXContentObject { + public static final String PROCESSORS_KEY = "processors"; + private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(SimulatePipelineRequest.class); private String id; private boolean verbose; @@ -147,6 +149,14 @@ static Parsed parseWithPipelineId( static Parsed parse(Map config, boolean verbose, IngestService ingestService, RestApiVersion restApiVersion) throws Exception { Map pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE); + + // check for nested objects in processor configs + List> processorConfigs = ConfigurationUtils.readList(null, null, pipelineConfig, PROCESSORS_KEY); + if (processorConfigs.stream().anyMatch(processor -> processor.keySet().size() > 1)) { + throw new IllegalArgumentException("[processors] contains nested objects but should be a list of single-entry objects"); + } + pipelineConfig.put(PROCESSORS_KEY, processorConfigs); + Pipeline pipeline = Pipeline.create( SIMULATED_PIPELINE_ID, pipelineConfig, diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java index bdc14e32c2a6f..2873312128c45 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java @@ -210,6 +210,41 @@ public void testParseWithProvidedPipeline() throws Exception { assertThat(actualRequest.pipeline().getDescription(), nullValue()); assertThat(actualRequest.pipeline().getProcessors().size(), equalTo(numProcessors)); } + public void testParseWithNestedProcessors() throws Exception { + Map requestContent = new HashMap<>(); + Map pipelineConfig = new HashMap<>(); + List> processors = new ArrayList<>(); + Map processorConfig = new HashMap<>(); + List> docs = new ArrayList<>(); + requestContent.put(Fields.DOCS, docs); + String fieldName = randomAlphaOfLengthBetween(1, 10); + String fieldValue = randomAlphaOfLengthBetween(1, 10); + Map doc = new HashMap<>(); + doc.put(Fields.SOURCE, Collections.singletonMap(fieldName, fieldValue)); + docs.add(doc); + + // first processor + processors.add(Collections.singletonMap("mock_processor", new HashMap<>())); + + // second processor, which is a grok and a date in the same object + processorConfig.put("mock_grok", new HashMap<>()); + processorConfig.put("mock_date", new HashMap<>()); + processors.add(processorConfig); + + pipelineConfig.put("processors", processors); + requestContent.put(Fields.PIPELINE, pipelineConfig); + + // parse and expect failure + Exception e = expectThrows( + IllegalArgumentException.class, + () -> SimulatePipelineRequest.parse( + requestContent, + false, + ingestService, + RestApiVersion.current() + )); + assertThat(e.getMessage(), equalTo("[processors] contains nested objects but should be a list of single-entry objects")); + } public void testNullPipelineId() { Map requestContent = new HashMap<>(); From 39cb8ad4523a4832c02f404733738c96789c04ce Mon Sep 17 00:00:00 2001 From: Matt Culbreth Date: Tue, 31 Jan 2023 13:59:42 -0500 Subject: [PATCH 2/4] Error out on nested processors for put pipeline rest request --- .../rest-api-spec/test/ingest/10_basic.yml | 26 +++++++++++++++++ .../ingest/SimulatePipelineRequest.java | 4 +-- .../org/elasticsearch/ingest/Pipeline.java | 8 ++++++ .../action/ingest/RestPutPipelineAction.java | 11 ++++++++ .../ingest/PipelineFactoryTests.java | 28 +++++++++++++++++++ 5 files changed, 74 insertions(+), 3 deletions(-) diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml index dc6e730248860..6462bb520735f 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml @@ -153,6 +153,32 @@ "invalid_field" : {} } +--- +"Test valid config with sequential processors": + - do: + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [{}, {}, {}] + } + - match: { acknowledged: true } + +--- +"Test invalid config with nested processors": + - do: + catch: /illegal_argument_exception/ + ingest.put_pipeline: + id: "my_pipeline" + body: > + { + "description": "_description", + "processors": [{"grok": {}}, {"grok": {}, "date": {}}] + } + - match: { error.reason: "[processors] contains nested objects but should be a list of single-entry objects" } + + --- "Test Get Summarized Pipelines": - skip: diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java index 40910dbbe778e..eb5a105a3e625 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java @@ -151,11 +151,9 @@ static Parsed parse(Map config, boolean verbose, IngestService i Map pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE); // check for nested objects in processor configs - List> processorConfigs = ConfigurationUtils.readList(null, null, pipelineConfig, PROCESSORS_KEY); - if (processorConfigs.stream().anyMatch(processor -> processor.keySet().size() > 1)) { + if (Pipeline.containsNestedProcessors(pipelineConfig)) { throw new IllegalArgumentException("[processors] contains nested objects but should be a list of single-entry objects"); } - pipelineConfig.put(PROCESSORS_KEY, processorConfigs); Pipeline pipeline = Pipeline.create( SIMULATED_PIPELINE_ID, diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index f4d60ecf5ce34..be78ac890fd9d 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -68,6 +68,14 @@ public Pipeline( this.relativeTimeProvider = relativeTimeProvider; } + public static boolean containsNestedProcessors(Map config) { + boolean result; + List> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY); + result = processorConfigs.stream().anyMatch(processor -> processor.keySet().size() > 1); + config.put(PROCESSORS_KEY, processorConfigs); + return result; + } + public static Pipeline create( String id, Map config, diff --git a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java index 6e8c5b667da67..61cc1ef029038 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java @@ -11,7 +11,9 @@ import org.elasticsearch.action.ingest.PutPipelineRequest; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.Tuple; +import org.elasticsearch.ingest.Pipeline; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.RestToXContentListener; @@ -20,10 +22,12 @@ import java.io.IOException; import java.util.List; import java.util.Locale; +import java.util.Map; import static org.elasticsearch.rest.RestRequest.Method.PUT; public class RestPutPipelineAction extends BaseRestHandler { + public static final String PROCESSORS_KEY = "processors"; @Override public List routes() { @@ -51,6 +55,13 @@ public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient cl Tuple sourceTuple = restRequest.contentOrSourceParam(); PutPipelineRequest request = new PutPipelineRequest(restRequest.param("id"), sourceTuple.v2(), sourceTuple.v1(), ifVersion); + + // check for nested processors in the config + Map config = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); + if (Pipeline.containsNestedProcessors(config)) { + throw new IllegalArgumentException("[processors] contains nested objects but should be a list of single-entry objects"); + } + request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout())); request.timeout(restRequest.paramAsTime("timeout", request.timeout())); return channel -> client.admin().cluster().putPipeline(request, new RestToXContentListener<>(channel)); diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java index 12a44e652f5fb..d0d3a3ccd8a94 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java @@ -53,6 +53,34 @@ public void testCreate() throws Exception { assertThat(pipeline.getProcessors().get(1).getTag(), nullValue()); } + public void testContainsNestedProcessors() throws Exception { + // create a pipeline with non-nested processors + Map processorConfig0 = new HashMap<>(); + Map processorConfig1 = new HashMap<>(); + processorConfig0.put(ConfigurationUtils.TAG_KEY, "first-processor"); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); + pipelineConfig.put(Pipeline.VERSION_KEY, versionString); + if (metadata != null) { + pipelineConfig.put(Pipeline.META_KEY, metadata); + } + pipelineConfig.put(Pipeline.PROCESSORS_KEY, + List.of(Map.of("test", processorConfig0), Map.of("test", processorConfig1))); + + assertFalse(Pipeline.containsNestedProcessors(pipelineConfig)); + + // now add one with nested processors + Map processorConfigNested = new HashMap<>(); + processorConfigNested.put("mock_grok", new HashMap<>()); + processorConfigNested.put("mock_date", new HashMap<>()); + pipelineConfig.put(Pipeline.PROCESSORS_KEY, + List.of( + Map.of("test", processorConfig0), + Map.of("test", processorConfig1), + processorConfigNested)); + assertTrue(Pipeline.containsNestedProcessors(pipelineConfig)); + } + public void testCreateWithNoProcessorsField() throws Exception { Map pipelineConfig = new HashMap<>(); pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); From 45c7f30c79e22b0667b309a30e1ad9504a09a7ca Mon Sep 17 00:00:00 2001 From: Matt Culbreth Date: Wed, 1 Feb 2023 13:08:40 -0500 Subject: [PATCH 3/4] Fix formatting --- .../SimulatePipelineRequestParsingTests.java | 9 +++------ .../elasticsearch/ingest/PipelineFactoryTests.java | 14 ++++++-------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java index 2873312128c45..16ead142b42e0 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java @@ -210,6 +210,7 @@ public void testParseWithProvidedPipeline() throws Exception { assertThat(actualRequest.pipeline().getDescription(), nullValue()); assertThat(actualRequest.pipeline().getProcessors().size(), equalTo(numProcessors)); } + public void testParseWithNestedProcessors() throws Exception { Map requestContent = new HashMap<>(); Map pipelineConfig = new HashMap<>(); @@ -237,12 +238,8 @@ public void testParseWithNestedProcessors() throws Exception { // parse and expect failure Exception e = expectThrows( IllegalArgumentException.class, - () -> SimulatePipelineRequest.parse( - requestContent, - false, - ingestService, - RestApiVersion.current() - )); + () -> SimulatePipelineRequest.parse(requestContent, false, ingestService, RestApiVersion.current()) + ); assertThat(e.getMessage(), equalTo("[processors] contains nested objects but should be a list of single-entry objects")); } diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java index d0d3a3ccd8a94..912358013d115 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java @@ -53,7 +53,7 @@ public void testCreate() throws Exception { assertThat(pipeline.getProcessors().get(1).getTag(), nullValue()); } - public void testContainsNestedProcessors() throws Exception { + public void testContainsNestedProcessors() throws Exception { // create a pipeline with non-nested processors Map processorConfig0 = new HashMap<>(); Map processorConfig1 = new HashMap<>(); @@ -64,8 +64,7 @@ public void testContainsNestedProcessors() throws Exception { if (metadata != null) { pipelineConfig.put(Pipeline.META_KEY, metadata); } - pipelineConfig.put(Pipeline.PROCESSORS_KEY, - List.of(Map.of("test", processorConfig0), Map.of("test", processorConfig1))); + pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig0), Map.of("test", processorConfig1))); assertFalse(Pipeline.containsNestedProcessors(pipelineConfig)); @@ -73,11 +72,10 @@ public void testContainsNestedProcessors() throws Exception { Map processorConfigNested = new HashMap<>(); processorConfigNested.put("mock_grok", new HashMap<>()); processorConfigNested.put("mock_date", new HashMap<>()); - pipelineConfig.put(Pipeline.PROCESSORS_KEY, - List.of( - Map.of("test", processorConfig0), - Map.of("test", processorConfig1), - processorConfigNested)); + pipelineConfig.put( + Pipeline.PROCESSORS_KEY, + List.of(Map.of("test", processorConfig0), Map.of("test", processorConfig1), processorConfigNested) + ); assertTrue(Pipeline.containsNestedProcessors(pipelineConfig)); } From 5690ce787af03d2aa95e1f422039c3aaeb2b8230 Mon Sep 17 00:00:00 2001 From: Matt Culbreth Date: Wed, 1 Feb 2023 15:54:58 -0500 Subject: [PATCH 4/4] Fix invalid input for pipeline in test; formatting --- .../resources/rest-api-spec/test/ingest/10_basic.yml | 5 ++++- server/src/main/java/org/elasticsearch/ingest/Pipeline.java | 1 + .../rest/action/ingest/RestPutPipelineActionTests.java | 6 +++--- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml index 6462bb520735f..0f834cc08712e 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/ingest/10_basic.yml @@ -167,11 +167,14 @@ --- "Test invalid config with nested processors": + - skip: + version: " - 8.6.99" + reason: "bug fixed in 8.7.0" - do: catch: /illegal_argument_exception/ ingest.put_pipeline: id: "my_pipeline" - body: > + body: > { "description": "_description", "processors": [{"grok": {}}, {"grok": {}, "date": {}}] diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index be78ac890fd9d..7e0aaa76b7aa2 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -70,6 +70,7 @@ public Pipeline( public static boolean containsNestedProcessors(Map config) { boolean result; + List> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY); result = processorConfigs.stream().anyMatch(processor -> processor.keySet().size() > 1); config.put(PROCESSORS_KEY, processorConfigs); diff --git a/server/src/test/java/org/elasticsearch/rest/action/ingest/RestPutPipelineActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/ingest/RestPutPipelineActionTests.java index 440399bef17ed..4512dd65a323b 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/ingest/RestPutPipelineActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/ingest/RestPutPipelineActionTests.java @@ -46,7 +46,7 @@ public void testInvalidIfVersionValue() { RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) .withPath("/_ingest/pipeline/my_pipeline") - .withContent(new BytesArray("{\"processors\":{}}"), XContentType.JSON) + .withContent(new BytesArray("{\"processors\":[]}"), XContentType.JSON) .withParams(params) .build(); @@ -60,7 +60,7 @@ public void testMissingIfVersionValue() { RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) .withPath("/_ingest/pipeline/my_pipeline") - .withContent(new BytesArray("{\"processors\":{}}"), XContentType.JSON) + .withContent(new BytesArray("{\"processors\":[]}"), XContentType.JSON) .withParams(params) .build(); @@ -94,7 +94,7 @@ public void testNumericIfVersionValue() { RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) .withPath("/_ingest/pipeline/my_pipeline") - .withContent(new BytesArray("{\"processors\":{}}"), XContentType.JSON) + .withContent(new BytesArray("{\"processors\":[]}"), XContentType.JSON) .withParams(params) .build();