Skip to content

Commit

Permalink
ml inference ingest processor support for local models
Browse files Browse the repository at this point in the history
Signed-off-by: Bhavana Ramaram <[email protected]>
  • Loading branch information
rbhavna committed Jun 5, 2024
1 parent 865a424 commit 5c6aa08
Show file tree
Hide file tree
Showing 6 changed files with 420 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,10 @@ public void loadExtensions(ExtensionLoader loader) {
public Map<String, org.opensearch.ingest.Processor.Factory> getProcessors(org.opensearch.ingest.Processor.Parameters parameters) {
Map<String, org.opensearch.ingest.Processor.Factory> processors = new HashMap<>();
processors
.put(MLInferenceIngestProcessor.TYPE, new MLInferenceIngestProcessor.Factory(parameters.scriptService, parameters.client));
.put(
MLInferenceIngestProcessor.TYPE,
new MLInferenceIngestProcessor.Factory(parameters.scriptService, parameters.client, xContentRegistry)
);
return Collections.unmodifiableMap(processors);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,4 @@ public InferenceProcessorAttributes(
this.maxPredictionTask = maxPredictionTask;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@

import static org.opensearch.ml.processor.InferenceProcessorAttributes.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -19,11 +21,14 @@
import org.opensearch.client.Client;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.Processor;
import org.opensearch.ingest.ValueSource;
import org.opensearch.ml.common.FunctionName;
import org.opensearch.ml.common.output.MLOutput;
import org.opensearch.ml.common.output.model.ModelTensorOutput;
import org.opensearch.ml.common.transport.MLTaskResponse;
import org.opensearch.ml.common.transport.prediction.MLPredictionTaskAction;
Expand All @@ -45,17 +50,26 @@ public class MLInferenceIngestProcessor extends AbstractProcessor implements Mod
public static final String DOT_SYMBOL = ".";
private final InferenceProcessorAttributes inferenceProcessorAttributes;
private final boolean ignoreMissing;
private final String functionName;
private final boolean fullResponsePath;
private final boolean ignoreFailure;
private final boolean override;
private final String modelInput;
private final ScriptService scriptService;
private static Client client;
public static final String TYPE = "ml_inference";
public static final String DEFAULT_OUTPUT_FIELD_NAME = "inference_results";
// allow to ignore a field from mapping is not present in the document, and when the outfield is not found in the
// prediction outcomes, return the whole prediction outcome by skipping filtering
public static final String IGNORE_MISSING = "ignore_missing";
public static final String OVERRIDE = "override";
public static final String FUNCTION_NAME = "function_name";
public static final String FULL_RESPONSE_PATH = "full_response_path";
public static final String MODEL_INPUT = "model_input";
// At default, ml inference processor allows maximum 10 prediction tasks running in parallel
// it can be overwritten using max_prediction_tasks when creating processor
public static final int DEFAULT_MAX_PREDICTION_TASKS = 10;
private final NamedXContentRegistry xContentRegistry;

private Configuration suppressExceptionConfiguration = Configuration
.builder()
Expand All @@ -71,9 +85,14 @@ protected MLInferenceIngestProcessor(
String tag,
String description,
boolean ignoreMissing,
String functionName,
boolean fullResponsePath,
boolean ignoreFailure,
boolean override,
String modelInput,
ScriptService scriptService,
Client client
Client client,
NamedXContentRegistry xContentRegistry
) {
super(tag, description);
this.inferenceProcessorAttributes = new InferenceProcessorAttributes(
Expand All @@ -84,9 +103,14 @@ protected MLInferenceIngestProcessor(
maxPredictionTask
);
this.ignoreMissing = ignoreMissing;
this.functionName = functionName;
this.fullResponsePath = fullResponsePath;
this.ignoreFailure = ignoreFailure;
this.override = override;
this.modelInput = modelInput;
this.scriptService = scriptService;
this.client = client;
this.xContentRegistry = xContentRegistry;
}

/**
Expand Down Expand Up @@ -162,10 +186,44 @@ private void processPredictions(
List<Map<String, String>> processOutputMap,
int inputMapIndex,
int inputMapSize
) {
) throws IOException {
Map<String, String> modelParameters = new HashMap<>();
Map<String, String> modelConfigs = new HashMap<>();

if (inferenceProcessorAttributes.getModelConfigMaps() != null) {
modelParameters.putAll(inferenceProcessorAttributes.getModelConfigMaps());
modelConfigs.putAll(inferenceProcessorAttributes.getModelConfigMaps());
}
Map<String, String> outputMapping = processOutputMap.get(inputMapIndex);

Map<String, Object> ingestDocumentSourceAndMetaData = new HashMap<>();
ingestDocumentSourceAndMetaData.putAll(ingestDocument.getSourceAndMetadata());
ingestDocumentSourceAndMetaData.put(IngestDocument.INGEST_KEY, ingestDocument.getIngestMetadata());

Map<String, List<String>> newOutputMapping = new HashMap<>();
for (Map.Entry<String, String> entry : outputMapping.entrySet()) {
String newDocumentFieldName = entry.getKey();
List<String> dotPathsInArray = writeNewDotPathForNestedObject(ingestDocumentSourceAndMetaData, newDocumentFieldName);
newOutputMapping.put(newDocumentFieldName, dotPathsInArray);
}

for (Map.Entry<String, String> entry : outputMapping.entrySet()) {
String newDocumentFieldName = entry.getKey();
List<String> dotPaths = newOutputMapping.get(newDocumentFieldName);

int existingFields = 0;
for (String path : dotPaths) {
if (ingestDocument.hasField(path)) {
existingFields++;
}
}
if (!override && existingFields == dotPaths.size()) {
newOutputMapping.remove(newDocumentFieldName);
}
}
if (newOutputMapping.size() == 0) {
batchPredictionListener.onResponse(null);
return;
}
// when no input mapping is provided, default to read all fields from documents as model input
if (inputMapSize == 0) {
Expand All @@ -184,15 +242,30 @@ private void processPredictions(
}
}

ActionRequest request = getRemoteModelInferenceRequest(modelParameters, inferenceProcessorAttributes.getModelId());
Set<String> inputMapKeys = new HashSet<>(modelParameters.keySet());
inputMapKeys.removeAll(modelConfigs.keySet());

Map<String, String> inputMappings = new HashMap<>();
for (String k : inputMapKeys) {
inputMappings.put(k, modelParameters.get(k));
}
ActionRequest request = getRemoteModelInferenceRequest(
xContentRegistry,
modelParameters,
modelConfigs,
inputMappings,
inferenceProcessorAttributes.getModelId(),
functionName,
modelInput
);

client.execute(MLPredictionTaskAction.INSTANCE, request, new ActionListener<>() {

@Override
public void onResponse(MLTaskResponse mlTaskResponse) {
ModelTensorOutput modelTensorOutput = (ModelTensorOutput) mlTaskResponse.getOutput();
MLOutput mlOutput = mlTaskResponse.getOutput();
if (processOutputMap == null || processOutputMap.isEmpty()) {
appendFieldValue(modelTensorOutput, null, DEFAULT_OUTPUT_FIELD_NAME, ingestDocument);
appendFieldValue(mlOutput, null, DEFAULT_OUTPUT_FIELD_NAME, ingestDocument);
} else {
// outMapping serves as a filter to modelTensorOutput, the fields that are not specified
// in the outputMapping will not write to document
Expand All @@ -202,14 +275,10 @@ public void onResponse(MLTaskResponse mlTaskResponse) {
// document field as key, model field as value
String newDocumentFieldName = entry.getKey();
String modelOutputFieldName = entry.getValue();
if (ingestDocument.hasField(newDocumentFieldName)) {
throw new IllegalArgumentException(
"document already has field name "
+ newDocumentFieldName
+ ". Not allow to overwrite the same field name, please check output_map."
);
if (!newOutputMapping.containsKey(newDocumentFieldName)) {
continue;
}
appendFieldValue(modelTensorOutput, modelOutputFieldName, newDocumentFieldName, ingestDocument);
appendFieldValue(mlOutput, modelOutputFieldName, newDocumentFieldName, ingestDocument);
}
}
batchPredictionListener.onResponse(null);
Expand Down Expand Up @@ -322,16 +391,16 @@ private void appendFieldValue(

modelOutputValue = getModelOutputValue(modelTensorOutput, modelOutputFieldName, ignoreMissing);

Map<String, Object> ingestDocumentSourceAndMetaData = new HashMap<>();
ingestDocumentSourceAndMetaData.putAll(ingestDocument.getSourceAndMetadata());
ingestDocumentSourceAndMetaData.put(IngestDocument.INGEST_KEY, ingestDocument.getIngestMetadata());
List<String> dotPathsInArray = writeNewDotPathForNestedObject(ingestDocumentSourceAndMetaData, newDocumentFieldName);
List<String> dotPathsInArray = writeNewDotPathForNestedObject(ingestDocument.getSourceAndMetadata(), newDocumentFieldName);

if (dotPathsInArray.size() == 1) {
ValueSource ingestValue = ValueSource.wrap(modelOutputValue, scriptService);
TemplateScript.Factory ingestField = ConfigurationUtils
.compileTemplate(TYPE, tag, dotPathsInArray.get(0), dotPathsInArray.get(0), scriptService);
ingestDocument.setFieldValue(ingestField, ingestValue, ignoreMissing);
if (!ingestDocument.hasField(dotPathsInArray.get(0)) || override) {
ValueSource ingestValue = ValueSource.wrap(modelOutputValue, scriptService);
TemplateScript.Factory ingestField = ConfigurationUtils
.compileTemplate(TYPE, tag, dotPathsInArray.get(0), dotPathsInArray.get(0), scriptService);

ingestDocument.setFieldValue(ingestField, ingestValue, ignoreMissing);
}
} else {
if (!(modelOutputValue instanceof List)) {
throw new IllegalArgumentException("Model output is not an array, cannot assign to array in documents.");
Expand All @@ -353,18 +422,73 @@ private void appendFieldValue(
// Iterate over dotPathInArray
for (int i = 0; i < dotPathsInArray.size(); i++) {
String dotPathInArray = dotPathsInArray.get(i);
Object modelOutputValueInArray = modelOutputValueArray.get(i);
ValueSource ingestValue = ValueSource.wrap(modelOutputValueInArray, scriptService);
TemplateScript.Factory ingestField = ConfigurationUtils
.compileTemplate(TYPE, tag, dotPathInArray, dotPathInArray, scriptService);
ingestDocument.setFieldValue(ingestField, ingestValue, ignoreMissing);
if (!ingestDocument.hasField(dotPathInArray) || override) {
Object modelOutputValueInArray = modelOutputValueArray.get(i);
ValueSource ingestValue = ValueSource.wrap(modelOutputValueInArray, scriptService);
TemplateScript.Factory ingestField = ConfigurationUtils
.compileTemplate(TYPE, tag, dotPathInArray, dotPathInArray, scriptService);
ingestDocument.setFieldValue(ingestField, ingestValue, ignoreMissing);
}
}
}
} else {
throw new RuntimeException("model inference output cannot be null");
}
}

private void appendFieldValue(
MLOutput mlOutput,
String modelOutputFieldName,
String newDocumentFieldName,
IngestDocument ingestDocument
) {

if (mlOutput == null) {
throw new RuntimeException("model inference output is null");
}

Object modelOutputValue = getModelOutputValue(mlOutput, modelOutputFieldName, ignoreMissing, fullResponsePath);

Map<String, Object> ingestDocumentSourceAndMetaData = new HashMap<>();
ingestDocumentSourceAndMetaData.putAll(ingestDocument.getSourceAndMetadata());
ingestDocumentSourceAndMetaData.put(IngestDocument.INGEST_KEY, ingestDocument.getIngestMetadata());
List<String> dotPathsInArray = writeNewDotPathForNestedObject(ingestDocumentSourceAndMetaData, newDocumentFieldName);

if (dotPathsInArray.size() == 1) {
ValueSource ingestValue = ValueSource.wrap(modelOutputValue, scriptService);
TemplateScript.Factory ingestField = ConfigurationUtils
.compileTemplate(TYPE, tag, dotPathsInArray.get(0), dotPathsInArray.get(0), scriptService);
ingestDocument.setFieldValue(ingestField, ingestValue, ignoreMissing);
} else {
if (!(modelOutputValue instanceof List)) {
throw new IllegalArgumentException("Model output is not an array, cannot assign to array in documents.");
}
List<?> modelOutputValueArray = (List<?>) modelOutputValue;
// check length of the prediction array to be the same of the document array
if (dotPathsInArray.size() != modelOutputValueArray.size()) {
throw new RuntimeException(
"the prediction field: "
+ modelOutputFieldName
+ " is an array in size of "
+ modelOutputValueArray.size()
+ " but the document field array from field "
+ newDocumentFieldName
+ " is in size of "
+ dotPathsInArray.size()
);
}
// Iterate over dotPathInArray
for (int i = 0; i < dotPathsInArray.size(); i++) {
String dotPathInArray = dotPathsInArray.get(i);
Object modelOutputValueInArray = modelOutputValueArray.get(i);
ValueSource ingestValue = ValueSource.wrap(modelOutputValueInArray, scriptService);
TemplateScript.Factory ingestField = ConfigurationUtils
.compileTemplate(TYPE, tag, dotPathInArray, dotPathInArray, scriptService);
ingestDocument.setFieldValue(ingestField, ingestValue, ignoreMissing);
}
}
}

@Override
public String getType() {
return TYPE;
Expand All @@ -374,16 +498,18 @@ public static class Factory implements Processor.Factory {

private final ScriptService scriptService;
private final Client client;
private final NamedXContentRegistry xContentRegistry;

/**
* Constructs a new instance of the Factory class.
*
* @param scriptService the ScriptService instance to be used by the Factory
* @param client the Client instance to be used by the Factory
*/
public Factory(ScriptService scriptService, Client client) {
public Factory(ScriptService scriptService, Client client, NamedXContentRegistry xContentRegistry) {
this.scriptService = scriptService;
this.client = client;
this.xContentRegistry = xContentRegistry;
}

/**
Expand All @@ -410,6 +536,14 @@ public MLInferenceIngestProcessor create(
int maxPredictionTask = ConfigurationUtils
.readIntProperty(TYPE, processorTag, config, MAX_PREDICTION_TASKS, DEFAULT_MAX_PREDICTION_TASKS);
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, IGNORE_MISSING, false);
boolean override = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, OVERRIDE, false);
String functionName = ConfigurationUtils
.readStringProperty(TYPE, processorTag, config, FUNCTION_NAME, FunctionName.REMOTE.name());
String modelInput = ConfigurationUtils
.readStringProperty(TYPE, processorTag, config, MODEL_INPUT, "{ \"parameters\": ${ml_inference.parameters} }");
boolean defaultValue = !functionName.equals("remote");
boolean fullResponsePath = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, FULL_RESPONSE_PATH, defaultValue);

boolean ignoreFailure = ConfigurationUtils
.readBooleanProperty(TYPE, processorTag, config, ConfigurationUtils.IGNORE_FAILURE_KEY, false);
// convert model config user input data structure to Map<String, String>
Expand Down Expand Up @@ -440,11 +574,16 @@ public MLInferenceIngestProcessor create(
processorTag,
description,
ignoreMissing,
functionName,
fullResponsePath,
ignoreFailure,
override,
modelInput,
scriptService,
client
client,
xContentRegistry
);
}
}

}
}
Loading

0 comments on commit 5c6aa08

Please sign in to comment.