From a1048038c667c7c458b1e53d1f852ea0230234dd Mon Sep 17 00:00:00 2001 From: Tyler Ohlsen Date: Mon, 17 Jun 2024 14:24:17 -0700 Subject: [PATCH] Finish onboarding ML inference search request / response processors (#182) * Support dynamic workflow generation based on req/resp processors Signed-off-by: Tyler Ohlsen * Onboard search processors to be dynamically added to template Signed-off-by: Tyler Ohlsen * one line improvement Signed-off-by: Tyler Ohlsen * Minor refactorings Signed-off-by: Tyler Ohlsen --------- Signed-off-by: Tyler Ohlsen (cherry picked from commit d0a9846bda675fa5a27b7f8c0c5277b4777eefc9) --- .../utils/workflow_to_template_utils.ts | 148 ++++++++++++---- public/utils/utils.ts | 163 +++++++++--------- 2 files changed, 196 insertions(+), 115 deletions(-) diff --git a/public/pages/workflow_detail/utils/workflow_to_template_utils.ts b/public/pages/workflow_detail/utils/workflow_to_template_utils.ts index a7848f2f..5e52e803 100644 --- a/public/pages/workflow_detail/utils/workflow_to_template_utils.ts +++ b/public/pages/workflow_detail/utils/workflow_to_template_utils.ts @@ -20,6 +20,10 @@ import { MLInferenceProcessor, MapFormValue, IngestProcessor, + SearchProcessor, + IngestConfig, + SearchConfig, + CreateSearchPipelineNode, } from '../../../../common'; import { generateId, processorConfigToFormik } from '../../../utils'; @@ -41,15 +45,22 @@ function configToProvisionTemplateFlow(config: WorkflowConfig): TemplateFlow { const edges = [] as TemplateEdge[]; nodes.push( - ...processorConfigsToTemplateNodes(config.ingest.enrich.processors) + ...ingestConfigToTemplateNodes(config.ingest), + ...searchConfigToTemplateNodes(config.search) ); + + const createIngestPipelineNode = nodes.find( + (node) => node.type === WORKFLOW_STEP_TYPE.CREATE_INGEST_PIPELINE_STEP_TYPE + ) as CreateIngestPipelineNode; + const createSearchPipelineNode = nodes.find( + (node) => node.type === WORKFLOW_STEP_TYPE.CREATE_SEARCH_PIPELINE_STEP_TYPE + ) as CreateSearchPipelineNode; + nodes.push( indexConfigToTemplateNode( config.ingest.index, - nodes.find( - (node) => - node.type === WORKFLOW_STEP_TYPE.CREATE_INGEST_PIPELINE_STEP_TYPE - ) as CreateIngestPipelineNode + createIngestPipelineNode, + createSearchPipelineNode ) ); @@ -59,12 +70,68 @@ function configToProvisionTemplateFlow(config: WorkflowConfig): TemplateFlow { }; } -// General fn to process all processor configs. Generate a final -// ingest pipeline containing all of the processors, maintaining order -function processorConfigsToTemplateNodes( - processorConfigs: IProcessorConfig[] +function ingestConfigToTemplateNodes( + ingestConfig: IngestConfig ): TemplateNode[] { - const processorsList = [] as IngestProcessor[]; + const ingestPipelineName = generateId('ingest_pipeline'); + const ingestProcessors = processorConfigsToTemplateProcessors( + ingestConfig.enrich.processors + ); + const hasProcessors = ingestProcessors.length > 0; + + return hasProcessors + ? [ + { + id: ingestPipelineName, + type: WORKFLOW_STEP_TYPE.CREATE_INGEST_PIPELINE_STEP_TYPE, + user_inputs: { + pipeline_id: ingestPipelineName, + configurations: { + description: 'An ingest pipeline', + processors: ingestProcessors, + }, + }, + } as CreateIngestPipelineNode, + ] + : []; +} + +function searchConfigToTemplateNodes( + searchConfig: SearchConfig +): TemplateNode[] { + const searchPipelineName = generateId('search_pipeline'); + const searchRequestProcessors = processorConfigsToTemplateProcessors( + searchConfig.enrichRequest.processors + ); + const searchResponseProcessors = processorConfigsToTemplateProcessors( + searchConfig.enrichResponse.processors + ); + const hasProcessors = + searchRequestProcessors.length > 0 || searchResponseProcessors.length > 0; + + return hasProcessors + ? [ + { + id: searchPipelineName, + type: WORKFLOW_STEP_TYPE.CREATE_SEARCH_PIPELINE_STEP_TYPE, + user_inputs: { + pipeline_id: searchPipelineName, + configurations: { + request_processors: searchRequestProcessors, + response_processors: searchResponseProcessors, + }, + }, + } as CreateSearchPipelineNode, + ] + : []; +} + +// General fn to process all processor configs and convert them +// into a final list of template-formatted IngestProcessor/SearchProcessors. +function processorConfigsToTemplateProcessors( + processorConfigs: IProcessorConfig[] +): (IngestProcessor | SearchProcessor)[] { + const processorsList = [] as (IngestProcessor | SearchProcessor)[]; processorConfigs.forEach((processorConfig) => { // TODO: support more processor types @@ -100,49 +167,62 @@ function processorConfigsToTemplateNodes( } }); - const ingestPipelineName = generateId('ingest_pipeline'); - return [ - { - id: ingestPipelineName, - type: WORKFLOW_STEP_TYPE.CREATE_INGEST_PIPELINE_STEP_TYPE, - user_inputs: { - pipeline_id: ingestPipelineName, - configurations: { - description: 'An ingest pipeline', - processors: processorsList, - }, - }, - } as CreateIngestPipelineNode, - ]; + return processorsList; } // General fn to convert an index config to a final CreateIndexNode template node. -// Requires any ingest/pipeline node details to set any defaults +// Requires any ingest/pipeline node details to set any defaults, if applicable. function indexConfigToTemplateNode( indexConfig: IndexConfig, - ingestPipelineNode: CreateIngestPipelineNode + ingestPipelineNode?: CreateIngestPipelineNode, + searchPipelineNode?: CreateSearchPipelineNode ): CreateIndexNode { const indexName = indexConfig.name.value as string; // TODO: extract model details to determine the mappings - - // index mappings are different per use case const finalIndexMappings = { properties: {}, } as IndexMappings; + let finalPreviousNodeInputs = {}; + let finalSettings = {}; + + function updateFinalInputsAndSettings( + createPipelineNode: + | CreateIngestPipelineNode + | CreateSearchPipelineNode + | undefined + ): void { + if (createPipelineNode) { + finalPreviousNodeInputs = { + ...finalPreviousNodeInputs, + [createPipelineNode.id]: 'pipeline_id', + }; + + // Search and ingest pipelines expect different keys for setting index defaults + const pipelineKey = + createPipelineNode.type === + WORKFLOW_STEP_TYPE.CREATE_INGEST_PIPELINE_STEP_TYPE + ? 'default_pipeline' + : 'index.search.default_pipeline'; + + finalSettings = { + ...finalSettings, + [pipelineKey]: `\${{${createPipelineNode.id}.pipeline_id}}`, + }; + } + } + updateFinalInputsAndSettings(ingestPipelineNode); + updateFinalInputsAndSettings(searchPipelineNode); + return { id: 'create_index', type: WORKFLOW_STEP_TYPE.CREATE_INDEX_STEP_TYPE, - previous_node_inputs: { - [ingestPipelineNode.id]: 'pipeline_id', - }, + previous_node_inputs: finalPreviousNodeInputs, user_inputs: { index_name: indexName, configurations: { - settings: { - default_pipeline: `\${{${ingestPipelineNode.id}.pipeline_id}}`, - }, + settings: finalSettings, mappings: finalIndexMappings, }, }, diff --git a/public/utils/utils.ts b/public/utils/utils.ts index 1d618cf5..3e1b3610 100644 --- a/public/utils/utils.ts +++ b/public/utils/utils.ts @@ -408,6 +408,19 @@ function generateIngestParentWidth(ingestConfig: IngestConfig): number { ); } +// Helper fn for determining the search parent width, based on the number of +// search request processors, search response processors, and the specified +// spacing/margin between nodes +function generateSearchParentWidth(searchConfig: SearchConfig): number { + return ( + (searchConfig.enrichRequest.processors.length + + searchConfig.enrichResponse.processors.length + + 3) * + (NODE_WIDTH + NODE_SPACING) + + NODE_SPACING + ); +} + function ingestConfigToWorkspaceFlow( ingestConfig: IngestConfig ): WorkspaceFlowState { @@ -454,9 +467,10 @@ function ingestConfigToWorkspaceFlow( nodes.push(docNode, indexNode); // Get nodes/edges from the sub-configurations - const enrichWorkspaceFlow = enrichConfigToWorkspaceFlow( + const enrichWorkspaceFlow = processorsConfigToWorkspaceFlow( ingestConfig.enrich, - parentNode.id + parentNode.id, + NODE_WIDTH + NODE_SPACING * 2 // node padding + (width of doc node) + node padding ); nodes.push(...enrichWorkspaceFlow.nodes); @@ -471,56 +485,6 @@ function ingestConfigToWorkspaceFlow( }; } -// TODO: support non-model-type processor configs -function enrichConfigToWorkspaceFlow( - enrichConfig: ProcessorsConfig, - parentNodeId: string -): WorkspaceFlowState { - const nodes = [] as ReactFlowComponent[]; - const edges = [] as ReactFlowEdge[]; - - let xPosition = NODE_WIDTH + NODE_SPACING * 2; // node padding + (width of doc node) + node padding - let prevNodeId = undefined as string | undefined; - - const mlProcessorConfigs = enrichConfig.processors.filter( - (processorConfig) => processorConfig.type === PROCESSOR_TYPE.ML - ) as IProcessorConfig[]; - - mlProcessorConfigs.forEach((mlProcessorConfig) => { - let transformer = {} as MLTransformer; - let transformerNodeId = ''; - switch (mlProcessorConfig.type) { - case PROCESSOR_TYPE.ML: - default: { - transformer = new MLTransformer(); - transformerNodeId = generateId(COMPONENT_CLASS.ML_TRANSFORMER); - break; - } - } - - nodes.push({ - id: transformerNodeId, - position: { x: xPosition, y: NODE_HEIGHT_Y }, - data: initComponentData(transformer, transformerNodeId), - type: NODE_CATEGORY.CUSTOM, - parentNode: parentNodeId, - extent: 'parent', - }); - xPosition += NODE_SPACING + NODE_WIDTH; - - if (prevNodeId) { - edges.push( - generateReactFlowEdge(generateId('edge'), prevNodeId, transformerNodeId) - ); - } - prevNodeId = transformerNodeId; - }); - return { - nodes, - edges, - }; -} - // Given the set of localized flows per sub-configuration, generate the global ingest-level edges. // This takes the assumption the flow is linear, and all sub-configuration flows are fully connected. function getIngestEdges( @@ -568,7 +532,7 @@ function searchConfigToWorkspaceFlow( type: NODE_CATEGORY.SEARCH_GROUP, data: { label: COMPONENT_CATEGORY.SEARCH }, style: { - width: 1300, + width: generateSearchParentWidth(searchConfig), height: PARENT_NODE_HEIGHT, }, className: 'reactflow__group-node__search', @@ -576,6 +540,20 @@ function searchConfigToWorkspaceFlow( nodes.push(parentNode); + // Get nodes/edges from the processor sub-configurations + const enrichRequestWorkspaceFlow = processorsConfigToWorkspaceFlow( + searchConfig.enrichRequest, + parentNode.id, + NODE_WIDTH + NODE_SPACING * 2 // node padding + (width of query node) + node padding + ); + const enrichResponseWorkspaceFlow = processorsConfigToWorkspaceFlow( + searchConfig.enrichResponse, + parentNode.id, + NODE_SPACING + + (NODE_WIDTH + NODE_SPACING) * + (enrichRequestWorkspaceFlow.nodes.length + 2) // node padding + (width + padding of query node, any request processor nodes, and index node) + ); + // By default, always include a query node, an index node, and a results node. const queryNodeId = generateId(COMPONENT_CLASS.NEURAL_QUERY); const queryNode = { @@ -589,7 +567,13 @@ function searchConfigToWorkspaceFlow( const indexNodeId = generateId(COMPONENT_CLASS.KNN_INDEXER); const indexNode = { id: indexNodeId, - position: { x: 500, y: 70 }, + position: { + x: + parentNode.style.width - + (NODE_WIDTH + NODE_SPACING) * + (enrichResponseWorkspaceFlow.nodes.length + 2), + y: NODE_HEIGHT_Y, + }, data: initComponentData(new KnnIndexer().toObj(), indexNodeId), type: NODE_CATEGORY.CUSTOM, parentNode: parentNode.id, @@ -598,7 +582,10 @@ function searchConfigToWorkspaceFlow( const resultsNodeId = generateId(COMPONENT_CLASS.RESULTS); const resultsNode = { id: resultsNodeId, - position: { x: 900, y: 70 }, + position: { + x: parentNode.style.width - (NODE_WIDTH + NODE_SPACING), + y: NODE_HEIGHT_Y, + }, data: initComponentData(new Results().toObj(), resultsNodeId), type: NODE_CATEGORY.CUSTOM, parentNode: parentNode.id, @@ -606,16 +593,6 @@ function searchConfigToWorkspaceFlow( } as ReactFlowComponent; nodes.push(queryNode, indexNode, resultsNode); - // Get nodes/edges from the sub-configurations - const enrichRequestWorkspaceFlow = enrichRequestConfigToWorkspaceFlow( - searchConfig.enrichRequest, - parentNode.id - ); - const enrichResponseWorkspaceFlow = enrichResponseConfigToWorkspaceFlow( - searchConfig.enrichResponse, - parentNode.id - ); - nodes.push( ...enrichRequestWorkspaceFlow.nodes, ...enrichResponseWorkspaceFlow.nodes @@ -642,28 +619,52 @@ function searchConfigToWorkspaceFlow( }; } -// TODO: implement this -function enrichRequestConfigToWorkspaceFlow( - enrichRequestConfig: ProcessorsConfig, - parentNodeId: string +// Helper fn to generate a dynamic list of processor nodes +// based on the list of processors in a config +// TODO: support non-model-type processors +function processorsConfigToWorkspaceFlow( + processorsConfig: ProcessorsConfig, + parentNodeId: string, + xPosition: number ): WorkspaceFlowState { const nodes = [] as ReactFlowComponent[]; const edges = [] as ReactFlowEdge[]; - return { - nodes, - edges, - }; -} + let prevNodeId = undefined as string | undefined; -// TODO: implement this -function enrichResponseConfigToWorkspaceFlow( - enrichRequestConfig: ProcessorsConfig, - parentNodeId: string -): WorkspaceFlowState { - const nodes = [] as ReactFlowComponent[]; - const edges = [] as ReactFlowEdge[]; + const mlProcessorConfigs = processorsConfig.processors.filter( + (processorConfig) => processorConfig.type === PROCESSOR_TYPE.ML + ) as IProcessorConfig[]; + mlProcessorConfigs.forEach((mlProcessorConfig) => { + let transformer = {} as MLTransformer; + let transformerNodeId = ''; + switch (mlProcessorConfig.type) { + case PROCESSOR_TYPE.ML: + default: { + transformer = new MLTransformer(); + transformerNodeId = generateId(COMPONENT_CLASS.ML_TRANSFORMER); + break; + } + } + + nodes.push({ + id: transformerNodeId, + position: { x: xPosition, y: NODE_HEIGHT_Y }, + data: initComponentData(transformer, transformerNodeId), + type: NODE_CATEGORY.CUSTOM, + parentNode: parentNodeId, + extent: 'parent', + }); + xPosition += NODE_SPACING + NODE_WIDTH; + + if (prevNodeId) { + edges.push( + generateReactFlowEdge(generateId('edge'), prevNodeId, transformerNodeId) + ); + } + prevNodeId = transformerNodeId; + }); return { nodes, edges,