An invalid pipeline can be stored #41837

Open
spinscale opened this issue May 6, 2019 · 6 comments
Labels
>bug, :Data Management/Ingest Node, Team:Data Management

Comments

@spinscale
Contributor

spinscale commented May 6, 2019

Elasticsearch version (bin/elasticsearch --version): 7.0.1

Description of the problem including expected versus actual behavior:

A user can store an invalid pipeline (or run it through the simulate API) without getting any error.

Steps to reproduce:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "script": {
          "lang": "painless",
          "source": "ctx.bar = 1"
        },
        "set": {
          "if": "ctx.bar == 1",
          "field": "spam",
          "value": "1"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "foo": "3"
      }
    }
  ]
}

PUT _ingest/pipeline/my-pipeline-id
{
  "processors": [
    {
      "script": {
        "lang": "painless",
        "source": "ctx.bar = 1"
      },
      "set": {
        "if": "ctx.bar == 1",
        "field": "spam",
        "value": "1"
      }
    }
  ]
}

This is a tricky example. If the two processors had the same name, it would work as expected and return an error. Running a `remove` and a `set` processor in the same object also works. However, this script example fails because the `bar` field is not set yet when the `set` processor runs.
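The failure is an ordering problem: the `if` condition on `set` can only be true after the script has assigned `ctx.bar`. The minimal Java sketch below uses plain lambdas as hypothetical stand-ins for the two processors (they are not the real ingest `Processor` implementations) and runs them in both orders against the example document.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class ProcessorOrderDemo {

    // Hypothetical stand-in for the painless script `ctx.bar = 1`.
    public static final Consumer<Map<String, Object>> SCRIPT = ctx -> ctx.put("bar", 1);

    // Hypothetical stand-in for the set processor guarded by `if: ctx.bar == 1`.
    public static final Consumer<Map<String, Object>> SET = ctx -> {
        if (Integer.valueOf(1).equals(ctx.get("bar"))) {
            ctx.put("spam", "1");
        }
    };

    // Runs the steps in the given order against a fresh copy of the example _source.
    public static Map<String, Object> run(List<Consumer<Map<String, Object>>> steps) {
        Map<String, Object> ctx = new HashMap<>();
        ctx.put("foo", "3");
        for (Consumer<Map<String, Object>> step : steps) {
            step.accept(ctx);
        }
        return ctx;
    }

    public static void main(String[] args) {
        System.out.println("script then set: " + run(List.of(SCRIPT, SET)));
        System.out.println("set then script: " + run(List.of(SET, SCRIPT)));
    }
}
```

Only the script-then-set order adds `spam`; in the other order the condition sees no `bar`, so the `set` is silently skipped.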

@albertzaharovits added the :Data Management/Ingest Node label May 6, 2019
@elasticmachine
Collaborator

Pinging @elastic/es-core-features

@martijnvg added the >bug label May 28, 2019
@martijnvg
Member

It looks like we need to enforce that a processor element inside a pipeline can only have a single JSON object field.

@martijnvg self-assigned this May 28, 2019
@rjernst added the Team:Data Management label May 4, 2020
@martijnvg removed their assignment Mar 24, 2021
@joegallo
Contributor

joegallo commented Dec 7, 2022

Ahhhh, here's where we go wrong:

diff --git a/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java b/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java
index ff607423746..039e539122b 100644
--- a/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java
+++ b/server/src/main/java/org/elasticsearch/ingest/ConfigurationUtils.java
@@ -471,6 +471,7 @@ public final class ConfigurationUtils {
         List<Processor> processors = new ArrayList<>();
         if (processorConfigs != null) {
             for (Map<String, Object> processorConfigWithKey : processorConfigs) {
+                assert processorConfigWithKey.size() == 1;
                 for (Map.Entry<String, Object> entry : processorConfigWithKey.entrySet()) {
                     try {
                         if (entry.getValue() == null) {

`processorConfigWithKey.size()` will be 2 in the above example, but we'd expect it to be 1. Of course we can't merely add an assert like this, but I figured I'd peel back a layer on where we go wrong here.
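Rather than an `assert`, the check could be a validation error raised while the pipeline is parsed. A minimal sketch, assuming a hypothetical helper named `requireSingleProcessorType` (not an existing Elasticsearch method) that would be called on the processor list before the loop shown in the diff:

```java
import java.util.List;
import java.util.Map;

public class ProcessorConfigValidation {

    // Hypothetical helper: rejects processor entries that declare more than one
    // processor type, e.g. {"script": {...}, "set": {...}}, instead of asserting.
    public static void requireSingleProcessorType(List<? extends Map<String, ?>> processorConfigs) {
        if (processorConfigs == null) {
            return; // mirrors the null check in the original loop
        }
        for (int i = 0; i < processorConfigs.size(); i++) {
            Map<String, ?> config = processorConfigs.get(i);
            if (config.size() != 1) {
                throw new IllegalArgumentException(
                    "processor at index [" + i + "] must contain exactly one processor type, but found " + config.keySet()
                );
            }
        }
    }

    public static void main(String[] args) {
        // A well-formed entry passes.
        requireSingleProcessorType(List.of(Map.of("set", Map.of("field", "spam", "value", "1"))));
        // The entry from the issue, with two processor types in one object, is rejected.
        try {
            requireSingleProcessorType(List.of(Map.of(
                "script", Map.of("source", "ctx.bar = 1"),
                "set", Map.of("field", "spam", "value", "1"))));
            System.out.println("unexpected: accepted");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```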


diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java
index 2a75ec3b768..2f13dc329c3 100644
--- a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java
+++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java
@@ -39,7 +39,7 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu

     @Override
     protected void doExecute(Task task, SimulatePipelineRequest request, ActionListener<SimulatePipelineResponse> listener) {
-        final Map<String, Object> source = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
+        final Map<String, Object> source = XContentHelper.convertToMap(request.getSource(), true, request.getXContentType()).v2();

         final SimulatePipelineRequest.Parsed simulateRequest;
         try {
diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
index f54ca2853ab..2d68448718b 100644
--- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -481,7 +481,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
     }

     public void validatePipelineRequest(PutPipelineRequest request, NodesInfoResponse nodeInfos) throws Exception {
-        final Map<String, Object> config = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
+        final Map<String, Object> config = XContentHelper.convertToMap(request.getSource(), true, request.getXContentType()).v2();
         Map<DiscoveryNode, IngestInfo> ingestInfos = new HashMap<>();
         for (NodeInfo nodeInfo : nodeInfos.getNodes()) {
             ingestInfos.put(nodeInfo.getNode(), nodeInfo.getInfo(IngestInfo.class));
@@ -610,7 +610,7 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
                     );
                 }

-                var pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
+                var pipelineConfig = XContentHelper.convertToMap(request.getSource(), true, request.getXContentType()).v2();
                 final Integer specifiedVersion = (Integer) pipelineConfig.get("version");
                 if (pipelineConfig.containsKey("version") && Objects.equals(specifiedVersion, currentVersion)) {
                     throw new IllegalArgumentException(

I think this would ensure we get back everything in the original source ordering. A version of this could perhaps play a role in making this work without also relying on the hashing order of maps. (I'm not saying that's a good idea, just observing that it's the case.)
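To see why the `ordered` flag matters, here is a hypothetical simplification (not the real `ConfigurationUtils` code) of the loop from the first diff, in which each key of a processor entry becomes one processor in the map's iteration order. It assumes, as the comment above suggests, that `convertToMap(..., true, ...)` yields an insertion-ordered map; a `LinkedHashMap` stands in for that here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OrderedExpansionDemo {

    // Hypothetical simplification: every key of a processor entry becomes one
    // processor, in whatever order the map iterates its keys.
    public static List<String> processorTypes(List<? extends Map<String, ?>> processorConfigs) {
        List<String> types = new ArrayList<>();
        for (Map<String, ?> processorConfigWithKey : processorConfigs) {
            types.addAll(processorConfigWithKey.keySet());
        }
        return types;
    }

    public static void main(String[] args) {
        // An insertion-ordered map keeps "script" before "set", as written in the request.
        Map<String, Object> entry = new LinkedHashMap<>();
        entry.put("script", Map.of("lang", "painless", "source", "ctx.bar = 1"));
        entry.put("set", Map.of("if", "ctx.bar == 1", "field", "spam", "value", "1"));
        System.out.println(processorTypes(List.of(entry))); // prints [script, set]
    }
}
```

Ordering alone still accepts a two-key processor entry, so it would not by itself enforce the single-field rule proposed earlier in the thread.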

@joegallo
Contributor

joegallo commented Dec 8, 2022

In the case of the simulate API, I'm of the opinion that we can just make the breaking change immediately. For the rest, though, I think we'll want to be more thoughtful.

@joegallo
Contributor

joegallo commented Jan 4, 2023

#36134 is actually the same issue as this -- I closed that one in favor of this one.

@elasticsearchmachine
Collaborator

Pinging @elastic/es-data-management (Team:Data Management)

8 participants