diff --git a/CHANGELOG.md b/CHANGELOG.md index 00c9b712f..02916436a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,8 +14,9 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.12...2.x) ### Features -- Adding create ingest pipeline step ([#558](https://github.com/opensearch-project/flow-framework/pull/558)) -- Adding create search pipeline step ([#569](https://github.com/opensearch-project/flow-framework/pull/569)) +- Added create ingest pipeline step ([#558](https://github.com/opensearch-project/flow-framework/pull/558)) +- Added create search pipeline step ([#569](https://github.com/opensearch-project/flow-framework/pull/569)) +- Added create index step ([#574](https://github.com/opensearch-project/flow-framework/pull/574)) ### Enhancements - Substitute REST path or body parameters in Workflow Steps ([#525](https://github.com/opensearch-project/flow-framework/pull/525)) diff --git a/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java b/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java index c1303adc0..a024ec3b8 100644 --- a/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java +++ b/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java @@ -57,7 +57,7 @@ public enum WorkflowResources { /** Workflow steps for creating an ingest-pipeline and associated created resource */ CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null), // TODO delete step /** Workflow steps for creating an index and associated created resource */ - CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, null), // TODO delete step + CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME), /** Workflow steps for registering/deleting an agent and the associated created resource */ REGISTER_AGENT(RegisterAgentStep.NAME, WorkflowResources.AGENT_ID, DeleteAgentStep.NAME); diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java index 11a41919c..509d4b417 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java @@ -12,23 +12,24 @@ import org.apache.logging.log4j.Logger; import org.opensearch.ExceptionsHelper; import org.opensearch.action.admin.indices.create.CreateIndexRequest; -import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.client.Client; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.flowframework.exception.FlowFrameworkException; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; +import org.opensearch.flowframework.util.ParseUtils; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Set; -import static org.opensearch.flowframework.common.CommonValue.DEFAULT_MAPPING_OPTION; +import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS; +import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME; import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep; /** @@ -37,23 +38,19 @@ public class CreateIndexStep implements WorkflowStep { private static final Logger logger = LogManager.getLogger(CreateIndexStep.class); - private final ClusterService clusterService; private final Client client; private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; /** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */ public static final String NAME = "create_index"; - static Map indexMappingUpdated = new HashMap<>(); /** * Instantiate this class * - * @param clusterService The OpenSearch cluster service * @param client Client to create an index * @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices */ - public CreateIndexStep(ClusterService clusterService, Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) { - this.clusterService = clusterService; + public CreateIndexStep(Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) { this.client = client; this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler; } @@ -67,26 +64,42 @@ public PlainActionFuture execute( Map params ) { PlainActionFuture createIndexFuture = PlainActionFuture.newFuture(); - ActionListener actionListener = new ActionListener<>() { - @Override - public void onResponse(CreateIndexResponse createIndexResponse) { + Set requiredKeys = Set.of(INDEX_NAME, CONFIGURATIONS); + + Set optionalKeys = Collections.emptySet(); + + try { + Map inputs = ParseUtils.getInputsFromPreviousSteps( + requiredKeys, + optionalKeys, + currentNodeInputs, + outputs, + previousNodeInputs, + params + ); + + String indexName = (String) inputs.get(INDEX_NAME); + + String configurations = (String) inputs.get(CONFIGURATIONS); + + byte[] byteArr = configurations.getBytes(StandardCharsets.UTF_8); + BytesReference configurationsBytes = new BytesArray(byteArr); + + CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).source(configurationsBytes, XContentType.JSON); + client.admin().indices().create(createIndexRequest, ActionListener.wrap(acknowledgedResponse -> { + String resourceName = getResourceByWorkflowStep(getName()); + logger.info("Created index: {}", indexName); try { - String resourceName = getResourceByWorkflowStep(getName()); - logger.info("created index: {}", createIndexResponse.index()); flowFrameworkIndicesHandler.updateResourceInStateIndex( currentNodeInputs.getWorkflowId(), currentNodeId, getName(), - createIndexResponse.index(), + indexName, ActionListener.wrap(response -> { logger.info("successfully updated resource created in state index: {}", response.getIndex()); createIndexFuture.onResponse( - new WorkflowData( - Map.of(resourceName, createIndexResponse.index()), - currentNodeInputs.getWorkflowId(), - currentNodeId - ) + new WorkflowData(Map.of(resourceName, indexName), currentNodeInputs.getWorkflowId(), currentNodeId) ); }, exception -> { String errorMessage = "Failed to update new created " @@ -94,62 +107,23 @@ public void onResponse(CreateIndexResponse createIndexResponse) { + " resource " + getName() + " id " - + createIndexResponse.index(); + + indexName; logger.error(errorMessage, exception); createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))); }) ); - } catch (Exception e) { + } catch (IOException ex) { String errorMessage = "Failed to parse and update new created resource"; - logger.error(errorMessage, e); - createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + logger.error(errorMessage, ex); + createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(ex))); } - } - - @Override - public void onFailure(Exception e) { - String errorMessage = "Failed to create an index"; + }, e -> { + String errorMessage = "Failed to create the index " + indexName; logger.error(errorMessage, e); createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); - } - }; - - String index = null; - String defaultMappingOption = null; - Settings settings = null; - - // TODO: Recreating the list to get this compiling - // Need to refactor the below iteration to pull directly from the maps - List data = new ArrayList<>(); - data.add(currentNodeInputs); - data.addAll(outputs.values()); - - try { - for (WorkflowData workflowData : data) { - Map content = workflowData.getContent(); - index = (String) content.get(getResourceByWorkflowStep(getName())); - defaultMappingOption = (String) content.get(DEFAULT_MAPPING_OPTION); - if (index != null && defaultMappingOption != null && settings != null) { - break; - } - } - } catch (Exception e) { - String errorMessage = "Failed to find the correct resource for the workflow step " + NAME; - logger.error(errorMessage, e); - createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); - } - - // TODO: - // 1. Create settings based on the index settings received from content - - try { - CreateIndexRequest request = new CreateIndexRequest(index).mapping( - FlowFrameworkIndicesHandler.getIndexMappings("mappings/" + defaultMappingOption + ".json"), - JsonXContent.jsonXContent.mediaType() - ); - client.admin().indices().create(request, actionListener); + })); } catch (Exception e) { - logger.error("Failed to find the right mapping for the index", e); + createIndexFuture.onFailure(e); } return createIndexFuture; diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java index 82079454a..a47c7c5d8 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java @@ -54,6 +54,7 @@ import static org.opensearch.flowframework.common.CommonValue.VERSION_FIELD; import static org.opensearch.flowframework.common.WorkflowResources.AGENT_ID; import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID; +import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME; import static org.opensearch.flowframework.common.WorkflowResources.MODEL_GROUP_ID; import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID; @@ -82,6 +83,7 @@ public WorkflowStepFactory( Client client ) { stepMap.put(NoOpStep.NAME, NoOpStep::new); + stepMap.put(CreateIndexStep.NAME, () -> new CreateIndexStep(client, flowFrameworkIndicesHandler)); stepMap.put( RegisterLocalCustomModelStep.NAME, () -> new RegisterLocalCustomModelStep(threadPool, mlClient, flowFrameworkIndicesHandler, flowFrameworkSettings) @@ -120,6 +122,9 @@ public enum WorkflowSteps { /** Noop Step */ NOOP("noop", Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), null), + /** Create Index Step */ + CREATE_INDEX(CreateIndexStep.NAME, List.of(INDEX_NAME, CONFIGURATIONS), List.of(INDEX_NAME), Collections.emptyList(), null), + /** Create Connector Step */ CREATE_CONNECTOR( CreateConnectorStep.NAME, diff --git a/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java b/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java index 3266820e2..b5a87f5c2 100644 --- a/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java +++ b/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java @@ -46,7 +46,7 @@ public void testParseWorkflowValidator() throws IOException { WorkflowValidator validator = new WorkflowValidator(workflowStepValidators); - assertEquals(16, validator.getWorkflowStepValidators().size()); + assertEquals(17, validator.getWorkflowStepValidators().size()); assertTrue(validator.getWorkflowStepValidators().keySet().contains("create_connector")); assertEquals(7, validator.getWorkflowStepValidators().get("create_connector").getInputs().size()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/CreateIndexStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/CreateIndexStepTests.java index 4e035149e..56a26429a 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/CreateIndexStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/CreateIndexStepTests.java @@ -15,11 +15,8 @@ import org.opensearch.client.AdminClient; import org.opensearch.client.Client; import org.opensearch.client.IndicesAdminClient; -import org.opensearch.cluster.ClusterName; -import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; @@ -30,16 +27,15 @@ import java.io.IOException; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import static org.opensearch.action.DocWriteResponse.Result.UPDATED; +import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS; import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX; import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME; @@ -59,10 +55,7 @@ public class CreateIndexStepTests extends OpenSearchTestCase { private CreateIndexStep createIndexStep; private ThreadContext threadContext; private Metadata metadata; - private Map indexMappingUpdated = new HashMap<>(); - @Mock - private ClusterService clusterService; @Mock private IndicesAdminClient indicesAdminClient; @Mock @@ -76,12 +69,14 @@ public void setUp() throws Exception { super.setUp(); this.flowFrameworkIndicesHandler = mock(FlowFrameworkIndicesHandler.class); MockitoAnnotations.openMocks(this); + String configurations = + "{\"settings\":{\"index\":{\"number_of_shards\":2,\"number_of_replicas\":1}},\"mappings\":{\"_doc\":{\"properties\":{\"age\":{\"type\":\"integer\"}}}},\"aliases\":{\"sample-alias1\":{}}}"; + inputData = new WorkflowData( - Map.ofEntries(Map.entry(INDEX_NAME, "demo"), Map.entry("default_mapping_option", "knn")), + Map.ofEntries(Map.entry(INDEX_NAME, "demo"), Map.entry(CONFIGURATIONS, configurations)), "test-id", "test-node-id" ); - clusterService = mock(ClusterService.class); client = mock(Client.class); adminClient = mock(AdminClient.class); metadata = mock(Metadata.class); @@ -92,11 +87,9 @@ public void setUp() throws Exception { when(threadPool.getThreadContext()).thenReturn(threadContext); when(client.admin()).thenReturn(adminClient); when(adminClient.indices()).thenReturn(indicesAdminClient); - when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test cluster")).build()); when(metadata.indices()).thenReturn(Map.of(GLOBAL_CONTEXT_INDEX, indexMetadata)); - createIndexStep = new CreateIndexStep(clusterService, client, flowFrameworkIndicesHandler); - CreateIndexStep.indexMappingUpdated = indexMappingUpdated; + createIndexStep = new CreateIndexStep(client, flowFrameworkIndicesHandler); } public void testCreateIndexStep() throws ExecutionException, InterruptedException, IOException { @@ -145,6 +138,6 @@ public void testCreateIndexStepFailure() throws ExecutionException, InterruptedE assertTrue(future.isDone()); ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); assertTrue(ex.getCause() instanceof Exception); - assertEquals("Failed to create an index", ex.getCause().getMessage()); + assertEquals("Failed to create the index demo", ex.getCause().getMessage()); } }