Skip to content

Commit

Permalink
Add plugin pipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
carlosdelest committed Dec 14, 2023
1 parent 21f0591 commit b81c8f4
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INFERENCE_SERVICE_EMBEDDING_SIZE_ADDED = def(8_559_00_0);
public static final TransportVersion ENRICH_ELASTICSEARCH_VERSION_REMOVED = def(8_560_00_0);
public static final TransportVersion NODE_STATS_REQUEST_SIMPLIFIED = def(8_561_00_0);
public static final TransportVersion SEMANTIC_TEXT_FIELD = def(8_562_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,8 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
if (indexRequest != null) {
IngestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata);
hasIndexRequestsWithPipelines |= IngestService.hasPipeline(indexRequest);
ingestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata);
hasIndexRequestsWithPipelines |= ingestService.hasPipeline(indexRequest);
}

if (actionRequest instanceof IndexRequest ir) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement

private String pipeline;
private String finalPipeline;
private String pluginsPipeline;

private boolean isPipelineResolved;

Expand Down Expand Up @@ -189,6 +190,9 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
: new ArrayList<>(possiblyImmutableExecutedPipelines);
}
}
if (in.getTransportVersion().onOrAfter(TransportVersions.SEMANTIC_TEXT_FIELD)) {
this.pluginsPipeline = in.readOptionalString();
}
}

public IndexRequest() {
Expand Down Expand Up @@ -355,6 +359,15 @@ public String getFinalPipeline() {
return this.finalPipeline;
}

public String getPluginsPipeline() {
return pluginsPipeline;
}

public IndexRequest setPluginsPipeline(String pluginsPipeline) {
this.pluginsPipeline = pluginsPipeline;
return this;
}

/**
* Sets if the pipeline for this request has been resolved by the coordinating node.
*
Expand Down Expand Up @@ -734,6 +747,9 @@ private void writeBody(StreamOutput out) throws IOException {
out.writeOptionalCollection(executedPipelines, StreamOutput::writeString);
}
}
if (out.getTransportVersion().onOrAfter(TransportVersions.SEMANTIC_TEXT_FIELD)) {
out.writeOptionalString(pluginsPipeline);
}
}

@Override
Expand Down
164 changes: 116 additions & 48 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -107,15 +106,19 @@ public class IngestService implements ClusterStateApplier, ReportingService<Inge
private final ScriptService scriptService;
private final Supplier<DocumentParsingObserver> documentParsingObserverSupplier;
private final Map<String, Processor.Factory> processorFactories;
private final List<IngestPlugin> ingestPlugins;
// Ideally this should be in IngestMetadata class, but we don't have the processor factories around there.
// We know of all the processor factories when a node with all its plugin have been initialized. Also some
// processor factories rely on other node services. Custom metadata is statically registered when classes
// are loaded, so in the cluster state we just save the pipeline config and here we keep the actual pipelines around.
private volatile Map<String, PipelineHolder> pipelines = Map.of();
private volatile Map<String, List<Pipeline>> pluginPipelines = Map.of();

private final ThreadPool threadPool;
private final IngestMetric totalMetrics = new IngestMetric();
private final List<Consumer<ClusterState>> ingestClusterStateListeners = new CopyOnWriteArrayList<>();
private volatile ClusterState state;
private final Processor.Parameters processorParameters;

private static BiFunction<Long, Runnable, Scheduler.ScheduledCancellable> createScheduler(ThreadPool threadPool) {
return (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), threadPool.generic());
Expand Down Expand Up @@ -187,21 +190,21 @@ public IngestService(
this.clusterService = clusterService;
this.scriptService = scriptService;
this.documentParsingObserverSupplier = documentParsingObserverSupplier;
this.processorFactories = processorFactories(
ingestPlugins,
new Processor.Parameters(
env,
scriptService,
analysisRegistry,
threadPool.getThreadContext(),
threadPool::relativeTimeInMillis,
createScheduler(threadPool),
this,
client,
threadPool.generic()::execute,
matcherWatchdog
)
this.ingestPlugins = ingestPlugins;
this.processorParameters = new Processor.Parameters(
env,
scriptService,
analysisRegistry,
threadPool.getThreadContext(),
threadPool::relativeTimeInMillis,
createScheduler(threadPool),
this,
client,
threadPool.generic()::execute,
matcherWatchdog
);

this.processorFactories = processorFactories(ingestPlugins, processorParameters);
this.threadPool = threadPool;
this.taskQueue = clusterService.createTaskQueue("ingest-pipelines", Priority.NORMAL, PIPELINE_TASK_EXECUTOR);
}
Expand All @@ -214,12 +217,14 @@ public IngestService(
IngestService(IngestService ingestService) {
this.clusterService = ingestService.clusterService;
this.scriptService = ingestService.scriptService;
this.ingestPlugins = ingestService.ingestPlugins;
this.documentParsingObserverSupplier = ingestService.documentParsingObserverSupplier;
this.processorFactories = ingestService.processorFactories;
this.threadPool = ingestService.threadPool;
this.taskQueue = ingestService.taskQueue;
this.pipelines = ingestService.pipelines;
this.state = ingestService.state;
this.processorParameters = ingestService.processorParameters;
}

private static Map<String, Processor.Factory> processorFactories(List<IngestPlugin> ingestPlugins, Processor.Parameters parameters) {
Expand Down Expand Up @@ -249,15 +254,15 @@ private static Map<String, Processor.Factory> processorFactories(List<IngestPlug
* @param indexRequest The {@link org.elasticsearch.action.index.IndexRequest} object to update.
* @param metadata Cluster metadata from where the pipeline information could be derived.
*/
public static void resolvePipelinesAndUpdateIndexRequest(
public void resolvePipelinesAndUpdateIndexRequest(
final DocWriteRequest<?> originalRequest,
final IndexRequest indexRequest,
final Metadata metadata
) {
resolvePipelinesAndUpdateIndexRequest(originalRequest, indexRequest, metadata, System.currentTimeMillis());
}

static void resolvePipelinesAndUpdateIndexRequest(
void resolvePipelinesAndUpdateIndexRequest(
final DocWriteRequest<?> originalRequest,
final IndexRequest indexRequest,
final Metadata metadata,
Expand All @@ -280,6 +285,7 @@ static void resolvePipelinesAndUpdateIndexRequest(
indexRequest.setPipeline(pipelines.defaultPipeline);
}
indexRequest.setFinalPipeline(pipelines.finalPipeline);
indexRequest.setPluginsPipeline(pipelines.pluginPipelines);
indexRequest.isPipelineResolved(true);
}

Expand Down Expand Up @@ -494,6 +500,14 @@ public Pipeline getPipeline(String id) {
}
}

private List<Pipeline> getPluginsPipelines(String id) {
if (id == null) {
return null;
}

return this.pluginPipelines.get(id);
}

public Map<String, Processor.Factory> getProcessorFactories() {
return processorFactories;
}
Expand Down Expand Up @@ -756,9 +770,18 @@ public void onFailure(Exception e) {
private PipelineIterator getAndResetPipelines(IndexRequest indexRequest) {
final String pipelineId = indexRequest.getPipeline();
indexRequest.setPipeline(NOOP_PIPELINE_NAME);

final String finalPipelineId = indexRequest.getFinalPipeline();
indexRequest.setFinalPipeline(NOOP_PIPELINE_NAME);
return new PipelineIterator(pipelineId, finalPipelineId);

final String pluginPipelineId = indexRequest.getPluginsPipeline();
indexRequest.setPluginsPipeline(NOOP_PIPELINE_NAME);
List<Pipeline> pluginsPipelines = null;
if (pluginPipelineId != null) {
pluginsPipelines = getPluginsPipelines(pluginPipelineId);
}

return new PipelineIterator(pipelineId, finalPipelineId, pluginsPipelines);
}

/**
Expand All @@ -778,36 +801,33 @@ private class PipelineIterator implements Iterator<PipelineSlot> {

private final String defaultPipeline;
private final String finalPipeline;
private final List<Pipeline> pluginsPipelines;

private final Iterator<PipelineSlot> pipelineSlotIterator;

private PipelineIterator(String defaultPipeline, String finalPipeline) {
private PipelineIterator(String defaultPipeline, String finalPipeline, List<Pipeline> pluginsPipelines) {
this.defaultPipeline = NOOP_PIPELINE_NAME.equals(defaultPipeline) ? null : defaultPipeline;
this.finalPipeline = NOOP_PIPELINE_NAME.equals(finalPipeline) ? null : finalPipeline;
this.pluginsPipelines = pluginsPipelines;
this.pipelineSlotIterator = iterator();
}

public PipelineIterator withoutDefaultPipeline() {
return new PipelineIterator(null, finalPipeline);
return new PipelineIterator(null, finalPipeline, pluginsPipelines);
}

private Iterator<PipelineSlot> iterator() {
PipelineSlot defaultPipelineSlot = null, finalPipelineSlot = null;
List<PipelineSlot> slotList = new ArrayList<>();
if (defaultPipeline != null) {
defaultPipelineSlot = new PipelineSlot(defaultPipeline, getPipeline(defaultPipeline), false);
slotList.add(new PipelineSlot(defaultPipeline, getPipeline(defaultPipeline), false));
}
if (finalPipeline != null) {
finalPipelineSlot = new PipelineSlot(finalPipeline, getPipeline(finalPipeline), true);
slotList.add(new PipelineSlot(finalPipeline, getPipeline(finalPipeline), true));
}

if (defaultPipeline != null && finalPipeline != null) {
return List.of(defaultPipelineSlot, finalPipelineSlot).iterator();
} else if (finalPipeline != null) {
return List.of(finalPipelineSlot).iterator();
} else if (defaultPipeline != null) {
return List.of(defaultPipelineSlot).iterator();
} else {
return Collections.emptyIterator();
if (pluginsPipelines != null) {
slotList.addAll(pluginsPipelines.stream().map(pipeline -> new PipelineSlot("plugins", pipeline, false)).toList());
}
return slotList.iterator();
}

@Override
Expand Down Expand Up @@ -1107,15 +1127,49 @@ public void applyClusterState(final ClusterChangedEvent event) {
ingestClusterStateListeners.forEach(consumer -> consumer.accept(state));

IngestMetadata newIngestMetadata = state.getMetadata().custom(IngestMetadata.TYPE);
if (newIngestMetadata == null) {
return;
if (newIngestMetadata != null) {
try {
innerUpdatePipelines(newIngestMetadata);
} catch (ElasticsearchParseException e) {
logger.warn("failed to update ingest pipelines", e);
}
}

try {
innerUpdatePipelines(newIngestMetadata);
} catch (ElasticsearchParseException e) {
logger.warn("failed to update ingest pipelines", e);
updatePluginPipelines(event);
}

private synchronized void updatePluginPipelines(ClusterChangedEvent event) {

Map<String, List<Pipeline>> updatedPluginPipelines = new HashMap<>(pluginPipelines);

Map<String, IndexMetadata> currentIndexMetadataMap = state.metadata().indices();
Map<String, IndexMetadata> previousIndexMetadataMap = event.previousState().metadata().indices();
HashSet<String> indicesNames = new HashSet<>(currentIndexMetadataMap.keySet());
indicesNames.addAll(previousIndexMetadataMap.keySet());

for (String indexName : indicesNames) {
IndexMetadata currentIndexMetadata = currentIndexMetadataMap.get(indexName);
if (currentIndexMetadata == null) {
// Index has been removed - remove pipelines if they exist
updatedPluginPipelines.remove(indexName);
} else {
IndexMetadata previousIndexMetadata = previousIndexMetadataMap.get(indexName);
if ((previousIndexMetadata == null) || (currentIndexMetadata.equals(previousIndexMetadata) == false)) {
// Did the index not exist before, or it changed? (re)create the pipeline
List<Pipeline> pipelineList = ingestPlugins.stream()
.map(plugin -> plugin.getIngestPipeline(currentIndexMetadata, processorParameters))
.flatMap(Optional::stream)
.toList();
if (pipelineList.isEmpty()) {
updatedPluginPipelines.remove(indexName);
} else {
updatedPluginPipelines.put(indexName, pipelineList);
}
}
}
}

pluginPipelines = Map.copyOf(updatedPluginPipelines);
}

synchronized void innerUpdatePipelines(IngestMetadata newIngestMetadata) {
Expand Down Expand Up @@ -1295,7 +1349,7 @@ record PipelineHolder(PipelineConfiguration configuration, Pipeline pipeline) {
}
}

private static Optional<Pipelines> resolvePipelinesFromMetadata(
private Optional<Pipelines> resolvePipelinesFromMetadata(
DocWriteRequest<?> originalRequest,
IndexRequest indexRequest,
Metadata metadata,
Expand All @@ -1308,8 +1362,9 @@ private static Optional<Pipelines> resolvePipelinesFromMetadata(
.get(IndexNameExpressionResolver.resolveDateMathExpression(originalRequest.index(), epochMillis));
}
// check the alias for the index request (this is how normal index requests are modeled)
if (indexMetadata == null && indexRequest.index() != null) {
IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(indexRequest.index());
String indexName = indexRequest.index();
if (indexMetadata == null && indexName != null) {
IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(indexName);
if (indexAbstraction != null && indexAbstraction.getWriteIndex() != null) {
indexMetadata = metadata.index(indexAbstraction.getWriteIndex());
}
Expand All @@ -1327,7 +1382,14 @@ private static Optional<Pipelines> resolvePipelinesFromMetadata(
}

final Settings settings = indexMetadata.getSettings();
return Optional.of(new Pipelines(IndexSettings.DEFAULT_PIPELINE.get(settings), IndexSettings.FINAL_PIPELINE.get(settings)));
List<Pipeline> pluginsPipelines = getPluginsPipelines(indexName);
return Optional.of(
new Pipelines(
IndexSettings.DEFAULT_PIPELINE.get(settings),
IndexSettings.FINAL_PIPELINE.get(settings),
pluginsPipelines == null ? NOOP_PIPELINE_NAME : indexName
)
);
}

private static Optional<Pipelines> resolvePipelinesFromIndexTemplates(IndexRequest indexRequest, Metadata metadata) {
Expand All @@ -1341,7 +1403,9 @@ private static Optional<Pipelines> resolvePipelinesFromIndexTemplates(IndexReque
String v2Template = MetadataIndexTemplateService.findV2Template(metadata, indexRequest.index(), false);
if (v2Template != null) {
final Settings settings = MetadataIndexTemplateService.resolveSettings(metadata, v2Template);
return Optional.of(new Pipelines(IndexSettings.DEFAULT_PIPELINE.get(settings), IndexSettings.FINAL_PIPELINE.get(settings)));
return Optional.of(
new Pipelines(IndexSettings.DEFAULT_PIPELINE.get(settings), IndexSettings.FINAL_PIPELINE.get(settings), null)
);
}

String defaultPipeline = null;
Expand Down Expand Up @@ -1371,29 +1435,33 @@ private static Optional<Pipelines> resolvePipelinesFromIndexTemplates(IndexReque
defaultPipeline = Objects.requireNonNullElse(defaultPipeline, NOOP_PIPELINE_NAME);
finalPipeline = Objects.requireNonNullElse(finalPipeline, NOOP_PIPELINE_NAME);

return Optional.of(new Pipelines(defaultPipeline, finalPipeline));
// TODO We can't resolve yet the plugins pipeline as we don't have the IndexMetadata. Check if we can do it.
return Optional.of(new Pipelines(defaultPipeline, finalPipeline, NOOP_PIPELINE_NAME));
}

/**
* Checks whether an IndexRequest has at least one pipeline defined.
* <p>
* This method assumes that the pipelines are beforehand resolved.
*/
public static boolean hasPipeline(IndexRequest indexRequest) {
public boolean hasPipeline(IndexRequest indexRequest) {
assert indexRequest.isPipelineResolved();
assert indexRequest.getPipeline() != null;
assert indexRequest.getFinalPipeline() != null;
assert indexRequest.getPluginsPipeline() != null;
return NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false
|| NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false;
|| NOOP_PIPELINE_NAME.equals(indexRequest.getFinalPipeline()) == false
|| NOOP_PIPELINE_NAME.equals(indexRequest.getPluginsPipeline()) == false;
}

private record Pipelines(String defaultPipeline, String finalPipeline) {
private record Pipelines(String defaultPipeline, String finalPipeline, String pluginPipelines) {

private static final Pipelines NO_PIPELINES_DEFINED = new Pipelines(NOOP_PIPELINE_NAME, NOOP_PIPELINE_NAME);
private static final Pipelines NO_PIPELINES_DEFINED = new Pipelines(NOOP_PIPELINE_NAME, NOOP_PIPELINE_NAME, NOOP_PIPELINE_NAME);

public Pipelines {
Objects.requireNonNull(defaultPipeline);
Objects.requireNonNull(finalPipeline);
Objects.requireNonNull(pluginPipelines);
}
}
}
Loading

0 comments on commit b81c8f4

Please sign in to comment.