Skip to content

Commit

Permalink
ingest: better support for conditionals with simulate?verbose (elasti…
Browse files Browse the repository at this point in the history
…c#34155)

This commit introduces two corrections to the way simulate?verbose
handles conditionals on processors.

1) Prior to this change when executing simulate?verbose for
processors with conditionals that evaluate to false, that processor
would still be displayed in the result set. What was displayed was
correct, such that no changes to the document occurred. However, if the
conditional evaluates to false, the processor should not even be
displayed.

2) Prior to this change when executing simulate?verbose for
pipeline processors with conditionals, the individual steps would no
longer be displayed. Commit e37e5df addressed the issue, but
failed account for a conditional on the pipeline processor. Since
a pipeline processor can introduce cycles and is effectively a
single processor that encapsulates multiple other processors that
are potentially guarded by a single conditional, special handling is
needed to for pipeline and conditional pipeline processors.
  • Loading branch information
jakelandis committed Oct 23, 2018
1 parent dd2bbfd commit c4f03ae
Show file tree
Hide file tree
Showing 5 changed files with 275 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,6 @@ teardown:
- match: { acknowledged: true }

- do:
catch: /illegal_state_exception/
ingest.simulate:
verbose: true
body: >
Expand All @@ -667,8 +666,10 @@ teardown:
}
]
}
- match: { error.root_cause.0.type: "illegal_state_exception" }
- match: { error.root_cause.0.reason: "Cycle detected for pipeline: inner" }
- length: { docs: 1 }
- length: { docs.0.processor_results: 1 }
- match: { docs.0.processor_results.0.error.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Cycle detected for pipeline: outer" }
- match: { docs.0.processor_results.0.error.caused_by.caused_by.reason: "Cycle detected for pipeline: outer" }

---
"Test verbose simulate with Pipeline Processor with Multiple Pipelines":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,13 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.PipelineProcessor;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

import static org.elasticsearch.ingest.TrackingResultProcessor.decorate;

Expand All @@ -46,11 +42,9 @@ class SimulateExecutionService {
}

SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) {
// Prevent cycles in pipeline decoration
final Set<PipelineProcessor> pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>());
if (verbose) {
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList, pipelinesSeen);
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList);
try {
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
verbosePipelineProcessor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,7 @@ public class ConditionalProcessor extends AbstractProcessor {

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
IngestConditionalScript script =
scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams());
if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) {
// Only record metric if the script evaluates to true
if (evaluate(ingestDocument)) {
long startTimeInNanos = relativeTimeProvider.getAsLong();
try {
metric.preIngest();
Expand All @@ -81,6 +78,12 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
return ingestDocument;
}

boolean evaluate(IngestDocument ingestDocument) {
IngestConditionalScript script =
scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams());
return script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()));
}

Processor getProcessor() {
return processor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

package org.elasticsearch.ingest;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ingest.SimulateProcessorResult;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
* Processor to be used within Simulate API to keep track of processors executed in pipeline.
Expand All @@ -42,14 +42,46 @@ public final class TrackingResultProcessor implements Processor {

@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
Processor processor = actualProcessor;
try {
actualProcessor.execute(ingestDocument);
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument)));
if (processor instanceof ConditionalProcessor) {
ConditionalProcessor conditionalProcessor = (ConditionalProcessor) processor;
if (conditionalProcessor.evaluate(ingestDocument) == false) {
return ingestDocument;
}
if (conditionalProcessor.getProcessor() instanceof PipelineProcessor) {
processor = conditionalProcessor.getProcessor();
}
}
if (processor instanceof PipelineProcessor) {
PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor);
Pipeline pipeline = pipelineProcessor.getPipeline();
//runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines
try {
IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument);
ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline());
} catch (ElasticsearchException elasticsearchException) {
if (elasticsearchException.getCause().getCause() instanceof IllegalStateException) {
throw elasticsearchException;
}
//else do nothing, let the tracking processors throw the exception while recording the path up to the failure
} catch (Exception e) {
// do nothing, let the tracking processors throw the exception while recording the path up to the failure
}
//now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList);
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
verbosePipelineProcessor);
ingestDocument.executePipeline(verbosePipeline);
} else {
processor.execute(ingestDocument);
processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument)));
}
} catch (Exception e) {
if (ignoreFailure) {
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument), e));
processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument), e));
} else {
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), e));
processorResultList.add(new SimulateProcessorResult(processor.getTag(), e));
}
throw e;
}
Expand All @@ -66,35 +98,19 @@ public String getTag() {
return actualProcessor.getTag();
}

public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List<SimulateProcessorResult> processorResultList,
Set<PipelineProcessor> pipelinesSeen) {
public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List<SimulateProcessorResult> processorResultList) {
List<Processor> processors = new ArrayList<>(compoundProcessor.getProcessors().size());
for (Processor processor : compoundProcessor.getProcessors()) {
if (processor instanceof PipelineProcessor) {
PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor);
if (pipelinesSeen.add(pipelineProcessor) == false) {
throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId());
}
processors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, pipelinesSeen));
pipelinesSeen.remove(pipelineProcessor);
} else if (processor instanceof CompoundProcessor) {
processors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen));
if (processor instanceof CompoundProcessor) {
processors.add(decorate((CompoundProcessor) processor, processorResultList));
} else {
processors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList));
}
}
List<Processor> onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size());
for (Processor processor : compoundProcessor.getOnFailureProcessors()) {
if (processor instanceof PipelineProcessor) {
PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor);
if (pipelinesSeen.add(pipelineProcessor) == false) {
throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId());
}
onFailureProcessors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList,
pipelinesSeen));
pipelinesSeen.remove(pipelineProcessor);
} else if (processor instanceof CompoundProcessor) {
onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen));
if (processor instanceof CompoundProcessor) {
onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList));
} else {
onFailureProcessors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList));
}
Expand Down
Loading

0 comments on commit c4f03ae

Please sign in to comment.