From e4283066863f6291d0d353db88f1cbe565e0d40b Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 21 Aug 2018 10:13:32 +0200 Subject: [PATCH] INGEST: Simplify IngestService (#33008) * INGEST: Simplify IngestService * Follow up to #32617 * Flatten redundant inner classes of `IngestService` --- .../action/bulk/BulkRequest.java | 20 +- .../action/bulk/TransportBulkAction.java | 6 +- .../ingest/DeletePipelineTransportAction.java | 17 +- .../ingest/GetPipelineTransportAction.java | 13 +- .../ingest/PutPipelineTransportAction.java | 11 +- .../ingest/SimulatePipelineRequest.java | 11 +- .../SimulatePipelineTransportAction.java | 10 +- .../elasticsearch/ingest/IngestService.java | 446 ++++++++- .../org/elasticsearch/ingest/Pipeline.java | 46 +- .../ingest/PipelineExecutionService.java | 217 ----- .../elasticsearch/ingest/PipelineStore.java | 275 ------ .../java/org/elasticsearch/node/Node.java | 4 +- .../org/elasticsearch/node/NodeService.java | 16 +- .../action/bulk/BulkRequestTests.java | 2 +- .../bulk/TransportBulkActionIngestTests.java | 22 +- .../SimulatePipelineRequestParsingTests.java | 23 +- ...gestProcessorNotInstalledOnAllNodesIT.java | 4 +- .../ingest/IngestServiceTests.java | 864 +++++++++++++++++- .../ingest/PipelineExecutionServiceTests.java | 472 ---------- .../ingest/PipelineFactoryTests.java | 27 +- .../ingest/PipelineStoreTests.java | 377 -------- 21 files changed, 1386 insertions(+), 1497 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java delete mode 100644 server/src/main/java/org/elasticsearch/ingest/PipelineStore.java delete mode 100644 server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java delete mode 100644 server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 67a5f025bfe03..9c6bcfc59ea36 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -89,7 +89,7 @@ public class BulkRequest extends ActionRequest implements CompositeIndicesReques * {@link WriteRequest}s to this but java doesn't support syntax to declare that everything in the array has both types so we declare * the one with the least casts. */ - final List requests = new ArrayList<>(); + final List> requests = new ArrayList<>(); private final Set indices = new HashSet<>(); List payloads = null; @@ -105,14 +105,14 @@ public BulkRequest() { /** * Adds a list of requests to be executed. Either index or delete requests. */ - public BulkRequest add(DocWriteRequest... requests) { - for (DocWriteRequest request : requests) { + public BulkRequest add(DocWriteRequest... requests) { + for (DocWriteRequest request : requests) { add(request, null); } return this; } - public BulkRequest add(DocWriteRequest request) { + public BulkRequest add(DocWriteRequest request) { return add(request, null); } @@ -122,7 +122,7 @@ public BulkRequest add(DocWriteRequest request) { * @param payload Optional payload * @return the current bulk request */ - public BulkRequest add(DocWriteRequest request, @Nullable Object payload) { + public BulkRequest add(DocWriteRequest request, @Nullable Object payload) { if (request instanceof IndexRequest) { add((IndexRequest) request, payload); } else if (request instanceof DeleteRequest) { @@ -139,8 +139,8 @@ public BulkRequest add(DocWriteRequest request, @Nullable Object payload) { /** * Adds a list of requests to be executed. Either index or delete requests. */ - public BulkRequest add(Iterable requests) { - for (DocWriteRequest request : requests) { + public BulkRequest add(Iterable> requests) { + for (DocWriteRequest request : requests) { add(request); } return this; @@ -229,7 +229,7 @@ private void addPayload(Object payload) { /** * The list of requests in this bulk request. */ - public List requests() { + public List> requests() { return this.requests; } @@ -550,7 +550,7 @@ public ActionRequestValidationException validate() { if (requests.isEmpty()) { validationException = addValidationError("no requests added", validationException); } - for (DocWriteRequest request : requests) { + for (DocWriteRequest request : requests) { // We first check if refresh has been set if (((WriteRequest) request).getRefreshPolicy() != RefreshPolicy.NONE) { validationException = addValidationError( @@ -585,7 +585,7 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); waitForActiveShards.writeTo(out); out.writeVInt(requests.size()); - for (DocWriteRequest request : requests) { + for (DocWriteRequest request : requests) { DocWriteRequest.writeDocumentRequest(out, request); } refreshPolicy.writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index d7575e60f5656..f59d66ff46953 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -525,7 +525,7 @@ private long relativeTime() { void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListener listener) { long ingestStartTimeInNanos = System.nanoTime(); BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(original); - ingestService.getPipelineExecutionService().executeBulkRequest(() -> bulkRequestModifier, (indexRequest, exception) -> { + ingestService.executeBulkRequest(() -> bulkRequestModifier, (indexRequest, exception) -> { logger.debug(() -> new ParameterizedMessage("failed to execute pipeline [{}] for document [{}/{}/{}]", indexRequest.getPipeline(), indexRequest.index(), indexRequest.type(), indexRequest.id()), exception); bulkRequestModifier.markCurrentItemAsFailed(exception); @@ -549,7 +549,7 @@ void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListen }); } - static final class BulkRequestModifier implements Iterator { + static final class BulkRequestModifier implements Iterator> { final BulkRequest bulkRequest; final SparseFixedBitSet failedSlots; @@ -584,7 +584,7 @@ BulkRequest getBulkRequest() { modifiedBulkRequest.timeout(bulkRequest.timeout()); int slot = 0; - List requests = bulkRequest.requests(); + List> requests = bulkRequest.requests(); originalSlots = new int[requests.size()]; // oversize, but that's ok for (int i = 0; i < requests.size(); i++) { DocWriteRequest request = requests.get(i); diff --git a/server/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java index d3cd052ecad1e..5667b6ee9b00f 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java @@ -30,23 +30,23 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.ingest.PipelineStore; -import org.elasticsearch.node.NodeService; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; public class DeletePipelineTransportAction extends TransportMasterNodeAction { - private final PipelineStore pipelineStore; + private final IngestService ingestService; private final ClusterService clusterService; @Inject public DeletePipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService) { - super(settings, DeletePipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new); + IndexNameExpressionResolver indexNameExpressionResolver, IngestService ingestService) { + super(settings, DeletePipelineAction.NAME, transportService, clusterService, threadPool, + actionFilters, indexNameExpressionResolver, DeletePipelineRequest::new); this.clusterService = clusterService; - this.pipelineStore = nodeService.getIngestService().getPipelineStore(); + this.ingestService = ingestService; } @Override @@ -60,8 +60,9 @@ protected AcknowledgedResponse newResponse() { } @Override - protected void masterOperation(DeletePipelineRequest request, ClusterState state, ActionListener listener) throws Exception { - pipelineStore.delete(clusterService, request, listener); + protected void masterOperation(DeletePipelineRequest request, ClusterState state, ActionListener listener) + throws Exception { + ingestService.delete(request, listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java index f64b36d47aedb..b6a1ee9c86ebf 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java @@ -29,21 +29,18 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.ingest.PipelineStore; -import org.elasticsearch.node.NodeService; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; public class GetPipelineTransportAction extends TransportMasterNodeReadAction { - private final PipelineStore pipelineStore; - @Inject public GetPipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService) { - super(settings, GetPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, GetPipelineRequest::new); - this.pipelineStore = nodeService.getIngestService().getPipelineStore(); + IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, GetPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, GetPipelineRequest::new); } @Override @@ -58,7 +55,7 @@ protected GetPipelineResponse newResponse() { @Override protected void masterOperation(GetPipelineRequest request, ClusterState state, ActionListener listener) throws Exception { - listener.onResponse(new GetPipelineResponse(pipelineStore.getPipelines(state, request.getIds()))); + listener.onResponse(new GetPipelineResponse(IngestService.getPipelines(state, request.getIds()))); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java index ea20a8e5b80b1..58cb301dd5fb9 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java @@ -35,9 +35,8 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.ingest.PipelineStore; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.IngestInfo; -import org.elasticsearch.node.NodeService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -46,19 +45,19 @@ public class PutPipelineTransportAction extends TransportMasterNodeAction { - private final PipelineStore pipelineStore; + private final IngestService ingestService; private final ClusterService clusterService; private final TransportNodesInfoAction nodesInfoAction; @Inject public PutPipelineTransportAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService, + IndexNameExpressionResolver indexNameExpressionResolver, IngestService ingestService, TransportNodesInfoAction nodesInfoAction) { super(settings, PutPipelineAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, PutPipelineRequest::new); this.clusterService = clusterService; this.nodesInfoAction = nodesInfoAction; - this.pipelineStore = nodeService.getIngestService().getPipelineStore(); + this.ingestService = ingestService; } @Override @@ -84,7 +83,7 @@ public void onResponse(NodesInfoResponse nodeInfos) { for (NodeInfo nodeInfo : nodeInfos.getNodes()) { ingestInfos.put(nodeInfo.getNode(), nodeInfo.getIngest()); } - pipelineStore.put(clusterService, ingestInfos, request, listener); + ingestService.putPipeline(ingestInfos, request, listener); } catch (Exception e) { onFailure(e); } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java index 53a7ec1d1f7c9..d5dfd78d1c74e 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java @@ -32,8 +32,8 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.Pipeline; -import org.elasticsearch.ingest.PipelineStore; import java.io.IOException; import java.util.ArrayList; @@ -160,14 +160,13 @@ public boolean isVerbose() { } } - private static final Pipeline.Factory PIPELINE_FACTORY = new Pipeline.Factory(); static final String SIMULATED_PIPELINE_ID = "_simulate_pipeline"; - static Parsed parseWithPipelineId(String pipelineId, Map config, boolean verbose, PipelineStore pipelineStore) { + static Parsed parseWithPipelineId(String pipelineId, Map config, boolean verbose, IngestService ingestService) { if (pipelineId == null) { throw new IllegalArgumentException("param [pipeline] is null"); } - Pipeline pipeline = pipelineStore.get(pipelineId); + Pipeline pipeline = ingestService.getPipeline(pipelineId); if (pipeline == null) { throw new IllegalArgumentException("pipeline [" + pipelineId + "] does not exist"); } @@ -175,9 +174,9 @@ static Parsed parseWithPipelineId(String pipelineId, Map config, return new Parsed(pipeline, ingestDocumentList, verbose); } - static Parsed parse(Map config, boolean verbose, PipelineStore pipelineStore) throws Exception { + static Parsed parse(Map config, boolean verbose, IngestService ingestService) throws Exception { Map pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE); - Pipeline pipeline = PIPELINE_FACTORY.create(SIMULATED_PIPELINE_ID, pipelineConfig, pipelineStore.getProcessorFactories()); + Pipeline pipeline = Pipeline.create(SIMULATED_PIPELINE_ID, pipelineConfig, ingestService.getProcessorFactories()); List ingestDocumentList = parseDocs(config); return new Parsed(pipeline, ingestDocumentList, verbose); } diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java index 3f67007df690d..ff7d247715473 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java @@ -26,7 +26,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.ingest.PipelineStore; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.node.NodeService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -35,13 +35,13 @@ public class SimulatePipelineTransportAction extends HandledTransportAction { - private final PipelineStore pipelineStore; + private final IngestService ingestService; private final SimulateExecutionService executionService; @Inject public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService) { super(settings, SimulatePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SimulatePipelineRequest::new); - this.pipelineStore = nodeService.getIngestService().getPipelineStore(); + this.ingestService = nodeService.getIngestService(); this.executionService = new SimulateExecutionService(threadPool); } @@ -52,9 +52,9 @@ protected void doExecute(SimulatePipelineRequest request, ActionListener processorFactories; + // Ideally this should be in IngestMetadata class, but we don't have the processor factories around there. + // We know of all the processor factories when a node with all its plugin have been initialized. Also some + // processor factories rely on other node services. Custom metadata is statically registered when classes + // are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around. + private volatile Map pipelines = new HashMap<>(); + private final ThreadPool threadPool; + private final StatsHolder totalStats = new StatsHolder(); + private volatile Map statsHolderPerPipeline = Collections.emptyMap(); - public IngestService(Settings settings, ThreadPool threadPool, + public IngestService(ClusterService clusterService, ThreadPool threadPool, Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, List ingestPlugins) { - BiFunction> scheduler = - (delay, command) -> threadPool.schedule(TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC, command); - Processor.Parameters parameters = new Processor.Parameters(env, scriptService, analysisRegistry, - threadPool.getThreadContext(), threadPool::relativeTimeInMillis, scheduler); + this.clusterService = clusterService; + this.processorFactories = processorFactories( + ingestPlugins, + new Processor.Parameters( + env, scriptService, analysisRegistry, + threadPool.getThreadContext(), threadPool::relativeTimeInMillis, + (delay, command) -> threadPool.schedule( + TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC, command + ) + ) + ); + this.threadPool = threadPool; + } + + private static Map processorFactories(List ingestPlugins, + Processor.Parameters parameters) { Map processorFactories = new HashMap<>(); for (IngestPlugin ingestPlugin : ingestPlugins) { Map newProcessors = ingestPlugin.getProcessors(parameters); @@ -61,24 +110,385 @@ public IngestService(Settings settings, ThreadPool threadPool, } } } - this.pipelineStore = new PipelineStore(settings, Collections.unmodifiableMap(processorFactories)); - this.pipelineExecutionService = new PipelineExecutionService(pipelineStore, threadPool); + return Collections.unmodifiableMap(processorFactories); + } + + public ClusterService getClusterService() { + return clusterService; + } + + /** + * Deletes the pipeline specified by id in the request. + */ + public void delete(DeletePipelineRequest request, ActionListener listener) { + clusterService.submitStateUpdateTask("delete-pipeline-" + request.getId(), + new AckedClusterStateUpdateTask(request, listener) { + + @Override + protected AcknowledgedResponse newResponse(boolean acknowledged) { + return new AcknowledgedResponse(acknowledged); + } + + @Override + public ClusterState execute(ClusterState currentState) { + return innerDelete(request, currentState); + } + }); + } + + static ClusterState innerDelete(DeletePipelineRequest request, ClusterState currentState) { + IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE); + if (currentIngestMetadata == null) { + return currentState; + } + Map pipelines = currentIngestMetadata.getPipelines(); + Set toRemove = new HashSet<>(); + for (String pipelineKey : pipelines.keySet()) { + if (Regex.simpleMatch(request.getId(), pipelineKey)) { + toRemove.add(pipelineKey); + } + } + if (toRemove.isEmpty() && Regex.isMatchAllPattern(request.getId()) == false) { + throw new ResourceNotFoundException("pipeline [{}] is missing", request.getId()); + } else if (toRemove.isEmpty()) { + return currentState; + } + final Map pipelinesCopy = new HashMap<>(pipelines); + for (String key : toRemove) { + pipelinesCopy.remove(key); + } + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelinesCopy)) + .build()); + return newState.build(); + } + + /** + * @return pipeline configuration specified by id. If multiple ids or wildcards are specified multiple pipelines + * may be returned + */ + // Returning PipelineConfiguration instead of Pipeline, because Pipeline and Processor interface don't + // know how to serialize themselves. + public static List getPipelines(ClusterState clusterState, String... ids) { + IngestMetadata ingestMetadata = clusterState.getMetaData().custom(IngestMetadata.TYPE); + return innerGetPipelines(ingestMetadata, ids); + } + + static List innerGetPipelines(IngestMetadata ingestMetadata, String... ids) { + if (ingestMetadata == null) { + return Collections.emptyList(); + } + + // if we didn't ask for _any_ ID, then we get them all (this is the same as if they ask for '*') + if (ids.length == 0) { + return new ArrayList<>(ingestMetadata.getPipelines().values()); + } + + List result = new ArrayList<>(ids.length); + for (String id : ids) { + if (Regex.isSimpleMatchPattern(id)) { + for (Map.Entry entry : ingestMetadata.getPipelines().entrySet()) { + if (Regex.simpleMatch(id, entry.getKey())) { + result.add(entry.getValue()); + } + } + } else { + PipelineConfiguration pipeline = ingestMetadata.getPipelines().get(id); + if (pipeline != null) { + result.add(pipeline); + } + } + } + return result; } - public PipelineStore getPipelineStore() { - return pipelineStore; + /** + * Stores the specified pipeline definition in the request. + */ + public void putPipeline(Map ingestInfos, PutPipelineRequest request, + ActionListener listener) throws Exception { + // validates the pipeline and processor configuration before submitting a cluster update task: + validatePipeline(ingestInfos, request); + clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(), + new AckedClusterStateUpdateTask(request, listener) { + + @Override + protected AcknowledgedResponse newResponse(boolean acknowledged) { + return new AcknowledgedResponse(acknowledged); + } + + @Override + public ClusterState execute(ClusterState currentState) { + return innerPut(request, currentState); + } + }); } - public PipelineExecutionService getPipelineExecutionService() { - return pipelineExecutionService; + /** + * Returns the pipeline by the specified id + */ + public Pipeline getPipeline(String id) { + return pipelines.get(id); + } + + public Map getProcessorFactories() { + return processorFactories; } public IngestInfo info() { - Map processorFactories = pipelineStore.getProcessorFactories(); + Map processorFactories = getProcessorFactories(); List processorInfoList = new ArrayList<>(processorFactories.size()); for (Map.Entry entry : processorFactories.entrySet()) { processorInfoList.add(new ProcessorInfo(entry.getKey())); } return new IngestInfo(processorInfoList); } + + Map pipelines() { + return pipelines; + } + + @Override + public void applyClusterState(final ClusterChangedEvent event) { + ClusterState state = event.state(); + innerUpdatePipelines(event.previousState(), state); + IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE); + if (ingestMetadata != null) { + updatePipelineStats(ingestMetadata); + } + } + + private static Pipeline substitutePipeline(String id, ElasticsearchParseException e) { + String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null; + String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown"; + String errorMessage = "pipeline with id [" + id + "] could not be loaded, caused by [" + e.getDetailedMessage() + "]"; + Processor failureProcessor = new AbstractProcessor(tag) { + @Override + public void execute(IngestDocument ingestDocument) { + throw new IllegalStateException(errorMessage); + } + + @Override + public String getType() { + return type; + } + }; + String description = "this is a place holder pipeline, because pipeline with id [" + id + "] could not be loaded"; + return new Pipeline(id, description, null, new CompoundProcessor(failureProcessor)); + } + + static ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) { + IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE); + Map pipelines; + if (currentIngestMetadata != null) { + pipelines = new HashMap<>(currentIngestMetadata.getPipelines()); + } else { + pipelines = new HashMap<>(); + } + + pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), request.getSource(), request.getXContentType())); + ClusterState.Builder newState = ClusterState.builder(currentState); + newState.metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines)) + .build()); + return newState.build(); + } + + void validatePipeline(Map ingestInfos, PutPipelineRequest request) throws Exception { + if (ingestInfos.isEmpty()) { + throw new IllegalStateException("Ingest info is empty"); + } + + Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); + Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories); + List exceptions = new ArrayList<>(); + for (Processor processor : pipeline.flattenAllProcessors()) { + for (Map.Entry entry : ingestInfos.entrySet()) { + if (entry.getValue().containsProcessor(processor.getType()) == false) { + String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]"; + exceptions.add( + ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message) + ); + } + } + } + ExceptionsHelper.rethrowAndSuppress(exceptions); + } + + public void executeBulkRequest(Iterable> actionRequests, + BiConsumer itemFailureHandler, Consumer completionHandler) { + threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { + + @Override + public void onFailure(Exception e) { + completionHandler.accept(e); + } + + @Override + protected void doRun() { + for (DocWriteRequest actionRequest : actionRequests) { + IndexRequest indexRequest = null; + if (actionRequest instanceof IndexRequest) { + indexRequest = (IndexRequest) actionRequest; + } else if (actionRequest instanceof UpdateRequest) { + UpdateRequest updateRequest = (UpdateRequest) actionRequest; + indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest(); + } + if (indexRequest == null) { + continue; + } + String pipelineId = indexRequest.getPipeline(); + if (NOOP_PIPELINE_NAME.equals(pipelineId) == false) { + try { + Pipeline pipeline = pipelines.get(pipelineId); + if (pipeline == null) { + throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); + } + innerExecute(indexRequest, pipeline); + //this shouldn't be needed here but we do it for consistency with index api + // which requires it to prevent double execution + indexRequest.setPipeline(NOOP_PIPELINE_NAME); + } catch (Exception e) { + itemFailureHandler.accept(indexRequest, e); + } + } + } + completionHandler.accept(null); + } + }); + } + + public IngestStats stats() { + Map statsHolderPerPipeline = this.statsHolderPerPipeline; + + Map statsPerPipeline = new HashMap<>(statsHolderPerPipeline.size()); + for (Map.Entry entry : statsHolderPerPipeline.entrySet()) { + statsPerPipeline.put(entry.getKey(), entry.getValue().createStats()); + } + + return new IngestStats(totalStats.createStats(), statsPerPipeline); + } + + void updatePipelineStats(IngestMetadata ingestMetadata) { + boolean changed = false; + Map newStatsPerPipeline = new HashMap<>(statsHolderPerPipeline); + Iterator iterator = newStatsPerPipeline.keySet().iterator(); + while (iterator.hasNext()) { + String pipeline = iterator.next(); + if (ingestMetadata.getPipelines().containsKey(pipeline) == false) { + iterator.remove(); + changed = true; + } + } + for (String pipeline : ingestMetadata.getPipelines().keySet()) { + if (newStatsPerPipeline.containsKey(pipeline) == false) { + newStatsPerPipeline.put(pipeline, new StatsHolder()); + changed = true; + } + } + + if (changed) { + statsHolderPerPipeline = Collections.unmodifiableMap(newStatsPerPipeline); + } + } + + private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws Exception { + if (pipeline.getProcessors().isEmpty()) { + return; + } + + long startTimeInNanos = System.nanoTime(); + // the pipeline specific stat holder may not exist and that is fine: + // (e.g. the pipeline may have been removed while we're ingesting a document + Optional pipelineStats = Optional.ofNullable(statsHolderPerPipeline.get(pipeline.getId())); + try { + totalStats.preIngest(); + pipelineStats.ifPresent(StatsHolder::preIngest); + String index = indexRequest.index(); + String type = indexRequest.type(); + String id = indexRequest.id(); + String routing = indexRequest.routing(); + Long version = indexRequest.version(); + VersionType versionType = indexRequest.versionType(); + Map sourceAsMap = indexRequest.sourceAsMap(); + IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, null, version, versionType, sourceAsMap); + pipeline.execute(ingestDocument); + + Map metadataMap = ingestDocument.extractMetadata(); + //it's fine to set all metadata fields all the time, as ingest document holds their starting values + //before ingestion, which might also get modified during ingestion. + indexRequest.index((String) metadataMap.get(IngestDocument.MetaData.INDEX)); + indexRequest.type((String) metadataMap.get(IngestDocument.MetaData.TYPE)); + indexRequest.id((String) metadataMap.get(IngestDocument.MetaData.ID)); + indexRequest.routing((String) metadataMap.get(IngestDocument.MetaData.ROUTING)); + indexRequest.version(((Number) metadataMap.get(IngestDocument.MetaData.VERSION)).longValue()); + if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) { + indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE))); + } + indexRequest.source(ingestDocument.getSourceAndMetadata()); + } catch (Exception e) { + totalStats.ingestFailed(); + pipelineStats.ifPresent(StatsHolder::ingestFailed); + throw e; + } finally { + long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos); + totalStats.postIngest(ingestTimeInMillis); + pipelineStats.ifPresent(statsHolder -> statsHolder.postIngest(ingestTimeInMillis)); + } + } + + private void innerUpdatePipelines(ClusterState previousState, ClusterState state) { + if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + return; + } + + IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE); + IngestMetadata previousIngestMetadata = previousState.getMetaData().custom(IngestMetadata.TYPE); + if (Objects.equals(ingestMetadata, previousIngestMetadata)) { + return; + } + + Map pipelines = new HashMap<>(); + List exceptions = new ArrayList<>(); + for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { + try { + pipelines.put(pipeline.getId(), Pipeline.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories)); + } catch (ElasticsearchParseException e) { + pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), e)); + exceptions.add(e); + } catch (Exception e) { + ElasticsearchParseException parseException = new ElasticsearchParseException( + "Error updating pipeline with id [" + pipeline.getId() + "]", e); + pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), parseException)); + exceptions.add(parseException); + } + } + this.pipelines = Collections.unmodifiableMap(pipelines); + ExceptionsHelper.rethrowAndSuppress(exceptions); + } + + private static class StatsHolder { + + private final MeanMetric ingestMetric = new MeanMetric(); + private final CounterMetric ingestCurrent = new CounterMetric(); + private final CounterMetric ingestFailed = new CounterMetric(); + + void preIngest() { + ingestCurrent.inc(); + } + + void postIngest(long ingestTimeInMillis) { + ingestCurrent.dec(); + ingestMetric.inc(ingestTimeInMillis); + } + + void ingestFailed() { + ingestFailed.inc(); + } + + IngestStats.Stats createStats() { + return new IngestStats.Stats(ingestMetric.count(), ingestMetric.sum(), ingestCurrent.count(), ingestFailed.count()); + } + } } diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index ace9948f9254f..6a6a93c5efc16 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -51,6 +51,28 @@ public Pipeline(String id, @Nullable String description, @Nullable Integer versi this.version = version; } + public static Pipeline create(String id, Map config, Map processorFactories) + throws Exception { + String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); + Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null); + List>> processorConfigs = + ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY); + List processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorFactories); + List>> onFailureProcessorConfigs = + ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY); + List onFailureProcessors = ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, processorFactories); + if (config.isEmpty() == false) { + throw new ElasticsearchParseException("pipeline [" + id + + "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); + } + if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) { + throw new ElasticsearchParseException("pipeline [" + id + "] cannot have an empty on_failure option defined"); + } + CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.unmodifiableList(processors), + Collections.unmodifiableList(onFailureProcessors)); + return new Pipeline(id, description, version, compoundProcessor); + } + /** * Modifies the data of a document to be indexed based on the processor this pipeline holds */ @@ -112,28 +134,4 @@ public List getOnFailureProcessors() { public List flattenAllProcessors() { return compoundProcessor.flattenProcessors(); } - - public static final class Factory { - - public Pipeline create(String id, Map config, Map processorFactories) throws Exception { - String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); - Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null); - List>> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY); - List processors = ConfigurationUtils.readProcessorConfigs(processorConfigs, processorFactories); - List>> onFailureProcessorConfigs = - ConfigurationUtils.readOptionalList(null, null, config, ON_FAILURE_KEY); - List onFailureProcessors = ConfigurationUtils.readProcessorConfigs(onFailureProcessorConfigs, processorFactories); - if (config.isEmpty() == false) { - throw new ElasticsearchParseException("pipeline [" + id + - "] doesn't support one or more provided configuration parameters " + Arrays.toString(config.keySet().toArray())); - } - if (onFailureProcessorConfigs != null && onFailureProcessors.isEmpty()) { - throw new ElasticsearchParseException("pipeline [" + id + "] cannot have an empty on_failure option defined"); - } - CompoundProcessor compoundProcessor = new CompoundProcessor(false, Collections.unmodifiableList(processors), - Collections.unmodifiableList(onFailureProcessors)); - return new Pipeline(id, description, version, compoundProcessor); - } - - } } diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java b/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java deleted file mode 100644 index d0ba8399f7178..0000000000000 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineExecutionService.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.ingest; - -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterStateApplier; -import org.elasticsearch.common.metrics.CounterMetric; -import org.elasticsearch.common.metrics.MeanMetric; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.index.VersionType; -import org.elasticsearch.threadpool.ThreadPool; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; -import java.util.function.Consumer; - -public class PipelineExecutionService implements ClusterStateApplier { - - private final PipelineStore store; - private final ThreadPool threadPool; - - private final StatsHolder totalStats = new StatsHolder(); - private volatile Map statsHolderPerPipeline = Collections.emptyMap(); - - public PipelineExecutionService(PipelineStore store, ThreadPool threadPool) { - this.store = store; - this.threadPool = threadPool; - } - - public void executeBulkRequest(Iterable actionRequests, - BiConsumer itemFailureHandler, - Consumer completionHandler) { - threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() { - - @Override - public void onFailure(Exception e) { - completionHandler.accept(e); - } - - @Override - protected void doRun() throws Exception { - for (DocWriteRequest actionRequest : actionRequests) { - IndexRequest indexRequest = null; - if (actionRequest instanceof IndexRequest) { - indexRequest = (IndexRequest) actionRequest; - } else if (actionRequest instanceof UpdateRequest) { - UpdateRequest updateRequest = (UpdateRequest) actionRequest; - indexRequest = updateRequest.docAsUpsert() ? updateRequest.doc() : updateRequest.upsertRequest(); - } - if (indexRequest == null) { - continue; - } - String pipeline = indexRequest.getPipeline(); - if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) { - try { - innerExecute(indexRequest, getPipeline(indexRequest.getPipeline())); - //this shouldn't be needed here but we do it for consistency with index api - // which requires it to prevent double execution - indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME); - } catch (Exception e) { - itemFailureHandler.accept(indexRequest, e); - } - } - } - completionHandler.accept(null); - } - }); - } - - public IngestStats stats() { - Map statsHolderPerPipeline = this.statsHolderPerPipeline; - - Map statsPerPipeline = new HashMap<>(statsHolderPerPipeline.size()); - for (Map.Entry entry : statsHolderPerPipeline.entrySet()) { - statsPerPipeline.put(entry.getKey(), entry.getValue().createStats()); - } - - return new IngestStats(totalStats.createStats(), statsPerPipeline); - } - - @Override - public void applyClusterState(ClusterChangedEvent event) { - IngestMetadata ingestMetadata = event.state().getMetaData().custom(IngestMetadata.TYPE); - if (ingestMetadata != null) { - updatePipelineStats(ingestMetadata); - } - } - - void updatePipelineStats(IngestMetadata ingestMetadata) { - boolean changed = false; - Map newStatsPerPipeline = new HashMap<>(statsHolderPerPipeline); - Iterator iterator = newStatsPerPipeline.keySet().iterator(); - while (iterator.hasNext()) { - String pipeline = iterator.next(); - if (ingestMetadata.getPipelines().containsKey(pipeline) == false) { - iterator.remove(); - changed = true; - } - } - for (String pipeline : ingestMetadata.getPipelines().keySet()) { - if (newStatsPerPipeline.containsKey(pipeline) == false) { - newStatsPerPipeline.put(pipeline, new StatsHolder()); - changed = true; - } - } - - if (changed) { - statsHolderPerPipeline = Collections.unmodifiableMap(newStatsPerPipeline); - } - } - - private void innerExecute(IndexRequest indexRequest, Pipeline pipeline) throws Exception { - if (pipeline.getProcessors().isEmpty()) { - return; - } - - long startTimeInNanos = System.nanoTime(); - // the pipeline specific stat holder may not exist and that is fine: - // (e.g. the pipeline may have been removed while we're ingesting a document - Optional pipelineStats = Optional.ofNullable(statsHolderPerPipeline.get(pipeline.getId())); - try { - totalStats.preIngest(); - pipelineStats.ifPresent(StatsHolder::preIngest); - String index = indexRequest.index(); - String type = indexRequest.type(); - String id = indexRequest.id(); - String routing = indexRequest.routing(); - String parent = indexRequest.parent(); - Long version = indexRequest.version(); - VersionType versionType = indexRequest.versionType(); - Map sourceAsMap = indexRequest.sourceAsMap(); - IngestDocument ingestDocument = new IngestDocument(index, type, id, routing, parent, version, versionType, sourceAsMap); - pipeline.execute(ingestDocument); - - Map metadataMap = ingestDocument.extractMetadata(); - //it's fine to set all metadata fields all the time, as ingest document holds their starting values - //before ingestion, which might also get modified during ingestion. - indexRequest.index((String) metadataMap.get(IngestDocument.MetaData.INDEX)); - indexRequest.type((String) metadataMap.get(IngestDocument.MetaData.TYPE)); - indexRequest.id((String) metadataMap.get(IngestDocument.MetaData.ID)); - indexRequest.routing((String) metadataMap.get(IngestDocument.MetaData.ROUTING)); - indexRequest.parent((String) metadataMap.get(IngestDocument.MetaData.PARENT)); - indexRequest.version(((Number) metadataMap.get(IngestDocument.MetaData.VERSION)).longValue()); - if (metadataMap.get(IngestDocument.MetaData.VERSION_TYPE) != null) { - indexRequest.versionType(VersionType.fromString((String) metadataMap.get(IngestDocument.MetaData.VERSION_TYPE))); - } - indexRequest.source(ingestDocument.getSourceAndMetadata()); - } catch (Exception e) { - totalStats.ingestFailed(); - pipelineStats.ifPresent(StatsHolder::ingestFailed); - throw e; - } finally { - long ingestTimeInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeInNanos); - totalStats.postIngest(ingestTimeInMillis); - pipelineStats.ifPresent(statsHolder -> statsHolder.postIngest(ingestTimeInMillis)); - } - } - - private Pipeline getPipeline(String pipelineId) { - Pipeline pipeline = store.get(pipelineId); - if (pipeline == null) { - throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); - } - return pipeline; - } - - static class StatsHolder { - - private final MeanMetric ingestMetric = new MeanMetric(); - private final CounterMetric ingestCurrent = new CounterMetric(); - private final CounterMetric ingestFailed = new CounterMetric(); - - void preIngest() { - ingestCurrent.inc(); - } - - void postIngest(long ingestTimeInMillis) { - ingestCurrent.dec(); - ingestMetric.inc(ingestTimeInMillis); - } - - void ingestFailed() { - ingestFailed.inc(); - } - - IngestStats.Stats createStats() { - return new IngestStats.Stats(ingestMetric.count(), ingestMetric.sum(), ingestCurrent.count(), ingestFailed.count()); - } - - } - -} diff --git a/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java b/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java deleted file mode 100644 index 9fceaf1a9a573..0000000000000 --- a/server/src/main/java/org/elasticsearch/ingest/PipelineStore.java +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.ingest; - -import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ingest.DeletePipelineRequest; -import org.elasticsearch.action.ingest.PutPipelineRequest; -import org.elasticsearch.action.support.master.AcknowledgedResponse; -import org.elasticsearch.cluster.AckedClusterStateUpdateTask; -import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateApplier; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.regex.Regex; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.gateway.GatewayService; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; - -public class PipelineStore extends AbstractComponent implements ClusterStateApplier { - - private final Pipeline.Factory factory = new Pipeline.Factory(); - private final Map processorFactories; - - // Ideally this should be in IngestMetadata class, but we don't have the processor factories around there. - // We know of all the processor factories when a node with all its plugin have been initialized. Also some - // processor factories rely on other node services. Custom metadata is statically registered when classes - // are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around. - volatile Map pipelines = new HashMap<>(); - - public PipelineStore(Settings settings, Map processorFactories) { - super(settings); - this.processorFactories = processorFactories; - } - - @Override - public void applyClusterState(ClusterChangedEvent event) { - innerUpdatePipelines(event.previousState(), event.state()); - } - - void innerUpdatePipelines(ClusterState previousState, ClusterState state) { - if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { - return; - } - - IngestMetadata ingestMetadata = state.getMetaData().custom(IngestMetadata.TYPE); - IngestMetadata previousIngestMetadata = previousState.getMetaData().custom(IngestMetadata.TYPE); - if (Objects.equals(ingestMetadata, previousIngestMetadata)) { - return; - } - - Map pipelines = new HashMap<>(); - List exceptions = new ArrayList<>(); - for (PipelineConfiguration pipeline : ingestMetadata.getPipelines().values()) { - try { - pipelines.put(pipeline.getId(), factory.create(pipeline.getId(), pipeline.getConfigAsMap(), processorFactories)); - } catch (ElasticsearchParseException e) { - pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), e)); - exceptions.add(e); - } catch (Exception e) { - ElasticsearchParseException parseException = new ElasticsearchParseException( - "Error updating pipeline with id [" + pipeline.getId() + "]", e); - pipelines.put(pipeline.getId(), substitutePipeline(pipeline.getId(), parseException)); - exceptions.add(parseException); - } - } - this.pipelines = Collections.unmodifiableMap(pipelines); - ExceptionsHelper.rethrowAndSuppress(exceptions); - } - - private Pipeline substitutePipeline(String id, ElasticsearchParseException e) { - String tag = e.getHeaderKeys().contains("processor_tag") ? e.getHeader("processor_tag").get(0) : null; - String type = e.getHeaderKeys().contains("processor_type") ? e.getHeader("processor_type").get(0) : "unknown"; - String errorMessage = "pipeline with id [" + id + "] could not be loaded, caused by [" + e.getDetailedMessage() + "]"; - Processor failureProcessor = new AbstractProcessor(tag) { - @Override - public void execute(IngestDocument ingestDocument) { - throw new IllegalStateException(errorMessage); - } - - @Override - public String getType() { - return type; - } - }; - String description = "this is a place holder pipeline, because pipeline with id [" + id + "] could not be loaded"; - return new Pipeline(id, description, null, new CompoundProcessor(failureProcessor)); - } - - /** - * Deletes the pipeline specified by id in the request. - */ - public void delete(ClusterService clusterService, DeletePipelineRequest request, ActionListener listener) { - clusterService.submitStateUpdateTask("delete-pipeline-" + request.getId(), - new AckedClusterStateUpdateTask(request, listener) { - - @Override - protected AcknowledgedResponse newResponse(boolean acknowledged) { - return new AcknowledgedResponse(acknowledged); - } - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return innerDelete(request, currentState); - } - }); - } - - ClusterState innerDelete(DeletePipelineRequest request, ClusterState currentState) { - IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE); - if (currentIngestMetadata == null) { - return currentState; - } - Map pipelines = currentIngestMetadata.getPipelines(); - Set toRemove = new HashSet<>(); - for (String pipelineKey : pipelines.keySet()) { - if (Regex.simpleMatch(request.getId(), pipelineKey)) { - toRemove.add(pipelineKey); - } - } - if (toRemove.isEmpty() && Regex.isMatchAllPattern(request.getId()) == false) { - throw new ResourceNotFoundException("pipeline [{}] is missing", request.getId()); - } else if (toRemove.isEmpty()) { - return currentState; - } - final Map pipelinesCopy = new HashMap<>(pipelines); - for (String key : toRemove) { - pipelinesCopy.remove(key); - } - ClusterState.Builder newState = ClusterState.builder(currentState); - newState.metaData(MetaData.builder(currentState.getMetaData()) - .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelinesCopy)) - .build()); - return newState.build(); - } - - /** - * Stores the specified pipeline definition in the request. - */ - public void put(ClusterService clusterService, Map ingestInfos, PutPipelineRequest request, - ActionListener listener) throws Exception { - // validates the pipeline and processor configuration before submitting a cluster update task: - validatePipeline(ingestInfos, request); - clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(), - new AckedClusterStateUpdateTask(request, listener) { - - @Override - protected AcknowledgedResponse newResponse(boolean acknowledged) { - return new AcknowledgedResponse(acknowledged); - } - - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return innerPut(request, currentState); - } - }); - } - - void validatePipeline(Map ingestInfos, PutPipelineRequest request) throws Exception { - if (ingestInfos.isEmpty()) { - throw new IllegalStateException("Ingest info is empty"); - } - - Map pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2(); - Pipeline pipeline = factory.create(request.getId(), pipelineConfig, processorFactories); - List exceptions = new ArrayList<>(); - for (Processor processor : pipeline.flattenAllProcessors()) { - for (Map.Entry entry : ingestInfos.entrySet()) { - if (entry.getValue().containsProcessor(processor.getType()) == false) { - String message = "Processor type [" + processor.getType() + "] is not installed on node [" + entry.getKey() + "]"; - exceptions.add(ConfigurationUtils.newConfigurationException(processor.getType(), processor.getTag(), null, message)); - } - } - } - ExceptionsHelper.rethrowAndSuppress(exceptions); - } - - ClusterState innerPut(PutPipelineRequest request, ClusterState currentState) { - IngestMetadata currentIngestMetadata = currentState.metaData().custom(IngestMetadata.TYPE); - Map pipelines; - if (currentIngestMetadata != null) { - pipelines = new HashMap<>(currentIngestMetadata.getPipelines()); - } else { - pipelines = new HashMap<>(); - } - - pipelines.put(request.getId(), new PipelineConfiguration(request.getId(), request.getSource(), request.getXContentType())); - ClusterState.Builder newState = ClusterState.builder(currentState); - newState.metaData(MetaData.builder(currentState.getMetaData()) - .putCustom(IngestMetadata.TYPE, new IngestMetadata(pipelines)) - .build()); - return newState.build(); - } - - /** - * Returns the pipeline by the specified id - */ - public Pipeline get(String id) { - return pipelines.get(id); - } - - public Map getProcessorFactories() { - return processorFactories; - } - - /** - * @return pipeline configuration specified by id. If multiple ids or wildcards are specified multiple pipelines - * may be returned - */ - // Returning PipelineConfiguration instead of Pipeline, because Pipeline and Processor interface don't - // know how to serialize themselves. - public List getPipelines(ClusterState clusterState, String... ids) { - IngestMetadata ingestMetadata = clusterState.getMetaData().custom(IngestMetadata.TYPE); - return innerGetPipelines(ingestMetadata, ids); - } - - List innerGetPipelines(IngestMetadata ingestMetadata, String... ids) { - if (ingestMetadata == null) { - return Collections.emptyList(); - } - - // if we didn't ask for _any_ ID, then we get them all (this is the same as if they ask for '*') - if (ids.length == 0) { - return new ArrayList<>(ingestMetadata.getPipelines().values()); - } - - List result = new ArrayList<>(ids.length); - for (String id : ids) { - if (Regex.isSimpleMatchPattern(id)) { - for (Map.Entry entry : ingestMetadata.getPipelines().entrySet()) { - if (Regex.simpleMatch(id, entry.getKey())) { - result.add(entry.getValue()); - } - } - } else { - PipelineConfiguration pipeline = ingestMetadata.getPipelines().get(id); - if (pipeline != null) { - result.add(pipeline); - } - } - } - return result; - } -} diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index f8f95aed8c404..1a5d99bb5bbe5 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -355,7 +355,7 @@ protected Node(final Environment environment, Collection ClusterModule.getClusterStateCustomSuppliers(clusterPlugins)); clusterService.addStateApplier(scriptModule.getScriptService()); resourcesToClose.add(clusterService); - final IngestService ingestService = new IngestService(settings, threadPool, this.environment, + final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment, scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class)); final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state, clusterService.getClusterSettings(), client); @@ -489,7 +489,7 @@ protected Node(final Environment environment, Collection clusterModule.getAllocationService(), environment.configFile()); this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(), transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(), - httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService, + httpServerTransport, ingestService, settingsModule.getSettingsFilter(), responseCollectorService, searchTransportService); final SearchService searchService = newSearchService(clusterService, indicesService, diff --git a/server/src/main/java/org/elasticsearch/node/NodeService.java b/server/src/main/java/org/elasticsearch/node/NodeService.java index 0e19b5a650221..9065b45be12bd 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeService.java +++ b/server/src/main/java/org/elasticsearch/node/NodeService.java @@ -26,7 +26,6 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.search.SearchTransportService; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; @@ -37,7 +36,6 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.monitor.MonitorService; -import org.elasticsearch.node.ResponseCollectorService; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.ThreadPool; @@ -64,11 +62,11 @@ public class NodeService extends AbstractComponent implements Closeable { private final Discovery discovery; NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService, Discovery discovery, - TransportService transportService, IndicesService indicesService, PluginsService pluginService, - CircuitBreakerService circuitBreakerService, ScriptService scriptService, - @Nullable HttpServerTransport httpServerTransport, IngestService ingestService, ClusterService clusterService, - SettingsFilter settingsFilter, ResponseCollectorService responseCollectorService, - SearchTransportService searchTransportService) { + TransportService transportService, IndicesService indicesService, PluginsService pluginService, + CircuitBreakerService circuitBreakerService, ScriptService scriptService, + @Nullable HttpServerTransport httpServerTransport, IngestService ingestService, + SettingsFilter settingsFilter, ResponseCollectorService responseCollectorService, + SearchTransportService searchTransportService) { super(settings); this.threadPool = threadPool; this.monitorService = monitorService; @@ -83,8 +81,6 @@ public class NodeService extends AbstractComponent implements Closeable { this.scriptService = scriptService; this.responseCollectorService = responseCollectorService; this.searchTransportService = searchTransportService; - clusterService.addStateApplier(ingestService.getPipelineStore()); - clusterService.addStateApplier(ingestService.getPipelineExecutionService()); } public NodeInfo info(boolean settings, boolean os, boolean process, boolean jvm, boolean threadPool, @@ -120,7 +116,7 @@ public NodeStats stats(CommonStatsFlags indices, boolean os, boolean process, bo circuitBreaker ? circuitBreakerService.stats() : null, script ? scriptService.stats() : null, discoveryStats ? discovery.stats() : null, - ingest ? ingestService.getPipelineExecutionService().stats() : null, + ingest ? ingestService.stats() : null, adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null ); } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java index 97a1ef2806a3e..60a9c6d7db8da 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java @@ -126,7 +126,7 @@ public void testBulkAllowExplicitIndex() throws Exception { public void testBulkAddIterable() { BulkRequest bulkRequest = Requests.bulkRequest(); - List requests = new ArrayList<>(); + List> requests = new ArrayList<>(); requests.add(new IndexRequest("test", "test", "id").source(Requests.INDEX_CONTENT_TYPE, "field", "value")); requests.add(new UpdateRequest("test", "test", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value")); requests.add(new DeleteRequest("test", "test", "id")); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index d71458ddb056b..a6eb5d60643bd 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -45,7 +45,6 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.ingest.IngestService; -import org.elasticsearch.ingest.PipelineExecutionService; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -90,9 +89,6 @@ public class TransportBulkActionIngestTests extends ESTestCase { ClusterService clusterService; IngestService ingestService; - /** The ingest execution service we can capture calls to */ - PipelineExecutionService executionService; - /** Arguments to callbacks we want to capture, but which require generics, so we must use @Captor */ @Captor ArgumentCaptor> failureHandler; @@ -101,7 +97,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { @Captor ArgumentCaptor> remoteResponseHandler; @Captor - ArgumentCaptor> bulkDocsItr; + ArgumentCaptor>> bulkDocsItr; /** The actual action we want to test, with real indexing mocked */ TestTransportBulkAction action; @@ -207,8 +203,6 @@ public void setupAction() { }).when(clusterService).addStateApplier(any(ClusterStateApplier.class)); // setup the mocked ingest service for capturing calls ingestService = mock(IngestService.class); - executionService = mock(PipelineExecutionService.class); - when(ingestService.getPipelineExecutionService()).thenReturn(executionService); action = new TestTransportBulkAction(); singleItemBulkWriteAction = new TestSingleItemBulkWriteAction(action); reset(transportService); // call on construction of action @@ -265,12 +259,12 @@ public void testIngestLocal() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(executionService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); + verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); completionHandler.getValue().accept(exception); assertTrue(failureCalled.get()); // now check success - Iterator req = bulkDocsItr.getValue().iterator(); + Iterator> req = bulkDocsItr.getValue().iterator(); failureHandler.getValue().accept((IndexRequest)req.next(), exception); // have an exception for our one index request indexRequest2.setPipeline(IngestService.NOOP_PIPELINE_NAME); // this is done by the real pipeline execution service when processing completionHandler.getValue().accept(null); @@ -299,7 +293,7 @@ public void testSingleItemBulkActionIngestLocal() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(executionService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); + verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); completionHandler.getValue().accept(exception); assertTrue(failureCalled.get()); @@ -331,7 +325,7 @@ public void testIngestForward() throws Exception { action.execute(null, bulkRequest, listener); // should not have executed ingest locally - verify(executionService, never()).executeBulkRequest(any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -375,7 +369,7 @@ public void testSingleItemBulkActionIngestForward() throws Exception { singleItemBulkWriteAction.execute(null, indexRequest, listener); // should not have executed ingest locally - verify(executionService, never()).executeBulkRequest(any(), any(), any()); + verify(ingestService, never()).executeBulkRequest(any(), any(), any()); // but instead should have sent to a remote node with the transport service ArgumentCaptor node = ArgumentCaptor.forClass(DiscoveryNode.class); verify(transportService).sendRequest(node.capture(), eq(BulkAction.NAME), any(), remoteResponseHandler.capture()); @@ -423,7 +417,7 @@ public void testUseDefaultPipeline() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(executionService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); + verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); completionHandler.getValue().accept(exception); assertTrue(failureCalled.get()); @@ -455,7 +449,7 @@ public void testCreateIndexBeforeRunPipeline() throws Exception { assertFalse(action.isExecuted); // haven't executed yet assertFalse(responseCalled.get()); assertFalse(failureCalled.get()); - verify(executionService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); + verify(ingestService).executeBulkRequest(bulkDocsItr.capture(), failureHandler.capture(), completionHandler.capture()); completionHandler.getValue().accept(exception); assertTrue(failureCalled.get()); diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java index 00815807eee8a..1711d16891083 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java @@ -31,8 +31,8 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.Pipeline; -import org.elasticsearch.ingest.PipelineStore; import org.elasticsearch.ingest.Processor; import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.test.ESTestCase; @@ -42,7 +42,6 @@ import static org.elasticsearch.action.ingest.SimulatePipelineRequest.SIMULATED_PIPELINE_ID; import static org.elasticsearch.ingest.IngestDocument.MetaData.ID; import static org.elasticsearch.ingest.IngestDocument.MetaData.INDEX; -import static org.elasticsearch.ingest.IngestDocument.MetaData.PARENT; import static org.elasticsearch.ingest.IngestDocument.MetaData.ROUTING; import static org.elasticsearch.ingest.IngestDocument.MetaData.TYPE; import static org.elasticsearch.ingest.IngestDocument.MetaData.VERSION; @@ -54,7 +53,7 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase { - private PipelineStore store; + private IngestService ingestService; @Before public void init() throws IOException { @@ -63,9 +62,9 @@ public void init() throws IOException { Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, null, pipelineCompoundProcessor); Map registry = Collections.singletonMap("mock_processor", (factories, tag, config) -> processor); - store = mock(PipelineStore.class); - when(store.get(SIMULATED_PIPELINE_ID)).thenReturn(pipeline); - when(store.getProcessorFactories()).thenReturn(registry); + ingestService = mock(IngestService.class); + when(ingestService.getPipeline(SIMULATED_PIPELINE_ID)).thenReturn(pipeline); + when(ingestService.getProcessorFactories()).thenReturn(registry); } public void testParseUsingPipelineStore() throws Exception { @@ -95,7 +94,8 @@ public void testParseUsingPipelineStore() throws Exception { expectedDocs.add(expectedDoc); } - SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parseWithPipelineId(SIMULATED_PIPELINE_ID, requestContent, false, store); + SimulatePipelineRequest.Parsed actualRequest = + SimulatePipelineRequest.parseWithPipelineId(SIMULATED_PIPELINE_ID, requestContent, false, ingestService); assertThat(actualRequest.isVerbose(), equalTo(false)); assertThat(actualRequest.getDocuments().size(), equalTo(numDocs)); Iterator> expectedDocsIterator = expectedDocs.iterator(); @@ -123,7 +123,7 @@ public void testParseWithProvidedPipeline() throws Exception { for (int i = 0; i < numDocs; i++) { Map doc = new HashMap<>(); Map expectedDoc = new HashMap<>(); - List fields = Arrays.asList(INDEX, TYPE, ID, ROUTING, PARENT, VERSION, VERSION_TYPE); + List fields = Arrays.asList(INDEX, TYPE, ID, ROUTING, VERSION, VERSION_TYPE); for(IngestDocument.MetaData field : fields) { if (field == VERSION) { Long value = randomLong(); @@ -183,7 +183,7 @@ public void testParseWithProvidedPipeline() throws Exception { requestContent.put(Fields.PIPELINE, pipelineConfig); - SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parse(requestContent, false, store); + SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parse(requestContent, false, ingestService); assertThat(actualRequest.isVerbose(), equalTo(false)); assertThat(actualRequest.getDocuments().size(), equalTo(numDocs)); Iterator> expectedDocsIterator = expectedDocs.iterator(); @@ -194,7 +194,6 @@ public void testParseWithProvidedPipeline() throws Exception { assertThat(metadataMap.get(TYPE), equalTo(expectedDocument.get(TYPE.getFieldName()))); assertThat(metadataMap.get(ID), equalTo(expectedDocument.get(ID.getFieldName()))); assertThat(metadataMap.get(ROUTING), equalTo(expectedDocument.get(ROUTING.getFieldName()))); - assertThat(metadataMap.get(PARENT), equalTo(expectedDocument.get(PARENT.getFieldName()))); assertThat(metadataMap.get(VERSION), equalTo(expectedDocument.get(VERSION.getFieldName()))); assertThat(metadataMap.get(VERSION_TYPE), equalTo(expectedDocument.get(VERSION_TYPE.getFieldName()))); assertThat(ingestDocument.getSourceAndMetadata(), equalTo(expectedDocument.get(Fields.SOURCE))); @@ -210,7 +209,7 @@ public void testNullPipelineId() { List> docs = new ArrayList<>(); requestContent.put(Fields.DOCS, docs); Exception e = expectThrows(IllegalArgumentException.class, - () -> SimulatePipelineRequest.parseWithPipelineId(null, requestContent, false, store)); + () -> SimulatePipelineRequest.parseWithPipelineId(null, requestContent, false, ingestService)); assertThat(e.getMessage(), equalTo("param [pipeline] is null")); } @@ -220,7 +219,7 @@ public void testNonExistentPipelineId() { List> docs = new ArrayList<>(); requestContent.put(Fields.DOCS, docs); Exception e = expectThrows(IllegalArgumentException.class, - () -> SimulatePipelineRequest.parseWithPipelineId(pipelineId, requestContent, false, store)); + () -> SimulatePipelineRequest.parseWithPipelineId(pipelineId, requestContent, false, ingestService)); assertThat(e.getMessage(), equalTo("pipeline [" + pipelineId + "] does not exist")); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java b/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java index 338e5b662c5da..4c2352bfebe7d 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java @@ -97,12 +97,12 @@ public void testFailStartNode() throws Exception { AcknowledgedResponse response = client().admin().cluster().preparePutPipeline("_id", pipelineSource, XContentType.JSON).get(); assertThat(response.isAcknowledged(), is(true)); - Pipeline pipeline = internalCluster().getInstance(NodeService.class, node1).getIngestService().getPipelineStore().get("_id"); + Pipeline pipeline = internalCluster().getInstance(NodeService.class, node1).getIngestService().getPipeline("_id"); assertThat(pipeline, notNullValue()); installPlugin = false; String node2 = internalCluster().startNode(); - pipeline = internalCluster().getInstance(NodeService.class, node2).getIngestService().getPipelineStore().get("_id"); + pipeline = internalCluster().getInstance(NodeService.class, node2).getIngestService().getPipeline("_id"); assertNotNull(pipeline); assertThat(pipeline.getId(), equalTo("_id")); diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index c0353acb7f9d0..6542538d38f43 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -21,16 +21,69 @@ import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; import java.util.Map; - -import org.elasticsearch.common.settings.Settings; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.ingest.DeletePipelineRequest; +import org.elasticsearch.action.ingest.PutPipelineRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.VersionType; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; -import org.mockito.Mockito; +import org.hamcrest.CustomTypeSafeMatcher; +import org.mockito.ArgumentMatcher; +import org.mockito.invocation.InvocationOnMock; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class IngestServiceTests extends ESTestCase { - private final IngestPlugin DUMMY_PLUGIN = new IngestPlugin() { + + private static final IngestPlugin DUMMY_PLUGIN = new IngestPlugin() { @Override public Map getProcessors(Processor.Parameters parameters) { return Collections.singletonMap("foo", (factories, tag, config) -> null); @@ -38,19 +91,812 @@ public Map getProcessors(Processor.Parameters paramet }; public void testIngestPlugin() { - ThreadPool tp = Mockito.mock(ThreadPool.class); - IngestService ingestService = new IngestService(Settings.EMPTY, tp, null, null, + ThreadPool tp = mock(ThreadPool.class); + IngestService ingestService = new IngestService(mock(ClusterService.class), tp, null, null, null, Collections.singletonList(DUMMY_PLUGIN)); - Map factories = ingestService.getPipelineStore().getProcessorFactories(); + Map factories = ingestService.getProcessorFactories(); assertTrue(factories.containsKey("foo")); assertEquals(1, factories.size()); } public void testIngestPluginDuplicate() { - ThreadPool tp = Mockito.mock(ThreadPool.class); + ThreadPool tp = mock(ThreadPool.class); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> - new IngestService(Settings.EMPTY, tp, null, null, + new IngestService(mock(ClusterService.class), tp, null, null, null, Arrays.asList(DUMMY_PLUGIN, DUMMY_PLUGIN))); assertTrue(e.getMessage(), e.getMessage().contains("already registered")); } + + public void testExecuteIndexPipelineDoesNotExist() { + ThreadPool threadPool = mock(ThreadPool.class); + final ExecutorService executorService = EsExecutors.newDirectExecutorService(); + when(threadPool.executor(anyString())).thenReturn(executorService); + IngestService ingestService = new IngestService(mock(ClusterService.class), threadPool, null, null, + null, Collections.singletonList(DUMMY_PLUGIN)); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); + + final SetOnce failure = new SetOnce<>(); + final BiConsumer failureHandler = (request, e) -> { + failure.set(true); + assertThat(request, sameInstance(indexRequest)); + assertThat(e, instanceOf(IllegalArgumentException.class)); + assertThat(e.getMessage(), equalTo("pipeline with id [_id] does not exist")); + }; + + @SuppressWarnings("unchecked") + final Consumer completionHandler = mock(Consumer.class); + + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + + assertTrue(failure.get()); + verify(completionHandler, times(1)).accept(null); + } + + public void testUpdatePipelines() { + IngestService ingestService = createWithProcessors(); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousClusterState = clusterState; + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + assertThat(ingestService.pipelines().size(), is(0)); + + PipelineConfiguration pipeline = new PipelineConfiguration( + "_id",new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON + ); + IngestMetadata ingestMetadata = new IngestMetadata(Collections.singletonMap("_id", pipeline)); + clusterState = ClusterState.builder(clusterState) + .metaData(MetaData.builder().putCustom(IngestMetadata.TYPE, ingestMetadata)) + .build(); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + assertThat(ingestService.pipelines().size(), is(1)); + assertThat(ingestService.pipelines().get("_id").getId(), equalTo("_id")); + assertThat(ingestService.pipelines().get("_id").getDescription(), nullValue()); + assertThat(ingestService.pipelines().get("_id").getProcessors().size(), equalTo(1)); + assertThat(ingestService.pipelines().get("_id").getProcessors().get(0).getType(), equalTo("set")); + } + + public void testDelete() { + IngestService ingestService = createWithProcessors(); + PipelineConfiguration config = new PipelineConfiguration( + "_id",new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON + ); + IngestMetadata ingestMetadata = new IngestMetadata(Collections.singletonMap("_id", config)); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousClusterState = clusterState; + clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder() + .putCustom(IngestMetadata.TYPE, ingestMetadata)).build(); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + assertThat(ingestService.getPipeline("_id"), notNullValue()); + + // Delete pipeline: + DeletePipelineRequest deleteRequest = new DeletePipelineRequest("_id"); + previousClusterState = clusterState; + clusterState = IngestService.innerDelete(deleteRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + assertThat(ingestService.getPipeline("_id"), nullValue()); + + // Delete existing pipeline: + try { + IngestService.innerDelete(deleteRequest, clusterState); + fail("exception expected"); + } catch (ResourceNotFoundException e) { + assertThat(e.getMessage(), equalTo("pipeline [_id] is missing")); + } + } + + public void testValidateNoIngestInfo() throws Exception { + IngestService ingestService = createWithProcessors(); + PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray( + "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON); + Exception e = expectThrows(IllegalStateException.class, () -> ingestService.validatePipeline(emptyMap(), putRequest)); + assertEquals("Ingest info is empty", e.getMessage()); + + DiscoveryNode discoveryNode = new DiscoveryNode("_node_id", buildNewFakeTransportAddress(), + emptyMap(), emptySet(), Version.CURRENT); + IngestInfo ingestInfo = new IngestInfo(Collections.singletonList(new ProcessorInfo("set"))); + ingestService.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest); + } + + public void testCrud() throws Exception { + IngestService ingestService = createWithProcessors(); + String id = "_id"; + Pipeline pipeline = ingestService.getPipeline(id); + assertThat(pipeline, nullValue()); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + + PutPipelineRequest putRequest = new PutPipelineRequest(id, + new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON); + ClusterState previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + pipeline = ingestService.getPipeline(id); + assertThat(pipeline, notNullValue()); + assertThat(pipeline.getId(), equalTo(id)); + assertThat(pipeline.getDescription(), nullValue()); + assertThat(pipeline.getProcessors().size(), equalTo(1)); + assertThat(pipeline.getProcessors().get(0).getType(), equalTo("set")); + + DeletePipelineRequest deleteRequest = new DeletePipelineRequest(id); + previousClusterState = clusterState; + clusterState = IngestService.innerDelete(deleteRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + pipeline = ingestService.getPipeline(id); + assertThat(pipeline, nullValue()); + } + + public void testPut() { + IngestService ingestService = createWithProcessors(); + String id = "_id"; + Pipeline pipeline = ingestService.getPipeline(id); + assertThat(pipeline, nullValue()); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + + // add a new pipeline: + PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray("{\"processors\": []}"), XContentType.JSON); + ClusterState previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + pipeline = ingestService.getPipeline(id); + assertThat(pipeline, notNullValue()); + assertThat(pipeline.getId(), equalTo(id)); + assertThat(pipeline.getDescription(), nullValue()); + assertThat(pipeline.getProcessors().size(), equalTo(0)); + + // overwrite existing pipeline: + putRequest = + new PutPipelineRequest(id, new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), XContentType.JSON); + previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + pipeline = ingestService.getPipeline(id); + assertThat(pipeline, notNullValue()); + assertThat(pipeline.getId(), equalTo(id)); + assertThat(pipeline.getDescription(), equalTo("_description")); + assertThat(pipeline.getProcessors().size(), equalTo(0)); + } + + public void testPutWithErrorResponse() { + IngestService ingestService = createWithProcessors(); + String id = "_id"; + Pipeline pipeline = ingestService.getPipeline(id); + assertThat(pipeline, nullValue()); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + + PutPipelineRequest putRequest = + new PutPipelineRequest(id, new BytesArray("{\"description\": \"empty processors\"}"), XContentType.JSON); + ClusterState previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + try { + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + fail("should fail"); + } catch (ElasticsearchParseException e) { + assertThat(e.getMessage(), equalTo("[processors] required property is missing")); + } + pipeline = ingestService.getPipeline(id); + assertNotNull(pipeline); + assertThat(pipeline.getId(), equalTo("_id")); + assertThat(pipeline.getDescription(), equalTo("this is a place holder pipeline, because pipeline with" + + " id [_id] could not be loaded")); + assertThat(pipeline.getProcessors().size(), equalTo(1)); + assertNull(pipeline.getProcessors().get(0).getTag()); + assertThat(pipeline.getProcessors().get(0).getType(), equalTo("unknown")); + } + + public void testDeleteUsingWildcard() { + IngestService ingestService = createWithProcessors(); + HashMap pipelines = new HashMap<>(); + BytesArray definition = new BytesArray( + "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}" + ); + pipelines.put("p1", new PipelineConfiguration("p1", definition, XContentType.JSON)); + pipelines.put("p2", new PipelineConfiguration("p2", definition, XContentType.JSON)); + pipelines.put("q1", new PipelineConfiguration("q1", definition, XContentType.JSON)); + IngestMetadata ingestMetadata = new IngestMetadata(pipelines); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousClusterState = clusterState; + clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder() + .putCustom(IngestMetadata.TYPE, ingestMetadata)).build(); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + assertThat(ingestService.getPipeline("p1"), notNullValue()); + assertThat(ingestService.getPipeline("p2"), notNullValue()); + assertThat(ingestService.getPipeline("q1"), notNullValue()); + + // Delete pipeline matching wildcard + DeletePipelineRequest deleteRequest = new DeletePipelineRequest("p*"); + previousClusterState = clusterState; + clusterState = IngestService.innerDelete(deleteRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + assertThat(ingestService.getPipeline("p1"), nullValue()); + assertThat(ingestService.getPipeline("p2"), nullValue()); + assertThat(ingestService.getPipeline("q1"), notNullValue()); + + // Exception if we used name which does not exist + try { + IngestService.innerDelete(new DeletePipelineRequest("unknown"), clusterState); + fail("exception expected"); + } catch (ResourceNotFoundException e) { + assertThat(e.getMessage(), equalTo("pipeline [unknown] is missing")); + } + + // match all wildcard works on last remaining pipeline + DeletePipelineRequest matchAllDeleteRequest = new DeletePipelineRequest("*"); + previousClusterState = clusterState; + clusterState = IngestService.innerDelete(matchAllDeleteRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + assertThat(ingestService.getPipeline("p1"), nullValue()); + assertThat(ingestService.getPipeline("p2"), nullValue()); + assertThat(ingestService.getPipeline("q1"), nullValue()); + + // match all wildcard does not throw exception if none match + IngestService.innerDelete(matchAllDeleteRequest, clusterState); + } + + public void testDeleteWithExistingUnmatchedPipelines() { + IngestService ingestService = createWithProcessors(); + HashMap pipelines = new HashMap<>(); + BytesArray definition = new BytesArray( + "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}" + ); + pipelines.put("p1", new PipelineConfiguration("p1", definition, XContentType.JSON)); + IngestMetadata ingestMetadata = new IngestMetadata(pipelines); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousClusterState = clusterState; + clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder() + .putCustom(IngestMetadata.TYPE, ingestMetadata)).build(); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + assertThat(ingestService.getPipeline("p1"), notNullValue()); + + DeletePipelineRequest deleteRequest = new DeletePipelineRequest("z*"); + try { + IngestService.innerDelete(deleteRequest, clusterState); + fail("exception expected"); + } catch (ResourceNotFoundException e) { + assertThat(e.getMessage(), equalTo("pipeline [z*] is missing")); + } + } + + public void testGetPipelines() { + Map configs = new HashMap<>(); + configs.put("_id1", new PipelineConfiguration( + "_id1", new BytesArray("{\"processors\": []}"), XContentType.JSON + )); + configs.put("_id2", new PipelineConfiguration( + "_id2", new BytesArray("{\"processors\": []}"), XContentType.JSON + )); + + assertThat(IngestService.innerGetPipelines(null, "_id1").isEmpty(), is(true)); + + IngestMetadata ingestMetadata = new IngestMetadata(configs); + List pipelines = IngestService.innerGetPipelines(ingestMetadata, "_id1"); + assertThat(pipelines.size(), equalTo(1)); + assertThat(pipelines.get(0).getId(), equalTo("_id1")); + + pipelines = IngestService.innerGetPipelines(ingestMetadata, "_id1", "_id2"); + assertThat(pipelines.size(), equalTo(2)); + assertThat(pipelines.get(0).getId(), equalTo("_id1")); + assertThat(pipelines.get(1).getId(), equalTo("_id2")); + + pipelines = IngestService.innerGetPipelines(ingestMetadata, "_id*"); + pipelines.sort(Comparator.comparing(PipelineConfiguration::getId)); + assertThat(pipelines.size(), equalTo(2)); + assertThat(pipelines.get(0).getId(), equalTo("_id1")); + assertThat(pipelines.get(1).getId(), equalTo("_id2")); + + // get all variants: (no IDs or '*') + pipelines = IngestService.innerGetPipelines(ingestMetadata); + pipelines.sort(Comparator.comparing(PipelineConfiguration::getId)); + assertThat(pipelines.size(), equalTo(2)); + assertThat(pipelines.get(0).getId(), equalTo("_id1")); + assertThat(pipelines.get(1).getId(), equalTo("_id2")); + + pipelines = IngestService.innerGetPipelines(ingestMetadata, "*"); + pipelines.sort(Comparator.comparing(PipelineConfiguration::getId)); + assertThat(pipelines.size(), equalTo(2)); + assertThat(pipelines.get(0).getId(), equalTo("_id1")); + assertThat(pipelines.get(1).getId(), equalTo("_id2")); + } + + public void testValidate() throws Exception { + IngestService ingestService = createWithProcessors(); + PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray( + "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\", \"tag\": \"tag1\"}}," + + "{\"remove\" : {\"field\": \"_field\", \"tag\": \"tag2\"}}]}"), + XContentType.JSON); + + DiscoveryNode node1 = new DiscoveryNode("_node_id1", buildNewFakeTransportAddress(), + emptyMap(), emptySet(), Version.CURRENT); + DiscoveryNode node2 = new DiscoveryNode("_node_id2", buildNewFakeTransportAddress(), + emptyMap(), emptySet(), Version.CURRENT); + Map ingestInfos = new HashMap<>(); + ingestInfos.put(node1, new IngestInfo(Arrays.asList(new ProcessorInfo("set"), new ProcessorInfo("remove")))); + ingestInfos.put(node2, new IngestInfo(Arrays.asList(new ProcessorInfo("set")))); + + ElasticsearchParseException e = + expectThrows(ElasticsearchParseException.class, () -> ingestService.validatePipeline(ingestInfos, putRequest)); + assertEquals("Processor type [remove] is not installed on node [" + node2 + "]", e.getMessage()); + assertEquals("remove", e.getMetadata("es.processor_type").get(0)); + assertEquals("tag2", e.getMetadata("es.processor_tag").get(0)); + + ingestInfos.put(node2, new IngestInfo(Arrays.asList(new ProcessorInfo("set"), new ProcessorInfo("remove")))); + ingestService.validatePipeline(ingestInfos, putRequest); + } + + public void testExecuteIndexPipelineExistsButFailedParsing() { + IngestService ingestService = createWithProcessors(Collections.singletonMap( + "mock", (factories, tag, config) -> new AbstractProcessor("mock") { + @Override + public void execute(IngestDocument ingestDocument) { + throw new IllegalStateException("error"); + } + + @Override + public String getType() { + return null; + } + } + )); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + String id = "_id"; + PutPipelineRequest putRequest = new PutPipelineRequest(id, + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); + ClusterState previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final SetOnce failure = new SetOnce<>(); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline(id); + final BiConsumer failureHandler = (request, e) -> { + assertThat(e.getCause(), instanceOf(IllegalArgumentException.class)); + assertThat(e.getCause().getCause(), instanceOf(IllegalStateException.class)); + assertThat(e.getCause().getCause().getMessage(), equalTo("error")); + failure.set(true); + }; + + @SuppressWarnings("unchecked") + final Consumer completionHandler = mock(Consumer.class); + + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + + assertTrue(failure.get()); + verify(completionHandler, times(1)).accept(null); + } + + public void testExecuteBulkPipelineDoesNotExist() { + IngestService ingestService = createWithProcessors(Collections.singletonMap( + "mock", (factories, tag, config) -> mock(CompoundProcessor.class))); + + PutPipelineRequest putRequest = new PutPipelineRequest("_id", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + + BulkRequest bulkRequest = new BulkRequest(); + + IndexRequest indexRequest1 = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); + bulkRequest.add(indexRequest1); + IndexRequest indexRequest2 = + new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("does_not_exist"); + bulkRequest.add(indexRequest2); + @SuppressWarnings("unchecked") + BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + Consumer completionHandler = mock(Consumer.class); + ingestService.executeBulkRequest(bulkRequest.requests(), failureHandler, completionHandler); + verify(failureHandler, times(1)).accept( + argThat(new CustomTypeSafeMatcher("failure handler was not called with the expected arguments") { + @Override + protected boolean matchesSafely(IndexRequest item) { + return item == indexRequest2; + } + + }), + argThat(new CustomTypeSafeMatcher("failure handler was not called with the expected arguments") { + @Override + protected boolean matchesSafely(IllegalArgumentException iae) { + return "pipeline with id [does_not_exist] does not exist".equals(iae.getMessage()); + } + }) + ); + verify(completionHandler, times(1)).accept(null); + } + + public void testExecuteSuccess() { + IngestService ingestService = createWithProcessors(Collections.singletonMap( + "mock", (factories, tag, config) -> mock(CompoundProcessor.class))); + PutPipelineRequest putRequest = new PutPipelineRequest("_id", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); + @SuppressWarnings("unchecked") + final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + final Consumer completionHandler = mock(Consumer.class); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + verify(failureHandler, never()).accept(any(), any()); + verify(completionHandler, times(1)).accept(null); + } + + public void testExecuteEmptyPipeline() throws Exception { + IngestService ingestService = createWithProcessors(emptyMap()); + PutPipelineRequest putRequest = + new PutPipelineRequest("_id", new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), XContentType.JSON); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); + @SuppressWarnings("unchecked") + final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + final Consumer completionHandler = mock(Consumer.class); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + verify(failureHandler, never()).accept(any(), any()); + verify(completionHandler, times(1)).accept(null); + } + + public void testExecutePropagateAllMetaDataUpdates() throws Exception { + final CompoundProcessor processor = mock(CompoundProcessor.class); + IngestService ingestService = createWithProcessors(Collections.singletonMap( + "mock", (factories, tag, config) -> processor)); + PutPipelineRequest putRequest = new PutPipelineRequest("_id", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final long newVersion = randomLong(); + final String versionType = randomFrom("internal", "external", "external_gt", "external_gte"); + doAnswer((InvocationOnMock invocationOnMock) -> { + IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0]; + for (IngestDocument.MetaData metaData : IngestDocument.MetaData.values()) { + if (metaData == IngestDocument.MetaData.VERSION) { + ingestDocument.setFieldValue(metaData.getFieldName(), newVersion); + } else if (metaData == IngestDocument.MetaData.VERSION_TYPE) { + ingestDocument.setFieldValue(metaData.getFieldName(), versionType); + } else { + ingestDocument.setFieldValue(metaData.getFieldName(), "update" + metaData.getFieldName()); + } + } + return null; + }).when(processor).execute(any()); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); + @SuppressWarnings("unchecked") + final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + final Consumer completionHandler = mock(Consumer.class); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + verify(processor).execute(any()); + verify(failureHandler, never()).accept(any(), any()); + verify(completionHandler, times(1)).accept(null); + assertThat(indexRequest.index(), equalTo("update_index")); + assertThat(indexRequest.type(), equalTo("update_type")); + assertThat(indexRequest.id(), equalTo("update_id")); + assertThat(indexRequest.routing(), equalTo("update_routing")); + assertThat(indexRequest.version(), equalTo(newVersion)); + assertThat(indexRequest.versionType(), equalTo(VersionType.fromString(versionType))); + } + + public void testExecuteFailure() throws Exception { + final CompoundProcessor processor = mock(CompoundProcessor.class); + IngestService ingestService = createWithProcessors(Collections.singletonMap( + "mock", (factories, tag, config) -> processor)); + PutPipelineRequest putRequest = new PutPipelineRequest("_id", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); + doThrow(new RuntimeException()) + .when(processor) + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + @SuppressWarnings("unchecked") + final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + final Consumer completionHandler = mock(Consumer.class); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); + verify(completionHandler, times(1)).accept(null); + } + + public void testExecuteSuccessWithOnFailure() throws Exception { + final Processor processor = mock(Processor.class); + when(processor.getType()).thenReturn("mock_processor_type"); + when(processor.getTag()).thenReturn("mock_processor_tag"); + final Processor onFailureProcessor = mock(Processor.class); + final CompoundProcessor compoundProcessor = new CompoundProcessor( + false, Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); + IngestService ingestService = createWithProcessors(Collections.singletonMap( + "mock", (factories, tag, config) -> compoundProcessor)); + PutPipelineRequest putRequest = new PutPipelineRequest("_id", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); + doThrow(new RuntimeException()).when(processor).execute(eqIndexTypeId(emptyMap())); + @SuppressWarnings("unchecked") + final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + final Consumer completionHandler = mock(Consumer.class); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + verify(failureHandler, never()).accept(eq(indexRequest), any(ElasticsearchException.class)); + verify(completionHandler, times(1)).accept(null); + } + + public void testExecuteFailureWithNestedOnFailure() throws Exception { + final Processor processor = mock(Processor.class); + final Processor onFailureProcessor = mock(Processor.class); + final Processor onFailureOnFailureProcessor = mock(Processor.class); + final List processors = Collections.singletonList(onFailureProcessor); + final List onFailureProcessors = Collections.singletonList(onFailureOnFailureProcessor); + final CompoundProcessor compoundProcessor = new CompoundProcessor( + false, + Collections.singletonList(processor), + Collections.singletonList(new CompoundProcessor(false, processors, onFailureProcessors))); + IngestService ingestService = createWithProcessors(Collections.singletonMap( + "mock", (factories, tag, config) -> compoundProcessor)); + PutPipelineRequest putRequest = new PutPipelineRequest("_id", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(emptyMap()).setPipeline("_id"); + doThrow(new RuntimeException()) + .when(onFailureOnFailureProcessor) + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + doThrow(new RuntimeException()) + .when(onFailureProcessor) + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + doThrow(new RuntimeException()) + .when(processor) + .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + @SuppressWarnings("unchecked") + final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + final Consumer completionHandler = mock(Consumer.class); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), emptyMap())); + verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); + verify(completionHandler, times(1)).accept(null); + } + + public void testBulkRequestExecutionWithFailures() throws Exception { + BulkRequest bulkRequest = new BulkRequest(); + String pipelineId = "_id"; + + int numRequest = scaledRandomIntBetween(8, 64); + int numIndexRequests = 0; + for (int i = 0; i < numRequest; i++) { + DocWriteRequest request; + if (randomBoolean()) { + if (randomBoolean()) { + request = new DeleteRequest("_index", "_type", "_id"); + } else { + request = new UpdateRequest("_index", "_type", "_id"); + } + } else { + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline(pipelineId); + indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); + request = indexRequest; + numIndexRequests++; + } + bulkRequest.add(request); + } + + CompoundProcessor processor = mock(CompoundProcessor.class); + when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); + Exception error = new RuntimeException(); + doThrow(error).when(processor).execute(any()); + IngestService ingestService = createWithProcessors(Collections.singletonMap( + "mock", (factories, tag, config) -> processor)); + PutPipelineRequest putRequest = new PutPipelineRequest("_id", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + + @SuppressWarnings("unchecked") + BiConsumer requestItemErrorHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + Consumer completionHandler = mock(Consumer.class); + ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler); + + verify(requestItemErrorHandler, times(numIndexRequests)).accept(any(IndexRequest.class), argThat(new ArgumentMatcher() { + @Override + public boolean matches(final Object o) { + return ((Exception)o).getCause().getCause().equals(error); + } + })); + verify(completionHandler, times(1)).accept(null); + } + + public void testBulkRequestExecution() { + BulkRequest bulkRequest = new BulkRequest(); + String pipelineId = "_id"; + + int numRequest = scaledRandomIntBetween(8, 64); + for (int i = 0; i < numRequest; i++) { + IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline(pipelineId); + indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); + bulkRequest.add(indexRequest); + } + + IngestService ingestService = createWithProcessors(emptyMap()); + PutPipelineRequest putRequest = + new PutPipelineRequest("_id", new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), XContentType.JSON); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); + ClusterState previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + + @SuppressWarnings("unchecked") + BiConsumer requestItemErrorHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") + Consumer completionHandler = mock(Consumer.class); + ingestService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler); + + verify(requestItemErrorHandler, never()).accept(any(), any()); + verify(completionHandler, times(1)).accept(null); + } + + public void testStats() { + final Processor processor = mock(Processor.class); + IngestService ingestService = createWithProcessors(Collections.singletonMap( + "mock", (factories, tag, config) -> processor)); + final IngestStats initialStats = ingestService.stats(); + assertThat(initialStats.getStatsPerPipeline().size(), equalTo(0)); + assertThat(initialStats.getTotalStats().getIngestCount(), equalTo(0L)); + assertThat(initialStats.getTotalStats().getIngestCurrent(), equalTo(0L)); + assertThat(initialStats.getTotalStats().getIngestFailedCount(), equalTo(0L)); + assertThat(initialStats.getTotalStats().getIngestTimeInMillis(), equalTo(0L)); + + PutPipelineRequest putRequest = new PutPipelineRequest("_id1", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty + ClusterState previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + putRequest = new PutPipelineRequest("_id2", + new BytesArray("{\"processors\": [{\"mock\" : {}}]}"), XContentType.JSON); + previousClusterState = clusterState; + clusterState = IngestService.innerPut(putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + final Map configurationMap = new HashMap<>(); + configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON)); + configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"), XContentType.JSON)); + ingestService.updatePipelineStats(new IngestMetadata(configurationMap)); + + @SuppressWarnings("unchecked") final BiConsumer failureHandler = mock(BiConsumer.class); + @SuppressWarnings("unchecked") final Consumer completionHandler = mock(Consumer.class); + + final IndexRequest indexRequest = new IndexRequest("_index"); + indexRequest.setPipeline("_id1"); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + final IngestStats afterFirstRequestStats = ingestService.stats(); + assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2)); + assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); + assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(0L)); + assertThat(afterFirstRequestStats.getTotalStats().getIngestCount(), equalTo(1L)); + + indexRequest.setPipeline("_id2"); + ingestService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); + final IngestStats afterSecondRequestStats = ingestService.stats(); + assertThat(afterSecondRequestStats.getStatsPerPipeline().size(), equalTo(2)); + assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); + assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L)); + assertThat(afterSecondRequestStats.getTotalStats().getIngestCount(), equalTo(2L)); + } + + // issue: https://github.com/elastic/elasticsearch/issues/18126 + public void testUpdatingStatsWhenRemovingPipelineWorks() { + IngestService ingestService = createWithProcessors(); + Map configurationMap = new HashMap<>(); + configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON)); + configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"), XContentType.JSON)); + ingestService.updatePipelineStats(new IngestMetadata(configurationMap)); + assertThat(ingestService.stats().getStatsPerPipeline(), hasKey("_id1")); + assertThat(ingestService.stats().getStatsPerPipeline(), hasKey("_id2")); + + configurationMap = new HashMap<>(); + configurationMap.put("_id3", new PipelineConfiguration("_id3", new BytesArray("{}"), XContentType.JSON)); + ingestService.updatePipelineStats(new IngestMetadata(configurationMap)); + assertThat(ingestService.stats().getStatsPerPipeline(), not(hasKey("_id1"))); + assertThat(ingestService.stats().getStatsPerPipeline(), not(hasKey("_id2"))); + } + + private IngestDocument eqIndexTypeId(final Map source) { + return argThat(new IngestDocumentMatcher("_index", "_type", "_id", source)); + } + + private IngestDocument eqIndexTypeId(final Long version, final VersionType versionType, final Map source) { + return argThat(new IngestDocumentMatcher("_index", "_type", "_id", version, versionType, source)); + } + + private static IngestService createWithProcessors() { + Map processors = new HashMap<>(); + processors.put("set", (factories, tag, config) -> { + String field = (String) config.remove("field"); + String value = (String) config.remove("value"); + return new Processor() { + @Override + public void execute(IngestDocument ingestDocument) { + ingestDocument.setFieldValue(field, value); + } + + @Override + public String getType() { + return "set"; + } + + @Override + public String getTag() { + return tag; + } + }; + }); + processors.put("remove", (factories, tag, config) -> { + String field = (String) config.remove("field"); + return new Processor() { + @Override + public void execute(IngestDocument ingestDocument) { + ingestDocument.removeField(field); + } + + @Override + public String getType() { + return "remove"; + } + + @Override + public String getTag() { + return tag; + } + }; + }); + return createWithProcessors(processors); + } + + private static IngestService createWithProcessors(Map processors) { + ThreadPool threadPool = mock(ThreadPool.class); + final ExecutorService executorService = EsExecutors.newDirectExecutorService(); + when(threadPool.executor(anyString())).thenReturn(executorService); + return new IngestService(mock(ClusterService.class), threadPool, null, null, + null, Collections.singletonList(new IngestPlugin() { + @Override + public Map getProcessors(final Processor.Parameters parameters) { + return processors; + } + })); + } + + private class IngestDocumentMatcher extends ArgumentMatcher { + + private final IngestDocument ingestDocument; + + IngestDocumentMatcher(String index, String type, String id, Map source) { + this.ingestDocument = new IngestDocument(index, type, id, null, null, null, null, source); + } + + IngestDocumentMatcher(String index, String type, String id, Long version, VersionType versionType, Map source) { + this.ingestDocument = new IngestDocument(index, type, id, null, null, version, versionType, source); + } + + @Override + public boolean matches(Object o) { + if (o.getClass() == IngestDocument.class) { + IngestDocument otherIngestDocument = (IngestDocument) o; + //ingest metadata will not be the same (timestamp differs every time) + return Objects.equals(ingestDocument.getSourceAndMetadata(), otherIngestDocument.getSourceAndMetadata()); + } + return false; + } + } } diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java deleted file mode 100644 index a347d1bcdaf91..0000000000000 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineExecutionServiceTests.java +++ /dev/null @@ -1,472 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.ingest; - -import org.apache.lucene.util.SetOnce; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.Requests; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.VersionType; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; -import org.hamcrest.CustomTypeSafeMatcher; -import org.junit.Before; -import org.mockito.ArgumentMatcher; -import org.mockito.invocation.InvocationOnMock; - -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ExecutorService; -import java.util.function.BiConsumer; -import java.util.function.Consumer; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasKey; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.sameInstance; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.argThat; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -public class PipelineExecutionServiceTests extends ESTestCase { - - private final Integer version = randomBoolean() ? randomInt() : null; - private PipelineStore store; - private PipelineExecutionService executionService; - - @Before - public void setup() { - store = mock(PipelineStore.class); - ThreadPool threadPool = mock(ThreadPool.class); - final ExecutorService executorService = EsExecutors.newDirectExecutorService(); - when(threadPool.executor(anyString())).thenReturn(executorService); - executionService = new PipelineExecutionService(store, threadPool); - } - - public void testExecuteIndexPipelineDoesNotExist() { - final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - - final SetOnce failure = new SetOnce<>(); - final BiConsumer failureHandler = (request, e) -> { - failure.set(true); - assertThat(request, sameInstance(indexRequest)); - assertThat(e, instanceOf(IllegalArgumentException.class)); - assertThat(e.getMessage(), equalTo("pipeline with id [_id] does not exist")); - }; - - @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - - executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - - assertTrue(failure.get()); - verify(completionHandler, times(1)).accept(null); - } - - public void testExecuteIndexPipelineExistsButFailedParsing() { - when(store.get("_id")).thenReturn(new Pipeline("_id", "stub", null, - new CompoundProcessor(new AbstractProcessor("mock") { - @Override - public void execute(IngestDocument ingestDocument) { - throw new IllegalStateException("error"); - } - - @Override - public String getType() { - return null; - } - }))); - - final SetOnce failure = new SetOnce<>(); - final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - final BiConsumer failureHandler = (request, e) -> { - assertThat(e.getCause(), instanceOf(IllegalArgumentException.class)); - assertThat(e.getCause().getCause(), instanceOf(IllegalStateException.class)); - assertThat(e.getCause().getCause().getMessage(), equalTo("error")); - failure.set(true); - }; - - @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - - executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - - assertTrue(failure.get()); - verify(completionHandler, times(1)).accept(null); - } - - public void testExecuteBulkPipelineDoesNotExist() { - CompoundProcessor processor = mock(CompoundProcessor.class); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); - BulkRequest bulkRequest = new BulkRequest(); - - IndexRequest indexRequest1 = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - bulkRequest.add(indexRequest1); - IndexRequest indexRequest2 = - new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("does_not_exist"); - bulkRequest.add(indexRequest2); - @SuppressWarnings("unchecked") - BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - executionService.executeBulkRequest(bulkRequest.requests(), failureHandler, completionHandler); - verify(failureHandler, times(1)).accept( - argThat(new CustomTypeSafeMatcher("failure handler was not called with the expected arguments") { - @Override - protected boolean matchesSafely(IndexRequest item) { - return item == indexRequest2; - } - - }), - argThat(new CustomTypeSafeMatcher("failure handler was not called with the expected arguments") { - @Override - protected boolean matchesSafely(IllegalArgumentException iae) { - return "pipeline with id [does_not_exist] does not exist".equals(iae.getMessage()); - } - }) - ); - verify(completionHandler, times(1)).accept(null); - } - - public void testExecuteSuccess() { - final CompoundProcessor processor = mock(CompoundProcessor.class); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); - final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - verify(failureHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); - } - - public void testExecuteEmptyPipeline() throws Exception { - final CompoundProcessor processor = mock(CompoundProcessor.class); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); - when(processor.getProcessors()).thenReturn(Collections.emptyList()); - - final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - verify(processor, never()).execute(any()); - verify(failureHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); - } - - public void testExecutePropagateAllMetaDataUpdates() throws Exception { - final CompoundProcessor processor = mock(CompoundProcessor.class); - when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); - final long newVersion = randomLong(); - final String versionType = randomFrom("internal", "external", "external_gt", "external_gte"); - doAnswer((InvocationOnMock invocationOnMock) -> { - IngestDocument ingestDocument = (IngestDocument) invocationOnMock.getArguments()[0]; - for (IngestDocument.MetaData metaData : IngestDocument.MetaData.values()) { - if (metaData == IngestDocument.MetaData.VERSION) { - ingestDocument.setFieldValue(metaData.getFieldName(), newVersion); - } else if (metaData == IngestDocument.MetaData.VERSION_TYPE) { - ingestDocument.setFieldValue(metaData.getFieldName(), versionType); - } else { - ingestDocument.setFieldValue(metaData.getFieldName(), "update" + metaData.getFieldName()); - } - } - return null; - }).when(processor).execute(any()); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); - - final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - verify(processor).execute(any()); - verify(failureHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); - assertThat(indexRequest.index(), equalTo("update_index")); - assertThat(indexRequest.type(), equalTo("update_type")); - assertThat(indexRequest.id(), equalTo("update_id")); - assertThat(indexRequest.routing(), equalTo("update_routing")); - assertThat(indexRequest.parent(), equalTo("update_parent")); - assertThat(indexRequest.version(), equalTo(newVersion)); - assertThat(indexRequest.versionType(), equalTo(VersionType.fromString(versionType))); - } - - public void testExecuteFailure() throws Exception { - final CompoundProcessor processor = mock(CompoundProcessor.class); - when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, processor)); - final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()) - .when(processor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); - verify(completionHandler, times(1)).accept(null); - } - - public void testExecuteSuccessWithOnFailure() throws Exception { - final Processor processor = mock(Processor.class); - when(processor.getType()).thenReturn("mock_processor_type"); - when(processor.getTag()).thenReturn("mock_processor_tag"); - final Processor onFailureProcessor = mock(Processor.class); - final CompoundProcessor compoundProcessor = new CompoundProcessor( - false, Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor)); - final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()).when(processor).execute(eqIndexTypeId(Collections.emptyMap())); - @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - verify(failureHandler, never()).accept(eq(indexRequest), any(ElasticsearchException.class)); - verify(completionHandler, times(1)).accept(null); - } - - public void testExecuteFailureWithOnFailure() throws Exception { - final Processor processor = mock(Processor.class); - final Processor onFailureProcessor = mock(Processor.class); - final CompoundProcessor compoundProcessor = new CompoundProcessor( - false, Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor))); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor)); - final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()) - .when(processor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - doThrow(new RuntimeException()) - .when(onFailureProcessor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); - verify(completionHandler, times(1)).accept(null); - } - - public void testExecuteFailureWithNestedOnFailure() throws Exception { - final Processor processor = mock(Processor.class); - final Processor onFailureProcessor = mock(Processor.class); - final Processor onFailureOnFailureProcessor = mock(Processor.class); - final List processors = Collections.singletonList(onFailureProcessor); - final List onFailureProcessors = Collections.singletonList(onFailureOnFailureProcessor); - final CompoundProcessor compoundProcessor = new CompoundProcessor( - false, - Collections.singletonList(processor), - Collections.singletonList(new CompoundProcessor(false, processors, onFailureProcessors))); - when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", version, compoundProcessor)); - final IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").source(Collections.emptyMap()).setPipeline("_id"); - doThrow(new RuntimeException()) - .when(onFailureOnFailureProcessor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - doThrow(new RuntimeException()) - .when(onFailureProcessor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - doThrow(new RuntimeException()) - .when(processor) - .execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - verify(processor).execute(eqIndexTypeId(indexRequest.version(), indexRequest.versionType(), Collections.emptyMap())); - verify(failureHandler, times(1)).accept(eq(indexRequest), any(RuntimeException.class)); - verify(completionHandler, times(1)).accept(null); - } - - public void testBulkRequestExecutionWithFailures() throws Exception { - BulkRequest bulkRequest = new BulkRequest(); - String pipelineId = "_id"; - - int numRequest = scaledRandomIntBetween(8, 64); - int numIndexRequests = 0; - for (int i = 0; i < numRequest; i++) { - DocWriteRequest request; - if (randomBoolean()) { - if (randomBoolean()) { - request = new DeleteRequest("_index", "_type", "_id"); - } else { - request = new UpdateRequest("_index", "_type", "_id"); - } - } else { - IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline(pipelineId); - indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); - request = indexRequest; - numIndexRequests++; - } - bulkRequest.add(request); - } - - CompoundProcessor processor = mock(CompoundProcessor.class); - when(processor.getProcessors()).thenReturn(Collections.singletonList(mock(Processor.class))); - Exception error = new RuntimeException(); - doThrow(error).when(processor).execute(any()); - when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, version, processor)); - - @SuppressWarnings("unchecked") - BiConsumer requestItemErrorHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - executionService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler); - - verify(requestItemErrorHandler, times(numIndexRequests)).accept(any(IndexRequest.class), eq(error)); - verify(completionHandler, times(1)).accept(null); - } - - public void testBulkRequestExecution() { - BulkRequest bulkRequest = new BulkRequest(); - String pipelineId = "_id"; - - int numRequest = scaledRandomIntBetween(8, 64); - for (int i = 0; i < numRequest; i++) { - IndexRequest indexRequest = new IndexRequest("_index", "_type", "_id").setPipeline(pipelineId); - indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); - bulkRequest.add(indexRequest); - } - - when(store.get(pipelineId)).thenReturn(new Pipeline(pipelineId, null, version, new CompoundProcessor())); - - @SuppressWarnings("unchecked") - BiConsumer requestItemErrorHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - Consumer completionHandler = mock(Consumer.class); - executionService.executeBulkRequest(bulkRequest.requests(), requestItemErrorHandler, completionHandler); - - verify(requestItemErrorHandler, never()).accept(any(), any()); - verify(completionHandler, times(1)).accept(null); - } - - public void testStats() { - final IngestStats initialStats = executionService.stats(); - assertThat(initialStats.getStatsPerPipeline().size(), equalTo(0)); - assertThat(initialStats.getTotalStats().getIngestCount(), equalTo(0L)); - assertThat(initialStats.getTotalStats().getIngestCurrent(), equalTo(0L)); - assertThat(initialStats.getTotalStats().getIngestFailedCount(), equalTo(0L)); - assertThat(initialStats.getTotalStats().getIngestTimeInMillis(), equalTo(0L)); - - when(store.get("_id1")).thenReturn(new Pipeline("_id1", null, version, new CompoundProcessor(mock(Processor.class)))); - when(store.get("_id2")).thenReturn(new Pipeline("_id2", null, null, new CompoundProcessor(mock(Processor.class)))); - - final Map configurationMap = new HashMap<>(); - configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON)); - configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"), XContentType.JSON)); - executionService.updatePipelineStats(new IngestMetadata(configurationMap)); - - @SuppressWarnings("unchecked") - final BiConsumer failureHandler = mock(BiConsumer.class); - @SuppressWarnings("unchecked") - final Consumer completionHandler = mock(Consumer.class); - - final IndexRequest indexRequest = new IndexRequest("_index"); - indexRequest.setPipeline("_id1"); - executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - final IngestStats afterFirstRequestStats = executionService.stats(); - assertThat(afterFirstRequestStats.getStatsPerPipeline().size(), equalTo(2)); - assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); - assertThat(afterFirstRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(0L)); - assertThat(afterFirstRequestStats.getTotalStats().getIngestCount(), equalTo(1L)); - - indexRequest.setPipeline("_id2"); - executionService.executeBulkRequest(Collections.singletonList(indexRequest), failureHandler, completionHandler); - final IngestStats afterSecondRequestStats = executionService.stats(); - assertThat(afterSecondRequestStats.getStatsPerPipeline().size(), equalTo(2)); - assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id1").getIngestCount(), equalTo(1L)); - assertThat(afterSecondRequestStats.getStatsPerPipeline().get("_id2").getIngestCount(), equalTo(1L)); - assertThat(afterSecondRequestStats.getTotalStats().getIngestCount(), equalTo(2L)); - } - - // issue: https://github.com/elastic/elasticsearch/issues/18126 - public void testUpdatingStatsWhenRemovingPipelineWorks() { - Map configurationMap = new HashMap<>(); - configurationMap.put("_id1", new PipelineConfiguration("_id1", new BytesArray("{}"), XContentType.JSON)); - configurationMap.put("_id2", new PipelineConfiguration("_id2", new BytesArray("{}"), XContentType.JSON)); - executionService.updatePipelineStats(new IngestMetadata(configurationMap)); - assertThat(executionService.stats().getStatsPerPipeline(), hasKey("_id1")); - assertThat(executionService.stats().getStatsPerPipeline(), hasKey("_id2")); - - configurationMap = new HashMap<>(); - configurationMap.put("_id3", new PipelineConfiguration("_id3", new BytesArray("{}"), XContentType.JSON)); - executionService.updatePipelineStats(new IngestMetadata(configurationMap)); - assertThat(executionService.stats().getStatsPerPipeline(), not(hasKey("_id1"))); - assertThat(executionService.stats().getStatsPerPipeline(), not(hasKey("_id2"))); - } - - private IngestDocument eqIndexTypeId(final Map source) { - return argThat(new IngestDocumentMatcher("_index", "_type", "_id", source)); - } - - private IngestDocument eqIndexTypeId(final Long version, final VersionType versionType, final Map source) { - return argThat(new IngestDocumentMatcher("_index", "_type", "_id", version, versionType, source)); - } - - private class IngestDocumentMatcher extends ArgumentMatcher { - - private final IngestDocument ingestDocument; - - IngestDocumentMatcher(String index, String type, String id, Map source) { - this.ingestDocument = new IngestDocument(index, type, id, null, null, null, null, source); - } - - IngestDocumentMatcher(String index, String type, String id, Long version, VersionType versionType, Map source) { - this.ingestDocument = new IngestDocument(index, type, id, null, null, version, versionType, source); - } - - @Override - public boolean matches(Object o) { - if (o.getClass() == IngestDocument.class) { - IngestDocument otherIngestDocument = (IngestDocument) o; - //ingest metadata will not be the same (timestamp differs every time) - return Objects.equals(ingestDocument.getSourceAndMetadata(), otherIngestDocument.getSourceAndMetadata()); - } - return false; - } - } -} diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java index 461873a3fe3d2..cafdbcfb44690 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java @@ -47,9 +47,8 @@ public void testCreate() throws Exception { pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Arrays.asList(Collections.singletonMap("test", processorConfig0), Collections.singletonMap("test", processorConfig1))); - Pipeline.Factory factory = new Pipeline.Factory(); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -64,9 +63,8 @@ public void testCreateWithNoProcessorsField() throws Exception { Map pipelineConfig = new HashMap<>(); pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.VERSION_KEY, versionString); - Pipeline.Factory factory = new Pipeline.Factory(); try { - factory.create("_id", pipelineConfig, Collections.emptyMap()); + Pipeline.create("_id", pipelineConfig, Collections.emptyMap()); fail("should fail, missing required [processors] field"); } catch (ElasticsearchParseException e) { assertThat(e.getMessage(), equalTo("[processors] required property is missing")); @@ -78,8 +76,7 @@ public void testCreateWithEmptyProcessorsField() throws Exception { pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.emptyList()); - Pipeline.Factory factory = new Pipeline.Factory(); - Pipeline pipeline = factory.create("_id", pipelineConfig, null); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, null); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -93,9 +90,8 @@ public void testCreateWithPipelineOnFailure() throws Exception { pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); - Pipeline.Factory factory = new Pipeline.Factory(); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -112,9 +108,8 @@ public void testCreateWithPipelineEmptyOnFailure() throws Exception { pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); pipelineConfig.put(Pipeline.ON_FAILURE_KEY, Collections.emptyList()); - Pipeline.Factory factory = new Pipeline.Factory(); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create("_id", pipelineConfig, processorRegistry)); + Exception e = expectThrows(ElasticsearchParseException.class, () -> Pipeline.create("_id", pipelineConfig, processorRegistry)); assertThat(e.getMessage(), equalTo("pipeline [_id] cannot have an empty on_failure option defined")); } @@ -125,9 +120,8 @@ public void testCreateWithPipelineEmptyOnFailureInProcessor() throws Exception { pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); - Pipeline.Factory factory = new Pipeline.Factory(); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create("_id", pipelineConfig, processorRegistry)); + Exception e = expectThrows(ElasticsearchParseException.class, () -> Pipeline.create("_id", pipelineConfig, processorRegistry)); assertThat(e.getMessage(), equalTo("[on_failure] processors list cannot be empty")); } @@ -136,14 +130,13 @@ public void testCreateWithPipelineIgnoreFailure() throws Exception { processorConfig.put("ignore_failure", true); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Pipeline.Factory factory = new Pipeline.Factory(); Map pipelineConfig = new HashMap<>(); pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); - Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -162,9 +155,8 @@ public void testCreateUnusedProcessorOptions() throws Exception { pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); - Pipeline.Factory factory = new Pipeline.Factory(); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Exception e = expectThrows(ElasticsearchParseException.class, () -> factory.create("_id", pipelineConfig, processorRegistry)); + Exception e = expectThrows(ElasticsearchParseException.class, () -> Pipeline.create("_id", pipelineConfig, processorRegistry)); assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]")); } @@ -176,9 +168,8 @@ public void testCreateProcessorsWithOnFailureProperties() throws Exception { pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); pipelineConfig.put(Pipeline.VERSION_KEY, versionString); pipelineConfig.put(Pipeline.PROCESSORS_KEY, Collections.singletonList(Collections.singletonMap("test", processorConfig))); - Pipeline.Factory factory = new Pipeline.Factory(); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); - Pipeline pipeline = factory.create("_id", pipelineConfig, processorRegistry); + Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java deleted file mode 100644 index 250bb5059cf58..0000000000000 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineStoreTests.java +++ /dev/null @@ -1,377 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.ingest; - -import org.elasticsearch.ElasticsearchParseException; -import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.Version; -import org.elasticsearch.action.ingest.DeletePipelineRequest; -import org.elasticsearch.action.ingest.PutPipelineRequest; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.test.ESTestCase; -import org.junit.Before; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; - -public class PipelineStoreTests extends ESTestCase { - - private PipelineStore store; - - @Before - public void init() throws Exception { - Map processorFactories = new HashMap<>(); - processorFactories.put("set", (factories, tag, config) -> { - String field = (String) config.remove("field"); - String value = (String) config.remove("value"); - return new Processor() { - @Override - public void execute(IngestDocument ingestDocument) throws Exception { - ingestDocument.setFieldValue(field, value); - } - - @Override - public String getType() { - return "set"; - } - - @Override - public String getTag() { - return tag; - } - }; - }); - processorFactories.put("remove", (factories, tag, config) -> { - String field = (String) config.remove("field"); - return new Processor() { - @Override - public void execute(IngestDocument ingestDocument) throws Exception { - ingestDocument.removeField(field); - } - - @Override - public String getType() { - return "remove"; - } - - @Override - public String getTag() { - return tag; - } - }; - }); - store = new PipelineStore(Settings.EMPTY, processorFactories); - } - - public void testUpdatePipelines() { - ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); - ClusterState previousClusterState = clusterState; - store.innerUpdatePipelines(previousClusterState, clusterState); - assertThat(store.pipelines.size(), is(0)); - - PipelineConfiguration pipeline = new PipelineConfiguration( - "_id",new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON - ); - IngestMetadata ingestMetadata = new IngestMetadata(Collections.singletonMap("_id", pipeline)); - clusterState = ClusterState.builder(clusterState) - .metaData(MetaData.builder().putCustom(IngestMetadata.TYPE, ingestMetadata)) - .build(); - store.innerUpdatePipelines(previousClusterState, clusterState); - assertThat(store.pipelines.size(), is(1)); - assertThat(store.pipelines.get("_id").getId(), equalTo("_id")); - assertThat(store.pipelines.get("_id").getDescription(), nullValue()); - assertThat(store.pipelines.get("_id").getProcessors().size(), equalTo(1)); - assertThat(store.pipelines.get("_id").getProcessors().get(0).getType(), equalTo("set")); - } - - public void testPut() { - String id = "_id"; - Pipeline pipeline = store.get(id); - assertThat(pipeline, nullValue()); - ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); - - // add a new pipeline: - PutPipelineRequest putRequest = new PutPipelineRequest(id, new BytesArray("{\"processors\": []}"), XContentType.JSON); - ClusterState previousClusterState = clusterState; - clusterState = store.innerPut(putRequest, clusterState); - store.innerUpdatePipelines(previousClusterState, clusterState); - pipeline = store.get(id); - assertThat(pipeline, notNullValue()); - assertThat(pipeline.getId(), equalTo(id)); - assertThat(pipeline.getDescription(), nullValue()); - assertThat(pipeline.getProcessors().size(), equalTo(0)); - - // overwrite existing pipeline: - putRequest = - new PutPipelineRequest(id, new BytesArray("{\"processors\": [], \"description\": \"_description\"}"), XContentType.JSON); - previousClusterState = clusterState; - clusterState = store.innerPut(putRequest, clusterState); - store.innerUpdatePipelines(previousClusterState, clusterState); - pipeline = store.get(id); - assertThat(pipeline, notNullValue()); - assertThat(pipeline.getId(), equalTo(id)); - assertThat(pipeline.getDescription(), equalTo("_description")); - assertThat(pipeline.getProcessors().size(), equalTo(0)); - } - - public void testPutWithErrorResponse() { - String id = "_id"; - Pipeline pipeline = store.get(id); - assertThat(pipeline, nullValue()); - ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); - - PutPipelineRequest putRequest = - new PutPipelineRequest(id, new BytesArray("{\"description\": \"empty processors\"}"), XContentType.JSON); - ClusterState previousClusterState = clusterState; - clusterState = store.innerPut(putRequest, clusterState); - try { - store.innerUpdatePipelines(previousClusterState, clusterState); - fail("should fail"); - } catch (ElasticsearchParseException e) { - assertThat(e.getMessage(), equalTo("[processors] required property is missing")); - } - pipeline = store.get(id); - assertNotNull(pipeline); - assertThat(pipeline.getId(), equalTo("_id")); - assertThat(pipeline.getDescription(), equalTo("this is a place holder pipeline, because pipeline with" + - " id [_id] could not be loaded")); - assertThat(pipeline.getProcessors().size(), equalTo(1)); - assertNull(pipeline.getProcessors().get(0).getTag()); - assertThat(pipeline.getProcessors().get(0).getType(), equalTo("unknown")); - } - - public void testDelete() { - PipelineConfiguration config = new PipelineConfiguration( - "_id",new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON - ); - IngestMetadata ingestMetadata = new IngestMetadata(Collections.singletonMap("_id", config)); - ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); - ClusterState previousClusterState = clusterState; - clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder() - .putCustom(IngestMetadata.TYPE, ingestMetadata)).build(); - store.innerUpdatePipelines(previousClusterState, clusterState); - assertThat(store.get("_id"), notNullValue()); - - // Delete pipeline: - DeletePipelineRequest deleteRequest = new DeletePipelineRequest("_id"); - previousClusterState = clusterState; - clusterState = store.innerDelete(deleteRequest, clusterState); - store.innerUpdatePipelines(previousClusterState, clusterState); - assertThat(store.get("_id"), nullValue()); - - // Delete existing pipeline: - try { - store.innerDelete(deleteRequest, clusterState); - fail("exception expected"); - } catch (ResourceNotFoundException e) { - assertThat(e.getMessage(), equalTo("pipeline [_id] is missing")); - } - } - - public void testDeleteUsingWildcard() { - HashMap pipelines = new HashMap<>(); - BytesArray definition = new BytesArray( - "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}" - ); - pipelines.put("p1", new PipelineConfiguration("p1", definition, XContentType.JSON)); - pipelines.put("p2", new PipelineConfiguration("p2", definition, XContentType.JSON)); - pipelines.put("q1", new PipelineConfiguration("q1", definition, XContentType.JSON)); - IngestMetadata ingestMetadata = new IngestMetadata(pipelines); - ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); - ClusterState previousClusterState = clusterState; - clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder() - .putCustom(IngestMetadata.TYPE, ingestMetadata)).build(); - store.innerUpdatePipelines(previousClusterState, clusterState); - assertThat(store.get("p1"), notNullValue()); - assertThat(store.get("p2"), notNullValue()); - assertThat(store.get("q1"), notNullValue()); - - // Delete pipeline matching wildcard - DeletePipelineRequest deleteRequest = new DeletePipelineRequest("p*"); - previousClusterState = clusterState; - clusterState = store.innerDelete(deleteRequest, clusterState); - store.innerUpdatePipelines(previousClusterState, clusterState); - assertThat(store.get("p1"), nullValue()); - assertThat(store.get("p2"), nullValue()); - assertThat(store.get("q1"), notNullValue()); - - // Exception if we used name which does not exist - try { - store.innerDelete(new DeletePipelineRequest("unknown"), clusterState); - fail("exception expected"); - } catch (ResourceNotFoundException e) { - assertThat(e.getMessage(), equalTo("pipeline [unknown] is missing")); - } - - // match all wildcard works on last remaining pipeline - DeletePipelineRequest matchAllDeleteRequest = new DeletePipelineRequest("*"); - previousClusterState = clusterState; - clusterState = store.innerDelete(matchAllDeleteRequest, clusterState); - store.innerUpdatePipelines(previousClusterState, clusterState); - assertThat(store.get("p1"), nullValue()); - assertThat(store.get("p2"), nullValue()); - assertThat(store.get("q1"), nullValue()); - - // match all wildcard does not throw exception if none match - store.innerDelete(matchAllDeleteRequest, clusterState); - } - - public void testDeleteWithExistingUnmatchedPipelines() { - HashMap pipelines = new HashMap<>(); - BytesArray definition = new BytesArray( - "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}" - ); - pipelines.put("p1", new PipelineConfiguration("p1", definition, XContentType.JSON)); - IngestMetadata ingestMetadata = new IngestMetadata(pipelines); - ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); - ClusterState previousClusterState = clusterState; - clusterState = ClusterState.builder(clusterState).metaData(MetaData.builder() - .putCustom(IngestMetadata.TYPE, ingestMetadata)).build(); - store.innerUpdatePipelines(previousClusterState, clusterState); - assertThat(store.get("p1"), notNullValue()); - - DeletePipelineRequest deleteRequest = new DeletePipelineRequest("z*"); - try { - store.innerDelete(deleteRequest, clusterState); - fail("exception expected"); - } catch (ResourceNotFoundException e) { - assertThat(e.getMessage(), equalTo("pipeline [z*] is missing")); - } - } - - public void testGetPipelines() { - Map configs = new HashMap<>(); - configs.put("_id1", new PipelineConfiguration( - "_id1", new BytesArray("{\"processors\": []}"), XContentType.JSON - )); - configs.put("_id2", new PipelineConfiguration( - "_id2", new BytesArray("{\"processors\": []}"), XContentType.JSON - )); - - assertThat(store.innerGetPipelines(null, "_id1").isEmpty(), is(true)); - - IngestMetadata ingestMetadata = new IngestMetadata(configs); - List pipelines = store.innerGetPipelines(ingestMetadata, "_id1"); - assertThat(pipelines.size(), equalTo(1)); - assertThat(pipelines.get(0).getId(), equalTo("_id1")); - - pipelines = store.innerGetPipelines(ingestMetadata, "_id1", "_id2"); - assertThat(pipelines.size(), equalTo(2)); - assertThat(pipelines.get(0).getId(), equalTo("_id1")); - assertThat(pipelines.get(1).getId(), equalTo("_id2")); - - pipelines = store.innerGetPipelines(ingestMetadata, "_id*"); - pipelines.sort((o1, o2) -> o1.getId().compareTo(o2.getId())); - assertThat(pipelines.size(), equalTo(2)); - assertThat(pipelines.get(0).getId(), equalTo("_id1")); - assertThat(pipelines.get(1).getId(), equalTo("_id2")); - - // get all variants: (no IDs or '*') - pipelines = store.innerGetPipelines(ingestMetadata); - pipelines.sort((o1, o2) -> o1.getId().compareTo(o2.getId())); - assertThat(pipelines.size(), equalTo(2)); - assertThat(pipelines.get(0).getId(), equalTo("_id1")); - assertThat(pipelines.get(1).getId(), equalTo("_id2")); - - pipelines = store.innerGetPipelines(ingestMetadata, "*"); - pipelines.sort((o1, o2) -> o1.getId().compareTo(o2.getId())); - assertThat(pipelines.size(), equalTo(2)); - assertThat(pipelines.get(0).getId(), equalTo("_id1")); - assertThat(pipelines.get(1).getId(), equalTo("_id2")); - } - - public void testCrud() throws Exception { - String id = "_id"; - Pipeline pipeline = store.get(id); - assertThat(pipeline, nullValue()); - ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty - - PutPipelineRequest putRequest = new PutPipelineRequest(id, - new BytesArray("{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON); - ClusterState previousClusterState = clusterState; - clusterState = store.innerPut(putRequest, clusterState); - store.innerUpdatePipelines(previousClusterState, clusterState); - pipeline = store.get(id); - assertThat(pipeline, notNullValue()); - assertThat(pipeline.getId(), equalTo(id)); - assertThat(pipeline.getDescription(), nullValue()); - assertThat(pipeline.getProcessors().size(), equalTo(1)); - assertThat(pipeline.getProcessors().get(0).getType(), equalTo("set")); - - DeletePipelineRequest deleteRequest = new DeletePipelineRequest(id); - previousClusterState = clusterState; - clusterState = store.innerDelete(deleteRequest, clusterState); - store.innerUpdatePipelines(previousClusterState, clusterState); - pipeline = store.get(id); - assertThat(pipeline, nullValue()); - } - - public void testValidate() throws Exception { - PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray( - "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\", \"tag\": \"tag1\"}}," + - "{\"remove\" : {\"field\": \"_field\", \"tag\": \"tag2\"}}]}"), - XContentType.JSON); - - DiscoveryNode node1 = new DiscoveryNode("_node_id1", buildNewFakeTransportAddress(), - emptyMap(), emptySet(), Version.CURRENT); - DiscoveryNode node2 = new DiscoveryNode("_node_id2", buildNewFakeTransportAddress(), - emptyMap(), emptySet(), Version.CURRENT); - Map ingestInfos = new HashMap<>(); - ingestInfos.put(node1, new IngestInfo(Arrays.asList(new ProcessorInfo("set"), new ProcessorInfo("remove")))); - ingestInfos.put(node2, new IngestInfo(Arrays.asList(new ProcessorInfo("set")))); - - ElasticsearchParseException e = - expectThrows(ElasticsearchParseException.class, () -> store.validatePipeline(ingestInfos, putRequest)); - assertEquals("Processor type [remove] is not installed on node [" + node2 + "]", e.getMessage()); - assertEquals("remove", e.getHeader("processor_type").get(0)); - assertEquals("tag2", e.getHeader("processor_tag").get(0)); - - ingestInfos.put(node2, new IngestInfo(Arrays.asList(new ProcessorInfo("set"), new ProcessorInfo("remove")))); - store.validatePipeline(ingestInfos, putRequest); - } - - public void testValidateNoIngestInfo() throws Exception { - PutPipelineRequest putRequest = new PutPipelineRequest("_id", new BytesArray( - "{\"processors\": [{\"set\" : {\"field\": \"_field\", \"value\": \"_value\"}}]}"), XContentType.JSON); - Exception e = expectThrows(IllegalStateException.class, () -> store.validatePipeline(Collections.emptyMap(), putRequest)); - assertEquals("Ingest info is empty", e.getMessage()); - - DiscoveryNode discoveryNode = new DiscoveryNode("_node_id", buildNewFakeTransportAddress(), - emptyMap(), emptySet(), Version.CURRENT); - IngestInfo ingestInfo = new IngestInfo(Collections.singletonList(new ProcessorInfo("set"))); - store.validatePipeline(Collections.singletonMap(discoveryNode, ingestInfo), putRequest); - } -}