Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds REST APIs for creating and provisioning a workflow #63

Merged
merged 30 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
d3be2e4
Inital implementation, set up rest/transport actions, registration, p…
joshpalis Sep 29, 2023
544ce89
Merge branch 'main' into create
joshpalis Oct 2, 2023
c32bf31
Addressing PR comments, seting params to snake case, removing redunda…
joshpalis Oct 2, 2023
0bceff6
Merge branch 'main' into create
joshpalis Oct 3, 2023
cc32079
Introducing getExecutorBuilders extension point to FlowFramworkPlugin…
joshpalis Oct 4, 2023
a51c681
updating unit tests for FlowFrameworkPluginTests, adding WorkflowRequ…
joshpalis Oct 5, 2023
524407e
Adding validate/toXContent tests for workflow request/responses
joshpalis Oct 5, 2023
6aaa8c7
Adding unit tests for create and provision rest actions
joshpalis Oct 5, 2023
527ddd6
Merge branch 'main' into create
joshpalis Oct 9, 2023
791f943
Addressing PR comments (Part 1). Moving common vlaues to CommonValue …
joshpalis Oct 9, 2023
c7c819b
Addressing PR comments (Part 2), adding globalcontexthandler to creat…
joshpalis Oct 9, 2023
8084005
Merge branch 'main' into create
joshpalis Oct 9, 2023
6c3d3db
Addressing PR comments (Part 3)
joshpalis Oct 9, 2023
288a8ae
Removing TODOs for RestAction constructors, adding basic unit tests f…
joshpalis Oct 9, 2023
e200d9a
Adding CreateWorkflowTransportAction unit tests
joshpalis Oct 9, 2023
956e823
Adding intial failure test case for the ProvisionWorkflowTransportAct…
joshpalis Oct 10, 2023
9236966
Updating base URI namespace to workflow instead of workflows
joshpalis Oct 10, 2023
0ac7873
Addressing PR comment, updating invalid template config test, removin…
joshpalis Oct 10, 2023
b945bdd
Add success test case for ProvisionWorkflowTransportAction
joshpalis Oct 10, 2023
c29a639
Merge branch 'main' into create
joshpalis Oct 10, 2023
8f1fef7
Merge branch 'main' into create
joshpalis Oct 11, 2023
1dac1ee
Updating global context index mapping for template version and compat…
joshpalis Oct 11, 2023
afeb2b6
Fixing bugs, changed GC index mapping so that template/compatibility …
joshpalis Oct 12, 2023
bbe8eff
Updating GlobalContextHandler.updateTemplate() to use toDocumentSourc…
joshpalis Oct 12, 2023
6725c72
Merge branch 'main' into create
joshpalis Oct 12, 2023
b910ebc
Replacing exceptions with FlowFrameworException
joshpalis Oct 12, 2023
decb31f
Resolving javadoc warnings
joshpalis Oct 12, 2023
09dd471
Cleaning up TODOs
joshpalis Oct 12, 2023
4d96e50
Addressing PR comments
joshpalis Oct 12, 2023
8200441
Addressing PR comments, moving some common template parsing methods t…
joshpalis Oct 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,54 @@
package org.opensearch.flowframework;

import com.google.common.collect.ImmutableList;
import org.opensearch.action.ActionRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.flowframework.rest.RestCreateWorkflowAction;
import org.opensearch.flowframework.rest.RestProvisionWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowTransportAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

/**
* An OpenSearch plugin that enables builders to innovate AI apps on OpenSearch.
*/
public class FlowFrameworkPlugin extends Plugin {
public class FlowFrameworkPlugin extends Plugin implements ActionPlugin {

/**
* The base URI for this plugin's rest actions
*/
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
public static final String AI_FLOW_FRAMEWORK_BASE_URI = "/_plugins/_ai_flow";
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
/**
* The URI for this plugin's workflow rest actions
*/
public static final String WORKFLOWS_URI = AI_FLOW_FRAMEWORK_BASE_URI + "/workflows";
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved

@Override
public Collection<Object> createComponents(
Expand All @@ -51,4 +77,26 @@ public Collection<Object> createComponents(

return ImmutableList.of(workflowStepFactory, workflowProcessSorter);
}

@Override
public List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return ImmutableList.of(new RestCreateWorkflowAction(), new RestProvisionWorkflowAction());
}

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return ImmutableList.of(
new ActionHandler<>(CreateWorkflowAction.INSTANCE, CreateWorkflowTransportAction.class),
new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class)
);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.rest;

import com.google.common.collect.ImmutableList;
import org.opensearch.client.node.NodeClient;
import org.opensearch.flowframework.FlowFrameworkPlugin;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;

import java.io.IOException;
import java.util.List;
import java.util.Locale;

/**
* Rest Action to facilitate requests to create and update a use case template
*/
public class RestCreateWorkflowAction extends BaseRestHandler {

private static final String CREATE_WORKFLOW_ACTION = "create_workflow_action";

// TODO : move to common values class, pending implementation
/**
* Field name for workflow Id, the document Id of the indexed use case template
*/
public static final String WORKFLOW_ID = "workflowId";
joshpalis marked this conversation as resolved.
Show resolved Hide resolved

/**
* Intantiates a new RestCreateWorkflowAction
*/
public RestCreateWorkflowAction() {
// TODO : Pass settings and cluster service to constructor and add settings update consumer for request timeout value
}

@Override
public String getName() {
return CREATE_WORKFLOW_ACTION;
}

@Override
public List<Route> routes() {
return ImmutableList.of(
// Create new workflow
new Route(RestRequest.Method.POST, String.format(Locale.ROOT, "%s", FlowFrameworkPlugin.WORKFLOWS_URI)),
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
// Update workflow
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
new Route(RestRequest.Method.PUT, String.format(Locale.ROOT, "%s/{%s}", FlowFrameworkPlugin.WORKFLOWS_URI, WORKFLOW_ID))
);
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {

String workflowId = request.param(WORKFLOW_ID, null);
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
Template template = Template.parse(request.content().utf8ToString());
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template);
return channel -> client.execute(CreateWorkflowAction.INSTANCE, workflowRequest, new RestToXContentListener<>(channel));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.rest;

import com.google.common.collect.ImmutableList;
import org.opensearch.client.node.NodeClient;
import org.opensearch.flowframework.FlowFrameworkPlugin;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;

import java.io.IOException;
import java.util.List;
import java.util.Locale;

/**
* Rest action to facilitate requests to provision a workflow from an inline defined or stored use case template
*/
public class RestProvisionWorkflowAction extends BaseRestHandler {

private static final String PROVISION_WORKFLOW_ACTION = "provision_workflow_action";

// TODO : move to common values class, pending implementation
/**
* Field name for workflow Id, the document Id of the indexed use case template
*/
public static final String WORKFLOW_ID = "workflowId";

/**
* Instantiates a new RestProvisionWorkflowAction
*/
public RestProvisionWorkflowAction() {
// TODO : Pass settings and cluster service to constructor and add settings update consumer for request timeout value
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public String getName() {
return PROVISION_WORKFLOW_ACTION;
}

@Override
public List<Route> routes() {
return ImmutableList.of(
// Provision workflow from inline use case template
new Route(RestRequest.Method.POST, String.format(Locale.ROOT, "%s/%s", FlowFrameworkPlugin.WORKFLOWS_URI, "_provision")),
// Provision workflow from indexed use case template
new Route(
RestRequest.Method.POST,
String.format(Locale.ROOT, "%s/{%s}/%s", FlowFrameworkPlugin.WORKFLOWS_URI, WORKFLOW_ID, "_provision")
)
);
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {

String workflowId = request.param(WORKFLOW_ID, null);
Template template = null;

if (request.hasContent()) {
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
template = Template.parse(request.content().utf8ToString());
}

// Validate workflow request inputs
if (workflowId == null && template == null) {
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
throw new IOException("WorkflowId and template cannot be both null");
}

// Create request and provision
WorkflowRequest workflowRequest = new WorkflowRequest(workflowId, template);
return channel -> client.execute(ProvisionWorkflowAction.INSTANCE, workflowRequest, new RestToXContentListener<>(channel));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.opensearch.action.ActionType;

/**
* External Action for public facing RestCreateWorkflowActiom
*/
public class CreateWorkflowAction extends ActionType<WorkflowResponse> {

// TODO : Determine external action prefix for plugin
/** The name of this action */
public static final String NAME = "workflows/create";
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
/** An instance of this action */
public static final CreateWorkflowAction INSTANCE = new CreateWorkflowAction();

private CreateWorkflowAction() {
super(NAME, WorkflowResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

/**
* Transport Action to index or update a use case template within the Global Context
*/
public class CreateWorkflowTransportAction extends HandledTransportAction<WorkflowRequest, WorkflowResponse> {

private final Logger logger = LogManager.getLogger(CreateWorkflowTransportAction.class);

private final Client client;

/**
* Intantiates a new CreateWorkflowTransportAction
* @param transportService the TransportService
* @param actionFilters action filters
* @param client the node client to interact with an index
*/
@Inject
public CreateWorkflowTransportAction(TransportService transportService, ActionFilters actionFilters, Client client) {
super(CreateWorkflowAction.NAME, transportService, actionFilters, WorkflowRequest::new);
this.client = client;
}

@Override
protected void doExecute(Task task, WorkflowRequest request, ActionListener<WorkflowResponse> listener) {

String workflowId;
// TODO : Check if global context index exists, and if it does not then create

if (request.getWorkflowId() == null) {
// TODO : Create new entry
// TODO : Insert doc

// TODO : get document ID
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
workflowId = "";
// TODO : check if state index exists, and if it does not, then create
// TODO : insert state index doc, mapped with documentId, defaulted to NOT_STARTED
} else {
// TODO : Update existing entry
workflowId = request.getWorkflowId();
// TODO : Update state index entry, default back to NOT_STARTED
}

listener.onResponse(new WorkflowResponse(workflowId));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.opensearch.action.ActionType;

/**
* External Action for public facing RestProvisionWorkflowAction
*/
public class ProvisionWorkflowAction extends ActionType<WorkflowResponse> {

// TODO : Determine external action prefix for plugin
/** The name of this action */
public static final String NAME = "workflows/provision";
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
/** An instance of this action */
public static final ProvisionWorkflowAction INSTANCE = new ProvisionWorkflowAction();

private ProvisionWorkflowAction() {
super(NAME, WorkflowResponse::new);
}
}
Loading