Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster restart model auto redeploy #1627

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

import lombok.Builder;
import lombok.Data;
import lombok.Setter;
import lombok.extern.log4j.Log4j2;

@Log4j2
Expand All @@ -68,6 +69,9 @@ public class MLModelAutoReDeployer {

private final SearchRequestBuilderFactory searchRequestBuilderFactory;

@Setter
private ActionListener<Boolean> startCronJobListener;

public MLModelAutoReDeployer(
ClusterService clusterService,
Client client,
Expand Down Expand Up @@ -126,6 +130,7 @@ Consumer<Boolean> undeployModelsOnDataNodesConsumer() {
public void buildAutoReloadArrangement(List<String> addedNodes, String clusterManagerNodeId) {
if (!enableAutoReDeployModel) {
log.info("Model auto reload configuration is false, not performing auto reloading!");
startCronjobAndClearListener();
return;
}
String localNodeId = clusterService.localNode().getId();
Expand All @@ -142,10 +147,12 @@ public void buildAutoReloadArrangement(List<String> addedNodes, String clusterMa
public void redeployAModel() {
if (!enableAutoReDeployModel) {
log.info("Model auto reload configuration is false, not performing auto reloading!");
startCronjobAndClearListener();
return;
}
if (modelAutoRedeployArrangements.size() == 0) {
log.info("No models needs to be auto redeployed!");
startCronjobAndClearListener();
return;
}
ModelAutoRedeployArrangement modelAutoRedeployArrangement = modelAutoRedeployArrangements.poll();
Expand Down Expand Up @@ -176,9 +183,10 @@ private void triggerAutoDeployModels(List<String> addedNodes) {
});
redeployAModel();
}
},
e -> { log.error("Failed to query need auto redeploy models, no action will be performed, addedNodes are: {}", addedNodes, e); }
);
}, e -> {
log.error("Failed to query need auto redeploy models, no action will be performed, addedNodes are: {}", addedNodes, e);
startCronjobAndClearListener();
});

queryRunningModels(listener);
}
Expand Down Expand Up @@ -296,6 +304,14 @@ private void triggerModelRedeploy(ModelAutoRedeployArrangement modelAutoRedeploy
client.execute(MLDeployModelAction.INSTANCE, deployModelRequest, listener);
}

/**
 * Signals the pending {@code startCronJobListener}, if any, and then drops the reference to it.
 *
 * <p>Nothing happens unless the local node reports itself as the cluster manager node and a
 * listener is currently set. When both hold, the listener receives {@code onResponse(true)} and
 * the field is reset to {@code null}, so later calls find no listener to notify.
 */
private void startCronjobAndClearListener() {
    // Only the cluster manager node is allowed to signal the listener.
    if (!clusterService.localNode().isClusterManagerNode()) {
        return;
    }
    // No listener registered (or it was already notified and cleared).
    if (startCronJobListener == null) {
        return;
    }
    startCronJobListener.onResponse(true);
    startCronJobListener = null;
}

@Data
@Builder
static class ModelAutoRedeployArrangement {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@
import static org.opensearch.ml.plugin.MachineLearningPlugin.GENERAL_THREAD_POOL;
import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_SYNC_UP_JOB_INTERVAL_IN_SECONDS;

import java.util.List;

import org.opensearch.client.Client;
import org.opensearch.cluster.LocalNodeClusterManagerListener;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lifecycle.LifecycleListener;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.ml.autoredeploy.MLModelAutoReDeployer;
import org.opensearch.ml.engine.encryptor.Encryptor;
import org.opensearch.ml.indices.MLIndicesHandler;
import org.opensearch.threadpool.Scheduler;
Expand All @@ -35,14 +39,17 @@ public class MLCommonsClusterManagerEventListener implements LocalNodeClusterMan

private volatile Integer jobInterval;

private final MLModelAutoReDeployer mlModelAutoReDeployer;

public MLCommonsClusterManagerEventListener(
ClusterService clusterService,
Client client,
Settings settings,
ThreadPool threadPool,
DiscoveryNodeHelper nodeHelper,
MLIndicesHandler mlIndicesHandler,
Encryptor encryptor
Encryptor encryptor,
MLModelAutoReDeployer modelAutoReDeployer
) {
this.clusterService = clusterService;
this.client = client;
Expand All @@ -51,6 +58,7 @@ public MLCommonsClusterManagerEventListener(
this.nodeHelper = nodeHelper;
this.mlIndicesHandler = mlIndicesHandler;
this.encryptor = encryptor;
this.mlModelAutoReDeployer = modelAutoReDeployer;

this.jobInterval = ML_COMMONS_SYNC_UP_JOB_INTERVAL_IN_SECONDS.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(ML_COMMONS_SYNC_UP_JOB_INTERVAL_IN_SECONDS, it -> {
Expand All @@ -62,13 +70,28 @@ public MLCommonsClusterManagerEventListener(

@Override
public void onClusterManager() {
if (syncModelRoutingCron == null) {
startSyncModelRoutingCron();
}
ActionListener<Boolean> listener = ActionListener.wrap(r -> {
if (syncModelRoutingCron == null) {
startSyncModelRoutingCron();
}
}, e -> {
if (syncModelRoutingCron == null) {
startSyncModelRoutingCron();
}
});
mlModelAutoReDeployer.setStartCronJobListener(listener);
String localNodeId = clusterService.localNode().getId();
threadPool
.schedule(
() -> mlModelAutoReDeployer.buildAutoReloadArrangement(List.of(localNodeId), localNodeId),
TimeValue.timeValueSeconds(jobInterval),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we start auto reload immediately?

Copy link
Collaborator Author

@zane-neo zane-neo Nov 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. The cluster manager being ready doesn't mean the cluster is ready. If we start auto reload immediately, we can get a "blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];" exception when querying the models that need to be reloaded.

GENERAL_THREAD_POOL
);
}

private void startSyncModelRoutingCron() {
if (jobInterval > 0) {
log.info("Starting ML sync up job...");
syncModelRoutingCron = threadPool
.scheduleWithFixedDelay(
new MLSyncUpCron(client, clusterService, nodeHelper, mlIndicesHandler, encryptor),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,8 @@ public Collection<Object> createComponents(
threadPool,
nodeHelper,
mlIndicesHandler,
encryptor
encryptor,
mlModelAutoRedeployer
);

// TODO move this into MLFeatureEnabledSetting
Expand Down Expand Up @@ -685,6 +686,7 @@ public List<Setting<?>> getSettings() {
MLCommonsSettings.ML_COMMONS_ENABLE_INHOUSE_PYTHON_MODEL,
MLCommonsSettings.ML_COMMONS_MODEL_AUTO_REDEPLOY_ENABLE,
MLCommonsSettings.ML_COMMONS_MODEL_AUTO_REDEPLOY_LIFETIME_RETRY_TIMES,
MLCommonsSettings.ML_COMMONS_MODEL_AUTO_REDEPLOY_SUCCESS_RATIO,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This setting is not used anywhere in this PR?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This setting wasn't exposed when the auto redeploy feature was implemented, so I'm exposing it now to let customers change the ratio value.

MLCommonsSettings.ML_COMMONS_ALLOW_MODEL_URL,
MLCommonsSettings.ML_COMMONS_ALLOW_LOCAL_FILE_UPLOAD,
MLCommonsSettings.ML_COMMONS_MODEL_ACCESS_CONTROL_ENABLED,
Expand Down
Loading