From 1e4127cc794351d8ad6f2b993cb60f32c8bd811a Mon Sep 17 00:00:00 2001 From: Tyler Ohlsen Date: Tue, 6 Aug 2024 18:34:48 -0700 Subject: [PATCH] Support fine-grained provisioning; handle more edge cases on search flow (#268) Signed-off-by: Tyler Ohlsen --- common/interfaces.ts | 12 +- .../input_transform_modal.tsx | 6 +- .../processor_inputs/ml_processor_inputs.tsx | 12 +- .../output_transform_modal.tsx | 6 +- .../configure_search_request.tsx | 24 ++- .../workflow_inputs/workflow_inputs.tsx | 181 ++++++++++++------ public/pages/workflows/new_workflow/utils.ts | 23 ++- public/route_service.ts | 8 +- public/store/reducers/workflows_reducer.ts | 11 +- public/utils/config_to_form_utils.ts | 21 +- public/utils/config_to_schema_utils.ts | 12 +- public/utils/config_to_template_utils.ts | 12 +- public/utils/config_to_workspace_utils.ts | 4 +- public/utils/form_to_config_utils.ts | 26 ++- server/cluster/flow_framework_plugin.ts | 6 +- .../routes/flow_framework_routes_service.ts | 7 +- 16 files changed, 276 insertions(+), 95 deletions(-) diff --git a/common/interfaces.ts b/common/interfaces.ts index 0feac2fd..0a26e4f9 100644 --- a/common/interfaces.ts +++ b/common/interfaces.ts @@ -58,15 +58,25 @@ export type IndexConfig = { settings: IConfigField; }; +// TODO: may expand to just IndexConfig (including mappings/settings info) +// if we want to persist this for users using some existing index, +// and want to pass that index config around. +export type SearchIndexConfig = { + name: IConfigField; +}; + export type IngestConfig = { - enabled: boolean; + enabled: IConfigField; source: {}; + pipelineName: IConfigField; enrich: ProcessorsConfig; index: IndexConfig; }; export type SearchConfig = { request: IConfigField; + index: SearchIndexConfig; + pipelineName: IConfigField; enrichRequest: ProcessorsConfig; enrichResponse: ProcessorsConfig; }; diff --git a/public/pages/workflow_detail/workflow_inputs/processor_inputs/input_transform_modal.tsx b/public/pages/workflow_detail/workflow_inputs/processor_inputs/input_transform_modal.tsx index cd91abe0..068656a4 100644 --- a/public/pages/workflow_detail/workflow_inputs/processor_inputs/input_transform_modal.tsx +++ b/public/pages/workflow_detail/workflow_inputs/processor_inputs/input_transform_modal.tsx @@ -234,7 +234,11 @@ export function InputTransformModal(props: InputTransformModalProps) { helpText={`An array specifying how to map fields from the ingested document to the model’s input.`} helpLink={ML_INFERENCE_DOCS_LINK} keyPlaceholder="Model input field" - valuePlaceholder="Document field" + valuePlaceholder={ + props.context === PROCESSOR_CONTEXT.SEARCH_REQUEST + ? 'Query field' + : 'Document field' + } keyOptions={props.inputFields} onFormChange={props.onFormChange} // If the map we are adding is the first one, populate the selected option to index 0 diff --git a/public/pages/workflow_detail/workflow_inputs/processor_inputs/ml_processor_inputs.tsx b/public/pages/workflow_detail/workflow_inputs/processor_inputs/ml_processor_inputs.tsx index 7ff69f4f..719aa908 100644 --- a/public/pages/workflow_detail/workflow_inputs/processor_inputs/ml_processor_inputs.tsx +++ b/public/pages/workflow_detail/workflow_inputs/processor_inputs/ml_processor_inputs.tsx @@ -229,7 +229,11 @@ export function MLProcessorInputs(props: MLProcessorInputsProps) { helpText={`An array specifying how to map fields from the ingested document to the model’s input.`} helpLink={ML_INFERENCE_DOCS_LINK} keyPlaceholder="Model input field" - valuePlaceholder="Document field" + valuePlaceholder={ + props.context === PROCESSOR_CONTEXT.SEARCH_REQUEST + ? 'Query field' + : 'Document field' + } onFormChange={props.onFormChange} keyOptions={inputFields} /> @@ -273,7 +277,11 @@ export function MLProcessorInputs(props: MLProcessorInputsProps) { label="Output Map" helpText={`An array specifying how to map the model’s output to new document fields.`} helpLink={ML_INFERENCE_DOCS_LINK} - keyPlaceholder="New document field" + keyPlaceholder={ + props.context === PROCESSOR_CONTEXT.SEARCH_REQUEST + ? 'Query field' + : 'Document field' + } valuePlaceholder="Model output field" onFormChange={props.onFormChange} valueOptions={outputFields} diff --git a/public/pages/workflow_detail/workflow_inputs/processor_inputs/output_transform_modal.tsx b/public/pages/workflow_detail/workflow_inputs/processor_inputs/output_transform_modal.tsx index f408ac41..246a0887 100644 --- a/public/pages/workflow_detail/workflow_inputs/processor_inputs/output_transform_modal.tsx +++ b/public/pages/workflow_detail/workflow_inputs/processor_inputs/output_transform_modal.tsx @@ -108,7 +108,7 @@ export function OutputTransformModal(props: OutputTransformModalProps) { const valuesWithoutOutputMapConfig = cloneDeep(values); set( valuesWithoutOutputMapConfig, - `ingest.enrich.${props.config.id}.outputMap`, + `ingest.enrich.${props.config.id}.output_map`, [] ); const curIngestPipeline = formikToPartialPipeline( @@ -146,7 +146,7 @@ export function OutputTransformModal(props: OutputTransformModalProps) { const valuesWithoutOutputMapConfig = cloneDeep(values); set( valuesWithoutOutputMapConfig, - `search.enrichResponse.${props.config.id}.outputMap`, + `search.enrichResponse.${props.config.id}.output_map`, [] ); const curSearchPipeline = formikToPartialPipeline( @@ -226,7 +226,7 @@ export function OutputTransformModal(props: OutputTransformModalProps) { label="Output Map" helpText={`An array specifying how to map the model’s output to new fields.`} helpLink={ML_INFERENCE_DOCS_LINK} - keyPlaceholder="New document field" + keyPlaceholder="Document field" valuePlaceholder="Model output field" valueOptions={props.outputFields} onFormChange={props.onFormChange} diff --git a/public/pages/workflow_detail/workflow_inputs/search_inputs/configure_search_request.tsx b/public/pages/workflow_detail/workflow_inputs/search_inputs/configure_search_request.tsx index faaaee30..a6dac344 100644 --- a/public/pages/workflow_detail/workflow_inputs/search_inputs/configure_search_request.tsx +++ b/public/pages/workflow_detail/workflow_inputs/search_inputs/configure_search_request.tsx @@ -22,7 +22,7 @@ import { EuiText, EuiTitle, } from '@elastic/eui'; -import { SearchHit, WorkspaceFormValues } from '../../../../../common'; +import { SearchHit, WorkflowFormValues } from '../../../../../common'; import { JsonField } from '../input_fields'; import { AppState, @@ -44,18 +44,27 @@ export function ConfigureSearchRequest(props: ConfigureSearchRequestProps) { const dispatch = useAppDispatch(); // Form state - const { values } = useFormikContext(); - const indexName = values.ingest.index.name; + const { values, setFieldValue, setFieldTouched } = useFormikContext< + WorkflowFormValues + >(); const ingestEnabled = values.ingest.enabled; + const searchIndexNameFormPath = 'search.index.name'; // All indices state const indices = useSelector((state: AppState) => state.opensearch.indices); // Selected index state const [selectedIndex, setSelectedIndex] = useState( - undefined + values.search.index.name ); + // initial load: set the search index value, if not already set + useEffect(() => { + if (values.ingest.enabled) { + setFieldValue(searchIndexNameFormPath, values.ingest.index.name); + } + }, []); + // Edit modal state const [isEditModalOpen, setIsEditModalOpen] = useState(false); @@ -116,7 +125,7 @@ export function ConfigureSearchRequest(props: ConfigureSearchRequestProps) { {ingestEnabled ? ( - + ) : ( { setSelectedIndex(option); + setFieldValue(searchIndexNameFormPath, option); + setFieldTouched(searchIndexNameFormPath, true); + props.onFormChange(); }} isInvalid={selectedIndex === undefined} /> @@ -165,7 +177,7 @@ export function ConfigureSearchRequest(props: ConfigureSearchRequestProps) { // see https://opensearch.org/docs/latest/search-plugins/search-pipelines/using-search-pipeline/#disabling-the-default-pipeline-for-a-request dispatch( searchIndex({ - index: indexName, + index: values.search.index.name, body: values.search.request, searchPipeline: '_none', }) diff --git a/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx b/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx index 906159ee..8e10c11b 100644 --- a/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx +++ b/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx @@ -5,7 +5,7 @@ import React, { useCallback, useEffect, useState } from 'react'; import { useSelector } from 'react-redux'; -import { useFormikContext } from 'formik'; +import { getIn, useFormikContext } from 'formik'; import { debounce, isEmpty } from 'lodash'; import { EuiButton, @@ -124,6 +124,9 @@ export function WorkflowInputs(props: WorkflowInputsProps) { const onIngestAndProvisioned = onIngest && ingestProvisioned; const onIngestAndUnprovisioned = onIngest && !ingestProvisioned; const onIngestAndDisabled = onIngest && !ingestEnabled; + const isProposingNoSearchResources = + isEmpty(getIn(values, 'search.enrichRequest')) && + isEmpty(getIn(values, 'search.enrichResponse')); // Auto-save the UI metadata when users update form values. // Only update the underlying workflow template (deprovision/provision) when @@ -160,6 +163,7 @@ export function WorkflowInputs(props: WorkflowInputsProps) { workflowId: props.workflow?.id as string, workflowTemplate: updatedTemplate, updateFields: true, + reprovision: false, }) ) .unwrap() @@ -188,64 +192,95 @@ export function WorkflowInputs(props: WorkflowInputsProps) { setSearchProvisioned(hasProvisionedSearchResources(props.workflow)); }, [props.workflow]); - // Utility fn to update the workflow, including any updated/new resources - // Eventually, should be able to use fine-grained provisioning to do a single API call - // instead of the currently-implemented deprovision -> update -> provision. - // To simplify and minimize errors, we set various sleep calls in between the actions - // to allow time for full deprovisioning / provisioning to occur, such as index deletion - // & index re-creation. - // TODO: update to fine-grained provisioning when available. + // Utility fn to update the workflow, including any updated/new resources. + // The reprovision param is used to determine whether we are doing full + // deprovision/update/provision, vs. update w/ reprovision (fine-grained provisioning). + // Details on the reprovision API is here: https://github.com/opensearch-project/flow-framework/pull/804 async function updateWorkflowAndResources( - updatedWorkflow: Workflow + updatedWorkflow: Workflow, + reprovision: boolean ): Promise { let success = false; - await dispatch( - deprovisionWorkflow({ - workflowId: updatedWorkflow.id as string, - resourceIds: getResourcesToBeForceDeleted(props.workflow), - }) - ) - .unwrap() - .then(async (result) => { - await dispatch( - updateWorkflow({ - workflowId: updatedWorkflow.id as string, - workflowTemplate: reduceToTemplate(updatedWorkflow), - }) - ) - .unwrap() - .then(async (result) => { - await sleep(1000); - await dispatch(provisionWorkflow(updatedWorkflow.id as string)) - .unwrap() - .then(async (result) => { - await sleep(1000); - success = true; - // Kicking off an async task to re-fetch the workflow details - // after some amount of time. Provisioning will finish in an indeterminate - // amount of time and may be long and expensive; we add this single - // auto-fetching to cover the majority of provisioning updates which - // are inexpensive and will finish within milliseconds. - new Promise((f) => setTimeout(f, 1000)).then(async () => { - dispatch(getWorkflow(updatedWorkflow.id as string)); + if (reprovision) { + await dispatch( + updateWorkflow({ + workflowId: updatedWorkflow.id as string, + workflowTemplate: reduceToTemplate(updatedWorkflow), + reprovision: true, + }) + ) + .unwrap() + .then(async (result) => { + await sleep(1000); + success = true; + // Kicking off an async task to re-fetch the workflow details + // after some amount of time. Provisioning will finish in an indeterminate + // amount of time and may be long and expensive; we add this single + // auto-fetching to cover the majority of provisioning updates which + // are inexpensive and will finish within milliseconds. + setTimeout(async () => { + dispatch(getWorkflow(updatedWorkflow.id as string)); + }, 1000); + }) + .catch((error: any) => { + console.error('Error reprovisioning workflow: ', error); + }); + } else { + await dispatch( + deprovisionWorkflow({ + workflowId: updatedWorkflow.id as string, + resourceIds: getResourcesToBeForceDeleted(props.workflow), + }) + ) + .unwrap() + .then(async (result) => { + await dispatch( + updateWorkflow({ + workflowId: updatedWorkflow.id as string, + workflowTemplate: reduceToTemplate(updatedWorkflow), + reprovision: false, + }) + ) + .unwrap() + .then(async (result) => { + await sleep(1000); + await dispatch(provisionWorkflow(updatedWorkflow.id as string)) + .unwrap() + .then(async (result) => { + await sleep(1000); + success = true; + // Kicking off an async task to re-fetch the workflow details + // after some amount of time. Provisioning will finish in an indeterminate + // amount of time and may be long and expensive; we add this single + // auto-fetching to cover the majority of provisioning updates which + // are inexpensive and will finish within milliseconds. + setTimeout(async () => { + dispatch(getWorkflow(updatedWorkflow.id as string)); + }, 1000); + }) + .catch((error: any) => { + console.error('Error provisioning updated workflow: ', error); }); - }) - .catch((error: any) => { - console.error('Error provisioning updated workflow: ', error); - }); - }) - .catch((error: any) => { - console.error('Error updating workflow: ', error); - }); - }) - .catch((error: any) => { - console.error('Error deprovisioning workflow: ', error); - }); + }) + .catch((error: any) => { + console.error('Error updating workflow: ', error); + }); + }) + .catch((error: any) => { + console.error('Error deprovisioning workflow: ', error); + }); + } return success; } // Utility fn to validate the form and update the workflow if valid - async function validateAndUpdateWorkflow(): Promise { + // Support fine-grained validation if we only need to validate a subset + // of the entire form. + async function validateAndUpdateWorkflow( + reprovision: boolean, + includeIngest: boolean = true, + includeSearch: boolean = true + ): Promise { let success = false; // Submit the form to bubble up any errors. // Ideally we handle Promise accept/rejects with submitForm(), but there is @@ -253,8 +288,13 @@ export function WorkflowInputs(props: WorkflowInputsProps) { // The workaround is to additionally execute validateForm() which will return any errors found. submitForm(); await validateForm() - .then(async (validationResults: {}) => { - if (Object.keys(validationResults).length > 0) { + .then(async (validationResults: { ingest?: {}; search?: {} }) => { + const { ingest, search } = validationResults; + const relevantValidationResults = { + ...(includeIngest && ingest !== undefined ? { ingest } : {}), + ...(includeSearch && search !== undefined ? { search } : {}), + }; + if (Object.keys(relevantValidationResults).length > 0) { // TODO: may want to persist more fine-grained form validation (ingest vs. search) // For example, running an ingest should be possible, even with some // invalid query or search processor config. And vice versa. @@ -272,7 +312,10 @@ export function WorkflowInputs(props: WorkflowInputsProps) { }, workflows: configToTemplateFlows(updatedConfig), } as Workflow; - success = await updateWorkflowAndResources(updatedWorkflow); + success = await updateWorkflowAndResources( + updatedWorkflow, + reprovision + ); } }) .catch((error) => { @@ -282,6 +325,9 @@ export function WorkflowInputs(props: WorkflowInputsProps) { return success; } + // Updating ingest. In this case, do full deprovision/update/provision, since we want + // to clean up any created resources and not have leftover / stale data in some index. + // This is propagated by passing `reprovision=false` to validateAndUpdateWorkflow() async function validateAndRunIngestion(): Promise { let success = false; try { @@ -290,7 +336,7 @@ export function WorkflowInputs(props: WorkflowInputsProps) { ingestDocsObjs = JSON.parse(props.ingestDocs); } catch (e) {} if (ingestDocsObjs.length > 0 && !isEmpty(ingestDocsObjs[0])) { - success = await validateAndUpdateWorkflow(); + success = await validateAndUpdateWorkflow(false, true, false); if (success) { const bulkBody = prepareBulkBody( values.ingest.index.name, @@ -318,6 +364,13 @@ export function WorkflowInputs(props: WorkflowInputsProps) { return success; } + // Updating search. If existing ingest resources, run fine-grained provisioning to persist that + // created index and any indexed data, and only update/re-create the search + // pipeline, as well as adding that pipeline as the default pipeline for the existing index. + // If no ingest resources (using user's existing index), run full + // deprovision/update/provision, since we're just re-creating the search pipeline. + // This logic is propagated by passing `reprovision=true/false` in the + // validateAndUpdateWorkflow() fn calls below. async function validateAndRunQuery(): Promise { let success = false; try { @@ -326,13 +379,14 @@ export function WorkflowInputs(props: WorkflowInputsProps) { queryObj = JSON.parse(props.query); } catch (e) {} if (!isEmpty(queryObj)) { - // TODO: currently this will execute deprovision in child fns. - // In the future, we must omit deprovisioning the index, as it contains - // the data we are executing the query against. Tracking issue: - // https://github.com/opensearch-project/flow-framework/issues/717 - success = await validateAndUpdateWorkflow(); + if (hasProvisionedIngestResources(props.workflow)) { + success = await validateAndUpdateWorkflow(true, false, true); + } else { + success = await validateAndUpdateWorkflow(false, false, true); + } + if (success) { - const indexName = values.ingest.index.name; + const indexName = values.search.index.name; dispatch(searchIndex({ index: indexName, body: props.query })) .unwrap() .then(async (resp) => { @@ -604,7 +658,10 @@ export function WorkflowInputs(props: WorkflowInputsProps) { { validateAndRunQuery(); diff --git a/public/pages/workflows/new_workflow/utils.ts b/public/pages/workflows/new_workflow/utils.ts index d45f6891..43f6a145 100644 --- a/public/pages/workflows/new_workflow/utils.ts +++ b/public/pages/workflows/new_workflow/utils.ts @@ -12,6 +12,7 @@ import { WORKFLOW_TYPE, FETCH_ALL_QUERY_BODY, } from '../../../../common'; +import { generateId } from '../../../utils'; // Fn to produce the complete preset template with all necessary UI metadata. export function enrichPresetWorkflowWithUiMetadata( @@ -44,8 +45,17 @@ function fetchEmptyMetadata(): UIState { type: WORKFLOW_TYPE.CUSTOM, config: { ingest: { - enabled: true, + enabled: { + id: 'enabled', + type: 'boolean', + value: true, + }, source: {}, + pipelineName: { + id: 'pipelineName', + type: 'string', + value: generateId('ingest_pipeline'), + }, enrich: { processors: [], }, @@ -70,6 +80,17 @@ function fetchEmptyMetadata(): UIState { type: 'json', value: JSON.stringify(FETCH_ALL_QUERY_BODY, undefined, 2), }, + pipelineName: { + id: 'pipelineName', + type: 'string', + value: generateId('search_pipeline'), + }, + index: { + name: { + id: 'indexName', + type: 'string', + }, + }, enrichRequest: { processors: [], }, diff --git a/public/route_service.ts b/public/route_service.ts index 99017c8e..14435646 100644 --- a/public/route_service.ts +++ b/public/route_service.ts @@ -40,7 +40,8 @@ export interface RouteService { updateWorkflow: ( workflowId: string, workflowTemplate: WorkflowTemplate, - updateFields: boolean + updateFields: boolean, + reprovision: boolean ) => Promise; provisionWorkflow: (workflowId: string) => Promise; deprovisionWorkflow: ( @@ -115,11 +116,12 @@ export function configureRoutes(core: CoreStart): RouteService { updateWorkflow: async ( workflowId: string, workflowTemplate: WorkflowTemplate, - updateFields: boolean + updateFields: boolean, + reprovision: boolean ) => { try { const response = await core.http.put<{ respString: string }>( - `${UPDATE_WORKFLOW_NODE_API_PATH}/${workflowId}/${updateFields}`, + `${UPDATE_WORKFLOW_NODE_API_PATH}/${workflowId}/${updateFields}/${reprovision}`, { body: JSON.stringify(workflowTemplate), } diff --git a/public/store/reducers/workflows_reducer.ts b/public/store/reducers/workflows_reducer.ts index ea4c4549..a87e3545 100644 --- a/public/store/reducers/workflows_reducer.ts +++ b/public/store/reducers/workflows_reducer.ts @@ -95,16 +95,23 @@ export const updateWorkflow = createAsyncThunk( workflowId: string; workflowTemplate: WorkflowTemplate; updateFields?: boolean; + reprovision?: boolean; }, { rejectWithValue } ) => { - const { workflowId, workflowTemplate, updateFields } = workflowInfo; + const { + workflowId, + workflowTemplate, + updateFields, + reprovision, + } = workflowInfo; const response: | any | HttpFetchError = await getRouteService().updateWorkflow( workflowId, workflowTemplate, - updateFields || false + updateFields || false, + reprovision || false ); if (response instanceof HttpFetchError) { return rejectWithValue( diff --git a/public/utils/config_to_form_utils.ts b/public/utils/config_to_form_utils.ts index 6563f6f6..fb36451b 100644 --- a/public/utils/config_to_form_utils.ts +++ b/public/utils/config_to_form_utils.ts @@ -15,6 +15,7 @@ import { ConfigFieldType, ConfigFieldValue, ModelFormValue, + SearchIndexConfig, } from '../../common'; /* @@ -40,7 +41,11 @@ function ingestConfigToFormik( ): FormikValues { let ingestFormikValues = {} as FormikValues; if (ingestConfig) { - ingestFormikValues['enabled'] = ingestConfig.enabled; + ingestFormikValues['enabled'] = + ingestConfig.enabled.value || getInitialValue(ingestConfig.enabled.type); + ingestFormikValues['pipelineName'] = + ingestConfig.pipelineName.value || + getInitialValue(ingestConfig.pipelineName.type); ingestFormikValues['docs'] = ingestDocs || getInitialValue('jsonArray'); ingestFormikValues['enrich'] = processorsConfigToFormik( ingestConfig.enrich @@ -92,6 +97,10 @@ function searchConfigToFormik( if (searchConfig) { searchFormikValues['request'] = searchConfig.request.value || getInitialValue('json'); + searchFormikValues['pipelineName'] = + searchConfig.pipelineName.value || + getInitialValue(searchConfig.pipelineName.type); + searchFormikValues['index'] = searchIndexConfigToFormik(searchConfig.index); searchFormikValues['enrichRequest'] = processorsConfigToFormik( searchConfig.enrichRequest ); @@ -102,6 +111,16 @@ function searchConfigToFormik( return searchFormikValues; } +function searchIndexConfigToFormik( + searchIndexConfig: SearchIndexConfig +): FormikValues { + let formValues = {} as FormikValues; + formValues['name'] = + searchIndexConfig.name.value || + getInitialValue(searchIndexConfig.name.type); + return formValues; +} + // Helper fn to get an initial value based on the field type export function getInitialValue(fieldType: ConfigFieldType): ConfigFieldValue { switch (fieldType) { diff --git a/public/utils/config_to_schema_utils.ts b/public/utils/config_to_schema_utils.ts index 6a4801d4..44b0a019 100644 --- a/public/utils/config_to_schema_utils.ts +++ b/public/utils/config_to_schema_utils.ts @@ -14,6 +14,7 @@ import { WorkflowSchemaObj, IndexConfig, IConfigField, + SearchIndexConfig, } from '../../common'; /* @@ -31,10 +32,11 @@ function ingestConfigToSchema( ingestConfig: IngestConfig | undefined ): ObjectSchema { const ingestSchemaObj = {} as { [key: string]: Schema }; - if (ingestConfig) { + if (ingestConfig?.enabled) { ingestSchemaObj['docs'] = getFieldSchema({ type: 'jsonArray', } as IConfigField); + ingestSchemaObj['pipelineName'] = getFieldSchema(ingestConfig.pipelineName); ingestSchemaObj['enrich'] = processorsConfigToSchema(ingestConfig.enrich); ingestSchemaObj['index'] = indexConfigToSchema(ingestConfig.index); } @@ -57,6 +59,8 @@ function searchConfigToSchema( searchSchemaObj['request'] = getFieldSchema({ type: 'json', } as IConfigField); + searchSchemaObj['pipelineName'] = getFieldSchema(searchConfig.pipelineName); + searchSchemaObj['index'] = searchIndexToSchema(searchConfig.index); searchSchemaObj['enrichRequest'] = processorsConfigToSchema( searchConfig.enrichRequest ); @@ -67,6 +71,12 @@ function searchConfigToSchema( return yup.object(searchSchemaObj); } +function searchIndexToSchema(searchIndexConfig: SearchIndexConfig): Schema { + const searchIndexSchemaObj = {} as { [key: string]: Schema }; + searchIndexSchemaObj['name'] = getFieldSchema(searchIndexConfig.name); + return yup.object(searchIndexSchemaObj); +} + function processorsConfigToSchema(processorsConfig: ProcessorsConfig): Schema { const processorsSchemaObj = {} as { [key: string]: Schema }; processorsConfig.processors.forEach((processorConfig) => { diff --git a/public/utils/config_to_template_utils.ts b/public/utils/config_to_template_utils.ts index af857405..bd63de00 100644 --- a/public/utils/config_to_template_utils.ts +++ b/public/utils/config_to_template_utils.ts @@ -35,7 +35,6 @@ import { DELIMITER_OPTIONAL_FIELDS, } from '../../common'; import { processorConfigToFormik } from './config_to_form_utils'; -import { generateId } from './utils'; /* **************** Config -> template utils ********************** @@ -65,7 +64,7 @@ function configToProvisionTemplateFlow(config: WorkflowConfig): TemplateFlow { (node) => node.type === WORKFLOW_STEP_TYPE.CREATE_SEARCH_PIPELINE_STEP_TYPE ) as CreateSearchPipelineNode; - if (config.ingest.enabled) { + if (config.ingest.enabled.value) { nodes.push( indexConfigToTemplateNode( config.ingest.index, @@ -84,13 +83,14 @@ function configToProvisionTemplateFlow(config: WorkflowConfig): TemplateFlow { function ingestConfigToTemplateNodes( ingestConfig: IngestConfig ): TemplateNode[] { - const ingestPipelineName = generateId('ingest_pipeline'); + const ingestPipelineName = ingestConfig.pipelineName.value; + const ingestEnabled = ingestConfig.enabled.value; const ingestProcessors = processorConfigsToTemplateProcessors( ingestConfig.enrich.processors ); const hasProcessors = ingestProcessors.length > 0; - return hasProcessors && ingestConfig.enabled + return hasProcessors && ingestEnabled ? [ { id: ingestPipelineName, @@ -110,7 +110,7 @@ function ingestConfigToTemplateNodes( function searchConfigToTemplateNodes( searchConfig: SearchConfig ): TemplateNode[] { - const searchPipelineName = generateId('search_pipeline'); + const searchPipelineName = searchConfig.pipelineName.value; const searchRequestProcessors = processorConfigsToTemplateProcessors( searchConfig.enrichRequest.processors ); @@ -295,7 +295,7 @@ function indexConfigToTemplateNode( updateFinalInputsAndSettings(searchPipelineNode); return { - id: 'create_index', + id: indexConfig.name.value as string, type: WORKFLOW_STEP_TYPE.CREATE_INDEX_STEP_TYPE, previous_node_inputs: finalPreviousNodeInputs, user_inputs: { diff --git a/public/utils/config_to_workspace_utils.ts b/public/utils/config_to_workspace_utils.ts index f57d3ad7..cb5aa459 100644 --- a/public/utils/config_to_workspace_utils.ts +++ b/public/utils/config_to_workspace_utils.ts @@ -88,7 +88,7 @@ function ingestConfigToWorkspaceFlow( // Parent ingest node const parentNode = { - id: generateId(COMPONENT_CATEGORY.INGEST), + id: ingestConfig.pipelineName.value, position: { x: 400, y: 400 }, type: NODE_CATEGORY.INGEST_GROUP, data: { label: COMPONENT_CATEGORY.INGEST }, @@ -186,7 +186,7 @@ function searchConfigToWorkspaceFlow( // Parent search node const parentNode = { - id: generateId(COMPONENT_CATEGORY.SEARCH), + id: searchConfig.pipelineName.value, position: { x: 400, y: 800 }, type: NODE_CATEGORY.SEARCH_GROUP, data: { label: COMPONENT_CATEGORY.SEARCH }, diff --git a/public/utils/form_to_config_utils.ts b/public/utils/form_to_config_utils.ts index 153977ab..f83a6331 100644 --- a/public/utils/form_to_config_utils.ts +++ b/public/utils/form_to_config_utils.ts @@ -12,6 +12,7 @@ import { SearchConfig, ProcessorsConfig, IndexConfig, + SearchIndexConfig, } from '../../common'; import { getInitialValue } from './config_to_form_utils'; @@ -45,7 +46,14 @@ function formikToIngestUiConfig( ): IngestConfig { return { ...existingConfig, - enabled: ingestFormValues['enabled'], + enabled: { + ...existingConfig.enabled, + value: ingestFormValues['enabled'], + }, + pipelineName: { + ...existingConfig.pipelineName, + value: ingestFormValues['pipelineName'], + }, enrich: formikToProcessorsUiConfig( ingestFormValues['enrich'], existingConfig.enrich @@ -77,6 +85,14 @@ function formikToSearchUiConfig( ...existingConfig.request, value: searchFormValues['request'], }, + pipelineName: { + ...existingConfig.pipelineName, + value: searchFormValues['pipelineName'], + }, + index: formikToSearchIndexUiConfig( + searchFormValues['index'], + existingConfig.index + ), enrichRequest: formikToProcessorsUiConfig( searchFormValues['enrichRequest'], existingConfig.enrichRequest @@ -88,6 +104,14 @@ function formikToSearchUiConfig( }; } +function formikToSearchIndexUiConfig( + searchIndexFormValues: FormikValues, + existingConfig: SearchIndexConfig +): SearchIndexConfig { + existingConfig['name'].value = searchIndexFormValues['name']; + return existingConfig; +} + function formikToProcessorsUiConfig( formValues: FormikValues, existingConfig: ProcessorsConfig diff --git a/server/cluster/flow_framework_plugin.ts b/server/cluster/flow_framework_plugin.ts index 2dec0fef..c01f81b7 100644 --- a/server/cluster/flow_framework_plugin.ts +++ b/server/cluster/flow_framework_plugin.ts @@ -75,7 +75,7 @@ export function flowFrameworkPlugin(Client: any, config: any, components: any) { flowFramework.updateWorkflow = ca({ url: { - fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>?update_fields=<%=update_fields%>`, + fmt: `${FLOW_FRAMEWORK_WORKFLOW_ROUTE_PREFIX}/<%=workflow_id%>?update_fields=<%=update_fields%>&reprovision=<%=reprovision%>`, req: { workflow_id: { type: 'string', @@ -85,6 +85,10 @@ export function flowFrameworkPlugin(Client: any, config: any, components: any) { type: 'boolean', required: true, }, + reprovision: { + type: 'boolean', + required: true, + }, }, }, needBody: true, diff --git a/server/routes/flow_framework_routes_service.ts b/server/routes/flow_framework_routes_service.ts index fc43b561..861e0752 100644 --- a/server/routes/flow_framework_routes_service.ts +++ b/server/routes/flow_framework_routes_service.ts @@ -93,11 +93,12 @@ export function registerFlowFrameworkRoutes( router.put( { - path: `${UPDATE_WORKFLOW_NODE_API_PATH}/{workflow_id}/{update_fields}`, + path: `${UPDATE_WORKFLOW_NODE_API_PATH}/{workflow_id}/{update_fields}/{reprovision}`, validate: { params: schema.object({ workflow_id: schema.string(), update_fields: schema.boolean(), + reprovision: schema.boolean(), }), body: schema.any(), }, @@ -294,9 +295,10 @@ export class FlowFrameworkRoutesService { req: OpenSearchDashboardsRequest, res: OpenSearchDashboardsResponseFactory ): Promise> => { - const { workflow_id, update_fields } = req.params as { + const { workflow_id, update_fields, reprovision } = req.params as { workflow_id: string; update_fields: boolean; + reprovision: boolean; }; const workflowTemplate = req.body as WorkflowTemplate; try { @@ -306,6 +308,7 @@ export class FlowFrameworkRoutesService { workflow_id, // default update_fields to false if not explicitly set otherwise update_fields: update_fields, + reprovision: reprovision, body: workflowTemplate, });