forked from opensearch-project/flow-framework
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adding create search pipeline step (opensearch-project#569)
* adding create search pipeline step Signed-off-by: Amit Galitzky <[email protected]> * addresing nit changes Signed-off-by: Amit Galitzky <[email protected]> --------- Signed-off-by: Amit Galitzky <[email protected]>
- Loading branch information
1 parent
7a2d9d9
commit bc2306b
Showing
12 changed files
with
434 additions
and
128 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
161 changes: 161 additions & 0 deletions
161
src/main/java/org/opensearch/flowframework/workflow/AbstractCreatePipelineStep.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
package org.opensearch.flowframework.workflow; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.ExceptionsHelper; | ||
import org.opensearch.action.ingest.PutPipelineRequest; | ||
import org.opensearch.action.search.PutSearchPipelineRequest; | ||
import org.opensearch.action.support.PlainActionFuture; | ||
import org.opensearch.action.support.master.AcknowledgedResponse; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.client.ClusterAdminClient; | ||
import org.opensearch.common.xcontent.XContentType; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.core.common.bytes.BytesArray; | ||
import org.opensearch.core.common.bytes.BytesReference; | ||
import org.opensearch.flowframework.exception.FlowFrameworkException; | ||
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; | ||
import org.opensearch.flowframework.util.ParseUtils; | ||
|
||
import java.nio.charset.StandardCharsets; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
||
import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS; | ||
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID; | ||
import static org.opensearch.flowframework.common.WorkflowResources.PIPELINE_ID; | ||
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep; | ||
|
||
/** | ||
* Step to create either a search or ingest pipeline | ||
*/ | ||
public abstract class AbstractCreatePipelineStep implements WorkflowStep { | ||
private static final Logger logger = LogManager.getLogger(AbstractCreatePipelineStep.class); | ||
|
||
// Client to store a pipeline in the cluster state | ||
private final ClusterAdminClient clusterAdminClient; | ||
|
||
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler; | ||
|
||
/** | ||
* Instantiates a new AbstractCreatePipelineStep | ||
* @param client The client to create a pipeline and store workflow data into the global context index | ||
* @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices | ||
*/ | ||
protected AbstractCreatePipelineStep(Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) { | ||
this.clusterAdminClient = client.admin().cluster(); | ||
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler; | ||
} | ||
|
||
@Override | ||
public PlainActionFuture<WorkflowData> execute( | ||
String currentNodeId, | ||
WorkflowData currentNodeInputs, | ||
Map<String, WorkflowData> outputs, | ||
Map<String, String> previousNodeInputs, | ||
Map<String, String> params | ||
) { | ||
|
||
PlainActionFuture<WorkflowData> createPipelineFuture = PlainActionFuture.newFuture(); | ||
|
||
Set<String> requiredKeys = Set.of(PIPELINE_ID, CONFIGURATIONS); | ||
|
||
// currently, we are supporting an optional param of model ID into the various processors | ||
Set<String> optionalKeys = Set.of(MODEL_ID); | ||
|
||
try { | ||
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps( | ||
requiredKeys, | ||
optionalKeys, | ||
currentNodeInputs, | ||
outputs, | ||
previousNodeInputs, | ||
params | ||
); | ||
|
||
String pipelineId = (String) inputs.get(PIPELINE_ID); | ||
String configurations = (String) inputs.get(CONFIGURATIONS); | ||
|
||
byte[] byteArr = configurations.getBytes(StandardCharsets.UTF_8); | ||
BytesReference configurationsBytes = new BytesArray(byteArr); | ||
|
||
String pipelineToBeCreated = this.getName(); | ||
ActionListener<AcknowledgedResponse> putPipelineActionListener = new ActionListener<>() { | ||
|
||
@Override | ||
public void onResponse(AcknowledgedResponse acknowledgedResponse) { | ||
String resourceName = getResourceByWorkflowStep(getName()); | ||
try { | ||
flowFrameworkIndicesHandler.updateResourceInStateIndex( | ||
currentNodeInputs.getWorkflowId(), | ||
currentNodeId, | ||
getName(), | ||
pipelineId, | ||
ActionListener.wrap(updateResponse -> { | ||
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex()); | ||
// PutPipelineRequest returns only an AcknowledgeResponse, saving pipelineId instead | ||
// TODO: revisit this concept of pipeline_id to be consistent with what makes most sense to end user here | ||
createPipelineFuture.onResponse( | ||
new WorkflowData( | ||
Map.of(resourceName, pipelineId), | ||
currentNodeInputs.getWorkflowId(), | ||
currentNodeInputs.getNodeId() | ||
) | ||
); | ||
}, exception -> { | ||
String errorMessage = "Failed to update new created " | ||
+ currentNodeId | ||
+ " resource " | ||
+ getName() | ||
+ " id " | ||
+ pipelineId; | ||
logger.error(errorMessage, exception); | ||
createPipelineFuture.onFailure( | ||
new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)) | ||
); | ||
}) | ||
); | ||
|
||
} catch (Exception e) { | ||
String errorMessage = "Failed to parse and update new created resource"; | ||
logger.error(errorMessage, e); | ||
createPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
String errorMessage = "Failed step " + pipelineToBeCreated; | ||
logger.error(errorMessage, e); | ||
createPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); | ||
} | ||
|
||
}; | ||
|
||
if (pipelineToBeCreated.equals(CreateSearchPipelineStep.NAME)) { | ||
PutSearchPipelineRequest putSearchPipelineRequest = new PutSearchPipelineRequest( | ||
pipelineId, | ||
configurationsBytes, | ||
XContentType.JSON | ||
); | ||
clusterAdminClient.putSearchPipeline(putSearchPipelineRequest, putPipelineActionListener); | ||
} else { | ||
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, configurationsBytes, XContentType.JSON); | ||
clusterAdminClient.putPipeline(putPipelineRequest, putPipelineActionListener); | ||
} | ||
|
||
} catch (FlowFrameworkException e) { | ||
createPipelineFuture.onFailure(e); | ||
} | ||
return createPipelineFuture; | ||
|
||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
38 changes: 38 additions & 0 deletions
38
src/main/java/org/opensearch/flowframework/workflow/CreateSearchPipelineStep.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
package org.opensearch.flowframework.workflow; | ||
|
||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.client.Client; | ||
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; | ||
|
||
/** | ||
* Step to create a search pipeline | ||
*/ | ||
public class CreateSearchPipelineStep extends AbstractCreatePipelineStep { | ||
private static final Logger logger = LogManager.getLogger(CreateSearchPipelineStep.class); | ||
|
||
/** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */ | ||
public static final String NAME = "create_search_pipeline"; | ||
|
||
/** | ||
* Instantiates a new CreateSearchPipelineStep | ||
* @param client The client to create a pipeline and store workflow data into the global context index | ||
* @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices | ||
*/ | ||
public CreateSearchPipelineStep(Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) { | ||
super(client, flowFrameworkIndicesHandler); | ||
} | ||
|
||
@Override | ||
public String getName() { | ||
return NAME; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.