Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Block storing invalid pipeline configs caused by nested processors #93425

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,35 @@
"invalid_field" : {}
}

---
"Test valid config with sequential processors":
- do:
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [{}, {}, {}]
}
- match: { acknowledged: true }

---
"Test invalid config with nested processors":
- skip:
version: " - 8.6.99"
reason: "bug fixed in 8.7.0"
- do:
catch: /illegal_argument_exception/
ingest.put_pipeline:
id: "my_pipeline"
body: >
{
"description": "_description",
"processors": [{"grok": {}}, {"grok": {}, "date": {}}]
}
- match: { error.reason: "[processors] contains nested objects but should be a list of single-entry objects" }


---
"Test Get Summarized Pipelines":
- skip:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.Objects;

public class SimulatePipelineRequest extends ActionRequest implements ToXContentObject {
public static final String PROCESSORS_KEY = "processors";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this variable is never used.


private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(SimulatePipelineRequest.class);
private String id;
private boolean verbose;
Expand Down Expand Up @@ -147,6 +149,12 @@ static Parsed parseWithPipelineId(
static Parsed parse(Map<String, Object> config, boolean verbose, IngestService ingestService, RestApiVersion restApiVersion)
throws Exception {
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);

// check for nested objects in processor configs
if (Pipeline.containsNestedProcessors(pipelineConfig)) {
throw new IllegalArgumentException("[processors] contains nested objects but should be a list of single-entry objects");
}

Pipeline pipeline = Pipeline.create(
SIMULATED_PIPELINE_ID,
pipelineConfig,
Expand Down
9 changes: 9 additions & 0 deletions server/src/main/java/org/elasticsearch/ingest/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ public Pipeline(
this.relativeTimeProvider = relativeTimeProvider;
}

public static boolean containsNestedProcessors(Map<String, Object> config) {
boolean result;

List<Map<String, Object>> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY);
result = processorConfigs.stream().anyMatch(processor -> processor.keySet().size() > 1);
config.put(PROCESSORS_KEY, processorConfigs);
return result;
}

public static Pipeline create(
String id,
Map<String, Object> config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
Expand All @@ -20,10 +22,12 @@
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import static org.elasticsearch.rest.RestRequest.Method.PUT;

public class RestPutPipelineAction extends BaseRestHandler {
public static final String PROCESSORS_KEY = "processors";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this variable is never used.


@Override
public List<Route> routes() {
Expand Down Expand Up @@ -51,6 +55,13 @@ public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient cl

Tuple<XContentType, BytesReference> sourceTuple = restRequest.contentOrSourceParam();
PutPipelineRequest request = new PutPipelineRequest(restRequest.param("id"), sourceTuple.v2(), sourceTuple.v1(), ifVersion);

// check for nested processors in the config
Map<String, Object> config = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
if (Pipeline.containsNestedProcessors(config)) {
throw new IllegalArgumentException("[processors] contains nested objects but should be a list of single-entry objects");
}

request.masterNodeTimeout(restRequest.paramAsTime("master_timeout", request.masterNodeTimeout()));
request.timeout(restRequest.paramAsTime("timeout", request.timeout()));
return channel -> client.admin().cluster().putPipeline(request, new RestToXContentListener<>(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,38 @@ public void testParseWithProvidedPipeline() throws Exception {
assertThat(actualRequest.pipeline().getProcessors().size(), equalTo(numProcessors));
}

public void testParseWithNestedProcessors() throws Exception {
Map<String, Object> requestContent = new HashMap<>();
Map<String, Object> pipelineConfig = new HashMap<>();
List<Map<String, Object>> processors = new ArrayList<>();
Map<String, Object> processorConfig = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd declare the processorConfig right before you use it, slightly clearer that way.

List<Map<String, Object>> docs = new ArrayList<>();
requestContent.put(Fields.DOCS, docs);
String fieldName = randomAlphaOfLengthBetween(1, 10);
String fieldValue = randomAlphaOfLengthBetween(1, 10);
Map<String, Object> doc = new HashMap<>();
doc.put(Fields.SOURCE, Collections.singletonMap(fieldName, fieldValue));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer Map.of(...) rather than Collections.singletonMap(...) where possible -- there's this line and another a few lines below.

docs.add(doc);

// first processor
processors.add(Collections.singletonMap("mock_processor", new HashMap<>()));

// second processor, which is a grok and a date in the same object
processorConfig.put("mock_grok", new HashMap<>());
processorConfig.put("mock_date", new HashMap<>());
processors.add(processorConfig);

pipelineConfig.put("processors", processors);
requestContent.put(Fields.PIPELINE, pipelineConfig);

// parse and expect failure
Exception e = expectThrows(
IllegalArgumentException.class,
() -> SimulatePipelineRequest.parse(requestContent, false, ingestService, RestApiVersion.current())
);
assertThat(e.getMessage(), equalTo("[processors] contains nested objects but should be a list of single-entry objects"));
}

public void testNullPipelineId() {
Map<String, Object> requestContent = new HashMap<>();
List<Map<String, Object>> docs = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,32 @@ public void testCreate() throws Exception {
assertThat(pipeline.getProcessors().get(1).getTag(), nullValue());
}

public void testContainsNestedProcessors() throws Exception {
// create a pipeline with non-nested processors
Map<String, Object> processorConfig0 = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd drop processorConfig0 and processorConfig1, I don't think they carry their weight as variables. Probably just replace new HashMap<>() at each of their current uses.

Map<String, Object> processorConfig1 = new HashMap<>();
processorConfig0.put(ConfigurationUtils.TAG_KEY, "first-processor");
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.VERSION_KEY, versionString);
if (metadata != null) {
pipelineConfig.put(Pipeline.META_KEY, metadata);
}
pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig0), Map.of("test", processorConfig1)));

assertFalse(Pipeline.containsNestedProcessors(pipelineConfig));

// now add one with nested processors
Map<String, Object> processorConfigNested = new HashMap<>();
processorConfigNested.put("mock_grok", new HashMap<>());
processorConfigNested.put("mock_date", new HashMap<>());
pipelineConfig.put(
Pipeline.PROCESSORS_KEY,
List.of(Map.of("test", processorConfig0), Map.of("test", processorConfig1), processorConfigNested)
);
assertTrue(Pipeline.containsNestedProcessors(pipelineConfig));
}

public void testCreateWithNoProcessorsField() throws Exception {
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void testInvalidIfVersionValue() {

RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT)
.withPath("/_ingest/pipeline/my_pipeline")
.withContent(new BytesArray("{\"processors\":{}}"), XContentType.JSON)
.withContent(new BytesArray("{\"processors\":[]}"), XContentType.JSON)
.withParams(params)
.build();

Expand All @@ -60,7 +60,7 @@ public void testMissingIfVersionValue() {

RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT)
.withPath("/_ingest/pipeline/my_pipeline")
.withContent(new BytesArray("{\"processors\":{}}"), XContentType.JSON)
.withContent(new BytesArray("{\"processors\":[]}"), XContentType.JSON)
.withParams(params)
.build();

Expand Down Expand Up @@ -94,7 +94,7 @@ public void testNumericIfVersionValue() {

RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT)
.withPath("/_ingest/pipeline/my_pipeline")
.withContent(new BytesArray("{\"processors\":{}}"), XContentType.JSON)
.withContent(new BytesArray("{\"processors\":[]}"), XContentType.JSON)
.withParams(params)
.build();

Expand Down