Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Integrate with updated Flow Framework APIs #226

Merged
merged 1 commit into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ export function MapField(props: MapFieldProps) {
className="euiFieldText"
value={mapping.key}
onChange={(e) => {
form.setFieldTouched(
`${props.fieldPath}.${idx}.key`,
true
);
form.setFieldValue(
`${props.fieldPath}.${idx}.key`,
e.target.value
Expand All @@ -119,6 +123,10 @@ export function MapField(props: MapFieldProps) {
className="euiFieldText"
value={mapping.value}
onChange={(e) => {
form.setFieldTouched(
`${props.fieldPath}.${idx}.value`,
true
);
form.setFieldValue(
`${props.fieldPath}.${idx}.value`,
e.target.value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ export function ModelField(props: ModelFieldProps) {
)}
valueOfSelected={field.value?.id || ''}
onChange={(option: string) => {
form.setFieldTouched(props.fieldPath, true);
form.setFieldValue(props.fieldPath, {
id: option,
} as ModelFormValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
* SPDX-License-Identifier: Apache-2.0
*/

import React, { useEffect, useState } from 'react';
import React, { useCallback, useEffect, useState } from 'react';
import { useSelector } from 'react-redux';
import { useFormikContext } from 'formik';
import { isEmpty } from 'lodash';
import { debounce, isEmpty } from 'lodash';
import {
EuiButton,
EuiButtonEmpty,
Expand All @@ -31,6 +31,7 @@ import {
Workflow,
WorkflowConfig,
WorkflowFormValues,
WorkflowTemplate,
} from '../../../../common';
import { IngestInputs } from './ingest_inputs';
import { SearchInputs } from './search_inputs';
Expand All @@ -53,6 +54,7 @@ import {
hasProvisionedIngestResources,
hasProvisionedSearchResources,
generateId,
getResourcesToBeForceDeleted,
} from '../../../utils';
import { BooleanField } from './input_fields';
import { ExportOptions } from './export_options';
Expand Down Expand Up @@ -90,9 +92,13 @@ enum INGEST_OPTION {
*/

export function WorkflowInputs(props: WorkflowInputsProps) {
const { submitForm, validateForm, setFieldValue, values } = useFormikContext<
WorkflowFormValues
>();
const {
submitForm,
validateForm,
setFieldValue,
values,
touched,
} = useFormikContext<WorkflowFormValues>();
const dispatch = useAppDispatch();

// Overall workspace state
Expand All @@ -117,6 +123,59 @@ export function WorkflowInputs(props: WorkflowInputsProps) {
const onIngestAndUnprovisioned = onIngest && !ingestProvisioned;
const onIngestAndDisabled = onIngest && !ingestEnabled;

// Auto-save the UI metadata when users update form values.
// Only update the underlying workflow template (deprovision/provision) when
// users explicitly run ingest/search and need to have updated resources
// to test against.
// We use useCallback() with an autosave flag that is only set within the fn itself.
// This is so we can fetch the latest values (uiConfig, formik values) inside a memoized fn,
// but only when we need to.
const [autosave, setAutosave] = useState<boolean>(false);
function triggerAutosave(): void {
setAutosave(!autosave);
}
const debounceAutosave = useCallback(
debounce(async () => {
triggerAutosave();
}, 10000),
[autosave]
);

// Hook to execute autosave when triggered. Runs the update API with update_fields set to true,
// to update the ui_metadata without updating the underlying template for a provisioned workflow.
useEffect(() => {
(async () => {
if (!isEmpty(touched)) {
const updatedTemplate = {
name: props.workflow?.name,
ui_metadata: {
...props.workflow?.ui_metadata,
config: formikToUiConfig(values, props.uiConfig as WorkflowConfig),
},
} as WorkflowTemplate;
await dispatch(
updateWorkflow({
workflowId: props.workflow?.id as string,
workflowTemplate: updatedTemplate,
updateFields: true,
})
)
.unwrap()
.then(async (result) => {})
.catch((error: any) => {
console.error('Error autosaving workflow: ', error);
});
}
})();
}, [autosave]);

// Hook to listen for changes to form values and trigger autosave
useEffect(() => {
if (!isEmpty(values)) {
debounceAutosave();
}
}, [values]);

useEffect(() => {
setIngestProvisioned(hasProvisionedIngestResources(props.workflow));
setSearchProvisioned(hasProvisionedSearchResources(props.workflow));
Expand All @@ -129,7 +188,12 @@ export function WorkflowInputs(props: WorkflowInputsProps) {
updatedWorkflow: Workflow
): Promise<boolean> {
let success = false;
await dispatch(deprovisionWorkflow(updatedWorkflow.id as string))
await dispatch(
deprovisionWorkflow({
workflowId: updatedWorkflow.id as string,
resourceIds: getResourcesToBeForceDeleted(props.workflow),
})
)
.unwrap()
.then(async (result) => {
await dispatch(
Expand Down Expand Up @@ -249,6 +313,10 @@ export function WorkflowInputs(props: WorkflowInputsProps) {
queryObj = JSON.parse(props.query);
} catch (e) {}
if (!isEmpty(queryObj)) {
// TODO: currently this will execute deprovision in child fns.
// In the future, we must omit deprovisioning the index, as it contains
// the data we are executing the query against. Tracking issue:
// https://github.com/opensearch-project/flow-framework/issues/717
success = await validateAndUpdateWorkflow();
if (success) {
const indexName = values.ingest.index.name;
Expand Down Expand Up @@ -312,7 +380,7 @@ export function WorkflowInputs(props: WorkflowInputsProps) {
<EuiModal onClose={() => setIsModalOpen(false)}>
<EuiModalHeader>
<EuiModalHeaderTitle>
<p>{`Delete resources for workflow ${props.workflow.name}?`}</p>
<p>{`Delete resources for workflow ${props.workflow?.name}?`}</p>
</EuiModalHeaderTitle>
</EuiModalHeader>
<EuiModalBody>
Expand All @@ -328,8 +396,14 @@ export function WorkflowInputs(props: WorkflowInputsProps) {
</EuiButtonEmpty>
<EuiButton
onClick={async () => {
// @ts-ignore
await dispatch(deprovisionWorkflow(props.workflow.id))
await dispatch(
deprovisionWorkflow({
workflowId: props.workflow?.id as string,
resourceIds: getResourcesToBeForceDeleted(
props.workflow
),
})
)
.unwrap()
.then(async (result) => {
setFieldValue('ingest.enabled', false);
Expand Down
22 changes: 14 additions & 8 deletions public/route_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,14 @@ export interface RouteService {
createWorkflow: (body: {}) => Promise<any | HttpFetchError>;
updateWorkflow: (
workflowId: string,
workflowTemplate: WorkflowTemplate
workflowTemplate: WorkflowTemplate,
updateFields: boolean
) => Promise<any | HttpFetchError>;
provisionWorkflow: (workflowId: string) => Promise<any | HttpFetchError>;
deprovisionWorkflow: (workflowId: string) => Promise<any | HttpFetchError>;
deprovisionWorkflow: (
workflowId: string,
resourceIds?: string
) => Promise<any | HttpFetchError>;
deleteWorkflow: (workflowId: string) => Promise<any | HttpFetchError>;
getWorkflowPresets: () => Promise<any | HttpFetchError>;
catIndices: (pattern: string) => Promise<any | HttpFetchError>;
Expand Down Expand Up @@ -110,11 +114,12 @@ export function configureRoutes(core: CoreStart): RouteService {
},
updateWorkflow: async (
workflowId: string,
workflowTemplate: WorkflowTemplate
workflowTemplate: WorkflowTemplate,
updateFields: boolean
) => {
try {
const response = await core.http.put<{ respString: string }>(
`${UPDATE_WORKFLOW_NODE_API_PATH}/${workflowId}`,
`${UPDATE_WORKFLOW_NODE_API_PATH}/${workflowId}/${updateFields}`,
{
body: JSON.stringify(workflowTemplate),
}
Expand All @@ -134,11 +139,12 @@ export function configureRoutes(core: CoreStart): RouteService {
return e as HttpFetchError;
}
},
deprovisionWorkflow: async (workflowId: string) => {
deprovisionWorkflow: async (workflowId: string, resourceIds?: string) => {
try {
const response = await core.http.post<{ respString: string }>(
`${DEPROVISION_WORKFLOW_NODE_API_PATH}/${workflowId}`
);
const path = resourceIds
? `${DEPROVISION_WORKFLOW_NODE_API_PATH}/${workflowId}/${resourceIds}`
: `${DEPROVISION_WORKFLOW_NODE_API_PATH}/${workflowId}`;
const response = await core.http.post<{ respString: string }>(path);
return response;
} catch (e: any) {
return e as HttpFetchError;
Expand Down
20 changes: 15 additions & 5 deletions public/store/reducers/workflows_reducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,20 @@ export const createWorkflow = createAsyncThunk(
export const updateWorkflow = createAsyncThunk(
UPDATE_WORKFLOW_ACTION,
async (
workflowInfo: { workflowId: string; workflowTemplate: WorkflowTemplate },
workflowInfo: {
workflowId: string;
workflowTemplate: WorkflowTemplate;
updateFields?: boolean;
},
{ rejectWithValue }
) => {
const { workflowId, workflowTemplate } = workflowInfo;
const { workflowId, workflowTemplate, updateFields } = workflowInfo;
const response:
| any
| HttpFetchError = await getRouteService().updateWorkflow(
workflowId,
workflowTemplate
workflowTemplate,
updateFields || false
);
if (response instanceof HttpFetchError) {
return rejectWithValue(
Expand Down Expand Up @@ -129,11 +134,16 @@ export const provisionWorkflow = createAsyncThunk(

export const deprovisionWorkflow = createAsyncThunk(
DEPROVISION_WORKFLOW_ACTION,
async (workflowId: string, { rejectWithValue }) => {
async (
deprovisionInfo: { workflowId: string; resourceIds?: string },
{ rejectWithValue }
) => {
const { workflowId, resourceIds } = deprovisionInfo;
const response:
| any
| HttpFetchError = await getRouteService().deprovisionWorkflow(
workflowId
workflowId,
resourceIds
);
if (response instanceof HttpFetchError) {
return rejectWithValue(
Expand Down
27 changes: 26 additions & 1 deletion public/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
*/

import yaml from 'js-yaml';
import { WORKFLOW_STEP_TYPE, Workflow } from '../../common';
import {
WORKFLOW_RESOURCE_TYPE,
WORKFLOW_STEP_TYPE,
Workflow,
} from '../../common';

// Append 16 random characters
export function generateId(prefix?: string): string {
Expand Down Expand Up @@ -47,6 +51,27 @@ export function hasProvisionedSearchResources(
return result;
}

// returns a comma-delimited string of all resource IDs that need to be force deleted.
// see https://github.com/opensearch-project/flow-framework/pull/763
export function getResourcesToBeForceDeleted(
workflow: Workflow | undefined
): string | undefined {
const resources = workflow?.resourcesCreated?.filter(
(workflowResource) =>
workflowResource.type === WORKFLOW_RESOURCE_TYPE.INDEX_NAME ||
workflowResource.type === WORKFLOW_RESOURCE_TYPE.PIPELINE_ID
);

if (resources !== undefined && resources.length > 0) {
return resources
.map((resource) => resource.id)
.map(String)
.join(',');
} else {
return undefined;
}
}

export function getObjFromJsonOrYamlString(
fileContents: string | undefined
): object | undefined {
Expand Down
23 changes: 22 additions & 1 deletion server/cluster/flow_framework_plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,16 @@ export function flowFrameworkPlugin(Client: any, config: any, components: any) {

flowFramework.updateWorkflow = ca({
url: {
fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>`,
fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>?update_fields=<%=update_fields%>`,
req: {
workflow_id: {
type: 'string',
required: true,
},
update_fields: {
type: 'boolean',
required: true,
},
},
},
needBody: true,
Expand Down Expand Up @@ -113,6 +117,23 @@ export function flowFrameworkPlugin(Client: any, config: any, components: any) {
method: 'POST',
});

flowFramework.forceDeprovisionWorkflow = ca({
url: {
fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>/_deprovision?allow_delete=<%=resource_ids%>`,
req: {
workflow_id: {
type: 'string',
required: true,
},
resource_ids: {
type: 'string',
required: true,
},
},
},
method: 'POST',
});

flowFramework.deleteWorkflow = ca({
url: {
fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>`,
Expand Down
Loading
Loading