Skip to content

Commit

Permalink
Initial commit for reindex workflow step with extra params
Browse files Browse the repository at this point in the history
Signed-off-by: owaiskazi19 <[email protected]>
  • Loading branch information
owaiskazi19 committed May 20, 2024
1 parent e3a8784 commit e2b6f88
Show file tree
Hide file tree
Showing 7 changed files with 399 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
### Enhancements
### Bug Fixes
- Add user mapping to Workflow State index ([#705](https://github.com/opensearch-project/flow-framework/pull/705))
- Add Workflow Step for Reindex from source index to destination ([#718](https://github.com/opensearch-project/flow-framework/pull/718))

### Infrastructure
### Documentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,22 @@ private CommonValue() {}
public static final String DELAY_FIELD = "delay";
/** Model Interface Field */
public static final String INTERFACE_FIELD = "interface";

/** The reindex field for created resources */
public static final String RE_INDEX_FIELD = "reindex";
/** The source index field for reindex */
public static final String SOURCE_INDEX = "source_index";
/** The destination index field for reindex */
public static final String DESTINATION_INDEX = "destination_index";
/** The refresh field for reindex */
public static final String REFRESH = "refresh";
/** The requests_per_second field for reindex */
public static final String REQUESTS_PER_SECOND = "requests_per_second";
/** The require_alias field for reindex */
public static final String REQUIRE_ALIAS = "require_alias";
/** The slices field for reindex */
public static final String SLICES = "slices";
/** The max_docs field for reindex */
public static final String MAX_DOCS = "max_docs";
/*
* Constants associated with resource provisioning / state
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.flowframework.workflow.DeleteModelStep;
import org.opensearch.flowframework.workflow.DeployModelStep;
import org.opensearch.flowframework.workflow.NoOpStep;
import org.opensearch.flowframework.workflow.ReIndexStep;
import org.opensearch.flowframework.workflow.RegisterAgentStep;
import org.opensearch.flowframework.workflow.RegisterLocalCustomModelStep;
import org.opensearch.flowframework.workflow.RegisterLocalPretrainedModelStep;
Expand Down Expand Up @@ -58,6 +59,8 @@ public enum WorkflowResources {
CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null), // TODO delete step
/** Workflow steps for creating an index and associated created resource */
CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME),
/** Workflow steps for reindex a source index to destination index and associated created resource */
RE_INDEX(ReIndexStep.NAME, CommonValue.DESTINATION_INDEX, NoOpStep.NAME),
/** Workflow steps for registering/deleting an agent and the associated created resource */
REGISTER_AGENT(RegisterAgentStep.NAME, WorkflowResources.AGENT_ID, DeleteAgentStep.NAME);

Expand Down
175 changes: 175 additions & 0 deletions src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.common.Booleans;
import org.opensearch.core.action.ActionListener;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.exception.WorkflowStepException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.index.reindex.ReindexAction;
import org.opensearch.index.reindex.ReindexRequest;

import java.util.Map;
import java.util.Set;

import static org.opensearch.flowframework.common.CommonValue.DESTINATION_INDEX;
import static org.opensearch.flowframework.common.CommonValue.MAX_DOCS;
import static org.opensearch.flowframework.common.CommonValue.REFRESH;
import static org.opensearch.flowframework.common.CommonValue.REQUESTS_PER_SECOND;
import static org.opensearch.flowframework.common.CommonValue.REQUIRE_ALIAS;
import static org.opensearch.flowframework.common.CommonValue.RE_INDEX_FIELD;
import static org.opensearch.flowframework.common.CommonValue.SLICES;
import static org.opensearch.flowframework.common.CommonValue.SOURCE_INDEX;

/**
* Step to reindex
*/
public class ReIndexStep implements WorkflowStep {

private static final Logger logger = LogManager.getLogger(ReIndexStep.class);
private final Client client;
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;

/** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */
public static final String NAME = "reindex";

/**
* Instantiate this class
*
* @param client Client to create an index
* @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices
*/
public ReIndexStep(Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) {
this.client = client;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
}

@Override
public PlainActionFuture<WorkflowData> execute(
String currentNodeId,
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
) {

PlainActionFuture<WorkflowData> reIndexFuture = PlainActionFuture.newFuture();

Set<String> requiredKeys = Set.of(SOURCE_INDEX, DESTINATION_INDEX);

Set<String> optionalKeys = Set.of(REFRESH, REQUESTS_PER_SECOND, REQUIRE_ALIAS, SLICES, MAX_DOCS);

try {
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
requiredKeys,
optionalKeys,
currentNodeInputs,
outputs,
previousNodeInputs,
params
);

String sourceIndices = (String) inputs.get(SOURCE_INDEX);
String destinationIndex = (String) inputs.get(DESTINATION_INDEX);
Boolean refresh = inputs.containsKey(REFRESH) ? Booleans.parseBoolean(inputs.get(REFRESH).toString()) : null;
Integer requestsPerSecond = (Integer) inputs.get(REQUESTS_PER_SECOND);
Boolean requireAlias = inputs.containsKey(REQUIRE_ALIAS) ? Booleans.parseBoolean(inputs.get(REQUIRE_ALIAS).toString()) : null;
Integer slices = (Integer) inputs.get(SLICES);
Integer maxDocs = (Integer) inputs.get(MAX_DOCS);

ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndices);
reindexRequest.setDestIndex(destinationIndex);
if (refresh != null) {
reindexRequest.setRefresh(refresh);
}
if (requestsPerSecond != null) {
reindexRequest.setRequestsPerSecond(requestsPerSecond);
}
if (requireAlias != null) {
reindexRequest.setRequireAlias(requireAlias);
}
if (maxDocs != null) {
reindexRequest.setMaxDocs(maxDocs);
}
if (slices != null) {
reindexRequest.setSlices(slices);
}

ActionListener<BulkByScrollResponse> actionListener = new ActionListener<>() {

@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {
logger.info("Reindex from source: {} to destination {}", sourceIndices, destinationIndex);
try {
if (bulkByScrollResponse.getBulkFailures().isEmpty() && bulkByScrollResponse.getSearchFailures().isEmpty()) {
flowFrameworkIndicesHandler.updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
currentNodeId,
getName(),
destinationIndex,
ActionListener.wrap(response -> {
logger.info("successfully updated resource created in state index: {}", response.getIndex());

reIndexFuture.onResponse(
new WorkflowData(
Map.of(RE_INDEX_FIELD, Map.of(sourceIndices, destinationIndex)),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);
}, exception -> {
String errorMessage = "Failed to update new reindexed"

Check warning on line 136 in src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java#L136

Added line #L136 was not covered by tests
+ currentNodeId
+ " resource "
+ getName()

Check warning on line 139 in src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java#L139

Added line #L139 was not covered by tests
+ " id "
+ destinationIndex;
logger.error(errorMessage, exception);
reIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
})

Check warning on line 144 in src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java#L142-L144

Added lines #L142 - L144 were not covered by tests
);
}
} catch (Exception e) {
String errorMessage = "Failed to parse and update new created resource";
logger.error(errorMessage, e);
reIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));

Check warning on line 150 in src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java#L147-L150

Added lines #L147 - L150 were not covered by tests
}
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to reindex from source " + sourceIndices + " to " + destinationIndex;
logger.error(errorMessage, e);
reIndexFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e)));
}
};

client.execute(ReindexAction.INSTANCE, reindexRequest, actionListener);

} catch (Exception e) {
reIndexFuture.onFailure(e);

Check warning on line 165 in src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/ReIndexStep.java#L164-L165

Added lines #L164 - L165 were not covered by tests
}

return reIndexFuture;
}

@Override
public String getName() {
return NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS;
import static org.opensearch.flowframework.common.CommonValue.CREDENTIAL_FIELD;
import static org.opensearch.flowframework.common.CommonValue.DESCRIPTION_FIELD;
import static org.opensearch.flowframework.common.CommonValue.DESTINATION_INDEX;
import static org.opensearch.flowframework.common.CommonValue.EMBEDDING_DIMENSION;
import static org.opensearch.flowframework.common.CommonValue.FRAMEWORK_TYPE;
import static org.opensearch.flowframework.common.CommonValue.FUNCTION_NAME;
Expand All @@ -47,6 +48,8 @@
import static org.opensearch.flowframework.common.CommonValue.PIPELINE_ID;
import static org.opensearch.flowframework.common.CommonValue.PROTOCOL_FIELD;
import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS;
import static org.opensearch.flowframework.common.CommonValue.RE_INDEX_FIELD;
import static org.opensearch.flowframework.common.CommonValue.SOURCE_INDEX;
import static org.opensearch.flowframework.common.CommonValue.SUCCESS;
import static org.opensearch.flowframework.common.CommonValue.TOOLS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.TYPE;
Expand Down Expand Up @@ -84,6 +87,7 @@ public WorkflowStepFactory(
) {
stepMap.put(NoOpStep.NAME, NoOpStep::new);
stepMap.put(CreateIndexStep.NAME, () -> new CreateIndexStep(client, flowFrameworkIndicesHandler));
stepMap.put(ReIndexStep.NAME, () -> new ReIndexStep(client, flowFrameworkIndicesHandler));
stepMap.put(
RegisterLocalCustomModelStep.NAME,
() -> new RegisterLocalCustomModelStep(threadPool, mlClient, flowFrameworkIndicesHandler, flowFrameworkSettings)
Expand Down Expand Up @@ -125,6 +129,9 @@ public enum WorkflowSteps {
/** Create Index Step */
CREATE_INDEX(CreateIndexStep.NAME, List.of(INDEX_NAME, CONFIGURATIONS), List.of(INDEX_NAME), Collections.emptyList(), null),

/** Create ReIndex Step */
RE_INDEX(ReIndexStep.NAME, List.of(SOURCE_INDEX, DESTINATION_INDEX), List.of(RE_INDEX_FIELD), Collections.emptyList(), null),

/** Create Connector Step */
CREATE_CONNECTOR(
CreateConnectorStep.NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void testParseWorkflowValidator() throws IOException {

WorkflowValidator validator = new WorkflowValidator(workflowStepValidators);

assertEquals(17, validator.getWorkflowStepValidators().size());
assertEquals(18, validator.getWorkflowStepValidators().size());

assertTrue(validator.getWorkflowStepValidators().keySet().contains("create_connector"));
assertEquals(7, validator.getWorkflowStepValidators().get("create_connector").getInputs().size());
Expand Down
Loading

0 comments on commit e2b6f88

Please sign in to comment.