From 5c87863ad4d280de6c8ea88299fe0aee8dcbd5f6 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Fri, 19 Jul 2024 08:24:24 -0700 Subject: [PATCH] Integrate with updated Flow Framework APIs (#222) (#226) Signed-off-by: Tyler Ohlsen (cherry picked from commit 9216efcd0dd6de912d19e730814fa040cacf63c1) Co-authored-by: Tyler Ohlsen --- .../input_fields/map_field.tsx | 8 ++ .../input_fields/model_field.tsx | 1 + .../workflow_inputs/workflow_inputs.tsx | 92 +++++++++++++++++-- public/route_service.ts | 22 +++-- public/store/reducers/workflows_reducer.ts | 20 +++- public/utils/utils.ts | 27 +++++- server/cluster/flow_framework_plugin.ts | 23 ++++- .../routes/flow_framework_routes_service.ts | 48 ++++++++-- 8 files changed, 208 insertions(+), 33 deletions(-) diff --git a/public/pages/workflow_detail/workflow_inputs/input_fields/map_field.tsx b/public/pages/workflow_detail/workflow_inputs/input_fields/map_field.tsx index a27071e3..98b6c45c 100644 --- a/public/pages/workflow_detail/workflow_inputs/input_fields/map_field.tsx +++ b/public/pages/workflow_detail/workflow_inputs/input_fields/map_field.tsx @@ -104,6 +104,10 @@ export function MapField(props: MapFieldProps) { className="euiFieldText" value={mapping.key} onChange={(e) => { + form.setFieldTouched( + `${props.fieldPath}.${idx}.key`, + true + ); form.setFieldValue( `${props.fieldPath}.${idx}.key`, e.target.value @@ -119,6 +123,10 @@ export function MapField(props: MapFieldProps) { className="euiFieldText" value={mapping.value} onChange={(e) => { + form.setFieldTouched( + `${props.fieldPath}.${idx}.value`, + true + ); form.setFieldValue( `${props.fieldPath}.${idx}.value`, e.target.value diff --git a/public/pages/workflow_detail/workflow_inputs/input_fields/model_field.tsx b/public/pages/workflow_detail/workflow_inputs/input_fields/model_field.tsx index 802d8174..a6317355 100644 --- a/public/pages/workflow_detail/workflow_inputs/input_fields/model_field.tsx +++ b/public/pages/workflow_detail/workflow_inputs/input_fields/model_field.tsx @@ -108,6 +108,7 @@ export function ModelField(props: ModelFieldProps) { )} valueOfSelected={field.value?.id || ''} onChange={(option: string) => { + form.setFieldTouched(props.fieldPath, true); form.setFieldValue(props.fieldPath, { id: option, } as ModelFormValue); diff --git a/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx b/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx index e6dc6322..ec1798c0 100644 --- a/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx +++ b/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx @@ -3,10 +3,10 @@ * SPDX-License-Identifier: Apache-2.0 */ -import React, { useEffect, useState } from 'react'; +import React, { useCallback, useEffect, useState } from 'react'; import { useSelector } from 'react-redux'; import { useFormikContext } from 'formik'; -import { isEmpty } from 'lodash'; +import { debounce, isEmpty } from 'lodash'; import { EuiButton, EuiButtonEmpty, @@ -31,6 +31,7 @@ import { Workflow, WorkflowConfig, WorkflowFormValues, + WorkflowTemplate, } from '../../../../common'; import { IngestInputs } from './ingest_inputs'; import { SearchInputs } from './search_inputs'; @@ -53,6 +54,7 @@ import { hasProvisionedIngestResources, hasProvisionedSearchResources, generateId, + getResourcesToBeForceDeleted, } from '../../../utils'; import { BooleanField } from './input_fields'; import { ExportOptions } from './export_options'; @@ -90,9 +92,13 @@ enum INGEST_OPTION { */ export function WorkflowInputs(props: WorkflowInputsProps) { - const { submitForm, validateForm, setFieldValue, values } = useFormikContext< - WorkflowFormValues - >(); + const { + submitForm, + validateForm, + setFieldValue, + values, + touched, + } = useFormikContext(); const dispatch = useAppDispatch(); // Overall workspace state @@ -117,6 +123,59 @@ export function WorkflowInputs(props: WorkflowInputsProps) { const onIngestAndUnprovisioned = onIngest && !ingestProvisioned; const onIngestAndDisabled = onIngest && !ingestEnabled; + // Auto-save the UI metadata when users update form values. + // Only update the underlying workflow template (deprovision/provision) when + // users explicitly run ingest/search and need to have updated resources + // to test against. + // We use useCallback() with an autosave flag that is only set within the fn itself. + // This is so we can fetch the latest values (uiConfig, formik values) inside a memoized fn, + // but only when we need to. + const [autosave, setAutosave] = useState(false); + function triggerAutosave(): void { + setAutosave(!autosave); + } + const debounceAutosave = useCallback( + debounce(async () => { + triggerAutosave(); + }, 10000), + [autosave] + ); + + // Hook to execute autosave when triggered. Runs the update API with update_fields set to true, + // to update the ui_metadata without updating the underlying template for a provisioned workflow. + useEffect(() => { + (async () => { + if (!isEmpty(touched)) { + const updatedTemplate = { + name: props.workflow?.name, + ui_metadata: { + ...props.workflow?.ui_metadata, + config: formikToUiConfig(values, props.uiConfig as WorkflowConfig), + }, + } as WorkflowTemplate; + await dispatch( + updateWorkflow({ + workflowId: props.workflow?.id as string, + workflowTemplate: updatedTemplate, + updateFields: true, + }) + ) + .unwrap() + .then(async (result) => {}) + .catch((error: any) => { + console.error('Error autosaving workflow: ', error); + }); + } + })(); + }, [autosave]); + + // Hook to listen for changes to form values and trigger autosave + useEffect(() => { + if (!isEmpty(values)) { + debounceAutosave(); + } + }, [values]); + useEffect(() => { setIngestProvisioned(hasProvisionedIngestResources(props.workflow)); setSearchProvisioned(hasProvisionedSearchResources(props.workflow)); @@ -129,7 +188,12 @@ export function WorkflowInputs(props: WorkflowInputsProps) { updatedWorkflow: Workflow ): Promise { let success = false; - await dispatch(deprovisionWorkflow(updatedWorkflow.id as string)) + await dispatch( + deprovisionWorkflow({ + workflowId: updatedWorkflow.id as string, + resourceIds: getResourcesToBeForceDeleted(props.workflow), + }) + ) .unwrap() .then(async (result) => { await dispatch( @@ -249,6 +313,10 @@ export function WorkflowInputs(props: WorkflowInputsProps) { queryObj = JSON.parse(props.query); } catch (e) {} if (!isEmpty(queryObj)) { + // TODO: currently this will execute deprovision in child fns. + // In the future, we must omit deprovisioning the index, as it contains + // the data we are executing the query against. Tracking issue: + // https://github.com/opensearch-project/flow-framework/issues/717 success = await validateAndUpdateWorkflow(); if (success) { const indexName = values.ingest.index.name; @@ -312,7 +380,7 @@ export function WorkflowInputs(props: WorkflowInputsProps) { setIsModalOpen(false)}> -

{`Delete resources for workflow ${props.workflow.name}?`}

+

{`Delete resources for workflow ${props.workflow?.name}?`}

@@ -328,8 +396,14 @@ export function WorkflowInputs(props: WorkflowInputsProps) { { - // @ts-ignore - await dispatch(deprovisionWorkflow(props.workflow.id)) + await dispatch( + deprovisionWorkflow({ + workflowId: props.workflow?.id as string, + resourceIds: getResourcesToBeForceDeleted( + props.workflow + ), + }) + ) .unwrap() .then(async (result) => { setFieldValue('ingest.enabled', false); diff --git a/public/route_service.ts b/public/route_service.ts index 9e110c38..99017c8e 100644 --- a/public/route_service.ts +++ b/public/route_service.ts @@ -39,10 +39,14 @@ export interface RouteService { createWorkflow: (body: {}) => Promise; updateWorkflow: ( workflowId: string, - workflowTemplate: WorkflowTemplate + workflowTemplate: WorkflowTemplate, + updateFields: boolean ) => Promise; provisionWorkflow: (workflowId: string) => Promise; - deprovisionWorkflow: (workflowId: string) => Promise; + deprovisionWorkflow: ( + workflowId: string, + resourceIds?: string + ) => Promise; deleteWorkflow: (workflowId: string) => Promise; getWorkflowPresets: () => Promise; catIndices: (pattern: string) => Promise; @@ -110,11 +114,12 @@ export function configureRoutes(core: CoreStart): RouteService { }, updateWorkflow: async ( workflowId: string, - workflowTemplate: WorkflowTemplate + workflowTemplate: WorkflowTemplate, + updateFields: boolean ) => { try { const response = await core.http.put<{ respString: string }>( - `${UPDATE_WORKFLOW_NODE_API_PATH}/${workflowId}`, + `${UPDATE_WORKFLOW_NODE_API_PATH}/${workflowId}/${updateFields}`, { body: JSON.stringify(workflowTemplate), } @@ -134,11 +139,12 @@ export function configureRoutes(core: CoreStart): RouteService { return e as HttpFetchError; } }, - deprovisionWorkflow: async (workflowId: string) => { + deprovisionWorkflow: async (workflowId: string, resourceIds?: string) => { try { - const response = await core.http.post<{ respString: string }>( - `${DEPROVISION_WORKFLOW_NODE_API_PATH}/${workflowId}` - ); + const path = resourceIds + ? `${DEPROVISION_WORKFLOW_NODE_API_PATH}/${workflowId}/${resourceIds}` + : `${DEPROVISION_WORKFLOW_NODE_API_PATH}/${workflowId}`; + const response = await core.http.post<{ respString: string }>(path); return response; } catch (e: any) { return e as HttpFetchError; diff --git a/public/store/reducers/workflows_reducer.ts b/public/store/reducers/workflows_reducer.ts index f4ccc509..ea4c4549 100644 --- a/public/store/reducers/workflows_reducer.ts +++ b/public/store/reducers/workflows_reducer.ts @@ -91,15 +91,20 @@ export const createWorkflow = createAsyncThunk( export const updateWorkflow = createAsyncThunk( UPDATE_WORKFLOW_ACTION, async ( - workflowInfo: { workflowId: string; workflowTemplate: WorkflowTemplate }, + workflowInfo: { + workflowId: string; + workflowTemplate: WorkflowTemplate; + updateFields?: boolean; + }, { rejectWithValue } ) => { - const { workflowId, workflowTemplate } = workflowInfo; + const { workflowId, workflowTemplate, updateFields } = workflowInfo; const response: | any | HttpFetchError = await getRouteService().updateWorkflow( workflowId, - workflowTemplate + workflowTemplate, + updateFields || false ); if (response instanceof HttpFetchError) { return rejectWithValue( @@ -129,11 +134,16 @@ export const provisionWorkflow = createAsyncThunk( export const deprovisionWorkflow = createAsyncThunk( DEPROVISION_WORKFLOW_ACTION, - async (workflowId: string, { rejectWithValue }) => { + async ( + deprovisionInfo: { workflowId: string; resourceIds?: string }, + { rejectWithValue } + ) => { + const { workflowId, resourceIds } = deprovisionInfo; const response: | any | HttpFetchError = await getRouteService().deprovisionWorkflow( - workflowId + workflowId, + resourceIds ); if (response instanceof HttpFetchError) { return rejectWithValue( diff --git a/public/utils/utils.ts b/public/utils/utils.ts index 80eaac7d..2aecdd14 100644 --- a/public/utils/utils.ts +++ b/public/utils/utils.ts @@ -4,7 +4,11 @@ */ import yaml from 'js-yaml'; -import { WORKFLOW_STEP_TYPE, Workflow } from '../../common'; +import { + WORKFLOW_RESOURCE_TYPE, + WORKFLOW_STEP_TYPE, + Workflow, +} from '../../common'; // Append 16 random characters export function generateId(prefix?: string): string { @@ -47,6 +51,27 @@ export function hasProvisionedSearchResources( return result; } +// returns a comma-delimited string of all resource IDs that need to be force deleted. +// see https://github.com/opensearch-project/flow-framework/pull/763 +export function getResourcesToBeForceDeleted( + workflow: Workflow | undefined +): string | undefined { + const resources = workflow?.resourcesCreated?.filter( + (workflowResource) => + workflowResource.type === WORKFLOW_RESOURCE_TYPE.INDEX_NAME || + workflowResource.type === WORKFLOW_RESOURCE_TYPE.PIPELINE_ID + ); + + if (resources !== undefined && resources.length > 0) { + return resources + .map((resource) => resource.id) + .map(String) + .join(','); + } else { + return undefined; + } +} + export function getObjFromJsonOrYamlString( fileContents: string | undefined ): object | undefined { diff --git a/server/cluster/flow_framework_plugin.ts b/server/cluster/flow_framework_plugin.ts index 4f9b0990..2dec0fef 100644 --- a/server/cluster/flow_framework_plugin.ts +++ b/server/cluster/flow_framework_plugin.ts @@ -75,12 +75,16 @@ export function flowFrameworkPlugin(Client: any, config: any, components: any) { flowFramework.updateWorkflow = ca({ url: { - fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>`, + fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>?update_fields=<%=update_fields%>`, req: { workflow_id: { type: 'string', required: true, }, + update_fields: { + type: 'boolean', + required: true, + }, }, }, needBody: true, @@ -113,6 +117,23 @@ export function flowFrameworkPlugin(Client: any, config: any, components: any) { method: 'POST', }); + flowFramework.forceDeprovisionWorkflow = ca({ + url: { + fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>/_deprovision?allow_delete=<%=resource_ids%>`, + req: { + workflow_id: { + type: 'string', + required: true, + }, + resource_ids: { + type: 'string', + required: true, + }, + }, + }, + method: 'POST', + }); + flowFramework.deleteWorkflow = ca({ url: { fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>`, diff --git a/server/routes/flow_framework_routes_service.ts b/server/routes/flow_framework_routes_service.ts index 45dea688..a2d62dc3 100644 --- a/server/routes/flow_framework_routes_service.ts +++ b/server/routes/flow_framework_routes_service.ts @@ -92,10 +92,11 @@ export function registerFlowFrameworkRoutes( router.put( { - path: `${UPDATE_WORKFLOW_NODE_API_PATH}/{workflow_id}`, + path: `${UPDATE_WORKFLOW_NODE_API_PATH}/{workflow_id}/{update_fields}`, validate: { params: schema.object({ workflow_id: schema.string(), + update_fields: schema.boolean(), }), body: schema.any(), }, @@ -127,6 +128,19 @@ export function registerFlowFrameworkRoutes( flowFrameworkRoutesService.deprovisionWorkflow ); + router.post( + { + path: `${DEPROVISION_WORKFLOW_NODE_API_PATH}/{workflow_id}/{resource_ids}`, + validate: { + params: schema.object({ + workflow_id: schema.string(), + resource_ids: schema.string(), + }), + }, + }, + flowFrameworkRoutesService.deprovisionWorkflow + ); + router.delete( { path: `${DELETE_WORKFLOW_NODE_API_PATH}/{workflow_id}`, @@ -279,14 +293,18 @@ export class FlowFrameworkRoutesService { req: OpenSearchDashboardsRequest, res: OpenSearchDashboardsResponseFactory ): Promise> => { - const { workflow_id } = req.params as { workflow_id: string }; + const { workflow_id, update_fields } = req.params as { + workflow_id: string; + update_fields: boolean; + }; const workflowTemplate = req.body as WorkflowTemplate; - try { await this.client .asScoped(req) .callAsCurrentUser('flowFramework.updateWorkflow', { workflow_id, + // default update_fields to false if not explicitly set otherwise + update_fields: update_fields, body: workflowTemplate, }); @@ -317,13 +335,25 @@ export class FlowFrameworkRoutesService { req: OpenSearchDashboardsRequest, res: OpenSearchDashboardsResponseFactory ): Promise> => { - const { workflow_id } = req.params as { workflow_id: string }; + const { workflow_id, resource_ids } = req.params as { + workflow_id: string; + resource_ids?: string; + }; try { - await this.client - .asScoped(req) - .callAsCurrentUser('flowFramework.deprovisionWorkflow', { - workflow_id, - }); + if (resource_ids !== undefined) { + await this.client + .asScoped(req) + .callAsCurrentUser('flowFramework.forceDeprovisionWorkflow', { + workflow_id, + resource_ids, + }); + } else { + await this.client + .asScoped(req) + .callAsCurrentUser('flowFramework.deprovisionWorkflow', { + workflow_id, + }); + } return res.ok(); } catch (err: any) { return generateCustomError(res, err);