diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_component_operations.py b/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_component_operations.py index 94605da3d6e1..dfa8a52bc492 100644 --- a/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_component_operations.py +++ b/sdk/ml/azure-ai-ml/azure/ai/ml/operations/_component_operations.py @@ -663,17 +663,21 @@ def _try_resolve_compute_for_node(cls, node: BaseNode, _: str, resolver): @classmethod def _divide_nodes_to_resolve_into_layers(cls, component: PipelineComponent, extra_operations: List[Callable]): - """Traverse the pipeline component and divide nodes to resolve into layers. + """Traverse the pipeline component and divide nodes to resolve into layers. Note that all leaf nodes will be + put in the last layer. For example, for below pipeline component, assuming that all nodes need to be resolved: A /|\ B C D | | E F + | + G return value will be: [ - [("B", B), ("C", C), ("D", D)], - [("E", E), ("F", F)], + [("B", B), ("C", C)], + [("E", E)], + [("D", D), ("F", F), ("G", G)], ] :param component: The pipeline component to resolve. @@ -683,32 +687,35 @@ def _divide_nodes_to_resolve_into_layers(cls, component: PipelineComponent, extr :return: A list of layers of nodes to resolve. :rtype: List[List[Tuple[str, BaseNode]]] """ - # add an empty layer to mark the end of the first layer - layers, cur_layer_head, cur_layer = [list(component.jobs.items()), []], 0, 0 - - while cur_layer < len(layers) and cur_layer_head < len(layers[cur_layer]): - key, job_instance = layers[cur_layer][cur_layer_head] - cur_layer_head += 1 - - cls._resolve_binding_on_supported_fields_for_node(job_instance) - if isinstance(job_instance, LoopNode): - job_instance = job_instance.body - - for extra_operation in extra_operations: - extra_operation(job_instance, key) + nodes_to_process = list(component.jobs.items()) + layers = [] + leaf_nodes = [] + + while nodes_to_process: + layers.append([]) + new_nodes_to_process = [] + for key, job_instance in nodes_to_process: + cls._resolve_binding_on_supported_fields_for_node(job_instance) + if isinstance(job_instance, LoopNode): + job_instance = job_instance.body - if isinstance(job_instance, BaseNode) and isinstance(job_instance._component, PipelineComponent): - if cur_layer + 1 == len(layers): - layers.append([]) - layers[cur_layer + 1].extend(job_instance.component.jobs.items()) + for extra_operation in extra_operations: + extra_operation(job_instance, key) - if cur_layer_head == len(layers[cur_layer]): - cur_layer += 1 - cur_layer_head = 0 + if isinstance(job_instance, BaseNode) and isinstance(job_instance._component, PipelineComponent): + # candidates for next layer + new_nodes_to_process.extend(job_instance.component.jobs.items()) + # use layers to store pipeline nodes in each layer for now + layers[-1].append((key, job_instance)) + else: + # note that LoopNode has already been replaced by its body here + leaf_nodes.append((key, job_instance)) + nodes_to_process = new_nodes_to_process - # if there is no subgraph, pop the empty layer inserted at the beginning - if len(layers[-1]) == 0: + # if there is subgraph, the last item in layers will be empty for now as all leaf nodes are stored in leaf_nodes + if len(layers) != 0: layers.pop() + layers.append(leaf_nodes) return layers @@ -761,9 +768,6 @@ def _resolve_dependencies_for_pipeline_component_jobs( for layer in reversed(layers): for _, job_instance in layer: - if isinstance(job_instance, LoopNode): - job_instance = job_instance.body - if isinstance(job_instance, AutoMLJob): # only compute is resolved here self._job_operations._resolve_arm_id_for_automl_job(job_instance, resolver, inside_pipeline=True) diff --git a/sdk/ml/azure-ai-ml/tests/component/unittests/test_pipeline_component_entity.py b/sdk/ml/azure-ai-ml/tests/component/unittests/test_pipeline_component_entity.py index 699db05582bd..4add592c7c11 100644 --- a/sdk/ml/azure-ai-ml/tests/component/unittests/test_pipeline_component_entity.py +++ b/sdk/ml/azure-ai-ml/tests/component/unittests/test_pipeline_component_entity.py @@ -5,11 +5,13 @@ import pytest import yaml + from azure.ai.ml import Input, load_component, load_job from azure.ai.ml._restclient.v2022_05_01.models import ComponentVersionData from azure.ai.ml.entities import Component, PipelineComponent, PipelineJob from azure.ai.ml.entities._inputs_outputs import GroupInput from azure.ai.ml.entities._job.pipeline._io import PipelineInput, _GroupAttrDict +from azure.ai.ml.operations import ComponentOperations from .._util import _COMPONENT_TIMEOUT_SECOND @@ -434,3 +436,30 @@ def test_simple_jobs_from_rest(self) -> None: "component_a_job" ] assert obj_node_dict == node_dict + + def test_divide_nodes_to_resolve_into_layers(self): + component_path = "./tests/test_configs/components/helloworld_multi_layer_pipeline_component.yml" + component: PipelineComponent = load_component(source=component_path) + + node_name_list = [] + + def extra_operation(node, node_name: str) -> None: + node_name_list.append((node_name, node.type)) + + layers = ComponentOperations._divide_nodes_to_resolve_into_layers(component, [extra_operation]) + # all 6 nodes has been processed by extra_operation + assert len(node_name_list) == 6 + + def get_layer_node_name_set(layer): + return set([node_name for node_name, _ in layer]) + + # 3 layers + assert len(layers) == 3 + assert len(layers[0]) == 2 + assert get_layer_node_name_set(layers[0]) == {"pipeline_component_1", "pipeline_component_2"} + assert len(layers[1]) == 1 + assert get_layer_node_name_set(layers[1]) == {"pipeline_component"} + assert len(layers[2]) == 3 + # all leaf nodes in last layer + # 2 leaf node of the same node name + assert get_layer_node_name_set(layers[2]) == {"command_component", "component_a_job"} diff --git a/sdk/ml/azure-ai-ml/tests/test_configs/components/helloworld_multi_layer_pipeline_component.yml b/sdk/ml/azure-ai-ml/tests/test_configs/components/helloworld_multi_layer_pipeline_component.yml new file mode 100644 index 000000000000..11934c4197bf --- /dev/null +++ b/sdk/ml/azure-ai-ml/tests/test_configs/components/helloworld_multi_layer_pipeline_component.yml @@ -0,0 +1,51 @@ +$schema: https://azuremlschemas.azureedge.net/development/pipelineComponent.schema.json +type: pipeline + +name: helloworld_pipeline_component +display_name: Hello World Pipeline Component +description: This is the basic pipeline component +tags: + tag: tagvalue + owner: sdkteam + +version: 1 + +inputs: + component_in_number: + description: A number for pipeline component + type: number + default: 10.99 + optional: True + component_in_path: + description: A path for pipeline component + type: uri_folder +outputs: + nested_output: + type: uri_folder + nested_output2: + type: uri_folder + + +jobs: + pipeline_component_1: + type: pipeline + component: ./helloworld_nested_pipeline_component.yml + inputs: + component_in_path: ${{parent.inputs.component_in_path}} + outputs: + component_out_path: ${{parent.outputs.nested_output}} + + pipeline_component_2: + type: pipeline + component: ./helloworld_pipeline_component.yml + inputs: + component_in_path: ${{parent.inputs.component_in_path}} + outputs: + output_path: ${{parent.outputs.nested_output2}} + + command_component: + type: command + component: ./helloworld_component.yml + inputs: + component_in_number: ${{parent.inputs.component_in_number}} + component_in_path: ${{parent.inputs.component_in_path}}