Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INGEST: Implement Drop Processor #32278

Merged

Conversation

original-brownbear
Copy link
Member

@original-brownbear original-brownbear commented Jul 23, 2018

  • Throw DroppedDocumentException to indicate document is not to be indexed
  • Catch and rethrow where necessary to prevent Exception being logged and avoid unnecessary wrapping and other operations on the exception
  • In simulate API: Don't add dropped documents to returned docs array
  • In index request: return "noop" type index response for dropped documents
  • closes Drop document in ingest pipeline #23726

* Throw `DroppedDocumentException` to indicate document is not to be indexed
* Catch and rethrow where necessary to prevent Exception being logged and avoid unnecessary wrapping and other operations on the exception
* In simulate API: Don't add dropped documents to returned `docs` array
* In index request: return `"noop"` type index response for dropped documents
@original-brownbear original-brownbear added >enhancement :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP v7.0.0 v6.4.0 labels Jul 23, 2018
@elasticmachine
Copy link
Collaborator

Pinging @elastic/es-core-infra

@original-brownbear
Copy link
Member Author

@rjernst can you take al look here and let me know if the following behaviour looks ok to you?:

Pipeline:

PUT _ingest/pipeline/my_index
{
    "description": "drops documents",
    "processors": [
      {
        "script": {
          "source": """
            if (ctx.message == 'text') {
              Processors.drop()
            }
          """
        }
      }
    ]
}

Standard index request:

POST foo/1/?pipeline=my_index
{
  "message": "text"
}

returns:

{
  "_index": "foo",
  "_type": "1",
  "_id": null,
  "_version": -3,
  "result": "noop",
  "_shards": {
    "total": 0,
    "successful": 0,
    "failed": 0
  }
}

Simulate API:

Request:

POST _ingest/pipeline/_simulate
{
  "pipeline" :
  {
    "description": "_description",
    "processors": [
        {
        "script": {
          "source": """
            if (ctx.message == 'text') {
              Processors.drop()
            }
          """
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "_doc",
      "_id": "id",
      "_source": {
        "message": "text"
      }
    },
    {
      "_index": "index",
      "_type": "_doc",
      "_id": "id",
      "_source": {
        "message": "text1"
      }
    }
  ]
}

returns:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "_doc",
        "_id": "id",
        "_source": {
          "message": "text1"
        },
        "_ingest": {
          "timestamp": "2018-07-23T10:32:47.461733Z"
        }
      }
    }
  ]
}

Then I'd add tests here and we should be good :) (provided you're ok with the implementation)

Copy link
Member

@rjernst rjernst left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the long delay in reviewing @original-brownbear. Overall this looks good, but we need to not add ingest specific code to painless.

@@ -56,6 +57,9 @@
* @return The generated ScriptException.
*/
default ScriptException convertToScriptException(Throwable t, Map<String, List<String>> extraMetadata) {
if (t instanceof DroppedDocumentException) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Painless shouldn't know about ingest. I think we need to discuss with @jdconrad how we can change painless to allow some exception types to be propagated verbatim instead of wrapping in a ScriptException. Perhaps we could allow execute on script classes to have checked exceptions, and those exceptions would be rethrown instead of wrapped?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rjernst yea this was really dirty, hence the WIP label.

@jdconrad @rjernst I like the idea of a checked Exception for this. Maybe something like ScriptExitException? We could even add an exit code field to it and maybe reuse it for other scenarios as well (thinking about ingest processors called from Painless directly failing here mostly ... those currently generate a weirdly complicated wrapped exception when failing, when in actuality you probably just want the failure in the processor to propagate up cleanly)?

Then we could do:

} catch (ScriptExitException e) {
   if (e.exitCode() == 0) {
      // Drop document quietly
   } else {
     // Log error and do other error things
   }
}

WDYT?

Copy link
Contributor

@jdconrad jdconrad Aug 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can't use exceptions in this form because a user could potentially catch them in the script. We decided a while back that if we want to get around this we can use Error classes and we will never whitelist them. These bypass the users ability to catch them in script.

To add to that we currently have a PainlessError class that maybe could be used as an example for some of what you'd like to accomplish here? This gets triggered right now when a loop counter goes too high.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think an error is right here. This is essentially an exception the user is throwing (through calling the drop() method). Sure, they could catch it themselves. Another option (suggested by @jdconrad privately) is to have a flag on the IngestDocument, and set this, kind of like we have noop for update scripts. This might be an ok solution; we would then make the drop method take the ingest document and set the flag.

To be clear, though, my proposal was to change the execute method to something like this:

void execute(Map ctx) throws DroppedDocmentException

Painless would determine any checked exceptions when reflecting the execute method, and add a catch/rethrow into the existing try/catch within the main execute implementation. So the exception type would not be generic, but still specific to ingest, and painless would learn about it through reflection (as it does the other argument types as well).

@original-brownbear
Copy link
Member Author

@rjernst @jdconrad

Painless would determine any checked exceptions when reflecting the execute method, and add a catch/rethrow into the existing try/catch within the main execute implementation. So the exception type would not be generic, but still specific to ingest, and painless would learn about it through reflection (as it does the other argument types as well).

👍 This makes sense to me, maybe we could start moving on that?

@original-brownbear
Copy link
Member Author

@rjernst ping :)

@original-brownbear
Copy link
Member Author

Discussed this with @rjernst and we came up with an approach to this that
doesn't force the use of Exceptions and catching them on us.

As discussed earlier, the long-term goal is moving to representing the pipeline
as a script instead of as JSON.
If we were to represent a pipeline as a script like e.g.:


if(doc.foo == 'bar') {
   return 'null'
}

// legacy script processor called from new-style pipeline
scriptProcessor({
 "script": "script source",
 "params": {
   "param1": "val1" 
 }
}).execute(doc)

return doc

then handling dropping a document as simply returning null seems like a natural approach.
The practical issue we have in getting there with the current scripts is that they have no
return value, so supporting drop in legacy scripts is not possible in this way.

Now that we do have conditionals for any processor by setting the if field having drop
as part of the current scripts is less valuable though. We could simply implement a Drop
processor in the normal JSON pipeline and have its only configuration setting be the if condition.
This still leaves the question of how to actually make the execution of the pipeline break out
as a result of the drop processor.

In order to be able to break out of the execution and make the pipeline more similar to
what the above script based pipeline configuration looks like we were thinking of:

  • Change the current org.elasticsearch.ingest.Processor#execute method from void to
    returning IngestDocument.
    • Make all existing processor simply return the input document for now
    • Make the drop processor return null.
    • Adjust foreach, pipeline processor and composite processor accordingly so that they
      simply break out if a pipeline/processor returns null and return null themselves
    • Do not index document if pipeline returns null

(correct me if I misrepresented anything @rjernst :) )

@original-brownbear
Copy link
Member Author

@rjernst Implemented the above approach here (needs tests still and I might have to clean up a few left over formatting changes from the earlier impl. but functionally it seems fine), so in practice this works now:

Define drop processor with condition:

PUT _ingest/pipeline/my_index
{
    "description": "drops documents",
    "processors": [
      {
        "drop": {
          "if": "ctx.message == 'text'"
        }
      }
    ]
}

Try to index document that should be dropped:

POST foo/1/?pipeline=my_index
{
  "message": "text"
}

Returns:

{
  "_index": "foo",
  "_type": "1",
  "_id": null,
  "_version": -3,
  "result": "noop",
  "_shards": {
    "total": 0,
    "successful": 0,
    "failed": 0
  }
}

Think this is what we want functionally (then I'd clean it up real quick and add some tests :) and we should have this ready shortly).

Copy link
Member

@rjernst rjernst left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! LGTM

}
throw new IllegalArgumentException("field [" + field + "] is null, cannot loop over its elements.");
}
List<Object> newValues = new ArrayList<>(values.size());
for (Object value : values) {
Object previousValue = ingestDocument.getIngestMetadata().put("_value", value);
try {
processor.execute(ingestDocument);
if (processor.execute(ingestDocument) == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

technically shouldn't the ingestDocument be chained for each successive execute call?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jup, that would be cleaner now => Will adjust that tomorrow :)

@original-brownbear
Copy link
Member Author

@rjernst thanks!

@original-brownbear original-brownbear merged commit 4677409 into elastic:master Sep 5, 2018
@original-brownbear original-brownbear deleted the clean-drop-proc branch September 5, 2018 12:25
@original-brownbear
Copy link
Member Author

urgh ... I just realized I completely forgot about adding tests here and just instinctively merged on the LGTM ✔️ => sorry about that PR for tests incoming shortly

jasontedor added a commit to jasontedor/elasticsearch that referenced this pull request Sep 5, 2018
* master:
  Fix deprecated setting specializations (elastic#33412)
  HLRC: split cluster request converters (elastic#33400)
  HLRC: Add ML get influencers API (elastic#33389)
  Add conditional token filter to elasticsearch (elastic#31958)
  Build: Merge xpack checkstyle config into core (elastic#33399)
  Disable IndexRecoveryIT.testRerouteRecovery.
  INGEST: Implement Drop Processor (elastic#32278)
  [ML] Add field stats to log structure finder (elastic#33351)
  Add interval response parameter to AutoDateInterval histogram (elastic#33254)
  MINOR+CORE: Remove Dead Methods ClusterService (elastic#33346)
original-brownbear added a commit to original-brownbear/elasticsearch that referenced this pull request Sep 5, 2018
* UT for behavior of dropped callback
and drop processor
   * Moved drop processor to `server`
project to enable this test
* Simple IT
* Relates elastic#32278
@original-brownbear
Copy link
Member Author

Tests -> #33430

@jakelandis
Copy link
Contributor

@original-brownbear - is this still pending back port to 6.5 ? Any concerns about the change to the execute signature for the 6.x branch ?

https://github.com/elastic/elasticsearch/tree/6.x/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common

@original-brownbear
Copy link
Member Author

@jakelandis no not really but just to be safe I was waiting for #33430 to be reviewed/merged so we have a bit of coverage :)

original-brownbear added a commit that referenced this pull request Sep 25, 2018
* INGEST: Tests for Drop Processor

* UT for behavior of dropped callback
and drop processor
   * Moved drop processor to `server`
project to enable this test
* Simple IT
* Relates #32278
@jakelandis
Copy link
Contributor

@original-brownbear - now that #33430 is merged, should be backport this for 6.5 ?

original-brownbear added a commit to original-brownbear/elasticsearch that referenced this pull request Oct 18, 2018
* INGEST: Implement Drop Processor
* Adjust Processor API
* Implement Drop Processor
* Closes elastic#23726
original-brownbear added a commit that referenced this pull request Oct 19, 2018
* INGEST: Implement Drop Processor (#32278)
* Adjust Processor API
* Closes #23726
jakelandis pushed a commit to jakelandis/elasticsearch that referenced this pull request Oct 23, 2018
* INGEST: Tests for Drop Processor

* UT for behavior of dropped callback
and drop processor
   * Moved drop processor to `server`
project to enable this test
* Simple IT
* Relates elastic#32278
jakelandis pushed a commit that referenced this pull request Oct 23, 2018
* INGEST: Tests for Drop Processor

* UT for behavior of dropped callback
and drop processor
   * Moved drop processor to `server`
project to enable this test
* Simple IT
* Relates #32278
kcm pushed a commit that referenced this pull request Oct 30, 2018
* INGEST: Tests for Drop Processor

* UT for behavior of dropped callback
and drop processor
   * Moved drop processor to `server`
project to enable this test
* Simple IT
* Relates #32278
@guruxu
Copy link

guruxu commented Nov 16, 2018

Folks, it would really be helpful to us plugin developers to plan ahead of time, if you could 1) add a deprecation annotation in advance and 2) release note the change. Re: https://github.com/elastic/elasticsearch/pull/32278/files#diff-41059c22f314a0bd73e614cc788a483bR60

@jakelandis
Copy link
Contributor

@guruxu - apologies for the troubles this has caused you. We will try to be more careful with these types of changes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP >enhancement v6.5.0 v7.0.0-beta1
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Drop document in ingest pipeline
8 participants