diff --git a/plugin/src/main/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployer.java b/plugin/src/main/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployer.java index 4441a3d592..ca1b3a2f15 100644 --- a/plugin/src/main/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployer.java +++ b/plugin/src/main/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployer.java @@ -241,7 +241,9 @@ private void queryRunningModels(ActionListener listener) { String[] includes = new String[] { MLModel.AUTO_REDEPLOY_RETRY_TIMES_FIELD, MLModel.PLANNING_WORKER_NODES_FIELD, - MLModel.DEPLOY_TO_ALL_NODES_FIELD }; + MLModel.DEPLOY_TO_ALL_NODES_FIELD, + MLModel.FUNCTION_NAME_FIELD, + MLModel.ALGORITHM_FIELD}; String[] excludes = new String[] { MLModel.MODEL_CONTENT_FIELD, MLModel.OLD_MODEL_CONTENT_FIELD }; FetchSourceContext fetchContext = new FetchSourceContext(true, includes, excludes); @@ -263,6 +265,10 @@ private void triggerModelRedeploy(ModelAutoRedeployArrangement modelAutoRedeploy String functionName = (String) Optional .ofNullable(sourceAsMap.get(MLModel.FUNCTION_NAME_FIELD)) .orElse(sourceAsMap.get(MLModel.ALGORITHM_FIELD)); + if (functionName == null) { + log.error("Model function_name or algorithm is null, model is not in correct status, please check the model, model id is: {}", modelId); + return; + } if (FunctionName.REMOTE == FunctionName.from(functionName)) { log.info("Skipping redeploying remote model {} as remote model deployment can be done at prediction time.", modelId); return; diff --git a/plugin/src/test/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployerIT.java b/plugin/src/test/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployerIT.java new file mode 100644 index 0000000000..d6eca1c2bc --- /dev/null +++ b/plugin/src/test/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployerIT.java @@ -0,0 +1,73 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.ml.autoredeploy; + +import lombok.SneakyThrows; +import org.junit.Before; +import org.opensearch.ml.common.MLTaskState; +import org.opensearch.ml.rest.MLCommonsRestTestCase; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import static org.opensearch.ml.common.MLTask.MODEL_ID_FIELD; + +public class MLModelAutoReDeployerIT extends MLCommonsRestTestCase { + + @Before + public void setup() throws IOException { + prepareModel(); + } + + @SneakyThrows + private void prepareModel() { + String requestBody = Files.readString(Path.of(this + .getClass() + .getClassLoader() + .getResource("org/opensearch/ml/autoredeploy/TracedSmallModelRequest.json") + .toURI())); + String registerFirstModelTaskId = registerModel(requestBody); + String registerSecondModelTaskId = registerModel(requestBody); + waitForTask(registerFirstModelTaskId, MLTaskState.COMPLETED); + getTask(client(), registerFirstModelTaskId, response -> { + String firstModelId = (String) response.get(MODEL_ID_FIELD); + try { + String deployFirstModelTaskId = deployModel(firstModelId); + getTask(client(), registerSecondModelTaskId, response1 -> { + String secondModelId = (String) response1.get(MODEL_ID_FIELD); + try { + /** + * At this time point, the model auto redeployer should be querying the deploying/deploy failed/partially deployed models. + * The original deploy model task should be able to complete successfully, if not it means the + * org.opensearch.ml.action.forward.TransportForwardAction.triggerNextModelDeployAndCheckIfRestRetryTimes might throw exception + * which cause by org.opensearch.ml.autoredeploy.MLModelAutoReDeployer#redeployAModel. The auto redeploy constructs an arrangement + * with two models, the first model deploy done event will trigger the auto redeploy's next model deploy, and if during this + * any error occurs, the first model deploy task status won't be updated to complete. So if this IT can pass, then it means the + * next model auto redeploy trigger is correct. + */ + String deploySecondModelTaskId = deployModel(secondModelId); + waitForTask(deploySecondModelTaskId, MLTaskState.COMPLETED); + } catch (Exception e) { + fail(e.getMessage()); + } + }); + waitForTask(deployFirstModelTaskId, MLTaskState.COMPLETED); + } catch (Exception e) { + logger.error(e.getMessage(), e); + fail(e.getMessage()); + } + }); + } + + public void testModelAutoRedeploy() { + // This is a tricky IT to test model auto redeploy, since model auto redeploy is not easily to test with IT, so adding this test + // to mimic a case when a cluster spin up and immediately deploy a model, the model auto redeployer also selected this deploying + // model to deploy, and the original deploy task should complete and the auto redeploy task should fail in background. + } +} diff --git a/plugin/src/test/resources/org/opensearch/ml/autoredeploy/TracedSmallModelRequest.json b/plugin/src/test/resources/org/opensearch/ml/autoredeploy/TracedSmallModelRequest.json new file mode 100644 index 0000000000..9fc53f3b91 --- /dev/null +++ b/plugin/src/test/resources/org/opensearch/ml/autoredeploy/TracedSmallModelRequest.json @@ -0,0 +1,14 @@ +{ + "name": "traced_small_model", + "version": "1.0.0", + "model_format": "TORCH_SCRIPT", + "model_task_type": "text_embedding", + "model_content_hash_value": "e13b74006290a9d0f58c1376f9629d4ebc05a0f9385f40db837452b167ae9021", + "model_config": { + "model_type": "bert", + "embedding_dimension": 768, + "framework_type": "sentence_transformers", + "all_config": "{\"architectures\":[\"BertModel\"],\"max_position_embeddings\":512,\"model_type\":\"bert\",\"num_attention_heads\":12,\"num_hidden_layers\":6}" + }, + "url": "https://github.com/opensearch-project/ml-commons/blob/2.x/ml-algorithms/src/test/resources/org/opensearch/ml/engine/algorithms/text_embedding/traced_small_model.zip?raw=true" +}