diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 6af9735f7e998..7ef48cb5cd854 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -512,7 +512,7 @@ private long relativeTime() { void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener listener) { long ingestStartTimeInNanos = System.nanoTime(); BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original); - ingestService.getPipelineExecutionService().executeBulkRequest(() -> bulkRequestModifier, (indexRequest, exception) -> { + ingestService.executeBulkRequest(() -> bulkRequestModifier, (indexRequest, exception) -> { logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]", indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception); bulkRequestModifier.markCurrentItemAsFailed(exception); diff --git a/server/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java index 45cb83634f84f..322859bd64562 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java @@ -26,26 +26,23 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.ingest.PipelineStore; -import org.elasticsearch.node.NodeService; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; public class DeletePipelineTransportAction extends TransportMasterNodeAction { - private final PipelineStore pipelineStore; - private final ClusterService clusterService; + private final IngestService ingestService; @Inject - public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, + public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, IngestService ingestService, TransportService transportService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService) { - super(settings, DeletePipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new); - this.clusterService = clusterService; - this.pipelineStore = nodeService.getIngestService().getPipelineStore(); + IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, DeletePipelineAction.NAME, transportService, ingestService.getClusterService(), + threadPool, actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new); + this.ingestService = ingestService; } @Override @@ -59,8 +56,9 @@ protected WritePipelineResponse newResponse() { } @Override - protected void masterOperation(DeletePipelineRequest request, ClusterState state, ActionListener listener) throws Exception { - pipelineStore.delete(clusterService, request, listener); + protected void masterOperation(DeletePipelineRequest request, ClusterState state, + ActionListener listener) throws Exception { + ingestService.delete(request, listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java index 191ed87a42cde..540f46982a569 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java @@ -29,21 +29,17 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.ingest.PipelineStore; -import org.elasticsearch.node.NodeService; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; public class GetPipelineTransportAction extends TransportMasterNodeReadAction { - - private final PipelineStore pipelineStore; - + @Inject public GetPipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService) { + IndexNameExpressionResolver indexNameExpressionResolver) { super(settings, GetPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, GetPipelineRequest::new, indexNameExpressionResolver); - this.pipelineStore = nodeService.getIngestService().getPipelineStore(); } @Override @@ -58,7 +54,7 @@ protected GetPipelineResponse newResponse() { @Override protected void masterOperation(GetPipelineRequest request, ClusterState state, ActionListener listener) throws Exception { - listener.onResponse(new GetPipelineResponse(pipelineStore.getPipelines(state, request.getIds()))); + listener.onResponse(new GetPipelineResponse(IngestService.getPipelines(state, request.getIds()))); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java index 17af73c167704..ecd4905092a94 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java @@ -31,12 +31,10 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.ingest.PipelineStore; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.IngestInfo; -import org.elasticsearch.node.NodeService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -45,19 +43,19 @@ public class PutPipelineTransportAction extends TransportMasterNodeAction { - private final PipelineStore pipelineStore; - private final ClusterService clusterService; + private final IngestService ingestService; private final NodeClient client; @Inject - public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - TransportService transportService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService, - NodeClient client) { - super(settings, PutPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new); - this.clusterService = clusterService; + public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + IngestService ingestService, NodeClient client) { + super( + settings, PutPipelineAction.NAME, transportService, ingestService.getClusterService(), + threadPool, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new + ); this.client = client; - this.pipelineStore = nodeService.getIngestService().getPipelineStore(); + this.ingestService = ingestService; } @Override @@ -83,7 +81,7 @@ public void onResponse(NodesInfoResponse nodeInfos) { for (NodeInfo nodeInfo : nodeInfos.getNodes()) { ingestInfos.put(nodeInfo.getNode(), nodeInfo.getIngest()); } - pipelineStore.put(clusterService, ingestInfos, request, listener); + ingestService.putPipeline(ingestInfos, request, listener); } catch (Exception e) { onFailure(e); } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java index 9a7d6bb7feea9..8405bb85b4b11 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java @@ -32,8 +32,8 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.Pipeline; -import org.elasticsearch.ingest.PipelineStore; import java.io.IOException; import java.util.ArrayList; @@ -164,14 +164,13 @@ public boolean isVerbose() { } } - private static final Pipeline.Factory PIPELINE_FACTORY = new Pipeline.Factory(); static final String SIMULATED_PIPELINE_ID = "_simulate_pipeline"; - static Parsed parseWithPipelineId(String pipelineId, Map config, boolean verbose, PipelineStore pipelineStore) { + static Parsed parseWithPipelineId(String pipelineId, Map config, boolean verbose, IngestService ingestService) { if (pipelineId == null) { throw new IllegalArgumentException("param [pipeline] is null"); } - Pipeline pipeline = pipelineStore.get(pipelineId); + Pipeline pipeline = ingestService.getPipeline(pipelineId); if (pipeline == null) { throw new IllegalArgumentException("pipeline [" + pipelineId + "] does not exist"); } @@ -179,9 +178,9 @@ static Parsed parseWithPipelineId(String pipelineId, Map config, return new Parsed(pipeline, ingestDocumentList, verbose); } - static Parsed parse(Map config, boolean verbose, PipelineStore pipelineStore) throws Exception { + static Parsed parse(Map config, boolean verbose, IngestService pipelineStore) throws Exception { Map pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE); - Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactories()); + Pipeline pipeline = Pipeline.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactories()); List ingestDocumentList = parseDocs(config); return new Parsed(pipeline, ingestDocumentList, verbose); } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java index 2e898c1895f9a..ad8577d5244de 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java @@ -26,8 +26,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.ingest.PipelineStore; -import org.elasticsearch.node.NodeService; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -36,15 +35,15 @@ public class SimulatePipelineTransportAction extends HandledTransportAction { - private final PipelineStore pipelineStore; + private final IngestService ingestService; private final SimulateExecutionService executionService; @Inject public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, - ActionFilters actionFilters, NodeService nodeService) { + ActionFilters actionFilters, IngestService ingestService) { super(settings, SimulatePipelineAction.NAME, transportService, actionFilters, (Writeable.Reader) SimulatePipelineRequest::new); - this.pipelineStore = nodeService.getIngestService().getPipelineStore(); + this.ingestService = ingestService; this.executionService = new SimulateExecutionService(threadPool); } @@ -55,9 +54,9 @@ protected void doExecute(Task task, SimulatePipelineRequest request, ActionListe final SimulatePipelineRequest.Parsed simulateRequest; try { if (request.getId() != null) { - simulateRequest = SimulatePipelineRequest.parseWithPipelineId(request.getId(), source, request.isVerbose(), pipelineStore); + simulateRequest = SimulatePipelineRequest.parseWithPipelineId(request.getId(), source, request.isVerbose(), ingestService); } else { - simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), pipelineStore); + simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), ingestService); } } catch (Exception e) { listener.onFailure(e); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 01bc402e43ba5..3b0f7e3ab218a 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -22,14 +22,42 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.ScheduledFuture; -import java.util.function.BiFunction; - -import org.elasticsearch.common.settings.Settings; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.ingest.DeletePipelineRequest; +import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.ingest.WritePipelineResponse; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateApplier; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.metrics.MeanMetric; +import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.env.Environment; +import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.index.VersionType; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.script.ScriptService; @@ -38,20 +66,35 @@ /** * Holder class for several ingest related services. */ -public class IngestService { +public class IngestService implements ClusterStateApplier { public static final String NOOP_PIPELINE_NAME = "_none"; + private final ClusterService clusterService; private final PipelineStore pipelineStore; private final PipelineExecutionService pipelineExecutionService; - public IngestService(Settings settings, ThreadPool threadPool, + public IngestService(ClusterService clusterService, ThreadPool threadPool, Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, List ingestPlugins) { - BiFunction> scheduler = - (delay, command) -> threadPool.schedule(TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC, command); - Processor.Parameters parameters = new Processor.Parameters(env, scriptService, analysisRegistry, - threadPool.getThreadContext(), threadPool::relativeTimeInMillis, scheduler); + this.clusterService = clusterService; + this.pipelineStore = new PipelineStore( + processorFactories( + ingestPlugins, + new Processor.Parameters( + env, scriptService, analysisRegistry, + threadPool.getThreadContext(), threadPool::relativeTimeInMillis, + (delay, command) -> threadPool.schedule( + TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC, command + ) + ) + ) + ); + this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool); + } + + private static Map processorFactories(List ingestPlugins, + Processor.Parameters parameters) { Map processorFactories = new HashMap<>(); for (IngestPlugin ingestPlugin : ingestPlugins) { Map newProcessors = ingestPlugin.getProcessors(parameters); @@ -61,24 +104,458 @@ public IngestService(Settings settings, ThreadPool threadPool, } } } - this.pipelineStore = new PipelineStore(settings, Collections.unmodifiableMap(processorFactories)); - this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool); + return Collections.unmodifiableMap(processorFactories); + } + + public ClusterService getClusterService() { + return clusterService; + } + + /** + * Deletes the pipeline specified by id in the request. + */ + public void delete(DeletePipelineRequest request, + ActionListener listener) { + clusterService.submitStateUpdateTask("delete-pipeline-" + request.getId(), + new AckedClusterStateUpdateTask(request, listener) { + + @Override + protected WritePipelineResponse newResponse(boolean acknowledged) { + return new WritePipelineResponse(acknowledged); + } + + @Override + public ClusterState execute(ClusterState currentState) { + return innerDelete(request, currentState); + } + }); + } + + static ClusterState innerDelete(DeletePipelineRequest request, ClusterState currentState) { + IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE); + if (currentIngestMetadata == null) { + return currentState; + } + Map pipelines = currentIngestMetadata.getPipelines(); + Set toRemove = new HashSet<>(); + for (String pipelineKey : pipelines.keySet()) { + if (Regex.simpleMatch(request.getId(), pipelineKey)) { + toRemove.add(pipelineKey); + } + } + if (toRemove.isEmpty() && Regex.isMatchAllPattern(request.getId()) == false) { + throw new ResourceNotFoundException("pipeline [{}] is missing", request.getId()); + } else if (toRemove.isEmpty()) { + return currentState; + } + final Map pipelinesCopy = new HashMap<>(pipelines); + for (String key : toRemove) { + pipelinesCopy.remove(key); + } + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelinesCopy)) + .build()); + return newState.build(); + } + + /** + * @return pipeline configuration specified by id. If multiple ids or wildcards are specified multiple pipelines + * may be returned + */ + // Returning PipelineConfiguration instead of Pipeline, because Pipeline and Processor interface don't + // know how to serialize themselves. + public static List getPipelines(ClusterState clusterState, String... ids) { + IngestMetadata ingestMetadata = clusterState.getMetaData().custom(IngestMetadata.TYPE); + return innerGetPipelines(ingestMetadata, ids); + } + + static List innerGetPipelines(IngestMetadata ingestMetadata, String... ids) { + if (ingestMetadata == null) { + return Collections.emptyList(); + } + + // if we didn't ask for _any_ ID, then we get them all (this is the same as if they ask for '*') + if (ids.length == 0) { + return new ArrayList<>(ingestMetadata.getPipelines().values()); + } + + List result = new ArrayList<>(ids.length); + for (String id : ids) { + if (Regex.isSimpleMatchPattern(id)) { + for (Map.Entry entry : ingestMetadata.getPipelines().entrySet()) { + if (Regex.simpleMatch(id, entry.getKey())) { + result.add(entry.getValue()); + } + } + } else { + PipelineConfiguration pipeline = ingestMetadata.getPipelines().get(id); + if (pipeline != null) { + result.add(pipeline); + } + } + } + return result; + } + + public void executeBulkRequest(Iterable> actionRequests, BiConsumer itemFailureHandler, + Consumer completionHandler) { + pipelineExecutionService.executeBulkRequest(actionRequests, itemFailureHandler, completionHandler); } - public PipelineStore getPipelineStore() { - return pipelineStore; + public IngestStats stats() { + return pipelineExecutionService.stats(); } - public PipelineExecutionService getPipelineExecutionService() { - return pipelineExecutionService; + /** + * Stores the specified pipeline definition in the request. + */ + public void putPipeline(Map ingestInfos, PutPipelineRequest request, + ActionListener listener) throws Exception { + pipelineStore.put(clusterService, ingestInfos, request, listener); + } + + /** + * Returns the pipeline by the specified id + */ + public Pipeline getPipeline(String id) { + return pipelineStore.get(id); + } + + public Map getProcessorFactories() { + return pipelineStore.getProcessorFactories(); } public IngestInfo info() { - Map processorFactories = pipelineStore.getProcessorFactories(); + Map processorFactories = getProcessorFactories(); List processorInfoList = new ArrayList<>(processorFactories.size()); for (Map.Entry entry : processorFactories.entrySet()) { processorInfoList.add(new ProcessorInfo(entry.getKey())); } return new IngestInfo(processorInfoList); } + + Map pipelines() { + return pipelineStore.pipelines; + } + + void validatePipeline(Map ingestInfos, PutPipelineRequest request) throws Exception { + pipelineStore.validatePipeline(ingestInfos, request); + } + + void updatePipelineStats(IngestMetadata ingestMetadata) { + pipelineExecutionService.updatePipelineStats(ingestMetadata); + } + + @Override + public void applyClusterState(final ClusterChangedEvent event) { + ClusterState state = event.state(); + pipelineStore.innerUpdatePipelines(event.previousState(), state); + IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE); + if (ingestMetadata != null) { + pipelineExecutionService.updatePipelineStats(ingestMetadata); + } + } + + public static final class PipelineStore { + + private final Map processorFactories; + + // Ideally this should be in IngestMetadata class, but we don't have the processor factories around there. + // We know of all the processor factories when a node with all its plugin have been initialized. Also some + // processor factories rely on other node services. Custom metadata is statically registered when classes + // are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around. + volatile Map pipelines = new HashMap<>(); + + private PipelineStore(Map processorFactories) { + this.processorFactories = processorFactories; + } + + void innerUpdatePipelines(ClusterState previousState, ClusterState state) { + if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + return; + } + + IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE); + IngestMetadata previousIngestMetadata = previousState.getMetaData().custom(IngestMetadata.TYPE); + if (Objects.equals(ingestMetadata, previousIngestMetadata)) { + return; + } + + Map pipelines = new HashMap<>(); + List exceptions = new ArrayList<>(); + for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { + try { + pipelines.put(pipeline.getId(), Pipeline.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories)); + } catch (ElasticsearchParseException e) { + pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), e)); + exceptions.add(e); + } catch (Exception e) { + ElasticsearchParseException parseException = new ElasticsearchParseException( + "Error updating pipeline with id [" + pipeline.getId() + "]", e); + pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), parseException)); + exceptions.add(parseException); + } + } + this.pipelines = Collections.unmodifiableMap(pipelines); + ExceptionsHelper.rethrowAndSuppress(exceptions); + } + + private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) { + String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null; + String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown"; + String errorMessage = "pipeline with id [" + id + "] could not be loaded, caused by [" + e.getDetailedMessage() + "]"; + Processor failureProcessor = new AbstractProcessor(tag) { + @Override + public void execute(IngestDocument ingestDocument) { + throw new IllegalStateException(errorMessage); + } + + @Override + public String getType() { + return type; + } + }; + String description = "this is a place holder pipeline, because pipeline with id [" + id + "] could not be loaded"; + return new Pipeline(id, description, null, new CompoundProcessor(failureProcessor)); + } + + /** + * Stores the specified pipeline definition in the request. + */ + public void put(ClusterService clusterService, Map ingestInfos, PutPipelineRequest request, + ActionListener listener) throws Exception { + // validates the pipeline and processor configuration before submitting a cluster update task: + validatePipeline(ingestInfos, request); + clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(), + new AckedClusterStateUpdateTask(request, listener) { + + @Override + protected WritePipelineResponse newResponse(boolean acknowledged) { + return new WritePipelineResponse(acknowledged); + } + + @Override + public ClusterState execute(ClusterState currentState) { + return innerPut(request, currentState); + } + }); + } + + void validatePipeline(Map ingestInfos, PutPipelineRequest request) throws Exception { + if (ingestInfos.isEmpty()) { + throw new IllegalStateException("Ingest info is empty"); + } + + Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); + Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories); + List exceptions = new ArrayList<>(); + for (Processor processor : pipeline.flattenAllProcessors()) { + for (Map.Entry entry : ingestInfos.entrySet()) { + if (entry.getValue().containsProcessor(processor.getType()) == false) { + String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]"; + exceptions.add( + ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message) + ); + } + } + } + ExceptionsHelper.rethrowAndSuppress(exceptions); + } + + static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) { + IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE); + Map pipelines; + if (currentIngestMetadata != null) { + pipelines = new HashMap<>(currentIngestMetadata.getPipelines()); + } else { + pipelines = new HashMap<>(); + } + + pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), request.getSource(), request.getXContentType())); + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines)) + .build()); + return newState.build(); + } + + /** + * Returns the pipeline by the specified id + */ + public Pipeline get(String id) { + return pipelines.get(id); + } + + public Map getProcessorFactories() { + return processorFactories; + } + } + + private static final class PipelineExecutionService { + + private final PipelineStore store; + private final ThreadPool threadPool; + + private final StatsHolder totalStats = new StatsHolder(); + private volatile Map statsHolderPerPipeline = Collections.emptyMap(); + + PipelineExecutionService(PipelineStore store, ThreadPool threadPool) { + this.store = store; + this.threadPool = threadPool; + } + + void executeBulkRequest(Iterable> actionRequests, + BiConsumer itemFailureHandler, + Consumer completionHandler) { + threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { + + @Override + public void onFailure(Exception e) { + completionHandler.accept(e); + } + + @Override + protected void doRun() { + for (DocWriteRequest actionRequest : actionRequests) { + IndexRequest indexRequest = null; + if (actionRequest instanceof IndexRequest) { + indexRequest = (IndexRequest) actionRequest; + } else if (actionRequest instanceof UpdateRequest) { + UpdateRequest updateRequest = (UpdateRequest) actionRequest; + indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest(); + } + if (indexRequest == null) { + continue; + } + String pipeline = indexRequest.getPipeline(); + if (NOOP_PIPELINE_NAME.equals(pipeline) == false) { + try { + innerExecute(indexRequest, getPipeline(indexRequest.getPipeline())); + //this shouldn't be needed here but we do it for consistency with index api + // which requires it to prevent double execution + indexRequest.setPipeline(NOOP_PIPELINE_NAME); + } catch (Exception e) { + itemFailureHandler.accept(indexRequest, e); + } + } + } + completionHandler.accept(null); + } + }); + } + + IngestStats stats() { + Map statsHolderPerPipeline = this.statsHolderPerPipeline; + + Map statsPerPipeline = new HashMap<>(statsHolderPerPipeline.size()); + for (Map.Entry entry : statsHolderPerPipeline.entrySet()) { + statsPerPipeline.put(entry.getKey(), entry.getValue().createStats()); + } + + return new IngestStats(totalStats.createStats(), statsPerPipeline); + } + + void updatePipelineStats(IngestMetadata ingestMetadata) { + boolean changed = false; + Map newStatsPerPipeline = new HashMap<>(statsHolderPerPipeline); + Iterator iterator = newStatsPerPipeline.keySet().iterator(); + while (iterator.hasNext()) { + String pipeline = iterator.next(); + if (ingestMetadata.getPipelines().containsKey(pipeline) == false) { + iterator.remove(); + changed = true; + } + } + for (String pipeline : ingestMetadata.getPipelines().keySet()) { + if (newStatsPerPipeline.containsKey(pipeline) == false) { + newStatsPerPipeline.put(pipeline, new StatsHolder()); + changed = true; + } + } + + if (changed) { + statsHolderPerPipeline = Collections.unmodifiableMap(newStatsPerPipeline); + } + } + + private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws Exception { + if (pipeline.getProcessors().isEmpty()) { + return; + } + + long startTimeInNanos = System.nanoTime(); + // the pipeline specific stat holder may not exist and that is fine: + // (e.g. the pipeline may have been removed while we're ingesting a document + Optional pipelineStats = Optional.ofNullable(statsHolderPerPipeline.get(pipeline.getId())); + try { + totalStats.preIngest(); + pipelineStats.ifPresent(StatsHolder::preIngest); + String index = indexRequest.index(); + String type = indexRequest.type(); + String id = indexRequest.id(); + String routing = indexRequest.routing(); + Long version = indexRequest.version(); + VersionType versionType = indexRequest.versionType(); + Map sourceAsMap = indexRequest.sourceAsMap(); + IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap); + pipeline.execute(ingestDocument); + + Map metadataMap = ingestDocument.extractMetadata(); + //it's fine to set all metadata fields all the time, as ingest document holds their starting values + //before ingestion, which might also get modified during ingestion. + indexRequest.index((String) metadataMap.get(IngestDocument.MetaData.INDEX)); + indexRequest.type((String) metadataMap.get(IngestDocument.MetaData.TYPE)); + indexRequest.id((String) metadataMap.get(IngestDocument.MetaData.ID)); + indexRequest.routing((String) metadataMap.get(IngestDocument.MetaData.ROUTING)); + indexRequest.version(((Number) metadataMap.get(IngestDocument.MetaData.VERSION)).longValue()); + if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) { + indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE))); + } + indexRequest.source(ingestDocument.getSourceAndMetadata()); + } catch (Exception e) { + totalStats.ingestFailed(); + pipelineStats.ifPresent(StatsHolder::ingestFailed); + throw e; + } finally { + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos); + totalStats.postIngest(ingestTimeInMillis); + pipelineStats.ifPresent(statsHolder -> statsHolder.postIngest(ingestTimeInMillis)); + } + } + + private Pipeline getPipeline(String pipelineId) { + Pipeline pipeline = store.get(pipelineId); + if (pipeline == null) { + throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); + } + return pipeline; + } + + private static class StatsHolder { + + private final MeanMetric ingestMetric = new MeanMetric(); + private final CounterMetric ingestCurrent = new CounterMetric(); + private final CounterMetric ingestFailed = new CounterMetric(); + + void preIngest() { + ingestCurrent.inc(); + } + + void postIngest(long ingestTimeInMillis) { + ingestCurrent.dec(); + ingestMetric.inc(ingestTimeInMillis); + } + + void ingestFailed() { + ingestFailed.inc(); + } + + IngestStats.Stats createStats() { + return new IngestStats.Stats(ingestMetric.count(), ingestMetric.sum(), ingestCurrent.count(), ingestFailed.count()); + } + + } + + } } diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index 1b0553a54902b..37dd3f52cb7d3 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -51,6 +51,27 @@ public Pipeline(String id, @Nullable String description, @Nullable Integer versi this.version = version; } + public static Pipeline create(String id, Map config, + Map processorFactories) throws Exception { + String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); + Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null); + List> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY); + List processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorFactories); + List> onFailureProcessorConfigs = + ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY); + List onFailureProcessors = ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, processorFactories); + if (config.isEmpty() == false) { + throw new ElasticsearchParseException("pipeline [" + id + + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); + } + if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) { + throw new ElasticsearchParseException("pipeline [" + id + "] cannot have an empty on_failure option defined"); + } + CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.unmodifiableList(processors), + Collections.unmodifiableList(onFailureProcessors)); + return new Pipeline(id, description, version, compoundProcessor); + } + /** * Modifies the data of a document to be indexed based on the processor this pipeline holds */ @@ -113,27 +134,4 @@ public List flattenAllProcessors() { return compoundProcessor.flattenProcessors(); } - public static final class Factory { - - public Pipeline create(String id, Map config, Map processorFactories) throws Exception { - String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); - Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null); - List> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY); - List processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorFactories); - List> onFailureProcessorConfigs = - ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY); - List onFailureProcessors = ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, processorFactories); - if (config.isEmpty() == false) { - throw new ElasticsearchParseException("pipeline [" + id + - "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); - } - if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) { - throw new ElasticsearchParseException("pipeline [" + id + "] cannot have an empty on_failure option defined"); - } - CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.unmodifiableList(processors), - Collections.unmodifiableList(onFailureProcessors)); - return new Pipeline(id, description, version, compoundProcessor); - } - - } } diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java b/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java deleted file mode 100644 index 56d44ee888122..0000000000000 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.ingest; - -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterStateApplier; -import org.elasticsearch.common.metrics.CounterMetric; -import org.elasticsearch.common.metrics.MeanMetric; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.index.VersionType; -import org.elasticsearch.threadpool.ThreadPool; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; -import java.util.function.Consumer; - -public class PipelineExecutionService implements ClusterStateApplier { - - private final PipelineStore store; - private final ThreadPool threadPool; - - private final StatsHolder totalStats = new StatsHolder(); - private volatile Map statsHolderPerPipeline = Collections.emptyMap(); - - public PipelineExecutionService(PipelineStore store, ThreadPool threadPool) { - this.store = store; - this.threadPool = threadPool; - } - - public void executeBulkRequest(Iterable> actionRequests, - BiConsumer itemFailureHandler, - Consumer completionHandler) { - threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { - - @Override - public void onFailure(Exception e) { - completionHandler.accept(e); - } - - @Override - protected void doRun() throws Exception { - for (DocWriteRequest actionRequest : actionRequests) { - IndexRequest indexRequest = null; - if (actionRequest instanceof IndexRequest) { - indexRequest = (IndexRequest) actionRequest; - } else if (actionRequest instanceof UpdateRequest) { - UpdateRequest updateRequest = (UpdateRequest) actionRequest; - indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest(); - } - if (indexRequest == null) { - continue; - } - String pipeline = indexRequest.getPipeline(); - if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) { - try { - innerExecute(indexRequest, getPipeline(indexRequest.getPipeline())); - //this shouldn't be needed here but we do it for consistency with index api - // which requires it to prevent double execution - indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); - } catch (Exception e) { - itemFailureHandler.accept(indexRequest, e); - } - } - } - completionHandler.accept(null); - } - }); - } - - public IngestStats stats() { - Map statsHolderPerPipeline = this.statsHolderPerPipeline; - - Map statsPerPipeline = new HashMap<>(statsHolderPerPipeline.size()); - for (Map.Entry entry : statsHolderPerPipeline.entrySet()) { - statsPerPipeline.put(entry.getKey(), entry.getValue().createStats()); - } - - return new IngestStats(totalStats.createStats(), statsPerPipeline); - } - - @Override - public void applyClusterState(ClusterChangedEvent event) { - IngestMetadata ingestMetadata = event.state().getMetaData().custom(IngestMetadata.TYPE); - if (ingestMetadata != null) { - updatePipelineStats(ingestMetadata); - } - } - - void updatePipelineStats(IngestMetadata ingestMetadata) { - boolean changed = false; - Map newStatsPerPipeline = new HashMap<>(statsHolderPerPipeline); - Iterator iterator = newStatsPerPipeline.keySet().iterator(); - while (iterator.hasNext()) { - String pipeline = iterator.next(); - if (ingestMetadata.getPipelines().containsKey(pipeline) == false) { - iterator.remove(); - changed = true; - } - } - for (String pipeline : ingestMetadata.getPipelines().keySet()) { - if (newStatsPerPipeline.containsKey(pipeline) == false) { - newStatsPerPipeline.put(pipeline, new StatsHolder()); - changed = true; - } - } - - if (changed) { - statsHolderPerPipeline = Collections.unmodifiableMap(newStatsPerPipeline); - } - } - - private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws Exception { - if (pipeline.getProcessors().isEmpty()) { - return; - } - - long startTimeInNanos = System.nanoTime(); - // the pipeline specific stat holder may not exist and that is fine: - // (e.g. the pipeline may have been removed while we're ingesting a document - Optional pipelineStats = Optional.ofNullable(statsHolderPerPipeline.get(pipeline.getId())); - try { - totalStats.preIngest(); - pipelineStats.ifPresent(StatsHolder::preIngest); - String index = indexRequest.index(); - String type = indexRequest.type(); - String id = indexRequest.id(); - String routing = indexRequest.routing(); - Long version = indexRequest.version(); - VersionType versionType = indexRequest.versionType(); - Map sourceAsMap = indexRequest.sourceAsMap(); - IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, version, versionType, sourceAsMap); - pipeline.execute(ingestDocument); - - Map metadataMap = ingestDocument.extractMetadata(); - //it's fine to set all metadata fields all the time, as ingest document holds their starting values - //before ingestion, which might also get modified during ingestion. - indexRequest.index((String) metadataMap.get(IngestDocument.MetaData.INDEX)); - indexRequest.type((String) metadataMap.get(IngestDocument.MetaData.TYPE)); - indexRequest.id((String) metadataMap.get(IngestDocument.MetaData.ID)); - indexRequest.routing((String) metadataMap.get(IngestDocument.MetaData.ROUTING)); - indexRequest.version(((Number) metadataMap.get(IngestDocument.MetaData.VERSION)).longValue()); - if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) { - indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE))); - } - indexRequest.source(ingestDocument.getSourceAndMetadata()); - } catch (Exception e) { - totalStats.ingestFailed(); - pipelineStats.ifPresent(StatsHolder::ingestFailed); - throw e; - } finally { - long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos); - totalStats.postIngest(ingestTimeInMillis); - pipelineStats.ifPresent(statsHolder -> statsHolder.postIngest(ingestTimeInMillis)); - } - } - - private Pipeline getPipeline(String pipelineId) { - Pipeline pipeline = store.get(pipelineId); - if (pipeline == null) { - throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); - } - return pipeline; - } - - static class StatsHolder { - - private final MeanMetric ingestMetric = new MeanMetric(); - private final CounterMetric ingestCurrent = new CounterMetric(); - private final CounterMetric ingestFailed = new CounterMetric(); - - void preIngest() { - ingestCurrent.inc(); - } - - void postIngest(long ingestTimeInMillis) { - ingestCurrent.dec(); - ingestMetric.inc(ingestTimeInMillis); - } - - void ingestFailed() { - ingestFailed.inc(); - } - - IngestStats.Stats createStats() { - return new IngestStats.Stats(ingestMetric.count(), ingestMetric.sum(), ingestCurrent.count(), ingestFailed.count()); - } - - } - -} diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java b/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java deleted file mode 100644 index c6dce0bd45b3c..0000000000000 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.ingest; - -import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ingest.DeletePipelineRequest; -import org.elasticsearch.action.ingest.PutPipelineRequest; -import org.elasticsearch.action.ingest.WritePipelineResponse; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateApplier; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.regex.Regex; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.gateway.GatewayService; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; - -public class PipelineStore extends AbstractComponent implements ClusterStateApplier { - - private final Pipeline.Factory factory = new Pipeline.Factory(); - private final Map processorFactories; - - // Ideally this should be in IngestMetadata class, but we don't have the processor factories around there. - // We know of all the processor factories when a node with all its plugin have been initialized. Also some - // processor factories rely on other node services. Custom metadata is statically registered when classes - // are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around. - volatile Map pipelines = new HashMap<>(); - - public PipelineStore(Settings settings, Map processorFactories) { - super(settings); - this.processorFactories = processorFactories; - } - - @Override - public void applyClusterState(ClusterChangedEvent event) { - innerUpdatePipelines(event.previousState(), event.state()); - } - - void innerUpdatePipelines(ClusterState previousState, ClusterState state) { - if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { - return; - } - - IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE); - IngestMetadata previousIngestMetadata = previousState.getMetaData().custom(IngestMetadata.TYPE); - if (Objects.equals(ingestMetadata, previousIngestMetadata)) { - return; - } - - Map pipelines = new HashMap<>(); - List exceptions = new ArrayList<>(); - for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { - try { - pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories)); - } catch (ElasticsearchParseException e) { - pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), e)); - exceptions.add(e); - } catch (Exception e) { - ElasticsearchParseException parseException = new ElasticsearchParseException( - "Error updating pipeline with id [" + pipeline.getId() + "]", e); - pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), parseException)); - exceptions.add(parseException); - } - } - this.pipelines = Collections.unmodifiableMap(pipelines); - ExceptionsHelper.rethrowAndSuppress(exceptions); - } - - private Pipeline substitutePipeline(String id, ElasticsearchParseException e) { - String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null; - String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown"; - String errorMessage = "pipeline with id [" + id + "] could not be loaded, caused by [" + e.getDetailedMessage() + "]"; - Processor failureProcessor = new AbstractProcessor(tag) { - @Override - public void execute(IngestDocument ingestDocument) { - throw new IllegalStateException(errorMessage); - } - - @Override - public String getType() { - return type; - } - }; - String description = "this is a place holder pipeline, because pipeline with id [" + id + "] could not be loaded"; - return new Pipeline(id, description, null, new CompoundProcessor(failureProcessor)); - } - - /** - * Deletes the pipeline specified by id in the request. - */ - public void delete(ClusterService clusterService, DeletePipelineRequest request, ActionListener listener) { - clusterService.submitStateUpdateTask("delete-pipeline-" + request.getId(), - new AckedClusterStateUpdateTask(request, listener) { - - @Override - protected WritePipelineResponse newResponse(boolean acknowledged) { - return new WritePipelineResponse(acknowledged); - } - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return innerDelete(request, currentState); - } - }); - } - - ClusterState innerDelete(DeletePipelineRequest request, ClusterState currentState) { - IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE); - if (currentIngestMetadata == null) { - return currentState; - } - Map pipelines = currentIngestMetadata.getPipelines(); - Set toRemove = new HashSet<>(); - for (String pipelineKey : pipelines.keySet()) { - if (Regex.simpleMatch(request.getId(), pipelineKey)) { - toRemove.add(pipelineKey); - } - } - if (toRemove.isEmpty() && Regex.isMatchAllPattern(request.getId()) == false) { - throw new ResourceNotFoundException("pipeline [{}] is missing", request.getId()); - } else if (toRemove.isEmpty()) { - return currentState; - } - final Map pipelinesCopy = new HashMap<>(pipelines); - for (String key : toRemove) { - pipelinesCopy.remove(key); - } - ClusterState.Builder newState = ClusterState.builder(currentState); - newState.metaData(MetaData.builder(currentState.getMetaData()) - .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelinesCopy)) - .build()); - return newState.build(); - } - - /** - * Stores the specified pipeline definition in the request. - */ - public void put(ClusterService clusterService, Map ingestInfos, PutPipelineRequest request, - ActionListener listener) throws Exception { - // validates the pipeline and processor configuration before submitting a cluster update task: - validatePipeline(ingestInfos, request); - clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(), - new AckedClusterStateUpdateTask(request, listener) { - - @Override - protected WritePipelineResponse newResponse(boolean acknowledged) { - return new WritePipelineResponse(acknowledged); - } - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return innerPut(request, currentState); - } - }); - } - - void validatePipeline(Map ingestInfos, PutPipelineRequest request) throws Exception { - if (ingestInfos.isEmpty()) { - throw new IllegalStateException("Ingest info is empty"); - } - - Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); - Pipeline pipeline = factory.create(request.getId(), pipelineConfig, processorFactories); - List exceptions = new ArrayList<>(); - for (Processor processor : pipeline.flattenAllProcessors()) { - for (Map.Entry entry : ingestInfos.entrySet()) { - if (entry.getValue().containsProcessor(processor.getType()) == false) { - String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]"; - exceptions.add(ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message)); - } - } - } - ExceptionsHelper.rethrowAndSuppress(exceptions); - } - - ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) { - IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE); - Map pipelines; - if (currentIngestMetadata != null) { - pipelines = new HashMap<>(currentIngestMetadata.getPipelines()); - } else { - pipelines = new HashMap<>(); - } - - pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), request.getSource(), request.getXContentType())); - ClusterState.Builder newState = ClusterState.builder(currentState); - newState.metaData(MetaData.builder(currentState.getMetaData()) - .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines)) - .build()); - return newState.build(); - } - - /** - * Returns the pipeline by the specified id - */ - public Pipeline get(String id) { - return pipelines.get(id); - } - - public Map getProcessorFactories() { - return processorFactories; - } - - /** - * @return pipeline configuration specified by id. If multiple ids or wildcards are specified multiple pipelines - * may be returned - */ - // Returning PipelineConfiguration instead of Pipeline, because Pipeline and Processor interface don't - // know how to serialize themselves. - public List getPipelines(ClusterState clusterState, String... ids) { - IngestMetadata ingestMetadata = clusterState.getMetaData().custom(IngestMetadata.TYPE); - return innerGetPipelines(ingestMetadata, ids); - } - - List innerGetPipelines(IngestMetadata ingestMetadata, String... ids) { - if (ingestMetadata == null) { - return Collections.emptyList(); - } - - // if we didn't ask for _any_ ID, then we get them all (this is the same as if they ask for '*') - if (ids.length == 0) { - return new ArrayList<>(ingestMetadata.getPipelines().values()); - } - - List result = new ArrayList<>(ids.length); - for (String id : ids) { - if (Regex.isSimpleMatchPattern(id)) { - for (Map.Entry entry : ingestMetadata.getPipelines().entrySet()) { - if (Regex.simpleMatch(id, entry.getKey())) { - result.add(entry.getValue()); - } - } - } else { - PipelineConfiguration pipeline = ingestMetadata.getPipelines().get(id); - if (pipeline != null) { - result.add(pipeline); - } - } - } - return result; - } -} diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index c65488bd08eb7..9dfd8d2a382a5 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -352,7 +352,7 @@ protected Node(final Environment environment, Collection final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool); clusterService.addStateApplier(scriptModule.getScriptService()); resourcesToClose.add(clusterService); - final IngestService ingestService = new IngestService(settings, threadPool, this.environment, + final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment, scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class)); final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state, clusterService.getClusterSettings(), client); diff --git a/server/src/main/java/org/elasticsearch/node/NodeService.java b/server/src/main/java/org/elasticsearch/node/NodeService.java index 0e19b5a650221..207886c5cf263 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeService.java +++ b/server/src/main/java/org/elasticsearch/node/NodeService.java @@ -37,7 +37,6 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.monitor.MonitorService; -import org.elasticsearch.node.ResponseCollectorService; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; @@ -83,8 +82,7 @@ public class NodeService extends AbstractComponent implements Closeable { this.scriptService = scriptService; this.responseCollectorService = responseCollectorService; this.searchTransportService = searchTransportService; - clusterService.addStateApplier(ingestService.getPipelineStore()); - clusterService.addStateApplier(ingestService.getPipelineExecutionService()); + clusterService.addStateApplier(ingestService); } public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm, boolean threadPool, @@ -120,7 +118,7 @@ public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, bo circuitBreaker ? circuitBreakerService.stats() : null, script ? scriptService.stats() : null, discoveryStats ? discovery.stats() : null, - ingest ? ingestService.getPipelineExecutionService().stats() : null, + ingest ? ingestService.stats() : null, adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null ); } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index 0bc813249f5c0..64e86c7e1bd1a 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -40,7 +40,6 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.ingest.IngestService; -import org.elasticsearch.ingest.PipelineExecutionService; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -82,9 +81,6 @@ public class TransportBulkActionIngestTests extends ESTestCase { ClusterService clusterService; IngestService ingestService; - /** The ingest execution service we can capture calls to */ - PipelineExecutionService executionService; - /** Arguments to callbacks we want to capture, but which require generics, so we must use @Captor */ @Captor ArgumentCaptor> failureHandler; @@ -180,8 +176,6 @@ public void setupAction() { }).when(clusterService).addStateApplier(any(ClusterStateApplier.class)); // setup the mocked ingest service for capturing calls ingestService = mock(IngestService.class); - executionService = mock(PipelineExecutionService.class); - when(ingestService.getPipelineExecutionService()).thenReturn(executionService); action = new TestTransportBulkAction(); singleItemBulkWriteAction = new TestSingleItemBulkWriteAction(action); reset(transportService); // call on construction of action @@ -238,7 +232,7 @@ public void testIngestLocal() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(executionService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); + verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); completionHandler.getValue().accept(exception); assertTrue(failureCalled.get()); @@ -272,7 +266,7 @@ public void testSingleItemBulkActionIngestLocal() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(executionService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); + verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); completionHandler.getValue().accept(exception); assertTrue(failureCalled.get()); @@ -304,7 +298,7 @@ public void testIngestForward() throws Exception { action.execute(null, bulkRequest, listener); // should not have executed ingest locally - verify(executionService, never()).executeBulkRequest(any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -348,7 +342,7 @@ public void testSingleItemBulkActionIngestForward() throws Exception { singleItemBulkWriteAction.execute(null, indexRequest, listener); // should not have executed ingest locally - verify(executionService, never()).executeBulkRequest(any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -396,7 +390,7 @@ public void testUseDefaultPipeline() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(executionService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); + verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); completionHandler.getValue().accept(exception); assertTrue(failureCalled.get()); diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java index b0c6d717bb38e..1711d16891083 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java @@ -31,8 +31,8 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.Pipeline; -import org.elasticsearch.ingest.PipelineStore; import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.test.ESTestCase; @@ -53,7 +53,7 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase { - private PipelineStore store; + private IngestService ingestService; @Before public void init() throws IOException { @@ -62,9 +62,9 @@ public void init() throws IOException { Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, null, pipelineCompoundProcessor); Map registry = Collections.singletonMap("mock_processor", (factories, tag, config) -> processor); - store = mock(PipelineStore.class); - when(store.get(SIMULATED_PIPELINE_ID)).thenReturn(pipeline); - when(store.getProcessorFactories()).thenReturn(registry); + ingestService = mock(IngestService.class); + when(ingestService.getPipeline(SIMULATED_PIPELINE_ID)).thenReturn(pipeline); + when(ingestService.getProcessorFactories()).thenReturn(registry); } public void testParseUsingPipelineStore() throws Exception { @@ -94,7 +94,8 @@ public void testParseUsingPipelineStore() throws Exception { expectedDocs.add(expectedDoc); } - SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parseWithPipelineId(SIMULATED_PIPELINE_ID, requestContent, false, store); + SimulatePipelineRequest.Parsed actualRequest = + SimulatePipelineRequest.parseWithPipelineId(SIMULATED_PIPELINE_ID, requestContent, false, ingestService); assertThat(actualRequest.isVerbose(), equalTo(false)); assertThat(actualRequest.getDocuments().size(), equalTo(numDocs)); Iterator> expectedDocsIterator = expectedDocs.iterator(); @@ -182,7 +183,7 @@ public void testParseWithProvidedPipeline() throws Exception { requestContent.put(Fields.PIPELINE, pipelineConfig); - SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parse(requestContent, false, store); + SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parse(requestContent, false, ingestService); assertThat(actualRequest.isVerbose(), equalTo(false)); assertThat(actualRequest.getDocuments().size(), equalTo(numDocs)); Iterator> expectedDocsIterator = expectedDocs.iterator(); @@ -208,7 +209,7 @@ public void testNullPipelineId() { List> docs = new ArrayList<>(); requestContent.put(Fields.DOCS, docs); Exception e = expectThrows(IllegalArgumentException.class, - () -> SimulatePipelineRequest.parseWithPipelineId(null, requestContent, false, store)); + () -> SimulatePipelineRequest.parseWithPipelineId(null, requestContent, false, ingestService)); assertThat(e.getMessage(), equalTo("param [pipeline] is null")); } @@ -218,7 +219,7 @@ public void testNonExistentPipelineId() { List> docs = new ArrayList<>(); requestContent.put(Fields.DOCS, docs); Exception e = expectThrows(IllegalArgumentException.class, - () -> SimulatePipelineRequest.parseWithPipelineId(pipelineId, requestContent, false, store)); + () -> SimulatePipelineRequest.parseWithPipelineId(pipelineId, requestContent, false, ingestService)); assertThat(e.getMessage(), equalTo("pipeline [" + pipelineId + "] does not exist")); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java b/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java index 84d9327a0910a..f4d8034894bad 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java @@ -97,12 +97,12 @@ public void testFailStartNode() throws Exception { WritePipelineResponse response = client().admin().cluster().preparePutPipeline("_id", pipelineSource, XContentType.JSON).get(); assertThat(response.isAcknowledged(), is(true)); - Pipeline pipeline = internalCluster().getInstance(NodeService.class, node1).getIngestService().getPipelineStore().get("_id"); + Pipeline pipeline = internalCluster().getInstance(NodeService.class, node1).getIngestService().getPipeline("_id"); assertThat(pipeline, notNullValue()); installPlugin = false; String node2 = internalCluster().startNode(); - pipeline = internalCluster().getInstance(NodeService.class, node2).getIngestService().getPipelineStore().get("_id"); + pipeline = internalCluster().getInstance(NodeService.class, node2).getIngestService().getPipeline("_id"); assertNotNull(pipeline); assertThat(pipeline.getId(), equalTo("_id")); diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index c0353acb7f9d0..f6eb56b071e9f 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -21,16 +21,69 @@ import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; import java.util.Map; - -import org.elasticsearch.common.settings.Settings; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.ingest.DeletePipelineRequest; +import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.VersionType; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.mockito.Mockito; +import org.hamcrest.CustomTypeSafeMatcher; +import org.mockito.ArgumentMatcher; +import org.mockito.invocation.InvocationOnMock; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class IngestServiceTests extends ESTestCase { - private final IngestPlugin DUMMY_PLUGIN = new IngestPlugin() { + + private static final IngestPlugin DUMMY_PLUGIN = new IngestPlugin() { @Override public Map getProcessors(Processor.Parameters parameters) { return Collections.singletonMap("foo", (factories, tag, config) -> null); @@ -38,19 +91,812 @@ public Map getProcessors(Processor.Parameters paramet }; public void testIngestPlugin() { - ThreadPool tp = Mockito.mock(ThreadPool.class); - IngestService ingestService = new IngestService(Settings.EMPTY, tp, null, null, + ThreadPool tp = mock(ThreadPool.class); + IngestService ingestService = new IngestService(mock(ClusterService.class), tp, null, null, null, Collections.singletonList(DUMMY_PLUGIN)); - Map factories = ingestService.getPipelineStore().getProcessorFactories(); + Map factories = ingestService.getProcessorFactories(); assertTrue(factories.containsKey("foo")); assertEquals(1, factories.size()); } public void testIngestPluginDuplicate() { - ThreadPool tp = Mockito.mock(ThreadPool.class); + ThreadPool tp = mock(ThreadPool.class); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> - new IngestService(Settings.EMPTY, tp, null, null, + new IngestService(mock(ClusterService.class), tp, null, null, null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN))); assertTrue(e.getMessage(), e.getMessage().contains("already registered")); } + + public void testExecuteIndexPipelineDoesNotExist() { + ThreadPool threadPool = mock(ThreadPool.class); + final ExecutorService executorService = EsExecutors.newDirectExecutorService(); + when(threadPool.executor(anyString())).thenReturn(executorService); + IngestService ingestService = new IngestService(mock(ClusterService.class), threadPool, null, null, + null, Collections.singletonList(DUMMY_PLUGIN)); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); + + final SetOnce failure = new SetOnce<>(); + final BiConsumer failureHandler = (request, e) -> { + failure.set(true); + assertThat(request, sameInstance(indexRequest)); + assertThat(e, instanceOf(IllegalArgumentException.class)); + assertThat(e.getMessage(), equalTo("pipeline with id [_id] does not exist")); + }; + + @SuppressWarnings("unchecked") + final Consumer completionHandler = mock(Consumer.class); + + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + + assertTrue(failure.get()); + verify(completionHandler, times(1)).accept(null); + } + + public void testUpdatePipelines() { + IngestService ingestService = createWithProcessors(); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousClusterState = clusterState; + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + assertThat(ingestService.pipelines().size(), is(0)); + + PipelineConfiguration pipeline = new PipelineConfiguration( + "_id",new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON + ); + IngestMetadata ingestMetadata = new IngestMetadata(Collections.singletonMap("_id", pipeline)); + clusterState = ClusterState.builder(clusterState) + .metaData(MetaData.builder().putCustom(IngestMetadata.TYPE, ingestMetadata)) + .build(); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + assertThat(ingestService.pipelines().size(), is(1)); + assertThat(ingestService.pipelines().get("_id").getId(), equalTo("_id")); + assertThat(ingestService.pipelines().get("_id").getDescription(), nullValue()); + assertThat(ingestService.pipelines().get("_id").getProcessors().size(), equalTo(1)); + assertThat(ingestService.pipelines().get("_id").getProcessors().get(0).getType(), equalTo("set")); + } + + public void testDelete() { + IngestService ingestService = createWithProcessors(); + PipelineConfiguration config = new PipelineConfiguration( + "_id",new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON + ); + IngestMetadata ingestMetadata = new IngestMetadata(Collections.singletonMap("_id", config)); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousClusterState = clusterState; + clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder() + .putCustom(IngestMetadata.TYPE, ingestMetadata)).build(); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + assertThat(ingestService.getPipeline("_id"), notNullValue()); + + // Delete pipeline: + DeletePipelineRequest deleteRequest = new DeletePipelineRequest("_id"); + previousClusterState = clusterState; + clusterState = IngestService.innerDelete(deleteRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + assertThat(ingestService.getPipeline("_id"), nullValue()); + + // Delete existing pipeline: + try { + IngestService.innerDelete(deleteRequest, clusterState); + fail("exception expected"); + } catch (ResourceNotFoundException e) { + assertThat(e.getMessage(), equalTo("pipeline [_id] is missing")); + } + } + + public void testValidateNoIngestInfo() throws Exception { + IngestService ingestService = createWithProcessors(); + PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray( + "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON); + Exception e = expectThrows(IllegalStateException.class, () -> ingestService.validatePipeline(emptyMap(), putRequest)); + assertEquals("Ingest info is empty", e.getMessage()); + + DiscoveryNode discoveryNode = new DiscoveryNode("_node_id", buildNewFakeTransportAddress(), + emptyMap(), emptySet(), Version.CURRENT); + IngestInfo ingestInfo = new IngestInfo(Collections.singletonList(new ProcessorInfo("set"))); + ingestService.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest); + } + + public void testCrud() throws Exception { + IngestService ingestService = createWithProcessors(); + String id = "_id"; + Pipeline pipeline = ingestService.getPipeline(id); + assertThat(pipeline, nullValue()); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + + PutPipelineRequest putRequest = new PutPipelineRequest(id, + new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON); + ClusterState previousClusterState = clusterState; + clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + pipeline = ingestService.getPipeline(id); + assertThat(pipeline, notNullValue()); + assertThat(pipeline.getId(), equalTo(id)); + assertThat(pipeline.getDescription(), nullValue()); + assertThat(pipeline.getProcessors().size(), equalTo(1)); + assertThat(pipeline.getProcessors().get(0).getType(), equalTo("set")); + + DeletePipelineRequest deleteRequest = new DeletePipelineRequest(id); + previousClusterState = clusterState; + clusterState = IngestService.innerDelete(deleteRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + pipeline = ingestService.getPipeline(id); + assertThat(pipeline, nullValue()); + } + + public void testPut() { + IngestService ingestService = createWithProcessors(); + String id = "_id"; + Pipeline pipeline = ingestService.getPipeline(id); + assertThat(pipeline, nullValue()); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + + // add a new pipeline: + PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray("{\"processors\": []}"), XContentType.JSON); + ClusterState previousClusterState = clusterState; + clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + pipeline = ingestService.getPipeline(id); + assertThat(pipeline, notNullValue()); + assertThat(pipeline.getId(), equalTo(id)); + assertThat(pipeline.getDescription(), nullValue()); + assertThat(pipeline.getProcessors().size(), equalTo(0)); + + // overwrite existing pipeline: + putRequest = + new PutPipelineRequest(id, new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), XContentType.JSON); + previousClusterState = clusterState; + clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + pipeline = ingestService.getPipeline(id); + assertThat(pipeline, notNullValue()); + assertThat(pipeline.getId(), equalTo(id)); + assertThat(pipeline.getDescription(), equalTo("_description")); + assertThat(pipeline.getProcessors().size(), equalTo(0)); + } + + public void testPutWithErrorResponse() { + IngestService ingestService = createWithProcessors(); + String id = "_id"; + Pipeline pipeline = ingestService.getPipeline(id); + assertThat(pipeline, nullValue()); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + + PutPipelineRequest putRequest = + new PutPipelineRequest(id, new BytesArray("{\"description\": \"empty processors\"}"), XContentType.JSON); + ClusterState previousClusterState = clusterState; + clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + try { + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + fail("should fail"); + } catch (ElasticsearchParseException e) { + assertThat(e.getMessage(), equalTo("[processors] required property is missing")); + } + pipeline = ingestService.getPipeline(id); + assertNotNull(pipeline); + assertThat(pipeline.getId(), equalTo("_id")); + assertThat(pipeline.getDescription(), equalTo("this is a place holder pipeline, because pipeline with" + + " id [_id] could not be loaded")); + assertThat(pipeline.getProcessors().size(), equalTo(1)); + assertNull(pipeline.getProcessors().get(0).getTag()); + assertThat(pipeline.getProcessors().get(0).getType(), equalTo("unknown")); + } + + public void testDeleteUsingWildcard() { + IngestService ingestService = createWithProcessors(); + HashMap pipelines = new HashMap<>(); + BytesArray definition = new BytesArray( + "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}" + ); + pipelines.put("p1", new PipelineConfiguration("p1", definition, XContentType.JSON)); + pipelines.put("p2", new PipelineConfiguration("p2", definition, XContentType.JSON)); + pipelines.put("q1", new PipelineConfiguration("q1", definition, XContentType.JSON)); + IngestMetadata ingestMetadata = new IngestMetadata(pipelines); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousClusterState = clusterState; + clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder() + .putCustom(IngestMetadata.TYPE, ingestMetadata)).build(); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + assertThat(ingestService.getPipeline("p1"), notNullValue()); + assertThat(ingestService.getPipeline("p2"), notNullValue()); + assertThat(ingestService.getPipeline("q1"), notNullValue()); + + // Delete pipeline matching wildcard + DeletePipelineRequest deleteRequest = new DeletePipelineRequest("p*"); + previousClusterState = clusterState; + clusterState = IngestService.innerDelete(deleteRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + assertThat(ingestService.getPipeline("p1"), nullValue()); + assertThat(ingestService.getPipeline("p2"), nullValue()); + assertThat(ingestService.getPipeline("q1"), notNullValue()); + + // Exception if we used name which does not exist + try { + IngestService.innerDelete(new DeletePipelineRequest("unknown"), clusterState); + fail("exception expected"); + } catch (ResourceNotFoundException e) { + assertThat(e.getMessage(), equalTo("pipeline [unknown] is missing")); + } + + // match all wildcard works on last remaining pipeline + DeletePipelineRequest matchAllDeleteRequest = new DeletePipelineRequest("*"); + previousClusterState = clusterState; + clusterState = IngestService.innerDelete(matchAllDeleteRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + assertThat(ingestService.getPipeline("p1"), nullValue()); + assertThat(ingestService.getPipeline("p2"), nullValue()); + assertThat(ingestService.getPipeline("q1"), nullValue()); + + // match all wildcard does not throw exception if none match + IngestService.innerDelete(matchAllDeleteRequest, clusterState); + } + + public void testDeleteWithExistingUnmatchedPipelines() { + IngestService ingestService = createWithProcessors(); + HashMap pipelines = new HashMap<>(); + BytesArray definition = new BytesArray( + "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}" + ); + pipelines.put("p1", new PipelineConfiguration("p1", definition, XContentType.JSON)); + IngestMetadata ingestMetadata = new IngestMetadata(pipelines); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousClusterState = clusterState; + clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder() + .putCustom(IngestMetadata.TYPE, ingestMetadata)).build(); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + assertThat(ingestService.getPipeline("p1"), notNullValue()); + + DeletePipelineRequest deleteRequest = new DeletePipelineRequest("z*"); + try { + IngestService.innerDelete(deleteRequest, clusterState); + fail("exception expected"); + } catch (ResourceNotFoundException e) { + assertThat(e.getMessage(), equalTo("pipeline [z*] is missing")); + } + } + + public void testGetPipelines() { + Map configs = new HashMap<>(); + configs.put("_id1", new PipelineConfiguration( + "_id1", new BytesArray("{\"processors\": []}"), XContentType.JSON + )); + configs.put("_id2", new PipelineConfiguration( + "_id2", new BytesArray("{\"processors\": []}"), XContentType.JSON + )); + + assertThat(IngestService.innerGetPipelines(null, "_id1").isEmpty(), is(true)); + + IngestMetadata ingestMetadata = new IngestMetadata(configs); + List pipelines = IngestService.innerGetPipelines(ingestMetadata, "_id1"); + assertThat(pipelines.size(), equalTo(1)); + assertThat(pipelines.get(0).getId(), equalTo("_id1")); + + pipelines = IngestService.innerGetPipelines(ingestMetadata, "_id1", "_id2"); + assertThat(pipelines.size(), equalTo(2)); + assertThat(pipelines.get(0).getId(), equalTo("_id1")); + assertThat(pipelines.get(1).getId(), equalTo("_id2")); + + pipelines = IngestService.innerGetPipelines(ingestMetadata, "_id*"); + pipelines.sort(Comparator.comparing(PipelineConfiguration::getId)); + assertThat(pipelines.size(), equalTo(2)); + assertThat(pipelines.get(0).getId(), equalTo("_id1")); + assertThat(pipelines.get(1).getId(), equalTo("_id2")); + + // get all variants: (no IDs or '*') + pipelines = IngestService.innerGetPipelines(ingestMetadata); + pipelines.sort(Comparator.comparing(PipelineConfiguration::getId)); + assertThat(pipelines.size(), equalTo(2)); + assertThat(pipelines.get(0).getId(), equalTo("_id1")); + assertThat(pipelines.get(1).getId(), equalTo("_id2")); + + pipelines = IngestService.innerGetPipelines(ingestMetadata, "*"); + pipelines.sort(Comparator.comparing(PipelineConfiguration::getId)); + assertThat(pipelines.size(), equalTo(2)); + assertThat(pipelines.get(0).getId(), equalTo("_id1")); + assertThat(pipelines.get(1).getId(), equalTo("_id2")); + } + + public void testValidate() throws Exception { + IngestService ingestService = createWithProcessors(); + PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray( + "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\", \"tag\": \"tag1\"}}," + + "{\"remove\" : {\"field\": \"_field\", \"tag\": \"tag2\"}}]}"), + XContentType.JSON); + + DiscoveryNode node1 = new DiscoveryNode("_node_id1", buildNewFakeTransportAddress(), + emptyMap(), emptySet(), Version.CURRENT); + DiscoveryNode node2 = new DiscoveryNode("_node_id2", buildNewFakeTransportAddress(), + emptyMap(), emptySet(), Version.CURRENT); + Map ingestInfos = new HashMap<>(); + ingestInfos.put(node1, new IngestInfo(Arrays.asList(new ProcessorInfo("set"), new ProcessorInfo("remove")))); + ingestInfos.put(node2, new IngestInfo(Arrays.asList(new ProcessorInfo("set")))); + + ElasticsearchParseException e = + expectThrows(ElasticsearchParseException.class, () -> ingestService.validatePipeline(ingestInfos, putRequest)); + assertEquals("Processor type [remove] is not installed on node [" + node2 + "]", e.getMessage()); + assertEquals("remove", e.getHeader("processor_type").get(0)); + assertEquals("tag2", e.getHeader("processor_tag").get(0)); + + ingestInfos.put(node2, new IngestInfo(Arrays.asList(new ProcessorInfo("set"), new ProcessorInfo("remove")))); + ingestService.validatePipeline(ingestInfos, putRequest); + } + + public void testExecuteIndexPipelineExistsButFailedParsing() { + IngestService ingestService = createWithProcessors(Collections.singletonMap( + "mock", (factories, tag, config) -> new AbstractProcessor("mock") { + @Override + public void execute(IngestDocument ingestDocument) { + throw new IllegalStateException("error"); + } + + @Override + public String getType() { + return null; + } + } + )); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + String id = "_id"; + PutPipelineRequest putRequest = new PutPipelineRequest(id, + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); + ClusterState previousClusterState = clusterState; + clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final SetOnce failure = new SetOnce<>(); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline(id); + final BiConsumer failureHandler = (request, e) -> { + assertThat(e.getCause(), instanceOf(IllegalArgumentException.class)); + assertThat(e.getCause().getCause(), instanceOf(IllegalStateException.class)); + assertThat(e.getCause().getCause().getMessage(), equalTo("error")); + failure.set(true); + }; + + @SuppressWarnings("unchecked") + final Consumer completionHandler = mock(Consumer.class); + + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + + assertTrue(failure.get()); + verify(completionHandler, times(1)).accept(null); + } + + public void testExecuteBulkPipelineDoesNotExist() { + IngestService ingestService = createWithProcessors(Collections.singletonMap( + "mock", (factories, tag, config) -> mock(CompoundProcessor.class))); + + PutPipelineRequest putRequest = new PutPipelineRequest("_id", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + + BulkRequest bulkRequest = new BulkRequest(); + + IndexRequest indexRequest1 = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); + bulkRequest.add(indexRequest1); + IndexRequest indexRequest2 = + new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("does_not_exist"); + bulkRequest.add(indexRequest2); + @SuppressWarnings("unchecked") + BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + Consumer completionHandler = mock(Consumer.class); + ingestService.executeBulkRequest(bulkRequest.requests(), failureHandler, completionHandler); + verify(failureHandler, times(1)).accept( + argThat(new CustomTypeSafeMatcher("failure handler was not called with the expected arguments") { + @Override + protected boolean matchesSafely(IndexRequest item) { + return item == indexRequest2; + } + + }), + argThat(new CustomTypeSafeMatcher("failure handler was not called with the expected arguments") { + @Override + protected boolean matchesSafely(IllegalArgumentException iae) { + return "pipeline with id [does_not_exist] does not exist".equals(iae.getMessage()); + } + }) + ); + verify(completionHandler, times(1)).accept(null); + } + + public void testExecuteSuccess() { + IngestService ingestService = createWithProcessors(Collections.singletonMap( + "mock", (factories, tag, config) -> mock(CompoundProcessor.class))); + PutPipelineRequest putRequest = new PutPipelineRequest("_id", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); + @SuppressWarnings("unchecked") + final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + final Consumer completionHandler = mock(Consumer.class); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + verify(failureHandler, never()).accept(any(), any()); + verify(completionHandler, times(1)).accept(null); + } + + public void testExecuteEmptyPipeline() throws Exception { + IngestService ingestService = createWithProcessors(emptyMap()); + PutPipelineRequest putRequest = + new PutPipelineRequest("_id", new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), XContentType.JSON); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); + @SuppressWarnings("unchecked") + final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + final Consumer completionHandler = mock(Consumer.class); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + verify(failureHandler, never()).accept(any(), any()); + verify(completionHandler, times(1)).accept(null); + } + + public void testExecutePropagateAllMetaDataUpdates() throws Exception { + final CompoundProcessor processor = mock(CompoundProcessor.class); + IngestService ingestService = createWithProcessors(Collections.singletonMap( + "mock", (factories, tag, config) -> processor)); + PutPipelineRequest putRequest = new PutPipelineRequest("_id", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final long newVersion = randomLong(); + final String versionType = randomFrom("internal", "external", "external_gt", "external_gte"); + doAnswer((InvocationOnMock invocationOnMock) -> { + IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0]; + for (IngestDocument.MetaData metaData : IngestDocument.MetaData.values()) { + if (metaData == IngestDocument.MetaData.VERSION) { + ingestDocument.setFieldValue(metaData.getFieldName(), newVersion); + } else if (metaData == IngestDocument.MetaData.VERSION_TYPE) { + ingestDocument.setFieldValue(metaData.getFieldName(), versionType); + } else { + ingestDocument.setFieldValue(metaData.getFieldName(), "update" + metaData.getFieldName()); + } + } + return null; + }).when(processor).execute(any()); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); + @SuppressWarnings("unchecked") + final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + final Consumer completionHandler = mock(Consumer.class); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + verify(processor).execute(any()); + verify(failureHandler, never()).accept(any(), any()); + verify(completionHandler, times(1)).accept(null); + assertThat(indexRequest.index(), equalTo("update_index")); + assertThat(indexRequest.type(), equalTo("update_type")); + assertThat(indexRequest.id(), equalTo("update_id")); + assertThat(indexRequest.routing(), equalTo("update_routing")); + assertThat(indexRequest.version(), equalTo(newVersion)); + assertThat(indexRequest.versionType(), equalTo(VersionType.fromString(versionType))); + } + + public void testExecuteFailure() throws Exception { + final CompoundProcessor processor = mock(CompoundProcessor.class); + IngestService ingestService = createWithProcessors(Collections.singletonMap( + "mock", (factories, tag, config) -> processor)); + PutPipelineRequest putRequest = new PutPipelineRequest("_id", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); + doThrow(new RuntimeException()) + .when(processor) + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + @SuppressWarnings("unchecked") + final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + final Consumer completionHandler = mock(Consumer.class); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); + verify(completionHandler, times(1)).accept(null); + } + + public void testExecuteSuccessWithOnFailure() throws Exception { + final Processor processor = mock(Processor.class); + when(processor.getType()).thenReturn("mock_processor_type"); + when(processor.getTag()).thenReturn("mock_processor_tag"); + final Processor onFailureProcessor = mock(Processor.class); + final CompoundProcessor compoundProcessor = new CompoundProcessor( + false, Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); + IngestService ingestService = createWithProcessors(Collections.singletonMap( + "mock", (factories, tag, config) -> compoundProcessor)); + PutPipelineRequest putRequest = new PutPipelineRequest("_id", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); + doThrow(new RuntimeException()).when(processor).execute(eqIndexTypeId(emptyMap())); + @SuppressWarnings("unchecked") + final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + final Consumer completionHandler = mock(Consumer.class); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + verify(failureHandler, never()).accept(eq(indexRequest), any(ElasticsearchException.class)); + verify(completionHandler, times(1)).accept(null); + } + + public void testExecuteFailureWithNestedOnFailure() throws Exception { + final Processor processor = mock(Processor.class); + final Processor onFailureProcessor = mock(Processor.class); + final Processor onFailureOnFailureProcessor = mock(Processor.class); + final List processors = Collections.singletonList(onFailureProcessor); + final List onFailureProcessors = Collections.singletonList(onFailureOnFailureProcessor); + final CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + Collections.singletonList(processor), + Collections.singletonList(new CompoundProcessor(false, processors, onFailureProcessors))); + IngestService ingestService = createWithProcessors(Collections.singletonMap( + "mock", (factories, tag, config) -> compoundProcessor)); + PutPipelineRequest putRequest = new PutPipelineRequest("_id", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); + doThrow(new RuntimeException()) + .when(onFailureOnFailureProcessor) + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + doThrow(new RuntimeException()) + .when(onFailureProcessor) + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + doThrow(new RuntimeException()) + .when(processor) + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + @SuppressWarnings("unchecked") + final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + final Consumer completionHandler = mock(Consumer.class); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); + verify(completionHandler, times(1)).accept(null); + } + + public void testBulkRequestExecutionWithFailures() throws Exception { + BulkRequest bulkRequest = new BulkRequest(); + String pipelineId = "_id"; + + int numRequest = scaledRandomIntBetween(8, 64); + int numIndexRequests = 0; + for (int i = 0; i < numRequest; i++) { + DocWriteRequest request; + if (randomBoolean()) { + if (randomBoolean()) { + request = new DeleteRequest("_index", "_type", "_id"); + } else { + request = new UpdateRequest("_index", "_type", "_id"); + } + } else { + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline(pipelineId); + indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); + request = indexRequest; + numIndexRequests++; + } + bulkRequest.add(request); + } + + CompoundProcessor processor = mock(CompoundProcessor.class); + when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); + Exception error = new RuntimeException(); + doThrow(error).when(processor).execute(any()); + IngestService ingestService = createWithProcessors(Collections.singletonMap( + "mock", (factories, tag, config) -> processor)); + PutPipelineRequest putRequest = new PutPipelineRequest("_id", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + + @SuppressWarnings("unchecked") + BiConsumer requestItemErrorHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + Consumer completionHandler = mock(Consumer.class); + ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler); + + verify(requestItemErrorHandler, times(numIndexRequests)).accept(any(IndexRequest.class), argThat(new ArgumentMatcher() { + @Override + public boolean matches(final Object o) { + return ((Exception)o).getCause().getCause().equals(error); + } + })); + verify(completionHandler, times(1)).accept(null); + } + + public void testBulkRequestExecution() { + BulkRequest bulkRequest = new BulkRequest(); + String pipelineId = "_id"; + + int numRequest = scaledRandomIntBetween(8, 64); + for (int i = 0; i < numRequest; i++) { + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline(pipelineId); + indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); + bulkRequest.add(indexRequest); + } + + IngestService ingestService = createWithProcessors(emptyMap()); + PutPipelineRequest putRequest = + new PutPipelineRequest("_id", new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), XContentType.JSON); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousClusterState = clusterState; + clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + + @SuppressWarnings("unchecked") + BiConsumer requestItemErrorHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + Consumer completionHandler = mock(Consumer.class); + ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler); + + verify(requestItemErrorHandler, never()).accept(any(), any()); + verify(completionHandler, times(1)).accept(null); + } + + public void testStats() { + final Processor processor = mock(Processor.class); + IngestService ingestService = createWithProcessors(Collections.singletonMap( + "mock", (factories, tag, config) -> processor)); + final IngestStats initialStats = ingestService.stats(); + assertThat(initialStats.getStatsPerPipeline().size(), equalTo(0)); + assertThat(initialStats.getTotalStats().getIngestCount(), equalTo(0L)); + assertThat(initialStats.getTotalStats().getIngestCurrent(), equalTo(0L)); + assertThat(initialStats.getTotalStats().getIngestFailedCount(), equalTo(0L)); + assertThat(initialStats.getTotalStats().getIngestTimeInMillis(), equalTo(0L)); + + PutPipelineRequest putRequest = new PutPipelineRequest("_id1", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + putRequest = new PutPipelineRequest("_id2", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); + previousClusterState = clusterState; + clusterState = IngestService.PipelineStore.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final Map configurationMap = new HashMap<>(); + configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON)); + configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"), XContentType.JSON)); + ingestService.updatePipelineStats(new IngestMetadata(configurationMap)); + + @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); + + final IndexRequest indexRequest = new IndexRequest("_index"); + indexRequest.setPipeline("_id1"); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + final IngestStats afterFirstRequestStats = ingestService.stats(); + assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2)); + assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); + assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(0L)); + assertThat(afterFirstRequestStats.getTotalStats().getIngestCount(), equalTo(1L)); + + indexRequest.setPipeline("_id2"); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + final IngestStats afterSecondRequestStats = ingestService.stats(); + assertThat(afterSecondRequestStats.getStatsPerPipeline().size(), equalTo(2)); + assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); + assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L)); + assertThat(afterSecondRequestStats.getTotalStats().getIngestCount(), equalTo(2L)); + } + + // issue: https://github.com/elastic/elasticsearch/issues/18126 + public void testUpdatingStatsWhenRemovingPipelineWorks() { + IngestService ingestService = createWithProcessors(); + Map configurationMap = new HashMap<>(); + configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON)); + configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"), XContentType.JSON)); + ingestService.updatePipelineStats(new IngestMetadata(configurationMap)); + assertThat(ingestService.stats().getStatsPerPipeline(), hasKey("_id1")); + assertThat(ingestService.stats().getStatsPerPipeline(), hasKey("_id2")); + + configurationMap = new HashMap<>(); + configurationMap.put("_id3", new PipelineConfiguration("_id3", new BytesArray("{}"), XContentType.JSON)); + ingestService.updatePipelineStats(new IngestMetadata(configurationMap)); + assertThat(ingestService.stats().getStatsPerPipeline(), not(hasKey("_id1"))); + assertThat(ingestService.stats().getStatsPerPipeline(), not(hasKey("_id2"))); + } + + private IngestDocument eqIndexTypeId(final Map source) { + return argThat(new IngestDocumentMatcher("_index", "_type", "_id", source)); + } + + private IngestDocument eqIndexTypeId(final Long version, final VersionType versionType, final Map source) { + return argThat(new IngestDocumentMatcher("_index", "_type", "_id", version, versionType, source)); + } + + private static IngestService createWithProcessors() { + Map processors = new HashMap<>(); + processors.put("set", (factories, tag, config) -> { + String field = (String) config.remove("field"); + String value = (String) config.remove("value"); + return new Processor() { + @Override + public void execute(IngestDocument ingestDocument) { + ingestDocument.setFieldValue(field, value); + } + + @Override + public String getType() { + return "set"; + } + + @Override + public String getTag() { + return tag; + } + }; + }); + processors.put("remove", (factories, tag, config) -> { + String field = (String) config.remove("field"); + return new Processor() { + @Override + public void execute(IngestDocument ingestDocument) { + ingestDocument.removeField(field); + } + + @Override + public String getType() { + return "remove"; + } + + @Override + public String getTag() { + return tag; + } + }; + }); + return createWithProcessors(processors); + } + + private static IngestService createWithProcessors(Map processors) { + ThreadPool threadPool = mock(ThreadPool.class); + final ExecutorService executorService = EsExecutors.newDirectExecutorService(); + when(threadPool.executor(anyString())).thenReturn(executorService); + return new IngestService(mock(ClusterService.class), threadPool, null, null, + null, Collections.singletonList(new IngestPlugin() { + @Override + public Map getProcessors(final Processor.Parameters parameters) { + return processors; + } + })); + } + + private class IngestDocumentMatcher extends ArgumentMatcher { + + private final IngestDocument ingestDocument; + + IngestDocumentMatcher(String index, String type, String id, Map source) { + this.ingestDocument = new IngestDocument(index, type, id, null, null, null, source); + } + + IngestDocumentMatcher(String index, String type, String id, Long version, VersionType versionType, Map source) { + this.ingestDocument = new IngestDocument(index, type, id, null, version, versionType, source); + } + + @Override + public boolean matches(Object o) { + if (o.getClass() == IngestDocument.class) { + IngestDocument otherIngestDocument = (IngestDocument) o; + //ingest metadata will not be the same (timestamp differs every time) + return Objects.equals(ingestDocument.getSourceAndMetadata(), otherIngestDocument.getSourceAndMetadata()); + } + return false; + } + } } diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java deleted file mode 100644 index 15a23421da26a..0000000000000 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java +++ /dev/null @@ -1,471 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.ingest; - -import org.apache.lucene.util.SetOnce; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.Requests; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.VersionType; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; -import org.hamcrest.CustomTypeSafeMatcher; -import org.junit.Before; -import org.mockito.ArgumentMatcher; -import org.mockito.invocation.InvocationOnMock; - -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ExecutorService; -import java.util.function.BiConsumer; -import java.util.function.Consumer; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasKey; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.sameInstance; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.argThat; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class PipelineExecutionServiceTests extends ESTestCase { - - private final Integer version = randomBoolean() ? randomInt() : null; - private PipelineStore store; - private PipelineExecutionService executionService; - - @Before - public void setup() { - store = mock(PipelineStore.class); - ThreadPool threadPool = mock(ThreadPool.class); - final ExecutorService executorService = EsExecutors.newDirectExecutorService(); - when(threadPool.executor(anyString())).thenReturn(executorService); - executionService = new PipelineExecutionService(store, threadPool); - } - - public void testExecuteIndexPipelineDoesNotExist() { - final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - - final SetOnce failure = new SetOnce<>(); - final BiConsumer failureHandler = (request, e) -> { - failure.set(true); - assertThat(request, sameInstance(indexRequest)); - assertThat(e, instanceOf(IllegalArgumentException.class)); - assertThat(e.getMessage(), equalTo("pipeline with id [_id] does not exist")); - }; - - @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - - executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - - assertTrue(failure.get()); - verify(completionHandler, times(1)).accept(null); - } - - public void testExecuteIndexPipelineExistsButFailedParsing() { - when(store.get("_id")).thenReturn(new Pipeline("_id", "stub", null, - new CompoundProcessor(new AbstractProcessor("mock") { - @Override - public void execute(IngestDocument ingestDocument) { - throw new IllegalStateException("error"); - } - - @Override - public String getType() { - return null; - } - }))); - - final SetOnce failure = new SetOnce<>(); - final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - final BiConsumer failureHandler = (request, e) -> { - assertThat(e.getCause(), instanceOf(IllegalArgumentException.class)); - assertThat(e.getCause().getCause(), instanceOf(IllegalStateException.class)); - assertThat(e.getCause().getCause().getMessage(), equalTo("error")); - failure.set(true); - }; - - @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - - executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - - assertTrue(failure.get()); - verify(completionHandler, times(1)).accept(null); - } - - public void testExecuteBulkPipelineDoesNotExist() { - CompoundProcessor processor = mock(CompoundProcessor.class); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); - BulkRequest bulkRequest = new BulkRequest(); - - IndexRequest indexRequest1 = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - bulkRequest.add(indexRequest1); - IndexRequest indexRequest2 = - new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("does_not_exist"); - bulkRequest.add(indexRequest2); - @SuppressWarnings("unchecked") - BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - executionService.executeBulkRequest(bulkRequest.requests(), failureHandler, completionHandler); - verify(failureHandler, times(1)).accept( - argThat(new CustomTypeSafeMatcher("failure handler was not called with the expected arguments") { - @Override - protected boolean matchesSafely(IndexRequest item) { - return item == indexRequest2; - } - - }), - argThat(new CustomTypeSafeMatcher("failure handler was not called with the expected arguments") { - @Override - protected boolean matchesSafely(IllegalArgumentException iae) { - return "pipeline with id [does_not_exist] does not exist".equals(iae.getMessage()); - } - }) - ); - verify(completionHandler, times(1)).accept(null); - } - - public void testExecuteSuccess() { - final CompoundProcessor processor = mock(CompoundProcessor.class); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); - final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - verify(failureHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); - } - - public void testExecuteEmptyPipeline() throws Exception { - final CompoundProcessor processor = mock(CompoundProcessor.class); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); - when(processor.getProcessors()).thenReturn(Collections.emptyList()); - - final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - verify(processor, never()).execute(any()); - verify(failureHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); - } - - public void testExecutePropagateAllMetaDataUpdates() throws Exception { - final CompoundProcessor processor = mock(CompoundProcessor.class); - when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); - final long newVersion = randomLong(); - final String versionType = randomFrom("internal", "external", "external_gt", "external_gte"); - doAnswer((InvocationOnMock invocationOnMock) -> { - IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0]; - for (IngestDocument.MetaData metaData : IngestDocument.MetaData.values()) { - if (metaData == IngestDocument.MetaData.VERSION) { - ingestDocument.setFieldValue(metaData.getFieldName(), newVersion); - } else if (metaData == IngestDocument.MetaData.VERSION_TYPE) { - ingestDocument.setFieldValue(metaData.getFieldName(), versionType); - } else { - ingestDocument.setFieldValue(metaData.getFieldName(), "update" + metaData.getFieldName()); - } - } - return null; - }).when(processor).execute(any()); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); - - final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - verify(processor).execute(any()); - verify(failureHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); - assertThat(indexRequest.index(), equalTo("update_index")); - assertThat(indexRequest.type(), equalTo("update_type")); - assertThat(indexRequest.id(), equalTo("update_id")); - assertThat(indexRequest.routing(), equalTo("update_routing")); - assertThat(indexRequest.version(), equalTo(newVersion)); - assertThat(indexRequest.versionType(), equalTo(VersionType.fromString(versionType))); - } - - public void testExecuteFailure() throws Exception { - final CompoundProcessor processor = mock(CompoundProcessor.class); - when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); - final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()) - .when(processor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); - verify(completionHandler, times(1)).accept(null); - } - - public void testExecuteSuccessWithOnFailure() throws Exception { - final Processor processor = mock(Processor.class); - when(processor.getType()).thenReturn("mock_processor_type"); - when(processor.getTag()).thenReturn("mock_processor_tag"); - final Processor onFailureProcessor = mock(Processor.class); - final CompoundProcessor compoundProcessor = new CompoundProcessor( - false, Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor)); - final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()).when(processor).execute(eqIndexTypeId(Collections.emptyMap())); - @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - verify(failureHandler, never()).accept(eq(indexRequest), any(ElasticsearchException.class)); - verify(completionHandler, times(1)).accept(null); - } - - public void testExecuteFailureWithOnFailure() throws Exception { - final Processor processor = mock(Processor.class); - final Processor onFailureProcessor = mock(Processor.class); - final CompoundProcessor compoundProcessor = new CompoundProcessor( - false, Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor)); - final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()) - .when(processor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - doThrow(new RuntimeException()) - .when(onFailureProcessor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); - verify(completionHandler, times(1)).accept(null); - } - - public void testExecuteFailureWithNestedOnFailure() throws Exception { - final Processor processor = mock(Processor.class); - final Processor onFailureProcessor = mock(Processor.class); - final Processor onFailureOnFailureProcessor = mock(Processor.class); - final List processors = Collections.singletonList(onFailureProcessor); - final List onFailureProcessors = Collections.singletonList(onFailureOnFailureProcessor); - final CompoundProcessor compoundProcessor = new CompoundProcessor( - false, - Collections.singletonList(processor), - Collections.singletonList(new CompoundProcessor(false, processors, onFailureProcessors))); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor)); - final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()) - .when(onFailureOnFailureProcessor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - doThrow(new RuntimeException()) - .when(onFailureProcessor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - doThrow(new RuntimeException()) - .when(processor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); - verify(completionHandler, times(1)).accept(null); - } - - public void testBulkRequestExecutionWithFailures() throws Exception { - BulkRequest bulkRequest = new BulkRequest(); - String pipelineId = "_id"; - - int numRequest = scaledRandomIntBetween(8, 64); - int numIndexRequests = 0; - for (int i = 0; i < numRequest; i++) { - DocWriteRequest request; - if (randomBoolean()) { - if (randomBoolean()) { - request = new DeleteRequest("_index", "_type", "_id"); - } else { - request = new UpdateRequest("_index", "_type", "_id"); - } - } else { - IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline(pipelineId); - indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); - request = indexRequest; - numIndexRequests++; - } - bulkRequest.add(request); - } - - CompoundProcessor processor = mock(CompoundProcessor.class); - when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); - Exception error = new RuntimeException(); - doThrow(error).when(processor).execute(any()); - when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, version, processor)); - - @SuppressWarnings("unchecked") - BiConsumer requestItemErrorHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - executionService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler); - - verify(requestItemErrorHandler, times(numIndexRequests)).accept(any(IndexRequest.class), eq(error)); - verify(completionHandler, times(1)).accept(null); - } - - public void testBulkRequestExecution() { - BulkRequest bulkRequest = new BulkRequest(); - String pipelineId = "_id"; - - int numRequest = scaledRandomIntBetween(8, 64); - for (int i = 0; i < numRequest; i++) { - IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline(pipelineId); - indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); - bulkRequest.add(indexRequest); - } - - when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, version, new CompoundProcessor())); - - @SuppressWarnings("unchecked") - BiConsumer requestItemErrorHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - executionService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler); - - verify(requestItemErrorHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); - } - - public void testStats() { - final IngestStats initialStats = executionService.stats(); - assertThat(initialStats.getStatsPerPipeline().size(), equalTo(0)); - assertThat(initialStats.getTotalStats().getIngestCount(), equalTo(0L)); - assertThat(initialStats.getTotalStats().getIngestCurrent(), equalTo(0L)); - assertThat(initialStats.getTotalStats().getIngestFailedCount(), equalTo(0L)); - assertThat(initialStats.getTotalStats().getIngestTimeInMillis(), equalTo(0L)); - - when(store.get("_id1")).thenReturn(new Pipeline("_id1", null, version, new CompoundProcessor(mock(Processor.class)))); - when(store.get("_id2")).thenReturn(new Pipeline("_id2", null, null, new CompoundProcessor(mock(Processor.class)))); - - final Map configurationMap = new HashMap<>(); - configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON)); - configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"), XContentType.JSON)); - executionService.updatePipelineStats(new IngestMetadata(configurationMap)); - - @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - - final IndexRequest indexRequest = new IndexRequest("_index"); - indexRequest.setPipeline("_id1"); - executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - final IngestStats afterFirstRequestStats = executionService.stats(); - assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2)); - assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); - assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(0L)); - assertThat(afterFirstRequestStats.getTotalStats().getIngestCount(), equalTo(1L)); - - indexRequest.setPipeline("_id2"); - executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - final IngestStats afterSecondRequestStats = executionService.stats(); - assertThat(afterSecondRequestStats.getStatsPerPipeline().size(), equalTo(2)); - assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); - assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L)); - assertThat(afterSecondRequestStats.getTotalStats().getIngestCount(), equalTo(2L)); - } - - // issue: https://github.com/elastic/elasticsearch/issues/18126 - public void testUpdatingStatsWhenRemovingPipelineWorks() { - Map configurationMap = new HashMap<>(); - configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON)); - configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"), XContentType.JSON)); - executionService.updatePipelineStats(new IngestMetadata(configurationMap)); - assertThat(executionService.stats().getStatsPerPipeline(), hasKey("_id1")); - assertThat(executionService.stats().getStatsPerPipeline(), hasKey("_id2")); - - configurationMap = new HashMap<>(); - configurationMap.put("_id3", new PipelineConfiguration("_id3", new BytesArray("{}"), XContentType.JSON)); - executionService.updatePipelineStats(new IngestMetadata(configurationMap)); - assertThat(executionService.stats().getStatsPerPipeline(), not(hasKey("_id1"))); - assertThat(executionService.stats().getStatsPerPipeline(), not(hasKey("_id2"))); - } - - private IngestDocument eqIndexTypeId(final Map source) { - return argThat(new IngestDocumentMatcher("_index", "_type", "_id", source)); - } - - private IngestDocument eqIndexTypeId(final Long version, final VersionType versionType, final Map source) { - return argThat(new IngestDocumentMatcher("_index", "_type", "_id", version, versionType, source)); - } - - private class IngestDocumentMatcher extends ArgumentMatcher { - - private final IngestDocument ingestDocument; - - IngestDocumentMatcher(String index, String type, String id, Map source) { - this.ingestDocument = new IngestDocument(index, type, id, null, null, null, source); - } - - IngestDocumentMatcher(String index, String type, String id, Long version, VersionType versionType, Map source) { - this.ingestDocument = new IngestDocument(index, type, id, null, version, versionType, source); - } - - @Override - public boolean matches(Object o) { - if (o.getClass() == IngestDocument.class) { - IngestDocument otherIngestDocument = (IngestDocument) o; - //ingest metadata will not be the same (timestamp differs every time) - return Objects.equals(ingestDocument.getSourceAndMetadata(), otherIngestDocument.getSourceAndMetadata()); - } - return false; - } - } -} diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java index 461873a3fe3d2..cafdbcfb44690 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java @@ -47,9 +47,8 @@ public void testCreate() throws Exception { pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Arrays.asList(Collections.singletonMap("test", processorConfig0), Collections.singletonMap("test", processorConfig1))); - Pipeline.Factory factory = new Pipeline.Factory(); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -64,9 +63,8 @@ public void testCreateWithNoProcessorsField() throws Exception { Map pipelineConfig = new HashMap<>(); pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.VERSION_KEY, versionString); - Pipeline.Factory factory = new Pipeline.Factory(); try { - factory.create("_id", pipelineConfig, Collections.emptyMap()); + Pipeline.create("_id", pipelineConfig, Collections.emptyMap()); fail("should fail, missing required [processors] field"); } catch (ElasticsearchParseException e) { assertThat(e.getMessage(), equalTo("[processors] required property is missing")); @@ -78,8 +76,7 @@ public void testCreateWithEmptyProcessorsField() throws Exception { pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.emptyList()); - Pipeline.Factory factory = new Pipeline.Factory(); - Pipeline pipeline = factory.create("_id", pipelineConfig, null); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, null); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -93,9 +90,8 @@ public void testCreateWithPipelineOnFailure() throws Exception { pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); - Pipeline.Factory factory = new Pipeline.Factory(); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -112,9 +108,8 @@ public void testCreateWithPipelineEmptyOnFailure() throws Exception { pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.emptyList()); - Pipeline.Factory factory = new Pipeline.Factory(); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create("_id", pipelineConfig, processorRegistry)); + Exception e = expectThrows(ElasticsearchParseException.class, () -> Pipeline.create("_id", pipelineConfig, processorRegistry)); assertThat(e.getMessage(), equalTo("pipeline [_id] cannot have an empty on_failure option defined")); } @@ -125,9 +120,8 @@ public void testCreateWithPipelineEmptyOnFailureInProcessor() throws Exception { pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); - Pipeline.Factory factory = new Pipeline.Factory(); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create("_id", pipelineConfig, processorRegistry)); + Exception e = expectThrows(ElasticsearchParseException.class, () -> Pipeline.create("_id", pipelineConfig, processorRegistry)); assertThat(e.getMessage(), equalTo("[on_failure] processors list cannot be empty")); } @@ -136,14 +130,13 @@ public void testCreateWithPipelineIgnoreFailure() throws Exception { processorConfig.put("ignore_failure", true); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Pipeline.Factory factory = new Pipeline.Factory(); Map pipelineConfig = new HashMap<>(); pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); - Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -162,9 +155,8 @@ public void testCreateUnusedProcessorOptions() throws Exception { pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); - Pipeline.Factory factory = new Pipeline.Factory(); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create("_id", pipelineConfig, processorRegistry)); + Exception e = expectThrows(ElasticsearchParseException.class, () -> Pipeline.create("_id", pipelineConfig, processorRegistry)); assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]")); } @@ -176,9 +168,8 @@ public void testCreateProcessorsWithOnFailureProperties() throws Exception { pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); - Pipeline.Factory factory = new Pipeline.Factory(); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java deleted file mode 100644 index 250bb5059cf58..0000000000000 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java +++ /dev/null @@ -1,377 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.ingest; - -import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.Version; -import org.elasticsearch.action.ingest.DeletePipelineRequest; -import org.elasticsearch.action.ingest.PutPipelineRequest; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.test.ESTestCase; -import org.junit.Before; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; - -public class PipelineStoreTests extends ESTestCase { - - private PipelineStore store; - - @Before - public void init() throws Exception { - Map processorFactories = new HashMap<>(); - processorFactories.put("set", (factories, tag, config) -> { - String field = (String) config.remove("field"); - String value = (String) config.remove("value"); - return new Processor() { - @Override - public void execute(IngestDocument ingestDocument) throws Exception { - ingestDocument.setFieldValue(field, value); - } - - @Override - public String getType() { - return "set"; - } - - @Override - public String getTag() { - return tag; - } - }; - }); - processorFactories.put("remove", (factories, tag, config) -> { - String field = (String) config.remove("field"); - return new Processor() { - @Override - public void execute(IngestDocument ingestDocument) throws Exception { - ingestDocument.removeField(field); - } - - @Override - public String getType() { - return "remove"; - } - - @Override - public String getTag() { - return tag; - } - }; - }); - store = new PipelineStore(Settings.EMPTY, processorFactories); - } - - public void testUpdatePipelines() { - ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); - ClusterState previousClusterState = clusterState; - store.innerUpdatePipelines(previousClusterState, clusterState); - assertThat(store.pipelines.size(), is(0)); - - PipelineConfiguration pipeline = new PipelineConfiguration( - "_id",new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON - ); - IngestMetadata ingestMetadata = new IngestMetadata(Collections.singletonMap("_id", pipeline)); - clusterState = ClusterState.builder(clusterState) - .metaData(MetaData.builder().putCustom(IngestMetadata.TYPE, ingestMetadata)) - .build(); - store.innerUpdatePipelines(previousClusterState, clusterState); - assertThat(store.pipelines.size(), is(1)); - assertThat(store.pipelines.get("_id").getId(), equalTo("_id")); - assertThat(store.pipelines.get("_id").getDescription(), nullValue()); - assertThat(store.pipelines.get("_id").getProcessors().size(), equalTo(1)); - assertThat(store.pipelines.get("_id").getProcessors().get(0).getType(), equalTo("set")); - } - - public void testPut() { - String id = "_id"; - Pipeline pipeline = store.get(id); - assertThat(pipeline, nullValue()); - ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); - - // add a new pipeline: - PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray("{\"processors\": []}"), XContentType.JSON); - ClusterState previousClusterState = clusterState; - clusterState = store.innerPut(putRequest, clusterState); - store.innerUpdatePipelines(previousClusterState, clusterState); - pipeline = store.get(id); - assertThat(pipeline, notNullValue()); - assertThat(pipeline.getId(), equalTo(id)); - assertThat(pipeline.getDescription(), nullValue()); - assertThat(pipeline.getProcessors().size(), equalTo(0)); - - // overwrite existing pipeline: - putRequest = - new PutPipelineRequest(id, new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), XContentType.JSON); - previousClusterState = clusterState; - clusterState = store.innerPut(putRequest, clusterState); - store.innerUpdatePipelines(previousClusterState, clusterState); - pipeline = store.get(id); - assertThat(pipeline, notNullValue()); - assertThat(pipeline.getId(), equalTo(id)); - assertThat(pipeline.getDescription(), equalTo("_description")); - assertThat(pipeline.getProcessors().size(), equalTo(0)); - } - - public void testPutWithErrorResponse() { - String id = "_id"; - Pipeline pipeline = store.get(id); - assertThat(pipeline, nullValue()); - ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); - - PutPipelineRequest putRequest = - new PutPipelineRequest(id, new BytesArray("{\"description\": \"empty processors\"}"), XContentType.JSON); - ClusterState previousClusterState = clusterState; - clusterState = store.innerPut(putRequest, clusterState); - try { - store.innerUpdatePipelines(previousClusterState, clusterState); - fail("should fail"); - } catch (ElasticsearchParseException e) { - assertThat(e.getMessage(), equalTo("[processors] required property is missing")); - } - pipeline = store.get(id); - assertNotNull(pipeline); - assertThat(pipeline.getId(), equalTo("_id")); - assertThat(pipeline.getDescription(), equalTo("this is a place holder pipeline, because pipeline with" + - " id [_id] could not be loaded")); - assertThat(pipeline.getProcessors().size(), equalTo(1)); - assertNull(pipeline.getProcessors().get(0).getTag()); - assertThat(pipeline.getProcessors().get(0).getType(), equalTo("unknown")); - } - - public void testDelete() { - PipelineConfiguration config = new PipelineConfiguration( - "_id",new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON - ); - IngestMetadata ingestMetadata = new IngestMetadata(Collections.singletonMap("_id", config)); - ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); - ClusterState previousClusterState = clusterState; - clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder() - .putCustom(IngestMetadata.TYPE, ingestMetadata)).build(); - store.innerUpdatePipelines(previousClusterState, clusterState); - assertThat(store.get("_id"), notNullValue()); - - // Delete pipeline: - DeletePipelineRequest deleteRequest = new DeletePipelineRequest("_id"); - previousClusterState = clusterState; - clusterState = store.innerDelete(deleteRequest, clusterState); - store.innerUpdatePipelines(previousClusterState, clusterState); - assertThat(store.get("_id"), nullValue()); - - // Delete existing pipeline: - try { - store.innerDelete(deleteRequest, clusterState); - fail("exception expected"); - } catch (ResourceNotFoundException e) { - assertThat(e.getMessage(), equalTo("pipeline [_id] is missing")); - } - } - - public void testDeleteUsingWildcard() { - HashMap pipelines = new HashMap<>(); - BytesArray definition = new BytesArray( - "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}" - ); - pipelines.put("p1", new PipelineConfiguration("p1", definition, XContentType.JSON)); - pipelines.put("p2", new PipelineConfiguration("p2", definition, XContentType.JSON)); - pipelines.put("q1", new PipelineConfiguration("q1", definition, XContentType.JSON)); - IngestMetadata ingestMetadata = new IngestMetadata(pipelines); - ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); - ClusterState previousClusterState = clusterState; - clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder() - .putCustom(IngestMetadata.TYPE, ingestMetadata)).build(); - store.innerUpdatePipelines(previousClusterState, clusterState); - assertThat(store.get("p1"), notNullValue()); - assertThat(store.get("p2"), notNullValue()); - assertThat(store.get("q1"), notNullValue()); - - // Delete pipeline matching wildcard - DeletePipelineRequest deleteRequest = new DeletePipelineRequest("p*"); - previousClusterState = clusterState; - clusterState = store.innerDelete(deleteRequest, clusterState); - store.innerUpdatePipelines(previousClusterState, clusterState); - assertThat(store.get("p1"), nullValue()); - assertThat(store.get("p2"), nullValue()); - assertThat(store.get("q1"), notNullValue()); - - // Exception if we used name which does not exist - try { - store.innerDelete(new DeletePipelineRequest("unknown"), clusterState); - fail("exception expected"); - } catch (ResourceNotFoundException e) { - assertThat(e.getMessage(), equalTo("pipeline [unknown] is missing")); - } - - // match all wildcard works on last remaining pipeline - DeletePipelineRequest matchAllDeleteRequest = new DeletePipelineRequest("*"); - previousClusterState = clusterState; - clusterState = store.innerDelete(matchAllDeleteRequest, clusterState); - store.innerUpdatePipelines(previousClusterState, clusterState); - assertThat(store.get("p1"), nullValue()); - assertThat(store.get("p2"), nullValue()); - assertThat(store.get("q1"), nullValue()); - - // match all wildcard does not throw exception if none match - store.innerDelete(matchAllDeleteRequest, clusterState); - } - - public void testDeleteWithExistingUnmatchedPipelines() { - HashMap pipelines = new HashMap<>(); - BytesArray definition = new BytesArray( - "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}" - ); - pipelines.put("p1", new PipelineConfiguration("p1", definition, XContentType.JSON)); - IngestMetadata ingestMetadata = new IngestMetadata(pipelines); - ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); - ClusterState previousClusterState = clusterState; - clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder() - .putCustom(IngestMetadata.TYPE, ingestMetadata)).build(); - store.innerUpdatePipelines(previousClusterState, clusterState); - assertThat(store.get("p1"), notNullValue()); - - DeletePipelineRequest deleteRequest = new DeletePipelineRequest("z*"); - try { - store.innerDelete(deleteRequest, clusterState); - fail("exception expected"); - } catch (ResourceNotFoundException e) { - assertThat(e.getMessage(), equalTo("pipeline [z*] is missing")); - } - } - - public void testGetPipelines() { - Map configs = new HashMap<>(); - configs.put("_id1", new PipelineConfiguration( - "_id1", new BytesArray("{\"processors\": []}"), XContentType.JSON - )); - configs.put("_id2", new PipelineConfiguration( - "_id2", new BytesArray("{\"processors\": []}"), XContentType.JSON - )); - - assertThat(store.innerGetPipelines(null, "_id1").isEmpty(), is(true)); - - IngestMetadata ingestMetadata = new IngestMetadata(configs); - List pipelines = store.innerGetPipelines(ingestMetadata, "_id1"); - assertThat(pipelines.size(), equalTo(1)); - assertThat(pipelines.get(0).getId(), equalTo("_id1")); - - pipelines = store.innerGetPipelines(ingestMetadata, "_id1", "_id2"); - assertThat(pipelines.size(), equalTo(2)); - assertThat(pipelines.get(0).getId(), equalTo("_id1")); - assertThat(pipelines.get(1).getId(), equalTo("_id2")); - - pipelines = store.innerGetPipelines(ingestMetadata, "_id*"); - pipelines.sort((o1, o2) -> o1.getId().compareTo(o2.getId())); - assertThat(pipelines.size(), equalTo(2)); - assertThat(pipelines.get(0).getId(), equalTo("_id1")); - assertThat(pipelines.get(1).getId(), equalTo("_id2")); - - // get all variants: (no IDs or '*') - pipelines = store.innerGetPipelines(ingestMetadata); - pipelines.sort((o1, o2) -> o1.getId().compareTo(o2.getId())); - assertThat(pipelines.size(), equalTo(2)); - assertThat(pipelines.get(0).getId(), equalTo("_id1")); - assertThat(pipelines.get(1).getId(), equalTo("_id2")); - - pipelines = store.innerGetPipelines(ingestMetadata, "*"); - pipelines.sort((o1, o2) -> o1.getId().compareTo(o2.getId())); - assertThat(pipelines.size(), equalTo(2)); - assertThat(pipelines.get(0).getId(), equalTo("_id1")); - assertThat(pipelines.get(1).getId(), equalTo("_id2")); - } - - public void testCrud() throws Exception { - String id = "_id"; - Pipeline pipeline = store.get(id); - assertThat(pipeline, nullValue()); - ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty - - PutPipelineRequest putRequest = new PutPipelineRequest(id, - new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON); - ClusterState previousClusterState = clusterState; - clusterState = store.innerPut(putRequest, clusterState); - store.innerUpdatePipelines(previousClusterState, clusterState); - pipeline = store.get(id); - assertThat(pipeline, notNullValue()); - assertThat(pipeline.getId(), equalTo(id)); - assertThat(pipeline.getDescription(), nullValue()); - assertThat(pipeline.getProcessors().size(), equalTo(1)); - assertThat(pipeline.getProcessors().get(0).getType(), equalTo("set")); - - DeletePipelineRequest deleteRequest = new DeletePipelineRequest(id); - previousClusterState = clusterState; - clusterState = store.innerDelete(deleteRequest, clusterState); - store.innerUpdatePipelines(previousClusterState, clusterState); - pipeline = store.get(id); - assertThat(pipeline, nullValue()); - } - - public void testValidate() throws Exception { - PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray( - "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\", \"tag\": \"tag1\"}}," + - "{\"remove\" : {\"field\": \"_field\", \"tag\": \"tag2\"}}]}"), - XContentType.JSON); - - DiscoveryNode node1 = new DiscoveryNode("_node_id1", buildNewFakeTransportAddress(), - emptyMap(), emptySet(), Version.CURRENT); - DiscoveryNode node2 = new DiscoveryNode("_node_id2", buildNewFakeTransportAddress(), - emptyMap(), emptySet(), Version.CURRENT); - Map ingestInfos = new HashMap<>(); - ingestInfos.put(node1, new IngestInfo(Arrays.asList(new ProcessorInfo("set"), new ProcessorInfo("remove")))); - ingestInfos.put(node2, new IngestInfo(Arrays.asList(new ProcessorInfo("set")))); - - ElasticsearchParseException e = - expectThrows(ElasticsearchParseException.class, () -> store.validatePipeline(ingestInfos, putRequest)); - assertEquals("Processor type [remove] is not installed on node [" + node2 + "]", e.getMessage()); - assertEquals("remove", e.getHeader("processor_type").get(0)); - assertEquals("tag2", e.getHeader("processor_tag").get(0)); - - ingestInfos.put(node2, new IngestInfo(Arrays.asList(new ProcessorInfo("set"), new ProcessorInfo("remove")))); - store.validatePipeline(ingestInfos, putRequest); - } - - public void testValidateNoIngestInfo() throws Exception { - PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray( - "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON); - Exception e = expectThrows(IllegalStateException.class, () -> store.validatePipeline(Collections.emptyMap(), putRequest)); - assertEquals("Ingest info is empty", e.getMessage()); - - DiscoveryNode discoveryNode = new DiscoveryNode("_node_id", buildNewFakeTransportAddress(), - emptyMap(), emptySet(), Version.CURRENT); - IngestInfo ingestInfo = new IngestInfo(Collections.singletonList(new ProcessorInfo("set"))); - store.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest); - } -}