diff --git a/common/constants.ts b/common/constants.ts index b569665d..033e8a0c 100644 --- a/common/constants.ts +++ b/common/constants.ts @@ -5,14 +5,26 @@ export const PLUGIN_ID = 'flow-framework'; +/** + * BACKEND/CLUSTER APIs + */ +export const FLOW_FRAMEWORK_API_ROUTE_PREFIX = '/_plugins/_flow_framework'; +export const FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX = `${FLOW_FRAMEWORK_API_ROUTE_PREFIX}/workflow`; +export const FLOW_FRAMEWORK_SEARCH_WORKFLOWS_ROUTE = `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/_search`; + +/** + * NODE APIs + */ export const BASE_NODE_API_PATH = '/api/flow_framework'; -// OpenSearch APIs -export const BASE_INDICES_NODE_API_PATH = `${BASE_NODE_API_PATH}/indices`; -export const SEARCH_INDICES_PATH = `${BASE_INDICES_NODE_API_PATH}/search`; -export const FETCH_INDICES_PATH = `${BASE_INDICES_NODE_API_PATH}/fetch`; +// OpenSearch node APIs +export const BASE_OPENSEARCH_NODE_API_PATH = `${BASE_NODE_API_PATH}/opensearch`; +export const CAT_INDICES_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/catIndices`; -// Flow Framework APIs +// Flow Framework node APIs export const BASE_WORKFLOW_NODE_API_PATH = `${BASE_NODE_API_PATH}/workflow`; -export const GET_WORKFLOW_PATH = `${BASE_WORKFLOW_NODE_API_PATH}`; -export const CREATE_WORKFLOW_PATH = `${BASE_WORKFLOW_NODE_API_PATH}/create`; +export const GET_WORKFLOW_NODE_API_PATH = `${BASE_WORKFLOW_NODE_API_PATH}`; +export const SEARCH_WORKFLOWS_NODE_API_PATH = `${BASE_WORKFLOW_NODE_API_PATH}/search`; +export const GET_WORKFLOW_STATE_NODE_API_PATH = `${BASE_WORKFLOW_NODE_API_PATH}/state`; +export const CREATE_WORKFLOW_NODE_API_PATH = `${BASE_WORKFLOW_NODE_API_PATH}/create`; +export const DELETE_WORKFLOW_NODE_API_PATH = `${BASE_WORKFLOW_NODE_API_PATH}/delete`; diff --git a/common/interfaces.ts b/common/interfaces.ts index 7ecffadc..19c8011a 100644 --- a/common/interfaces.ts +++ b/common/interfaces.ts @@ -102,3 +102,7 @@ export enum WORKFLOW_STATE { IN_PROGRESS = 'In progress', NOT_STARTED = 'Not started', } + +export type WorkflowDict = { + [workflowId: string]: Workflow; +}; diff --git a/public/pages/workflows/workflow_list/workflow_list.tsx b/public/pages/workflows/workflow_list/workflow_list.tsx index bf2a726e..26a48477 100644 --- a/public/pages/workflows/workflow_list/workflow_list.tsx +++ b/public/pages/workflows/workflow_list/workflow_list.tsx @@ -34,28 +34,29 @@ const sorting = { */ export function WorkflowList(props: WorkflowListProps) { const { workflows } = useSelector((state: AppState) => state.workflows); - const workflowsAsList = Object.values(workflows); // search bar state const [searchQuery, setSearchQuery] = useState(''); const debounceSearchQuery = debounce((query: string) => { setSearchQuery(query); - }, 100); + }, 200); // filters state const [selectedStates, setSelectedStates] = useState( getStateOptions() ); - const [filteredWorkflows, setFilteredWorkflows] = useState( - workflowsAsList || [] - ); + const [filteredWorkflows, setFilteredWorkflows] = useState([]); - // When a filter selection or search query changes, update the list + // When any filter changes or new workflows are found, update the list useEffect(() => { setFilteredWorkflows( - fetchFilteredWorkflows(workflowsAsList, selectedStates, searchQuery) + fetchFilteredWorkflows( + Object.values(workflows), + selectedStates, + searchQuery + ) ); - }, [selectedStates, searchQuery]); + }, [selectedStates, searchQuery, workflows]); return ( diff --git a/public/pages/workflows/workflows.tsx b/public/pages/workflows/workflows.tsx index 84994cde..f05573ab 100644 --- a/public/pages/workflows/workflows.tsx +++ b/public/pages/workflows/workflows.tsx @@ -19,7 +19,7 @@ import { BREADCRUMBS } from '../../utils'; import { getCore } from '../../services'; import { WorkflowList } from './workflow_list'; import { NewWorkflow } from './new_workflow'; -import { AppState, getWorkflow } from '../../store'; +import { AppState, searchWorkflows } from '../../store'; export interface WorkflowsRouterProps {} @@ -79,8 +79,9 @@ export function Workflows(props: WorkflowsProps) { ]); }); + // On initial render: fetch all workflows useEffect(() => { - dispatch(getWorkflow('dummy-id')); + dispatch(searchWorkflows({ query: { match_all: {} } })); }, []); return ( diff --git a/public/route_service.ts b/public/route_service.ts index 3d188ae7..ad9407bb 100644 --- a/public/route_service.ts +++ b/public/route_service.ts @@ -5,15 +5,28 @@ import { CoreStart, HttpFetchError } from '../../../src/core/public'; import { - FETCH_INDICES_PATH, - GET_WORKFLOW_PATH, - SEARCH_INDICES_PATH, + CREATE_WORKFLOW_NODE_API_PATH, + DELETE_WORKFLOW_NODE_API_PATH, + CAT_INDICES_NODE_API_PATH, + GET_WORKFLOW_NODE_API_PATH, + GET_WORKFLOW_STATE_NODE_API_PATH, + SEARCH_WORKFLOWS_NODE_API_PATH, } from '../common'; +/** + * A simple client-side service interface containing all of the available node API functions. + * Exposed in services.ts. + * Example function call: getRouteService().getWorkflow() + * + * Used in redux by wrapping them in async thunk functions which mutate redux state when executed. + */ export interface RouteService { getWorkflow: (workflowId: string) => Promise; - searchIndex: (indexName: string, body: {}) => Promise; - fetchIndices: (pattern: string) => Promise; + searchWorkflows: (body: {}) => Promise; + getWorkflowState: (workflowId: string) => Promise; + createWorkflow: (body: {}) => Promise; + deleteWorkflow: (workflowId: string) => Promise; + catIndices: (pattern: string) => Promise; } export function configureRoutes(core: CoreStart): RouteService { @@ -21,17 +34,40 @@ export function configureRoutes(core: CoreStart): RouteService { getWorkflow: async (workflowId: string) => { try { const response = await core.http.get<{ respString: string }>( - `${GET_WORKFLOW_PATH}/${workflowId}` + `${GET_WORKFLOW_NODE_API_PATH}/${workflowId}` ); return response; } catch (e: any) { return e as HttpFetchError; } }, - searchIndex: async (indexName: string, body: {}) => { + searchWorkflows: async (body: {}) => { + try { + const response = await core.http.post<{ respString: string }>( + SEARCH_WORKFLOWS_NODE_API_PATH, + { + body: JSON.stringify(body), + } + ); + return response; + } catch (e: any) { + return e as HttpFetchError; + } + }, + getWorkflowState: async (workflowId: string) => { try { const response = await core.http.get<{ respString: string }>( - `${SEARCH_INDICES_PATH}/${indexName}`, + `${GET_WORKFLOW_STATE_NODE_API_PATH}/${workflowId}` + ); + return response; + } catch (e: any) { + return e as HttpFetchError; + } + }, + createWorkflow: async (body: {}) => { + try { + const response = await core.http.post<{ respString: string }>( + CREATE_WORKFLOW_NODE_API_PATH, { body: JSON.stringify(body), } @@ -41,10 +77,20 @@ export function configureRoutes(core: CoreStart): RouteService { return e as HttpFetchError; } }, - fetchIndices: async (pattern: string) => { + deleteWorkflow: async (workflowId: string) => { + try { + const response = await core.http.delete<{ respString: string }>( + `${DELETE_WORKFLOW_NODE_API_PATH}/${workflowId}` + ); + return response; + } catch (e: any) { + return e as HttpFetchError; + } + }, + catIndices: async (pattern: string) => { try { const response = await core.http.get<{ respString: string }>( - `${FETCH_INDICES_PATH}/${pattern}` + `${CAT_INDICES_NODE_API_PATH}/${pattern}` ); return response; } catch (e: any) { diff --git a/public/store/reducers/opensearch_reducer.ts b/public/store/reducers/opensearch_reducer.ts index 71e40817..d9a68e10 100644 --- a/public/store/reducers/opensearch_reducer.ts +++ b/public/store/reducers/opensearch_reducer.ts @@ -15,19 +15,19 @@ const initialState = { }; const OPENSEARCH_PREFIX = 'opensearch'; -const FETCH_INDICES_ACTION = `${OPENSEARCH_PREFIX}/fetchIndices`; +const CAT_INDICES_ACTION = `${OPENSEARCH_PREFIX}/catIndices`; -export const fetchIndices = createAsyncThunk( - FETCH_INDICES_ACTION, +export const catIndices = createAsyncThunk( + CAT_INDICES_ACTION, async (pattern: string, { rejectWithValue }) => { // defaulting to fetch everything except system indices (starting with '.') const patternString = pattern || '*,-.*'; - const response: any | HttpFetchError = await getRouteService().fetchIndices( + const response: any | HttpFetchError = await getRouteService().catIndices( patternString ); if (response instanceof HttpFetchError) { return rejectWithValue( - 'Error fetching indices: ' + response.body.message + 'Error running cat indices: ' + response.body.message ); } else { return response; @@ -41,11 +41,11 @@ const opensearchSlice = createSlice({ reducers: {}, extraReducers: (builder) => { builder - .addCase(fetchIndices.pending, (state, action) => { + .addCase(catIndices.pending, (state, action) => { state.loading = true; state.errorMessage = ''; }) - .addCase(fetchIndices.fulfilled, (state, action) => { + .addCase(catIndices.fulfilled, (state, action) => { const indicesMap = new Map(); action.payload.forEach((index: Index) => { indicesMap.set(index.name, index); @@ -54,7 +54,7 @@ const opensearchSlice = createSlice({ state.loading = false; state.errorMessage = ''; }) - .addCase(fetchIndices.rejected, (state, action) => { + .addCase(catIndices.rejected, (state, action) => { state.errorMessage = action.payload as string; state.loading = false; }); diff --git a/public/store/reducers/workflows_reducer.ts b/public/store/reducers/workflows_reducer.ts index d0cdedb9..40c80c70 100644 --- a/public/store/reducers/workflows_reducer.ts +++ b/public/store/reducers/workflows_reducer.ts @@ -13,74 +13,79 @@ import { generateId, initComponentData, WORKFLOW_STATE, + WorkflowDict, } from '../../../common'; import { HttpFetchError } from '../../../../../src/core/public'; import { getRouteService } from '../../services'; -// TODO: remove hardcoded initial state below after fetching from server side -const id1 = generateId('text_embedding_processor'); -const id2 = generateId('text_embedding_processor'); -const id3 = generateId('knn_index'); -const dummyNodes = [ - { - id: id1, - position: { x: 0, y: 500 }, - data: initComponentData(new TextEmbeddingProcessor().toObj(), id1), - type: 'customComponent', - }, - { - id: id2, - position: { x: 0, y: 200 }, - data: initComponentData(new TextEmbeddingProcessor().toObj(), id2), - type: 'customComponent', - }, - { - id: id3, - position: { x: 500, y: 500 }, - data: initComponentData(new KnnIndex().toObj(), id3), - type: 'customComponent', - }, -] as ReactFlowComponent[]; - -let workflows = {} as { [workflowId: string]: Workflow }; -workflows['workflow-1-id'] = { - name: 'Workflow-1', - id: 'workflow-1-id', - description: 'description for workflow 1', - state: WORKFLOW_STATE.SUCCEEDED, - workspaceFlowState: { - nodes: dummyNodes, - edges: [] as ReactFlowEdge[], - }, - template: {}, -} as Workflow; -workflows['workflow-2-id'] = { - name: 'Workflow-2', - id: 'workflow-2-id', - description: 'description for workflow 2', - state: WORKFLOW_STATE.FAILED, - workspaceFlowState: { - nodes: dummyNodes, - edges: [] as ReactFlowEdge[], - }, - template: {}, -} as Workflow; +// TODO: remove hardcoded initial state below after fetching from server side, +// and workflow data model interface is more defined. +// const id1 = generateId('text_embedding_processor'); +// const id2 = generateId('text_embedding_processor'); +// const id3 = generateId('knn_index'); +// const dummyNodes = [ +// { +// id: id1, +// position: { x: 0, y: 500 }, +// data: initComponentData(new TextEmbeddingProcessor().toObj(), id1), +// type: 'customComponent', +// }, +// { +// id: id2, +// position: { x: 0, y: 200 }, +// data: initComponentData(new TextEmbeddingProcessor().toObj(), id2), +// type: 'customComponent', +// }, +// { +// id: id3, +// position: { x: 500, y: 500 }, +// data: initComponentData(new KnnIndex().toObj(), id3), +// type: 'customComponent', +// }, +// ] as ReactFlowComponent[]; -const initialState = { - loading: false, - errorMessage: '', - workflows, -}; +// let workflows = {} as { [workflowId: string]: Workflow }; +// workflows['workflow-1-id'] = { +// name: 'Workflow-1', +// id: 'workflow-1-id', +// description: 'description for workflow 1', +// state: WORKFLOW_STATE.SUCCEEDED, +// workspaceFlowState: { +// nodes: dummyNodes, +// edges: [] as ReactFlowEdge[], +// }, +// template: {}, +// } as Workflow; +// workflows['workflow-2-id'] = { +// name: 'Workflow-2', +// id: 'workflow-2-id', +// description: 'description for workflow 2', +// state: WORKFLOW_STATE.FAILED, +// workspaceFlowState: { +// nodes: dummyNodes, +// edges: [] as ReactFlowEdge[], +// }, +// template: {}, +// } as Workflow; -// TODO: uncomment when workflow fetching is working // const initialState = { // loading: false, // errorMessage: '', -// workflows: {} as { [workflowId: string]: Workflow }, +// workflows, // }; -const WORKFLOWS_PREFIX = 'workflows'; -const GET_WORKFLOW_ACTION = `${WORKFLOWS_PREFIX}/getWorkflow`; +const initialState = { + loading: false, + errorMessage: '', + workflows: {} as WorkflowDict, +}; + +const WORKFLOWS_ACTION_PREFIX = 'workflows'; +const GET_WORKFLOW_ACTION = `${WORKFLOWS_ACTION_PREFIX}/getWorkflow`; +const SEARCH_WORKFLOWS_ACTION = `${WORKFLOWS_ACTION_PREFIX}/searchWorkflows`; +const GET_WORKFLOW_STATE_ACTION = `${WORKFLOWS_ACTION_PREFIX}/getWorkflowState`; +const CREATE_WORKFLOW_ACTION = `${WORKFLOWS_ACTION_PREFIX}/createWorkflow`; +const DELETE_WORKFLOW_ACTION = `${WORKFLOWS_ACTION_PREFIX}/deleteWorkflow`; export const getWorkflow = createAsyncThunk( GET_WORKFLOW_ACTION, @@ -98,28 +103,165 @@ export const getWorkflow = createAsyncThunk( } ); +export const searchWorkflows = createAsyncThunk( + SEARCH_WORKFLOWS_ACTION, + async (body: {}, { rejectWithValue }) => { + const response: + | any + | HttpFetchError = await getRouteService().searchWorkflows(body); + if (response instanceof HttpFetchError) { + return rejectWithValue( + 'Error searching workflows: ' + response.body.message + ); + } else { + return response; + } + } +); + +export const getWorkflowState = createAsyncThunk( + GET_WORKFLOW_STATE_ACTION, + async (workflowId: string, { rejectWithValue }) => { + const response: + | any + | HttpFetchError = await getRouteService().getWorkflowState(workflowId); + if (response instanceof HttpFetchError) { + return rejectWithValue( + 'Error getting workflow state: ' + response.body.message + ); + } else { + return response; + } + } +); + +export const createWorkflow = createAsyncThunk( + CREATE_WORKFLOW_ACTION, + async (body: {}, { rejectWithValue }) => { + const response: + | any + | HttpFetchError = await getRouteService().createWorkflow(body); + if (response instanceof HttpFetchError) { + return rejectWithValue( + 'Error creating workflow: ' + response.body.message + ); + } else { + return response; + } + } +); + +export const deleteWorkflow = createAsyncThunk( + DELETE_WORKFLOW_ACTION, + async (workflowId: string, { rejectWithValue }) => { + const response: + | any + | HttpFetchError = await getRouteService().deleteWorkflow(workflowId); + if (response instanceof HttpFetchError) { + return rejectWithValue( + 'Error deleting workflow: ' + response.body.message + ); + } else { + return response; + } + } +); + const workflowsSlice = createSlice({ name: 'workflows', initialState, reducers: {}, extraReducers: (builder) => { builder + // Pending states: set state consistently across all actions .addCase(getWorkflow.pending, (state, action) => { state.loading = true; state.errorMessage = ''; }) + .addCase(searchWorkflows.pending, (state, action) => { + state.loading = true; + state.errorMessage = ''; + }) + .addCase(createWorkflow.pending, (state, action) => { + state.loading = true; + state.errorMessage = ''; + }) + .addCase(deleteWorkflow.pending, (state, action) => { + state.loading = true; + state.errorMessage = ''; + }) + .addCase(getWorkflowState.pending, (state, action) => { + state.loading = true; + state.errorMessage = ''; + }) + // Fulfilled states: mutate state depending on the action type + // and payloads .addCase(getWorkflow.fulfilled, (state, action) => { - const workflow = action.payload; - state.workflows = { - ...state.workflows, - [workflow.id]: workflow, - }; + // TODO: add logic to mutate state + // const workflow = action.payload; + // state.workflows = { + // ...state.workflows, + // [workflow.id]: workflow, + // }; + state.loading = false; + state.errorMessage = ''; + }) + .addCase(searchWorkflows.fulfilled, (state, action) => { + const { workflows } = action.payload as { workflows: WorkflowDict }; + state.workflows = workflows; + state.loading = false; + state.errorMessage = ''; + }) + .addCase(getWorkflowState.fulfilled, (state, action) => { + // TODO: add logic to mutate state + // const workflow = action.payload; + // state.workflows = { + // ...state.workflows, + // [workflow.id]: workflow, + // }; state.loading = false; state.errorMessage = ''; }) + .addCase(createWorkflow.fulfilled, (state, action) => { + // TODO: add logic to mutate state + // const workflow = action.payload; + // state.workflows = { + // ...state.workflows, + // [workflow.id]: workflow, + // }; + state.loading = false; + state.errorMessage = ''; + }) + .addCase(deleteWorkflow.fulfilled, (state, action) => { + // TODO: add logic to mutate state + // const workflow = action.payload; + // state.workflows = { + // ...state.workflows, + // [workflow.id]: workflow, + // }; + state.loading = false; + state.errorMessage = ''; + }) + // Rejected states: set state consistently across all actions .addCase(getWorkflow.rejected, (state, action) => { state.errorMessage = action.payload as string; state.loading = false; + }) + .addCase(searchWorkflows.rejected, (state, action) => { + state.errorMessage = action.payload as string; + state.loading = false; + }) + .addCase(getWorkflowState.rejected, (state, action) => { + state.errorMessage = action.payload as string; + state.loading = false; + }) + .addCase(createWorkflow.rejected, (state, action) => { + state.errorMessage = action.payload as string; + state.loading = false; + }) + .addCase(deleteWorkflow.rejected, (state, action) => { + state.errorMessage = action.payload as string; + state.loading = false; }); }, }); diff --git a/server/cluster/flow_framework_plugin.ts b/server/cluster/flow_framework_plugin.ts index 82ba52e2..084d3bbf 100644 --- a/server/cluster/flow_framework_plugin.ts +++ b/server/cluster/flow_framework_plugin.ts @@ -3,8 +3,17 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX } from '../utils'; +import { + FLOW_FRAMEWORK_SEARCH_WORKFLOWS_ROUTE, + FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX, +} from '../../common'; +/** + * Used during the plugin's setup() lifecycle phase to register various client actions + * representing Flow Framework plugin APIs. These are then exposed and used on the + * server-side when processing node APIs - see server/routes/flow_framework_routes_service + * for examples. + */ export function flowFrameworkPlugin(Client: any, config: any, components: any) { const ca = components.clientAction.factory; @@ -23,4 +32,47 @@ export function flowFrameworkPlugin(Client: any, config: any, components: any) { }, method: 'GET', }); + + flowFramework.searchWorkflows = ca({ + url: { + fmt: FLOW_FRAMEWORK_SEARCH_WORKFLOWS_ROUTE, + }, + needBody: true, + // Exposed client rejects making GET requests with a body. So, we use POST + method: 'POST', + }); + + flowFramework.getWorkflowState = ca({ + url: { + fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>/_status`, + req: { + workflow_id: { + type: 'string', + required: true, + }, + }, + }, + method: 'GET', + }); + + flowFramework.createWorkflow = ca({ + url: { + fmt: FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX, + }, + needBody: true, + method: 'POST', + }); + + flowFramework.deleteWorkflow = ca({ + url: { + fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>`, + req: { + workflow_id: { + type: 'string', + required: true, + }, + }, + }, + method: 'DELETE', + }); } diff --git a/server/routes/flow_framework_routes_service.ts b/server/routes/flow_framework_routes_service.ts index 37fce010..ece8fc3e 100644 --- a/server/routes/flow_framework_routes_service.ts +++ b/server/routes/flow_framework_routes_service.ts @@ -11,16 +11,27 @@ import { OpenSearchDashboardsRequest, OpenSearchDashboardsResponseFactory, } from '../../../../src/core/server'; -import { GET_WORKFLOW_PATH } from '../../common'; -import { generateCustomError } from './helpers'; +import { + CREATE_WORKFLOW_NODE_API_PATH, + DELETE_WORKFLOW_NODE_API_PATH, + GET_WORKFLOW_NODE_API_PATH, + GET_WORKFLOW_STATE_NODE_API_PATH, + SEARCH_WORKFLOWS_NODE_API_PATH, + WorkflowDict, +} from '../../common'; +import { generateCustomError, toWorkflowObj } from './helpers'; +/** + * Server-side routes to process flow-framework-related node API calls and execute the + * corresponding API calls against the OpenSearch cluster. + */ export function registerFlowFrameworkRoutes( router: IRouter, flowFrameworkRoutesService: FlowFrameworkRoutesService ): void { router.get( { - path: `${GET_WORKFLOW_PATH}/{workflow_id}`, + path: `${GET_WORKFLOW_NODE_API_PATH}/{workflow_id}`, validate: { params: schema.object({ workflow_id: schema.string(), @@ -29,6 +40,50 @@ export function registerFlowFrameworkRoutes( }, flowFrameworkRoutesService.getWorkflow ); + + router.post( + { + path: SEARCH_WORKFLOWS_NODE_API_PATH, + validate: { + body: schema.any(), + }, + }, + flowFrameworkRoutesService.searchWorkflows + ); + + router.get( + { + path: `${GET_WORKFLOW_STATE_NODE_API_PATH}/{workflow_id}`, + validate: { + params: schema.object({ + workflow_id: schema.string(), + }), + }, + }, + flowFrameworkRoutesService.getWorkflowState + ); + + router.post( + { + path: CREATE_WORKFLOW_NODE_API_PATH, + validate: { + body: schema.any(), + }, + }, + flowFrameworkRoutesService.createWorkflow + ); + + router.delete( + { + path: `${DELETE_WORKFLOW_NODE_API_PATH}/{workflow_id}`, + validate: { + params: schema.object({ + workflow_id: schema.string(), + }), + }, + }, + flowFrameworkRoutesService.deleteWorkflow + ); } export class FlowFrameworkRoutesService { @@ -38,21 +93,99 @@ export class FlowFrameworkRoutesService { this.client = client; } + // TODO: test e2e getWorkflow = async ( context: RequestHandlerContext, req: OpenSearchDashboardsRequest, res: OpenSearchDashboardsResponseFactory ): Promise> => { - // eslint-disable-next-line @typescript-eslint/naming-convention - const { workflow_id } = req.params; - console.log('workflow ID: ', workflow_id); - + const { workflow_id } = req.params as { workflow_id: string }; try { - console.log('making rquest...'); const response = await this.client .asScoped(req) .callAsCurrentUser('flowFramework.getWorkflow', { workflow_id }); console.log('response from get workflow: ', response); + // TODO: format response + return res.ok({ body: response }); + } catch (err: any) { + return generateCustomError(res, err); + } + }; + + searchWorkflows = async ( + context: RequestHandlerContext, + req: OpenSearchDashboardsRequest, + res: OpenSearchDashboardsResponseFactory + ): Promise> => { + const body = req.body; + try { + const response = await this.client + .asScoped(req) + .callAsCurrentUser('flowFramework.searchWorkflows', { body }); + const workflowHits = response.hits.hits as any[]; + const workflowsMap = {} as WorkflowDict; + workflowHits.forEach((workflowHit: any) => { + workflowsMap[workflowHit._id] = toWorkflowObj(workflowHit); + }); + + return res.ok({ body: { workflows: workflowsMap } }); + } catch (err: any) { + return generateCustomError(res, err); + } + }; + + // TODO: test e2e + getWorkflowState = async ( + context: RequestHandlerContext, + req: OpenSearchDashboardsRequest, + res: OpenSearchDashboardsResponseFactory + ): Promise> => { + const { workflow_id } = req.params as { workflow_id: string }; + try { + const response = await this.client + .asScoped(req) + .callAsCurrentUser('flowFramework.getWorkflowState', { workflow_id }); + console.log('response from get workflow state: ', response); + // TODO: format response + return res.ok({ body: response }); + } catch (err: any) { + return generateCustomError(res, err); + } + }; + + // TODO: test e2e + createWorkflow = async ( + context: RequestHandlerContext, + req: OpenSearchDashboardsRequest, + res: OpenSearchDashboardsResponseFactory + ): Promise> => { + const body = req.body; + + try { + const response = await this.client + .asScoped(req) + .callAsCurrentUser('flowFramework.createWorkflow', { body }); + console.log('response from create workflow: ', response); + // TODO: format response + return res.ok({ body: response }); + } catch (err: any) { + return generateCustomError(res, err); + } + }; + + // TODO: test e2e + deleteWorkflow = async ( + context: RequestHandlerContext, + req: OpenSearchDashboardsRequest, + res: OpenSearchDashboardsResponseFactory + ): Promise> => { + const { workflow_id } = req.params as { workflow_id: string }; + try { + const response = await this.client + .asScoped(req) + .callAsCurrentUser('flowFramework.deleteWorkflow', { workflow_id }); + console.log('response from delete workflow: ', response); + // TODO: format response return res.ok({ body: response }); } catch (err: any) { return generateCustomError(res, err); diff --git a/server/routes/helpers.ts b/server/routes/helpers.ts index 4ffa1ea1..e67d90f4 100644 --- a/server/routes/helpers.ts +++ b/server/routes/helpers.ts @@ -3,6 +3,8 @@ * SPDX-License-Identifier: Apache-2.0 */ +import { WORKFLOW_STATE, Workflow } from '../../common'; + // OSD does not provide an interface for this response, but this is following the suggested // implementations. To prevent typescript complaining, leaving as loosely-typed 'any' export function generateCustomError(res: any, err: any) { @@ -16,3 +18,19 @@ export function generateCustomError(res: any, err: any) { }, }); } + +export function toWorkflowObj(workflowHit: any): Workflow { + // TODO: update schema parsing after hit schema has been updated. + // https://github.com/opensearch-project/flow-framework/issues/546 + const hitSource = workflowHit.fields.filter[0]; + // const hitSource = workflowHit._source; + return { + id: workflowHit._id, + name: hitSource.name, + description: hitSource.description || '', + // TODO: update below values after frontend Workflow interface is finalized + template: {}, + lastUpdated: 1234, + state: WORKFLOW_STATE.SUCCEEDED, + } as Workflow; +} diff --git a/server/routes/opensearch_routes_service.ts b/server/routes/opensearch_routes_service.ts index 49fcc954..d162e3ce 100644 --- a/server/routes/opensearch_routes_service.ts +++ b/server/routes/opensearch_routes_service.ts @@ -12,36 +12,27 @@ import { OpenSearchDashboardsRequest, OpenSearchDashboardsResponseFactory, } from '../../../../src/core/server'; -import { SEARCH_INDICES_PATH, FETCH_INDICES_PATH, Index } from '../../common'; +import { CAT_INDICES_NODE_API_PATH, Index } from '../../common'; import { generateCustomError } from './helpers'; +/** + * Server-side routes to process OpenSearch-related node API calls and execute the + * corresponding API calls against the OpenSearch cluster. + */ export function registerOpenSearchRoutes( router: IRouter, opensearchRoutesService: OpenSearchRoutesService ): void { router.get( { - path: `${SEARCH_INDICES_PATH}/{index_name}`, - validate: { - params: schema.object({ - index_name: schema.string(), - }), - body: schema.any(), - }, - }, - opensearchRoutesService.searchIndex - ); - - router.get( - { - path: `${FETCH_INDICES_PATH}/{pattern}`, + path: `${CAT_INDICES_NODE_API_PATH}/{pattern}`, validate: { params: schema.object({ pattern: schema.string(), }), }, }, - opensearchRoutesService.fetchIndices + opensearchRoutesService.catIndices ); } @@ -52,36 +43,12 @@ export class OpenSearchRoutesService { this.client = client; } - searchIndex = async ( - context: RequestHandlerContext, - req: OpenSearchDashboardsRequest, - res: OpenSearchDashboardsResponseFactory - ): Promise> => { - // eslint-disable-next-line @typescript-eslint/naming-convention - const { index_name } = req.params; - const body = req.body; - - const params = { - index: index_name, - body, - } as SearchRequest; - - try { - const response = await this.client - .asScoped(req) - .callAsCurrentUser('search', params); - return res.ok({ body: response }); - } catch (err: any) { - return generateCustomError(res, err); - } - }; - - fetchIndices = async ( + catIndices = async ( context: RequestHandlerContext, req: OpenSearchDashboardsRequest, res: OpenSearchDashboardsResponseFactory ): Promise> => { - const { pattern } = req.params; + const { pattern } = req.params as { pattern: string }; try { const response = await this.client .asScoped(req) diff --git a/server/utils/constants.ts b/server/utils/constants.ts deleted file mode 100644 index de153ce1..00000000 --- a/server/utils/constants.ts +++ /dev/null @@ -1,7 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -export const FLOW_FRAMEWORK_API_ROUTE_PREFIX = '/_plugins/_flow_framework'; -export const FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX = `${FLOW_FRAMEWORK_API_ROUTE_PREFIX}/workflow`; diff --git a/server/utils/index.ts b/server/utils/index.ts deleted file mode 100644 index 2e209c79..00000000 --- a/server/utils/index.ts +++ /dev/null @@ -1,6 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -export * from './constants';