diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index 6e4bc4e8690..3f1575005da 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -28,6 +28,7 @@ from kfp.dsl import component_factory from kfp.dsl import for_loop from kfp.dsl import pipeline_channel +from kfp.dsl import pipeline_config from kfp.dsl import pipeline_context from kfp.dsl import pipeline_task from kfp.dsl import placeholders @@ -1879,6 +1880,7 @@ def create_pipeline_spec( pipeline: pipeline_context.Pipeline, component_spec: structures.ComponentSpec, pipeline_outputs: Optional[Any] = None, + pipeline_config: pipeline_config.PipelineConfig = None, ) -> Tuple[pipeline_spec_pb2.PipelineSpec, pipeline_spec_pb2.PlatformSpec]: """Creates a pipeline spec object. @@ -1886,6 +1888,7 @@ def create_pipeline_spec( pipeline: The instantiated pipeline object. component_spec: The component spec structures. pipeline_outputs: The pipeline outputs via return. + pipeline_config: The pipeline config object. Returns: A PipelineSpec proto representing the compiled pipeline. @@ -1947,6 +1950,10 @@ def create_pipeline_spec( ) platform_spec = pipeline_spec_pb2.PlatformSpec() + if pipeline_config is not None: + _merge_pipeline_config( + pipelineConfig=pipeline_config, platformSpec=platform_spec) + for group in all_groups: build_spec_by_group( pipeline_spec=pipeline_spec, @@ -2062,6 +2069,17 @@ def write_pipeline_spec_to_file( f'The output path {package_path} should end with ".yaml".') +def _merge_pipeline_config(pipelineConfig: pipeline_config.PipelineConfig, + platformSpec: pipeline_spec_pb2.PlatformSpec): + # TODO: add pipeline config options (ttl, semaphore, etc.) to the dict + # json_format.ParseDict( + # {'pipelineConfig': { + # '': pipelineConfig., + # }}, platformSpec.platforms['kubernetes']) + + return platformSpec + + def extract_comments_from_pipeline_spec(pipeline_spec: dict, pipeline_description: str) -> str: diff --git a/sdk/python/kfp/dsl/__init__.py b/sdk/python/kfp/dsl/__init__.py index 615f0bcf31b..19ab8027cb0 100644 --- a/sdk/python/kfp/dsl/__init__.py +++ b/sdk/python/kfp/dsl/__init__.py @@ -264,6 +264,7 @@ def my_pipeline(): from kfp.dsl.for_loop import Collected from kfp.dsl.importer_node import importer from kfp.dsl.pipeline_channel import OneOf + from kfp.dsl.pipeline_config import PipelineConfig from kfp.dsl.pipeline_context import pipeline from kfp.dsl.pipeline_task import PipelineTask from kfp.dsl.placeholders import ConcatPlaceholder @@ -292,4 +293,5 @@ def my_pipeline(): 'IfPresentPlaceholder', 'ConcatPlaceholder', 'PipelineTask', + 'PipelineConfig', ]) diff --git a/sdk/python/kfp/dsl/component_factory.py b/sdk/python/kfp/dsl/component_factory.py index 443728f4d2f..5e7a25c0edc 100644 --- a/sdk/python/kfp/dsl/component_factory.py +++ b/sdk/python/kfp/dsl/component_factory.py @@ -27,6 +27,7 @@ from kfp.dsl import container_component_artifact_channel from kfp.dsl import container_component_class from kfp.dsl import graph_component +from kfp.dsl import pipeline_config from kfp.dsl import placeholders from kfp.dsl import python_component from kfp.dsl import structures @@ -691,6 +692,7 @@ def create_graph_component_from_func( name: Optional[str] = None, description: Optional[str] = None, display_name: Optional[str] = None, + pipeline_config: pipeline_config.PipelineConfig = None, ) -> graph_component.GraphComponent: """Implementation for the @pipeline decorator. @@ -707,6 +709,7 @@ def create_graph_component_from_func( component_spec=component_spec, pipeline_func=func, display_name=display_name, + pipeline_config=pipeline_config, ) diff --git a/sdk/python/kfp/dsl/graph_component.py b/sdk/python/kfp/dsl/graph_component.py index 2b09927dfa1..7b17ec8a2e3 100644 --- a/sdk/python/kfp/dsl/graph_component.py +++ b/sdk/python/kfp/dsl/graph_component.py @@ -20,6 +20,7 @@ from kfp.compiler import pipeline_spec_builder as builder from kfp.dsl import base_component from kfp.dsl import pipeline_channel +from kfp.dsl import pipeline_config from kfp.dsl import pipeline_context from kfp.dsl import structures from kfp.pipeline_spec import pipeline_spec_pb2 @@ -37,9 +38,11 @@ def __init__( component_spec: structures.ComponentSpec, pipeline_func: Callable, display_name: Optional[str] = None, + pipeline_config: pipeline_config.PipelineConfig = None, ): super().__init__(component_spec=component_spec) self.pipeline_func = pipeline_func + self.pipeline_config = pipeline_config args_list = [] signature = inspect.signature(pipeline_func) @@ -69,6 +72,7 @@ def __init__( pipeline=dsl_pipeline, component_spec=self.component_spec, pipeline_outputs=pipeline_outputs, + pipeline_config=pipeline_config, ) pipeline_root = getattr(pipeline_func, 'pipeline_root', None) diff --git a/sdk/python/kfp/dsl/pipeline_config.py b/sdk/python/kfp/dsl/pipeline_config.py new file mode 100644 index 00000000000..a4e90c28a01 --- /dev/null +++ b/sdk/python/kfp/dsl/pipeline_config.py @@ -0,0 +1,23 @@ +# Copyright 2024 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Pipeline-level config options.""" + + +class PipelineConfig: + """PipelineConfig contains pipeline-level config options.""" + + def __init__(self): + pass + + # TODO add pipeline level configs diff --git a/sdk/python/kfp/dsl/pipeline_context.py b/sdk/python/kfp/dsl/pipeline_context.py index 4d0bbbaa840..049aad3a748 100644 --- a/sdk/python/kfp/dsl/pipeline_context.py +++ b/sdk/python/kfp/dsl/pipeline_context.py @@ -18,17 +18,20 @@ from typing import Callable, Optional from kfp.dsl import component_factory +from kfp.dsl import pipeline_config from kfp.dsl import pipeline_task from kfp.dsl import tasks_group from kfp.dsl import utils -def pipeline(func: Optional[Callable] = None, - *, - name: Optional[str] = None, - description: Optional[str] = None, - pipeline_root: Optional[str] = None, - display_name: Optional[str] = None) -> Callable: +def pipeline( + func: Optional[Callable] = None, + *, + name: Optional[str] = None, + description: Optional[str] = None, + pipeline_root: Optional[str] = None, + display_name: Optional[str] = None, + pipeline_config: pipeline_config.PipelineConfig = None) -> Callable: """Decorator used to construct a pipeline. Example @@ -50,6 +53,7 @@ def my_pipeline(a: str, b: int): pipeline_root: The root directory from which to read input and output parameters and artifacts. display_name: A human-readable name for the pipeline. + pipeline_config: Pipeline-level config options. """ if func is None: return functools.partial( @@ -58,6 +62,7 @@ def my_pipeline(a: str, b: int): description=description, pipeline_root=pipeline_root, display_name=display_name, + pipeline_config=pipeline_config, ) if pipeline_root: @@ -68,6 +73,7 @@ def my_pipeline(a: str, b: int): name=name, description=description, display_name=display_name, + pipeline_config=pipeline_config, )