Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Get Workflow API to retrieve a stored template by workflow id #263

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.rest.RestCreateWorkflowAction;
import org.opensearch.flowframework.rest.RestGetTemplateAction;
import org.opensearch.flowframework.rest.RestGetWorkflowAction;
import org.opensearch.flowframework.rest.RestProvisionWorkflowAction;
import org.opensearch.flowframework.rest.RestSearchWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowTransportAction;
import org.opensearch.flowframework.transport.GetTemplateAction;
import org.opensearch.flowframework.transport.GetTemplateTransportAction;
import org.opensearch.flowframework.transport.GetWorkflowAction;
import org.opensearch.flowframework.transport.GetWorkflowTransportAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
Expand Down Expand Up @@ -125,7 +128,8 @@ public List<RestHandler> getRestHandlers(
new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting, settings, clusterService),
new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestGetWorkflowAction(flowFrameworkFeatureEnabledSetting)
new RestGetWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestGetTemplateAction(flowFrameworkFeatureEnabledSetting)
);
}

Expand All @@ -135,7 +139,8 @@ public List<RestHandler> getRestHandlers(
new ActionHandler<>(CreateWorkflowAction.INSTANCE, CreateWorkflowTransportAction.class),
new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class),
new ActionHandler<>(SearchWorkflowAction.INSTANCE, SearchWorkflowTransportAction.class),
new ActionHandler<>(GetWorkflowAction.INSTANCE, GetWorkflowTransportAction.class)
new ActionHandler<>(GetWorkflowAction.INSTANCE, GetWorkflowTransportAction.class),
new ActionHandler<>(GetTemplateAction.INSTANCE, GetTemplateTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.rest;

import com.google.common.collect.ImmutableList;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.transport.GetTemplateAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;

import java.io.IOException;
import java.util.List;
import java.util.Locale;

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;

/**
* Rest Action to facilitate requests to get a stored template
*/
public class RestGetTemplateAction extends BaseRestHandler {

private static final String GET_TEMPLATE_ACTION = "get_template";
private static final Logger logger = LogManager.getLogger(RestGetTemplateAction.class);
private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

/**
* Instantiates a new RestGetWorkflowAction
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
*/
public RestGetTemplateAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) {
this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting;
}

@Override
public String getName() {
return GET_TEMPLATE_ACTION;
}

@Override
public List<Route> routes() {
return ImmutableList.of(new Route(RestRequest.Method.GET, String.format(Locale.ROOT, "%s/{%s}", WORKFLOW_URI, WORKFLOW_ID)));
}

@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {

try {
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
throw new FlowFrameworkException(
"This API is disabled. To enable it, update the setting [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
RestStatus.FORBIDDEN
);
}

// Validate content
if (request.hasContent()) {
throw new FlowFrameworkException("Invalid request format", RestStatus.BAD_REQUEST);
}
// Validate params
String workflowId = request.param(WORKFLOW_ID);
if (workflowId == null) {
throw new FlowFrameworkException("workflow_id cannot be null", RestStatus.BAD_REQUEST);
}

WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, null);
return channel -> client.execute(GetTemplateAction.INSTANCE, workflowRequest, ActionListener.wrap(response -> {
XContentBuilder builder = response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}, exception -> {
try {
FlowFrameworkException ex = new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception));
XContentBuilder exceptionBuilder = ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS);
channel.sendResponse(new BytesRestResponse(ex.getRestStatus(), exceptionBuilder));

} catch (IOException e) {
logger.error("Failed to send back get template exception", e);
channel.sendResponse(new BytesRestResponse(ExceptionsHelper.status(e), e.getMessage()));
}
}));

} catch (FlowFrameworkException ex) {
return channel -> channel.sendResponse(
new BytesRestResponse(ex.getRestStatus(), ex.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.opensearch.action.ActionType;

import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACTION_NAME_PREFIX;

/**
* External Action for public facing RestGetTemplateAction
*/
public class GetTemplateAction extends ActionType<GetTemplateResponse> {
/** The name of this action */
public static final String NAME = TRANSPORT_ACTION_NAME_PREFIX + "template/get";
/** An instance of this action */
public static final GetTemplateAction INSTANCE = new GetTemplateAction();

private GetTemplateAction() {
super(NAME, GetTemplateResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.model.Template;

import java.io.IOException;

/**
* Transport Response from getting a template
*/
public class GetTemplateResponse extends ActionResponse implements ToXContentObject {

/** The template */
private Template template;

/**
* Instantiates a new GetTemplateResponse from an input stream
* @param in the input stream to read from
* @throws IOException if the template json cannot be read from the input stream
*/
public GetTemplateResponse(StreamInput in) throws IOException {
super(in);
this.template = Template.parse(in.readString());
}

/**
* Instantiates a new GetTemplateResponse
* @param template the template
*/
public GetTemplateResponse(Template template) {
this.template = template;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(template.toJson());
}

@Override
public XContentBuilder toXContent(XContentBuilder xContentBuilder, Params params) throws IOException {
return this.template.toXContent(xContentBuilder, params);
}

/**
* Gets the template
* @return the template
*/
public Template getTemplate() {
return this.template;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.model.Template;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;

/**
* Transport action to retrieve a use case template within the Global Context
*/
public class GetTemplateTransportAction extends HandledTransportAction<WorkflowRequest, GetTemplateResponse> {

private final Logger logger = LogManager.getLogger(GetTemplateTransportAction.class);
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;
private final Client client;

/**
* Instantiates a new GetTempltateTransportAction instance
* @param transportService the transport service
* @param actionFilters action filters
* @param flowFrameworkIndicesHandler The Flow Framework indices handler
* @param client the Opensearch Client
*/
@Inject
public GetTemplateTransportAction(
TransportService transportService,
ActionFilters actionFilters,
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler,
Client client
) {
super(GetTemplateAction.NAME, transportService, actionFilters, WorkflowRequest::new);
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
this.client = client;
}

@Override
protected void doExecute(Task task, WorkflowRequest request, ActionListener<GetTemplateResponse> listener) {
if (flowFrameworkIndicesHandler.doesIndexExist(GLOBAL_CONTEXT_INDEX)) {

String workflowId = request.getWorkflowId();
GetRequest getRequest = new GetRequest(GLOBAL_CONTEXT_INDEX, workflowId);

// Retrieve workflow by ID
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
client.get(getRequest, ActionListener.wrap(response -> {
context.restore();

if (!response.isExists()) {
listener.onFailure(
new FlowFrameworkException(
"Failed to retrieve template (" + workflowId + ") from global context.",
RestStatus.NOT_FOUND
)
);
} else {
listener.onResponse(new GetTemplateResponse(Template.parse(response.getSourceAsString())));
}
}, exception -> {
logger.error("Failed to retrieve template from global context.", exception);
listener.onFailure(new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception)));
}));
} catch (Exception e) {
logger.error("Failed to retrieve template from global context.", e);
listener.onFailure(new FlowFrameworkException(e.getMessage(), ExceptionsHelper.status(e)));
}

} else {
listener.onFailure(new FlowFrameworkException("There are no templates in the global_context", RestStatus.NOT_FOUND));
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ public void testPlugin() throws IOException {
4,
ffp.createComponents(client, clusterService, threadPool, null, null, null, environment, null, null, null, null).size()
);
assertEquals(4, ffp.getRestHandlers(settings, null, null, null, null, null, null).size());
assertEquals(4, ffp.getActions().size());
assertEquals(5, ffp.getRestHandlers(settings, null, null, null, null, null, null).size());
assertEquals(5, ffp.getActions().size());
assertEquals(1, ffp.getExecutorBuilders(settings).size());
assertEquals(4, ffp.getSettings().size());
}
Expand Down
Loading