From 95feb0e179d3277e7f5eb98d386255a0ec56c80b Mon Sep 17 00:00:00 2001 From: Owais Kazi Date: Thu, 30 May 2024 14:20:15 -0700 Subject: [PATCH] Added workflow step for ReIndex Step (#718) * Initial commit for reindex workflow step with extra params Signed-off-by: owaiskazi19 * Addressed PR comments Signed-off-by: owaiskazi19 * Changed request per second to Float Signed-off-by: owaiskazi19 * Addressed string array for source indices and removed state index entry Signed-off-by: owaiskazi19 * Minor comments Signed-off-by: owaiskazi19 --------- Signed-off-by: owaiskazi19 Signed-off-by: martinpkr --- CHANGELOG.md | 2 + .../flowframework/common/CommonValue.java | 5 +- .../common/WorkflowResources.java | 3 + .../flowframework/workflow/ReindexStep.java | 176 +++++++++++++++ .../workflow/WorkflowStepFactory.java | 6 + .../model/WorkflowValidatorTests.java | 2 +- .../workflow/ReindexStepTests.java | 201 ++++++++++++++++++ 7 files changed, 393 insertions(+), 2 deletions(-) create mode 100644 src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java create mode 100644 src/test/java/org/opensearch/flowframework/workflow/ReindexStepTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 465053a1e..9bc658575 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.14...2.x) ### Features ### Enhancements +- Add Workflow Step for Reindex from source index to destination ([#718](https://github.com/opensearch-project/flow-framework/pull/718)) + ### Bug Fixes - Add user mapping to Workflow State index ([#705](https://github.com/opensearch-project/flow-framework/pull/705)) diff --git a/src/main/java/org/opensearch/flowframework/common/CommonValue.java b/src/main/java/org/opensearch/flowframework/common/CommonValue.java index ac0291687..2a835b852 100644 --- a/src/main/java/org/opensearch/flowframework/common/CommonValue.java +++ b/src/main/java/org/opensearch/flowframework/common/CommonValue.java @@ -174,7 +174,10 @@ private CommonValue() {} public static final String DELAY_FIELD = "delay"; /** Model Interface Field */ public static final String INTERFACE_FIELD = "interface"; - + /** The source index field for reindex */ + public static final String SOURCE_INDEX = "source_index"; + /** The destination index field for reindex */ + public static final String DESTINATION_INDEX = "destination_index"; /* * Constants associated with resource provisioning / state */ diff --git a/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java b/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java index a024ec3b8..e349e57e0 100644 --- a/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java +++ b/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java @@ -27,6 +27,7 @@ import org.opensearch.flowframework.workflow.RegisterLocalSparseEncodingModelStep; import org.opensearch.flowframework.workflow.RegisterModelGroupStep; import org.opensearch.flowframework.workflow.RegisterRemoteModelStep; +import org.opensearch.flowframework.workflow.ReindexStep; import org.opensearch.flowframework.workflow.UndeployModelStep; import java.util.Set; @@ -58,6 +59,8 @@ public enum WorkflowResources { CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null), // TODO delete step /** Workflow steps for creating an index and associated created resource */ CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME), + /** Workflow steps for reindex a source index to destination index and associated created resource */ + REINDEX(ReindexStep.NAME, WorkflowResources.INDEX_NAME, NoOpStep.NAME), /** Workflow steps for registering/deleting an agent and the associated created resource */ REGISTER_AGENT(RegisterAgentStep.NAME, WorkflowResources.AGENT_ID, DeleteAgentStep.NAME); diff --git a/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java b/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java new file mode 100644 index 000000000..bc335db97 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/ReindexStep.java @@ -0,0 +1,176 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.client.Client; +import org.opensearch.common.Booleans; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.Strings; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.exception.WorkflowStepException; +import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; +import org.opensearch.flowframework.util.ParseUtils; +import org.opensearch.index.reindex.BulkByScrollResponse; +import org.opensearch.index.reindex.ReindexAction; +import org.opensearch.index.reindex.ReindexRequest; + +import java.util.Map; +import java.util.Set; + +import static org.opensearch.flowframework.common.CommonValue.DESTINATION_INDEX; +import static org.opensearch.flowframework.common.CommonValue.SOURCE_INDEX; + +/** + * Step to reindex + */ +public class ReindexStep implements WorkflowStep { + + private static final Logger logger = LogManager.getLogger(ReindexStep.class); + private final Client client; + private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; + + /** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */ + public static final String NAME = "reindex"; + /** The refresh field for reindex */ + private static final String REFRESH = "refresh"; + /** The requests_per_second field for reindex */ + private static final String REQUESTS_PER_SECOND = "requests_per_second"; + /** The require_alias field for reindex */ + private static final String REQUIRE_ALIAS = "require_alias"; + /** The slices field for reindex */ + private static final String SLICES = "slices"; + /** The max_docs field for reindex */ + private static final String MAX_DOCS = "max_docs"; + + /** + * Instantiate this class + * + * @param client Client to create an index + * @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices + */ + public ReindexStep(Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) { + this.client = client; + this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler; + } + + @Override + public PlainActionFuture execute( + String currentNodeId, + WorkflowData currentNodeInputs, + Map outputs, + Map previousNodeInputs, + Map params + ) { + + PlainActionFuture reIndexFuture = PlainActionFuture.newFuture(); + + Set requiredKeys = Set.of(SOURCE_INDEX, DESTINATION_INDEX); + + Set optionalKeys = Set.of(REFRESH, REQUESTS_PER_SECOND, REQUIRE_ALIAS, SLICES, MAX_DOCS); + + try { + Map inputs = ParseUtils.getInputsFromPreviousSteps( + requiredKeys, + optionalKeys, + currentNodeInputs, + outputs, + previousNodeInputs, + params + ); + + String sourceIndices = (String) inputs.get(SOURCE_INDEX); + String destinationIndex = (String) inputs.get(DESTINATION_INDEX); + Boolean refresh = inputs.containsKey(REFRESH) ? Booleans.parseBoolean(inputs.get(REFRESH).toString()) : null; + Float requestsPerSecond = inputs.containsKey(REQUESTS_PER_SECOND) + ? Float.parseFloat(inputs.get(REQUESTS_PER_SECOND).toString()) + : null; + Boolean requireAlias = inputs.containsKey(REQUIRE_ALIAS) ? Booleans.parseBoolean(inputs.get(REQUIRE_ALIAS).toString()) : null; + Integer slices = (Integer) inputs.get(SLICES); + Integer maxDocs = (Integer) inputs.get(MAX_DOCS); + + ReindexRequest reindexRequest = new ReindexRequest().setSourceIndices(Strings.splitStringByCommaToArray(sourceIndices)) + .setDestIndex(destinationIndex); + + if (refresh != null) { + reindexRequest.setRefresh(refresh); + } + if (requestsPerSecond != null) { + reindexRequest.setRequestsPerSecond(requestsPerSecond); + } + if (requireAlias != null) { + reindexRequest.setRequireAlias(requireAlias); + } + if (maxDocs != null) { + reindexRequest.setMaxDocs(maxDocs); + } + if (slices != null) { + reindexRequest.setSlices(slices); + } + + ActionListener actionListener = new ActionListener<>() { + + @Override + public void onResponse(BulkByScrollResponse bulkByScrollResponse) { + logger.info("Reindex from source: {} to destination {}", sourceIndices, destinationIndex); + try { + if (bulkByScrollResponse.getBulkFailures().isEmpty() && bulkByScrollResponse.getSearchFailures().isEmpty()) { + reIndexFuture.onResponse( + new WorkflowData( + Map.of( + NAME, + Map.ofEntries( + Map.entry(DESTINATION_INDEX, destinationIndex), + Map.entry(SOURCE_INDEX, sourceIndices) + ) + ), + currentNodeInputs.getWorkflowId(), + currentNodeInputs.getNodeId() + ) + ); + } else { + String errorMessage = "Failed to get bulk response " + bulkByScrollResponse.getBulkFailures(); + reIndexFuture.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST)); + } + } catch (Exception e) { + String errorMessage = "Failed to parse and update new created resource"; + logger.error(errorMessage, e); + reIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + } + } + + @Override + public void onFailure(Exception e) { + String errorMessage = "Failed to reindex from source " + sourceIndices + " to " + destinationIndex; + logger.error(errorMessage, e); + reIndexFuture.onFailure(new WorkflowStepException(errorMessage, ExceptionsHelper.status(e))); + } + }; + + client.execute(ReindexAction.INSTANCE, reindexRequest, actionListener); + + } catch (IllegalArgumentException iae) { + String error = "Failed to reindex " + iae.getMessage(); + reIndexFuture.onFailure(new WorkflowStepException(error, RestStatus.BAD_REQUEST)); + } catch (Exception e) { + reIndexFuture.onFailure(e); + } + + return reIndexFuture; + } + + @Override + public String getName() { + return NAME; + } +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java index 224cbf1eb..7ab5c1061 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java @@ -34,6 +34,7 @@ import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS; import static org.opensearch.flowframework.common.CommonValue.CREDENTIAL_FIELD; import static org.opensearch.flowframework.common.CommonValue.DESCRIPTION_FIELD; +import static org.opensearch.flowframework.common.CommonValue.DESTINATION_INDEX; import static org.opensearch.flowframework.common.CommonValue.EMBEDDING_DIMENSION; import static org.opensearch.flowframework.common.CommonValue.FRAMEWORK_TYPE; import static org.opensearch.flowframework.common.CommonValue.FUNCTION_NAME; @@ -47,6 +48,7 @@ import static org.opensearch.flowframework.common.CommonValue.PIPELINE_ID; import static org.opensearch.flowframework.common.CommonValue.PROTOCOL_FIELD; import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS; +import static org.opensearch.flowframework.common.CommonValue.SOURCE_INDEX; import static org.opensearch.flowframework.common.CommonValue.SUCCESS; import static org.opensearch.flowframework.common.CommonValue.TOOLS_FIELD; import static org.opensearch.flowframework.common.CommonValue.TYPE; @@ -84,6 +86,7 @@ public WorkflowStepFactory( ) { stepMap.put(NoOpStep.NAME, NoOpStep::new); stepMap.put(CreateIndexStep.NAME, () -> new CreateIndexStep(client, flowFrameworkIndicesHandler)); + stepMap.put(ReindexStep.NAME, () -> new ReindexStep(client, flowFrameworkIndicesHandler)); stepMap.put( RegisterLocalCustomModelStep.NAME, () -> new RegisterLocalCustomModelStep(threadPool, mlClient, flowFrameworkIndicesHandler, flowFrameworkSettings) @@ -125,6 +128,9 @@ public enum WorkflowSteps { /** Create Index Step */ CREATE_INDEX(CreateIndexStep.NAME, List.of(INDEX_NAME, CONFIGURATIONS), List.of(INDEX_NAME), Collections.emptyList(), null), + /** Create ReIndex Step */ + REINDEX(ReindexStep.NAME, List.of(SOURCE_INDEX, DESTINATION_INDEX), List.of(ReindexStep.NAME), Collections.emptyList(), null), + /** Create Connector Step */ CREATE_CONNECTOR( CreateConnectorStep.NAME, diff --git a/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java b/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java index 80a9788c2..19cb3d718 100644 --- a/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java +++ b/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java @@ -46,7 +46,7 @@ public void testParseWorkflowValidator() throws IOException { WorkflowValidator validator = new WorkflowValidator(workflowStepValidators); - assertEquals(17, validator.getWorkflowStepValidators().size()); + assertEquals(18, validator.getWorkflowStepValidators().size()); assertTrue(validator.getWorkflowStepValidators().keySet().contains("create_connector")); assertEquals(7, validator.getWorkflowStepValidators().get("create_connector").getInputs().size()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/ReindexStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/ReindexStepTests.java new file mode 100644 index 000000000..97eff365a --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/workflow/ReindexStepTests.java @@ -0,0 +1,201 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.apache.lucene.tests.util.LuceneTestCase; +import org.opensearch.OpenSearchException; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.action.update.UpdateResponse; +import org.opensearch.client.Client; +import org.opensearch.common.Randomness; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; +import org.opensearch.index.reindex.BulkByScrollResponse; +import org.opensearch.index.reindex.BulkByScrollTask; +import org.opensearch.index.reindex.ReindexRequest; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import org.mockito.ArgumentCaptor; +import org.mockito.MockitoAnnotations; + +import static java.lang.Math.abs; +import static java.util.stream.Collectors.toList; +import static org.opensearch.action.DocWriteResponse.Result.UPDATED; +import static org.opensearch.common.unit.TimeValue.timeValueMillis; +import static org.opensearch.flowframework.common.CommonValue.DESTINATION_INDEX; +import static org.opensearch.flowframework.common.CommonValue.SOURCE_INDEX; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX; +import static org.opensearch.flowframework.workflow.ReindexStep.NAME; +import static org.apache.lucene.tests.util.TestUtil.randomSimpleString; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class ReindexStepTests extends OpenSearchTestCase { + private WorkflowData inputData = WorkflowData.EMPTY; + private Client client; + private ReindexStep reIndexStep; + /** The refresh field for reindex */ + private static final String REFRESH = "refresh"; + /** The requests_per_second field for reindex */ + private static final String REQUESTS_PER_SECOND = "requests_per_second"; + /** The require_alias field for reindex */ + private static final String REQUIRE_ALIAS = "require_alias"; + /** The slices field for reindex */ + private static final String SLICES = "slices"; + /** The max_docs field for reindex */ + private static final String MAX_DOCS = "max_docs"; + + private FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; + + @Override + public void setUp() throws Exception { + super.setUp(); + this.flowFrameworkIndicesHandler = mock(FlowFrameworkIndicesHandler.class); + MockitoAnnotations.openMocks(this); + + inputData = new WorkflowData( + Map.ofEntries( + Map.entry(SOURCE_INDEX, "demo"), + Map.entry(DESTINATION_INDEX, "dest"), + Map.entry(REFRESH, true), + Map.entry(REQUESTS_PER_SECOND, 2.0), + Map.entry(REQUIRE_ALIAS, false), + Map.entry(SLICES, 1), + Map.entry(MAX_DOCS, 2) + ), + "test-id", + "test-node-id" + ); + + client = mock(Client.class); + reIndexStep = new ReindexStep(client, flowFrameworkIndicesHandler); + } + + public void testReindexStep() throws ExecutionException, InterruptedException, IOException { + + @SuppressWarnings({ "unchecked" }) + ArgumentCaptor> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + + doAnswer(invocation -> { + ActionListener updateResponseListener = invocation.getArgument(4); + updateResponseListener.onResponse(new UpdateResponse(new ShardId(WORKFLOW_STATE_INDEX, "", 1), "id", -2, 0, 0, UPDATED)); + return null; + }).when(flowFrameworkIndicesHandler).updateResourceInStateIndex(anyString(), anyString(), anyString(), anyString(), any()); + + PlainActionFuture future = reIndexStep.execute( + inputData.getNodeId(), + inputData, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + + verify(client, times(1)).execute(any(), any(ReindexRequest.class), actionListenerCaptor.capture()); + actionListenerCaptor.getValue() + .onResponse( + new BulkByScrollResponse( + timeValueMillis(randomNonNegativeLong()), + randomStatus(), + Collections.emptyList(), + Collections.emptyList(), + randomBoolean() + ) + ); + + assertTrue(future.isDone()); + Map outputData = Map.of(NAME, Map.ofEntries(Map.entry(DESTINATION_INDEX, "dest"), Map.entry(SOURCE_INDEX, "demo"))); + assertEquals(outputData, future.get().getContent()); + + } + + public void testReindexStepFailure() throws ExecutionException, InterruptedException { + @SuppressWarnings({ "unchecked" }) + ArgumentCaptor> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + PlainActionFuture future = reIndexStep.execute( + inputData.getNodeId(), + inputData, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + assertFalse(future.isDone()); + verify(client, times(1)).execute(any(), any(ReindexRequest.class), actionListenerCaptor.capture()); + + actionListenerCaptor.getValue().onFailure(new Exception("Failed to reindex from source demo to dest")); + + assertTrue(future.isDone()); + ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); + assertTrue(ex.getCause() instanceof Exception); + assertEquals("Failed to reindex from source demo to dest", ex.getCause().getMessage()); + } + + private static BulkByScrollTask.Status randomStatus() { + if (randomBoolean()) { + return randomWorkingStatus(null); + } + boolean canHaveNullStatues = randomBoolean(); + List statuses = IntStream.range(0, between(0, 10)).mapToObj(i -> { + if (canHaveNullStatues && LuceneTestCase.rarely()) { + return null; + } + if (randomBoolean()) { + return new BulkByScrollTask.StatusOrException(new OpenSearchException(randomAlphaOfLength(5))); + } + return new BulkByScrollTask.StatusOrException(randomWorkingStatus(i)); + }).collect(toList()); + return new BulkByScrollTask.Status(statuses, randomBoolean() ? "test" : null); + } + + private static BulkByScrollTask.Status randomWorkingStatus(Integer sliceId) { + // These all should be believably small because we sum them if we have multiple workers + int total = between(0, 10000000); + int updated = between(0, total); + int created = between(0, total - updated); + int deleted = between(0, total - updated - created); + int noops = total - updated - created - deleted; + int batches = between(0, 10000); + long versionConflicts = between(0, total); + long bulkRetries = between(0, 10000000); + long searchRetries = between(0, 100000); + // smallest unit of time during toXContent is Milliseconds + TimeUnit[] timeUnits = { TimeUnit.MILLISECONDS, TimeUnit.SECONDS, TimeUnit.MINUTES, TimeUnit.HOURS, TimeUnit.DAYS }; + TimeValue throttled = new TimeValue(randomIntBetween(0, 1000), randomFrom(timeUnits)); + TimeValue throttledUntil = new TimeValue(randomIntBetween(0, 1000), randomFrom(timeUnits)); + return new BulkByScrollTask.Status( + sliceId, + total, + updated, + created, + deleted, + batches, + versionConflicts, + noops, + bulkRetries, + searchRetries, + throttled, + abs(Randomness.get().nextFloat()), + randomBoolean() ? null : randomSimpleString(Randomness.get()), + throttledUntil + ); + } +}