Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Finish onboarding ML inference search request / response processors #183

Merged
merged 1 commit into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 114 additions & 34 deletions public/pages/workflow_detail/utils/workflow_to_template_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import {
MLInferenceProcessor,
MapFormValue,
IngestProcessor,
SearchProcessor,
IngestConfig,
SearchConfig,
CreateSearchPipelineNode,
} from '../../../../common';
import { generateId, processorConfigToFormik } from '../../../utils';

Expand All @@ -41,15 +45,22 @@ function configToProvisionTemplateFlow(config: WorkflowConfig): TemplateFlow {
const edges = [] as TemplateEdge[];

nodes.push(
...processorConfigsToTemplateNodes(config.ingest.enrich.processors)
...ingestConfigToTemplateNodes(config.ingest),
...searchConfigToTemplateNodes(config.search)
);

const createIngestPipelineNode = nodes.find(
(node) => node.type === WORKFLOW_STEP_TYPE.CREATE_INGEST_PIPELINE_STEP_TYPE
) as CreateIngestPipelineNode;
const createSearchPipelineNode = nodes.find(
(node) => node.type === WORKFLOW_STEP_TYPE.CREATE_SEARCH_PIPELINE_STEP_TYPE
) as CreateSearchPipelineNode;

nodes.push(
indexConfigToTemplateNode(
config.ingest.index,
nodes.find(
(node) =>
node.type === WORKFLOW_STEP_TYPE.CREATE_INGEST_PIPELINE_STEP_TYPE
) as CreateIngestPipelineNode
createIngestPipelineNode,
createSearchPipelineNode
)
);

Expand All @@ -59,12 +70,68 @@ function configToProvisionTemplateFlow(config: WorkflowConfig): TemplateFlow {
};
}

// General fn to process all processor configs. Generate a final
// ingest pipeline containing all of the processors, maintaining order
function processorConfigsToTemplateNodes(
processorConfigs: IProcessorConfig[]
function ingestConfigToTemplateNodes(
ingestConfig: IngestConfig
): TemplateNode[] {
const processorsList = [] as IngestProcessor[];
const ingestPipelineName = generateId('ingest_pipeline');
const ingestProcessors = processorConfigsToTemplateProcessors(
ingestConfig.enrich.processors
);
const hasProcessors = ingestProcessors.length > 0;

return hasProcessors
? [
{
id: ingestPipelineName,
type: WORKFLOW_STEP_TYPE.CREATE_INGEST_PIPELINE_STEP_TYPE,
user_inputs: {
pipeline_id: ingestPipelineName,
configurations: {
description: 'An ingest pipeline',
processors: ingestProcessors,
},
},
} as CreateIngestPipelineNode,
]
: [];
}

function searchConfigToTemplateNodes(
searchConfig: SearchConfig
): TemplateNode[] {
const searchPipelineName = generateId('search_pipeline');
const searchRequestProcessors = processorConfigsToTemplateProcessors(
searchConfig.enrichRequest.processors
);
const searchResponseProcessors = processorConfigsToTemplateProcessors(
searchConfig.enrichResponse.processors
);
const hasProcessors =
searchRequestProcessors.length > 0 || searchResponseProcessors.length > 0;

return hasProcessors
? [
{
id: searchPipelineName,
type: WORKFLOW_STEP_TYPE.CREATE_SEARCH_PIPELINE_STEP_TYPE,
user_inputs: {
pipeline_id: searchPipelineName,
configurations: {
request_processors: searchRequestProcessors,
response_processors: searchResponseProcessors,
},
},
} as CreateSearchPipelineNode,
]
: [];
}

// General fn to process all processor configs and convert them
// into a final list of template-formatted IngestProcessor/SearchProcessors.
function processorConfigsToTemplateProcessors(
processorConfigs: IProcessorConfig[]
): (IngestProcessor | SearchProcessor)[] {
const processorsList = [] as (IngestProcessor | SearchProcessor)[];

processorConfigs.forEach((processorConfig) => {
// TODO: support more processor types
Expand Down Expand Up @@ -100,49 +167,62 @@ function processorConfigsToTemplateNodes(
}
});

const ingestPipelineName = generateId('ingest_pipeline');
return [
{
id: ingestPipelineName,
type: WORKFLOW_STEP_TYPE.CREATE_INGEST_PIPELINE_STEP_TYPE,
user_inputs: {
pipeline_id: ingestPipelineName,
configurations: {
description: 'An ingest pipeline',
processors: processorsList,
},
},
} as CreateIngestPipelineNode,
];
return processorsList;
}

// General fn to convert an index config to a final CreateIndexNode template node.
// Requires any ingest/pipeline node details to set any defaults
// Requires any ingest/pipeline node details to set any defaults, if applicable.
function indexConfigToTemplateNode(
indexConfig: IndexConfig,
ingestPipelineNode: CreateIngestPipelineNode
ingestPipelineNode?: CreateIngestPipelineNode,
searchPipelineNode?: CreateSearchPipelineNode
): CreateIndexNode {
const indexName = indexConfig.name.value as string;

// TODO: extract model details to determine the mappings

// index mappings are different per use case
const finalIndexMappings = {
properties: {},
} as IndexMappings;

let finalPreviousNodeInputs = {};
let finalSettings = {};

function updateFinalInputsAndSettings(
createPipelineNode:
| CreateIngestPipelineNode
| CreateSearchPipelineNode
| undefined
): void {
if (createPipelineNode) {
finalPreviousNodeInputs = {
...finalPreviousNodeInputs,
[createPipelineNode.id]: 'pipeline_id',
};

// Search and ingest pipelines expect different keys for setting index defaults
const pipelineKey =
createPipelineNode.type ===
WORKFLOW_STEP_TYPE.CREATE_INGEST_PIPELINE_STEP_TYPE
? 'default_pipeline'
: 'index.search.default_pipeline';

finalSettings = {
...finalSettings,
[pipelineKey]: `\${{${createPipelineNode.id}.pipeline_id}}`,
};
}
}
updateFinalInputsAndSettings(ingestPipelineNode);
updateFinalInputsAndSettings(searchPipelineNode);

return {
id: 'create_index',
type: WORKFLOW_STEP_TYPE.CREATE_INDEX_STEP_TYPE,
previous_node_inputs: {
[ingestPipelineNode.id]: 'pipeline_id',
},
previous_node_inputs: finalPreviousNodeInputs,
user_inputs: {
index_name: indexName,
configurations: {
settings: {
default_pipeline: `\${{${ingestPipelineNode.id}.pipeline_id}}`,
},
settings: finalSettings,
mappings: finalIndexMappings,
},
},
Expand Down
Loading
Loading