feat(sdk): Support loading pipeline from yaml (kubeflow#8209)
* support loading pipeline from yaml

* release note

* cleanup

* maintain read/write consistency with component/pipeline compilation.
chensun authored and jlyaoyuli committed Jan 5, 2023
1 parent 074c02e commit 6ec39cc
Showing 16 changed files with 562 additions and 188 deletions.
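Taken together, these changes let a pipeline compiled to IR YAML be loaded back with components.load_component_from_file and reused as a step in another pipeline, as the new test data file at the end of this commit demonstrates. A minimal sketch of that workflow (inner_pipeline.yaml is a hypothetical file produced by an earlier compile):

from kfp import compiler
from kfp import components
from kfp import dsl


@dsl.component
def print_op(msg: str):
    print(msg)


# Load a previously compiled pipeline; it can be used like any component.
inner_pipeline = components.load_component_from_file('inner_pipeline.yaml')


@dsl.pipeline(name='outer-pipeline')
def outer_pipeline():
    inner_pipeline(msg='Hello')
    print_op(msg='world!')


if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=outer_pipeline, package_path='outer_pipeline.yaml')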
2 changes: 1 addition & 1 deletion sdk/RELEASE.md
@@ -3,7 +3,7 @@
## Major Features and Improvements
* Support parallelism setting in ParallelFor [\#8146](https://github.com/kubeflow/pipelines/pull/8146)
* Support for Python v3.10 [\#8186](https://github.com/kubeflow/pipelines/pull/8186)
* Support pipeline as a component [\#8179](https://github.com/kubeflow/pipelines/pull/8179), [\#8204](https://github.com/kubeflow/pipelines/pull/8204)
* Support pipeline as a component [\#8179](https://github.com/kubeflow/pipelines/pull/8179), [\#8204](https://github.com/kubeflow/pipelines/pull/8204), [\#8209](https://github.com/kubeflow/pipelines/pull/8209)

## Breaking Changes

3 changes: 2 additions & 1 deletion sdk/python/kfp/compiler/_read_write_test_config.py
@@ -50,10 +50,11 @@
'pipeline_in_pipeline',
'pipeline_in_pipeline_complex',
'pipeline_with_outputs',
'pipeline_in_pipeline_loaded_from_yaml',
],
'test_data_dir': 'sdk/python/kfp/compiler/test_data/pipelines',
'config': {
'read': False,
'read': True,
'write': True
}
},
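Flipping 'read' from False to True turns on the load-from-YAML half of the read/write round-trip test for this test data directory, and the new 'pipeline_in_pipeline_loaded_from_yaml' entry registers the test case added later in this commit.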
45 changes: 9 additions & 36 deletions sdk/python/kfp/compiler/compiler.py
@@ -17,18 +17,13 @@
https://docs.google.com/document/d/1PUDuSQ8vmeKSBloli53mp7GIvzekaY7sggg6ywy35Dk/
"""

import inspect
from typing import Any, Callable, Dict, Mapping, Optional, Union
import uuid
from typing import Any, Callable, Dict, Optional, Union

from kfp.compiler import pipeline_spec_builder as builder
from kfp.components import base_component
from kfp.components import component_factory
from kfp.components import graph_component
from kfp.components import pipeline_channel
from kfp.components import pipeline_context
from kfp.components import yaml_component
from kfp.components.types import type_utils
from kfp.pipeline_spec import pipeline_spec_pb2


class Compiler:
@@ -72,39 +67,17 @@ def compile(
"""

with type_utils.TypeCheckManager(enable=type_check):
if isinstance(pipeline_func, graph_component.GraphComponent):
# Retrieve the pre-compiled pipeline spec.
pipeline_spec = pipeline_func.component_spec.implementation.graph

# Verify that pipeline_parameters contains only input names
# that match the pipeline inputs definition.
for input_name, input_value in (pipeline_parameters or
{}).items():
if input_name in pipeline_spec.root.input_definitions.parameters:
pipeline_spec.root.input_definitions.parameters[
input_name].default_value.CopyFrom(
builder.to_protobuf_value(input_value))
elif input_name in pipeline_spec.root.input_definitions.artifacts:
raise NotImplementedError(
'Default value for artifact input is not supported yet.'
)
else:
raise ValueError(
'Pipeline parameter {} does not match any known '
'pipeline input.'.format(input_name))

elif isinstance(pipeline_func, base_component.BaseComponent):
component_spec = builder.modify_component_spec_for_compile(
component_spec=pipeline_func.component_spec,
pipeline_name=pipeline_name,
pipeline_parameters_override=pipeline_parameters,
)
pipeline_spec = component_spec.to_pipeline_spec()
else:
if not isinstance(pipeline_func, base_component.BaseComponent):
raise ValueError(
'Unsupported pipeline_func type. Expected '
'subclass of `base_component.BaseComponent` or '
'`Callable` constructed with @dsl.pipeline '
f'decorator. Got: {type(pipeline_func)}')

pipeline_spec = builder.modify_pipeline_spec_with_override(
pipeline_spec=pipeline_func.pipeline_spec,
pipeline_name=pipeline_name,
pipeline_parameters=pipeline_parameters,
)
builder.write_pipeline_spec_to_file(
pipeline_spec=pipeline_spec, package_path=package_path)
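With the GraphComponent-specific branch removed, a pipeline loaded from YAML flows through the same code path as a @dsl.pipeline function, so the same compile-time overrides apply to both. A minimal sketch, assuming a hypothetical inner_pipeline.yaml whose pipeline takes a msg parameter:

from kfp import compiler
from kfp import components

loaded_pipeline = components.load_component_from_file('inner_pipeline.yaml')

compiler.Compiler().compile(
    pipeline_func=loaded_pipeline,
    package_path='recompiled_pipeline.yaml',
    pipeline_name='renamed-pipeline',    # overrides pipeline_info.name
    pipeline_parameters={'msg': 'Hi'},   # overrides the input's default value
)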
128 changes: 72 additions & 56 deletions sdk/python/kfp/compiler/pipeline_spec_builder.py
Expand Up @@ -1027,40 +1027,46 @@ def populate_metrics_in_dag_outputs(
sub_task_output = unique_output_name


def modify_component_spec_for_compile(
component_spec: structures.ComponentSpec,
def modify_pipeline_spec_with_override(
pipeline_spec: pipeline_spec_pb2.PipelineSpec,
pipeline_name: Optional[str],
pipeline_parameters_override: Optional[Mapping[str, Any]],
) -> structures.ComponentSpec:
"""Modifies the ComponentSpec using arguments passed to the
Compiler.compile method.
pipeline_parameters: Optional[Mapping[str, Any]],
) -> pipeline_spec_pb2.PipelineSpec:
"""Modifies the PipelineSpec using arguments passed to the Compiler.compile
method.
Args:
component_spec (structures.ComponentSpec): ComponentSpec to modify.
pipeline_spec (pipeline_spec_pb2.PipelineSpec): PipelineSpec to modify.
pipeline_name (Optional[str]): Name of the pipeline. Overrides component name.
pipeline_parameters_override (Optional[Mapping[str, Any]]): Pipeline parameters. Overrides component input default values.
pipeline_parameters (Optional[Mapping[str, Any]]): Pipeline parameters. Overrides component input default values.
Returns:
The modified PipelineSpec copy.
Raises:
ValueError: If a parameter is passed to the compiler that is not a component input.
Returns:
structures.ComponentSpec: The modified ComponentSpec.
"""
pipeline_name = pipeline_name or utils.sanitize_component_name(
component_spec.name).replace(utils._COMPONENT_NAME_PREFIX, '')

component_spec.name = pipeline_name
if component_spec.inputs is not None:
pipeline_parameters_override = pipeline_parameters_override or {}
for input_name in pipeline_parameters_override:
if input_name not in component_spec.inputs:
raise ValueError(
f'Parameter {input_name} does not match any known component parameters.'
)
component_spec.inputs[
input_name].default = pipeline_parameters_override[input_name]
pipeline_spec_new = pipeline_spec_pb2.PipelineSpec()
pipeline_spec_new.CopyFrom(pipeline_spec)
pipeline_spec = pipeline_spec_new

if pipeline_name is not None:
pipeline_spec.pipeline_info.name = pipeline_name

# Verify that pipeline_parameters contains only input names
# that match the pipeline inputs definition.
for input_name, input_value in (pipeline_parameters or {}).items():
if input_name in pipeline_spec.root.input_definitions.parameters:
pipeline_spec.root.input_definitions.parameters[
input_name].default_value.CopyFrom(
to_protobuf_value(input_value))
elif input_name in pipeline_spec.root.input_definitions.artifacts:
raise NotImplementedError(
'Default value for artifact input is not supported.')
else:
raise ValueError('Pipeline parameter {} does not match any known '
'pipeline input.'.format(input_name))

return component_spec
return pipeline_spec
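Note that the function copies the incoming proto before mutating it, so the PipelineSpec cached on the loaded component is never modified in place. A minimal sketch of that protobuf idiom, using the same pipeline_spec_pb2 message:

from kfp.pipeline_spec import pipeline_spec_pb2

original = pipeline_spec_pb2.PipelineSpec()
original.pipeline_info.name = 'original-name'

modified = pipeline_spec_pb2.PipelineSpec()
modified.CopyFrom(original)  # deep copy of the message
modified.pipeline_info.name = 'override-name'

assert original.pipeline_info.name == 'original-name'  # source is untouched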


def build_spec_by_group(
@@ -1125,6 +1131,9 @@ def build_spec_by_group(
]
is_parent_component_root = (group_component_spec == pipeline_spec.root)

# Track if component spec is added from merging pipeline spec.
component_spec_added = False

if isinstance(subgroup, pipeline_task.PipelineTask):

subgroup_task_spec = build_task_spec_for_task(
@@ -1138,30 +1147,33 @@
task=subgroup)
task_name_to_component_spec[subgroup.name] = subgroup_component_spec

executor_label = subgroup_component_spec.executor_label
if subgroup_component_spec.executor_label:
executor_label = utils.make_name_unique_by_adding_index(
name=subgroup_component_spec.executor_label,
collection=list(deployment_config.executors.keys()),
delimiter='-')
subgroup_component_spec.executor_label = executor_label

if executor_label not in deployment_config.executors:
if subgroup.container_spec is not None:
subgroup_container_spec = build_container_spec_for_task(
task=subgroup)
deployment_config.executors[
executor_label].container.CopyFrom(
subgroup_container_spec)
elif subgroup.importer_spec is not None:
subgroup_importer_spec = build_importer_spec_for_task(
task=subgroup)
deployment_config.executors[
executor_label].importer.CopyFrom(
subgroup_importer_spec)
elif subgroup.pipeline_spec is not None:
merge_deployment_spec_and_component_spec(
main_pipeline_spec=pipeline_spec,
main_deployment_config=deployment_config,
sub_pipeline_spec=subgroup.pipeline_spec,
sub_pipeline_component_name=subgroup_component_name,
)
else:
raise RuntimeError
if subgroup.container_spec is not None:
subgroup_container_spec = build_container_spec_for_task(
task=subgroup)
deployment_config.executors[executor_label].container.CopyFrom(
subgroup_container_spec)
elif subgroup.importer_spec is not None:
subgroup_importer_spec = build_importer_spec_for_task(
task=subgroup)
deployment_config.executors[executor_label].importer.CopyFrom(
subgroup_importer_spec)
elif subgroup.pipeline_spec is not None:
merge_deployment_spec_and_component_spec(
main_pipeline_spec=pipeline_spec,
main_deployment_config=deployment_config,
sub_pipeline_spec=subgroup.pipeline_spec,
sub_pipeline_component_name=subgroup_component_name,
)
component_spec_added = True
else:
raise RuntimeError
elif isinstance(subgroup, tasks_group.ParallelFor):

# "Punch the hole", adding additional inputs (other than loop
@@ -1258,8 +1270,14 @@ def build_spec_by_group(
subgroup_task_spec.dependent_tasks.extend(
[utils.sanitize_task_name(dep) for dep in group_dependencies])

# Add component spec if not exists
if subgroup_component_name not in pipeline_spec.components:
# Add component spec if not already added from merging pipeline spec.
if not component_spec_added:
subgroup_component_name = utils.make_name_unique_by_adding_index(
name=subgroup_component_name,
collection=list(pipeline_spec.components.keys()),
delimiter='-')

subgroup_task_spec.component_ref.name = subgroup_component_name
pipeline_spec.components[subgroup_component_name].CopyFrom(
subgroup_component_spec)
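Executor labels and component names are now made unique against the entries already collected, so merging a sub-pipeline's spec cannot silently overwrite an existing one. A sketch of the de-duplication behavior relied on here (an illustration of the expected semantics, not the kfp.components.utils implementation):

from typing import List


def make_name_unique_by_adding_index(name: str, collection: List[str],
                                     delimiter: str) -> str:
    # Return the name unchanged if unused; otherwise append -2, -3, ...
    unique_name = name
    index = 2
    while unique_name in collection:
        unique_name = f'{name}{delimiter}{index}'
        index += 1
    return unique_name


assert make_name_unique_by_adding_index('comp', ['comp'], '-') == 'comp-2'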

@@ -1470,12 +1488,11 @@ def _rename_component_refs(
sub_pipeline_spec.root)


def create_pipeline_spec_and_deployment_config(
def create_pipeline_spec(
pipeline: pipeline_context.Pipeline,
component_spec: structures.ComponentSpec,
pipeline_outputs: Optional[Any] = None,
) -> Tuple[pipeline_spec_pb2.PipelineSpec,
pipeline_spec_pb2.PipelineDeploymentConfig]:
) -> pipeline_spec_pb2.PipelineSpec:
"""Creates a pipeline spec object.
Args:
@@ -1484,8 +1501,7 @@ def create_pipeline_spec_and_deployment_config(
pipeline_outputs: The pipeline outputs via return.
Returns:
A tuple of PipelineSpec proto representing the compiled pipeline and its
PipelineDeploymentconfig proto object.
A PipelineSpec proto representing the compiled pipeline.
Raises:
ValueError if the argument is of unsupported types.
@@ -1553,7 +1569,7 @@ def create_pipeline_spec_and_deployment_config(
deployment_config=deployment_config,
)

return pipeline_spec, deployment_config
return pipeline_spec


def write_pipeline_spec_to_file(pipeline_spec: pipeline_spec_pb2.PipelineSpec,
14 changes: 8 additions & 6 deletions sdk/python/kfp/compiler/read_write_test.py
@@ -126,11 +126,15 @@ def load_compiled_file(filename: str) -> Dict[str, Any]:
return ignore_kfp_version_helper(contents)


def set_description_in_component_spec_to_none(
def strip_some_component_spec_fields(
component_spec: structures.ComponentSpec) -> structures.ComponentSpec:
"""Sets the description field of a ComponentSpec to None."""
"""Strips some component spec fields that should be ignored when comparing
with golden result."""
# Ignore description when comparing component specs read in from v1 component YAML and from IR YAML, because non-lightweight Python components defined in v1 YAML can have a description field, but IR YAML does not preserve this field unless the component is a lightweight Python function-based component
component_spec.description = None
# ignore SDK version so that golden snapshots don't need to be updated between SDK version bump
if component_spec.implementation.graph is not None:
component_spec.implementation.graph.sdk_version = ''
return component_spec


@@ -158,10 +162,8 @@ def _test_serialization_deserialization_consistency(self, yaml_file: str):
reloaded_component = self._compile_and_load_component(
original_component)
self.assertEqual(
set_description_in_component_spec_to_none(
original_component.component_spec),
set_description_in_component_spec_to_none(
reloaded_component.component_spec))
strip_some_component_spec_fields(original_component.component_spec),
strip_some_component_spec_fields(reloaded_component.component_spec))

def _test_serialization_correctness(self,
python_file: str,
sdk/python/kfp/compiler/test_data/pipelines/pipeline_in_pipeline_complex.py
@@ -59,8 +59,8 @@ def pipeline_not_used():


@dsl.pipeline(name='pipeline-in-pipeline-complex')
def my_pipeline():
print_op1(msg='Hello')
def my_pipeline(msg: str = 'Hello'):
print_op1(msg=msg)
with dsl.ParallelFor(['Hello', 'world!']) as item:
graph_component(msg=item)

sdk/python/kfp/compiler/test_data/pipelines/pipeline_in_pipeline_complex.yaml
@@ -227,9 +227,13 @@ root:
inputs:
parameters:
msg:
runtimeValue:
constant: Hello
componentInputParameter: msg
taskInfo:
name: print-op1
inputDefinitions:
parameters:
msg:
defaultValue: Hello
parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.0.0-beta.3
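The golden YAML changes because msg is now a real pipeline input with a default of 'Hello' rather than a hardcoded runtime constant: the inner task reads it via componentInputParameter, and root gains a matching inputDefinitions entry.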
sdk/python/kfp/compiler/test_data/pipelines/pipeline_in_pipeline_loaded_from_yaml.py (new file)
@@ -0,0 +1,43 @@
# Copyright 2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pathlib

from kfp import compiler
from kfp import components
from kfp import dsl
from kfp.dsl import Artifact
from kfp.dsl import Input


@dsl.component
def print_op1(data: Input[Artifact]):
with open(data.path, 'r') as f:
print(f.read())


reuse_yaml_pipeline = components.load_component_from_file(
pathlib.Path(__file__).parent / 'pipeline_with_outputs.yaml')


@dsl.pipeline(name='pipeline-in-pipeline')
def my_pipeline():
task = reuse_yaml_pipeline(msg='Hello')
print_op1(data=task.output)


if __name__ == '__main__':
compiler.Compiler().compile(
pipeline_func=my_pipeline,
package_path=__file__.replace('.py', '.yaml'))
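When run as a script, this file compiles itself to a sibling .yaml golden file. It exercises the new feature end to end: pipeline_with_outputs.yaml (a previously compiled pipeline in the same directory) is loaded as a component, invoked as a task, and its artifact output is consumed by print_op1 via task.output.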