
feat(sdk): Support using pipeline in exit handlers #8220

Merged · 8 commits · Sep 1, 2022
1 change: 1 addition & 0 deletions sdk/RELEASE.md
@@ -4,6 +4,7 @@
* Support parallelism setting in ParallelFor [\#8146](https://github.com/kubeflow/pipelines/pull/8146)
* Support for Python v3.10 [\#8186](https://github.com/kubeflow/pipelines/pull/8186)
* Support pipeline as a component [\#8179](https://github.com/kubeflow/pipelines/pull/8179), [\#8204](https://github.com/kubeflow/pipelines/pull/8204), [\#8209](https://github.com/kubeflow/pipelines/pull/8209)
* Support using pipeline in exit handlers [\#8220](https://github.com/kubeflow/pipelines/pull/8220)

## Breaking Changes

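In practice, this change lets a pipeline, not just a container component, serve as the exit task of a dsl.ExitHandler. A minimal sketch of the new usage, condensed from the pipeline_as_exit_task test data added later in this PR (pipeline and message names here are illustrative):

from kfp import dsl
from kfp.dsl import PipelineTaskFinalStatus


@dsl.component
def print_op(message: str):
    print(message)


@dsl.component
def get_run_state(status: dict) -> str:
    return status['state']


# The exit task is itself a pipeline that receives the run's final status.
@dsl.pipeline(name='exit-pipeline')
def exit_op(status: PipelineTaskFinalStatus):
    with dsl.Condition(get_run_state(status=status).output == 'FAILED'):
        print_op(message='notify task failure.')


@dsl.pipeline(name='main-pipeline')
def my_pipeline():
    exit_task = exit_op()
    with dsl.ExitHandler(exit_task):
        print_op(message='Main work.')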
1 change: 1 addition & 0 deletions sdk/python/kfp/compiler/_read_write_test_config.py
@@ -51,6 +51,7 @@
'pipeline_in_pipeline_complex',
'pipeline_with_outputs',
'pipeline_in_pipeline_loaded_from_yaml',
'pipeline_as_exit_task',
],
'test_data_dir': 'sdk/python/kfp/compiler/test_data/pipelines',
'config': {
145 changes: 81 additions & 64 deletions sdk/python/kfp/compiler/pipeline_spec_builder.py
@@ -1131,9 +1131,6 @@ def build_spec_by_group(
]
is_parent_component_root = (group_component_spec == pipeline_spec.root)

# Track if component spec is added from merging pipeline spec.
component_spec_added = False

if isinstance(subgroup, pipeline_task.PipelineTask):

subgroup_task_spec = build_task_spec_for_task(
@@ -1165,13 +1162,12 @@
deployment_config.executors[executor_label].importer.CopyFrom(
subgroup_importer_spec)
elif subgroup.pipeline_spec is not None:
merge_deployment_spec_and_component_spec(
sub_pipeline_spec = merge_deployment_spec_and_component_spec(
main_pipeline_spec=pipeline_spec,
main_deployment_config=deployment_config,
sub_pipeline_spec=subgroup.pipeline_spec,
sub_pipeline_component_name=subgroup_component_name,
)
component_spec_added = True
subgroup_component_spec = sub_pipeline_spec.root
else:
raise RuntimeError
elif isinstance(subgroup, tasks_group.ParallelFor):
@@ -1270,16 +1266,15 @@
subgroup_task_spec.dependent_tasks.extend(
[utils.sanitize_task_name(dep) for dep in group_dependencies])

# Add component spec if not already added from merging pipeline spec.
if not component_spec_added:
subgroup_component_name = utils.make_name_unique_by_adding_index(
name=subgroup_component_name,
collection=list(pipeline_spec.components.keys()),
delimiter='-')
# Add component spec
subgroup_component_name = utils.make_name_unique_by_adding_index(
name=subgroup_component_name,
collection=list(pipeline_spec.components.keys()),
delimiter='-')

subgroup_task_spec.component_ref.name = subgroup_component_name
pipeline_spec.components[subgroup_component_name].CopyFrom(
subgroup_component_spec)
subgroup_task_spec.component_ref.name = subgroup_component_name
pipeline_spec.components[subgroup_component_name].CopyFrom(
subgroup_component_spec)

# Add task spec
group_component_spec.dag.tasks[subgroup.name].CopyFrom(
@@ -1306,6 +1301,13 @@ def build_exit_handler_groups_recursively(
return
for group in parent_group.groups:
if isinstance(group, tasks_group.ExitHandler):

# remove this if block to support nested exit handlers
if not parent_group.is_root:
raise ValueError(
f'{tasks_group.ExitHandler.__name__} can only be used within the outermost scope of a pipeline function definition. Using an {tasks_group.ExitHandler.__name__} within {group_type_to_dsl_class[parent_group.group_type].__name__} {parent_group.name} is not allowed.'
)

exit_task = group.exit_task
exit_task_name = utils.sanitize_task_name(exit_task.name)
exit_handler_group_task_name = utils.sanitize_task_name(group.name)
@@ -1319,33 +1321,43 @@
exit_task_component_spec = build_component_spec_for_exit_task(
task=exit_task)

exit_task_container_spec = build_container_spec_for_task(
task=exit_task)

# remove this if block to support nested exit handlers
if not parent_group.is_root:
raise ValueError(
f'{tasks_group.ExitHandler.__name__} can only be used within the outermost scope of a pipeline function definition. Using an {tasks_group.ExitHandler.__name__} within {group_type_to_dsl_class[parent_group.group_type].__name__} {parent_group.name} is not allowed.'
# Add exit task container spec if applicable.
if exit_task.container_spec is not None:
exit_task_container_spec = build_container_spec_for_task(
task=exit_task)
executor_label = utils.make_name_unique_by_adding_index(
name=exit_task_component_spec.executor_label,
collection=list(deployment_config.executors.keys()),
delimiter='-')
exit_task_component_spec.executor_label = executor_label
deployment_config.executors[executor_label].container.CopyFrom(
exit_task_container_spec)
elif exit_task.pipeline_spec is not None:
exit_task_pipeline_spec = merge_deployment_spec_and_component_spec(
main_pipeline_spec=pipeline_spec,
main_deployment_config=deployment_config,
sub_pipeline_spec=exit_task.pipeline_spec,
)
exit_task_component_spec = exit_task_pipeline_spec.root
else:
raise RuntimeError
Contributor: nit: Should we add an error message? (e.g. Exit task is missing both container spec and pipeline spec)

Member Author: Updated.


parent_dag = pipeline_spec.root.dag if parent_group.is_root else pipeline_spec.components[
utils.sanitize_component_name(parent_group.name)].dag
# Add exit task component spec.
component_name = utils.make_name_unique_by_adding_index(
name=exit_task_task_spec.component_ref.name,
collection=list(pipeline_spec.components.keys()),
delimiter='-')
exit_task_task_spec.component_ref.name = component_name
pipeline_spec.components[component_name].CopyFrom(
exit_task_component_spec)

# Add exit task task spec.
parent_dag = pipeline_spec.root.dag
parent_dag.tasks[exit_task_name].CopyFrom(exit_task_task_spec)

# Add exit task component spec if it does not exist.
component_name = exit_task_task_spec.component_ref.name
if component_name not in pipeline_spec.components:
pipeline_spec.components[component_name].CopyFrom(
exit_task_component_spec)
pipeline_spec.deployment_spec.update(
json_format.MessageToDict(deployment_config))

# Add exit task container spec if it does not exist.
executor_label = exit_task_component_spec.executor_label
if executor_label not in deployment_config.executors:
deployment_config.executors[executor_label].container.CopyFrom(
exit_task_container_spec)
pipeline_spec.deployment_spec.update(
json_format.MessageToDict(deployment_config))
build_exit_handler_groups_recursively(
parent_group=group,
pipeline_spec=pipeline_spec,
@@ -1356,8 +1368,7 @@ def merge_deployment_spec_and_component_spec(
main_pipeline_spec: pipeline_spec_pb2.PipelineSpec,
main_deployment_config: pipeline_spec_pb2.PipelineDeploymentConfig,
sub_pipeline_spec: pipeline_spec_pb2.PipelineSpec,
sub_pipeline_component_name: str,
) -> None:
) -> pipeline_spec_pb2.PipelineSpec:
"""Merges deployment spec and component spec from a sub pipeline spec into
the main spec.

@@ -1371,8 +1382,9 @@
sub_pipeline_spec: The pipeline spec of an inner pipeline whose
deployment specs and component specs need to be copied into the main
specs.
sub_pipeline_component_name: The name of sub pipeline's root component
spec.

Returns:
The possibly modified copy of the pipeline spec.
"""
# Make a copy of the sub_pipeline_spec so that the "template" remains
# unchanged and works even when the pipeline is reused multiple times.
@@ -1385,8 +1397,8 @@
_merge_component_spec(
main_pipeline_spec=main_pipeline_spec,
sub_pipeline_spec=sub_pipeline_spec_copy,
sub_pipeline_component_name=sub_pipeline_component_name,
)
return sub_pipeline_spec_copy


def _merge_deployment_spec(
@@ -1422,12 +1434,12 @@ def _rename_executor_labels(

for executor_label, executor_spec in sub_deployment_config.executors.items(
):
if executor_label in main_deployment_config.executors:
old_executor_label = executor_label
executor_label = utils.make_name_unique_by_adding_index(
name=executor_label,
collection=list(main_deployment_config.executors.keys()),
delimiter='-')
old_executor_label = executor_label
executor_label = utils.make_name_unique_by_adding_index(
name=executor_label,
collection=list(main_deployment_config.executors.keys()),
delimiter='-')
if executor_label != old_executor_label:
_rename_executor_labels(
pipeline_spec=sub_pipeline_spec,
old_executor_label=old_executor_label,
@@ -1439,7 +1451,6 @@ def _rename_executor_labels(
def _merge_component_spec(
main_pipeline_spec: pipeline_spec_pb2.PipelineSpec,
sub_pipeline_spec: pipeline_spec_pb2.PipelineSpec,
sub_pipeline_component_name: str,
) -> None:
"""Merges component spec from a sub pipeline spec into the main config.

@@ -1451,8 +1462,6 @@
main_pipeline_spec: The main pipeline spec to merge into.
sub_pipeline_spec: The pipeline spec of an inner pipeline whose
component specs need to be merged into the global config.
sub_pipeline_component_name: The name of sub pipeline's root component
spec.
"""

def _rename_component_refs(
@@ -1462,30 +1471,38 @@ def _rename_component_refs(
) -> None:
"""Renames the old component_ref to the new one in task spec."""
for _, component_spec in pipeline_spec.components.items():
if component_spec.dag:
for _, task_spec in component_spec.dag.tasks.items():
if task_spec.component_ref.name == old_component_ref:
task_spec.component_ref.name = new_component_ref
if not component_spec.dag:
continue
for _, task_spec in component_spec.dag.tasks.items():
if task_spec.component_ref.name == old_component_ref:
task_spec.component_ref.name = new_component_ref

for _, task_spec in pipeline_spec.root.dag.tasks.items():
if task_spec.component_ref.name == old_component_ref:
task_spec.component_ref.name = new_component_ref

# Do all the renaming in place, then do the actual merge of component specs
# in a second pass. This would ensure all component specs are in the final
# state at the time of merging.
old_name_to_new_name = {}
for component_name, component_spec in sub_pipeline_spec.components.items():
if component_name in main_pipeline_spec.components:
old_component_name = component_name
component_name = utils.make_name_unique_by_adding_index(
name=component_name,
collection=list(main_pipeline_spec.components.keys()),
delimiter='-')
old_component_name = component_name
new_component_name = utils.make_name_unique_by_adding_index(
name=component_name,
collection=list(main_pipeline_spec.components.keys()),
delimiter='-')
old_name_to_new_name[old_component_name] = new_component_name

if new_component_name != old_component_name:
_rename_component_refs(
pipeline_spec=sub_pipeline_spec,
old_component_ref=old_component_name,
new_component_ref=component_name)
main_pipeline_spec.components[component_name].CopyFrom(component_spec)
new_component_ref=new_component_name)

main_pipeline_spec.components[sub_pipeline_component_name].CopyFrom(
sub_pipeline_spec.root)
for old_component_name, component_spec in sub_pipeline_spec.components.items(
):
main_pipeline_spec.components[
old_name_to_new_name[old_component_name]].CopyFrom(component_spec)


def create_pipeline_spec(
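Both merge helpers above rely on utils.make_name_unique_by_adding_index to resolve name collisions when a sub pipeline's components and executors are folded into the main spec. A rough sketch of that helper's behavior as exercised by the tests below, assuming it simply appends -2, -3, ... until the name is free (an illustration, not the actual implementation in kfp.compiler.utils):

from typing import List


def make_name_unique_by_adding_index(name: str, collection: List[str],
                                     delimiter: str) -> str:
    """Returns name unchanged if unused; otherwise appends an index."""
    unique_name = name
    index = 2
    while unique_name in collection:
        unique_name = f'{name}{delimiter}{index}'
        index += 1
    return unique_name


# As in the test below: the sub pipeline's 'comp-1' collides with the main
# pipeline's 'comp-1', so it gets renamed to 'comp-1-2' (and its executor
# 'exec-1' to 'exec-1-2').
assert make_name_unique_by_adding_index(
    name='comp-1', collection=['comp-1', 'comp-2'],
    delimiter='-') == 'comp-1-2'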
26 changes: 12 additions & 14 deletions sdk/python/kfp/compiler/pipeline_spec_builder_test.py
@@ -226,30 +226,28 @@ def test_merge_deployment_spec_and_component_spec(self):
pipeline_spec_pb2.ComponentSpec(executor_label='exec-1-2'))
expected_main_pipeline_spec.components['comp-2'].CopyFrom(
pipeline_spec_pb2.ComponentSpec(executor_label='exec-2'))
expected_main_pipeline_spec.components['inner-pipeline'].CopyFrom(
pipeline_spec_pb2.ComponentSpec(dag=pipeline_spec_pb2.DagSpec()))
expected_main_pipeline_spec.components['inner-pipeline'].dag.tasks[
'task-1'].CopyFrom(
pipeline_spec_pb2.PipelineTaskSpec(
component_ref=pipeline_spec_pb2.ComponentRef(
name='comp-1-2')))
expected_main_pipeline_spec.components['inner-pipeline'].dag.tasks[
'task-2'].CopyFrom(
pipeline_spec_pb2.PipelineTaskSpec(
component_ref=pipeline_spec_pb2.ComponentRef(
name='comp-2')))

pipeline_spec_builder.merge_deployment_spec_and_component_spec(
expected_sub_pipeline_spec_root = pipeline_spec_pb2.ComponentSpec(
dag=pipeline_spec_pb2.DagSpec())
expected_sub_pipeline_spec_root.dag.tasks['task-1'].CopyFrom(
pipeline_spec_pb2.PipelineTaskSpec(
component_ref=pipeline_spec_pb2.ComponentRef(name='comp-1-2')))
expected_sub_pipeline_spec_root.dag.tasks['task-2'].CopyFrom(
pipeline_spec_pb2.PipelineTaskSpec(
component_ref=pipeline_spec_pb2.ComponentRef(name='comp-2')))

sub_pipeline_spec_copy = pipeline_spec_builder.merge_deployment_spec_and_component_spec(
main_pipeline_spec=main_pipeline_spec,
main_deployment_config=main_deployment_config,
sub_pipeline_spec=sub_pipeline_spec,
sub_pipeline_component_name='inner-pipeline',
)

self.assertEqual(sub_pipeline_spec, expected_sub_pipeline_spec)
self.assertEqual(main_pipeline_spec, expected_main_pipeline_spec)
self.assertEqual(main_deployment_config,
expected_main_deployment_config)
self.assertEqual(sub_pipeline_spec_copy.root,
expected_sub_pipeline_spec_root)


def pipeline_spec_from_file(filepath: str) -> str:
1 change: 61 additions & 0 deletions sdk/python/kfp/compiler/test_data/pipelines/pipeline_as_exit_task.py
@@ -0,0 +1,61 @@
# Copyright 2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pipeline using ExitHandler with PipelineTaskFinalStatus."""

from kfp import compiler
from kfp import dsl
from kfp.dsl import component
from kfp.dsl import PipelineTaskFinalStatus


@component
def print_op(message: str):
"""Prints a message."""
print(message)


@component
def fail_op(message: str):
"""Fails."""
import sys
print(message)
sys.exit(1)


@component
def get_run_state(status: dict) -> str:
print('Pipeline status: ', status)
return status['state']


@dsl.pipeline(name='conditional-notification')
def exit_op(status: PipelineTaskFinalStatus):
"""Checks pipeline run status."""
with dsl.Condition(get_run_state(status=status).output == 'FAILED'):
print_op(message='notify task failure.')


@dsl.pipeline(name='pipeline-with-task-final-status-conditional')
def my_pipeline(message: str = 'Hello World!'):
exit_task = exit_op()

with dsl.ExitHandler(exit_task, name='my-pipeline'):
print_op(message=message)
fail_op(message='Task failed.')
Contributor: Just wondering, do we support nested exit handling? For example, there may be ways to handle a failed inner task but let the outer task run as usual.

Member Author: Good question. While I believe the IR can possibly support nested exit handlers, I'm not sure whether Argo can. So unless we have a concrete use case or strong feature demand, I think we'll hold off on adding a feature that could potentially break symmetry between KFP and Vertex (again, this needs investigation into whether Argo can support it).



if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=my_pipeline,
package_path=__file__.replace('.py', '.yaml'))
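As discussed in the review thread above, nested exit handlers are not supported: the validation added in build_exit_handler_groups_recursively rejects any ExitHandler that is not in the pipeline root. A minimal sketch of a pipeline that would trip that check, reusing print_op from the example above (the pipeline name is illustrative):

# Hypothetical pipeline: the inner ExitHandler sits outside the pipeline
# root, so compiling this should raise the ValueError added in
# build_exit_handler_groups_recursively.
@dsl.pipeline(name='nested-exit-handler')
def nested_pipeline():
    outer_exit = print_op(message='outer exit')
    with dsl.ExitHandler(outer_exit):
        inner_exit = print_op(message='inner exit')
        with dsl.ExitHandler(inner_exit):  # ValueError at compile time
            print_op(message='work')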