Skip to content

Commit

Permalink
perf optimization: resolve all leaf node concurrently (#30281)
Browse files Browse the repository at this point in the history
  • Loading branch information
elliotzh authored May 6, 2023
1 parent cbb3590 commit 564928c
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 28 deletions.
60 changes: 32 additions & 28 deletions sdk/ml/azure-ai-ml/azure/ai/ml/operations/_component_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,17 +663,21 @@ def _try_resolve_compute_for_node(cls, node: BaseNode, _: str, resolver):

@classmethod
def _divide_nodes_to_resolve_into_layers(cls, component: PipelineComponent, extra_operations: List[Callable]):
"""Traverse the pipeline component and divide nodes to resolve into layers.
"""Traverse the pipeline component and divide nodes to resolve into layers. Note that all leaf nodes will be
put in the last layer.
For example, for below pipeline component, assuming that all nodes need to be resolved:
A
/|\
B C D
| |
E F
|
G
return value will be:
[
[("B", B), ("C", C), ("D", D)],
[("E", E), ("F", F)],
[("B", B), ("C", C)],
[("E", E)],
[("D", D), ("F", F), ("G", G)],
]
:param component: The pipeline component to resolve.
Expand All @@ -683,32 +687,35 @@ def _divide_nodes_to_resolve_into_layers(cls, component: PipelineComponent, extr
:return: A list of layers of nodes to resolve.
:rtype: List[List[Tuple[str, BaseNode]]]
"""
# add an empty layer to mark the end of the first layer
layers, cur_layer_head, cur_layer = [list(component.jobs.items()), []], 0, 0

while cur_layer < len(layers) and cur_layer_head < len(layers[cur_layer]):
key, job_instance = layers[cur_layer][cur_layer_head]
cur_layer_head += 1

cls._resolve_binding_on_supported_fields_for_node(job_instance)
if isinstance(job_instance, LoopNode):
job_instance = job_instance.body

for extra_operation in extra_operations:
extra_operation(job_instance, key)
nodes_to_process = list(component.jobs.items())
layers = []
leaf_nodes = []

while nodes_to_process:
layers.append([])
new_nodes_to_process = []
for key, job_instance in nodes_to_process:
cls._resolve_binding_on_supported_fields_for_node(job_instance)
if isinstance(job_instance, LoopNode):
job_instance = job_instance.body

if isinstance(job_instance, BaseNode) and isinstance(job_instance._component, PipelineComponent):
if cur_layer + 1 == len(layers):
layers.append([])
layers[cur_layer + 1].extend(job_instance.component.jobs.items())
for extra_operation in extra_operations:
extra_operation(job_instance, key)

if cur_layer_head == len(layers[cur_layer]):
cur_layer += 1
cur_layer_head = 0
if isinstance(job_instance, BaseNode) and isinstance(job_instance._component, PipelineComponent):
# candidates for next layer
new_nodes_to_process.extend(job_instance.component.jobs.items())
# use layers to store pipeline nodes in each layer for now
layers[-1].append((key, job_instance))
else:
# note that LoopNode has already been replaced by its body here
leaf_nodes.append((key, job_instance))
nodes_to_process = new_nodes_to_process

# if there is no subgraph, pop the empty layer inserted at the beginning
if len(layers[-1]) == 0:
# if there is subgraph, the last item in layers will be empty for now as all leaf nodes are stored in leaf_nodes
if len(layers) != 0:
layers.pop()
layers.append(leaf_nodes)

return layers

Expand Down Expand Up @@ -761,9 +768,6 @@ def _resolve_dependencies_for_pipeline_component_jobs(

for layer in reversed(layers):
for _, job_instance in layer:
if isinstance(job_instance, LoopNode):
job_instance = job_instance.body

if isinstance(job_instance, AutoMLJob):
# only compute is resolved here
self._job_operations._resolve_arm_id_for_automl_job(job_instance, resolver, inside_pipeline=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

import pytest
import yaml

from azure.ai.ml import Input, load_component, load_job
from azure.ai.ml._restclient.v2022_05_01.models import ComponentVersionData
from azure.ai.ml.entities import Component, PipelineComponent, PipelineJob
from azure.ai.ml.entities._inputs_outputs import GroupInput
from azure.ai.ml.entities._job.pipeline._io import PipelineInput, _GroupAttrDict
from azure.ai.ml.operations import ComponentOperations

from .._util import _COMPONENT_TIMEOUT_SECOND

Expand Down Expand Up @@ -434,3 +436,30 @@ def test_simple_jobs_from_rest(self) -> None:
"component_a_job"
]
assert obj_node_dict == node_dict

def test_divide_nodes_to_resolve_into_layers(self):
component_path = "./tests/test_configs/components/helloworld_multi_layer_pipeline_component.yml"
component: PipelineComponent = load_component(source=component_path)

node_name_list = []

def extra_operation(node, node_name: str) -> None:
node_name_list.append((node_name, node.type))

layers = ComponentOperations._divide_nodes_to_resolve_into_layers(component, [extra_operation])
# all 6 nodes has been processed by extra_operation
assert len(node_name_list) == 6

def get_layer_node_name_set(layer):
return set([node_name for node_name, _ in layer])

# 3 layers
assert len(layers) == 3
assert len(layers[0]) == 2
assert get_layer_node_name_set(layers[0]) == {"pipeline_component_1", "pipeline_component_2"}
assert len(layers[1]) == 1
assert get_layer_node_name_set(layers[1]) == {"pipeline_component"}
assert len(layers[2]) == 3
# all leaf nodes in last layer
# 2 leaf node of the same node name
assert get_layer_node_name_set(layers[2]) == {"command_component", "component_a_job"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
$schema: https://azuremlschemas.azureedge.net/development/pipelineComponent.schema.json
type: pipeline

name: helloworld_pipeline_component
display_name: Hello World Pipeline Component
description: This is the basic pipeline component
tags:
tag: tagvalue
owner: sdkteam

version: 1

inputs:
component_in_number:
description: A number for pipeline component
type: number
default: 10.99
optional: True
component_in_path:
description: A path for pipeline component
type: uri_folder
outputs:
nested_output:
type: uri_folder
nested_output2:
type: uri_folder


jobs:
pipeline_component_1:
type: pipeline
component: ./helloworld_nested_pipeline_component.yml
inputs:
component_in_path: ${{parent.inputs.component_in_path}}
outputs:
component_out_path: ${{parent.outputs.nested_output}}

pipeline_component_2:
type: pipeline
component: ./helloworld_pipeline_component.yml
inputs:
component_in_path: ${{parent.inputs.component_in_path}}
outputs:
output_path: ${{parent.outputs.nested_output2}}

command_component:
type: command
component: ./helloworld_component.yml
inputs:
component_in_number: ${{parent.inputs.component_in_number}}
component_in_path: ${{parent.inputs.component_in_path}}

0 comments on commit 564928c

Please sign in to comment.