Consolidate logic for creating ingest pipelines (elastic#78289)
danhermann authored Sep 27, 2021
1 parent 272c55e commit 59149bf
Showing 4 changed files with 190 additions and 201 deletions.
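In short: the duplicate-pipeline short-circuit, the _meta minimum-node-version check, and processor validation move out of PutPipelineTransportAction into IngestService.putPipeline, which now takes the request, the response listener, and a Consumer<ActionListener<NodesInfoResponse>> through which the caller supplies node ingest info. validatePipeline correspondingly takes the pipeline id and the already-parsed config map instead of re-parsing the request.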
server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java
@@ -8,9 +8,7 @@
 
 package org.elasticsearch.action.ingest;
 
-import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -21,20 +19,12 @@
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
-import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.inject.Inject;
-import org.elasticsearch.common.xcontent.XContentHelper;
-import org.elasticsearch.ingest.IngestInfo;
-import org.elasticsearch.ingest.IngestMetadata;
 import org.elasticsearch.ingest.IngestService;
-import org.elasticsearch.ingest.Pipeline;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import static org.elasticsearch.ingest.IngestService.INGEST_ORIGIN;
 
 public class PutPipelineTransportAction extends AcknowledgedTransportMasterNodeAction<PutPipelineRequest> {
@@ -59,42 +49,15 @@ public PutPipelineTransportAction(ThreadPool threadPool, TransportService transp
     @Override
     protected void masterOperation(Task task, PutPipelineRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener)
         throws Exception {
-
-        Map<String, Object> pipelineConfig = null;
-        IngestMetadata currentIngestMetadata = state.metadata().custom(IngestMetadata.TYPE);
-        if (currentIngestMetadata != null && currentIngestMetadata.getPipelines().containsKey(request.getId())) {
-            pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
-            var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId());
-            if (currentPipeline.getConfigAsMap().equals(pipelineConfig)) {
-                // existing pipeline matches request pipeline -- no need to update
-                listener.onResponse(AcknowledgedResponse.TRUE);
-                return;
-            }
-        }
-
-        if (state.getNodes().getMinNodeVersion().before(Version.V_7_15_0)) {
-            pipelineConfig = pipelineConfig == null
-                ? XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2()
-                : pipelineConfig;
-            if (pipelineConfig.containsKey(Pipeline.META_KEY)) {
-                throw new IllegalStateException("pipelines with _meta field require minimum node version of " + Version.V_7_15_0);
-            }
-        }
-        NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
-        nodesInfoRequest.clear();
-        nodesInfoRequest.addMetric(NodesInfoRequest.Metric.INGEST.metricName());
-        client.admin().cluster().nodesInfo(
-            nodesInfoRequest,
-            ActionListener.wrap(
-                nodeInfos -> {
-                    Map<DiscoveryNode, IngestInfo> ingestInfos = new HashMap<>();
-                    for (NodeInfo nodeInfo : nodeInfos.getNodes()) {
-                        ingestInfos.put(nodeInfo.getNode(), nodeInfo.getInfo(IngestInfo.class));
-                    }
-                    ingestService.putPipeline(ingestInfos, request, listener);
-                },
-                listener::onFailure
-            )
-        );
+        ingestService.putPipeline(
+            request,
+            listener,
+            (nodeListener) -> {
+                NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
+                nodesInfoRequest.clear();
+                nodesInfoRequest.addMetric(NodesInfoRequest.Metric.INGEST.metricName());
+                client.admin().cluster().nodesInfo(nodesInfoRequest, nodeListener);
+            }
+        );
     }
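The design choice driving the consolidation is visible above: IngestService no longer fetches node information itself; the transport action injects the nodes-info call as a Consumer<ActionListener<NodesInfoResponse>>, so the service owns the whole put-pipeline flow without depending on the client. A minimal self-contained sketch of that inversion, using hypothetical stand-in types (Listener, NodesInfo, PipelineService) rather than the real Elasticsearch classes:

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class ConsumerInjectionSketch {

    // Hypothetical stand-ins for org.elasticsearch.action.ActionListener and
    // NodesInfoResponse -- illustration only, not the real classes.
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    record NodesInfo(Map<String, List<String>> processorsPerNode) {}

    static class PipelineService {
        // The service declares what it needs (node capabilities) without
        // depending on the client that fetches them.
        void putPipeline(String id, Consumer<Listener<NodesInfo>> nodeInfoListener) {
            nodeInfoListener.accept(new Listener<>() {
                @Override
                public void onResponse(NodesInfo infos) {
                    // validate the pipeline against every node's processors, then store it
                    System.out.println("validating [" + id + "] against " + infos.processorsPerNode());
                }

                @Override
                public void onFailure(Exception e) {
                    System.err.println("nodes-info lookup failed: " + e);
                }
            });
        }
    }

    public static void main(String[] args) {
        PipelineService service = new PipelineService();
        // Production would issue a real nodes-info request here; a test can
        // simply hand back a canned response.
        service.putPipeline("my-pipeline",
            listener -> listener.onResponse(new NodesInfo(Map.of("node-1", List.of("set", "rename")))));
    }
}

Because the lookup is just a Consumer, a caller can exercise putPipeline end to end by handing back a canned response instead of standing up a cluster.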
75 changes: 60 additions & 15 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -14,8 +14,11 @@
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
+import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
 import org.elasticsearch.action.bulk.TransportBulkAction;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.ingest.DeletePipelineRequest;
@@ -34,12 +37,12 @@
 import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.core.Tuple;
 import org.elasticsearch.common.regex.Regex;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.index.IndexSettings;
@@ -331,17 +334,56 @@ static List<PipelineConfiguration> innerGetPipelines(IngestMetadata ingestMetada
     /**
      * Stores the specified pipeline definition in the request.
      */
-    public void putPipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineRequest request,
-                            ActionListener<AcknowledgedResponse> listener) throws Exception {
-        // validates the pipeline and processor configuration before submitting a cluster update task:
-        validatePipeline(ingestInfos, request);
-        clusterService.submitStateUpdateTask("put-pipeline-" + request.getId(),
-            new AckedClusterStateUpdateTask(request, listener) {
-                @Override
-                public ClusterState execute(ClusterState currentState) {
-                    return innerPut(request, currentState);
-                }
-            });
+    public void putPipeline(
+        PutPipelineRequest request,
+        ActionListener<AcknowledgedResponse> listener,
+        Consumer<ActionListener<NodesInfoResponse>> nodeInfoListener
+    ) throws Exception {
+
+        Map<String, Object> pipelineConfig = null;
+        IngestMetadata currentIngestMetadata = state.metadata().custom(IngestMetadata.TYPE);
+        if (currentIngestMetadata != null && currentIngestMetadata.getPipelines().containsKey(request.getId())) {
+            pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
+            var currentPipeline = currentIngestMetadata.getPipelines().get(request.getId());
+            if (currentPipeline.getConfigAsMap().equals(pipelineConfig)) {
+                // existing pipeline matches request pipeline -- no need to update
+                listener.onResponse(AcknowledgedResponse.TRUE);
+                return;
+            }
+        }
+
+        if (state.getNodes().getMinNodeVersion().before(Version.V_7_15_0)) {
+            pipelineConfig = pipelineConfig == null
+                ? XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2()
+                : pipelineConfig;
+            if (pipelineConfig.containsKey(Pipeline.META_KEY)) {
+                throw new IllegalStateException("pipelines with _meta field require minimum node version of " + Version.V_7_15_0);
+            }
+        }
+
+        final Map<String, Object> config = pipelineConfig == null
+            ? XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2()
+            : pipelineConfig;
+        nodeInfoListener.accept(ActionListener.wrap(
+            nodeInfos -> {
+                Map<DiscoveryNode, IngestInfo> ingestInfos = new HashMap<>();
+                for (NodeInfo nodeInfo : nodeInfos.getNodes()) {
+                    ingestInfos.put(nodeInfo.getNode(), nodeInfo.getInfo(IngestInfo.class));
+                }
+
+                validatePipeline(ingestInfos, request.getId(), config);
+                clusterService.submitStateUpdateTask(
+                    "put-pipeline-" + request.getId(),
+                    new AckedClusterStateUpdateTask(request, listener) {
+                        @Override
+                        public ClusterState execute(ClusterState currentState) {
+                            return innerPut(request, currentState);
+                        }
+                    }
+                );
+            },
+            listener::onFailure)
+        );
     }
 
     /**
@@ -417,13 +459,16 @@ static ClusterState innerPut(PutPipelineRequest request, ClusterState currentSta
         return newState.build();
     }
 
-    void validatePipeline(Map<DiscoveryNode, IngestInfo> ingestInfos, PutPipelineRequest request) throws Exception {
+    void validatePipeline(
+        Map<DiscoveryNode, IngestInfo> ingestInfos,
+        String pipelineId,
+        Map<String, Object> pipelineConfig
+    ) throws Exception {
         if (ingestInfos.isEmpty()) {
            throw new IllegalStateException("Ingest info is empty");
         }
 
-        Map<String, Object> pipelineConfig = XContentHelper.convertToMap(request.getSource(), false, request.getXContentType()).v2();
-        Pipeline pipeline = Pipeline.create(request.getId(), pipelineConfig, processorFactories, scriptService);
+        Pipeline pipeline = Pipeline.create(pipelineId, pipelineConfig, processorFactories, scriptService);
         List<Exception> exceptions = new ArrayList<>();
         for (Processor processor : pipeline.flattenAllProcessors()) {
             for (Map.Entry<DiscoveryNode, IngestInfo> entry : ingestInfos.entrySet()) {
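Two of the behaviors putPipeline now owns are easy to demonstrate in isolation: the no-op check compares parsed config maps, so it is insensitive to JSON key order and formatting, and the _meta gate rejects configs that pre-7.15.0 nodes would not understand. A standalone sketch under the assumption of a simple comparable version record (the real code uses org.elasticsearch.Version and XContentHelper.convertToMap):

import java.util.List;
import java.util.Map;

public class PutPipelineChecksSketch {

    // Hypothetical minimal version type; the real code uses org.elasticsearch.Version.
    record Version(int major, int minor, int patch) implements Comparable<Version> {
        @Override
        public int compareTo(Version o) {
            int c = Integer.compare(major, o.major);
            if (c == 0) c = Integer.compare(minor, o.minor);
            return c == 0 ? Integer.compare(patch, o.patch) : c;
        }

        boolean before(Version o) {
            return compareTo(o) < 0;
        }
    }

    static final Version V_7_15_0 = new Version(7, 15, 0);

    // No-op check: parsed maps compare structurally, so key order and
    // formatting differences in the original JSON do not force an update.
    static boolean isNoopUpdate(Map<String, Object> existingConfig, Map<String, Object> requestedConfig) {
        return existingConfig != null && existingConfig.equals(requestedConfig);
    }

    // _meta gate: mirrors the guard that requires every node to be on
    // 7.15.0 or later before a pipeline may carry a _meta field.
    static void checkMetaSupported(Version minNodeVersion, Map<String, Object> pipelineConfig) {
        if (minNodeVersion.before(V_7_15_0) && pipelineConfig.containsKey("_meta")) {
            throw new IllegalStateException("pipelines with _meta field require minimum node version of " + V_7_15_0);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> existing = Map.of(
            "description", "strip html",
            "processors", List.of(Map.of("html_strip", Map.of("field", "body")))
        );
        Map<String, Object> sameDifferentOrder = Map.of(
            "processors", List.of(Map.of("html_strip", Map.of("field", "body"))),
            "description", "strip html"
        );
        System.out.println(isNoopUpdate(existing, sameDifferentOrder)); // true: acknowledged without a cluster state update

        checkMetaSupported(new Version(7, 16, 0), Map.of("_meta", Map.of("owner", "team-a"))); // passes
        checkMetaSupported(new Version(7, 14, 2), Map.of("_meta", Map.of("owner", "team-a"))); // throws
    }
}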

This file was deleted.

