Skip to content

Commit

Permalink
adding create search pipeline step
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Galitzky <[email protected]>
  • Loading branch information
amitgalitz committed Mar 12, 2024
1 parent 7a2d9d9 commit 9f19dd0
Show file tree
Hide file tree
Showing 12 changed files with 436 additions and 128 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.12...2.x)
### Features
- Adding create ingest pipeline step ([#558](https://github.com/opensearch-project/flow-framework/pull/558))
- adding create search pipeline step ([#569](https://github.com/opensearch-project/flow-framework/pull/569))

### Enhancements
- Substitute REST path or body parameters in Workflow Steps ([#525](https://github.com/opensearch-project/flow-framework/pull/525))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.flowframework.workflow.CreateConnectorStep;
import org.opensearch.flowframework.workflow.CreateIndexStep;
import org.opensearch.flowframework.workflow.CreateIngestPipelineStep;
import org.opensearch.flowframework.workflow.CreateSearchPipelineStep;
import org.opensearch.flowframework.workflow.DeleteAgentStep;
import org.opensearch.flowframework.workflow.DeleteConnectorStep;
import org.opensearch.flowframework.workflow.DeleteModelStep;
Expand Down Expand Up @@ -56,7 +57,9 @@ public enum WorkflowResources {
/** Workflow steps for creating an index and associated created resource */
CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, null), // TODO delete step
/** Workflow steps for registering/deleting an agent and the associated created resource */
REGISTER_AGENT(RegisterAgentStep.NAME, WorkflowResources.AGENT_ID, DeleteAgentStep.NAME);
REGISTER_AGENT(RegisterAgentStep.NAME, WorkflowResources.AGENT_ID, DeleteAgentStep.NAME),
/** Workflow steps for creating an ingest-pipeline and associated created resource */
CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null); // TODO delete step

/** Connector Id for a remote model connector */
public static final String CONNECTOR_ID = "connector_id";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ingest.PutPipelineRequest;
import org.opensearch.action.search.PutSearchPipelineRequest;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;

import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.common.WorkflowResources.PIPELINE_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;

/**
* Step to create either a search or ingest pipeline
*/
public abstract class AbstractCreatePipelineStep implements WorkflowStep {
private static final Logger logger = LogManager.getLogger(AbstractCreatePipelineStep.class);

// Client to store a pipeline in the cluster state
private final ClusterAdminClient clusterAdminClient;

private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;

/**
* Instantiates a new AbstractCreatePipelineStep
* @param client The client to create a pipeline and store workflow data into the global context index
* @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices
*/
protected AbstractCreatePipelineStep(Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) {
this.clusterAdminClient = client.admin().cluster();
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
}

@Override
public PlainActionFuture<WorkflowData> execute(
String currentNodeId,
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
) {

PlainActionFuture<WorkflowData> createPipelineFuture = PlainActionFuture.newFuture();

Set<String> requiredKeys = Set.of(PIPELINE_ID, CONFIGURATIONS);

// currently, we are supporting an optional param of model ID into the various processors
Set<String> optionalKeys = Set.of(MODEL_ID);

try {
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
requiredKeys,
optionalKeys,
currentNodeInputs,
outputs,
previousNodeInputs,
params
);

String pipelineId = (String) inputs.get(PIPELINE_ID);
String configurations = (String) inputs.get(CONFIGURATIONS);

byte[] byteArr = configurations.getBytes(StandardCharsets.UTF_8);
BytesReference configurationsBytes = new BytesArray(byteArr);

String pipelineToBeCreated = this.getName();
ActionListener<AcknowledgedResponse> putPipelineActionListener = new ActionListener<>() {

@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
String resourceName = getResourceByWorkflowStep(getName());
try {
flowFrameworkIndicesHandler.updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
currentNodeId,
getName(),
pipelineId,
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
// PutPipelineRequest returns only an AcknowledgeResponse, saving pipelineId instead
// TODO: revisit this concept of pipeline_id to be consistent with what makes most sense to end user here
createPipelineFuture.onResponse(
new WorkflowData(
Map.of(resourceName, pipelineId),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);
}, exception -> {
String errorMessage = "Failed to update new created "
+ currentNodeId
+ " resource "
+ getName()
+ " id "
+ pipelineId;
logger.error(errorMessage, exception);
createPipelineFuture.onFailure(
new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))
);
})
);

} catch (Exception e) {
String errorMessage = "Failed to parse and update new created resource";
logger.error(errorMessage, e);
createPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed step " + pipelineToBeCreated;
logger.error(errorMessage, e);
createPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}

};

if (pipelineToBeCreated.equals(CreateSearchPipelineStep.NAME)) {
PutSearchPipelineRequest putSearchPipelineRequest = new PutSearchPipelineRequest(
pipelineId,
configurationsBytes,
XContentType.JSON
);
clusterAdminClient.putSearchPipeline(putSearchPipelineRequest, putPipelineActionListener);
} else {
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, configurationsBytes, XContentType.JSON);
clusterAdminClient.putPipeline(putPipelineRequest, putPipelineActionListener);
}

} catch (FlowFrameworkException e) {
createPipelineFuture.onFailure(e);
}
return createPipelineFuture;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,135 +10,25 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ingest.PutPipelineRequest;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;

import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.common.WorkflowResources.PIPELINE_ID;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;

/**
* Step to create an ingest pipeline
*/
public class CreateIngestPipelineStep implements WorkflowStep {
public class CreateIngestPipelineStep extends AbstractCreatePipelineStep {
private static final Logger logger = LogManager.getLogger(CreateIngestPipelineStep.class);

/** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */
public static final String NAME = "create_ingest_pipeline";

// Client to store a pipeline in the cluster state
private final ClusterAdminClient clusterAdminClient;

private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;

/**
* Instantiates a new CreateIngestPipelineStep
* @param client The client to create a pipeline and store workflow data into the global context index
* @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices
*/
public CreateIngestPipelineStep(Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) {
this.clusterAdminClient = client.admin().cluster();
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
}

@Override
public PlainActionFuture<WorkflowData> execute(
String currentNodeId,
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs,
Map<String, String> params
) {

PlainActionFuture<WorkflowData> createIngestPipelineFuture = PlainActionFuture.newFuture();

Set<String> requiredKeys = Set.of(PIPELINE_ID, CONFIGURATIONS);

// currently, we are supporting an optional param of model ID into the various processors
Set<String> optionalKeys = Set.of(MODEL_ID);

try {
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
requiredKeys,
optionalKeys,
currentNodeInputs,
outputs,
previousNodeInputs,
params
);

String pipelineId = (String) inputs.get(PIPELINE_ID);
String configurations = (String) inputs.get(CONFIGURATIONS);

byte[] byteArr = configurations.getBytes(StandardCharsets.UTF_8);
BytesReference configurationsBytes = new BytesArray(byteArr);

// Create PutPipelineRequest and execute
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, configurationsBytes, XContentType.JSON);
clusterAdminClient.putPipeline(putPipelineRequest, ActionListener.wrap(acknowledgedResponse -> {
String resourceName = getResourceByWorkflowStep(getName());
try {
flowFrameworkIndicesHandler.updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
currentNodeId,
getName(),
pipelineId,
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
// PutPipelineRequest returns only an AcknowledgeResponse, saving pipelineId instead
// TODO: revisit this concept of pipeline_id to be consistent with what makes most sense to end user here
createIngestPipelineFuture.onResponse(
new WorkflowData(
Map.of(resourceName, pipelineId),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);
}, exception -> {
String errorMessage = "Failed to update new created "
+ currentNodeId
+ " resource "
+ getName()
+ " id "
+ pipelineId;
logger.error(errorMessage, exception);
createIngestPipelineFuture.onFailure(
new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))
);
})
);

} catch (Exception e) {
String errorMessage = "Failed to parse and update new created resource";
logger.error(errorMessage, e);
createIngestPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
}, e -> {
String errorMessage = "Failed to create ingest pipeline";
logger.error(errorMessage, e);
createIngestPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}));

} catch (FlowFrameworkException e) {
createIngestPipelineFuture.onFailure(e);
}

return createIngestPipelineFuture;
super(client, flowFrameworkIndicesHandler);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;

/**
* Step to create a search pipeline
*/
public class CreateSearchPipelineStep extends AbstractCreatePipelineStep {
private static final Logger logger = LogManager.getLogger(CreateSearchPipelineStep.class);

/** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */
public static final String NAME = "create_search_pipeline";

/**
* Instantiates a new CreateSearchPipelineStep
* @param client The client to create a pipeline and store workflow data into the global context index
* @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices
*/
public CreateSearchPipelineStep(Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) {
super(client, flowFrameworkIndicesHandler);
}

@Override
public String getName() {
return NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public WorkflowStepFactory(
stepMap.put(RegisterAgentStep.NAME, () -> new RegisterAgentStep(mlClient, flowFrameworkIndicesHandler));
stepMap.put(DeleteAgentStep.NAME, () -> new DeleteAgentStep(mlClient));
stepMap.put(CreateIngestPipelineStep.NAME, () -> new CreateIngestPipelineStep(client, flowFrameworkIndicesHandler));
stepMap.put(CreateSearchPipelineStep.NAME, () -> new CreateSearchPipelineStep(client, flowFrameworkIndicesHandler));
}

/**
Expand Down Expand Up @@ -211,6 +212,15 @@ public enum WorkflowSteps {
List.of(PIPELINE_ID),
Collections.emptyList(),
null
),

/** Create Ingest Pipeline Step */
CREATE_SEARCH_PIPELINE(
CreateSearchPipelineStep.NAME,
List.of(PIPELINE_ID, CONFIGURATIONS),
List.of(PIPELINE_ID),
Collections.emptyList(),
null
);

private final String workflowStepName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void testParseWorkflowValidator() throws IOException {

WorkflowValidator validator = new WorkflowValidator(workflowStepValidators);

assertEquals(15, validator.getWorkflowStepValidators().size());
assertEquals(16, validator.getWorkflowStepValidators().size());

assertTrue(validator.getWorkflowStepValidators().keySet().contains("create_connector"));
assertEquals(7, validator.getWorkflowStepValidators().get("create_connector").getInputs().size());
Expand Down
Loading

0 comments on commit 9f19dd0

Please sign in to comment.