Skip to content

Commit

Permalink
Remove remote function check
Browse files Browse the repository at this point in the history
Signed-off-by: zane-neo <[email protected]>
  • Loading branch information
zane-neo committed Feb 7, 2024
1 parent 3951275 commit 6d60e95
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.index.reindex.DeleteByQueryAction;
import org.opensearch.index.reindex.DeleteByQueryRequest;
import org.opensearch.ml.common.FunctionName;
import org.opensearch.ml.common.MLModel;
import org.opensearch.ml.common.model.MLModelState;
import org.opensearch.ml.common.transport.model.MLModelDeleteAction;
Expand Down Expand Up @@ -116,7 +115,6 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Delete
MLModel mlModel = MLModel.parse(parser, algorithmName);
Boolean isHidden = (Boolean) r.getSource().get(IS_HIDDEN_FIELD);
MLModelState mlModelState = mlModel.getModelState();
FunctionName functionName = mlModel.getAlgorithm();
if (isHidden != null && isHidden) {
if (!isSuperAdmin) {
wrappedListener
Expand All @@ -128,7 +126,7 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Delete
);
} else {
if (isModelNotDeployed(mlModelState)) {
deleteModel(modelId, functionName, actionListener);
deleteModel(modelId, actionListener);
} else {
wrappedListener
.onFailure(
Expand All @@ -151,7 +149,7 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Delete
)
);
} else if (isModelNotDeployed(mlModelState)) {
deleteModel(modelId, functionName, actionListener);
deleteModel(modelId, actionListener);
} else {
wrappedListener
.onFailure(
Expand All @@ -173,7 +171,7 @@ protected void doExecute(Task task, ActionRequest request, ActionListener<Delete
} else {
// when model metadata is not found, model chunk and controller might still there, delete them here and return success
// response
deleteModelChunksAndController(null, wrappedListener, modelId, null);
deleteModelChunksAndController(wrappedListener, modelId, null);
}
}, e -> { wrappedListener.onFailure((new OpenSearchStatusException("Failed to find model", RestStatus.NOT_FOUND))); }));
} catch (Exception e) {
Expand Down Expand Up @@ -214,79 +212,57 @@ private void returnFailure(BulkByScrollResponse response, String modelId, Action
actionListener.onFailure(new OpenSearchStatusException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR));
}

private void deleteModel(String modelId, FunctionName functionName, ActionListener<DeleteResponse> actionListener) {
private void deleteModel(String modelId, ActionListener<DeleteResponse> actionListener) {
DeleteRequest deleteRequest = new DeleteRequest(ML_MODEL_INDEX, modelId).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.delete(deleteRequest, new ActionListener<>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
deleteModelChunksAndController(functionName, actionListener, modelId, deleteResponse);
deleteModelChunksAndController(actionListener, modelId, deleteResponse);
}

@Override
public void onFailure(Exception e) {
if (e instanceof ResourceNotFoundException) {
deleteModelChunksAndController(functionName, actionListener, modelId, null);
deleteModelChunksAndController(actionListener, modelId, null);
} else {
log.error("Failed to delete model meta data for model, please try again: " + modelId, e);
log.error("Model is not all cleaned up, please try again: " + modelId, e);
actionListener.onFailure(e);
}
}
});
}

private void deleteModelChunksAndController(
FunctionName functionName,
ActionListener<DeleteResponse> actionListener,
String modelId,
DeleteResponse deleteResponse
) {
if (FunctionName.REMOTE != functionName) {
CountDownLatch countDownLatch = new CountDownLatch(2);
AtomicBoolean bothDeleted = new AtomicBoolean(true);
ActionListener<Boolean> countDownActionListener = ActionListener.wrap(b -> {
countDownLatch.countDown();
bothDeleted.compareAndSet(true, b);
if (countDownLatch.getCount() == 0) {
if (bothDeleted.get()) {
log.debug("model chunks and model controller for model {} deleted successfully", modelId);
if (deleteResponse != null) {
actionListener.onResponse(deleteResponse);
} else {
actionListener.onFailure(new OpenSearchStatusException("Failed to find model", RestStatus.NOT_FOUND));
}
CountDownLatch countDownLatch = new CountDownLatch(2);
AtomicBoolean bothDeleted = new AtomicBoolean(true);
ActionListener<Boolean> countDownActionListener = ActionListener.wrap(b -> {
countDownLatch.countDown();
bothDeleted.compareAndSet(true, b);
if (countDownLatch.getCount() == 0) {
if (bothDeleted.get()) {
log.debug("model chunks and model controller for model {} deleted successfully", modelId);
if (deleteResponse != null) {
actionListener.onResponse(deleteResponse);
} else {
actionListener
.onFailure(
new IllegalStateException("Failed to delete model chunks or model controller, please try again: " + modelId)
);
actionListener.onFailure(new OpenSearchStatusException("Failed to find model", RestStatus.NOT_FOUND));
}
}
}, e -> {
countDownLatch.countDown();
bothDeleted.compareAndSet(true, false);
if (countDownLatch.getCount() == 0) {
actionListener
.onFailure(
new IllegalStateException("Failed to delete model chunks or model controller, please try again: " + modelId, e)
);
}
});
deleteModelChunks(modelId, countDownActionListener);
deleteController(modelId, countDownActionListener);
} else {
ActionListener<Boolean> deleteControllerListener = ActionListener.wrap(b -> {
log.debug("model controller for model {} deleted successfully", modelId);
if (deleteResponse != null) {
actionListener.onResponse(deleteResponse);
} else {
actionListener.onFailure(new OpenSearchStatusException("Failed to find model", RestStatus.NOT_FOUND));
actionListener.onFailure(new IllegalStateException("Model is not all cleaned up, please try again: " + modelId));
}
}, e -> {
log.error("Failed to delete model controller, please try again: " + modelId, e);
actionListener.onFailure(new IllegalStateException("Failed to delete model controller, please try again: " + modelId, e));
});
deleteController(modelId, deleteControllerListener);
}
}
}, e -> {
countDownLatch.countDown();
bothDeleted.compareAndSet(true, false);
if (countDownLatch.getCount() == 0) {
actionListener.onFailure(new IllegalStateException("Model is not all cleaned up, please try again: " + modelId, e));
}
});
deleteModelChunks(modelId, countDownActionListener);
deleteController(modelId, countDownActionListener);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,7 @@ public void testDeleteLocalModel_deleteModelController_failed() throws IOExcepti
deleteModelTransportAction.doExecute(null, mlModelDeleteRequest, actionListener);
ArgumentCaptor<Exception> argumentCaptor = ArgumentCaptor.forClass(Exception.class);
verify(actionListener).onFailure(argumentCaptor.capture());
assertEquals(
"Failed to delete model chunks or model controller, please try again: test_id",
argumentCaptor.getValue().getMessage()
);
assertEquals("Model is not all cleaned up, please try again: test_id", argumentCaptor.getValue().getMessage());
}

public void testDeleteRemoteModel_deleteModelChunks_failed() throws IOException {
Expand All @@ -272,10 +269,7 @@ public void testDeleteRemoteModel_deleteModelChunks_failed() throws IOException
deleteModelTransportAction.doExecute(null, mlModelDeleteRequest, actionListener);
ArgumentCaptor<Exception> argumentCaptor = ArgumentCaptor.forClass(Exception.class);
verify(actionListener).onFailure(argumentCaptor.capture());
assertEquals(
"Failed to delete model chunks or model controller, please try again: test_id",
argumentCaptor.getValue().getMessage()
);
assertEquals("Model is not all cleaned up, please try again: test_id", argumentCaptor.getValue().getMessage());
}

public void testDeleteHiddenModel_Success() throws IOException {
Expand Down

0 comments on commit 6d60e95

Please sign in to comment.