Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Fetch workflow state; align data models with backend #89

Merged
merged 1 commit into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export const PLUGIN_ID = 'flow-framework';
export const FLOW_FRAMEWORK_API_ROUTE_PREFIX = '/_plugins/_flow_framework';
export const FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX = `${FLOW_FRAMEWORK_API_ROUTE_PREFIX}/workflow`;
export const FLOW_FRAMEWORK_SEARCH_WORKFLOWS_ROUTE = `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/_search`;
export const FLOW_FRAMEWORK_SEARCH_WORKFLOW_STATE_ROUTE = `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/state/_search`;

/**
* NODE APIs
Expand Down
10 changes: 6 additions & 4 deletions common/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,13 @@ export type UseCaseTemplate = {
export type Workflow = {
id: string;
name: string;
useCase: string;
description?: string;
// ReactFlow state may not exist if a workflow is created via API/backend-only.
workspaceFlowState?: WorkspaceFlowState;
template: UseCaseTemplate;
lastUpdated: number;
lastLaunched: number;
state: WORKFLOW_STATE;
};

Expand All @@ -95,12 +97,12 @@ export type WorkflowLaunch = {
lastUpdated: number;
};

// TODO: finalize list of possible workflow states from backend
// Based off of https://github.com/opensearch-project/flow-framework/blob/main/src/main/java/org/opensearch/flowframework/model/State.java
export enum WORKFLOW_STATE {
SUCCEEDED = 'Succeeded',
FAILED = 'Failed',
IN_PROGRESS = 'In progress',
NOT_STARTED = 'Not started',
PROVISIONING = 'Provisioning',
FAILED = 'Failed',
COMPLETED = 'Completed',
}

export type WorkflowDict = {
Expand Down
2 changes: 1 addition & 1 deletion public/pages/workflow_detail/launches/launch_list.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export function LaunchList(props: LaunchListProps) {
const workflowLaunches = [
{
id: 'Launch_1',
state: WORKFLOW_STATE.IN_PROGRESS,
state: WORKFLOW_STATE.PROVISIONING,
lastUpdated: 12345678,
},
{
Expand Down
19 changes: 12 additions & 7 deletions public/pages/workflows/workflow_list/columns.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@ export const columns = [
),
},
{
field: 'id',
name: 'ID',
field: 'state',
name: 'Status',
sortable: true,
},
{
field: 'description',
name: 'Description',
sortable: false,
field: 'useCase',
name: 'Type',
sortable: true,
},
{
field: 'state',
name: 'Status',
field: 'lastUpdated',
name: 'Last updated',
sortable: true,
},
{
field: 'lastLaunched',
name: 'Last launched',
sortable: true,
},
];
8 changes: 4 additions & 4 deletions public/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,22 +131,22 @@ export function getStateOptions(): EuiFilterSelectItem[] {
return [
// @ts-ignore
{
name: WORKFLOW_STATE.SUCCEEDED,
name: WORKFLOW_STATE.NOT_STARTED,
checked: 'on',
} as EuiFilterSelectItem,
// @ts-ignore
{
name: WORKFLOW_STATE.NOT_STARTED,
name: WORKFLOW_STATE.PROVISIONING,
checked: 'on',
} as EuiFilterSelectItem,
// @ts-ignore
{
name: WORKFLOW_STATE.IN_PROGRESS,
name: WORKFLOW_STATE.FAILED,
checked: 'on',
} as EuiFilterSelectItem,
// @ts-ignore
{
name: WORKFLOW_STATE.FAILED,
name: WORKFLOW_STATE.COMPLETED,
checked: 'on',
} as EuiFilterSelectItem,
];
Expand Down
10 changes: 10 additions & 0 deletions server/cluster/flow_framework_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import {
FLOW_FRAMEWORK_SEARCH_WORKFLOWS_ROUTE,
FLOW_FRAMEWORK_SEARCH_WORKFLOW_STATE_ROUTE,
FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX,
} from '../../common';

Expand Down Expand Up @@ -55,6 +56,15 @@ export function flowFrameworkPlugin(Client: any, config: any, components: any) {
method: 'GET',
});

flowFramework.searchWorkflowState = ca({
url: {
fmt: FLOW_FRAMEWORK_SEARCH_WORKFLOW_STATE_ROUTE,
},
needBody: true,
// Exposed client rejects making GET requests with a body. So, we use POST
method: 'POST',
});

flowFramework.createWorkflow = ca({
url: {
fmt: FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX,
Expand Down
23 changes: 15 additions & 8 deletions server/routes/flow_framework_routes_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
GET_WORKFLOW_NODE_API_PATH,
GET_WORKFLOW_STATE_NODE_API_PATH,
SEARCH_WORKFLOWS_NODE_API_PATH,
WorkflowDict,
} from '../../common';
import { generateCustomError, toWorkflowObj } from './helpers';
import { generateCustomError, getWorkflowsFromResponses } from './helpers';

/**
* Server-side routes to process flow-framework-related node API calls and execute the
Expand Down Expand Up @@ -99,12 +98,12 @@
req: OpenSearchDashboardsRequest,
res: OpenSearchDashboardsResponseFactory
): Promise<IOpenSearchDashboardsResponse<any>> => {
const { workflow_id } = req.params as { workflow_id: string };

Check failure on line 101 in server/routes/flow_framework_routes_service.ts

View workflow job for this annotation

GitHub Actions / Run lint script

Variable name `workflow_id` must match one of the following formats: camelCase, UPPER_CASE, PascalCase

Check failure on line 101 in server/routes/flow_framework_routes_service.ts

View workflow job for this annotation

GitHub Actions / Run lint script

Variable name `workflow_id` must match one of the following formats: camelCase, UPPER_CASE, PascalCase
try {
const response = await this.client
.asScoped(req)
.callAsCurrentUser('flowFramework.getWorkflow', { workflow_id });
console.log('response from get workflow: ', response);

Check failure on line 106 in server/routes/flow_framework_routes_service.ts

View workflow job for this annotation

GitHub Actions / Run lint script

Unexpected console statement

Check failure on line 106 in server/routes/flow_framework_routes_service.ts

View workflow job for this annotation

GitHub Actions / Run lint script

Unexpected console statement
// TODO: format response
return res.ok({ body: response });
} catch (err: any) {
Expand All @@ -112,22 +111,30 @@
}
};

// TODO: can remove or simplify if we can fetch all data from a single API call. Tracking issue:
// https://github.com/opensearch-project/flow-framework/issues/171
// Current implementation is making two calls and combining results via helper fn
searchWorkflows = async (
context: RequestHandlerContext,
req: OpenSearchDashboardsRequest,
res: OpenSearchDashboardsResponseFactory
): Promise<IOpenSearchDashboardsResponse<any>> => {
const body = req.body;
try {
const response = await this.client
const workflowsResponse = await this.client
.asScoped(req)
.callAsCurrentUser('flowFramework.searchWorkflows', { body });
const workflowHits = response.hits.hits as any[];
const workflowDict = {} as WorkflowDict;
workflowHits.forEach((workflowHit: any) => {
workflowDict[workflowHit._id] = toWorkflowObj(workflowHit);
});
const workflowHits = workflowsResponse.hits.hits as any[];

const workflowStatesResponse = await this.client
.asScoped(req)
.callAsCurrentUser('flowFramework.searchWorkflowState', { body });
const workflowStateHits = workflowStatesResponse.hits.hits as any[];

const workflowDict = getWorkflowsFromResponses(
workflowHits,
workflowStateHits
);
return res.ok({ body: { workflows: workflowDict } });
} catch (err: any) {
return generateCustomError(res, err);
Expand All @@ -140,12 +147,12 @@
req: OpenSearchDashboardsRequest,
res: OpenSearchDashboardsResponseFactory
): Promise<IOpenSearchDashboardsResponse<any>> => {
const { workflow_id } = req.params as { workflow_id: string };

Check failure on line 150 in server/routes/flow_framework_routes_service.ts

View workflow job for this annotation

GitHub Actions / Run lint script

Variable name `workflow_id` must match one of the following formats: camelCase, UPPER_CASE, PascalCase

Check failure on line 150 in server/routes/flow_framework_routes_service.ts

View workflow job for this annotation

GitHub Actions / Run lint script

Variable name `workflow_id` must match one of the following formats: camelCase, UPPER_CASE, PascalCase
try {
const response = await this.client
.asScoped(req)
.callAsCurrentUser('flowFramework.getWorkflowState', { workflow_id });
console.log('response from get workflow state: ', response);

Check failure on line 155 in server/routes/flow_framework_routes_service.ts

View workflow job for this annotation

GitHub Actions / Run lint script

Unexpected console statement

Check failure on line 155 in server/routes/flow_framework_routes_service.ts

View workflow job for this annotation

GitHub Actions / Run lint script

Unexpected console statement
// TODO: format response
return res.ok({ body: response });
} catch (err: any) {
Expand All @@ -165,7 +172,7 @@
const response = await this.client
.asScoped(req)
.callAsCurrentUser('flowFramework.createWorkflow', { body });
console.log('response from create workflow: ', response);

Check failure on line 175 in server/routes/flow_framework_routes_service.ts

View workflow job for this annotation

GitHub Actions / Run lint script

Unexpected console statement

Check failure on line 175 in server/routes/flow_framework_routes_service.ts

View workflow job for this annotation

GitHub Actions / Run lint script

Unexpected console statement
// TODO: format response
return res.ok({ body: response });
} catch (err: any) {
Expand All @@ -179,12 +186,12 @@
req: OpenSearchDashboardsRequest,
res: OpenSearchDashboardsResponseFactory
): Promise<IOpenSearchDashboardsResponse<any>> => {
const { workflow_id } = req.params as { workflow_id: string };

Check failure on line 189 in server/routes/flow_framework_routes_service.ts

View workflow job for this annotation

GitHub Actions / Run lint script

Variable name `workflow_id` must match one of the following formats: camelCase, UPPER_CASE, PascalCase

Check failure on line 189 in server/routes/flow_framework_routes_service.ts

View workflow job for this annotation

GitHub Actions / Run lint script

Variable name `workflow_id` must match one of the following formats: camelCase, UPPER_CASE, PascalCase
try {
const response = await this.client
.asScoped(req)
.callAsCurrentUser('flowFramework.deleteWorkflow', { workflow_id });
console.log('response from delete workflow: ', response);

Check failure on line 194 in server/routes/flow_framework_routes_service.ts

View workflow job for this annotation

GitHub Actions / Run lint script

Unexpected console statement

Check failure on line 194 in server/routes/flow_framework_routes_service.ts

View workflow job for this annotation

GitHub Actions / Run lint script

Unexpected console statement
// TODO: format response
return res.ok({ body: response });
} catch (err: any) {
Expand Down
37 changes: 33 additions & 4 deletions server/routes/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

import { WORKFLOW_STATE, Workflow } from '../../common';
import { WORKFLOW_STATE, Workflow, WorkflowDict } from '../../common';

// OSD does not provide an interface for this response, but this is following the suggested
// implementations. To prevent typescript complaining, leaving as loosely-typed 'any'
Expand All @@ -19,18 +19,47 @@
});
}

export function toWorkflowObj(workflowHit: any): Workflow {
function toWorkflowObj(workflowHit: any): Workflow {
// TODO: update schema parsing after hit schema has been updated.
// https://github.com/opensearch-project/flow-framework/issues/546
const hitSource = workflowHit.fields.filter[0];
// const hitSource = workflowHit._source;
return {
id: workflowHit._id,
name: hitSource.name,
useCase: hitSource.use_case,
description: hitSource.description || '',
// TODO: update below values after frontend Workflow interface is finalized
template: {},
// TODO: this needs to be persisted by backend. Tracking issue:
// https://github.com/opensearch-project/flow-framework/issues/548
lastUpdated: 1234,
state: WORKFLOW_STATE.SUCCEEDED,
} as Workflow;
}

// TODO: can remove or simplify if we can fetch all data from a single API call. Tracking issue:
// https://github.com/opensearch-project/flow-framework/issues/171
// Current implementation combines 2 search responses to create a single set of workflows with
// static information + state information
export function getWorkflowsFromResponses(
workflowHits: any[],
workflowStateHits: any[]
): WorkflowDict {
const workflowDict = {} as WorkflowDict;
workflowHits.forEach((workflowHit: any) => {
workflowDict[workflowHit._id] = toWorkflowObj(workflowHit);
const workflowStateHit = workflowStateHits.find(
(workflowStateHit) => workflowStateHit._id === workflowHit._id

Check failure on line 51 in server/routes/helpers.ts

View workflow job for this annotation

GitHub Actions / Run lint script

'workflowStateHit' is already declared in the upper scope

Check failure on line 51 in server/routes/helpers.ts

View workflow job for this annotation

GitHub Actions / Run lint script

'workflowStateHit' is already declared in the upper scope
);
const workflowState = workflowStateHit._source
.state as typeof WORKFLOW_STATE;
workflowDict[workflowHit._id] = {
...workflowDict[workflowHit._id],
// @ts-ignore
state: WORKFLOW_STATE[workflowState],
// TODO: this needs to be persisted by backend. Tracking issue:
// https://github.com/opensearch-project/flow-framework/issues/548
lastLaunched: 1234,
};
});
return workflowDict;
}
Loading