Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pipeline name to ingest metadata #50467

Merged
merged 3 commits into from
Jan 15, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions docs/reference/ingest/apis/simulate-pipeline.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ The API returns the following response:
"foo": "bar"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.674Z"
"timestamp": "2017-05-04T22:46:09.674Z",
"pipeline": "_simulate_pipeline"
}
}
},
Expand All @@ -364,7 +365,8 @@ The API returns the following response:
"foo": "bar"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.675Z"
"timestamp": "2017-05-04T22:46:09.675Z",
"pipeline": "_simulate_pipeline"
}
}
}
Expand All @@ -381,7 +383,8 @@ The API returns the following response:
"foo": "rab"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.676Z"
"timestamp": "2017-05-04T22:46:09.676Z",
"pipeline": "_simulate_pipeline"
}
}
},
Expand All @@ -395,7 +398,8 @@ The API returns the following response:
"foo": "rab"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.677Z"
"timestamp": "2017-05-04T22:46:09.677Z",
"pipeline": "_simulate_pipeline"
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions docs/reference/ingest/processors/pipeline.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ include::common-options.asciidoc[]
--------------------------------------------------
// NOTCONSOLE

The name of the current pipeline can be accessed from the `_ingest.pipeline` ingest metadata key.

An example of using this processor for nesting pipelines would be:

Define an inner pipeline:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,81 @@ teardown:
}
- match: { error.root_cause.0.type: "ingest_processor_exception" }
- match: { error.root_cause.0.reason: "java.lang.IllegalStateException: Pipeline processor configured for non-existent pipeline [legal-department]" }

---
"Test _ingest.pipeline metadata":
- do:
ingest.put_pipeline:
id: "pipeline1"
body: >
{
"processors" : [
{
"append" : {
"field": "pipelines",
"value": "{{_ingest.pipeline}}"
}
},
{
"pipeline" : {
"name": "another_pipeline"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.put_pipeline:
id: "another_pipeline"
body: >
{
"processors" : [
{
"append" : {
"field": "pipelines",
"value": "{{_ingest.pipeline}}"
}
},
{
"pipeline" : {
"name": "another_pipeline2"
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.put_pipeline:
id: "another_pipeline2"
body: >
{
"processors" : [
{
"append" : {
"field": "pipelines",
"value": "{{_ingest.pipeline}}"
}
}
]
}
- match: { acknowledged: true }

- do:
index:
index: test
id: 1
pipeline: "pipeline1"
body: >
{
}

- do:
get:
index: test
id: 1
- length: { _source.pipelines: 3 }
- match: { _source.pipelines.0: "pipeline1" }
- match: { _source.pipelines.1: "another_pipeline" }
- match: { _source.pipelines.2: "another_pipeline2" }
Original file line number Diff line number Diff line change
Expand Up @@ -284,26 +284,30 @@ teardown:
- length: { docs.0.processor_results.0.doc._source: 2 }
- match: { docs.0.processor_results.0.doc._source.foo.bar.0.item: "HELLO" }
- match: { docs.0.processor_results.0.doc._source.field2.value: "_value" }
- length: { docs.0.processor_results.0.doc._ingest: 1 }
- length: { docs.0.processor_results.0.doc._ingest: 2 }
- is_true: docs.0.processor_results.0.doc._ingest.timestamp
- is_true: docs.0.processor_results.0.doc._ingest.pipeline
- length: { docs.0.processor_results.1.doc._source: 3 }
- match: { docs.0.processor_results.1.doc._source.foo.bar.0.item: "HELLO" }
- match: { docs.0.processor_results.1.doc._source.field2.value: "_value" }
- match: { docs.0.processor_results.1.doc._source.field3: "third_val" }
- length: { docs.0.processor_results.1.doc._ingest: 1 }
- length: { docs.0.processor_results.1.doc._ingest: 2 }
- is_true: docs.0.processor_results.1.doc._ingest.timestamp
- is_true: docs.0.processor_results.1.doc._ingest.pipeline
- length: { docs.0.processor_results.2.doc._source: 3 }
- match: { docs.0.processor_results.2.doc._source.foo.bar.0.item: "HELLO" }
- match: { docs.0.processor_results.2.doc._source.field2.value: "_VALUE" }
- match: { docs.0.processor_results.2.doc._source.field3: "third_val" }
- length: { docs.0.processor_results.2.doc._ingest: 1 }
- length: { docs.0.processor_results.2.doc._ingest: 2 }
- is_true: docs.0.processor_results.2.doc._ingest.timestamp
- is_true: docs.0.processor_results.2.doc._ingest.pipeline
- length: { docs.0.processor_results.3.doc._source: 3 }
- match: { docs.0.processor_results.3.doc._source.foo.bar.0.item: "hello" }
- match: { docs.0.processor_results.3.doc._source.field2.value: "_VALUE" }
- match: { docs.0.processor_results.3.doc._source.field3: "third_val" }
- length: { docs.0.processor_results.3.doc._ingest: 1 }
- length: { docs.0.processor_results.3.doc._ingest: 2 }
- is_true: docs.0.processor_results.3.doc._ingest.timestamp
- is_true: docs.0.processor_results.3.doc._ingest.pipeline

---
"Test simulate with exception thrown":
Expand Down Expand Up @@ -393,12 +397,14 @@ teardown:
- match: { docs.1.processor_results.0.doc._index: "index" }
- match: { docs.1.processor_results.0.doc._source.foo: 5 }
- match: { docs.1.processor_results.0.doc._source.bar: "hello" }
- length: { docs.1.processor_results.0.doc._ingest: 1 }
- length: { docs.1.processor_results.0.doc._ingest: 2 }
- is_true: docs.1.processor_results.0.doc._ingest.timestamp
- is_true: docs.1.processor_results.0.doc._ingest.pipeline
- match: { docs.1.processor_results.1.doc._source.foo: 5 }
- match: { docs.1.processor_results.1.doc._source.bar: "HELLO" }
- length: { docs.1.processor_results.1.doc._ingest: 1 }
- length: { docs.1.processor_results.1.doc._ingest: 2 }
- is_true: docs.1.processor_results.1.doc._ingest.timestamp
- is_true: docs.1.processor_results.1.doc._ingest.pipeline

---
"Test verbose simulate with on_failure":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,8 +646,14 @@ private static Object deepCopy(Object value) {
*/
public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
if (executedPipelines.add(pipeline.getId())) {
Object previousPipeline = ingestMetadata.put("pipeline", pipeline.getId());
pipeline.execute(this, (result, e) -> {
executedPipelines.remove(pipeline.getId());
if (previousPipeline != null) {
ingestMetadata.put("pipeline", previousPipeline);
} else {
ingestMetadata.remove("pipeline");
}
handler.accept(result, e);
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class PipelineProcessor extends AbstractProcessor {
private final TemplateScript.Factory pipelineTemplate;
private final IngestService ingestService;

private PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) {
PipelineProcessor(String tag, TemplateScript.Factory pipelineTemplate, IngestService ingestService) {
super(tag);
this.pipelineTemplate = pipelineTemplate;
this.ingestService = ingestService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,23 +91,17 @@ public void testExecuteVerboseItem() throws Exception {
assertThat(actualItemResponse, instanceOf(SimulateDocumentVerboseResult.class));
SimulateDocumentVerboseResult simulateDocumentVerboseResult = (SimulateDocumentVerboseResult) actualItemResponse;
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("test-id"));
IngestDocument firstProcessorIngestDocument = simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument();
assertThat(firstProcessorIngestDocument, not(sameInstance(this.ingestDocument)));
assertIngestDocument(firstProcessorIngestDocument, this.ingestDocument);
assertThat(firstProcessorIngestDocument.getSourceAndMetadata(), not(sameInstance(this.ingestDocument.getSourceAndMetadata())));

assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("test-id"));
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());

assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorTag(), equalTo("test-id"));
IngestDocument secondProcessorIngestDocument = simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument();
assertThat(secondProcessorIngestDocument, not(sameInstance(this.ingestDocument)));
assertIngestDocument(secondProcessorIngestDocument, this.ingestDocument);
assertThat(secondProcessorIngestDocument.getSourceAndMetadata(), not(sameInstance(this.ingestDocument.getSourceAndMetadata())));
assertThat(secondProcessorIngestDocument.getSourceAndMetadata(),
not(sameInstance(firstProcessorIngestDocument.getSourceAndMetadata())));
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(1), pipeline.getId(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument().getSourceAndMetadata(),
not(sameInstance(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata())));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue());
}

public void testExecuteItem() throws Exception {
TestProcessor processor = new TestProcessor("processor_0", "mock", ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
Expand Down Expand Up @@ -147,10 +141,7 @@ public void testExecuteVerboseItemExceptionWithoutOnFailure() throws Exception {
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(2));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(),
not(sameInstance(ingestDocument.getSourceAndMetadata())));
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getProcessorTag(), equalTo("processor_1"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), instanceOf(RuntimeException.class));
Expand Down Expand Up @@ -191,14 +182,12 @@ public void testExecuteVerboseItemWithOnFailure() throws Exception {
metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD, "mock");
metadata.put(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD, "processor_0");
metadata.put(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD, "processor failed");
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(1).getIngestDocument(),
ingestDocumentWithOnFailureMetadata);

assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(1), pipeline.getId(),
ingestDocumentWithOnFailureMetadata);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(1).getFailure(), nullValue());

assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getProcessorTag(), equalTo("processor_2"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getIngestDocument(), not(sameInstance(ingestDocument)));
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(2).getIngestDocument(), ingestDocument);
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(2), pipeline.getId(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(2).getFailure(), nullValue());
}

Expand All @@ -221,10 +210,7 @@ public void testExecuteVerboseItemExceptionWithIgnoreFailure() throws Exception
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(1));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), sameInstance(exception));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(),
not(sameInstance(ingestDocument.getSourceAndMetadata())));
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
}

public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws Exception {
Expand All @@ -245,10 +231,7 @@ public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws
assertThat(simulateDocumentVerboseResult.getProcessorResults().size(), equalTo(1));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getProcessorTag(), equalTo("processor_0"));
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getFailure(), nullValue());
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), not(sameInstance(ingestDocument)));
assertIngestDocument(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument(), ingestDocument);
assertThat(simulateDocumentVerboseResult.getProcessorResults().get(0).getIngestDocument().getSourceAndMetadata(),
not(sameInstance(ingestDocument.getSourceAndMetadata())));
assertVerboseResult(simulateDocumentVerboseResult.getProcessorResults().get(0), pipeline.getId(), ingestDocument);
}

public void testExecuteItemWithFailure() throws Exception {
Expand Down Expand Up @@ -392,4 +375,19 @@ public String getType() {
}
}

private static void assertVerboseResult(SimulateProcessorResult result,
String expectedPipelineId,
IngestDocument expectedIngestDocument) {
IngestDocument simulateVerboseIngestDocument = result.getIngestDocument();
// Remove and compare pipeline key. It is always in the verbose result,
// since that is a snapshot of how the ingest doc looks during pipeline execution, but not in the final ingestDocument.
// The key gets added and removed during pipeline execution.
String actualPipelineId = (String) simulateVerboseIngestDocument.getIngestMetadata().remove("pipeline");
assertThat(actualPipelineId, equalTo(expectedPipelineId));

assertThat(simulateVerboseIngestDocument, not(sameInstance(expectedIngestDocument)));
assertIngestDocument(simulateVerboseIngestDocument, expectedIngestDocument);
assertThat(simulateVerboseIngestDocument.getSourceAndMetadata(), not(sameInstance(expectedIngestDocument.getSourceAndMetadata())));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,12 @@ public void testBreakOnFailure() throws Exception {
public void testFailureProcessorIsInvokedOnFailure() {
TestProcessor onFailureProcessor = new TestProcessor(null, "on_failure", ingestDocument -> {
Map<String, Object> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(4));
assertThat(ingestMetadata.entrySet(), hasSize(5));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("failure!"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test-processor"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), nullValue());
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PIPELINE_FIELD), equalTo("2"));
assertThat(ingestMetadata.get("pipeline"), equalTo("1"));
});

Pipeline pipeline2 = new Pipeline("2", null, null, new CompoundProcessor(new TestProcessor(new RuntimeException("failure!"))));
Expand Down
Loading