diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index b85bf085dabbb..1ed8b6058e6c9 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -82,6 +82,7 @@ public Map getProcessors(Processor.Parameters paramet processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory()); processors.put(URLDecodeProcessor.TYPE, new URLDecodeProcessor.Factory()); processors.put(BytesProcessor.TYPE, new BytesProcessor.Factory()); + processors.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService)); processors.put(DissectProcessor.TYPE, new DissectProcessor.Factory()); return Collections.unmodifiableMap(processors); } diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java new file mode 100644 index 0000000000000..77ffdb919193f --- /dev/null +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/PipelineProcessor.java @@ -0,0 +1,74 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.ingest.common; + +import java.util.Map; +import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.ConfigurationUtils; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.Processor; + +public class PipelineProcessor extends AbstractProcessor { + + public static final String TYPE = "pipeline"; + + private final String pipelineName; + + private final IngestService ingestService; + + private PipelineProcessor(String tag, String pipelineName, IngestService ingestService) { + super(tag); + this.pipelineName = pipelineName; + this.ingestService = ingestService; + } + + @Override + public void execute(IngestDocument ingestDocument) throws Exception { + Pipeline pipeline = ingestService.getPipeline(pipelineName); + if (pipeline == null) { + throw new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']'); + } + ingestDocument.executePipeline(pipeline); + } + + @Override + public String getType() { + return TYPE; + } + + public static final class Factory implements Processor.Factory { + + private final IngestService ingestService; + + public Factory(IngestService ingestService) { + this.ingestService = ingestService; + } + + @Override + public PipelineProcessor create(Map registry, String processorTag, + Map config) throws Exception { + String pipeline = + ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "pipeline"); + return new PipelineProcessor(processorTag, pipeline, ingestService); + } + } +} diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java new file mode 100644 index 0000000000000..5baf3cf822d72 --- /dev/null +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/PipelineProcessorTests.java @@ -0,0 +1,115 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.ingest.common; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ingest.CompoundProcessor; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.ingest.Pipeline; +import org.elasticsearch.ingest.Processor; +import org.elasticsearch.ingest.RandomDocumentPicks; +import org.elasticsearch.test.ESTestCase; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class PipelineProcessorTests extends ESTestCase { + + public void testExecutesPipeline() throws Exception { + String pipelineId = "pipeline"; + IngestService ingestService = mock(IngestService.class); + CompletableFuture invoked = new CompletableFuture<>(); + IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); + Pipeline pipeline = new Pipeline( + pipelineId, null, null, + new CompoundProcessor(new Processor() { + @Override + public void execute(final IngestDocument ingestDocument) throws Exception { + invoked.complete(ingestDocument); + } + + @Override + public String getType() { + return null; + } + + @Override + public String getTag() { + return null; + } + }) + ); + when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + Map config = new HashMap<>(); + config.put("pipeline", pipelineId); + factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument); + assertEquals(testIngestDocument, invoked.get()); + } + + public void testThrowsOnMissingPipeline() throws Exception { + IngestService ingestService = mock(IngestService.class); + IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + Map config = new HashMap<>(); + config.put("pipeline", "missingPipelineId"); + IllegalStateException e = expectThrows( + IllegalStateException.class, + () -> factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument) + ); + assertEquals( + "Pipeline processor configured for non-existent pipeline [missingPipelineId]", e.getMessage() + ); + } + + public void testThrowsOnRecursivePipelineInvocations() throws Exception { + String innerPipelineId = "inner"; + String outerPipelineId = "outer"; + IngestService ingestService = mock(IngestService.class); + IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>()); + Map outerConfig = new HashMap<>(); + outerConfig.put("pipeline", innerPipelineId); + PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService); + Pipeline outer = new Pipeline( + outerPipelineId, null, null, + new CompoundProcessor(factory.create(Collections.emptyMap(), null, outerConfig)) + ); + Map innerConfig = new HashMap<>(); + innerConfig.put("pipeline", outerPipelineId); + Pipeline inner = new Pipeline( + innerPipelineId, null, null, + new CompoundProcessor(factory.create(Collections.emptyMap(), null, innerConfig)) + ); + when(ingestService.getPipeline(outerPipelineId)).thenReturn(outer); + when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner); + outerConfig.put("pipeline", innerPipelineId); + ElasticsearchException e = expectThrows( + ElasticsearchException.class, + () -> factory.create(Collections.emptyMap(), null, outerConfig).execute(testIngestDocument) + ); + assertEquals( + "Recursive invocation of pipeline [inner] detected.", e.getRootCause().getMessage() + ); + } +} diff --git a/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml new file mode 100644 index 0000000000000..355ba2d42104a --- /dev/null +++ b/modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/210_pipeline_processor.yml @@ -0,0 +1,113 @@ +--- +teardown: +- do: + ingest.delete_pipeline: + id: "inner" + ignore: 404 + +- do: + ingest.delete_pipeline: + id: "outer" + ignore: 404 + +--- +"Test Pipeline Processor with Simple Inner Pipeline": +- do: + ingest.put_pipeline: + id: "inner" + body: > + { + "description" : "inner pipeline", + "processors" : [ + { + "set" : { + "field": "foo", + "value": "bar" + } + }, + { + "set" : { + "field": "baz", + "value": "blub" + } + } + ] + } +- match: { acknowledged: true } + +- do: + ingest.put_pipeline: + id: "outer" + body: > + { + "description" : "outer pipeline", + "processors" : [ + { + "pipeline" : { + "pipeline": "inner" + } + } + ] + } +- match: { acknowledged: true } + +- do: + index: + index: test + type: test + id: 1 + pipeline: "outer" + body: {} + +- do: + get: + index: test + type: test + id: 1 +- match: { _source.foo: "bar" } +- match: { _source.baz: "blub" } + +--- +"Test Pipeline Processor with Circular Pipelines": +- do: + ingest.put_pipeline: + id: "outer" + body: > + { + "description" : "outer pipeline", + "processors" : [ + { + "pipeline" : { + "pipeline": "inner" + } + } + ] + } +- match: { acknowledged: true } + +- do: + ingest.put_pipeline: + id: "inner" + body: > + { + "description" : "inner pipeline", + "processors" : [ + { + "pipeline" : { + "pipeline": "outer" + } + } + ] + } +- match: { acknowledged: true } + +- do: + catch: /illegal_state_exception/ + index: + index: test + type: test + id: 1 + pipeline: "outer" + body: {} +- match: { error.root_cause.0.type: "exception" } +- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Recursive invocation of pipeline [inner] detected." } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java index aad55e12cefff..e218168eeb7b5 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestDocument.java @@ -19,6 +19,9 @@ package org.elasticsearch.ingest; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Set; import org.elasticsearch.common.Strings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.IdFieldMapper; @@ -55,6 +58,9 @@ public final class IngestDocument { private final Map sourceAndMetadata; private final Map ingestMetadata; + // Contains all pipelines that have been executed for this document + private final Set executedPipelines = Collections.newSetFromMap(new IdentityHashMap<>()); + public IngestDocument(String index, String type, String id, String routing, Long version, VersionType versionType, Map source) { this.sourceAndMetadata = new HashMap<>(); @@ -632,6 +638,19 @@ private static Object deepCopy(Object value) { } } + /** + * Executes the given pipeline with for this document unless the pipeline has already been executed + * for this document. + * @param pipeline Pipeline to execute + * @throws Exception On exception in pipeline execution + */ + public void executePipeline(Pipeline pipeline) throws Exception { + if (this.executedPipelines.add(pipeline) == false) { + throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected."); + } + pipeline.execute(this); + } + @Override public boolean equals(Object obj) { if (obj == this) { return true; } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index ae3416ef3b06d..eee14e958699f 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -92,7 +92,7 @@ public IngestService(ClusterService clusterService, ThreadPool threadPool, threadPool.getThreadContext(), threadPool::relativeTimeInMillis, (delay, command) -> threadPool.schedule( TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC, command - ) + ), this ) ); this.threadPool = threadPool; diff --git a/server/src/main/java/org/elasticsearch/ingest/Processor.java b/server/src/main/java/org/elasticsearch/ingest/Processor.java index c318d478814de..15a26d3749191 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Processor.java +++ b/server/src/main/java/org/elasticsearch/ingest/Processor.java @@ -97,22 +97,26 @@ class Parameters { * instances that have run prior to in ingest. */ public final ThreadContext threadContext; - + public final LongSupplier relativeTimeSupplier; - + + public final IngestService ingestService; + /** * Provides scheduler support */ public final BiFunction> scheduler; public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext, - LongSupplier relativeTimeSupplier, BiFunction> scheduler) { + LongSupplier relativeTimeSupplier, BiFunction> scheduler, + IngestService ingestService) { this.env = env; this.scriptService = scriptService; this.threadContext = threadContext; this.analysisRegistry = analysisRegistry; this.relativeTimeSupplier = relativeTimeSupplier; this.scheduler = scheduler; + this.ingestService = ingestService; } }