From 1c031217979d4a977ab0a0596c8316ee74ce17e7 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Fri, 5 Jul 2024 17:16:48 -0700 Subject: [PATCH] Add allow_delete parameter to Deprovision API Signed-off-by: Daniel Widdis --- CHANGELOG.md | 2 + .../flowframework/common/CommonValue.java | 2 + .../common/WorkflowResources.java | 9 +- .../rest/RestDeprovisionWorkflowAction.java | 10 +- .../DeprovisionWorkflowTransportAction.java | 77 ++++++--- .../workflow/DeleteIndexStep.java | 108 ++++++++++++ .../workflow/DeleteIngestPipelineStep.java | 108 ++++++++++++ .../workflow/DeleteSearchPipelineStep.java | 108 ++++++++++++ .../flowframework/workflow/WorkflowStep.java | 8 + .../workflow/WorkflowStepFactory.java | 42 ++++- .../FlowFrameworkRestTestCase.java | 18 ++ .../model/WorkflowValidatorTests.java | 65 +------ .../rest/FlowFrameworkRestApiIT.java | 44 +++-- .../RestDeprovisionWorkflowActionTests.java | 33 +++- ...provisionWorkflowTransportActionTests.java | 163 ++++++++++++++++-- .../workflow/DeleteIndexStepTests.java | 124 +++++++++++++ .../DeleteIngestPipelineStepTests.java | 124 +++++++++++++ .../DeleteSearchPipelineStepTests.java | 124 +++++++++++++ 18 files changed, 1051 insertions(+), 118 deletions(-) create mode 100644 src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java create mode 100644 src/main/java/org/opensearch/flowframework/workflow/DeleteIngestPipelineStep.java create mode 100644 src/main/java/org/opensearch/flowframework/workflow/DeleteSearchPipelineStep.java create mode 100644 src/test/java/org/opensearch/flowframework/workflow/DeleteIndexStepTests.java create mode 100644 src/test/java/org/opensearch/flowframework/workflow/DeleteIngestPipelineStepTests.java create mode 100644 src/test/java/org/opensearch/flowframework/workflow/DeleteSearchPipelineStepTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 53cbbc567..0b26cc51f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.14...2.x) ### Features +- Add allow_delete parameter to Deprovision API and add DeleteIndex, DeleteIngestPipeline, and DeleteSearchPipeline Workflow Steps ([#763](https://github.com/opensearch-project/flow-framework/pull/763)) + ### Enhancements - Register system index descriptors through SystemIndexPlugin.getSystemIndexDescriptors ([#750](https://github.com/opensearch-project/flow-framework/pull/750)) diff --git a/src/main/java/org/opensearch/flowframework/common/CommonValue.java b/src/main/java/org/opensearch/flowframework/common/CommonValue.java index 10a23357a..2d25aaa09 100644 --- a/src/main/java/org/opensearch/flowframework/common/CommonValue.java +++ b/src/main/java/org/opensearch/flowframework/common/CommonValue.java @@ -68,6 +68,8 @@ private CommonValue() {} public static final String WORKFLOW_ID = "workflow_id"; /** Field name for template validation, the flag to indicate if validation is necessary */ public static final String VALIDATION = "validation"; + /** Param name for allow deletion during deprovisioning */ + public static final String ALLOW_DELETE = "allow_delete"; /** The field name for provision workflow within a use case template*/ public static final String PROVISION_WORKFLOW = "provision"; /** The field name for workflow steps. This field represents the name of the workflow steps to be fetched. */ diff --git a/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java b/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java index e349e57e0..ad94f7b21 100644 --- a/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java +++ b/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java @@ -18,7 +18,10 @@ import org.opensearch.flowframework.workflow.CreateSearchPipelineStep; import org.opensearch.flowframework.workflow.DeleteAgentStep; import org.opensearch.flowframework.workflow.DeleteConnectorStep; +import org.opensearch.flowframework.workflow.DeleteIndexStep; +import org.opensearch.flowframework.workflow.DeleteIngestPipelineStep; import org.opensearch.flowframework.workflow.DeleteModelStep; +import org.opensearch.flowframework.workflow.DeleteSearchPipelineStep; import org.opensearch.flowframework.workflow.DeployModelStep; import org.opensearch.flowframework.workflow.NoOpStep; import org.opensearch.flowframework.workflow.RegisterAgentStep; @@ -54,11 +57,11 @@ public enum WorkflowResources { /** Workflow steps for deploying/undeploying a model and associated created resource */ DEPLOY_MODEL(DeployModelStep.NAME, WorkflowResources.MODEL_ID, UndeployModelStep.NAME), /** Workflow steps for creating an ingest-pipeline and associated created resource */ - CREATE_INGEST_PIPELINE(CreateIngestPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null), // TODO delete step + CREATE_INGEST_PIPELINE(CreateIngestPipelineStep.NAME, WorkflowResources.PIPELINE_ID, DeleteIngestPipelineStep.NAME), /** Workflow steps for creating an ingest-pipeline and associated created resource */ - CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null), // TODO delete step + CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, DeleteSearchPipelineStep.NAME), /** Workflow steps for creating an index and associated created resource */ - CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME), + CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, DeleteIndexStep.NAME), /** Workflow steps for reindex a source index to destination index and associated created resource */ REINDEX(ReindexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME), /** Workflow steps for registering/deleting an agent and the associated created resource */ diff --git a/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java index 0d0bf8b64..d75255b8d 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowAction.java @@ -25,9 +25,12 @@ import org.opensearch.rest.RestRequest; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.Map; +import static org.opensearch.flowframework.common.CommonValue.ALLOW_DELETE; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI; import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED; @@ -57,6 +60,7 @@ public String getName() { @Override protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { String workflowId = request.param(WORKFLOW_ID); + String allowDelete = request.param(ALLOW_DELETE); try { if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) { throw new FlowFrameworkException( @@ -73,7 +77,11 @@ protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request if (workflowId == null) { throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST); } - WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null); + WorkflowRequest workflowRequest = new WorkflowRequest( + workflowId, + null, + allowDelete == null ? Collections.emptyMap() : Map.of(ALLOW_DELETE, allowDelete) + ); return channel -> client.execute(DeprovisionWorkflowAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> { XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS); diff --git a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java index 98f93aa07..cf3f2361a 100644 --- a/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportAction.java @@ -19,6 +19,7 @@ import org.opensearch.common.inject.Inject; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.Strings; import org.opensearch.core.rest.RestStatus; import org.opensearch.flowframework.common.FlowFrameworkSettings; import org.opensearch.flowframework.exception.FlowFrameworkException; @@ -28,6 +29,7 @@ import org.opensearch.flowframework.model.State; import org.opensearch.flowframework.workflow.ProcessNode; import org.opensearch.flowframework.workflow.WorkflowData; +import org.opensearch.flowframework.workflow.WorkflowStep; import org.opensearch.flowframework.workflow.WorkflowStepFactory; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; @@ -40,8 +42,10 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; +import static org.opensearch.flowframework.common.CommonValue.ALLOW_DELETE; import static org.opensearch.flowframework.common.CommonValue.DEPROVISION_WORKFLOW_THREAD_POOL; import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD; import static org.opensearch.flowframework.common.CommonValue.PROVISION_END_TIME_FIELD; @@ -95,6 +99,7 @@ public DeprovisionWorkflowTransportAction( @Override protected void doExecute(Task task, WorkflowRequest request, ActionListener listener) { String workflowId = request.getWorkflowId(); + String allowDelete = request.getParams().get(ALLOW_DELETE); GetWorkflowStateRequest getStateRequest = new GetWorkflowStateRequest(workflowId, true); // Stash thread context to interact with system index @@ -103,9 +108,17 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { context.restore(); + Set deleteAllowedResources = Strings.tokenizeByCommaToSet(allowDelete); // Retrieve resources from workflow state and deprovision threadPool.executor(DEPROVISION_WORKFLOW_THREAD_POOL) - .execute(() -> executeDeprovisionSequence(workflowId, response.getWorkflowState().resourcesCreated(), listener)); + .execute( + () -> executeDeprovisionSequence( + workflowId, + response.getWorkflowState().resourcesCreated(), + deleteAllowedResources, + listener + ) + ); }, exception -> { String errorMessage = "Failed to get workflow state for workflow " + workflowId; logger.error(errorMessage, exception); @@ -121,18 +134,20 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener resourcesCreated, + Set deleteAllowedResources, ActionListener listener ) { - + List deleteNotAllowed = new ArrayList<>(); // Create a list of ProcessNodes with the corresponding deprovision workflow steps List deprovisionProcessSequence = new ArrayList<>(); for (ResourceCreated resource : resourcesCreated) { String workflowStepId = resource.workflowStepId(); String stepName = resource.workflowStepName(); - String deprovisionStep = getDeprovisionStepByWorkflowStep(stepName); - // Unimplemented steps presently return null, so skip - if (deprovisionStep == null) { + WorkflowStep deprovisionStep = workflowStepFactory.createStep(getDeprovisionStepByWorkflowStep(stepName)); + // Skip if the step requires allow_delete but the resourceId isn't included + if (deprovisionStep.allowDeleteRequired() && !deleteAllowedResources.contains(resource.resourceId())) { + deleteNotAllowed.add(resource); continue; } // New ID is old ID with (deprovision step type) prepended @@ -140,7 +155,7 @@ private void executeDeprovisionSequence( deprovisionProcessSequence.add( new ProcessNode( deprovisionStepId, - workflowStepFactory.createStep(deprovisionStep), + deprovisionStep, Collections.emptyMap(), Collections.emptyMap(), new WorkflowData(Map.of(getResourceByWorkflowStep(stepName), resource.resourceId()), workflowId, deprovisionStepId), @@ -215,17 +230,21 @@ private void executeDeprovisionSequence( List remainingResources = deprovisionProcessSequence.stream() .map(pn -> getResourceFromDeprovisionNode(pn, resourcesCreated)) .collect(Collectors.toList()); - logger.info("Resources remaining: {}", remainingResources); - updateWorkflowState(workflowId, remainingResources, listener); + logger.info("Resources remaining: {}.", remainingResources); + if (!deleteNotAllowed.isEmpty()) { + logger.info("Resources requiring allow_delete: {}.", deleteNotAllowed); + } + updateWorkflowState(workflowId, remainingResources, deleteNotAllowed, listener); } private void updateWorkflowState( String workflowId, List remainingResources, + List deleteNotAllowed, ActionListener listener ) { - if (remainingResources.isEmpty()) { - // Successful deprovision, reset state to initial + if (remainingResources.isEmpty() && deleteNotAllowed.isEmpty()) { + // Successful deprovision of all resources, reset state to initial flowFrameworkIndicesHandler.doesTemplateExist(workflowId, templateExists -> { if (Boolean.TRUE.equals(templateExists)) { flowFrameworkIndicesHandler.putInitialStateToWorkflowState( @@ -244,35 +263,49 @@ private void updateWorkflowState( listener.onResponse(new WorkflowResponse(workflowId)); }, listener); } else { - // Failed deprovision + // Remaining resources only includes ones we tried to delete + List stateIndexResources = new ArrayList<>(remainingResources); + // Add in those we skipped + stateIndexResources.addAll(deleteNotAllowed); flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( workflowId, Map.ofEntries( Map.entry(STATE_FIELD, State.COMPLETED), Map.entry(PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.DONE), Map.entry(PROVISION_END_TIME_FIELD, Instant.now().toEpochMilli()), - Map.entry(RESOURCES_CREATED_FIELD, remainingResources) + Map.entry(RESOURCES_CREATED_FIELD, stateIndexResources) ), ActionListener.wrap(updateResponse -> { logger.info("updated workflow {} state to COMPLETED", workflowId); }, exception -> { logger.error("Failed to update workflow {} state", workflowId, exception); }) ); // give user list of remaining resources + StringBuilder message = new StringBuilder(); + appendResourceInfo(message, "Failed to deprovision some resources: ", remainingResources); + appendResourceInfo(message, "These resources require the " + ALLOW_DELETE + " parameter to deprovision: ", deleteNotAllowed); listener.onFailure( - new FlowFrameworkException( - "Failed to deprovision some resources: [" - + remainingResources.stream() - .map(DeprovisionWorkflowTransportAction::getResourceNameAndId) - .filter(Objects::nonNull) - .distinct() - .collect(Collectors.joining(", ")) - + "].", - RestStatus.ACCEPTED - ) + new FlowFrameworkException(message.toString(), remainingResources.isEmpty() ? RestStatus.FORBIDDEN : RestStatus.ACCEPTED) ); } } + private static void appendResourceInfo(StringBuilder message, String prefix, List resources) { + if (!resources.isEmpty()) { + if (message.length() > 0) { + message.append(" "); + } + message.append(prefix) + .append( + resources.stream() + .map(DeprovisionWorkflowTransportAction::getResourceNameAndId) + .filter(Objects::nonNull) + .distinct() + .collect(Collectors.joining(", ", "[", "]")) + ) + .append("."); + } + } + private static ResourceCreated getResourceFromDeprovisionNode(ProcessNode deprovisionNode, List resourcesCreated) { return resourcesCreated.stream() .filter(r -> deprovisionNode.id().equals("(deprovision_" + r.workflowStepName() + ") " + r.workflowStepId())) diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java new file mode 100644 index 000000000..31783c55e --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java @@ -0,0 +1,108 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.client.Client; +import org.opensearch.core.action.ActionListener; +import org.opensearch.flowframework.exception.WorkflowStepException; +import org.opensearch.flowframework.util.ParseUtils; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME; +import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException; + +/** + * Step to delete an index + */ +public class DeleteIndexStep implements WorkflowStep { + + private static final Logger logger = LogManager.getLogger(DeleteIndexStep.class); + private final Client client; + + /** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */ + public static final String NAME = "delete_index"; + /** Required input keys */ + public static final Set REQUIRED_INPUTS = Set.of(INDEX_NAME); + /** Optional input keys */ + public static final Set OPTIONAL_INPUTS = Collections.emptySet(); + /** Provided output keys */ + public static final Set PROVIDED_OUTPUTS = Set.of(INDEX_NAME); + + /** + * Instantiate this class + * + * @param client Client to delete an index + */ + public DeleteIndexStep(Client client) { + this.client = client; + } + + @Override + public PlainActionFuture execute( + String currentNodeId, + WorkflowData currentNodeInputs, + Map outputs, + Map previousNodeInputs, + Map params + ) { + PlainActionFuture deleteIndexFuture = PlainActionFuture.newFuture(); + + try { + Map inputs = ParseUtils.getInputsFromPreviousSteps( + REQUIRED_INPUTS, + OPTIONAL_INPUTS, + currentNodeInputs, + outputs, + previousNodeInputs, + params + ); + + String indexName = (String) inputs.get(INDEX_NAME); + + DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName); + + client.admin().indices().delete(deleteIndexRequest, ActionListener.wrap(acknowledgedResponse -> { + logger.info("Deleted index: {}", indexName); + deleteIndexFuture.onResponse( + new WorkflowData( + Map.ofEntries(Map.entry(INDEX_NAME, indexName)), + currentNodeInputs.getWorkflowId(), + currentNodeInputs.getNodeId() + ) + ); + }, ex -> { + Exception e = getSafeException(ex); + String errorMessage = (e == null ? "Failed to delete the index " + indexName : e.getMessage()); + logger.error(errorMessage, e); + deleteIndexFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); + })); + } catch (Exception e) { + deleteIndexFuture.onFailure(e); + } + return deleteIndexFuture; + } + + @Override + public String getName() { + return NAME; + } + + @Override + public boolean allowDeleteRequired() { + return true; + } +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteIngestPipelineStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteIngestPipelineStep.java new file mode 100644 index 000000000..473b1655c --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteIngestPipelineStep.java @@ -0,0 +1,108 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.ingest.DeletePipelineRequest; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.client.Client; +import org.opensearch.core.action.ActionListener; +import org.opensearch.flowframework.exception.WorkflowStepException; +import org.opensearch.flowframework.util.ParseUtils; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import static org.opensearch.flowframework.common.WorkflowResources.PIPELINE_ID; +import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException; + +/** + * Step to delete an ingest pipeline + */ +public class DeleteIngestPipelineStep implements WorkflowStep { + + private static final Logger logger = LogManager.getLogger(DeleteIngestPipelineStep.class); + private final Client client; + + /** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */ + public static final String NAME = "delete_ingest_pipeline"; + /** Required input keys */ + public static final Set REQUIRED_INPUTS = Set.of(PIPELINE_ID); + /** Optional input keys */ + public static final Set OPTIONAL_INPUTS = Collections.emptySet(); + /** Provided output keys */ + public static final Set PROVIDED_OUTPUTS = Set.of(PIPELINE_ID); + + /** + * Instantiate this class + * + * @param client Client to delete an Ingest Pipeline + */ + public DeleteIngestPipelineStep(Client client) { + this.client = client; + } + + @Override + public PlainActionFuture execute( + String currentNodeId, + WorkflowData currentNodeInputs, + Map outputs, + Map previousNodeInputs, + Map params + ) { + PlainActionFuture deletePipelineFuture = PlainActionFuture.newFuture(); + + try { + Map inputs = ParseUtils.getInputsFromPreviousSteps( + REQUIRED_INPUTS, + OPTIONAL_INPUTS, + currentNodeInputs, + outputs, + previousNodeInputs, + params + ); + + String pipelineId = (String) inputs.get(PIPELINE_ID); + + DeletePipelineRequest deletePipelineRequest = new DeletePipelineRequest(pipelineId); + + client.admin().cluster().deletePipeline(deletePipelineRequest, ActionListener.wrap(acknowledgedResponse -> { + logger.info("Deleted IngestPipeline: {}", pipelineId); + deletePipelineFuture.onResponse( + new WorkflowData( + Map.ofEntries(Map.entry(PIPELINE_ID, pipelineId)), + currentNodeInputs.getWorkflowId(), + currentNodeInputs.getNodeId() + ) + ); + }, ex -> { + Exception e = getSafeException(ex); + String errorMessage = (e == null ? "Failed to delete the ingest pipeline " + pipelineId : e.getMessage()); + logger.error(errorMessage, e); + deletePipelineFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); + })); + } catch (Exception e) { + deletePipelineFuture.onFailure(e); + } + return deletePipelineFuture; + } + + @Override + public String getName() { + return NAME; + } + + @Override + public boolean allowDeleteRequired() { + return true; + } +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteSearchPipelineStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteSearchPipelineStep.java new file mode 100644 index 000000000..a391ef20d --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteSearchPipelineStep.java @@ -0,0 +1,108 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.search.DeleteSearchPipelineRequest; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.client.Client; +import org.opensearch.core.action.ActionListener; +import org.opensearch.flowframework.exception.WorkflowStepException; +import org.opensearch.flowframework.util.ParseUtils; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import static org.opensearch.flowframework.common.WorkflowResources.PIPELINE_ID; +import static org.opensearch.flowframework.exception.WorkflowStepException.getSafeException; + +/** + * Step to delete a search pipeline + */ +public class DeleteSearchPipelineStep implements WorkflowStep { + + private static final Logger logger = LogManager.getLogger(DeleteSearchPipelineStep.class); + private final Client client; + + /** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */ + public static final String NAME = "delete_search_pipeline"; + /** Required input keys */ + public static final Set REQUIRED_INPUTS = Set.of(PIPELINE_ID); + /** Optional input keys */ + public static final Set OPTIONAL_INPUTS = Collections.emptySet(); + /** Provided output keys */ + public static final Set PROVIDED_OUTPUTS = Set.of(PIPELINE_ID); + + /** + * Instantiate this class + * + * @param client Client to delete a Search Pipeline + */ + public DeleteSearchPipelineStep(Client client) { + this.client = client; + } + + @Override + public PlainActionFuture execute( + String currentNodeId, + WorkflowData currentNodeInputs, + Map outputs, + Map previousNodeInputs, + Map params + ) { + PlainActionFuture deleteSearchPipelineFuture = PlainActionFuture.newFuture(); + + try { + Map inputs = ParseUtils.getInputsFromPreviousSteps( + REQUIRED_INPUTS, + OPTIONAL_INPUTS, + currentNodeInputs, + outputs, + previousNodeInputs, + params + ); + + String pipelineId = (String) inputs.get(PIPELINE_ID); + + DeleteSearchPipelineRequest deleteSearchPipelineRequest = new DeleteSearchPipelineRequest(pipelineId); + + client.admin().cluster().deleteSearchPipeline(deleteSearchPipelineRequest, ActionListener.wrap(acknowledgedResponse -> { + logger.info("Deleted SearchPipeline: {}", pipelineId); + deleteSearchPipelineFuture.onResponse( + new WorkflowData( + Map.ofEntries(Map.entry(PIPELINE_ID, pipelineId)), + currentNodeInputs.getWorkflowId(), + currentNodeInputs.getNodeId() + ) + ); + }, ex -> { + Exception e = getSafeException(ex); + String errorMessage = (e == null ? "Failed to delete the search pipeline " + pipelineId : e.getMessage()); + logger.error(errorMessage, e); + deleteSearchPipelineFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); + })); + } catch (Exception e) { + deleteSearchPipelineFuture.onFailure(e); + } + return deleteSearchPipelineFuture; + } + + @Override + public String getName() { + return NAME; + } + + @Override + public boolean allowDeleteRequired() { + return true; + } +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java index ebc8be094..0456e5bbf 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java @@ -39,4 +39,12 @@ PlainActionFuture execute( * @return the name of this workflow step. */ String getName(); + + /** + * For steps which delete data, override this method to require the resource ID to be specified on the rest path to deprovision it + * @return true if the resource ID must be specified for deprovisioning + */ + default boolean allowDeleteRequired() { + return false; + } } diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java index 7ab5c1061..f91367cf0 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java @@ -22,6 +22,7 @@ import org.opensearch.ml.client.MachineLearningNodeClient; import org.opensearch.threadpool.ThreadPool; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -86,6 +87,7 @@ public WorkflowStepFactory( ) { stepMap.put(NoOpStep.NAME, NoOpStep::new); stepMap.put(CreateIndexStep.NAME, () -> new CreateIndexStep(client, flowFrameworkIndicesHandler)); + stepMap.put(DeleteIndexStep.NAME, () -> new DeleteIndexStep(client)); stepMap.put(ReindexStep.NAME, () -> new ReindexStep(client, flowFrameworkIndicesHandler)); stepMap.put( RegisterLocalCustomModelStep.NAME, @@ -113,21 +115,31 @@ public WorkflowStepFactory( stepMap.put(RegisterAgentStep.NAME, () -> new RegisterAgentStep(mlClient, flowFrameworkIndicesHandler)); stepMap.put(DeleteAgentStep.NAME, () -> new DeleteAgentStep(mlClient)); stepMap.put(CreateIngestPipelineStep.NAME, () -> new CreateIngestPipelineStep(client, flowFrameworkIndicesHandler)); + stepMap.put(DeleteIngestPipelineStep.NAME, () -> new DeleteIngestPipelineStep(client)); stepMap.put(CreateSearchPipelineStep.NAME, () -> new CreateSearchPipelineStep(client, flowFrameworkIndicesHandler)); + stepMap.put(DeleteSearchPipelineStep.NAME, () -> new DeleteSearchPipelineStep(client)); } /** * Enum encapsulating the different step names, their inputs, outputs, required plugin and timeout of the step */ - public enum WorkflowSteps { /** Noop Step */ - NOOP("noop", Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), null), + NOOP(NoOpStep.NAME, Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), null), /** Create Index Step */ CREATE_INDEX(CreateIndexStep.NAME, List.of(INDEX_NAME, CONFIGURATIONS), List.of(INDEX_NAME), Collections.emptyList(), null), + /** Delete Index Step */ + DELETE_INDEX( + DeleteIndexStep.NAME, + DeleteIndexStep.REQUIRED_INPUTS, // TODO: Copy this pattern to other steps, see + DeleteIndexStep.PROVIDED_OUTPUTS, // https://github.com/opensearch-project/flow-framework/issues/535 + Collections.emptyList(), + null + ), + /** Create ReIndex Step */ REINDEX(ReindexStep.NAME, List.of(SOURCE_INDEX, DESTINATION_INDEX), List.of(ReindexStep.NAME), Collections.emptyList(), null), @@ -225,6 +237,15 @@ public enum WorkflowSteps { null ), + /** Delete Ingest Pipeline Step */ + DELETE_INGEST_PIPELINE( + DeleteIngestPipelineStep.NAME, + DeleteIngestPipelineStep.REQUIRED_INPUTS, + DeleteIngestPipelineStep.PROVIDED_OUTPUTS, + Collections.emptyList(), + null + ), + /** Create Search Pipeline Step */ CREATE_SEARCH_PIPELINE( CreateSearchPipelineStep.NAME, @@ -232,6 +253,15 @@ public enum WorkflowSteps { List.of(PIPELINE_ID), Collections.emptyList(), null + ), + + /** Delete Search Pipeline Step */ + DELETE_SEARCH_PIPELINE( + DeleteSearchPipelineStep.NAME, + DeleteSearchPipelineStep.REQUIRED_INPUTS, + DeleteSearchPipelineStep.PROVIDED_OUTPUTS, + Collections.emptyList(), + null ); private final String workflowStepName; @@ -240,7 +270,13 @@ public enum WorkflowSteps { private final List requiredPlugins; private final TimeValue timeout; - WorkflowSteps(String workflowStepName, List inputs, List outputs, List requiredPlugins, TimeValue timeout) { + WorkflowSteps( + String workflowStepName, + Collection inputs, + Collection outputs, + List requiredPlugins, + TimeValue timeout + ) { this.workflowStepName = workflowStepName; this.inputs = List.copyOf(inputs); this.outputs = List.copyOf(outputs); diff --git a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java index 922c26b0f..784749fc0 100644 --- a/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java +++ b/src/test/java/org/opensearch/flowframework/FlowFrameworkRestTestCase.java @@ -470,6 +470,24 @@ protected Response deprovisionWorkflow(RestClient client, String workflowId) thr ); } + /** + * Helper method to invoke the Deprovision Workflow Rest Action + * @param client the rest client + * @param workflowId the workflow ID to deprovision + * @return a rest response + * @throws Exception if the request fails + */ + protected Response deprovisionWorkflowWithAllowDelete(RestClient client, String workflowId, String allowedResource) throws Exception { + return TestHelpers.makeRequest( + client, + "POST", + String.format(Locale.ROOT, "%s/%s/%s%s", WORKFLOW_URI, workflowId, "_deprovision?allow_delete=", allowedResource), + Collections.emptyMap(), + "", + null + ); + } + /** * Helper method to invoke the Delete Workflow Rest Action * @param client the rest client diff --git a/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java b/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java index 19cb3d718..a9df22a82 100644 --- a/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java +++ b/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java @@ -46,63 +46,7 @@ public void testParseWorkflowValidator() throws IOException { WorkflowValidator validator = new WorkflowValidator(workflowStepValidators); - assertEquals(18, validator.getWorkflowStepValidators().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("create_connector")); - assertEquals(7, validator.getWorkflowStepValidators().get("create_connector").getInputs().size()); - assertEquals(1, validator.getWorkflowStepValidators().get("create_connector").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("delete_model")); - assertEquals(1, validator.getWorkflowStepValidators().get("delete_model").getInputs().size()); - assertEquals(1, validator.getWorkflowStepValidators().get("delete_model").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("deploy_model")); - assertEquals(1, validator.getWorkflowStepValidators().get("deploy_model").getInputs().size()); - assertEquals(1, validator.getWorkflowStepValidators().get("deploy_model").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("register_remote_model")); - assertEquals(2, validator.getWorkflowStepValidators().get("register_remote_model").getInputs().size()); - assertEquals(2, validator.getWorkflowStepValidators().get("register_remote_model").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("register_model_group")); - assertEquals(1, validator.getWorkflowStepValidators().get("register_model_group").getInputs().size()); - assertEquals(2, validator.getWorkflowStepValidators().get("register_model_group").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("register_local_custom_model")); - assertEquals(9, validator.getWorkflowStepValidators().get("register_local_custom_model").getInputs().size()); - assertEquals(2, validator.getWorkflowStepValidators().get("register_local_custom_model").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("register_local_sparse_encoding_model")); - assertEquals(3, validator.getWorkflowStepValidators().get("register_local_sparse_encoding_model").getInputs().size()); - assertEquals(5, validator.getWorkflowStepValidators().get("register_local_sparse_encoding_model").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("register_local_pretrained_model")); - assertEquals(3, validator.getWorkflowStepValidators().get("register_local_pretrained_model").getInputs().size()); - assertEquals(2, validator.getWorkflowStepValidators().get("register_local_pretrained_model").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("undeploy_model")); - assertEquals(1, validator.getWorkflowStepValidators().get("undeploy_model").getInputs().size()); - assertEquals(1, validator.getWorkflowStepValidators().get("undeploy_model").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("delete_connector")); - assertEquals(1, validator.getWorkflowStepValidators().get("delete_connector").getInputs().size()); - assertEquals(1, validator.getWorkflowStepValidators().get("delete_connector").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("register_agent")); - assertEquals(2, validator.getWorkflowStepValidators().get("register_agent").getInputs().size()); - assertEquals(1, validator.getWorkflowStepValidators().get("register_agent").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("delete_agent")); - assertEquals(1, validator.getWorkflowStepValidators().get("delete_agent").getInputs().size()); - assertEquals(1, validator.getWorkflowStepValidators().get("delete_agent").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("create_tool")); - assertEquals(1, validator.getWorkflowStepValidators().get("create_tool").getInputs().size()); - assertEquals(1, validator.getWorkflowStepValidators().get("create_tool").getOutputs().size()); - - assertTrue(validator.getWorkflowStepValidators().keySet().contains("noop")); - assertEquals(0, validator.getWorkflowStepValidators().get("noop").getInputs().size()); - assertEquals(0, validator.getWorkflowStepValidators().get("noop").getOutputs().size()); + assertEquals(21, validator.getWorkflowStepValidators().size()); } public void testWorkflowStepFactoryHasValidators() throws IOException { @@ -130,6 +74,11 @@ public void testWorkflowStepFactoryHasValidators() throws IOException { // Check if each registered step has a corresponding validator definition assertTrue(registeredWorkflowStepTypes.containsAll(registeredWorkflowValidatorTypes)); assertTrue(registeredWorkflowValidatorTypes.containsAll(registeredWorkflowStepTypes)); - } + // Check JSON + String json = workflowValidator.toJson(); + for (String step : registeredWorkflowStepTypes) { + assertTrue(json.contains("\"" + step + "\"")); + } + } } diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index b12137070..f6d54419d 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -9,8 +9,6 @@ package org.opensearch.flowframework.rest; import org.apache.hc.core5.http.io.entity.EntityUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.opensearch.action.ingest.GetPipelineResponse; import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Response; @@ -50,8 +48,6 @@ import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID; public class FlowFrameworkRestApiIT extends FlowFrameworkRestTestCase { - private static final Logger logger = LogManager.getLogger(FlowFrameworkRestApiIT.class); - private static AtomicBoolean waitToStart = new AtomicBoolean(true); @Before @@ -409,7 +405,7 @@ public void testCreateAndProvisionIngestAndSearchPipeline() throws Exception { "create_ingest_pipeline" ); - List workflowStepNames = resourcesCreated.stream() + List workflowStepNames = resourcesCreated.stream() .peek(resourceCreated -> assertNotNull(resourceCreated.resourceId())) .map(ResourceCreated::workflowStepName) .collect(Collectors.toList()); @@ -457,7 +453,7 @@ public void testDefaultCohereUseCase() throws Exception { List expectedStepNames = List.of("create_connector", "register_remote_model", "deploy_model"); - List workflowStepNames = resourcesCreated.stream() + List workflowStepNames = resourcesCreated.stream() .peek(resourceCreated -> assertNotNull(resourceCreated.resourceId())) .map(ResourceCreated::workflowStepName) .collect(Collectors.toList()); @@ -548,6 +544,7 @@ public void testSemanticSearchWithLocalModelEndToEnd() throws Exception { // This template should create 4 resources, registered model_id, deployed model_id, ingest pipeline, and index name assertEquals(4, resourcesCreated.size()); String modelId = resourcesCreated.get(1).resourceId(); + String pipelineId = resourcesCreated.get(2).resourceId(); String indexName = resourcesCreated.get(3).resourceId(); // Short wait before ingesting data @@ -560,32 +557,45 @@ public void testSemanticSearchWithLocalModelEndToEnd() throws Exception { SearchResponse neuralSearchResponse = neuralSearchRequest(indexName, modelId); assertNotNull(neuralSearchResponse); Thread.sleep(500); - deleteIndex(indexName); - // Hit Deprovision API - // By design, this may not completely deprovision the first time if it takes >2s to process removals - Response deprovisionResponse = deprovisionWorkflow(client(), workflowId); + // Hit Deprovision API using allow_delete but only for the pipeline + Response deprovisionResponse = null; try { + // By design, this may not completely deprovision the first time if it takes >2s to process removals + deprovisionResponse = deprovisionWorkflowWithAllowDelete(client(), workflowId, pipelineId); assertBusy( () -> { getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); }, 30, TimeUnit.SECONDS ); + } catch (ResponseException re) { + // 403 return if completed with only index remaining to delete + assertEquals(RestStatus.FORBIDDEN, TestHelpers.restStatus(re.getResponse())); } catch (ComparisonFailure e) { // 202 return if still processing + assertNotNull(deprovisionResponse); assertEquals(RestStatus.ACCEPTED, TestHelpers.restStatus(deprovisionResponse)); } - if (TestHelpers.restStatus(deprovisionResponse) == RestStatus.ACCEPTED) { + if (deprovisionResponse != null && TestHelpers.restStatus(deprovisionResponse) == RestStatus.ACCEPTED) { // Short wait before we try again Thread.sleep(10000); - deprovisionResponse = deprovisionWorkflow(client(), workflowId); - assertBusy( - () -> { getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); }, - 30, - TimeUnit.SECONDS - ); + // Expected failure since we haven't provided allow_delete param + try { + deprovisionResponse = deprovisionWorkflowWithAllowDelete(client(), workflowId, pipelineId); + } catch (ResponseException re) { + // Expected 403 return with only index remaining to delete + assertEquals(RestStatus.FORBIDDEN, TestHelpers.restStatus(re.getResponse())); + } } + // Now try again with allow_delete for the index + deprovisionResponse = deprovisionWorkflowWithAllowDelete(client(), workflowId, pipelineId + "," + indexName); + assertBusy( + () -> { getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); }, + 30, + TimeUnit.SECONDS + ); assertEquals(RestStatus.OK, TestHelpers.restStatus(deprovisionResponse)); + // Hit Delete API Response deleteResponse = deleteWorkflow(client(), workflowId); assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse)); diff --git a/src/test/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowActionTests.java b/src/test/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowActionTests.java index e9a2c5a47..0d24b033a 100644 --- a/src/test/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowActionTests.java +++ b/src/test/java/org/opensearch/flowframework/rest/RestDeprovisionWorkflowActionTests.java @@ -9,8 +9,12 @@ package org.opensearch.flowframework.rest; import org.opensearch.client.node.NodeClient; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; import org.opensearch.flowframework.common.FlowFrameworkSettings; +import org.opensearch.flowframework.transport.DeprovisionWorkflowAction; +import org.opensearch.flowframework.transport.WorkflowRequest; +import org.opensearch.flowframework.transport.WorkflowResponse; import org.opensearch.rest.RestHandler.Route; import org.opensearch.rest.RestRequest; import org.opensearch.test.OpenSearchTestCase; @@ -19,9 +23,15 @@ import java.util.List; import java.util.Locale; +import java.util.Map; +import static org.opensearch.flowframework.common.CommonValue.ALLOW_DELETE; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; public class RestDeprovisionWorkflowActionTests extends OpenSearchTestCase { @@ -37,7 +47,7 @@ public void setUp() throws Exception { flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkSettings.class); when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(true); - this.deprovisionWorkflowRestAction = new RestDeprovisionWorkflowAction(flowFrameworkFeatureEnabledSetting); + this.deprovisionWorkflowRestAction = spy(new RestDeprovisionWorkflowAction(flowFrameworkFeatureEnabledSetting)); this.deprovisionWorkflowPath = String.format(Locale.ROOT, "%s/{%s}/%s", WORKFLOW_URI, "workflow_id", "_deprovision"); this.nodeClient = mock(NodeClient.class); } @@ -55,7 +65,6 @@ public void testRestDeprovisiionWorkflowActionRoutes() { } public void testNullWorkflowId() throws Exception { - // Request with no params RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) .withPath(this.deprovisionWorkflowPath) @@ -69,6 +78,25 @@ public void testNullWorkflowId() throws Exception { assertTrue(channel.capturedResponse().content().utf8ToString().contains("workflow_id cannot be null")); } + public void testAllowDeleteParam() throws Exception { + String allowDeleteParam = "foo,bar"; + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) + .withPath(this.deprovisionWorkflowPath) + .withParams(Map.ofEntries(Map.entry(WORKFLOW_ID, "workflow_id"), Map.entry(ALLOW_DELETE, allowDeleteParam))) + .build(); + FakeRestChannel channel = new FakeRestChannel(request, true, 1); + doAnswer(invocation -> { + WorkflowRequest workflowRequest = invocation.getArgument(1); + ActionListener responseListener = invocation.getArgument(2); + responseListener.onResponse(new WorkflowResponse(workflowRequest.getParams().get(ALLOW_DELETE))); + return null; + }).when(nodeClient).execute(any(DeprovisionWorkflowAction.class), any(WorkflowRequest.class), any()); + + deprovisionWorkflowRestAction.handleRequest(request, channel, nodeClient); + assertEquals(RestStatus.OK, channel.capturedResponse().status()); + assertTrue(channel.capturedResponse().content().utf8ToString().contains(allowDeleteParam)); + } + public void testFeatureFlagNotEnabled() throws Exception { when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(false); RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST) @@ -76,6 +104,7 @@ public void testFeatureFlagNotEnabled() throws Exception { .build(); FakeRestChannel channel = new FakeRestChannel(request, false, 1); deprovisionWorkflowRestAction.handleRequest(request, channel, nodeClient); + assertEquals(1, channel.errors().get()); assertEquals(RestStatus.FORBIDDEN, channel.capturedResponse().status()); assertTrue(channel.capturedResponse().content().utf8ToString().contains("This API is disabled.")); } diff --git a/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java index 9265c4c63..51561c28e 100644 --- a/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/DeprovisionWorkflowTransportActionTests.java @@ -17,11 +17,16 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; import org.opensearch.flowframework.common.FlowFrameworkSettings; +import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.model.ResourceCreated; import org.opensearch.flowframework.model.WorkflowState; import org.opensearch.flowframework.workflow.DeleteConnectorStep; +import org.opensearch.flowframework.workflow.DeleteIndexStep; +import org.opensearch.flowframework.workflow.DeleteIngestPipelineStep; +import org.opensearch.flowframework.workflow.UndeployModelStep; import org.opensearch.flowframework.workflow.WorkflowData; import org.opensearch.flowframework.workflow.WorkflowStepFactory; import org.opensearch.tasks.Task; @@ -33,16 +38,20 @@ import org.junit.AfterClass; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.mockito.ArgumentCaptor; +import static org.opensearch.flowframework.common.CommonValue.ALLOW_DELETE; import static org.opensearch.flowframework.common.CommonValue.DEPROVISION_WORKFLOW_THREAD_POOL; import static org.opensearch.flowframework.common.CommonValue.FLOW_FRAMEWORK_THREAD_POOL_PREFIX; import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID; +import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME; import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID; +import static org.opensearch.flowframework.common.WorkflowResources.PIPELINE_ID; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; @@ -68,6 +77,9 @@ public class DeprovisionWorkflowTransportActionTests extends OpenSearchTestCase private Client client; private WorkflowStepFactory workflowStepFactory; private DeleteConnectorStep deleteConnectorStep; + private UndeployModelStep undeployModelStep; + private DeleteIndexStep deleteIndexStep; + private DeleteIngestPipelineStep deleteIngestPipelineStep; private DeprovisionWorkflowTransportAction deprovisionWorkflowTransportAction; private FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; private FlowFrameworkSettings flowFrameworkSettings; @@ -83,7 +95,15 @@ public void setUp() throws Exception { this.workflowStepFactory = mock(WorkflowStepFactory.class); this.deleteConnectorStep = mock(DeleteConnectorStep.class); - when(this.workflowStepFactory.createStep("delete_connector")).thenReturn(deleteConnectorStep); + when(this.workflowStepFactory.createStep(DeleteConnectorStep.NAME)).thenReturn(deleteConnectorStep); + this.undeployModelStep = mock(UndeployModelStep.class); + when(this.workflowStepFactory.createStep(UndeployModelStep.NAME)).thenReturn(undeployModelStep); + this.deleteIndexStep = mock(DeleteIndexStep.class); + when(this.deleteIndexStep.allowDeleteRequired()).thenReturn(true); + when(this.workflowStepFactory.createStep(DeleteIndexStep.NAME)).thenReturn(deleteIndexStep); + this.deleteIngestPipelineStep = mock(DeleteIngestPipelineStep.class); + when(this.deleteIngestPipelineStep.allowDeleteRequired()).thenReturn(true); + when(this.workflowStepFactory.createStep(DeleteIngestPipelineStep.NAME)).thenReturn(deleteIngestPipelineStep); this.flowFrameworkIndicesHandler = mock(FlowFrameworkIndicesHandler.class); flowFrameworkSettings = mock(FlowFrameworkSettings.class); @@ -108,9 +128,8 @@ public static void cleanup() { public void testDeprovisionWorkflow() throws Exception { String workflowId = "1"; - CountDownLatch latch = new CountDownLatch(1); @SuppressWarnings("unchecked") - ActionListener listener = spy(new LatchedActionListener(mock(ActionListener.class), latch)); + ActionListener listener = mock(ActionListener.class); WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null); doAnswer(invocation -> { @@ -133,10 +152,12 @@ public void testDeprovisionWorkflow() throws Exception { future.onResponse(WorkflowData.EMPTY); when(this.deleteConnectorStep.execute(anyString(), any(WorkflowData.class), anyMap(), anyMap(), anyMap())).thenReturn(future); - deprovisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener); - ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowResponse.class); - + CountDownLatch latch = new CountDownLatch(1); + LatchedActionListener latchedActionListener = new LatchedActionListener<>(listener, latch); + deprovisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, latchedActionListener); latch.await(5, TimeUnit.SECONDS); + + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowResponse.class); verify(listener, times(1)).onResponse(responseCaptor.capture()); assertEquals(workflowId, responseCaptor.getValue().getWorkflowId()); } @@ -144,9 +165,8 @@ public void testDeprovisionWorkflow() throws Exception { public void testFailToDeprovision() throws Exception { String workflowId = "1"; - CountDownLatch latch = new CountDownLatch(1); @SuppressWarnings("unchecked") - ActionListener listener = spy(new LatchedActionListener(mock(ActionListener.class), latch)); + ActionListener listener = mock(ActionListener.class); WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null); doAnswer(invocation -> { @@ -161,13 +181,132 @@ public void testFailToDeprovision() throws Exception { PlainActionFuture future = PlainActionFuture.newFuture(); future.onFailure(new RuntimeException("rte")); - when(this.deleteConnectorStep.execute(anyString(), any(WorkflowData.class), anyMap(), anyMap(), anyMap())).thenReturn(future); - - deprovisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener); - ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + when(this.undeployModelStep.execute(anyString(), any(WorkflowData.class), anyMap(), anyMap(), anyMap())).thenReturn(future); + CountDownLatch latch = new CountDownLatch(1); + LatchedActionListener latchedActionListener = new LatchedActionListener<>(listener, latch); + deprovisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, latchedActionListener); latch.await(5, TimeUnit.SECONDS); + + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(FlowFrameworkException.class); + verify(listener, times(1)).onFailure(exceptionCaptor.capture()); + assertEquals(RestStatus.ACCEPTED, exceptionCaptor.getValue().getRestStatus()); assertEquals("Failed to deprovision some resources: [model_id modelId].", exceptionCaptor.getValue().getMessage()); } + + public void testAllowDeleteRequired() throws Exception { + String workflowId = "1"; + + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(2); + + WorkflowState state = WorkflowState.builder() + .resourcesCreated(List.of(new ResourceCreated("create_index", "step_1", INDEX_NAME, "test-index"))) + .build(); + responseListener.onResponse(new GetWorkflowStateResponse(state, true)); + return null; + }).when(client).execute(any(GetWorkflowStateAction.class), any(GetWorkflowStateRequest.class), any()); + + doAnswer(invocation -> { + Consumer booleanConsumer = invocation.getArgument(1); + booleanConsumer.accept(Boolean.FALSE); + return null; + }).when(flowFrameworkIndicesHandler).doesTemplateExist(anyString(), any(), any()); + + // Test failure with no param + WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null); + + CountDownLatch latch = new CountDownLatch(1); + LatchedActionListener latchedActionListener = new LatchedActionListener<>(listener, latch); + deprovisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, latchedActionListener); + latch.await(5, TimeUnit.SECONDS); + + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(FlowFrameworkException.class); + + verify(listener, times(1)).onFailure(exceptionCaptor.capture()); + assertEquals(RestStatus.FORBIDDEN, exceptionCaptor.getValue().getRestStatus()); + assertEquals( + "These resources require the allow_delete parameter to deprovision: [index_name test-index].", + exceptionCaptor.getValue().getMessage() + ); + + // Test (2nd) failure with wrong allow_delete param + workflowRequest = new WorkflowRequest(workflowId, null, Map.of(ALLOW_DELETE, "wrong-index")); + + latch = new CountDownLatch(1); + latchedActionListener = new LatchedActionListener<>(listener, latch); + deprovisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, latchedActionListener); + latch.await(5, TimeUnit.SECONDS); + + exceptionCaptor = ArgumentCaptor.forClass(FlowFrameworkException.class); + verify(listener, times(2)).onFailure(exceptionCaptor.capture()); + assertEquals(RestStatus.FORBIDDEN, exceptionCaptor.getValue().getRestStatus()); + assertEquals( + "These resources require the allow_delete parameter to deprovision: [index_name test-index].", + exceptionCaptor.getValue().getMessage() + ); + + // Test success with correct allow_delete param + workflowRequest = new WorkflowRequest(workflowId, null, Map.of(ALLOW_DELETE, "wrong-index,test-index,other-index")); + + PlainActionFuture future = PlainActionFuture.newFuture(); + future.onResponse(WorkflowData.EMPTY); + when(this.deleteIndexStep.execute(anyString(), any(WorkflowData.class), anyMap(), anyMap(), anyMap())).thenReturn(future); + + latch = new CountDownLatch(1); + latchedActionListener = new LatchedActionListener<>(listener, latch); + deprovisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, latchedActionListener); + latch.await(5, TimeUnit.SECONDS); + + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowResponse.class); + verify(listener, times(1)).onResponse(responseCaptor.capture()); + assertEquals(workflowId, responseCaptor.getValue().getWorkflowId()); + } + + public void testFailToDeprovisionAndAllowDeleteRequired() throws Exception { + String workflowId = "1"; + + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null, Map.of(ALLOW_DELETE, "wrong-index,test-pipeline")); + + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(2); + + WorkflowState state = WorkflowState.builder() + .resourcesCreated( + List.of( + new ResourceCreated("deploy_model", "step_1", MODEL_ID, "modelId"), + new ResourceCreated("create_index", "step_2", INDEX_NAME, "test-index"), + new ResourceCreated("create_ingest_pipeline", "step_3", PIPELINE_ID, "test-pipeline") + ) + ) + .build(); + responseListener.onResponse(new GetWorkflowStateResponse(state, true)); + return null; + }).when(client).execute(any(GetWorkflowStateAction.class), any(GetWorkflowStateRequest.class), any()); + + PlainActionFuture future = PlainActionFuture.newFuture(); + future.onFailure(new RuntimeException("rte")); + when(this.undeployModelStep.execute(anyString(), any(WorkflowData.class), anyMap(), anyMap(), anyMap())).thenReturn(future); + + CountDownLatch latch = new CountDownLatch(1); + LatchedActionListener latchedActionListener = new LatchedActionListener<>(listener, latch); + deprovisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, latchedActionListener); + latch.await(5, TimeUnit.SECONDS); + + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(FlowFrameworkException.class); + + verify(listener, times(1)).onFailure(exceptionCaptor.capture()); + assertEquals(RestStatus.ACCEPTED, exceptionCaptor.getValue().getRestStatus()); + assertEquals( + "Failed to deprovision some resources: [pipeline_id test-pipeline, model_id modelId]." + + " These resources require the allow_delete parameter to deprovision: [index_name test-index].", + exceptionCaptor.getValue().getMessage() + ); + } } diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeleteIndexStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeleteIndexStepTests.java new file mode 100644 index 000000000..9ef574427 --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/workflow/DeleteIndexStepTests.java @@ -0,0 +1,124 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.AdminClient; +import org.opensearch.client.Client; +import org.opensearch.client.IndicesAdminClient; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class DeleteIndexStepTests extends OpenSearchTestCase { + private WorkflowData inputData; + + @Mock + private Client client; + @Mock + private AdminClient adminClient; + @Mock + private IndicesAdminClient indicesAdminClient; + + @Override + public void setUp() throws Exception { + super.setUp(); + + MockitoAnnotations.openMocks(this); + when(client.admin()).thenReturn(adminClient); + when(adminClient.indices()).thenReturn(indicesAdminClient); + + inputData = new WorkflowData(Collections.emptyMap(), "test-id", "test-node-id"); + } + + public void testDeleteIndex() throws IOException, ExecutionException, InterruptedException { + + String indexName = randomAlphaOfLength(5); + DeleteIndexStep deleteIndexStep = new DeleteIndexStep(client); + + doAnswer(invocation -> { + @SuppressWarnings("deprecation") + ActionListener actionListener = invocation.getArgument(1); + actionListener.onResponse(new AcknowledgedResponse(true)); + return null; + }).when(indicesAdminClient).delete(any(DeleteIndexRequest.class), any()); + + PlainActionFuture future = deleteIndexStep.execute( + inputData.getNodeId(), + inputData, + Map.of("step_1", new WorkflowData(Map.of(INDEX_NAME, indexName), "workflowId", "nodeId")), + Map.of("step_1", INDEX_NAME), + Collections.emptyMap() + ); + verify(indicesAdminClient).delete(any(DeleteIndexRequest.class), any()); + + assertTrue(future.isDone()); + assertEquals(indexName, future.get().getContent().get(INDEX_NAME)); + } + + public void testNoIndexNameInOutput() throws IOException { + DeleteIndexStep deleteIndexStep = new DeleteIndexStep(client); + + PlainActionFuture future = deleteIndexStep.execute( + inputData.getNodeId(), + inputData, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + + assertTrue(future.isDone()); + ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); + assertTrue(ex.getCause() instanceof FlowFrameworkException); + assertEquals("Missing required inputs [index_name] in workflow [test-id] node [test-node-id]", ex.getCause().getMessage()); + } + + public void testDeleteIndexFailure() throws IOException { + DeleteIndexStep deleteIndexStep = new DeleteIndexStep(client); + + doAnswer(invocation -> { + ActionListener actionListener = invocation.getArgument(1); + actionListener.onFailure(new FlowFrameworkException("Failed", RestStatus.INTERNAL_SERVER_ERROR)); + return null; + }).when(indicesAdminClient).delete(any(DeleteIndexRequest.class), any()); + + PlainActionFuture future = deleteIndexStep.execute( + inputData.getNodeId(), + inputData, + Map.of("step_1", new WorkflowData(Map.of(INDEX_NAME, "test"), "workflowId", "nodeId")), + Map.of("step_1", INDEX_NAME), + Collections.emptyMap() + ); + + verify(indicesAdminClient).delete(any(DeleteIndexRequest.class), any()); + + assertTrue(future.isDone()); + ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); + assertTrue(ex.getCause() instanceof FlowFrameworkException); + assertEquals("Failed to delete the index test", ex.getCause().getMessage()); + } +} diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeleteIngestPipelineStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeleteIngestPipelineStepTests.java new file mode 100644 index 000000000..9b1c070bd --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/workflow/DeleteIngestPipelineStepTests.java @@ -0,0 +1,124 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.ingest.DeletePipelineRequest; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.AdminClient; +import org.opensearch.client.Client; +import org.opensearch.client.ClusterAdminClient; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.opensearch.flowframework.common.WorkflowResources.PIPELINE_ID; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class DeleteIngestPipelineStepTests extends OpenSearchTestCase { + private WorkflowData inputData; + + @Mock + private Client client; + @Mock + private AdminClient adminClient; + @Mock + private ClusterAdminClient clusterAdminClient; + + @Override + public void setUp() throws Exception { + super.setUp(); + + MockitoAnnotations.openMocks(this); + when(client.admin()).thenReturn(adminClient); + when(adminClient.cluster()).thenReturn(clusterAdminClient); + + inputData = new WorkflowData(Collections.emptyMap(), "test-id", "test-node-id"); + } + + public void testDeletePipeline() throws IOException, ExecutionException, InterruptedException { + + String pipelineId = randomAlphaOfLength(5); + DeleteIngestPipelineStep deleteIngestPipelineStep = new DeleteIngestPipelineStep(client); + + doAnswer(invocation -> { + @SuppressWarnings("deprecation") + ActionListener actionListener = invocation.getArgument(1); + actionListener.onResponse(new AcknowledgedResponse(true)); + return null; + }).when(clusterAdminClient).deletePipeline(any(DeletePipelineRequest.class), any()); + + PlainActionFuture future = deleteIngestPipelineStep.execute( + inputData.getNodeId(), + inputData, + Map.of("step_1", new WorkflowData(Map.of(PIPELINE_ID, pipelineId), "workflowId", "nodeId")), + Map.of("step_1", PIPELINE_ID), + Collections.emptyMap() + ); + verify(clusterAdminClient).deletePipeline(any(DeletePipelineRequest.class), any()); + + assertTrue(future.isDone()); + assertEquals(pipelineId, future.get().getContent().get(PIPELINE_ID)); + } + + public void testNoPipelineIdInOutput() throws IOException { + DeleteIngestPipelineStep deleteIngestPipelineStep = new DeleteIngestPipelineStep(client); + + PlainActionFuture future = deleteIngestPipelineStep.execute( + inputData.getNodeId(), + inputData, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + + assertTrue(future.isDone()); + ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); + assertTrue(ex.getCause() instanceof FlowFrameworkException); + assertEquals("Missing required inputs [pipeline_id] in workflow [test-id] node [test-node-id]", ex.getCause().getMessage()); + } + + public void testDeletePipelineFailure() throws IOException { + DeleteIngestPipelineStep deleteIngestPipelineStep = new DeleteIngestPipelineStep(client); + + doAnswer(invocation -> { + ActionListener actionListener = invocation.getArgument(1); + actionListener.onFailure(new FlowFrameworkException("Failed", RestStatus.INTERNAL_SERVER_ERROR)); + return null; + }).when(clusterAdminClient).deletePipeline(any(DeletePipelineRequest.class), any()); + + PlainActionFuture future = deleteIngestPipelineStep.execute( + inputData.getNodeId(), + inputData, + Map.of("step_1", new WorkflowData(Map.of(PIPELINE_ID, "test"), "workflowId", "nodeId")), + Map.of("step_1", PIPELINE_ID), + Collections.emptyMap() + ); + + verify(clusterAdminClient).deletePipeline(any(DeletePipelineRequest.class), any()); + + assertTrue(future.isDone()); + ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); + assertTrue(ex.getCause() instanceof FlowFrameworkException); + assertEquals("Failed to delete the ingest pipeline test", ex.getCause().getMessage()); + } +} diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeleteSearchPipelineStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeleteSearchPipelineStepTests.java new file mode 100644 index 000000000..4ebc7fd5b --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/workflow/DeleteSearchPipelineStepTests.java @@ -0,0 +1,124 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.search.DeleteSearchPipelineRequest; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.AdminClient; +import org.opensearch.client.Client; +import org.opensearch.client.ClusterAdminClient; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.opensearch.flowframework.common.WorkflowResources.PIPELINE_ID; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class DeleteSearchPipelineStepTests extends OpenSearchTestCase { + private WorkflowData inputData; + + @Mock + private Client client; + @Mock + private AdminClient adminClient; + @Mock + private ClusterAdminClient clusterAdminClient; + + @Override + public void setUp() throws Exception { + super.setUp(); + + MockitoAnnotations.openMocks(this); + when(client.admin()).thenReturn(adminClient); + when(adminClient.cluster()).thenReturn(clusterAdminClient); + + inputData = new WorkflowData(Collections.emptyMap(), "test-id", "test-node-id"); + } + + public void testDeleteSearchPipeline() throws IOException, ExecutionException, InterruptedException { + + String pipelineId = randomAlphaOfLength(5); + DeleteSearchPipelineStep deleteSearchPipelineStep = new DeleteSearchPipelineStep(client); + + doAnswer(invocation -> { + @SuppressWarnings("deprecation") + ActionListener actionListener = invocation.getArgument(1); + actionListener.onResponse(new AcknowledgedResponse(true)); + return null; + }).when(clusterAdminClient).deleteSearchPipeline(any(DeleteSearchPipelineRequest.class), any()); + + PlainActionFuture future = deleteSearchPipelineStep.execute( + inputData.getNodeId(), + inputData, + Map.of("step_1", new WorkflowData(Map.of(PIPELINE_ID, pipelineId), "workflowId", "nodeId")), + Map.of("step_1", PIPELINE_ID), + Collections.emptyMap() + ); + verify(clusterAdminClient).deleteSearchPipeline(any(DeleteSearchPipelineRequest.class), any()); + + assertTrue(future.isDone()); + assertEquals(pipelineId, future.get().getContent().get(PIPELINE_ID)); + } + + public void testNoPipelineIdInOutput() throws IOException { + DeleteSearchPipelineStep deleteSearchPipelineStep = new DeleteSearchPipelineStep(client); + + PlainActionFuture future = deleteSearchPipelineStep.execute( + inputData.getNodeId(), + inputData, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + + assertTrue(future.isDone()); + ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); + assertTrue(ex.getCause() instanceof FlowFrameworkException); + assertEquals("Missing required inputs [pipeline_id] in workflow [test-id] node [test-node-id]", ex.getCause().getMessage()); + } + + public void testDeleteSearchPipelineFailure() throws IOException { + DeleteSearchPipelineStep deleteSearchPipelineStep = new DeleteSearchPipelineStep(client); + + doAnswer(invocation -> { + ActionListener actionListener = invocation.getArgument(1); + actionListener.onFailure(new FlowFrameworkException("Failed", RestStatus.INTERNAL_SERVER_ERROR)); + return null; + }).when(clusterAdminClient).deleteSearchPipeline(any(DeleteSearchPipelineRequest.class), any()); + + PlainActionFuture future = deleteSearchPipelineStep.execute( + inputData.getNodeId(), + inputData, + Map.of("step_1", new WorkflowData(Map.of(PIPELINE_ID, "test"), "workflowId", "nodeId")), + Map.of("step_1", PIPELINE_ID), + Collections.emptyMap() + ); + + verify(clusterAdminClient).deleteSearchPipeline(any(DeleteSearchPipelineRequest.class), any()); + + assertTrue(future.isDone()); + ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); + assertTrue(ex.getCause() instanceof FlowFrameworkException); + assertEquals("Failed to delete the search pipeline test", ex.getCause().getMessage()); + } +}