From 9f19dd0a5e724e30b7493a6cc2a6dd8fd1e4cd58 Mon Sep 17 00:00:00 2001 From: Amit Galitzky Date: Tue, 12 Mar 2024 14:40:07 -0700 Subject: [PATCH] adding create search pipeline step Signed-off-by: Amit Galitzky --- CHANGELOG.md | 1 + .../common/WorkflowResources.java | 5 +- .../workflow/AbstractCreatePipelineStep.java | 161 +++++++++++++++++ .../workflow/CreateIngestPipelineStep.java | 114 +----------- .../workflow/CreateSearchPipelineStep.java | 38 ++++ .../workflow/WorkflowStepFactory.java | 10 ++ .../model/WorkflowValidatorTests.java | 2 +- .../rest/FlowFrameworkRestApiIT.java | 32 ++-- .../flowframework/util/ParseUtilsTests.java | 6 + .../CreateIngestPipelineStepTests.java | 4 +- .../CreateSearchPipelineStepTests.java | 166 ++++++++++++++++++ ...n => ingest-search-pipeline-template.json} | 25 +++ 12 files changed, 436 insertions(+), 128 deletions(-) create mode 100644 src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java create mode 100644 src/main/java/org/opensearch/flowframework/workflow/CreateSearchPipelineStep.java create mode 100644 src/test/java/org/opensearch/flowframework/workflow/CreateSearchPipelineStepTests.java rename src/test/resources/template/{ingest-pipeline-template.json => ingest-search-pipeline-template.json} (76%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 66a7e67b7..c0e3c849f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.12...2.x) ### Features - Adding create ingest pipeline step ([#558](https://github.com/opensearch-project/flow-framework/pull/558)) +- adding create search pipeline step ([#569](https://github.com/opensearch-project/flow-framework/pull/569)) ### Enhancements - Substitute REST path or body parameters in Workflow Steps ([#525](https://github.com/opensearch-project/flow-framework/pull/525)) diff --git a/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java b/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java index 50c37b2ca..b0069afe0 100644 --- a/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java +++ b/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java @@ -15,6 +15,7 @@ import org.opensearch.flowframework.workflow.CreateConnectorStep; import org.opensearch.flowframework.workflow.CreateIndexStep; import org.opensearch.flowframework.workflow.CreateIngestPipelineStep; +import org.opensearch.flowframework.workflow.CreateSearchPipelineStep; import org.opensearch.flowframework.workflow.DeleteAgentStep; import org.opensearch.flowframework.workflow.DeleteConnectorStep; import org.opensearch.flowframework.workflow.DeleteModelStep; @@ -56,7 +57,9 @@ public enum WorkflowResources { /** Workflow steps for creating an index and associated created resource */ CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, null), // TODO delete step /** Workflow steps for registering/deleting an agent and the associated created resource */ - REGISTER_AGENT(RegisterAgentStep.NAME, WorkflowResources.AGENT_ID, DeleteAgentStep.NAME); + REGISTER_AGENT(RegisterAgentStep.NAME, WorkflowResources.AGENT_ID, DeleteAgentStep.NAME), + /** Workflow steps for creating an ingest-pipeline and associated created resource */ + CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null); // TODO delete step /** Connector Id for a remote model connector */ public static final String CONNECTOR_ID = "connector_id"; diff --git a/src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java b/src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java new file mode 100644 index 000000000..14f51afa8 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java @@ -0,0 +1,161 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.ingest.PutPipelineRequest; +import org.opensearch.action.search.PutSearchPipelineRequest; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.Client; +import org.opensearch.client.ClusterAdminClient; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; +import org.opensearch.flowframework.util.ParseUtils; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Set; + +import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS; +import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID; +import static org.opensearch.flowframework.common.WorkflowResources.PIPELINE_ID; +import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep; + +/** + * Step to create either a search or ingest pipeline + */ +public abstract class AbstractCreatePipelineStep implements WorkflowStep { + private static final Logger logger = LogManager.getLogger(AbstractCreatePipelineStep.class); + + // Client to store a pipeline in the cluster state + private final ClusterAdminClient clusterAdminClient; + + private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; + + /** + * Instantiates a new AbstractCreatePipelineStep + * @param client The client to create a pipeline and store workflow data into the global context index + * @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices + */ + protected AbstractCreatePipelineStep(Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) { + this.clusterAdminClient = client.admin().cluster(); + this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler; + } + + @Override + public PlainActionFuture execute( + String currentNodeId, + WorkflowData currentNodeInputs, + Map outputs, + Map previousNodeInputs, + Map params + ) { + + PlainActionFuture createPipelineFuture = PlainActionFuture.newFuture(); + + Set requiredKeys = Set.of(PIPELINE_ID, CONFIGURATIONS); + + // currently, we are supporting an optional param of model ID into the various processors + Set optionalKeys = Set.of(MODEL_ID); + + try { + Map inputs = ParseUtils.getInputsFromPreviousSteps( + requiredKeys, + optionalKeys, + currentNodeInputs, + outputs, + previousNodeInputs, + params + ); + + String pipelineId = (String) inputs.get(PIPELINE_ID); + String configurations = (String) inputs.get(CONFIGURATIONS); + + byte[] byteArr = configurations.getBytes(StandardCharsets.UTF_8); + BytesReference configurationsBytes = new BytesArray(byteArr); + + String pipelineToBeCreated = this.getName(); + ActionListener putPipelineActionListener = new ActionListener<>() { + + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + String resourceName = getResourceByWorkflowStep(getName()); + try { + flowFrameworkIndicesHandler.updateResourceInStateIndex( + currentNodeInputs.getWorkflowId(), + currentNodeId, + getName(), + pipelineId, + ActionListener.wrap(updateResponse -> { + logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex()); + // PutPipelineRequest returns only an AcknowledgeResponse, saving pipelineId instead + // TODO: revisit this concept of pipeline_id to be consistent with what makes most sense to end user here + createPipelineFuture.onResponse( + new WorkflowData( + Map.of(resourceName, pipelineId), + currentNodeInputs.getWorkflowId(), + currentNodeInputs.getNodeId() + ) + ); + }, exception -> { + String errorMessage = "Failed to update new created " + + currentNodeId + + " resource " + + getName() + + " id " + + pipelineId; + logger.error(errorMessage, exception); + createPipelineFuture.onFailure( + new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)) + ); + }) + ); + + } catch (Exception e) { + String errorMessage = "Failed to parse and update new created resource"; + logger.error(errorMessage, e); + createPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + } + } + + @Override + public void onFailure(Exception e) { + String errorMessage = "Failed step " + pipelineToBeCreated; + logger.error(errorMessage, e); + createPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + } + + }; + + if (pipelineToBeCreated.equals(CreateSearchPipelineStep.NAME)) { + PutSearchPipelineRequest putSearchPipelineRequest = new PutSearchPipelineRequest( + pipelineId, + configurationsBytes, + XContentType.JSON + ); + clusterAdminClient.putSearchPipeline(putSearchPipelineRequest, putPipelineActionListener); + } else { + PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, configurationsBytes, XContentType.JSON); + clusterAdminClient.putPipeline(putPipelineRequest, putPipelineActionListener); + } + + } catch (FlowFrameworkException e) { + createPipelineFuture.onFailure(e); + } + return createPipelineFuture; + + } +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java index 9d840573c..c89b25d16 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStep.java @@ -10,135 +10,25 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.ExceptionsHelper; -import org.opensearch.action.ingest.PutPipelineRequest; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.client.Client; -import org.opensearch.client.ClusterAdminClient; -import org.opensearch.common.xcontent.XContentType; -import org.opensearch.core.action.ActionListener; -import org.opensearch.core.common.bytes.BytesArray; -import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; -import org.opensearch.flowframework.util.ParseUtils; - -import java.nio.charset.StandardCharsets; -import java.util.Map; -import java.util.Set; - -import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS; -import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID; -import static org.opensearch.flowframework.common.WorkflowResources.PIPELINE_ID; -import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep; /** * Step to create an ingest pipeline */ -public class CreateIngestPipelineStep implements WorkflowStep { +public class CreateIngestPipelineStep extends AbstractCreatePipelineStep { private static final Logger logger = LogManager.getLogger(CreateIngestPipelineStep.class); /** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */ public static final String NAME = "create_ingest_pipeline"; - // Client to store a pipeline in the cluster state - private final ClusterAdminClient clusterAdminClient; - - private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; - /** * Instantiates a new CreateIngestPipelineStep * @param client The client to create a pipeline and store workflow data into the global context index * @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices */ public CreateIngestPipelineStep(Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) { - this.clusterAdminClient = client.admin().cluster(); - this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler; - } - - @Override - public PlainActionFuture execute( - String currentNodeId, - WorkflowData currentNodeInputs, - Map outputs, - Map previousNodeInputs, - Map params - ) { - - PlainActionFuture createIngestPipelineFuture = PlainActionFuture.newFuture(); - - Set requiredKeys = Set.of(PIPELINE_ID, CONFIGURATIONS); - - // currently, we are supporting an optional param of model ID into the various processors - Set optionalKeys = Set.of(MODEL_ID); - - try { - Map inputs = ParseUtils.getInputsFromPreviousSteps( - requiredKeys, - optionalKeys, - currentNodeInputs, - outputs, - previousNodeInputs, - params - ); - - String pipelineId = (String) inputs.get(PIPELINE_ID); - String configurations = (String) inputs.get(CONFIGURATIONS); - - byte[] byteArr = configurations.getBytes(StandardCharsets.UTF_8); - BytesReference configurationsBytes = new BytesArray(byteArr); - - // Create PutPipelineRequest and execute - PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, configurationsBytes, XContentType.JSON); - clusterAdminClient.putPipeline(putPipelineRequest, ActionListener.wrap(acknowledgedResponse -> { - String resourceName = getResourceByWorkflowStep(getName()); - try { - flowFrameworkIndicesHandler.updateResourceInStateIndex( - currentNodeInputs.getWorkflowId(), - currentNodeId, - getName(), - pipelineId, - ActionListener.wrap(updateResponse -> { - logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex()); - // PutPipelineRequest returns only an AcknowledgeResponse, saving pipelineId instead - // TODO: revisit this concept of pipeline_id to be consistent with what makes most sense to end user here - createIngestPipelineFuture.onResponse( - new WorkflowData( - Map.of(resourceName, pipelineId), - currentNodeInputs.getWorkflowId(), - currentNodeInputs.getNodeId() - ) - ); - }, exception -> { - String errorMessage = "Failed to update new created " - + currentNodeId - + " resource " - + getName() - + " id " - + pipelineId; - logger.error(errorMessage, exception); - createIngestPipelineFuture.onFailure( - new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)) - ); - }) - ); - - } catch (Exception e) { - String errorMessage = "Failed to parse and update new created resource"; - logger.error(errorMessage, e); - createIngestPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); - } - }, e -> { - String errorMessage = "Failed to create ingest pipeline"; - logger.error(errorMessage, e); - createIngestPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); - })); - - } catch (FlowFrameworkException e) { - createIngestPipelineFuture.onFailure(e); - } - - return createIngestPipelineFuture; + super(client, flowFrameworkIndicesHandler); } @Override diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateSearchPipelineStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateSearchPipelineStep.java new file mode 100644 index 000000000..1dc8fc745 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateSearchPipelineStep.java @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.client.Client; +import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; + +/** + * Step to create a search pipeline + */ +public class CreateSearchPipelineStep extends AbstractCreatePipelineStep { + private static final Logger logger = LogManager.getLogger(CreateSearchPipelineStep.class); + + /** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */ + public static final String NAME = "create_search_pipeline"; + + /** + * Instantiates a new CreateSearchPipelineStep + * @param client The client to create a pipeline and store workflow data into the global context index + * @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices + */ + public CreateSearchPipelineStep(Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) { + super(client, flowFrameworkIndicesHandler); + } + + @Override + public String getName() { + return NAME; + } +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java index b8b736890..11bc84d23 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java @@ -108,6 +108,7 @@ public WorkflowStepFactory( stepMap.put(RegisterAgentStep.NAME, () -> new RegisterAgentStep(mlClient, flowFrameworkIndicesHandler)); stepMap.put(DeleteAgentStep.NAME, () -> new DeleteAgentStep(mlClient)); stepMap.put(CreateIngestPipelineStep.NAME, () -> new CreateIngestPipelineStep(client, flowFrameworkIndicesHandler)); + stepMap.put(CreateSearchPipelineStep.NAME, () -> new CreateSearchPipelineStep(client, flowFrameworkIndicesHandler)); } /** @@ -211,6 +212,15 @@ public enum WorkflowSteps { List.of(PIPELINE_ID), Collections.emptyList(), null + ), + + /** Create Ingest Pipeline Step */ + CREATE_SEARCH_PIPELINE( + CreateSearchPipelineStep.NAME, + List.of(PIPELINE_ID, CONFIGURATIONS), + List.of(PIPELINE_ID), + Collections.emptyList(), + null ); private final String workflowStepName; diff --git a/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java b/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java index 6b1841708..3266820e2 100644 --- a/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java +++ b/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java @@ -46,7 +46,7 @@ public void testParseWorkflowValidator() throws IOException { WorkflowValidator validator = new WorkflowValidator(workflowStepValidators); - assertEquals(15, validator.getWorkflowStepValidators().size()); + assertEquals(16, validator.getWorkflowStepValidators().size()); assertTrue(validator.getWorkflowStepValidators().keySet().contains("create_connector")); assertEquals(7, validator.getWorkflowStepValidators().get("create_connector").getInputs().size()); diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index 255ac39f7..69391cbd1 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -346,10 +346,10 @@ public void testTimestamps() throws Exception { assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode()); } - public void testCreateAndProvisionIngestPipeline() throws Exception { + public void testCreateAndProvisionIngestAndSearchPipeline() throws Exception { // Using a 3 step template to create a connector, register remote model and deploy model - Template template = TestHelpers.createTemplateFromFile("ingest-pipeline-template.json"); + Template template = TestHelpers.createTemplateFromFile("ingest-search-pipeline-template.json"); // Hit Create Workflow API with original template Response response = createWorkflow(client(), template); @@ -373,16 +373,24 @@ public void testCreateAndProvisionIngestPipeline() throws Exception { // Wait until provisioning has completed successfully before attempting to retrieve created resources List resourcesCreated = getResourcesCreated(client(), workflowId, 30); - // This template should create 4 resources, connector_id, registered model_id, deployed model_id and pipelineId - assertEquals(4, resourcesCreated.size()); - assertEquals("create_connector", resourcesCreated.get(0).workflowStepName()); - assertNotNull(resourcesCreated.get(0).resourceId()); - assertEquals("register_remote_model", resourcesCreated.get(1).workflowStepName()); - assertNotNull(resourcesCreated.get(1).resourceId()); - assertEquals("deploy_model", resourcesCreated.get(2).workflowStepName()); - assertNotNull(resourcesCreated.get(2).resourceId()); - assertEquals("create_ingest_pipeline", resourcesCreated.get(3).workflowStepName()); - assertNotNull(resourcesCreated.get(3).resourceId()); + List expectedStepNames = List.of( + "create_connector", + "register_remote_model", + "deploy_model", + "create_search_pipeline", + "create_ingest_pipeline" + ); + + List workflowStepNames = resourcesCreated.stream() + .peek(resourceCreated -> assertNotNull(resourceCreated.resourceId())) + .map(ResourceCreated::workflowStepName) + .collect(Collectors.toList()); + for (String expectedName : expectedStepNames) { + assertTrue(workflowStepNames.contains(expectedName)); + } + + // This template should create 5 resources, connector_id, registered model_id, deployed model_id and pipelineId + assertEquals(5, resourcesCreated.size()); String modelId = resourcesCreated.get(2).resourceId(); GetPipelineResponse getPipelinesResponse = getPipelines(); diff --git a/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java b/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java index 5148f9251..06aaf45d9 100644 --- a/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java +++ b/src/test/java/org/opensearch/flowframework/util/ParseUtilsTests.java @@ -82,6 +82,12 @@ public void testBuildAndParseStringToStringMap() throws IOException { assertEquals(stringMap.get("one"), parsedMap.get("one")); } + public void testParseArbitraryStringToObjectMapToString() throws IOException { + Map map = Map.ofEntries(Map.entry("test-1", Map.of("test-1", "test-1"))); + String parsedMap = ParseUtils.parseArbitraryStringToObjectMapToString(map); + assertEquals("{\"test-1\":{\"test-1\":\"test-1\"}}", parsedMap); + } + public void testGetInputsFromPreviousSteps() { WorkflowData currentNodeInputs = new WorkflowData( Map.ofEntries( diff --git a/src/test/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStepTests.java index f8c9402d0..eb254ae0b 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/CreateIngestPipelineStepTests.java @@ -122,13 +122,13 @@ public void testCreateIngestPipelineStepFailure() throws InterruptedException { // Mock put pipeline request execution and return false verify(clusterAdminClient, times(1)).putPipeline(any(PutPipelineRequest.class), actionListenerCaptor.capture()); - actionListenerCaptor.getValue().onFailure(new Exception("Failed to create ingest pipeline")); + actionListenerCaptor.getValue().onFailure(new Exception("Failed step create_ingest_pipeline")); assertTrue(future.isDone()); ExecutionException exception = assertThrows(ExecutionException.class, () -> future.get()); assertTrue(exception.getCause() instanceof Exception); - assertEquals("Failed to create ingest pipeline", exception.getCause().getMessage()); + assertEquals("Failed step create_ingest_pipeline", exception.getCause().getMessage()); } public void testMissingData() throws InterruptedException { diff --git a/src/test/java/org/opensearch/flowframework/workflow/CreateSearchPipelineStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/CreateSearchPipelineStepTests.java new file mode 100644 index 000000000..737b28a4c --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/workflow/CreateSearchPipelineStepTests.java @@ -0,0 +1,166 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.opensearch.action.search.PutSearchPipelineRequest; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.action.update.UpdateResponse; +import org.opensearch.client.AdminClient; +import org.opensearch.client.Client; +import org.opensearch.client.ClusterAdminClient; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import org.mockito.ArgumentCaptor; + +import static org.opensearch.action.DocWriteResponse.Result.UPDATED; +import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX; +import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID; +import static org.opensearch.flowframework.common.WorkflowResources.PIPELINE_ID; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@SuppressWarnings("deprecation") +public class CreateSearchPipelineStepTests extends OpenSearchTestCase { + + private WorkflowData inputData; + private WorkflowData outpuData; + private Client client; + private AdminClient adminClient; + private ClusterAdminClient clusterAdminClient; + private FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; + + @Override + public void setUp() throws Exception { + super.setUp(); + this.flowFrameworkIndicesHandler = mock(FlowFrameworkIndicesHandler.class); + + String configurations = + "{\"response_processors\":[{\"retrieval_augmented_generation\":{\"context_field_list\":[\"text\"],\"user_instructions\":\"Generate a concise and informative answer in less than 100 words for the given question\",\"description\":\"Demo pipeline Using OpenAI Connector\",\"tag\":\"openai_pipeline_demo\",\"model_id\":\"tbFoNI4BW58L8XKV4RF3\",\"system_prompt\":\"You are a helpful assistant\"}}]}"; + inputData = new WorkflowData( + Map.ofEntries(Map.entry(CONFIGURATIONS, configurations), Map.entry(PIPELINE_ID, "pipelineId")), + "test-id", + "test-node-id" + ); + + // Set output data to returned pipelineId + outpuData = new WorkflowData(Map.ofEntries(Map.entry(PIPELINE_ID, "pipelineId")), "test-id", "test-node-id"); + + client = mock(Client.class); + adminClient = mock(AdminClient.class); + clusterAdminClient = mock(ClusterAdminClient.class); + + when(client.admin()).thenReturn(adminClient); + when(adminClient.cluster()).thenReturn(clusterAdminClient); + } + + public void testCreateSearchPipelineStep() throws InterruptedException, ExecutionException, IOException { + + CreateSearchPipelineStep createSearchPipelineStep = new CreateSearchPipelineStep(client, flowFrameworkIndicesHandler); + + doAnswer(invocation -> { + ActionListener updateResponseListener = invocation.getArgument(4); + updateResponseListener.onResponse(new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "id", -2, 0, 0, UPDATED)); + return null; + }).when(flowFrameworkIndicesHandler).updateResourceInStateIndex(anyString(), anyString(), anyString(), anyString(), any()); + + @SuppressWarnings("unchecked") + ArgumentCaptor> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + PlainActionFuture future = createSearchPipelineStep.execute( + inputData.getNodeId(), + inputData, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + + assertFalse(future.isDone()); + + // Mock put pipeline request execution and return true + verify(clusterAdminClient, times(1)).putSearchPipeline(any(PutSearchPipelineRequest.class), actionListenerCaptor.capture()); + actionListenerCaptor.getValue().onResponse(new AcknowledgedResponse(true)); + + assertTrue(future.isDone()); + assertEquals(outpuData.getContent(), future.get().getContent()); + } + + public void testCreateSearchPipelineStepFailure() throws InterruptedException { + + CreateSearchPipelineStep createSearchPipelineStep = new CreateSearchPipelineStep(client, flowFrameworkIndicesHandler); + + @SuppressWarnings("unchecked") + ArgumentCaptor> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + PlainActionFuture future = createSearchPipelineStep.execute( + inputData.getNodeId(), + inputData, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + + assertFalse(future.isDone()); + + // Mock put pipeline request execution and return false + verify(clusterAdminClient, times(1)).putSearchPipeline(any(PutSearchPipelineRequest.class), actionListenerCaptor.capture()); + actionListenerCaptor.getValue().onFailure(new Exception("Failed step create_search_pipeline")); + + assertTrue(future.isDone()); + + ExecutionException exception = assertThrows(ExecutionException.class, () -> future.get()); + assertTrue(exception.getCause() instanceof Exception); + assertEquals("Failed step create_search_pipeline", exception.getCause().getMessage()); + } + + public void testMissingData() throws InterruptedException { + CreateSearchPipelineStep createSearchPipelineStep = new CreateSearchPipelineStep(client, flowFrameworkIndicesHandler); + + // Data with missing input and output fields + WorkflowData incorrectData = new WorkflowData( + Map.ofEntries( + Map.entry("id", PIPELINE_ID), + Map.entry("description", "some description"), + Map.entry("type", "text_embedding"), + Map.entry(MODEL_ID, MODEL_ID) + ), + "test-id", + "test-node-id" + ); + + PlainActionFuture future = createSearchPipelineStep.execute( + incorrectData.getNodeId(), + incorrectData, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + assertTrue(future.isDone()); + + ExecutionException exception = assertThrows(ExecutionException.class, () -> future.get()); + assertTrue(exception.getCause() instanceof Exception); + assertEquals( + "Missing required inputs [configurations, pipeline_id] in workflow [test-id] node [test-node-id]", + exception.getCause().getMessage() + ); + } + +} diff --git a/src/test/resources/template/ingest-pipeline-template.json b/src/test/resources/template/ingest-search-pipeline-template.json similarity index 76% rename from src/test/resources/template/ingest-pipeline-template.json rename to src/test/resources/template/ingest-search-pipeline-template.json index b5ee4d19d..35f6b7f86 100644 --- a/src/test/resources/template/ingest-pipeline-template.json +++ b/src/test/resources/template/ingest-search-pipeline-template.json @@ -81,6 +81,31 @@ ] } } + }, + { + "id": "create_search_pipeline", + "type": "create_search_pipeline", + "previous_node_inputs": { + "deploy_openai_model": "model_id" + }, + "user_inputs": { + "pipeline_id": "rag_pipeline", + "configurations": { + "request_processors": [ + { + "filter_query" : { + "tag" : "tag1", + "description" : "This processor is going to restrict to publicly visible documents", + "query" : { + "term": { + "visibility": "public" + } + } + } + } + ] + } + } } ] }