Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding create search pipeline step #569

Merged
merged 2 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.12...2.x)
### Features
- Adding create ingest pipeline step ([#558](https://github.com/opensearch-project/flow-framework/pull/558))
- Adding create search pipeline step ([#569](https://github.com/opensearch-project/flow-framework/pull/569))

### Enhancements
- Substitute REST path or body parameters in Workflow Steps ([#525](https://github.com/opensearch-project/flow-framework/pull/525))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.flowframework.workflow.CreateConnectorStep;
import org.opensearch.flowframework.workflow.CreateIndexStep;
import org.opensearch.flowframework.workflow.CreateIngestPipelineStep;
import org.opensearch.flowframework.workflow.CreateSearchPipelineStep;
import org.opensearch.flowframework.workflow.DeleteAgentStep;
import org.opensearch.flowframework.workflow.DeleteConnectorStep;
import org.opensearch.flowframework.workflow.DeleteModelStep;
Expand Down Expand Up @@ -53,6 +54,8 @@ public enum WorkflowResources {
DEPLOY_MODEL(DeployModelStep.NAME, WorkflowResources.MODEL_ID, UndeployModelStep.NAME),
/** Workflow steps for creating an ingest-pipeline and associated created resource */
CREATE_INGEST_PIPELINE(CreateIngestPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null), // TODO delete step
/** Workflow steps for creating a search pipeline and associated created resource */
CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null), // TODO delete step
/** Workflow steps for creating an index and associated created resource */
CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, null), // TODO delete step
/** Workflow steps for registering/deleting an agent and the associated created resource */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ingest.PutPipelineRequest;
import org.opensearch.action.search.PutSearchPipelineRequest;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;

import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.common.WorkflowResources.PIPELINE_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;

/**
* Step to create either a search or ingest pipeline
*/
public abstract class AbstractCreatePipelineStep implements WorkflowStep {
private static final Logger logger = LogManager.getLogger(AbstractCreatePipelineStep.class);

// Client to store a pipeline in the cluster state
private final ClusterAdminClient clusterAdminClient;

private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;

/**
* Instantiates a new AbstractCreatePipelineStep
* @param client The client to create a pipeline and store workflow data into the global context index
* @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices
*/
protected AbstractCreatePipelineStep(Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) {
this.clusterAdminClient = client.admin().cluster();
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
}

@Override
public PlainActionFuture<WorkflowData> execute(
String currentNodeId,
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
) {

PlainActionFuture<WorkflowData> createPipelineFuture = PlainActionFuture.newFuture();

Set<String> requiredKeys = Set.of(PIPELINE_ID, CONFIGURATIONS);

// currently, we are supporting an optional param of model ID into the various processors
Set<String> optionalKeys = Set.of(MODEL_ID);

try {
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
requiredKeys,
optionalKeys,
currentNodeInputs,
outputs,
previousNodeInputs,
params
);

String pipelineId = (String) inputs.get(PIPELINE_ID);
String configurations = (String) inputs.get(CONFIGURATIONS);

byte[] byteArr = configurations.getBytes(StandardCharsets.UTF_8);
BytesReference configurationsBytes = new BytesArray(byteArr);

String pipelineToBeCreated = this.getName();
ActionListener<AcknowledgedResponse> putPipelineActionListener = new ActionListener<>() {

@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
String resourceName = getResourceByWorkflowStep(getName());
try {
flowFrameworkIndicesHandler.updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
currentNodeId,
getName(),
pipelineId,
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
// PutPipelineRequest returns only an AcknowledgeResponse, saving pipelineId instead
// TODO: revisit this concept of pipeline_id to be consistent with what makes most sense to end user here
createPipelineFuture.onResponse(
new WorkflowData(
Map.of(resourceName, pipelineId),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);
}, exception -> {
String errorMessage = "Failed to update new created "

Check warning on line 114 in src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java#L114

Added line #L114 was not covered by tests
+ currentNodeId
+ " resource "
+ getName()

Check warning on line 117 in src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java#L117

Added line #L117 was not covered by tests
+ " id "
+ pipelineId;
logger.error(errorMessage, exception);
createPipelineFuture.onFailure(
new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))

Check warning on line 122 in src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java#L120-L122

Added lines #L120 - L122 were not covered by tests
);
})

Check warning on line 124 in src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java#L124

Added line #L124 was not covered by tests
);

} catch (Exception e) {
String errorMessage = "Failed to parse and update new created resource";
logger.error(errorMessage, e);
createPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));

Check warning on line 130 in src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java#L127-L130

Added lines #L127 - L130 were not covered by tests
}
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed step " + pipelineToBeCreated;
logger.error(errorMessage, e);
createPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}

};

if (pipelineToBeCreated.equals(CreateSearchPipelineStep.NAME)) {
PutSearchPipelineRequest putSearchPipelineRequest = new PutSearchPipelineRequest(
pipelineId,
configurationsBytes,
XContentType.JSON
);
clusterAdminClient.putSearchPipeline(putSearchPipelineRequest, putPipelineActionListener);
} else {
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, configurationsBytes, XContentType.JSON);
clusterAdminClient.putPipeline(putPipelineRequest, putPipelineActionListener);
}

} catch (FlowFrameworkException e) {
createPipelineFuture.onFailure(e);
}
return createPipelineFuture;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,135 +10,25 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ingest.PutPipelineRequest;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;

import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.common.WorkflowResources.PIPELINE_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;

/**
* Step to create an ingest pipeline
*/
public class CreateIngestPipelineStep implements WorkflowStep {
public class CreateIngestPipelineStep extends AbstractCreatePipelineStep {
private static final Logger logger = LogManager.getLogger(CreateIngestPipelineStep.class);

/** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */
public static final String NAME = "create_ingest_pipeline";

// Client to store a pipeline in the cluster state
private final ClusterAdminClient clusterAdminClient;

private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;

/**
* Instantiates a new CreateIngestPipelineStep
* @param client The client to create a pipeline and store workflow data into the global context index
* @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices
*/
public CreateIngestPipelineStep(Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) {
this.clusterAdminClient = client.admin().cluster();
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
}

@Override
public PlainActionFuture<WorkflowData> execute(
String currentNodeId,
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
) {

PlainActionFuture<WorkflowData> createIngestPipelineFuture = PlainActionFuture.newFuture();

Set<String> requiredKeys = Set.of(PIPELINE_ID, CONFIGURATIONS);

// currently, we are supporting an optional param of model ID into the various processors
Set<String> optionalKeys = Set.of(MODEL_ID);

try {
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
requiredKeys,
optionalKeys,
currentNodeInputs,
outputs,
previousNodeInputs,
params
);

String pipelineId = (String) inputs.get(PIPELINE_ID);
String configurations = (String) inputs.get(CONFIGURATIONS);

byte[] byteArr = configurations.getBytes(StandardCharsets.UTF_8);
BytesReference configurationsBytes = new BytesArray(byteArr);

// Create PutPipelineRequest and execute
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, configurationsBytes, XContentType.JSON);
clusterAdminClient.putPipeline(putPipelineRequest, ActionListener.wrap(acknowledgedResponse -> {
String resourceName = getResourceByWorkflowStep(getName());
try {
flowFrameworkIndicesHandler.updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
currentNodeId,
getName(),
pipelineId,
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
// PutPipelineRequest returns only an AcknowledgeResponse, saving pipelineId instead
// TODO: revisit this concept of pipeline_id to be consistent with what makes most sense to end user here
createIngestPipelineFuture.onResponse(
new WorkflowData(
Map.of(resourceName, pipelineId),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);
}, exception -> {
String errorMessage = "Failed to update new created "
+ currentNodeId
+ " resource "
+ getName()
+ " id "
+ pipelineId;
logger.error(errorMessage, exception);
createIngestPipelineFuture.onFailure(
new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))
);
})
);

} catch (Exception e) {
String errorMessage = "Failed to parse and update new created resource";
logger.error(errorMessage, e);
createIngestPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
}, e -> {
String errorMessage = "Failed to create ingest pipeline";
logger.error(errorMessage, e);
createIngestPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}));

} catch (FlowFrameworkException e) {
createIngestPipelineFuture.onFailure(e);
}

return createIngestPipelineFuture;
super(client, flowFrameworkIndicesHandler);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;

/**
* Step to create a search pipeline
*/
public class CreateSearchPipelineStep extends AbstractCreatePipelineStep {

    /** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */
    public static final String NAME = "create_search_pipeline";

    private static final Logger logger = LogManager.getLogger(CreateSearchPipelineStep.class);

    /**
     * Instantiates a new CreateSearchPipelineStep, delegating pipeline creation and
     * state-index bookkeeping to {@link AbstractCreatePipelineStep}
     * @param client The client to create a pipeline and store workflow data into the global context index
     * @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices
     */
    public CreateSearchPipelineStep(Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) {
        super(client, flowFrameworkIndicesHandler);
    }

    @Override
    public String getName() {
        return NAME;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public WorkflowStepFactory(
stepMap.put(RegisterAgentStep.NAME, () -> new RegisterAgentStep(mlClient, flowFrameworkIndicesHandler));
stepMap.put(DeleteAgentStep.NAME, () -> new DeleteAgentStep(mlClient));
stepMap.put(CreateIngestPipelineStep.NAME, () -> new CreateIngestPipelineStep(client, flowFrameworkIndicesHandler));
stepMap.put(CreateSearchPipelineStep.NAME, () -> new CreateSearchPipelineStep(client, flowFrameworkIndicesHandler));
}

/**
Expand Down Expand Up @@ -211,6 +212,15 @@ public enum WorkflowSteps {
List.of(PIPELINE_ID),
Collections.emptyList(),
null
),

/** Create Search Pipeline Step */
CREATE_SEARCH_PIPELINE(
CreateSearchPipelineStep.NAME,
List.of(PIPELINE_ID, CONFIGURATIONS),
List.of(PIPELINE_ID),
Collections.emptyList(),
null
);

private final String workflowStepName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void testParseWorkflowValidator() throws IOException {

WorkflowValidator validator = new WorkflowValidator(workflowStepValidators);

assertEquals(15, validator.getWorkflowStepValidators().size());
assertEquals(16, validator.getWorkflowStepValidators().size());

assertTrue(validator.getWorkflowStepValidators().keySet().contains("create_connector"));
assertEquals(7, validator.getWorkflowStepValidators().get("create_connector").getInputs().size());
Expand Down
Loading
Loading