Some optimizations on calculating plugin ingestion pipelines
carlosdelest committed Nov 20, 2023
1 parent 3028bd6 commit 2cae30c
Showing 3 changed files with 39 additions and 27 deletions.
52 changes: 32 additions & 20 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -1125,29 +1125,41 @@ public void applyClusterState(final ClusterChangedEvent event) {
     }
 
     private synchronized void updatePluginPipelines(ClusterChangedEvent event) {
-        // TODO Update just for changed indices
-        Map<String, IndexMetadata> currentIndexMetadata = state.metadata().indices();
-        Map<String, IndexMetadata> previousIndexMetadata = event.previousState().metadata().indices();
-        if (currentIndexMetadata.equals(previousIndexMetadata) == false) {
-            Map<String, List<Pipeline>> updatedPluginPipelines = new HashMap<>();
-            HashSet<String> indicesNames = new HashSet<>(currentIndexMetadata.keySet());
-            indicesNames.addAll(previousIndexMetadata.keySet());
-            for (String indexName : indicesNames) {
-                List<Pipeline> pipelineList = ingestPlugins.stream()
-                    .map(
-                        plugin -> plugin.getIngestPipeline(
-                            currentIndexMetadata.get(indexName),
-                            previousIndexMetadata.get(indexName),
-                            processorParameters
-                        )
-                    ).flatMap(Optional::stream)
-                    .toList();
-                if (pipelineList.isEmpty() == false) {
-                    updatedPluginPipelines.put(indexName, pipelineList);
-                }
-            }
-            pluginPipelines = Map.copyOf(updatedPluginPipelines);
-        }
+        Map<String, List<Pipeline>> updatedPluginPipelines = new HashMap<>(pluginPipelines);
+
+        Map<String, IndexMetadata> currentIndexMetadataMap = state.metadata().indices();
+        Map<String, IndexMetadata> previousIndexMetadataMap = event.previousState().metadata().indices();
+        HashSet<String> indicesNames = new HashSet<>(currentIndexMetadataMap.keySet());
+        indicesNames.addAll(previousIndexMetadataMap.keySet());
+
+        for (String indexName : indicesNames) {
+            IndexMetadata currentIndexMetadata = currentIndexMetadataMap.get(indexName);
+            if (currentIndexMetadata == null) {
+                // Index has been removed - remove pipelines if they exist
+                updatedPluginPipelines.remove(indexName);
+            } else {
+                IndexMetadata previousIndexMetadata = previousIndexMetadataMap.get(indexName);
+                if ((previousIndexMetadata == null) || (currentIndexMetadata.equals(previousIndexMetadata) == false)) {
+                    // Did the index not exist before, or did it change? (Re)create the pipeline.
+                    List<Pipeline> pipelineList = ingestPlugins.stream()
+                        .map(
+                            plugin -> plugin.getIngestPipeline(
+                                currentIndexMetadata,
+                                processorParameters
+                            )
+                        ).flatMap(Optional::stream)
+                        .toList();
+                    if (pipelineList.isEmpty()) {
+                        updatedPluginPipelines.remove(indexName);
+                    } else {
+                        updatedPluginPipelines.put(indexName, pipelineList);
+                    }
+                }
+            }
+        }
+
+        pluginPipelines = Map.copyOf(updatedPluginPipelines);
     }
 
     synchronized void innerUpdatePipelines(IngestMetadata newIngestMetadata, Metadata metadata) {
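
The net effect of the rewritten updatePluginPipelines above is an incremental, per-index reconciliation: rather than rebuilding the whole pipelines map whenever any index metadata changes, it starts from the existing map, drops entries for deleted indices, and recomputes pipelines only for new or changed indices. A minimal, self-contained sketch of that shape (PipelineReconciler, its type parameters, and buildPipelines are illustrative stand-ins, not actual Elasticsearch API):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.function.Function;

    // Illustrative sketch only: Meta stands in for IndexMetadata, and
    // buildPipelines stands in for asking every ingest plugin for pipelines.
    final class PipelineReconciler<Meta, Pipeline> {

        Map<String, List<Pipeline>> reconcile(
            Map<String, List<Pipeline>> existing,          // pipelines computed for the previous state
            Map<String, Meta> current,                     // index name -> current metadata
            Map<String, Meta> previous,                    // index name -> previous metadata
            Function<Meta, List<Pipeline>> buildPipelines  // recompute pipelines for one index
        ) {
            Map<String, List<Pipeline>> updated = new HashMap<>(existing);
            Set<String> names = new HashSet<>(current.keySet());
            names.addAll(previous.keySet());
            for (String name : names) {
                Meta cur = current.get(name);
                if (cur == null) {
                    // Index was deleted: drop its pipelines.
                    updated.remove(name);
                } else if (cur.equals(previous.get(name)) == false) {
                    // Index is new (equals(null) is false) or its metadata changed: recompute.
                    List<Pipeline> pipelines = buildPipelines.apply(cur);
                    if (pipelines.isEmpty()) {
                        updated.remove(name);
                    } else {
                        updated.put(name, pipelines);
                    }
                }
                // Otherwise the index is unchanged and its existing entry is kept as-is.
            }
            return Map.copyOf(updated);  // immutable snapshot, as in the commit
        }
    }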
2 changes: 1 addition & 1 deletion
@@ -31,7 +31,7 @@ default Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
         return Map.of();
     }
 
-    default Optional<Pipeline> getIngestPipeline(IndexMetadata current, IndexMetadata previous, Processor.Parameters parameters) {
+    default Optional<Pipeline> getIngestPipeline(IndexMetadata indexMetadata, Processor.Parameters parameters) {
         return Optional.empty();
     }
 }
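
With the previous IndexMetadata removed from the hook, IngestService alone decides when a pipeline needs recomputing, and a plugin derives its pipeline purely from the current metadata. A hypothetical implementation against the signature above (the plugin class, the index setting name, and the empty processor list are invented for illustration; the Pipeline constructor call mirrors the one used later in this commit):

    import java.util.Optional;

    import org.elasticsearch.cluster.metadata.IndexMetadata;
    import org.elasticsearch.ingest.CompoundProcessor;
    import org.elasticsearch.ingest.Pipeline;
    import org.elasticsearch.ingest.Processor;
    import org.elasticsearch.plugins.IngestPlugin;
    import org.elasticsearch.plugins.Plugin;

    // Hypothetical plugin: opts in per index via a made-up index setting and
    // contributes one pipeline derived from the current metadata only.
    public class MyIngestPlugin extends Plugin implements IngestPlugin {

        @Override
        public Optional<Pipeline> getIngestPipeline(IndexMetadata indexMetadata, Processor.Parameters parameters) {
            // The service may call this for any new or changed index; opt in per index.
            if (indexMetadata == null
                || indexMetadata.getSettings().getAsBoolean("index.my_plugin.enabled", false) == false) {
                return Optional.empty();
            }
            String indexName = indexMetadata.getIndex().getName();
            return Optional.of(
                new Pipeline(
                    "_my_plugin_" + indexName,
                    "my plugin pipeline for index " + indexName,
                    null,                    // version
                    null,                    // pipeline metadata
                    new CompoundProcessor()  // index-specific processors would go here
                )
            );
        }
    }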
12 changes: 6 additions & 6 deletions
@@ -2281,13 +2281,13 @@ public Map<String, Mapper.TypeParser> getMappers() {
     }
 
     @Override
-    public Optional<Pipeline> getIngestPipeline(IndexMetadata current, IndexMetadata previous, Processor.Parameters parameters) {
+    public Optional<Pipeline> getIngestPipeline(IndexMetadata indexMetadata, Processor.Parameters parameters) {
 
-        if (current == null) {
+        if (indexMetadata == null) {
             return Optional.empty();
         }
 
-        Map<String, List<String>> inferenceModelsForFields = current.getInferenceModelsForFields();
+        Map<String, List<String>> inferenceModelsForFields = indexMetadata.getInferenceModelsForFields();
         if (inferenceModelsForFields.isEmpty()) {
             return Optional.empty();
         }
@@ -2310,7 +2310,7 @@ public Optional<Pipeline> getIngestPipeline(IndexMetadata current, IndexMetadata
             SemanticTextInferenceProcessor semanticTextInferenceProcessor = new SemanticTextInferenceProcessor(
                 parameters.client,
                 inferenceAuditorSetOnce.get(),
-                "semantic text processor for index " + current.getIndex().getName() + ", model " + modelId,
+                "semantic text processor for index " + indexMetadata.getIndex().getName() + ", model " + modelId,
                 inferenceModelsForFields
             );
             inferenceProcessors.add(semanticTextInferenceProcessor);
@@ -2325,10 +2325,10 @@ public Optional<Pipeline> getIngestPipeline(IndexMetadata current, IndexMetadata
             }
         }
 
-        String inferencePipelineName = "_semantic_text_inference_" + current.getIndex().getName();
+        String inferencePipelineName = "_semantic_text_inference_" + indexMetadata.getIndex().getName();
         Pipeline inferencePipeline = new Pipeline(
             inferencePipelineName,
-            "semantic text pipeline for index " + current.getIndex().getName(),
+            "semantic text pipeline for index " + indexMetadata.getIndex().getName(),
             null,
             null,
             new CompoundProcessor(inferenceProcessors.toArray(new Processor[] {}))
