Add support for _meta field to ingest pipelines #75905

Merged · 16 commits · Aug 11, 2021

Changes from all commits

46 changes: 46 additions & 0 deletions docs/reference/ingest/apis/put-pipeline.asciidoc
@@ -76,7 +76,53 @@ Processors run sequentially in the order specified.
`version`::
(Optional, integer)
Version number used by external systems to track ingest pipelines.

+
This parameter is intended for external systems only. {es} does not use or
validate pipeline version numbers.

`_meta`::
(Optional, object)
Optional metadata about the ingest pipeline. May have any contents. This
map is not automatically generated by {es}.
// end::pipeline-object[]

[[put-pipeline-api-example]]
==== {api-examples-title}

[[pipeline-metadata]]
===== Pipeline metadata

You can use the `_meta` parameter to add arbitrary metadata to a pipeline.
This user-defined object is stored in the cluster state,
so keeping it short is preferable.

The `_meta` parameter is optional and not automatically generated or used by {es}.

To unset `_meta`, replace the pipeline without specifying one.

[source,console]
--------------------------------------------------
PUT /_ingest/pipeline/my-pipeline-id
{
"description" : "My optional pipeline description",
"processors" : [
{
"set" : {
"description" : "My optional processor description",
"field": "my-keyword-field",
"value": "foo"
}
}
],
"_meta": {
"reason": "set my-keyword-field to foo",
"serialization": {
"class": "MyPipeline",
"id": 10
}
}
}
--------------------------------------------------
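
As an illustrative sketch of the unset behavior described above (reusing the pipeline ID and processor from the example), replacing the pipeline with a request that omits `_meta` removes any previously stored metadata, since the new definition fully replaces the old one:

[source,console]
--------------------------------------------------
PUT /_ingest/pipeline/my-pipeline-id
{
  "description" : "My optional pipeline description",
  "processors" : [
    {
      "set" : {
        "field": "my-keyword-field",
        "value": "foo"
      }
    }
  ]
}
--------------------------------------------------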

To check the `_meta`, use the <<get-pipeline-api,get pipeline>> API.
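For example, a minimal sketch of such a check, assuming the pipeline still carries the `_meta` from the first example:

[source,console]
--------------------------------------------------
GET /_ingest/pipeline/my-pipeline-id
--------------------------------------------------

The response contains the stored pipeline definition, including its `_meta` object, keyed by pipeline ID.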
@@ -8,6 +8,7 @@

package org.elasticsearch.action.ingest;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
@@ -22,8 +23,10 @@
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.ingest.IngestInfo;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@@ -55,6 +58,12 @@ public PutPipelineTransportAction(ThreadPool threadPool, TransportService transp
@Override
protected void masterOperation(Task task, PutPipelineRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener)
throws Exception {
if (state.getNodes().getMinNodeVersion().before(Version.V_8_0_0)) {
Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
if (pipelineConfig.containsKey(Pipeline.META_KEY)) {
throw new IllegalStateException("pipelines with _meta field require minimum node version of " + Version.V_8_0_0);
}
}
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.clear()
.addMetric(NodesInfoRequest.Metric.INGEST.metricName());
@@ -38,7 +38,7 @@ void executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean v
List<SimulateProcessorResult> processorResultList = new CopyOnWriteArrayList<>();
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), null, processorResultList);
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
verbosePipelineProcessor);
pipeline.getMetadata(), verbosePipelineProcessor);
ingestDocument.executePipeline(verbosePipeline, (result, e) -> {
handler.accept(new SimulateDocumentVerboseResult(processorResultList), e);
});
@@ -849,7 +849,7 @@ public String getType() {
}
};
String description = "this is a place holder pipeline, because pipeline with id [" + id + "] could not be loaded";
return new Pipeline(id, description, null, new CompoundProcessor(failureProcessor));
return new Pipeline(id, description, null, null, new CompoundProcessor(failureProcessor));
}

static class PipelineHolder {
21 changes: 16 additions & 5 deletions server/src/main/java/org/elasticsearch/ingest/Pipeline.java
@@ -29,25 +29,30 @@ public final class Pipeline {
public static final String PROCESSORS_KEY = "processors";
public static final String VERSION_KEY = "version";
public static final String ON_FAILURE_KEY = "on_failure";
public static final String META_KEY = "_meta";

private final String id;
@Nullable
private final String description;
@Nullable
private final Integer version;
@Nullable
private final Map<String, Object> metadata;
private final CompoundProcessor compoundProcessor;
private final IngestMetric metrics;
private final LongSupplier relativeTimeProvider;

public Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor) {
this(id, description, version, compoundProcessor, System::nanoTime);
public Pipeline(String id, @Nullable String description, @Nullable Integer version,
@Nullable Map<String, Object> metadata, CompoundProcessor compoundProcessor) {
this(id, description, version, metadata, compoundProcessor, System::nanoTime);
}

//package private for testing
Pipeline(String id, @Nullable String description, @Nullable Integer version, CompoundProcessor compoundProcessor,
LongSupplier relativeTimeProvider) {
Pipeline(String id, @Nullable String description, @Nullable Integer version, @Nullable Map<String, Object> metadata,
CompoundProcessor compoundProcessor, LongSupplier relativeTimeProvider) {
this.id = id;
this.description = description;
this.metadata = metadata;
this.compoundProcessor = compoundProcessor;
this.version = version;
this.metrics = new IngestMetric();
@@ -58,6 +63,7 @@ public static Pipeline create(String id, Map<String, Object> config,
Map<String, Processor.Factory> processorFactories, ScriptService scriptService) throws Exception {
String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY);
Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null);
Map<String, Object> metadata = ConfigurationUtils.readOptionalMap(null, null, config, META_KEY);
List<Map<String, Object>> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY);
List<Processor> processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, scriptService, processorFactories);
List<Map<String, Object>> onFailureProcessorConfigs =
@@ -73,7 +79,7 @@ }
}
CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.unmodifiableList(processors),
Collections.unmodifiableList(onFailureProcessors));
return new Pipeline(id, description, version, compoundProcessor);
return new Pipeline(id, description, version, metadata, compoundProcessor);
}

/**
@@ -120,6 +126,11 @@ public Integer getVersion() {
return version;
}

@Nullable
public Map<String, Object> getMetadata() {
return metadata;
}

/**
* Get the underlying {@link CompoundProcessor} containing the Pipeline's processors
*/
@@ -10,13 +10,13 @@

import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ContextParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
@@ -83,7 +83,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
processorResultList.add(new SimulateProcessorResult(actualProcessor.getType(), actualProcessor.getTag(),
actualProcessor.getDescription(), conditionalWithResult));
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
verbosePipelineProcessor);
pipeline.getMetadata(), verbosePipelineProcessor);
ingestDocument.executePipeline(verbosePipeline, handler);
}
});
@@ -66,7 +66,7 @@ public void destroy() {

public void testExecuteVerboseItem() throws Exception {
TestProcessor processor = new TestProcessor("test-id", "mock", null, ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor, processor));

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
@@ -93,7 +93,7 @@ public void testExecuteVerboseItem() throws Exception {
}
public void testExecuteItem() throws Exception {
TestProcessor processor = new TestProcessor("processor_0", "mock", null, ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor, processor));
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, false, (r, e) -> {
@@ -113,7 +113,7 @@ public void testExecuteVerboseItemExceptionWithoutOnFailure() throws Exception {
TestProcessor processor1 = new TestProcessor("processor_0", "mock", null, ingestDocument -> {});
TestProcessor processor2 = new TestProcessor("processor_1", "mock", null, new RuntimeException("processor failed"));
TestProcessor processor3 = new TestProcessor("processor_2", "mock", null, ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3));
Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor1, processor2, processor3));
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
@@ -142,8 +142,8 @@ public void testExecuteVerboseItemWithOnFailure() throws Exception {
TestProcessor processor1 = new TestProcessor("processor_0", "mock", null, new RuntimeException("processor failed"));
TestProcessor processor2 = new TestProcessor("processor_1", "mock", null, ingestDocument -> {});
TestProcessor processor3 = new TestProcessor("processor_2", "mock", null, ingestDocument -> {});
Pipeline pipeline = new Pipeline("_id", "_description", version,
new CompoundProcessor(new CompoundProcessor(false, Collections.singletonList(processor1),
Pipeline pipeline = new Pipeline("_id", "_description", version, null,
new CompoundProcessor(new CompoundProcessor(false, Collections.singletonList(processor1),
Collections.singletonList(processor2)), processor3));
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
@@ -184,7 +184,7 @@ public void testExecuteVerboseItemExceptionWithIgnoreFailure() throws Exception
RuntimeException exception = new RuntimeException("processor failed");
TestProcessor testProcessor = new TestProcessor("processor_0", "mock", null, exception);
CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList());
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor));
Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor));
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
@@ -205,7 +205,7 @@ public void testExecuteVerboseItemExceptionWithIgnoreFailure() throws Exception
public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws Exception {
TestProcessor testProcessor = new TestProcessor("processor_0", "mock", null, ingestDocument -> { });
CompoundProcessor processor = new CompoundProcessor(true, Collections.singletonList(testProcessor), Collections.emptyList());
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor));
Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor));
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, true, (r, e) -> {
@@ -225,7 +225,7 @@ public void testExecuteVerboseItemWithoutExceptionAndWithIgnoreFailure() throws

public void testExecuteItemWithFailure() throws Exception {
TestProcessor processor = new TestProcessor(ingestDocument -> { throw new RuntimeException("processor failed"); });
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor, processor));
Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor, processor));
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
executionService.executeDocument(pipeline, ingestDocument, false, (r, e) -> {
@@ -247,7 +247,7 @@ public void testExecuteItemWithFailure() throws Exception {
public void testDropDocument() throws Exception {
TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field", "value"));
Processor processor2 = new DropProcessor.Factory().create(Map.of(), null, null, Map.of());
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2));
Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor1, processor2));

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
@@ -267,7 +267,7 @@ public void testDropDocument() throws Exception {
public void testDropDocumentVerbose() throws Exception {
TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field", "value"));
Processor processor2 = new DropProcessor.Factory().create(Map.of(), null, null, Map.of());
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2));
Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor1, processor2));

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
@@ -291,7 +291,7 @@ public void testDropDocumentVerboseExtraProcessor() throws Exception {
TestProcessor processor1 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field1", "value"));
Processor processor2 = new DropProcessor.Factory().create(Map.of(), null, null, Map.of());
TestProcessor processor3 = new TestProcessor(ingestDocument -> ingestDocument.setFieldValue("field2", "value"));
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1, processor2, processor3));
Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor1, processor2, processor3));

CountDownLatch latch = new CountDownLatch(1);
AtomicReference<SimulateDocumentResult> holder = new AtomicReference<>();
@@ -338,7 +338,7 @@ public String getType() {
return "none-of-your-business";
}
};
Pipeline pipeline = new Pipeline("_id", "_description", version, new CompoundProcessor(processor1));
Pipeline pipeline = new Pipeline("_id", "_description", version, null, new CompoundProcessor(processor1));
SimulatePipelineRequest.Parsed request = new SimulatePipelineRequest.Parsed(pipeline, documents, false);

AtomicReference<SimulatePipelineResponse> responseHolder = new AtomicReference<>();
@@ -54,7 +54,7 @@ public void init() throws IOException {
TestProcessor processor = new TestProcessor(ingestDocument -> {
});
CompoundProcessor pipelineCompoundProcessor = new CompoundProcessor(processor);
Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, null, pipelineCompoundProcessor);
Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, null, null, pipelineCompoundProcessor);
Map<String, Processor.Factory> registry =
Collections.singletonMap("mock_processor", (factories, tag, description, config) -> processor);
ingestService = mock(IngestService.class);
@@ -287,8 +287,9 @@ public void testFailureProcessorIsInvokedOnFailure() {
assertThat(ingestMetadata.get("pipeline"), equalTo("1"));
});

Pipeline pipeline2 = new Pipeline("2", null, null, new CompoundProcessor(new TestProcessor(new RuntimeException("failure!"))));
Pipeline pipeline1 = new Pipeline("1", null, null, new CompoundProcessor(false,List.of(new AbstractProcessor(null, null) {
Pipeline pipeline2 = new Pipeline("2", null, null, null,
new CompoundProcessor(new TestProcessor(new RuntimeException("failure!"))));
Pipeline pipeline1 = new Pipeline("1", null, null, null, new CompoundProcessor(false,List.of(new AbstractProcessor(null, null) {
@Override
public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Exception> handler) {
ingestDocument.executePipeline(pipeline2, handler);
@@ -326,9 +327,9 @@ public void testNewCompoundProcessorException() {
}

public void testNewCompoundProcessorExceptionPipelineOrigin() {
Pipeline pipeline2 = new Pipeline("2", null, null,
Pipeline pipeline2 = new Pipeline("2", null, null, null,
new CompoundProcessor(new TestProcessor("my_tag", "my_type", null, new RuntimeException())));
Pipeline pipeline1 = new Pipeline("1", null, null, new CompoundProcessor(new AbstractProcessor(null, null) {
Pipeline pipeline1 = new Pipeline("1", null, null, null, new CompoundProcessor(new AbstractProcessor(null, null) {
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
throw new UnsupportedOperationException();