diff --git a/common/constants.ts b/common/constants.ts index 86bacf99..033e8a0c 100644 --- a/common/constants.ts +++ b/common/constants.ts @@ -5,7 +5,26 @@ export const PLUGIN_ID = 'flow-framework'; +/** + * BACKEND/CLUSTER APIs + */ +export const FLOW_FRAMEWORK_API_ROUTE_PREFIX = '/_plugins/_flow_framework'; +export const FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX = `${FLOW_FRAMEWORK_API_ROUTE_PREFIX}/workflow`; +export const FLOW_FRAMEWORK_SEARCH_WORKFLOWS_ROUTE = `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/_search`; + +/** + * NODE APIs + */ export const BASE_NODE_API_PATH = '/api/flow_framework'; -export const BASE_INDICES_NODE_API_PATH = `${BASE_NODE_API_PATH}/indices`; -export const SEARCH_INDICES_PATH = `${BASE_INDICES_NODE_API_PATH}/search`; -export const FETCH_INDICES_PATH = `${BASE_INDICES_NODE_API_PATH}/fetch`; + +// OpenSearch node APIs +export const BASE_OPENSEARCH_NODE_API_PATH = `${BASE_NODE_API_PATH}/opensearch`; +export const CAT_INDICES_NODE_API_PATH = `${BASE_OPENSEARCH_NODE_API_PATH}/catIndices`; + +// Flow Framework node APIs +export const BASE_WORKFLOW_NODE_API_PATH = `${BASE_NODE_API_PATH}/workflow`; +export const GET_WORKFLOW_NODE_API_PATH = `${BASE_WORKFLOW_NODE_API_PATH}`; +export const SEARCH_WORKFLOWS_NODE_API_PATH = `${BASE_WORKFLOW_NODE_API_PATH}/search`; +export const GET_WORKFLOW_STATE_NODE_API_PATH = `${BASE_WORKFLOW_NODE_API_PATH}/state`; +export const CREATE_WORKFLOW_NODE_API_PATH = `${BASE_WORKFLOW_NODE_API_PATH}/create`; +export const DELETE_WORKFLOW_NODE_API_PATH = `${BASE_WORKFLOW_NODE_API_PATH}/delete`; diff --git a/common/interfaces.ts b/common/interfaces.ts index 7ecffadc..19c8011a 100644 --- a/common/interfaces.ts +++ b/common/interfaces.ts @@ -102,3 +102,7 @@ export enum WORKFLOW_STATE { IN_PROGRESS = 'In progress', NOT_STARTED = 'Not started', } + +export type WorkflowDict = { + [workflowId: string]: Workflow; +}; diff --git a/public/pages/workflow_detail/workflow_detail.tsx b/public/pages/workflow_detail/workflow_detail.tsx index 085c7192..e57bcc83 100644 --- a/public/pages/workflow_detail/workflow_detail.tsx +++ b/public/pages/workflow_detail/workflow_detail.tsx @@ -49,9 +49,7 @@ function replaceActiveTab(activeTab: string, props: WorkflowDetailProps) { export function WorkflowDetail(props: WorkflowDetailProps) { const { workflows } = useSelector((state: AppState) => state.workflows); - const workflow = workflows.find( - (wf) => wf.id === props.match?.params?.workflowId - ); + const workflow = workflows[props.match?.params?.workflowId]; const workflowName = workflow ? workflow.name : ''; const tabFromUrl = queryString.parse(useLocation().search)[ diff --git a/public/pages/workflows/workflow_list/workflow_list.tsx b/public/pages/workflows/workflow_list/workflow_list.tsx index 8c52a9bc..26a48477 100644 --- a/public/pages/workflows/workflow_list/workflow_list.tsx +++ b/public/pages/workflows/workflow_list/workflow_list.tsx @@ -39,22 +39,24 @@ export function WorkflowList(props: WorkflowListProps) { const [searchQuery, setSearchQuery] = useState(''); const debounceSearchQuery = debounce((query: string) => { setSearchQuery(query); - }, 100); + }, 200); // filters state const [selectedStates, setSelectedStates] = useState( getStateOptions() ); - const [filteredWorkflows, setFilteredWorkflows] = useState( - workflows || [] - ); + const [filteredWorkflows, setFilteredWorkflows] = useState([]); - // When a filter selection or search query changes, update the list + // When any filter changes or new workflows are found, update the list useEffect(() => { setFilteredWorkflows( - fetchFilteredWorkflows(workflows, selectedStates, searchQuery) + fetchFilteredWorkflows( + Object.values(workflows), + selectedStates, + searchQuery + ) ); - }, [selectedStates, searchQuery]); + }, [selectedStates, searchQuery, workflows]); return ( diff --git a/public/pages/workflows/workflows.tsx b/public/pages/workflows/workflows.tsx index c3a6d50a..f05573ab 100644 --- a/public/pages/workflows/workflows.tsx +++ b/public/pages/workflows/workflows.tsx @@ -14,12 +14,12 @@ import { EuiSpacer, } from '@elastic/eui'; import queryString from 'query-string'; -import { useSelector } from 'react-redux'; +import { useDispatch, useSelector } from 'react-redux'; import { BREADCRUMBS } from '../../utils'; import { getCore } from '../../services'; import { WorkflowList } from './workflow_list'; import { NewWorkflow } from './new_workflow'; -import { AppState } from '../../store'; +import { AppState, searchWorkflows } from '../../store'; export interface WorkflowsRouterProps {} @@ -47,6 +47,7 @@ function replaceActiveTab(activeTab: string, props: WorkflowsProps) { * to get started on a new workflow. */ export function Workflows(props: WorkflowsProps) { + const dispatch = useDispatch(); const { workflows } = useSelector((state: AppState) => state.workflows); const tabFromUrl = queryString.parse(useLocation().search)[ @@ -61,7 +62,7 @@ export function Workflows(props: WorkflowsProps) { !selectedTabId || !Object.values(WORKFLOWS_TAB).includes(selectedTabId) ) { - if (workflows?.length > 0) { + if (Object.keys(workflows).length > 0) { setSelectedTabId(WORKFLOWS_TAB.MANAGE); replaceActiveTab(WORKFLOWS_TAB.MANAGE, props); } else { @@ -78,6 +79,11 @@ export function Workflows(props: WorkflowsProps) { ]); }); + // On initial render: fetch all workflows + useEffect(() => { + dispatch(searchWorkflows({ query: { match_all: {} } })); + }, []); + return ( diff --git a/public/route_service.ts b/public/route_service.ts index 54b2a76e..ad9407bb 100644 --- a/public/route_service.ts +++ b/public/route_service.ts @@ -4,19 +4,47 @@ */ import { CoreStart, HttpFetchError } from '../../../src/core/public'; -import { FETCH_INDICES_PATH, SEARCH_INDICES_PATH } from '../common'; +import { + CREATE_WORKFLOW_NODE_API_PATH, + DELETE_WORKFLOW_NODE_API_PATH, + CAT_INDICES_NODE_API_PATH, + GET_WORKFLOW_NODE_API_PATH, + GET_WORKFLOW_STATE_NODE_API_PATH, + SEARCH_WORKFLOWS_NODE_API_PATH, +} from '../common'; +/** + * A simple client-side service interface containing all of the available node API functions. + * Exposed in services.ts. + * Example function call: getRouteService().getWorkflow() + * + * Used in redux by wrapping them in async thunk functions which mutate redux state when executed. + */ export interface RouteService { - searchIndex: (indexName: string, body: {}) => Promise; - fetchIndices: (pattern: string) => Promise; + getWorkflow: (workflowId: string) => Promise; + searchWorkflows: (body: {}) => Promise; + getWorkflowState: (workflowId: string) => Promise; + createWorkflow: (body: {}) => Promise; + deleteWorkflow: (workflowId: string) => Promise; + catIndices: (pattern: string) => Promise; } export function configureRoutes(core: CoreStart): RouteService { return { - searchIndex: async (indexName: string, body: {}) => { + getWorkflow: async (workflowId: string) => { + try { + const response = await core.http.get<{ respString: string }>( + `${GET_WORKFLOW_NODE_API_PATH}/${workflowId}` + ); + return response; + } catch (e: any) { + return e as HttpFetchError; + } + }, + searchWorkflows: async (body: {}) => { try { const response = await core.http.post<{ respString: string }>( - `${SEARCH_INDICES_PATH}/${indexName}`, + SEARCH_WORKFLOWS_NODE_API_PATH, { body: JSON.stringify(body), } @@ -26,10 +54,43 @@ export function configureRoutes(core: CoreStart): RouteService { return e as HttpFetchError; } }, - fetchIndices: async (pattern: string) => { + getWorkflowState: async (workflowId: string) => { + try { + const response = await core.http.get<{ respString: string }>( + `${GET_WORKFLOW_STATE_NODE_API_PATH}/${workflowId}` + ); + return response; + } catch (e: any) { + return e as HttpFetchError; + } + }, + createWorkflow: async (body: {}) => { try { const response = await core.http.post<{ respString: string }>( - `${FETCH_INDICES_PATH}/${pattern}` + CREATE_WORKFLOW_NODE_API_PATH, + { + body: JSON.stringify(body), + } + ); + return response; + } catch (e: any) { + return e as HttpFetchError; + } + }, + deleteWorkflow: async (workflowId: string) => { + try { + const response = await core.http.delete<{ respString: string }>( + `${DELETE_WORKFLOW_NODE_API_PATH}/${workflowId}` + ); + return response; + } catch (e: any) { + return e as HttpFetchError; + } + }, + catIndices: async (pattern: string) => { + try { + const response = await core.http.get<{ respString: string }>( + `${CAT_INDICES_NODE_API_PATH}/${pattern}` ); return response; } catch (e: any) { diff --git a/public/store/reducers/opensearch_reducer.ts b/public/store/reducers/opensearch_reducer.ts index 783ed400..d9a68e10 100644 --- a/public/store/reducers/opensearch_reducer.ts +++ b/public/store/reducers/opensearch_reducer.ts @@ -6,6 +6,7 @@ import { createAsyncThunk, createSlice } from '@reduxjs/toolkit'; import { getRouteService } from '../../services'; import { Index } from '../../../common'; +import { HttpFetchError } from '../../../../../src/core/public'; const initialState = { loading: false, @@ -14,15 +15,23 @@ const initialState = { }; const OPENSEARCH_PREFIX = 'opensearch'; -const FETCH_INDICES_ACTION = `${OPENSEARCH_PREFIX}/fetchIndices`; +const CAT_INDICES_ACTION = `${OPENSEARCH_PREFIX}/catIndices`; -export const fetchIndices = createAsyncThunk( - FETCH_INDICES_ACTION, - async (pattern?: string) => { +export const catIndices = createAsyncThunk( + CAT_INDICES_ACTION, + async (pattern: string, { rejectWithValue }) => { // defaulting to fetch everything except system indices (starting with '.') const patternString = pattern || '*,-.*'; - const response = getRouteService().fetchIndices(patternString); - return response; + const response: any | HttpFetchError = await getRouteService().catIndices( + patternString + ); + if (response instanceof HttpFetchError) { + return rejectWithValue( + 'Error running cat indices: ' + response.body.message + ); + } else { + return response; + } } ); @@ -32,18 +41,20 @@ const opensearchSlice = createSlice({ reducers: {}, extraReducers: (builder) => { builder - .addCase(fetchIndices.pending, (state, action) => { + .addCase(catIndices.pending, (state, action) => { state.loading = true; + state.errorMessage = ''; }) - .addCase(fetchIndices.fulfilled, (state, action) => { + .addCase(catIndices.fulfilled, (state, action) => { const indicesMap = new Map(); action.payload.forEach((index: Index) => { indicesMap.set(index.name, index); }); state.indices = Object.fromEntries(indicesMap.entries()); state.loading = false; + state.errorMessage = ''; }) - .addCase(fetchIndices.rejected, (state, action) => { + .addCase(catIndices.rejected, (state, action) => { state.errorMessage = action.payload as string; state.loading = false; }); diff --git a/public/store/reducers/workflows_reducer.ts b/public/store/reducers/workflows_reducer.ts index ca10aed7..40c80c70 100644 --- a/public/store/reducers/workflows_reducer.ts +++ b/public/store/reducers/workflows_reducer.ts @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { createSlice } from '@reduxjs/toolkit'; +import { createAsyncThunk, createSlice } from '@reduxjs/toolkit'; import { Workflow, ReactFlowComponent, @@ -13,74 +13,257 @@ import { generateId, initComponentData, WORKFLOW_STATE, + WorkflowDict, } from '../../../common'; +import { HttpFetchError } from '../../../../../src/core/public'; +import { getRouteService } from '../../services'; -// TODO: remove after fetching from server-side -const id1 = generateId('text_embedding_processor'); -const id2 = generateId('text_embedding_processor'); -const id3 = generateId('knn_index'); -const dummyNodes = [ - { - id: id1, - position: { x: 0, y: 500 }, - data: initComponentData(new TextEmbeddingProcessor().toObj(), id1), - type: 'customComponent', - }, - { - id: id2, - position: { x: 0, y: 200 }, - data: initComponentData(new TextEmbeddingProcessor().toObj(), id2), - type: 'customComponent', - }, - { - id: id3, - position: { x: 500, y: 500 }, - data: initComponentData(new KnnIndex().toObj(), id3), - type: 'customComponent', - }, -] as ReactFlowComponent[]; +// TODO: remove hardcoded initial state below after fetching from server side, +// and workflow data model interface is more defined. +// const id1 = generateId('text_embedding_processor'); +// const id2 = generateId('text_embedding_processor'); +// const id3 = generateId('knn_index'); +// const dummyNodes = [ +// { +// id: id1, +// position: { x: 0, y: 500 }, +// data: initComponentData(new TextEmbeddingProcessor().toObj(), id1), +// type: 'customComponent', +// }, +// { +// id: id2, +// position: { x: 0, y: 200 }, +// data: initComponentData(new TextEmbeddingProcessor().toObj(), id2), +// type: 'customComponent', +// }, +// { +// id: id3, +// position: { x: 500, y: 500 }, +// data: initComponentData(new KnnIndex().toObj(), id3), +// type: 'customComponent', +// }, +// ] as ReactFlowComponent[]; + +// let workflows = {} as { [workflowId: string]: Workflow }; +// workflows['workflow-1-id'] = { +// name: 'Workflow-1', +// id: 'workflow-1-id', +// description: 'description for workflow 1', +// state: WORKFLOW_STATE.SUCCEEDED, +// workspaceFlowState: { +// nodes: dummyNodes, +// edges: [] as ReactFlowEdge[], +// }, +// template: {}, +// } as Workflow; +// workflows['workflow-2-id'] = { +// name: 'Workflow-2', +// id: 'workflow-2-id', +// description: 'description for workflow 2', +// state: WORKFLOW_STATE.FAILED, +// workspaceFlowState: { +// nodes: dummyNodes, +// edges: [] as ReactFlowEdge[], +// }, +// template: {}, +// } as Workflow; + +// const initialState = { +// loading: false, +// errorMessage: '', +// workflows, +// }; const initialState = { - // TODO: fetch from server-side later - workflows: [ - { - name: 'Workflow-1', - id: 'workflow-1-id', - description: 'description for workflow 1', - state: WORKFLOW_STATE.SUCCEEDED, - workspaceFlowState: { - nodes: dummyNodes, - edges: [] as ReactFlowEdge[], - }, - template: {}, - }, - { - name: 'Workflow-2', - id: 'workflow-2-id', - description: 'description for workflow 2', - state: WORKFLOW_STATE.FAILED, - workspaceFlowState: { - nodes: dummyNodes, - edges: [] as ReactFlowEdge[], - }, - template: {}, - }, - ] as Workflow[], loading: false, + errorMessage: '', + workflows: {} as WorkflowDict, }; +const WORKFLOWS_ACTION_PREFIX = 'workflows'; +const GET_WORKFLOW_ACTION = `${WORKFLOWS_ACTION_PREFIX}/getWorkflow`; +const SEARCH_WORKFLOWS_ACTION = `${WORKFLOWS_ACTION_PREFIX}/searchWorkflows`; +const GET_WORKFLOW_STATE_ACTION = `${WORKFLOWS_ACTION_PREFIX}/getWorkflowState`; +const CREATE_WORKFLOW_ACTION = `${WORKFLOWS_ACTION_PREFIX}/createWorkflow`; +const DELETE_WORKFLOW_ACTION = `${WORKFLOWS_ACTION_PREFIX}/deleteWorkflow`; + +export const getWorkflow = createAsyncThunk( + GET_WORKFLOW_ACTION, + async (workflowId: string, { rejectWithValue }) => { + const response: any | HttpFetchError = await getRouteService().getWorkflow( + workflowId + ); + if (response instanceof HttpFetchError) { + return rejectWithValue( + 'Error getting workflow: ' + response.body.message + ); + } else { + return response; + } + } +); + +export const searchWorkflows = createAsyncThunk( + SEARCH_WORKFLOWS_ACTION, + async (body: {}, { rejectWithValue }) => { + const response: + | any + | HttpFetchError = await getRouteService().searchWorkflows(body); + if (response instanceof HttpFetchError) { + return rejectWithValue( + 'Error searching workflows: ' + response.body.message + ); + } else { + return response; + } + } +); + +export const getWorkflowState = createAsyncThunk( + GET_WORKFLOW_STATE_ACTION, + async (workflowId: string, { rejectWithValue }) => { + const response: + | any + | HttpFetchError = await getRouteService().getWorkflowState(workflowId); + if (response instanceof HttpFetchError) { + return rejectWithValue( + 'Error getting workflow state: ' + response.body.message + ); + } else { + return response; + } + } +); + +export const createWorkflow = createAsyncThunk( + CREATE_WORKFLOW_ACTION, + async (body: {}, { rejectWithValue }) => { + const response: + | any + | HttpFetchError = await getRouteService().createWorkflow(body); + if (response instanceof HttpFetchError) { + return rejectWithValue( + 'Error creating workflow: ' + response.body.message + ); + } else { + return response; + } + } +); + +export const deleteWorkflow = createAsyncThunk( + DELETE_WORKFLOW_ACTION, + async (workflowId: string, { rejectWithValue }) => { + const response: + | any + | HttpFetchError = await getRouteService().deleteWorkflow(workflowId); + if (response instanceof HttpFetchError) { + return rejectWithValue( + 'Error deleting workflow: ' + response.body.message + ); + } else { + return response; + } + } +); + const workflowsSlice = createSlice({ name: 'workflows', initialState, - reducers: { - setWorkflows(state, action) { - state.workflows = action.payload; - }, - setLoading(state, action) { - state.loading = action.payload; - }, + reducers: {}, + extraReducers: (builder) => { + builder + // Pending states: set state consistently across all actions + .addCase(getWorkflow.pending, (state, action) => { + state.loading = true; + state.errorMessage = ''; + }) + .addCase(searchWorkflows.pending, (state, action) => { + state.loading = true; + state.errorMessage = ''; + }) + .addCase(createWorkflow.pending, (state, action) => { + state.loading = true; + state.errorMessage = ''; + }) + .addCase(deleteWorkflow.pending, (state, action) => { + state.loading = true; + state.errorMessage = ''; + }) + .addCase(getWorkflowState.pending, (state, action) => { + state.loading = true; + state.errorMessage = ''; + }) + // Fulfilled states: mutate state depending on the action type + // and payloads + .addCase(getWorkflow.fulfilled, (state, action) => { + // TODO: add logic to mutate state + // const workflow = action.payload; + // state.workflows = { + // ...state.workflows, + // [workflow.id]: workflow, + // }; + state.loading = false; + state.errorMessage = ''; + }) + .addCase(searchWorkflows.fulfilled, (state, action) => { + const { workflows } = action.payload as { workflows: WorkflowDict }; + state.workflows = workflows; + state.loading = false; + state.errorMessage = ''; + }) + .addCase(getWorkflowState.fulfilled, (state, action) => { + // TODO: add logic to mutate state + // const workflow = action.payload; + // state.workflows = { + // ...state.workflows, + // [workflow.id]: workflow, + // }; + state.loading = false; + state.errorMessage = ''; + }) + .addCase(createWorkflow.fulfilled, (state, action) => { + // TODO: add logic to mutate state + // const workflow = action.payload; + // state.workflows = { + // ...state.workflows, + // [workflow.id]: workflow, + // }; + state.loading = false; + state.errorMessage = ''; + }) + .addCase(deleteWorkflow.fulfilled, (state, action) => { + // TODO: add logic to mutate state + // const workflow = action.payload; + // state.workflows = { + // ...state.workflows, + // [workflow.id]: workflow, + // }; + state.loading = false; + state.errorMessage = ''; + }) + // Rejected states: set state consistently across all actions + .addCase(getWorkflow.rejected, (state, action) => { + state.errorMessage = action.payload as string; + state.loading = false; + }) + .addCase(searchWorkflows.rejected, (state, action) => { + state.errorMessage = action.payload as string; + state.loading = false; + }) + .addCase(getWorkflowState.rejected, (state, action) => { + state.errorMessage = action.payload as string; + state.loading = false; + }) + .addCase(createWorkflow.rejected, (state, action) => { + state.errorMessage = action.payload as string; + state.loading = false; + }) + .addCase(deleteWorkflow.rejected, (state, action) => { + state.errorMessage = action.payload as string; + state.loading = false; + }); }, }); export const workflowsReducer = workflowsSlice.reducer; -export const { setWorkflows, setLoading } = workflowsSlice.actions; diff --git a/server/cluster/flow_framework_plugin.ts b/server/cluster/flow_framework_plugin.ts new file mode 100644 index 00000000..084d3bbf --- /dev/null +++ b/server/cluster/flow_framework_plugin.ts @@ -0,0 +1,78 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { + FLOW_FRAMEWORK_SEARCH_WORKFLOWS_ROUTE, + FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX, +} from '../../common'; + +/** + * Used during the plugin's setup() lifecycle phase to register various client actions + * representing Flow Framework plugin APIs. These are then exposed and used on the + * server-side when processing node APIs - see server/routes/flow_framework_routes_service + * for examples. + */ +export function flowFrameworkPlugin(Client: any, config: any, components: any) { + const ca = components.clientAction.factory; + + Client.prototype.flowFramework = components.clientAction.namespaceFactory(); + const flowFramework = Client.prototype.flowFramework.prototype; + + flowFramework.getWorkflow = ca({ + url: { + fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>`, + req: { + workflow_id: { + type: 'string', + required: true, + }, + }, + }, + method: 'GET', + }); + + flowFramework.searchWorkflows = ca({ + url: { + fmt: FLOW_FRAMEWORK_SEARCH_WORKFLOWS_ROUTE, + }, + needBody: true, + // Exposed client rejects making GET requests with a body. So, we use POST + method: 'POST', + }); + + flowFramework.getWorkflowState = ca({ + url: { + fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>/_status`, + req: { + workflow_id: { + type: 'string', + required: true, + }, + }, + }, + method: 'GET', + }); + + flowFramework.createWorkflow = ca({ + url: { + fmt: FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX, + }, + needBody: true, + method: 'POST', + }); + + flowFramework.deleteWorkflow = ca({ + url: { + fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>`, + req: { + workflow_id: { + type: 'string', + required: true, + }, + }, + }, + method: 'DELETE', + }); +} diff --git a/server/cluster/index.ts b/server/cluster/index.ts new file mode 100644 index 00000000..5ff70614 --- /dev/null +++ b/server/cluster/index.ts @@ -0,0 +1,6 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +export * from './flow_framework_plugin'; diff --git a/server/plugin.ts b/server/plugin.ts index 2778db98..2772d5aa 100644 --- a/server/plugin.ts +++ b/server/plugin.ts @@ -10,12 +10,20 @@ import { Plugin, Logger, } from '../../../src/core/server'; - +import { first } from 'rxjs/operators'; +import { flowFrameworkPlugin } from './cluster'; import { FlowFrameworkDashboardsPluginSetup, FlowFrameworkDashboardsPluginStart, } from './types'; -import { registerOpenSearchRoutes } from './routes'; +import { + registerOpenSearchRoutes, + registerFlowFrameworkRoutes, + OpenSearchRoutesService, + FlowFrameworkRoutesService, +} from './routes'; + +import { ILegacyClusterClient } from '../../../src/core/server/'; export class FlowFrameworkDashboardsPlugin implements @@ -24,17 +32,35 @@ export class FlowFrameworkDashboardsPlugin FlowFrameworkDashboardsPluginStart > { private readonly logger: Logger; + private readonly globalConfig$: any; constructor(initializerContext: PluginInitializerContext) { this.logger = initializerContext.logger.get(); + this.globalConfig$ = initializerContext.config.legacy.globalConfig$; } - public setup(core: CoreSetup) { + public async setup(core: CoreSetup) { this.logger.debug('flow-framework-dashboards: Setup'); const router = core.http.createRouter(); - // Register server side APIs - registerOpenSearchRoutes(router); + // Get global config + const globalConfig = await this.globalConfig$.pipe(first()).toPromise(); + + // Create OpenSearch client, including flow framework plugin APIs + const client: ILegacyClusterClient = core.opensearch.legacy.createClient( + 'flow_framework', + { + plugins: [flowFrameworkPlugin], + ...globalConfig.opensearch, + } + ); + + const opensearchRoutesService = new OpenSearchRoutesService(client); + const flowFrameworkRoutesService = new FlowFrameworkRoutesService(client); + + // Register server side APIs with the corresponding service functions + registerOpenSearchRoutes(router, opensearchRoutesService); + registerFlowFrameworkRoutes(router, flowFrameworkRoutesService); return {}; } diff --git a/server/routes/flow_framework_routes_service.ts b/server/routes/flow_framework_routes_service.ts new file mode 100644 index 00000000..be98978b --- /dev/null +++ b/server/routes/flow_framework_routes_service.ts @@ -0,0 +1,194 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { schema } from '@osd/config-schema'; +import { + IRouter, + IOpenSearchDashboardsResponse, + RequestHandlerContext, + OpenSearchDashboardsRequest, + OpenSearchDashboardsResponseFactory, +} from '../../../../src/core/server'; +import { + CREATE_WORKFLOW_NODE_API_PATH, + DELETE_WORKFLOW_NODE_API_PATH, + GET_WORKFLOW_NODE_API_PATH, + GET_WORKFLOW_STATE_NODE_API_PATH, + SEARCH_WORKFLOWS_NODE_API_PATH, + WorkflowDict, +} from '../../common'; +import { generateCustomError, toWorkflowObj } from './helpers'; + +/** + * Server-side routes to process flow-framework-related node API calls and execute the + * corresponding API calls against the OpenSearch cluster. + */ +export function registerFlowFrameworkRoutes( + router: IRouter, + flowFrameworkRoutesService: FlowFrameworkRoutesService +): void { + router.get( + { + path: `${GET_WORKFLOW_NODE_API_PATH}/{workflow_id}`, + validate: { + params: schema.object({ + workflow_id: schema.string(), + }), + }, + }, + flowFrameworkRoutesService.getWorkflow + ); + + router.post( + { + path: SEARCH_WORKFLOWS_NODE_API_PATH, + validate: { + body: schema.any(), + }, + }, + flowFrameworkRoutesService.searchWorkflows + ); + + router.get( + { + path: `${GET_WORKFLOW_STATE_NODE_API_PATH}/{workflow_id}`, + validate: { + params: schema.object({ + workflow_id: schema.string(), + }), + }, + }, + flowFrameworkRoutesService.getWorkflowState + ); + + router.post( + { + path: CREATE_WORKFLOW_NODE_API_PATH, + validate: { + body: schema.any(), + }, + }, + flowFrameworkRoutesService.createWorkflow + ); + + router.delete( + { + path: `${DELETE_WORKFLOW_NODE_API_PATH}/{workflow_id}`, + validate: { + params: schema.object({ + workflow_id: schema.string(), + }), + }, + }, + flowFrameworkRoutesService.deleteWorkflow + ); +} + +export class FlowFrameworkRoutesService { + private client: any; + + constructor(client: any) { + this.client = client; + } + + // TODO: test e2e + getWorkflow = async ( + context: RequestHandlerContext, + req: OpenSearchDashboardsRequest, + res: OpenSearchDashboardsResponseFactory + ): Promise> => { + const { workflow_id } = req.params as { workflow_id: string }; + try { + const response = await this.client + .asScoped(req) + .callAsCurrentUser('flowFramework.getWorkflow', { workflow_id }); + console.log('response from get workflow: ', response); + // TODO: format response + return res.ok({ body: response }); + } catch (err: any) { + return generateCustomError(res, err); + } + }; + + searchWorkflows = async ( + context: RequestHandlerContext, + req: OpenSearchDashboardsRequest, + res: OpenSearchDashboardsResponseFactory + ): Promise> => { + const body = req.body; + try { + const response = await this.client + .asScoped(req) + .callAsCurrentUser('flowFramework.searchWorkflows', { body }); + const workflowHits = response.hits.hits as any[]; + const workflowDict = {} as WorkflowDict; + workflowHits.forEach((workflowHit: any) => { + workflowDict[workflowHit._id] = toWorkflowObj(workflowHit); + }); + + return res.ok({ body: { workflows: workflowDict } }); + } catch (err: any) { + return generateCustomError(res, err); + } + }; + + // TODO: test e2e + getWorkflowState = async ( + context: RequestHandlerContext, + req: OpenSearchDashboardsRequest, + res: OpenSearchDashboardsResponseFactory + ): Promise> => { + const { workflow_id } = req.params as { workflow_id: string }; + try { + const response = await this.client + .asScoped(req) + .callAsCurrentUser('flowFramework.getWorkflowState', { workflow_id }); + console.log('response from get workflow state: ', response); + // TODO: format response + return res.ok({ body: response }); + } catch (err: any) { + return generateCustomError(res, err); + } + }; + + // TODO: test e2e + createWorkflow = async ( + context: RequestHandlerContext, + req: OpenSearchDashboardsRequest, + res: OpenSearchDashboardsResponseFactory + ): Promise> => { + const body = req.body; + + try { + const response = await this.client + .asScoped(req) + .callAsCurrentUser('flowFramework.createWorkflow', { body }); + console.log('response from create workflow: ', response); + // TODO: format response + return res.ok({ body: response }); + } catch (err: any) { + return generateCustomError(res, err); + } + }; + + // TODO: test e2e + deleteWorkflow = async ( + context: RequestHandlerContext, + req: OpenSearchDashboardsRequest, + res: OpenSearchDashboardsResponseFactory + ): Promise> => { + const { workflow_id } = req.params as { workflow_id: string }; + try { + const response = await this.client + .asScoped(req) + .callAsCurrentUser('flowFramework.deleteWorkflow', { workflow_id }); + console.log('response from delete workflow: ', response); + // TODO: format response + return res.ok({ body: response }); + } catch (err: any) { + return generateCustomError(res, err); + } + }; +} diff --git a/server/routes/helpers.ts b/server/routes/helpers.ts index 4ffa1ea1..e67d90f4 100644 --- a/server/routes/helpers.ts +++ b/server/routes/helpers.ts @@ -3,6 +3,8 @@ * SPDX-License-Identifier: Apache-2.0 */ +import { WORKFLOW_STATE, Workflow } from '../../common'; + // OSD does not provide an interface for this response, but this is following the suggested // implementations. To prevent typescript complaining, leaving as loosely-typed 'any' export function generateCustomError(res: any, err: any) { @@ -16,3 +18,19 @@ export function generateCustomError(res: any, err: any) { }, }); } + +export function toWorkflowObj(workflowHit: any): Workflow { + // TODO: update schema parsing after hit schema has been updated. + // https://github.com/opensearch-project/flow-framework/issues/546 + const hitSource = workflowHit.fields.filter[0]; + // const hitSource = workflowHit._source; + return { + id: workflowHit._id, + name: hitSource.name, + description: hitSource.description || '', + // TODO: update below values after frontend Workflow interface is finalized + template: {}, + lastUpdated: 1234, + state: WORKFLOW_STATE.SUCCEEDED, + } as Workflow; +} diff --git a/server/routes/index.ts b/server/routes/index.ts index 00b49143..8b55b8cb 100644 --- a/server/routes/index.ts +++ b/server/routes/index.ts @@ -3,4 +3,5 @@ * SPDX-License-Identifier: Apache-2.0 */ -export * from './opensearch_routes'; +export * from './opensearch_routes_service'; +export * from './flow_framework_routes_service'; diff --git a/server/routes/opensearch_routes.ts b/server/routes/opensearch_routes.ts deleted file mode 100644 index 5454c4b7..00000000 --- a/server/routes/opensearch_routes.ts +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -import { schema } from '@osd/config-schema'; -import { SearchRequest } from '@opensearch-project/opensearch/api/types'; -import { - IRouter, - IOpenSearchDashboardsResponse, -} from '../../../../src/core/server'; -import { SEARCH_INDICES_PATH, FETCH_INDICES_PATH, Index } from '../../common'; -import { generateCustomError } from './helpers'; - -export function registerOpenSearchRoutes(router: IRouter): void { - router.post( - { - path: `${SEARCH_INDICES_PATH}/{index_name}`, - validate: { - params: schema.object({ - index_name: schema.string(), - }), - body: schema.any(), - }, - }, - async (context, req, res): Promise> => { - const client = context.core.opensearch.client.asCurrentUser; - // eslint-disable-next-line @typescript-eslint/naming-convention - const { index_name } = req.params; - const body = req.body; - - const params = { - index: index_name, - body, - } as SearchRequest; - - try { - const response = await client.search(params); - return res.ok({ body: response }); - } catch (err: any) { - return generateCustomError(res, err); - } - } - ); - router.post( - { - path: `${FETCH_INDICES_PATH}/{pattern}`, - validate: { - params: schema.object({ - pattern: schema.string(), - }), - }, - }, - async (context, req, res): Promise> => { - const client = context.core.opensearch.client.asCurrentUser; - const { pattern } = req.params; - try { - const response = await client.cat.indices({ - index: pattern, - format: 'json', - h: 'health,index', - }); - - // re-formatting the index results to match Index - const cleanedIndices = response.body.map((index) => ({ - name: index.index, - health: index.health, - })) as Index[]; - - return res.ok({ body: cleanedIndices }); - } catch (err: any) { - return generateCustomError(res, err); - } - } - ); -} diff --git a/server/routes/opensearch_routes_service.ts b/server/routes/opensearch_routes_service.ts new file mode 100644 index 00000000..d162e3ce --- /dev/null +++ b/server/routes/opensearch_routes_service.ts @@ -0,0 +1,72 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { schema } from '@osd/config-schema'; +import { SearchRequest } from '@opensearch-project/opensearch/api/types'; +import { + IRouter, + IOpenSearchDashboardsResponse, + RequestHandlerContext, + OpenSearchDashboardsRequest, + OpenSearchDashboardsResponseFactory, +} from '../../../../src/core/server'; +import { CAT_INDICES_NODE_API_PATH, Index } from '../../common'; +import { generateCustomError } from './helpers'; + +/** + * Server-side routes to process OpenSearch-related node API calls and execute the + * corresponding API calls against the OpenSearch cluster. + */ +export function registerOpenSearchRoutes( + router: IRouter, + opensearchRoutesService: OpenSearchRoutesService +): void { + router.get( + { + path: `${CAT_INDICES_NODE_API_PATH}/{pattern}`, + validate: { + params: schema.object({ + pattern: schema.string(), + }), + }, + }, + opensearchRoutesService.catIndices + ); +} + +export class OpenSearchRoutesService { + private client: any; + + constructor(client: any) { + this.client = client; + } + + catIndices = async ( + context: RequestHandlerContext, + req: OpenSearchDashboardsRequest, + res: OpenSearchDashboardsResponseFactory + ): Promise> => { + const { pattern } = req.params as { pattern: string }; + try { + const response = await this.client + .asScoped(req) + .callAsCurrentUser('cat.indices', { + index: pattern, + format: 'json', + h: 'health,index', + }); + + // re-formatting the index results to match Index + const cleanedIndices = response.map((index: any) => ({ + name: index.index, + health: index.health, + })) as Index[]; + + return res.ok({ body: cleanedIndices }); + } catch (err: any) { + return generateCustomError(res, err); + } + }; +}