Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Create index workflow step #574

Merged
merged 5 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.12...2.x)
### Features
- Adding create ingest pipeline step ([#558](https://github.com/opensearch-project/flow-framework/pull/558))
- Adding create search pipeline step ([#569](https://github.com/opensearch-project/flow-framework/pull/569))
- Added create ingest pipeline step ([#558](https://github.com/opensearch-project/flow-framework/pull/558))
- Added create search pipeline step ([#569](https://github.com/opensearch-project/flow-framework/pull/569))
- Added create and delete index step ([#574](https://github.com/opensearch-project/flow-framework/pull/574))

### Enhancements
- Substitute REST path or body parameters in Workflow Steps ([#525](https://github.com/opensearch-project/flow-framework/pull/525))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.flowframework.workflow.CreateSearchPipelineStep;
import org.opensearch.flowframework.workflow.DeleteAgentStep;
import org.opensearch.flowframework.workflow.DeleteConnectorStep;
import org.opensearch.flowframework.workflow.DeleteIndexStep;
import org.opensearch.flowframework.workflow.DeleteModelStep;
import org.opensearch.flowframework.workflow.DeployModelStep;
import org.opensearch.flowframework.workflow.NoOpStep;
Expand Down Expand Up @@ -57,7 +58,7 @@ public enum WorkflowResources {
/** Workflow steps for creating an ingest-pipeline and associated created resource */
CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null), // TODO delete step
/** Workflow steps for creating an index and associated created resource */
CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, null), // TODO delete step
CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, DeleteIndexStep.NAME),
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
/** Workflow steps for registering/deleting an agent and the associated created resource */
REGISTER_AGENT(RegisterAgentStep.NAME, WorkflowResources.AGENT_ID, DeleteAgentStep.NAME);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,24 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Set;

import static org.opensearch.flowframework.common.CommonValue.DEFAULT_MAPPING_OPTION;
import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS;
import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;

/**
Expand All @@ -37,23 +38,19 @@
public class CreateIndexStep implements WorkflowStep {

private static final Logger logger = LogManager.getLogger(CreateIndexStep.class);
private final ClusterService clusterService;
private final Client client;
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;

/** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */
public static final String NAME = "create_index";
static Map<String, AtomicBoolean> indexMappingUpdated = new HashMap<>();

/**
* Instantiate this class
*
* @param clusterService The OpenSearch cluster service
* @param client Client to create an index
* @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices
*/
public CreateIndexStep(ClusterService clusterService, Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) {
this.clusterService = clusterService;
public CreateIndexStep(Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) {
this.client = client;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
}
Expand All @@ -67,89 +64,66 @@
Map<String, String> params
) {
PlainActionFuture<WorkflowData> createIndexFuture = PlainActionFuture.newFuture();
ActionListener<CreateIndexResponse> actionListener = new ActionListener<>() {

@Override
public void onResponse(CreateIndexResponse createIndexResponse) {
Set<String> requiredKeys = Set.of(INDEX_NAME, CONFIGURATIONS);

Set<String> optionalKeys = Collections.emptySet();

try {
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
requiredKeys,
optionalKeys,
currentNodeInputs,
outputs,
previousNodeInputs,
params
);

String indexName = (String) inputs.get(INDEX_NAME);

String configurations = (String) inputs.get(CONFIGURATIONS);

byte[] byteArr = configurations.getBytes(StandardCharsets.UTF_8);
BytesReference configurationsBytes = new BytesArray(byteArr);

CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).source(configurationsBytes, XContentType.JSON);
client.admin().indices().create(createIndexRequest, ActionListener.wrap(acknowledgedResponse -> {
String resourceName = getResourceByWorkflowStep(getName());
logger.info("Created index: {}", indexName);
try {
String resourceName = getResourceByWorkflowStep(getName());
logger.info("created index: {}", createIndexResponse.index());
flowFrameworkIndicesHandler.updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
currentNodeId,
getName(),
createIndexResponse.index(),
indexName,
ActionListener.wrap(response -> {
logger.info("successfully updated resource created in state index: {}", response.getIndex());
createIndexFuture.onResponse(
new WorkflowData(
Map.of(resourceName, createIndexResponse.index()),
currentNodeInputs.getWorkflowId(),
currentNodeId
)
new WorkflowData(Map.of(resourceName, indexName), currentNodeInputs.getWorkflowId(), currentNodeId)
);
}, exception -> {
String errorMessage = "Failed to update new created "
+ currentNodeId
+ " resource "
+ getName()
+ " id "
+ createIndexResponse.index();
+ indexName;
logger.error(errorMessage, exception);
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
})
);
} catch (Exception e) {
} catch (IOException ex) {

Check warning on line 115 in src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java#L115

Added line #L115 was not covered by tests
String errorMessage = "Failed to parse and update new created resource";
logger.error(errorMessage, e);
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
logger.error(errorMessage, ex);
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(ex)));

Check warning on line 118 in src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java#L117-L118

Added lines #L117 - L118 were not covered by tests
}
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to create an index";
}, e -> {
String errorMessage = "Failed to create the index " + indexName;
logger.error(errorMessage, e);
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
};

String index = null;
String defaultMappingOption = null;
Settings settings = null;

// TODO: Recreating the list to get this compiling
// Need to refactor the below iteration to pull directly from the maps
List<WorkflowData> data = new ArrayList<>();
data.add(currentNodeInputs);
data.addAll(outputs.values());

try {
for (WorkflowData workflowData : data) {
Map<String, Object> content = workflowData.getContent();
index = (String) content.get(getResourceByWorkflowStep(getName()));
defaultMappingOption = (String) content.get(DEFAULT_MAPPING_OPTION);
if (index != null && defaultMappingOption != null && settings != null) {
break;
}
}
} catch (Exception e) {
String errorMessage = "Failed to find the correct resource for the workflow step " + NAME;
logger.error(errorMessage, e);
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}

// TODO:
// 1. Create settings based on the index settings received from content

try {
CreateIndexRequest request = new CreateIndexRequest(index).mapping(
FlowFrameworkIndicesHandler.getIndexMappings("mappings/" + defaultMappingOption + ".json"),
JsonXContent.jsonXContent.mediaType()
);
client.admin().indices().create(request, actionListener);
}));
} catch (Exception e) {
logger.error("Failed to find the right mapping for the index", e);
createIndexFuture.onFailure(e);

Check warning on line 126 in src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java#L126

Added line #L126 was not covered by tests
amitgalitz marked this conversation as resolved.
Show resolved Hide resolved
}

return createIndexFuture;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.util.ParseUtils;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME;

/**
* Step to delete an index
*/
public class DeleteIndexStep implements WorkflowStep {
private static final Logger logger = LogManager.getLogger(DeleteConnectorStep.class);

private final Client client;

/** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */
public static final String NAME = "delete_index";

/**
* Instantiate this class
*
* @param client Client to create an index
*/
public DeleteIndexStep(Client client) {
this.client = client;
}

@Override
public PlainActionFuture<WorkflowData> execute(
String currentNodeId,
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
) {

PlainActionFuture<WorkflowData> deleteIndexFuture = PlainActionFuture.newFuture();

Set<String> requiredKeys = Set.of(INDEX_NAME);
Set<String> optionalKeys = Collections.emptySet();

try {
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
requiredKeys,
optionalKeys,
currentNodeInputs,
outputs,
previousNodeInputs,
params
);
String indexName = (String) inputs.get(INDEX_NAME);

DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
client.admin().indices().delete(deleteIndexRequest, new ActionListener<AcknowledgedResponse>() {
amitgalitz marked this conversation as resolved.
Show resolved Hide resolved
@Override
public void onResponse(AcknowledgedResponse response) {
logger.info("Deleted index: {}", indexName);
deleteIndexFuture.onResponse(
new WorkflowData(
Map.ofEntries(Map.entry(INDEX_NAME, indexName)),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to delete the index " + indexName;
logger.error(errorMessage, e);
deleteIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
});
} catch (FlowFrameworkException e) {
deleteIndexFuture.onFailure(e);

Check warning on line 95 in src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java#L94-L95

Added lines #L94 - L95 were not covered by tests
}

return deleteIndexFuture;
}

@Override
public String getName() {
return NAME;

Check warning on line 103 in src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java#L103

Added line #L103 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import static org.opensearch.flowframework.common.CommonValue.VERSION_FIELD;
import static org.opensearch.flowframework.common.WorkflowResources.AGENT_ID;
import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID;
import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_GROUP_ID;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;

Expand Down Expand Up @@ -82,6 +83,8 @@ public WorkflowStepFactory(
Client client
) {
stepMap.put(NoOpStep.NAME, NoOpStep::new);
stepMap.put(CreateIndexStep.NAME, () -> new CreateIndexStep(client, flowFrameworkIndicesHandler));
stepMap.put(DeleteIndexStep.NAME, () -> new DeleteIndexStep(client));
stepMap.put(
RegisterLocalCustomModelStep.NAME,
() -> new RegisterLocalCustomModelStep(threadPool, mlClient, flowFrameworkIndicesHandler, flowFrameworkSettings)
Expand Down Expand Up @@ -120,6 +123,9 @@ public enum WorkflowSteps {
/** Noop Step */
NOOP("noop", Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), null),

/** Create Index Step */
CREATE_INDEX(CreateIndexStep.NAME, List.of(INDEX_NAME, CONFIGURATIONS), List.of(INDEX_NAME), Collections.emptyList(), null),

/** Create Connector Step */
CREATE_CONNECTOR(
CreateConnectorStep.NAME,
Expand Down Expand Up @@ -184,6 +190,9 @@ public enum WorkflowSteps {
null
),

/** Delete Index Step */
DELETE_INDEX(DeleteIndexStep.NAME, List.of(INDEX_NAME), List.of(INDEX_NAME), Collections.emptyList(), null),

/** Deploy Model Step */
DEPLOY_MODEL(DeployModelStep.NAME, List.of(MODEL_ID), List.of(MODEL_ID), List.of(OPENSEARCH_ML), TimeValue.timeValueSeconds(15)),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void testParseWorkflowValidator() throws IOException {

WorkflowValidator validator = new WorkflowValidator(workflowStepValidators);

assertEquals(16, validator.getWorkflowStepValidators().size());
assertEquals(18, validator.getWorkflowStepValidators().size());

assertTrue(validator.getWorkflowStepValidators().keySet().contains("create_connector"));
assertEquals(7, validator.getWorkflowStepValidators().get("create_connector").getInputs().size());
Expand Down
Loading
Loading