Skip to content

Commit

Permalink
ingest: support simulate with verbose for pipeline processor (elastic…
Browse files Browse the repository at this point in the history
…#33839)

* ingest: support simulate with verbose for pipeline processor

This change better supports the use of simulate?verbose with the
pipeline processor. Prior to this change any pipeline processors
executed with simulate?verbose would not show all intermediate 
processors for the inner pipelines.

This changes also moves the PipelineProcess and TrackingResultProcessor
classes to enable instance checks and to avoid overly public classes.
As well this updates the error message for when cycles are detected
in pipelines calling other pipelines.
  • Loading branch information
jakelandis authored Sep 20, 2018
1 parent 019aead commit e37e5df
Show file tree
Hide file tree
Showing 10 changed files with 509 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.grok.Grok;
import org.elasticsearch.grok.ThreadWatchdog;
import org.elasticsearch.ingest.PipelineProcessor;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.IngestPlugin;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,4 @@ teardown:
pipeline: "outer"
body: {}
- match: { error.root_cause.0.type: "exception" }
- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Recursive invocation of pipeline [inner] detected." }
- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Cycle detected for pipeline: inner" }
Original file line number Diff line number Diff line change
Expand Up @@ -605,3 +605,150 @@ teardown:
- length: { docs.0.processor_results.1: 2 }
- match: { docs.0.processor_results.1.tag: "rename-1" }
- match: { docs.0.processor_results.1.doc._source.new_status: 200 }

---
"Test verbose simulate with Pipeline Processor with Circular Pipelines":
- do:
ingest.put_pipeline:
id: "outer"
body: >
{
"description" : "outer pipeline",
"processors" : [
{
"pipeline" : {
"pipeline": "inner"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.put_pipeline:
id: "inner"
body: >
{
"description" : "inner pipeline",
"processors" : [
{
"pipeline" : {
"pipeline": "outer"
}
}
]
}
- match: { acknowledged: true }

- do:
catch: /illegal_state_exception/
ingest.simulate:
verbose: true
body: >
{
"pipeline": {
"processors" : [
{
"pipeline" : {
"pipeline": "outer"
}
}
]
}
,
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"field1": "123.42 400 <foo>"
}
}
]
}
- match: { error.root_cause.0.type: "illegal_state_exception" }
- match: { error.root_cause.0.reason: "Cycle detected for pipeline: inner" }

---
"Test verbose simulate with Pipeline Processor with Multiple Pipelines":
- do:
ingest.put_pipeline:
id: "pipeline1"
body: >
{
"processors": [
{
"set": {
"field": "pipeline1",
"value": true
}
},
{
"pipeline": {
"pipeline": "pipeline2"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.put_pipeline:
id: "pipeline2"
body: >
{
"processors": [
{
"set": {
"field": "pipeline2",
"value": true
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.simulate:
verbose: true
body: >
{
"pipeline": {
"processors": [
{
"set": {
"field": "pipeline0",
"value": true
}
},
{
"pipeline": {
"pipeline": "pipeline1"
}
}
]
},
"docs": [
{
"_index": "index",
"_type": "type",
"_id": "id",
"_source": {
"field1": "123.42 400 <foo>"
}
}
]
}
- length: { docs: 1 }
- length: { docs.0.processor_results: 3 }
- match: { docs.0.processor_results.0.doc._source.pipeline0: true }
- is_false: docs.0.processor_results.0.doc._source.pipeline1
- is_false: docs.0.processor_results.0.doc._source.pipeline2
- match: { docs.0.processor_results.1.doc._source.pipeline0: true }
- match: { docs.0.processor_results.1.doc._source.pipeline1: true }
- is_false: docs.0.processor_results.1.doc._source.pipeline2
- match: { docs.0.processor_results.2.doc._source.pipeline0: true }
- match: { docs.0.processor_results.2.doc._source.pipeline1: true }
- match: { docs.0.processor_results.2.doc._source.pipeline2: true }

Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.PipelineProcessor;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

import static org.elasticsearch.action.ingest.TrackingResultProcessor.decorate;
import static org.elasticsearch.ingest.TrackingResultProcessor.decorate;

class SimulateExecutionService {

Expand All @@ -42,11 +46,15 @@ class SimulateExecutionService {
}

SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) {
// Prevent cycles in pipeline decoration
final Set<PipelineProcessor> pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>());
if (verbose) {
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList);
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList, pipelinesSeen);
try {
verbosePipelineProcessor.execute(ingestDocument);
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
verbosePipelineProcessor);
ingestDocument.executePipeline(verbosePipeline);
return new SimulateDocumentVerboseResult(processorResultList);
} catch (Exception e) {
return new SimulateDocumentVerboseResult(processorResultList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ private static Object deepCopy(Object value) {
public IngestDocument executePipeline(Pipeline pipeline) throws Exception {
try {
if (this.executedPipelines.add(pipeline) == false) {
throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected.");
throw new IllegalStateException("Cycle detected for pipeline: " + pipeline.getId());
}
return pipeline.execute(this);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,9 @@
* under the License.
*/

package org.elasticsearch.ingest.common;
package org.elasticsearch.ingest;

import java.util.Map;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.Processor;

public class PipelineProcessor extends AbstractProcessor {

Expand All @@ -50,6 +44,10 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
return ingestDocument.executePipeline(pipeline);
}

Pipeline getPipeline(){
return ingestService.getPipeline(pipelineName);
}

@Override
public String getType() {
return TYPE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@
* under the License.
*/

package org.elasticsearch.action.ingest;
package org.elasticsearch.ingest;

import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.action.ingest.SimulateProcessorResult;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
* Processor to be used within Simulate API to keep track of processors executed in pipeline.
Expand All @@ -35,7 +34,7 @@ public final class TrackingResultProcessor implements Processor {
private final List<SimulateProcessorResult> processorResultList;
private final boolean ignoreFailure;

public TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List<SimulateProcessorResult> processorResultList) {
TrackingResultProcessor(boolean ignoreFailure, Processor actualProcessor, List<SimulateProcessorResult> processorResultList) {
this.ignoreFailure = ignoreFailure;
this.processorResultList = processorResultList;
this.actualProcessor = actualProcessor;
Expand Down Expand Up @@ -67,19 +66,35 @@ public String getTag() {
return actualProcessor.getTag();
}

public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List<SimulateProcessorResult> processorResultList) {
public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List<SimulateProcessorResult> processorResultList,
Set<PipelineProcessor> pipelinesSeen) {
List<Processor> processors = new ArrayList<>(compoundProcessor.getProcessors().size());
for (Processor processor : compoundProcessor.getProcessors()) {
if (processor instanceof CompoundProcessor) {
processors.add(decorate((CompoundProcessor) processor, processorResultList));
if (processor instanceof PipelineProcessor) {
PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor);
if (pipelinesSeen.add(pipelineProcessor) == false) {
throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId());
}
processors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, pipelinesSeen));
pipelinesSeen.remove(pipelineProcessor);
} else if (processor instanceof CompoundProcessor) {
processors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen));
} else {
processors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList));
}
}
List<Processor> onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size());
for (Processor processor : compoundProcessor.getOnFailureProcessors()) {
if (processor instanceof CompoundProcessor) {
onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList));
if (processor instanceof PipelineProcessor) {
PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor);
if (pipelinesSeen.add(pipelineProcessor) == false) {
throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId());
}
onFailureProcessors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList,
pipelinesSeen));
pipelinesSeen.remove(pipelineProcessor);
} else if (processor instanceof CompoundProcessor) {
onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen));
} else {
onFailureProcessors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList));
}
Expand Down
Loading

0 comments on commit e37e5df

Please sign in to comment.