Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Moving InferenceProcessorInfoExtractor to core to make available to inference plugin #109183

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.ml.utils;

/**
* Provides access to constants for core, ml, and inference plugins
*/
public class InferenceProcessorConstants {
public static final String TYPE = "inference";
public static final String TARGET_FIELD = "target_field";
public static final String FIELD_MAP = "field_map";
public static final String INFERENCE_CONFIG = "inference_config";

private InferenceProcessorConstants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

package org.elasticsearch.xpack.ml.utils;
package org.elasticsearch.xpack.core.ml.utils;

import org.apache.lucene.util.Counter;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -16,6 +16,7 @@
import org.elasticsearch.transport.Transports;

import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -24,13 +25,11 @@

import static org.elasticsearch.inference.InferenceResults.MODEL_ID_RESULTS_FIELD;
import static org.elasticsearch.ingest.Pipeline.PROCESSORS_KEY;
import static org.elasticsearch.xpack.ml.inference.ingest.InferenceProcessor.TYPE;

/**
* Utilities for extracting information around inference processors from IngestMetadata
*/
public final class InferenceProcessorInfoExtractor {

private static final String FOREACH_PROCESSOR_NAME = "foreach";
// Any more than 10 nestings of processors, we stop searching for inference processor definitions
private static final int MAX_INFERENCE_PROCESSOR_SEARCH_RECURSIONS = 10;
Expand Down Expand Up @@ -131,6 +130,38 @@ public static Map<String, Set<String>> pipelineIdsByResource(ClusterState state,
return pipelineIdsByModelIds;
}

/**
* @param state Current {@link ClusterState}
* @return a map from Model or Deployment IDs or Aliases to each pipeline referencing them.
*/
@SuppressWarnings("unchecked")
public static Set<String> pipelineIdsForResource(ClusterState state, Set<String> ids) {
assert Transports.assertNotTransportThread("non-trivial nested loops over cluster state structures");
Set<String> pipelineIds = new HashSet<>();
Metadata metadata = state.metadata();
if (metadata == null) {
return pipelineIds;
}
IngestMetadata ingestMetadata = metadata.custom(IngestMetadata.TYPE);
if (ingestMetadata == null) {
return pipelineIds;
}
ingestMetadata.getPipelines().forEach((pipelineId, configuration) -> {
Map<String, Object> configMap = configuration.getConfigAsMap();
List<Map<String, Object>> processorConfigs = ConfigurationUtils.readList(null, null, configMap, PROCESSORS_KEY);
for (Map<String, Object> processorConfigWithKey : processorConfigs) {
for (Map.Entry<String, Object> entry : processorConfigWithKey.entrySet()) {
addModelsAndPipelines(entry.getKey(), pipelineId, (Map<String, Object>) entry.getValue(), pam -> {
if (ids.contains(pam.modelIdOrAlias)) {
pipelineIds.add(pipelineId);
}
}, 0);
}
}
});
return pipelineIds;
}

@SuppressWarnings("unchecked")
private static void addModelsAndPipelines(
String processorType,
Expand All @@ -146,7 +177,7 @@ private static void addModelsAndPipelines(
if (processorType == null || processorDefinition == null) {
return;
}
if (TYPE.equals(processorType)) {
if (InferenceProcessorConstants.TYPE.equals(processorType)) {
String modelId = (String) processorDefinition.get(MODEL_ID_RESULTS_FIELD);
if (modelId != null) {
handler.accept(new PipelineAndModel(pipelineId, modelId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

package org.elasticsearch.xpack.ml.utils;
package org.elasticsearch.xpack.core.ml.utils;

import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -23,7 +23,6 @@
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.core.ml.inference.trainedmodel.RegressionConfig;
import org.elasticsearch.xpack.ml.inference.ingest.InferenceProcessor;

import java.io.IOException;
import java.net.InetAddress;
Expand Down Expand Up @@ -217,15 +216,15 @@ private static Map<String, Object> forEachProcessorWithInference(String modelId)
}

private static Map<String, Object> inferenceProcessorForModel(String modelId) {
return Collections.singletonMap(InferenceProcessor.TYPE, new HashMap<>() {
return Collections.singletonMap(InferenceProcessorConstants.TYPE, new HashMap<>() {
{
put(InferenceResults.MODEL_ID_RESULTS_FIELD, modelId);
put(
InferenceProcessor.INFERENCE_CONFIG,
InferenceProcessorConstants.INFERENCE_CONFIG,
Collections.singletonMap(RegressionConfig.NAME.getPreferredName(), Collections.emptyMap())
);
put(InferenceProcessor.TARGET_FIELD, "new_field");
put(InferenceProcessor.FIELD_MAP, Collections.singletonMap("source", "dest"));
put(InferenceProcessorConstants.TARGET_FIELD, "new_field");
put(InferenceProcessorConstants.FIELD_MAP, Collections.singletonMap("source", "dest"));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.inference.action.DeleteInferenceEndpointAction;
import org.elasticsearch.xpack.core.ml.utils.InferenceProcessorInfoExtractor;
import org.elasticsearch.xpack.inference.common.InferenceExceptions;
import org.elasticsearch.xpack.inference.registry.ModelRegistry;
import org.elasticsearch.xpack.inference.utils.InferenceProcessorInfoExtractor;

import java.util.Set;

Expand Down

This file was deleted.

Loading