diff --git a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java index b3f004d71..b1246b18a 100644 --- a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java @@ -17,6 +17,7 @@ import org.opensearch.client.Client; import org.opensearch.common.inject.Inject; import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.commons.authuser.User; import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; import org.opensearch.flowframework.exception.FlowFrameworkException; @@ -46,6 +47,7 @@ import static org.opensearch.flowframework.common.CommonValue.PROVISION_WORKFLOW; import static org.opensearch.flowframework.common.WorkflowResources.getDeprovisionStepByWorkflowStep; import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep; +import static org.opensearch.flowframework.util.ParseUtils.getUserContext; /** * Transport Action to deprovision a workflow from a stored use case template @@ -97,6 +99,7 @@ public DeprovisionWorkflowTransportAction( @Override protected void doExecute(Task task, WorkflowRequest request, ActionListener listener) { // Retrieve use case template from global context + User user = getUserContext(client); String workflowId = request.getWorkflowId(); GetRequest getRequest = new GetRequest(GLOBAL_CONTEXT_INDEX, workflowId); @@ -127,7 +130,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { if (exception instanceof FlowFrameworkException) { logger.error("Workflow validation failed for workflow : " + workflowId); @@ -144,6 +147,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener provisionProcessSequence, ActionListener listener @@ -157,7 +161,7 @@ private void getResourcesAndExecute( .collect(Collectors.toMap(ResourceCreated::workflowStepId, Function.identity())); // Now finally do the deprovision - executeDeprovisionSequence(workflowId, resourceMap, provisionProcessSequence, listener); + executeDeprovisionSequence(user, workflowId, resourceMap, provisionProcessSequence, listener); }, exception -> { logger.error("Failed to get workflow state for workflow " + workflowId); listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))); @@ -165,6 +169,7 @@ private void getResourcesAndExecute( } private void executeDeprovisionSequence( + User user, String workflowId, Map resourceMap, List provisionProcessSequence, @@ -217,6 +222,8 @@ private void executeDeprovisionSequence( logger.info("Successful {} for {}", deprovisionNode.id(), resourceNameAndId); // Remove from list so we don't try again iter.remove(); + // Pause briefly before next step + Thread.sleep(100); } catch (Throwable t) { logger.info( "Failed {} for {}: {}", @@ -225,7 +232,6 @@ private void executeDeprovisionSequence( t.getCause() == null ? t.getMessage() : t.getCause().getMessage() ); } - if (deprovisionFuture.isCompletedExceptionally()) {} else {} } if (deprovisionProcessSequence.size() < resourceCount) { // If we've deleted something, decrement and try again if not zero @@ -241,15 +247,20 @@ private void executeDeprovisionSequence( pn.nodeTimeout() ); }).collect(Collectors.toList()); + // Pause briefly before next loop + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + break; + } } else { // If nothing was deleted, exit loop break; } } if (deprovisionProcessSequence.isEmpty()) { - // Successful deprovision, return workflow ID - // TODO: Reset state for this workflow ID to NOT STARTED - listener.onResponse(new WorkflowResponse(workflowId)); + // Successful deprovision + resetWorkflowState(user, workflowId, listener); } else { // Failed deprovision, give user list of remaining resources listener.onFailure( @@ -258,6 +269,8 @@ private void executeDeprovisionSequence( "Failed to deprovision some resources: [" + deprovisionProcessSequence.stream() .map(pn -> getResourceFromProcessNode(pn, resourceMap)) + .filter(Objects::nonNull) + .distinct() .collect(Collectors.joining(", ")) + "].", RestStatus.ACCEPTED @@ -266,6 +279,20 @@ private void executeDeprovisionSequence( } } + private void resetWorkflowState(User user, String workflowId, ActionListener listener) { + flowFrameworkIndicesHandler.putInitialStateToWorkflowState(workflowId, user, ActionListener.wrap(stateResponse -> { + logger.info("create state workflow doc"); + listener.onResponse(new WorkflowResponse(workflowId)); + }, exception -> { + logger.error("Failed to save workflow state : {}", exception.getMessage()); + if (exception instanceof FlowFrameworkException) { + listener.onFailure(exception); + } else { + listener.onFailure(new FlowFrameworkException(exception.getMessage(), RestStatus.BAD_REQUEST)); + } + })); + } + private String getResourceFromProcessNode(ProcessNode deprovisionNode, Map resourceMap) { String deprovisionId = deprovisionNode.id(); int pos = deprovisionId.indexOf(DEPROVISION_SUFFIX);