diff --git a/common/constants.ts b/common/constants.ts index 29fe1f95..2bc45505 100644 --- a/common/constants.ts +++ b/common/constants.ts @@ -119,6 +119,19 @@ export const NEURAL_SPARSE_TOKENIZER_TRANSFORMER = { version: '1.0.1', } as PretrainedSparseEncodingModel; +/** + * Various constants pertaining to Workflow configs + */ + +export enum PROCESSOR_TYPE { + MODEL = 'model_processor', +} + +export enum MODEL_TYPE { + TEXT_EMBEDDING = 'text_embedding', + SPARSE_ENCODER = 'sparse_encoder', +} + /** * Various constants pertaining to the drag-and-drop UI components */ diff --git a/common/interfaces.ts b/common/interfaces.ts index d9221525..c369fb8a 100644 --- a/common/interfaces.ts +++ b/common/interfaces.ts @@ -6,7 +6,12 @@ import { Node, Edge } from 'reactflow'; import { FormikValues } from 'formik'; import { ObjectSchema } from 'yup'; -import { COMPONENT_CLASS, COMPONENT_CATEGORY } from './constants'; +import { + COMPONENT_CLASS, + COMPONENT_CATEGORY, + PROCESSOR_TYPE, + MODEL_TYPE, +} from './constants'; export type Index = { name: string; @@ -42,8 +47,16 @@ export interface IConfig { metadata?: IConfigMetadata; } +export interface IProcessorConfig extends IConfig { + type: PROCESSOR_TYPE; +} + +export interface IModelProcessorConfig extends IProcessorConfig { + modelType: MODEL_TYPE; +} + export type EnrichConfig = { - processors: IConfig[]; + processors: IProcessorConfig[]; }; export type IndexConfig = { @@ -63,8 +76,8 @@ export type SearchConfig = { }; export type WorkflowConfig = { - ingest?: IngestConfig; - search?: SearchConfig; + ingest: IngestConfig; + search: SearchConfig; }; export type WorkflowFormValues = { diff --git a/public/configs/ingest_processors/base_ingest_processor.ts b/public/configs/ingest_processors/base_ingest_processor.ts index f45cf863..6120ab9c 100644 --- a/public/configs/ingest_processors/base_ingest_processor.ts +++ b/public/configs/ingest_processors/base_ingest_processor.ts @@ -10,10 +10,8 @@ import { BaseConfig } from '../base_config'; */ export abstract class BaseIngestProcessor extends BaseConfig { name: string; - type: string; constructor() { super(); this.name = ''; - this.type = ''; } } diff --git a/public/configs/ingest_processors/model_processor.ts b/public/configs/ingest_processors/model_processor.ts new file mode 100644 index 00000000..b550f41e --- /dev/null +++ b/public/configs/ingest_processors/model_processor.ts @@ -0,0 +1,17 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { MODEL_TYPE } from '../../../common'; +import { BaseIngestProcessor } from './base_ingest_processor'; + +/** + * A base model processor config + */ +export abstract class ModelProcessor extends BaseIngestProcessor { + type: MODEL_TYPE; + constructor() { + super(); + } +} diff --git a/public/configs/ingest_processors/text_embedding_processor.ts b/public/configs/ingest_processors/text_embedding_processor.ts index e30f7a9a..3da301f6 100644 --- a/public/configs/ingest_processors/text_embedding_processor.ts +++ b/public/configs/ingest_processors/text_embedding_processor.ts @@ -3,18 +3,19 @@ * SPDX-License-Identifier: Apache-2.0 */ +import { MODEL_TYPE } from '../../../common'; import { generateId } from '../../utils'; -import { BaseIngestProcessor } from './base_ingest_processor'; +import { ModelProcessor } from './model_processor'; /** * A specialized text embedding processor config */ -export class TextEmbeddingProcessor extends BaseIngestProcessor { +export class TextEmbeddingProcessor extends ModelProcessor { constructor() { super(); this.id = generateId('text_embedding_processor'); this.name = 'Text embedding processor'; - this.type = 'text_embedding'; + this.type = MODEL_TYPE.TEXT_EMBEDDING; this.fields = [ { label: 'Text Embedding Model', diff --git a/public/pages/workflow_detail/resizable_workspace.tsx b/public/pages/workflow_detail/resizable_workspace.tsx index fac2e65f..e0412a1d 100644 --- a/public/pages/workflow_detail/resizable_workspace.tsx +++ b/public/pages/workflow_detail/resizable_workspace.tsx @@ -19,6 +19,7 @@ import { ReactFlowEdge, WorkflowFormValues, WorkflowSchema, + WorkflowConfig, } from '../../../common'; import { APP_PATH, @@ -222,7 +223,10 @@ export function ResizableWorkspace(props: ResizableWorkspaceProps) { setIsSaving(false); } else { setFormValidOnSubmit(true); - const updatedConfig = formikToUiConfig(formikProps.values); + const updatedConfig = formikToUiConfig( + formikProps.values, + workflow?.ui_metadata?.config as WorkflowConfig + ); const updatedWorkflow = { ...workflow, ui_metadata: { diff --git a/public/pages/workflow_detail/utils/workflow_to_template_utils.ts b/public/pages/workflow_detail/utils/workflow_to_template_utils.ts index 0d725a5a..96548545 100644 --- a/public/pages/workflow_detail/utils/workflow_to_template_utils.ts +++ b/public/pages/workflow_detail/utils/workflow_to_template_utils.ts @@ -3,17 +3,11 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { FormikValues } from 'formik'; import { - WorkspaceFlowState, - ReactFlowComponent, TemplateFlows, - NODE_CATEGORY, TemplateNode, - COMPONENT_CLASS, CreateIngestPipelineNode, TextEmbeddingProcessor, - ReactFlowEdge, CreateIndexNode, TemplateFlow, TemplateEdge, @@ -29,11 +23,14 @@ import { NEURAL_SPARSE_TOKENIZER_TRANSFORMER, SparseEncodingProcessor, IndexMappings, - CreateSearchPipelineNode, WORKFLOW_STEP_TYPE, WorkflowConfig, + PROCESSOR_TYPE, + IModelProcessorConfig, + MODEL_TYPE, + IndexConfig, } from '../../../../common'; -import { componentDataToFormik, generateId } from '../../../utils'; +import { generateId, processorConfigToFormik } from '../../../utils'; /** * Given a WorkflowConfig with fully populated input values, @@ -48,146 +45,42 @@ export function configToTemplateFlows(config: WorkflowConfig): TemplateFlows { }; } -// TODO: implement this function configToProvisionTemplateFlow(config: WorkflowConfig): TemplateFlow { const nodes = [] as TemplateNode[]; const edges = [] as TemplateEdge[]; - return { - nodes, - edges, - }; -} - -/** - * Given a ReactFlow workspace flow with fully populated input values, - * generate a backend-compatible set of sub-workflows. - */ -export function toTemplateFlows( - workspaceFlow: WorkspaceFlowState -): TemplateFlows { - const provisionFlow = toProvisionTemplateFlow(workspaceFlow); + // TODO: few assumptions are made here, such as there will always be + // a single model-related processor. In the future make this more flexible and generic. + const modelProcessorConfig = config.ingest.enrich.processors.find( + (processorConfig) => processorConfig.type === PROCESSOR_TYPE.MODEL + ) as IModelProcessorConfig; - // TODO: support beyond provision - return { - provision: provisionFlow, - }; -} - -export function getNodesAndEdgesUnderParent( - parentGroup: NODE_CATEGORY, - allNodes: ReactFlowComponent[], - allEdges: ReactFlowEdge[] -): { nodes: ReactFlowComponent[]; edges: ReactFlowEdge[] } { - const parentId = allNodes.find((node) => node.type === parentGroup) - ?.id as string; - const nodes = allNodes.filter((node) => node.parentNode === parentId); - const nodeIds = nodes.map((node) => node.id); - const edges = allEdges.filter( - (edge) => nodeIds.includes(edge.source) || nodeIds.includes(edge.target) + nodes.push(...modelProcessorConfigToTemplateNodes(modelProcessorConfig)); + nodes.push( + indexConfigToTemplateNode(modelProcessorConfig, config.ingest.index) ); + return { nodes, edges, }; } -// Generates the end-to-end provision subflow, if applicable -function toProvisionTemplateFlow( - workspaceFlow: WorkspaceFlowState -): TemplateFlow { - const { - nodes: ingestNodes, - edges: ingestEdges, - } = getNodesAndEdgesUnderParent( - NODE_CATEGORY.INGEST_GROUP, - workspaceFlow.nodes, - workspaceFlow.edges - ); - const { - nodes: searchNodes, - edges: searchEdges, - } = getNodesAndEdgesUnderParent( - NODE_CATEGORY.SEARCH_GROUP, - workspaceFlow.nodes, - workspaceFlow.edges - ); - - // INGEST: iterate through nodes/edges and generate the valid template nodes - const prevNodes = [] as ReactFlowComponent[]; - const finalTemplateNodes = [] as TemplateNode[]; - const templateEdges = [] as TemplateEdge[]; - ingestNodes.forEach((node) => { - const templateNodes = toTemplateNodes(node, prevNodes, ingestEdges); - // it may be undefined if the node is not convertible for some reason - if (templateNodes) { - finalTemplateNodes.push(...templateNodes); - prevNodes.push(node); - } - }); - - ingestEdges.forEach((edge) => { - // it may be undefined if the edge is not convertible - // (e.g., connecting to some meta/other UI component, like "document" or "query") - const templateEdge = toTemplateEdge(edge); - if (templateEdge) { - templateEdges.push(templateEdge); - } - }); - - // SEARCH: iterate through nodes/edges and generate the valid template nodes - // TODO: currently the scope is limited to only expecting a single search processor - // node, and hence logic is hardcoded to return a single CreateSearchPipelineNode - searchNodes.forEach((node) => { - if (node.data.baseClasses?.includes(COMPONENT_CLASS.RESULTS_TRANSFORMER)) { - const templateNode = resultsTransformerToTemplateNode(node); - finalTemplateNodes.push(templateNode); - } - }); - - return { - nodes: finalTemplateNodes, - edges: templateEdges, - }; -} - -function toTemplateNodes( - flowNode: ReactFlowComponent, - prevNodes: ReactFlowComponent[], - edges: ReactFlowEdge[] -): TemplateNode[] | undefined { - if (flowNode.data.baseClasses?.includes(COMPONENT_CLASS.ML_TRANSFORMER)) { - return transformerToTemplateNodes(flowNode); - } else if (flowNode.data.baseClasses?.includes(COMPONENT_CLASS.INDEXER)) { - return [indexerToTemplateNode(flowNode, prevNodes, edges)]; - } -} - -function toTemplateEdge(flowEdge: ReactFlowEdge): TemplateEdge | undefined { - return isValidTemplateEdge(flowEdge) - ? { - source: flowEdge.source, - dest: flowEdge.target, - } - : undefined; -} - -// General fn to process all ML transform nodes. Convert into a final -// ingest pipeline with a processor specific to the final class of the node. +// General fn to process all ML processor configs. Convert into a final ingest pipeline. // Optionally prepend a register pretrained model step if the selected model -// is a pretrained and undeployed one. -function transformerToTemplateNodes( - flowNode: ReactFlowComponent +// is a pretrained and currently undeployed one. +function modelProcessorConfigToTemplateNodes( + modelProcessorConfig: IModelProcessorConfig ): TemplateNode[] { // TODO improvements to make here: // 1. Consideration of multiple ingest processors and how to collect them all, and finally create // a single ingest pipeline with all of them, in the same order as done on the UI - switch (flowNode.data.type) { - case COMPONENT_CLASS.TEXT_EMBEDDING_TRANSFORMER: - case COMPONENT_CLASS.SPARSE_ENCODER_TRANSFORMER: + switch (modelProcessorConfig.modelType) { + case MODEL_TYPE.TEXT_EMBEDDING: + case MODEL_TYPE.SPARSE_ENCODER: default: { - const { model, inputField, vectorField } = componentDataToFormik( - flowNode.data + const { model, inputField, vectorField } = processorConfigToFormik( + modelProcessorConfig ) as { model: ModelFormValue; inputField: string; @@ -198,7 +91,7 @@ function transformerToTemplateNodes( // register model workflow step type is different per use case const registerModelStepType = - flowNode.data.type === COMPONENT_CLASS.TEXT_EMBEDDING_TRANSFORMER + modelProcessorConfig.modelType === MODEL_TYPE.TEXT_EMBEDDING ? WORKFLOW_STEP_TYPE.REGISTER_LOCAL_PRETRAINED_MODEL_STEP_TYPE : WORKFLOW_STEP_TYPE.REGISTER_LOCAL_SPARSE_ENCODING_MODEL_STEP_TYPE; @@ -240,7 +133,7 @@ function transformerToTemplateNodes( // processor is different per use case const finalProcessor = - flowNode.data.type === COMPONENT_CLASS.TEXT_EMBEDDING_TRANSFORMER + modelProcessorConfig.modelType === MODEL_TYPE.TEXT_EMBEDDING ? ({ text_embedding: { model_id: finalModelId, @@ -260,12 +153,12 @@ function transformerToTemplateNodes( // ingest pipeline is different per use case const finalIngestPipelineDescription = - flowNode.data.type === COMPONENT_CLASS.TEXT_EMBEDDING_TRANSFORMER + modelProcessorConfig.modelType === MODEL_TYPE.TEXT_EMBEDDING ? 'An ingest pipeline with a text embedding processor' : 'An ingest pieline with a neural sparse encoding processor'; const createIngestPipelineStep = { - id: flowNode.data.id, + id: modelProcessorConfig.id, type: WORKFLOW_STEP_TYPE.CREATE_INGEST_PIPELINE_STEP_TYPE, user_inputs: { pipeline_id: ingestPipelineName, @@ -292,156 +185,65 @@ function transformerToTemplateNodes( } } -// General fn to convert an indexer node to a final CreateIndexNode template node. -function indexerToTemplateNode( - flowNode: ReactFlowComponent, - prevNodes: ReactFlowComponent[], - edges: ReactFlowEdge[] +// General fn to convert an index config to a final CreateIndexNode template node. +// Requires the processor configs +function indexConfigToTemplateNode( + modelProcessorConfig: IModelProcessorConfig, + indexConfig: IndexConfig ): CreateIndexNode { - switch (flowNode.data.type) { - case COMPONENT_CLASS.KNN_INDEXER: - default: { - const { indexName } = componentDataToFormik(flowNode.data); - // TODO: remove hardcoded logic here that is assuming each indexer node has - // exactly 1 directly connected create_ingest_pipeline predecessor node that - // contains an inputField and vectorField - const directlyConnectedNode = getDirectlyConnectedNodes( - flowNode, - prevNodes, - edges - )[0]; - - const { inputField, vectorField } = getNodeValues([ - directlyConnectedNode, - ]); + const indexName = indexConfig.name.value as string; + const { inputField, vectorField } = processorConfigToFormik( + modelProcessorConfig + ) as { + inputField: string; + vectorField: string; + }; - // index mappings are different per use case - const finalIndexMappings = { - properties: - directlyConnectedNode.data.type === - COMPONENT_CLASS.TEXT_EMBEDDING_TRANSFORMER - ? { - [vectorField]: { - type: 'knn_vector', - // TODO: remove hardcoding, fetch from the selected model - // (existing or from pretrained configuration) - dimension: 768, - method: { - engine: 'lucene', - space_type: 'l2', - name: 'hnsw', - parameters: {}, - }, - }, - [inputField]: { - type: 'text', - }, - } - : { - [vectorField]: { - type: 'rank_features', - }, - [inputField]: { - type: 'text', - }, + // index mappings are different per use case + const finalIndexMappings = { + properties: + modelProcessorConfig.modelType === MODEL_TYPE.TEXT_EMBEDDING + ? { + [vectorField]: { + type: 'knn_vector', + // TODO: remove hardcoding, fetch from the selected model + // (existing or from pretrained configuration) + dimension: 768, + method: { + engine: 'lucene', + space_type: 'l2', + name: 'hnsw', + parameters: {}, }, - } as IndexMappings; - - return { - id: flowNode.data.id, - type: WORKFLOW_STEP_TYPE.CREATE_INDEX_STEP_TYPE, - previous_node_inputs: { - [directlyConnectedNode.id]: 'pipeline_id', - }, - user_inputs: { - index_name: indexName, - configurations: { - settings: { - default_pipeline: `\${{${directlyConnectedNode.id}.pipeline_id}}`, }, - mappings: finalIndexMappings, + [inputField]: { + type: 'text', + }, + } + : { + [vectorField]: { + type: 'rank_features', + }, + [inputField]: { + type: 'text', + }, }, - }, - }; - } - } -} + } as IndexMappings; -// General fn to process all result transformer nodes. -// TODO: currently hardcoding to return a static configuration of a normalization -// phase results processor. Should make dynamic & generic -function resultsTransformerToTemplateNode( - flowNode: ReactFlowComponent -): CreateSearchPipelineNode { return { - id: flowNode.data.id, - type: WORKFLOW_STEP_TYPE.CREATE_SEARCH_PIPELINE_STEP_TYPE, + id: 'create_index', + type: WORKFLOW_STEP_TYPE.CREATE_INDEX_STEP_TYPE, + previous_node_inputs: { + [modelProcessorConfig.id]: 'pipeline_id', + }, user_inputs: { - pipeline_id: generateId('search_pipeline'), + index_name: indexName, configurations: { - phase_results_processors: [ - { - ['normalization-processor']: { - normalization: { - technique: 'min_max', - }, - combination: { - technique: 'arithmetic_mean', - parameters: { - weights: `[0.3, 0.7]`, - }, - }, - }, - }, - ], + settings: { + default_pipeline: `\${{${modelProcessorConfig.id}.pipeline_id}}`, + }, + mappings: finalIndexMappings, }, }, - } as CreateSearchPipelineNode; -} - -// Fetch all directly connected predecessor nodes -function getDirectlyConnectedNodes( - node: ReactFlowComponent, - prevNodes: ReactFlowComponent[], - edges: ReactFlowEdge[] -): ReactFlowComponent[] { - const directlyConnectedNodeIds = getDirectlyConnectedNodeIds(node, edges); - return prevNodes.filter((prevNode) => - directlyConnectedNodeIds.includes(prevNode.id) - ); -} - -// Get all values for an arr of flow nodes -function getNodeValues(nodes: ReactFlowComponent[]): FormikValues { - let values = {} as FormikValues; - nodes.forEach((node) => { - values = { - ...values, - ...componentDataToFormik(node.data), - }; - }); - return values; -} - -// Fetch all direct predecessor node IDs for a given node -function getDirectlyConnectedNodeIds( - flowNode: ReactFlowComponent, - edges: ReactFlowEdge[] -): string[] { - const incomingNodes = [] as string[]; - edges.forEach((edge) => { - if (edge.target === flowNode.id) { - incomingNodes.push(edge.source); - } - }); - return incomingNodes; -} - -function isValidTemplateEdge(flowEdge: ReactFlowEdge): boolean { - // TODO: may need to expand to handle multiple classes in the future (e.g., some 'query' component) - const invalidClass = COMPONENT_CLASS.DOCUMENT; - return ( - !flowEdge.sourceClasses?.includes(invalidClass) && - !flowEdge.targetClasses?.includes(invalidClass) - ); + }; } diff --git a/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx b/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx index 91c677c2..69e28fd9 100644 --- a/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx +++ b/public/pages/workflow_detail/workflow_inputs/workflow_inputs.tsx @@ -61,7 +61,11 @@ export function WorkflowInputs(props: WorkflowInputsProps) { {selectedStep === CREATE_STEP.INGEST ? ( { - let fieldValues = {} as FormikValues; - processorConfig.fields.forEach((field) => { - fieldValues[field.id] = field.value || getInitialValue(field.type); - }); - formValues[processorConfig.id] = fieldValues; + formValues[processorConfig.id] = processorConfigToFormik(processorConfig); }); - return formValues; } +export function processorConfigToFormik( + processorConfig: IProcessorConfig +): FormikValues { + const fieldValues = {} as FormikValues; + processorConfig.fields.forEach((field) => { + fieldValues[field.id] = field.value || getInitialValue(field.type); + }); + return fieldValues; +} + function indexConfigToFormik(indexConfig: IndexConfig): FormikValues { let formValues = {} as FormikValues; formValues['name'] = @@ -103,16 +109,60 @@ function searchConfigToFormik( return searchFormikValues; } -// TODO: this may need more tuning. Currently this is force-converting the FormikValues obj -// into a WorkflowConfig obj. It takes the assumption the form will include all possible -// config values, including defaults. +// Injecting all of the form values into the config export function formikToUiConfig( - formValues: WorkflowFormValues + formValues: WorkflowFormValues, + existingConfig: WorkflowConfig ): WorkflowConfig { - const workflowConfig = {} as WorkflowConfig; - workflowConfig['ingest'] = formValues.ingest as IngestConfig; - workflowConfig['search'] = formValues.search as SearchConfig; - return workflowConfig; + let updatedConfig = cloneDeep(existingConfig); + updatedConfig['ingest'] = formikToIngestUiConfig( + formValues.ingest, + updatedConfig.ingest + ); + updatedConfig['search'] = {} as SearchConfig; + + return { + ...updatedConfig, + ingest: formikToIngestUiConfig(formValues.ingest, updatedConfig.ingest), + }; +} + +function formikToIngestUiConfig( + ingestFormValues: FormikValues, + existingConfig: IngestConfig +): IngestConfig { + return { + ...existingConfig, + enrich: formikToEnrichUiConfig( + ingestFormValues['enrich'], + existingConfig.enrich + ), + index: formikToIndexUiConfig( + ingestFormValues['index'], + existingConfig.index + ), + }; +} + +function formikToEnrichUiConfig( + enrichFormValues: FormikValues, + existingConfig: EnrichConfig +): EnrichConfig { + existingConfig.processors.forEach((processorConfig) => { + const processorFormValues = enrichFormValues[processorConfig.id]; + processorConfig.fields.forEach((processorField) => { + processorField.value = processorFormValues[processorField.id]; + }); + }); + return existingConfig; +} + +function formikToIndexUiConfig( + indexFormValues: FormikValues, + existingConfig: IndexConfig +): IndexConfig { + existingConfig['name'].value = indexFormValues['name']; + return existingConfig; } /*