diff --git a/plugin/src/main/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployer.java b/plugin/src/main/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployer.java index dc322f2836..ff774ac0be 100644 --- a/plugin/src/main/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployer.java +++ b/plugin/src/main/java/org/opensearch/ml/autoredeploy/MLModelAutoReDeployer.java @@ -50,6 +50,7 @@ import lombok.Builder; import lombok.Data; +import lombok.Setter; import lombok.extern.log4j.Log4j2; @Log4j2 @@ -68,6 +69,9 @@ public class MLModelAutoReDeployer { private final SearchRequestBuilderFactory searchRequestBuilderFactory; + @Setter + private ActionListener startCronJobListener; + public MLModelAutoReDeployer( ClusterService clusterService, Client client, @@ -126,6 +130,7 @@ Consumer undeployModelsOnDataNodesConsumer() { public void buildAutoReloadArrangement(List addedNodes, String clusterManagerNodeId) { if (!enableAutoReDeployModel) { log.info("Model auto reload configuration is false, not performing auto reloading!"); + startCronjobAndClearListener(); return; } String localNodeId = clusterService.localNode().getId(); @@ -142,10 +147,12 @@ public void buildAutoReloadArrangement(List addedNodes, String clusterMa public void redeployAModel() { if (!enableAutoReDeployModel) { log.info("Model auto reload configuration is false, not performing auto reloading!"); + startCronjobAndClearListener(); return; } if (modelAutoRedeployArrangements.size() == 0) { log.info("No models needs to be auto redeployed!"); + startCronjobAndClearListener(); return; } ModelAutoRedeployArrangement modelAutoRedeployArrangement = modelAutoRedeployArrangements.poll(); @@ -176,9 +183,10 @@ private void triggerAutoDeployModels(List addedNodes) { }); redeployAModel(); } - }, - e -> { log.error("Failed to query need auto redeploy models, no action will be performed, addedNodes are: {}", addedNodes, e); } - ); + }, e -> { + log.error("Failed to query need auto redeploy models, no action will be performed, addedNodes are: {}", addedNodes, e); + startCronjobAndClearListener(); + }); queryRunningModels(listener); } @@ -296,6 +304,14 @@ private void triggerModelRedeploy(ModelAutoRedeployArrangement modelAutoRedeploy client.execute(MLDeployModelAction.INSTANCE, deployModelRequest, listener); } + private void startCronjobAndClearListener() { + boolean managerNode = clusterService.localNode().isClusterManagerNode(); + if (managerNode && startCronJobListener != null) { + startCronJobListener.onResponse(true); + startCronJobListener = null; + } + } + @Data @Builder static class ModelAutoRedeployArrangement { diff --git a/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java b/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java index c3012fdade..c4cdd9f899 100644 --- a/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java +++ b/plugin/src/main/java/org/opensearch/ml/cluster/MLCommonsClusterManagerEventListener.java @@ -8,12 +8,16 @@ import static org.opensearch.ml.plugin.MachineLearningPlugin.GENERAL_THREAD_POOL; import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_SYNC_UP_JOB_INTERVAL_IN_SECONDS; +import java.util.List; + import org.opensearch.client.Client; import org.opensearch.cluster.LocalNodeClusterManagerListener; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.lifecycle.LifecycleListener; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.ml.autoredeploy.MLModelAutoReDeployer; import org.opensearch.ml.engine.encryptor.Encryptor; import org.opensearch.ml.indices.MLIndicesHandler; import org.opensearch.threadpool.Scheduler; @@ -35,6 +39,8 @@ public class MLCommonsClusterManagerEventListener implements LocalNodeClusterMan private volatile Integer jobInterval; + private final MLModelAutoReDeployer mlModelAutoReDeployer; + public MLCommonsClusterManagerEventListener( ClusterService clusterService, Client client, @@ -42,7 +48,8 @@ public MLCommonsClusterManagerEventListener( ThreadPool threadPool, DiscoveryNodeHelper nodeHelper, MLIndicesHandler mlIndicesHandler, - Encryptor encryptor + Encryptor encryptor, + MLModelAutoReDeployer modelAutoReDeployer ) { this.clusterService = clusterService; this.client = client; @@ -51,6 +58,7 @@ public MLCommonsClusterManagerEventListener( this.nodeHelper = nodeHelper; this.mlIndicesHandler = mlIndicesHandler; this.encryptor = encryptor; + this.mlModelAutoReDeployer = modelAutoReDeployer; this.jobInterval = ML_COMMONS_SYNC_UP_JOB_INTERVAL_IN_SECONDS.get(settings); clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_SYNC_UP_JOB_INTERVAL_IN_SECONDS, it -> { @@ -62,13 +70,28 @@ public MLCommonsClusterManagerEventListener( @Override public void onClusterManager() { - if (syncModelRoutingCron == null) { - startSyncModelRoutingCron(); - } + ActionListener listener = ActionListener.wrap(r -> { + if (syncModelRoutingCron == null) { + startSyncModelRoutingCron(); + } + }, e -> { + if (syncModelRoutingCron == null) { + startSyncModelRoutingCron(); + } + }); + mlModelAutoReDeployer.setStartCronJobListener(listener); + String localNodeId = clusterService.localNode().getId(); + threadPool + .schedule( + () -> mlModelAutoReDeployer.buildAutoReloadArrangement(List.of(localNodeId), localNodeId), + TimeValue.timeValueSeconds(jobInterval), + GENERAL_THREAD_POOL + ); } private void startSyncModelRoutingCron() { if (jobInterval > 0) { + log.info("Starting ML sync up job..."); syncModelRoutingCron = threadPool .scheduleWithFixedDelay( new MLSyncUpCron(client, clusterService, nodeHelper, mlIndicesHandler, encryptor), diff --git a/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java b/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java index b4787cc5ed..c83c212e88 100644 --- a/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java +++ b/plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java @@ -468,7 +468,8 @@ public Collection createComponents( threadPool, nodeHelper, mlIndicesHandler, - encryptor + encryptor, + mlModelAutoRedeployer ); // TODO move this into MLFeatureEnabledSetting @@ -685,6 +686,7 @@ public List> getSettings() { MLCommonsSettings.ML_COMMONS_ENABLE_INHOUSE_PYTHON_MODEL, MLCommonsSettings.ML_COMMONS_MODEL_AUTO_REDEPLOY_ENABLE, MLCommonsSettings.ML_COMMONS_MODEL_AUTO_REDEPLOY_LIFETIME_RETRY_TIMES, + MLCommonsSettings.ML_COMMONS_MODEL_AUTO_REDEPLOY_SUCCESS_RATIO, MLCommonsSettings.ML_COMMONS_ALLOW_MODEL_URL, MLCommonsSettings.ML_COMMONS_ALLOW_LOCAL_FILE_UPLOAD, MLCommonsSettings.ML_COMMONS_MODEL_ACCESS_CONTROL_ENABLED,