Skip to content

Commit

Permalink
Fix long pending issue when deleting model (opensearch-project#1882)
Browse files Browse the repository at this point in the history
* Fix long pending issue when deleting model

Signed-off-by: zane-neo <[email protected]>

* Refine the delete model code

Signed-off-by: zane-neo <[email protected]>

* refactor delete model flow to make sure all dependent resources are deleted together with model metadata

Signed-off-by: zane-neo <[email protected]>

* fix minor issue to make sure only non-remote model will deelete chunks

Signed-off-by: zane-neo <[email protected]>

* format code

Signed-off-by: zane-neo <[email protected]>

* fix failure UTs

Signed-off-by: zane-neo <[email protected]>

* Change to delete model metadata first

Signed-off-by: zane-neo <[email protected]>

* format code

Signed-off-by: zane-neo <[email protected]>

* Remove remote function check

Signed-off-by: zane-neo <[email protected]>

* Fix failure UTs

Signed-off-by: zane-neo <[email protected]>

---------

Signed-off-by: zane-neo <[email protected]>
  • Loading branch information
zane-neo authored Feb 7, 2024
1 parent 50d30df commit f18eaf3
Show file tree
Hide file tree
Showing 3 changed files with 287 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,19 @@
import static org.opensearch.ml.utils.MLNodeUtils.createXContentParserFromRegistry;
import static org.opensearch.ml.utils.RestActionUtils.getFetchSourceContext;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import org.opensearch.OpenSearchStatusException;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
Expand All @@ -34,7 +37,6 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.index.reindex.DeleteByQueryAction;
Expand Down Expand Up @@ -167,7 +169,9 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Delete
wrappedListener.onFailure(e);
}
} else {
wrappedListener.onFailure(new OpenSearchStatusException("Failed to find model", RestStatus.NOT_FOUND));
// when model metadata is not found, model chunk and controller might still there, delete them here and return success
// response
deleteModelChunksAndController(wrappedListener, modelId, null);
}
}, e -> { wrappedListener.onFailure((new OpenSearchStatusException("Failed to find model", RestStatus.NOT_FOUND))); }));
} catch (Exception e) {
Expand All @@ -177,31 +181,25 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Delete
}

@VisibleForTesting
void deleteModelChunks(String modelId, DeleteResponse deleteResponse, ActionListener<DeleteResponse> actionListener) {
void deleteModelChunks(String modelId, ActionListener<Boolean> actionListener) {
DeleteByQueryRequest deleteModelsRequest = new DeleteByQueryRequest(ML_MODEL_INDEX);
deleteModelsRequest.setQuery(new TermsQueryBuilder(MODEL_ID_FIELD, modelId));

client.execute(DeleteByQueryAction.INSTANCE, deleteModelsRequest, ActionListener.wrap(r -> {
if ((r.getBulkFailures() == null || r.getBulkFailures().size() == 0)
&& (r.getSearchFailures() == null || r.getSearchFailures().size() == 0)) {
log.debug("All model chunks are deleted for model {}", modelId);
if (deleteResponse != null) {
// If model metaData not found and deleteResponse is null, do not return here.
// ResourceNotFound is returned to notify that this model was deleted.
// This is a walk around to avoid cleaning up model leftovers. Will revisit if
// necessary.
actionListener.onResponse(deleteResponse);
}
actionListener.onResponse(true);
} else {
returnFailure(r, modelId, actionListener);
}
}, e -> {
log.error("Failed to delete ML model for " + modelId, e);
log.error("Failed to delete model chunks for: " + modelId, e);
actionListener.onFailure(e);
}));
}

private void returnFailure(BulkByScrollResponse response, String modelId, ActionListener<DeleteResponse> actionListener) {
private void returnFailure(BulkByScrollResponse response, String modelId, ActionListener<Boolean> actionListener) {
String errorMessage = "";
if (response.isTimedOut()) {
errorMessage = OS_STATUS_EXCEPTION_MESSAGE + ", " + TIMEOUT_MSG + modelId;
Expand All @@ -215,24 +213,56 @@ private void returnFailure(BulkByScrollResponse response, String modelId, Action
}

private void deleteModel(String modelId, ActionListener<DeleteResponse> actionListener) {
DeleteRequest deleteRequest = new DeleteRequest(ML_MODEL_INDEX, modelId);
client.delete(deleteRequest, new ActionListener<DeleteResponse>() {
DeleteRequest deleteRequest = new DeleteRequest(ML_MODEL_INDEX, modelId).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.delete(deleteRequest, new ActionListener<>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
deleteModelChunks(modelId, deleteResponse, actionListener);
deleteController(modelId);
deleteModelChunksAndController(actionListener, modelId, deleteResponse);
}

@Override
public void onFailure(Exception e) {
log.error("Failed to delete model meta data for model: " + modelId, e);
if (e instanceof ResourceNotFoundException) {
deleteModelChunks(modelId, null, actionListener);
deleteController(modelId);
deleteModelChunksAndController(actionListener, modelId, null);
} else {
log.error("Model is not all cleaned up, please try again: " + modelId, e);
actionListener.onFailure(e);
}
}
});
}

private void deleteModelChunksAndController(
ActionListener<DeleteResponse> actionListener,
String modelId,
DeleteResponse deleteResponse
) {
CountDownLatch countDownLatch = new CountDownLatch(2);
AtomicBoolean bothDeleted = new AtomicBoolean(true);
ActionListener<Boolean> countDownActionListener = ActionListener.wrap(b -> {
countDownLatch.countDown();
bothDeleted.compareAndSet(true, b);
if (countDownLatch.getCount() == 0) {
if (bothDeleted.get()) {
log.debug("model chunks and model controller for model {} deleted successfully", modelId);
if (deleteResponse != null) {
actionListener.onResponse(deleteResponse);
} else {
actionListener.onFailure(new OpenSearchStatusException("Failed to find model", RestStatus.NOT_FOUND));
}
} else {
actionListener.onFailure(new IllegalStateException("Model is not all cleaned up, please try again: " + modelId));
}
actionListener.onFailure(e);
}
}, e -> {
countDownLatch.countDown();
bothDeleted.compareAndSet(true, false);
if (countDownLatch.getCount() == 0) {
actionListener.onFailure(new IllegalStateException("Model is not all cleaned up, please try again: " + modelId, e));
}
});
deleteModelChunks(modelId, countDownActionListener);
deleteController(modelId, countDownActionListener);
}

/**
Expand All @@ -241,20 +271,20 @@ public void onFailure(Exception e) {
*
* @param modelId model ID
*/
private void deleteController(String modelId, ActionListener<DeleteResponse> actionListener) {
private void deleteController(String modelId, ActionListener<Boolean> actionListener) {
DeleteRequest deleteRequest = new DeleteRequest(ML_CONTROLLER_INDEX, modelId);
client.delete(deleteRequest, new ActionListener<>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
log.info("Model controller for model {} successfully deleted from index, result: {}", modelId, deleteResponse.getResult());
actionListener.onResponse(deleteResponse);
actionListener.onResponse(true);
}

@Override
public void onFailure(Exception e) {
if (e instanceof IndexNotFoundException) {
if (e instanceof ResourceNotFoundException) {
log.info("Model controller not deleted due to no model controller found for model: " + modelId);
actionListener.onFailure(e);
actionListener.onResponse(true); // we consider this as success
} else {
log.error("Failed to delete model controller for model: " + modelId, e);
actionListener.onFailure(e);
Expand All @@ -263,28 +293,6 @@ public void onFailure(Exception e) {
});
}

/**
* Delete the model controller for a model after the model is deleted from the
* ML index with build-in listener.
*
* @param modelId model ID
*/
private void deleteController(String modelId) {
deleteController(modelId, ActionListener.wrap(deleteResponse -> {
if (deleteResponse.getResult() == DocWriteResponse.Result.DELETED) {
log.info("Model controller for model {} successfully deleted from index, result: {}", modelId, deleteResponse.getResult());
} else {
log.info("The deletion of model controller for model {} returned with result: {}", modelId, deleteResponse.getResult());
}
}, e -> {
if (e instanceof IndexNotFoundException) {
log.debug("Model controller not deleted due to no model controller found for model: " + modelId);
} else {
log.error("Failed to delete model controller for model: " + modelId, e);
}
}));
}

private Boolean isModelNotDeployed(MLModelState mlModelState) {
return !mlModelState.equals(MLModelState.LOADED)
&& !mlModelState.equals(MLModelState.LOADING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.support.nodes.TransportNodesAction;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.Client;
Expand Down Expand Up @@ -174,7 +175,7 @@ protected MLUndeployModelNodesResponse newResponse(
deployToAllNodes.put(modelId, false);
}
updateRequest.index(ML_MODEL_INDEX).id(modelId).doc(updateDocument);
bulkRequest.add(updateRequest);
bulkRequest.add(updateRequest).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}
syncUpInput.setDeployToAllNodes(deployToAllNodes);
ActionListener<BulkResponse> actionListener = ActionListener.wrap(r -> {
Expand Down
Loading

0 comments on commit f18eaf3

Please sign in to comment.