diff --git a/sdk/RELEASE.md b/sdk/RELEASE.md index f710382ccbe..3e43e2633f0 100644 --- a/sdk/RELEASE.md +++ b/sdk/RELEASE.md @@ -16,6 +16,7 @@ # 2.3.0 ## Features * Support `PipelineTaskFinalStatus` in tasks that use `.ignore_upstream_failure()` [\#10010](https://github.com/kubeflow/pipelines/pull/10010) +* Add support for a Pythonic artifact authoring style [\#9932](https://github.com/kubeflow/pipelines/pull/9932) ## Breaking changes diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index b5d7a5267d7..b98d5624d63 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -33,6 +33,7 @@ from kfp.compiler import compiler_utils from kfp.dsl import Artifact from kfp.dsl import ContainerSpec +from kfp.dsl import Dataset from kfp.dsl import graph_component from kfp.dsl import Input from kfp.dsl import Model @@ -5279,5 +5280,415 @@ def roll_die_pipeline() -> str: return dsl.OneOf(t3, t4.output) +class TestPythonicArtifactAuthoring(unittest.TestCase): + # python component + def test_pythonic_input_artifact(self): + + @dsl.component + def pythonic_style(in_artifact: Artifact): + print(in_artifact) + + self.assertEqual( + pythonic_style.pipeline_spec.components['comp-pythonic-style'] + .input_definitions.artifacts['in_artifact'].artifact_type + .schema_title, + 'system.Artifact', + ) + + self.assertFalse( + pythonic_style.pipeline_spec.components['comp-pythonic-style'] + .input_definitions.parameters) + + @dsl.component + def standard_style(in_artifact: Input[Artifact]): + print(in_artifact) + + self.assertEqual( + pythonic_style.pipeline_spec.components['comp-pythonic-style'] + .input_definitions.artifacts['in_artifact'].artifact_type + .schema_title, + standard_style.pipeline_spec.components['comp-standard-style'] + .input_definitions.artifacts['in_artifact'].artifact_type + .schema_title, + ) + + def test_pythonic_input_artifact_optional(self): + + @dsl.component + def pythonic_style(in_artifact: Optional[Artifact] = None): + print(in_artifact) + + self.assertEqual( + pythonic_style.pipeline_spec.components['comp-pythonic-style'] + .input_definitions.artifacts['in_artifact'].artifact_type + .schema_title, + 'system.Artifact', + ) + + self.assertFalse( + pythonic_style.pipeline_spec.components['comp-pythonic-style'] + .input_definitions.parameters) + + @dsl.component + def standard_style(in_artifact: Optional[Input[Artifact]] = None): + print(in_artifact) + + self.assertEqual( + pythonic_style.pipeline_spec.components['comp-pythonic-style'] + .input_definitions.artifacts['in_artifact'].artifact_type + .schema_title, + standard_style.pipeline_spec.components['comp-standard-style'] + .input_definitions.artifacts['in_artifact'].artifact_type + .schema_title, + ) + + def test_pythonic_input_list_of_artifacts(self): + + @dsl.component + def pythonic_style(in_artifact: List[Artifact]): + print(in_artifact) + + self.assertEqual( + pythonic_style.pipeline_spec.components['comp-pythonic-style'] + .input_definitions.artifacts['in_artifact'].artifact_type + .schema_title, + 'system.Artifact', + ) + self.assertTrue( + pythonic_style.pipeline_spec.components['comp-pythonic-style'] + .input_definitions.artifacts['in_artifact'].is_artifact_list) + + self.assertFalse( + pythonic_style.pipeline_spec.components['comp-pythonic-style'] + .input_definitions.parameters) + + @dsl.component + def standard_style(in_artifact: Input[List[Artifact]]): + print(in_artifact) + + self.assertEqual( + pythonic_style.pipeline_spec.components['comp-pythonic-style'] + .input_definitions.artifacts['in_artifact'].artifact_type + .schema_title, + standard_style.pipeline_spec.components['comp-standard-style'] + .input_definitions.artifacts['in_artifact'].artifact_type + .schema_title, + ) + + def test_pythonic_input_list_of_artifacts_optional(self): + + @dsl.component + def pythonic_style(in_artifact: Optional[List[Artifact]] = None): + print(in_artifact) + + self.assertEqual( + pythonic_style.pipeline_spec.components['comp-pythonic-style'] + .input_definitions.artifacts['in_artifact'].artifact_type + .schema_title, + 'system.Artifact', + ) + self.assertTrue( + pythonic_style.pipeline_spec.components['comp-pythonic-style'] + .input_definitions.artifacts['in_artifact'].is_artifact_list) + + self.assertFalse( + pythonic_style.pipeline_spec.components['comp-pythonic-style'] + .input_definitions.parameters) + + @dsl.component + def standard_style(in_artifact: Optional[Input[List[Artifact]]] = None): + print(in_artifact) + + self.assertEqual( + pythonic_style.pipeline_spec.components['comp-pythonic-style'] + .input_definitions.artifacts['in_artifact'].artifact_type + .schema_title, + standard_style.pipeline_spec.components['comp-standard-style'] + .input_definitions.artifacts['in_artifact'].artifact_type + .schema_title, + ) + + def test_pythonic_output_artifact(self): + + @dsl.component + def pythonic_style() -> Artifact: + return Artifact(uri='gs://my_bucket/foo') + + self.assertEqual( + pythonic_style.pipeline_spec.components['comp-pythonic-style'] + .output_definitions.artifacts['Output'].artifact_type.schema_title, + 'system.Artifact', + ) + + self.assertFalse( + pythonic_style.pipeline_spec.components['comp-pythonic-style'] + .output_definitions.parameters) + + @dsl.component + def standard_style(named_output: Output[Artifact]): + return Artifact(uri='gs://my_bucket/foo') + + self.assertEqual( + pythonic_style.pipeline_spec.components['comp-pythonic-style'] + .output_definitions.artifacts['Output'].artifact_type.schema_title, + standard_style.pipeline_spec.components['comp-standard-style'] + .output_definitions.artifacts['named_output'].artifact_type + .schema_title, + ) + + def test_pythonic_output_artifact_multiple_returns(self): + + @dsl.component + def pythonic_style() -> NamedTuple('outputs', a=Artifact, d=Dataset): + a = Artifact(uri='gs://my_bucket/foo/artifact') + d = Artifact(uri='gs://my_bucket/foo/dataset') + outputs = NamedTuple('outputs', a=Artifact, d=Dataset) + return outputs(a=a, d=d) + + self.assertEqual( + pythonic_style.pipeline_spec.components['comp-pythonic-style'] + .output_definitions.artifacts['a'].artifact_type.schema_title, + 'system.Artifact', + ) + self.assertEqual( + pythonic_style.pipeline_spec.components['comp-pythonic-style'] + .output_definitions.artifacts['d'].artifact_type.schema_title, + 'system.Dataset', + ) + + self.assertFalse( + pythonic_style.pipeline_spec.components['comp-pythonic-style'] + .output_definitions.parameters) + + @dsl.component + def standard_style(a: Output[Artifact], d: Output[Dataset]): + a.uri = 'gs://my_bucket/foo/artifact' + d.uri = 'gs://my_bucket/foo/dataset' + + self.assertEqual( + pythonic_style.pipeline_spec.components['comp-pythonic-style'] + .output_definitions.artifacts['a'].artifact_type.schema_title, + standard_style.pipeline_spec.components['comp-standard-style'] + .output_definitions.artifacts['a'].artifact_type.schema_title, + ) + + self.assertEqual( + pythonic_style.pipeline_spec.components['comp-pythonic-style'] + .output_definitions.artifacts['d'].artifact_type.schema_title, + standard_style.pipeline_spec.components['comp-standard-style'] + .output_definitions.artifacts['d'].artifact_type.schema_title, + ) + + def test_pythonic_output_list_artifacts(self): + + with self.assertRaisesRegex( + ValueError, + r"Output lists of artifacts are only supported for pipelines\. Got output list of artifacts for output parameter 'Output' of component 'pythonic-style'\." + ): + + @dsl.component + def pythonic_style() -> List[Artifact]: + pass + + def test_mixed_component_authoring_styles(self): + # can be permitted, since the expected behavior is unambiguous + + # in traditional; out pythonic + @dsl.component + def back_compat_style(in_artifact: Input[Artifact]) -> Artifact: + print(in_artifact) + return Artifact(uri='gs://my_bucket/foo') + + self.assertTrue(back_compat_style.pipeline_spec) + + # out traditional; in pythonic + @dsl.component + def mixed_style(in_artifact: Artifact, out_artifact: Output[Artifact]): + print(in_artifact) + out_artifact.uri = 'gs://my_bucket/foo' + + self.assertTrue(mixed_style.pipeline_spec) + + # pipeline + def test_pipeline_input_artifact(self): + + @dsl.component + def pythonic_style(in_artifact: Artifact): + print(in_artifact) + + @dsl.pipeline + def my_pipeline(in_artifact: Artifact): + pythonic_style(in_artifact=in_artifact) + + self.assertEqual( + my_pipeline.pipeline_spec.root.input_definitions + .artifacts['in_artifact'].artifact_type.schema_title, + 'system.Artifact', + ) + + self.assertFalse( + my_pipeline.pipeline_spec.root.input_definitions.parameters) + + def test_pipeline_input_artifact_optional(self): + + @dsl.component + def pythonic_style(in_artifact: Optional[Artifact] = None): + print(in_artifact) + + @dsl.pipeline + def my_pipeline(in_artifact: Optional[Artifact] = None): + pythonic_style(in_artifact=in_artifact) + + self.assertEqual( + my_pipeline.pipeline_spec.root.input_definitions + .artifacts['in_artifact'].artifact_type.schema_title, + 'system.Artifact', + ) + + self.assertFalse( + my_pipeline.pipeline_spec.root.input_definitions.parameters) + + def test_pipeline_input_list_of_artifacts(self): + + @dsl.component + def pythonic_style(in_artifact: List[Artifact]): + print(in_artifact) + + @dsl.pipeline + def my_pipeline(in_artifact: List[Artifact]): + pythonic_style(in_artifact=in_artifact) + + self.assertEqual( + my_pipeline.pipeline_spec.root.input_definitions + .artifacts['in_artifact'].artifact_type.schema_title, + 'system.Artifact', + ) + self.assertTrue(my_pipeline.pipeline_spec.root.input_definitions + .artifacts['in_artifact'].is_artifact_list) + + self.assertFalse( + my_pipeline.pipeline_spec.root.input_definitions.parameters) + + def test_pipeline_input_list_of_artifacts_optional(self): + + @dsl.component + def pythonic_style(in_artifact: Optional[List[Artifact]] = None): + print(in_artifact) + + @dsl.pipeline + def my_pipeline(in_artifact: Optional[List[Artifact]] = None): + pythonic_style(in_artifact=in_artifact) + + self.assertEqual( + my_pipeline.pipeline_spec.root.input_definitions + .artifacts['in_artifact'].artifact_type.schema_title, + 'system.Artifact', + ) + + self.assertFalse( + my_pipeline.pipeline_spec.root.input_definitions.parameters) + + def test_pipeline_output_artifact(self): + + @dsl.component + def pythonic_style() -> Artifact: + return Artifact(uri='gs://my_bucket/foo') + + @dsl.pipeline + def my_pipeline() -> Artifact: + return pythonic_style().output + + self.assertEqual( + my_pipeline.pipeline_spec.root.output_definitions + .artifacts['Output'].artifact_type.schema_title, 'system.Artifact') + + self.assertFalse( + my_pipeline.pipeline_spec.root.output_definitions.parameters) + + def test_pipeline_output_list_of_artifacts(self): + + @dsl.component + def noop() -> Artifact: + # write artifact + return Artifact(uri='gs://my_bucket/foo/bar') + + @dsl.pipeline + def my_pipeline() -> List[Artifact]: + with dsl.ParallelFor([1, 2, 3]): + t = noop() + + return dsl.Collected(t.output) + + self.assertEqual( + my_pipeline.pipeline_spec.root.output_definitions + .artifacts['Output'].artifact_type.schema_title, 'system.Artifact') + self.assertTrue(my_pipeline.pipeline_spec.root.output_definitions + .artifacts['Output'].is_artifact_list) + + self.assertFalse( + my_pipeline.pipeline_spec.root.output_definitions.parameters) + + # container + def test_container_input_artifact(self): + with self.assertRaisesRegex( + TypeError, + r"Container Components must wrap input and output artifact annotations with Input/Output type markers \(Input\[\] or Output\[\]\)\. Got function input 'in_artifact' with annotation \." + ): + + @dsl.container_component + def comp(in_artifact: Artifact): + return dsl.ContainerSpec(image='alpine', command=['pwd']) + + def test_container_input_artifact_optional(self): + with self.assertRaisesRegex( + TypeError, + r"Container Components must wrap input and output artifact annotations with Input/Output type markers \(Input\[\] or Output\[\]\)\. Got function input 'in_artifact' with annotation \." + ): + + @dsl.container_component + def comp(in_artifact: Optional[Artifact] = None): + return dsl.ContainerSpec(image='alpine', command=['pwd']) + + def test_container_input_list_of_artifacts(self): + with self.assertRaisesRegex( + TypeError, + r"Container Components must wrap input and output artifact annotations with Input/Output type markers \(Input\[\] or Output\[\]\)\. Got function input 'in_artifact' with annotation typing\.List\[kfp\.dsl\.types\.artifact_types\.Artifact\]\." + ): + + @dsl.container_component + def comp(in_artifact: List[Artifact]): + return dsl.ContainerSpec(image='alpine', command=['pwd']) + + def test_container_input_list_of_artifacts_optional(self): + with self.assertRaisesRegex( + TypeError, + r"Container Components must wrap input and output artifact annotations with Input/Output type markers \(Input\[\] or Output\[\]\)\. Got function input 'in_artifact' with annotation typing\.List\[kfp\.dsl\.types\.artifact_types\.Artifact\]\." + ): + + @dsl.container_component + def comp(in_artifact: Optional[List[Artifact]] = None): + return dsl.ContainerSpec(image='alpine', command=['pwd']) + + def test_container_output_artifact(self): + with self.assertRaisesRegex( + TypeError, + r'Return annotation should be either ContainerSpec or omitted for container components\.' + ): + + @dsl.container_component + def comp() -> Artifact: + return dsl.ContainerSpec(image='alpine', command=['pwd']) + + def test_container_output_list_of_artifact(self): + with self.assertRaisesRegex( + TypeError, + r'Return annotation should be either ContainerSpec or omitted for container components\.' + ): + + @dsl.container_component + def comp() -> List[Artifact]: + return dsl.ContainerSpec(image='alpine', command=['pwd']) + + if __name__ == '__main__': unittest.main() diff --git a/sdk/python/kfp/dsl/__init__.py b/sdk/python/kfp/dsl/__init__.py index c2c70c847d5..d2372156c7e 100644 --- a/sdk/python/kfp/dsl/__init__.py +++ b/sdk/python/kfp/dsl/__init__.py @@ -29,6 +29,7 @@ 'Metrics', 'Model', 'SlicedClassificationMetrics', + 'get_uri', 'PIPELINE_JOB_NAME_PLACEHOLDER', 'PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER', 'PIPELINE_JOB_ID_PLACEHOLDER', @@ -44,6 +45,7 @@ from kfp.dsl.types.artifact_types import Artifact from kfp.dsl.types.artifact_types import ClassificationMetrics from kfp.dsl.types.artifact_types import Dataset +from kfp.dsl.types.artifact_types import get_uri from kfp.dsl.types.artifact_types import HTML from kfp.dsl.types.artifact_types import Markdown from kfp.dsl.types.artifact_types import Metrics diff --git a/sdk/python/kfp/dsl/component_factory.py b/sdk/python/kfp/dsl/component_factory.py index f34dd33fe0a..29402dc1315 100644 --- a/sdk/python/kfp/dsl/component_factory.py +++ b/sdk/python/kfp/dsl/component_factory.py @@ -17,7 +17,8 @@ import pathlib import re import textwrap -from typing import Callable, List, Mapping, Optional, Tuple, Type, Union +from typing import (Any, Callable, Dict, List, Mapping, Optional, Tuple, Type, + Union) import warnings import docstring_parser @@ -192,7 +193,7 @@ def _get_function_source_definition(func: Callable) -> str: return '\n'.join(func_code_lines) -def _maybe_make_unique(name: str, names: List[str]): +def maybe_make_unique(name: str, names: List[str]): if name not in names: return name @@ -204,188 +205,187 @@ def _maybe_make_unique(name: str, names: List[str]): raise RuntimeError(f'Too many arguments with the name {name}') -def extract_component_interface( - func: Callable, +def get_name_to_specs( + signature: inspect.Signature, containerized: bool = False, - description: Optional[str] = None, - name: Optional[str] = None, -) -> structures.ComponentSpec: - - signature = inspect.signature(func) - parameters = list(signature.parameters.values()) - - original_docstring = inspect.getdoc(func) - parsed_docstring = docstring_parser.parse(original_docstring) - - inputs = {} - outputs = {} +) -> Tuple[Dict[str, Any], Dict[str, Any]]: + """Returns two dictionaries. - input_names = set() - output_names = set() - for parameter in parameters: - parameter_type = type_annotations.maybe_strip_optional_from_annotation( - parameter.annotation) - passing_style = None - io_name = parameter.name - is_artifact_list = False - - if type_annotations.is_Input_Output_artifact_annotation(parameter_type): - # passing_style is either type_annotations.InputAnnotation or - # type_annotations.OutputAnnotation. - passing_style = type_annotations.get_io_artifact_annotation( - parameter_type) - - # parameter_type is a type like typing_extensions.Annotated[kfp.dsl.types.artifact_types.Artifact, ] OR typing_extensions.Annotated[typing.List[kfp.dsl.types.artifact_types.Artifact], ] - - is_artifact_list = type_annotations.is_list_of_artifacts( - parameter_type.__origin__) - - parameter_type = type_annotations.get_io_artifact_class( - parameter_type) - if not type_annotations.is_artifact_class(parameter_type): - raise ValueError( - f'Input[T] and Output[T] are only supported when T is an artifact or list of artifacts. Found `{io_name} with type {parameter_type}`' + The first is a mapping of input name to input annotation. The second + is a mapping of output name to output annotation. + """ + func_params = list(signature.parameters.values()) + + name_to_input_specs = {} + name_to_output_specs = {} + + ### handle function parameter annotations ### + for func_param in func_params: + name = func_param.name + if name == SINGLE_OUTPUT_NAME: + raise ValueError( + f'"{SINGLE_OUTPUT_NAME}" is an invalid parameter name.') + # Stripping Optional from Optional[] is the only processing done + # on annotations in this flow. Other than that, we extract the raw + # annotation and process later. + annotation = type_annotations.maybe_strip_optional_from_annotation( + func_param.annotation) + + # no annotation + if annotation == inspect._empty: + raise TypeError(f'Missing type annotation for argument: {name}') + + # is Input[Artifact], Input[List[]], (e.g., str), or InputPath() + elif (type_annotations.is_artifact_wrapped_in_Input(annotation) or + isinstance( + annotation, + type_annotations.InputPath, + ) or type_utils.is_parameter_type(annotation)): + name_to_input_specs[maybe_make_unique( + name, list(name_to_input_specs))] = make_input_spec( + annotation, func_param) + # is Artifact annotation (e.g., Artifact, Dataset, etc.) + # or List[] + elif type_annotations.issubclass_of_artifact( + annotation) or type_annotations.is_list_of_artifacts( + annotation): + if containerized: + raise TypeError( + f"Container Components must wrap input and output artifact annotations with Input/Output type markers (Input[] or Output[]). Got function input '{name}' with annotation {annotation}." ) + name_to_input_specs[maybe_make_unique( + name, list(name_to_input_specs))] = make_input_spec( + annotation, func_param) + + # is Output[Artifact] or OutputPath() + elif type_annotations.is_artifact_wrapped_in_Output( + annotation) or isinstance(annotation, + type_annotations.OutputPath): + name_to_output_specs[maybe_make_unique( + name, + list(name_to_output_specs))] = make_output_spec(annotation) + + # parameter type + else: + type_string = type_utils._annotation_to_type_struct(annotation) + name_to_input_specs[maybe_make_unique( + name, list(name_to_input_specs))] = make_input_spec( + type_string, func_param) - if parameter.default is not inspect.Parameter.empty: - if passing_style in [ - type_annotations.OutputAnnotation, - type_annotations.OutputPath, - ]: - raise ValueError( - 'Default values for Output artifacts are not supported.' - ) - elif parameter.default is not None: - raise ValueError( - f'Optional Input artifacts may only have default value None. Got: {parameter.default}.' - ) - - elif isinstance( - parameter_type, - (type_annotations.InputPath, type_annotations.OutputPath)): - passing_style = type(parameter_type) - parameter_type = parameter_type.type - if parameter.default is not inspect.Parameter.empty and not ( - passing_style == type_annotations.InputPath and - parameter.default is None): - raise ValueError( - 'Path inputs only support default values of None. Default' - ' values for outputs are not supported.') - - type_struct = type_utils._annotation_to_type_struct(parameter_type) - if type_struct is None: - raise TypeError( - f'Missing type annotation for argument: {parameter.name}') + ### handle return annotations ### + return_ann = signature.return_annotation - if passing_style in [ - type_annotations.OutputAnnotation, type_annotations.OutputPath + # validate container component returns + if containerized: + if return_ann not in [ + inspect.Parameter.empty, + structures.ContainerSpec, ]: - if io_name == SINGLE_OUTPUT_NAME: - raise ValueError( - f'"{SINGLE_OUTPUT_NAME}" is an invalid parameter name.') - io_name = _maybe_make_unique(io_name, output_names) - output_names.add(io_name) - if type_annotations.is_artifact_class(parameter_type): - schema_version = parameter_type.schema_version - output_spec = structures.OutputSpec( - type=type_utils.create_bundled_artifact_type( - type_struct, schema_version), - is_artifact_list=is_artifact_list) - else: - output_spec = structures.OutputSpec(type=type_struct) - outputs[io_name] = output_spec - else: - io_name = _maybe_make_unique(io_name, input_names) - input_names.add(io_name) - type_ = type_utils.create_bundled_artifact_type( - type_struct, parameter_type.schema_version - ) if type_annotations.is_artifact_class( - parameter_type) else type_struct - default = None if parameter.default == inspect.Parameter.empty or type_annotations.is_artifact_class( - parameter_type) else parameter.default - optional = parameter.default is not inspect.Parameter.empty or type_utils.is_task_final_status_type( - type_struct) - input_spec = structures.InputSpec( - type=type_, - default=default, - optional=optional, - is_artifact_list=is_artifact_list, + raise TypeError( + 'Return annotation should be either ContainerSpec or omitted for container components.' ) + # ignore omitted returns + elif return_ann is None or return_ann == inspect.Parameter.empty: + pass + # is NamedTuple + elif hasattr(return_ann, '_fields'): + # Getting field type annotations. + # __annotations__ does not exist in python 3.5 and earlier + # _field_types does not exist in python 3.9 and later + field_annotations = getattr(return_ann, '__annotations__', + None) or getattr(return_ann, '_field_types') + for name in return_ann._fields: + annotation = field_annotations[name] + if not type_annotations.is_list_of_artifacts( + annotation) and not type_annotations.is_artifact_class( + annotation): + annotation = type_utils._annotation_to_type_struct(annotation) + name_to_output_specs[maybe_make_unique( + name, + list(name_to_output_specs))] = make_output_spec(annotation) + # is deprecated dict returns style + elif isinstance(return_ann, dict): + warnings.warn( + 'The ability to specify multiple outputs using the dict syntax' + ' has been deprecated. It will be removed soon after release' + ' 0.1.32. Please use typing.NamedTuple to declare multiple' + ' outputs.', DeprecationWarning) + for output_name, output_type_annotation in return_ann.items(): + output_type = type_utils._annotation_to_type_struct( + output_type_annotation) + name_to_output_specs[maybe_make_unique( + output_name, list(name_to_output_specs))] = output_type + # is the simple single return case (can be `-> ` or `-> Artifact`) + # treated the same way, since processing is done in inner functions + else: + name_to_output_specs[maybe_make_unique( + SINGLE_OUTPUT_NAME, + list(name_to_output_specs))] = make_output_spec(return_ann) + return name_to_input_specs, name_to_output_specs + + +def canonicalize_annotation(annotation: Any): + """Does cleaning on annotations that are common between input and output + annotations.""" + if type_annotations.is_Input_Output_artifact_annotation(annotation): + annotation = type_annotations.strip_Input_or_Output_marker(annotation) + if isinstance(annotation, + (type_annotations.InputPath, type_annotations.OutputPath)): + annotation = annotation.type + return annotation + + +def make_input_output_spec_args(annotation: Any) -> Dict[str, Any]: + """Gets a dict of kwargs shared between InputSpec and OutputSpec.""" + is_artifact_list = type_annotations.is_list_of_artifacts(annotation) + if is_artifact_list: + annotation = type_annotations.get_inner_type(annotation) + + if type_annotations.issubclass_of_artifact(annotation): + typ = type_utils.create_bundled_artifact_type(annotation.schema_title, + annotation.schema_version) + else: + typ = type_utils._annotation_to_type_struct(annotation) + return {'type': typ, 'is_artifact_list': is_artifact_list} - inputs[io_name] = input_spec - #Analyzing the return type annotations. - return_ann = signature.return_annotation - if not containerized: - if hasattr(return_ann, '_fields'): #NamedTuple - # Getting field type annotations. - # __annotations__ does not exist in python 3.5 and earlier - # _field_types does not exist in python 3.9 and later - field_annotations = getattr(return_ann, '__annotations__', - None) or getattr( - return_ann, '_field_types', None) - for field_name in return_ann._fields: - output_name = _maybe_make_unique(field_name, output_names) - output_names.add(output_name) - type_var = field_annotations.get(field_name) - if type_annotations.is_list_of_artifacts(type_var): - artifact_cls = type_var.__args__[0] - output_spec = structures.OutputSpec( - type=type_utils.create_bundled_artifact_type( - artifact_cls.schema_title, - artifact_cls.schema_version), - is_artifact_list=True) - elif type_annotations.is_artifact_class(type_var): - output_spec = structures.OutputSpec( - type=type_utils.create_bundled_artifact_type( - type_var.schema_title, type_var.schema_version)) - else: - type_struct = type_utils._annotation_to_type_struct( - type_var) - output_spec = structures.OutputSpec(type=type_struct) - outputs[output_name] = output_spec - # Deprecated dict-based way of declaring multiple outputs. Was only used by - # the @component decorator - elif isinstance(return_ann, dict): - warnings.warn( - 'The ability to specify multiple outputs using the dict syntax' - ' has been deprecated. It will be removed soon after release' - ' 0.1.32. Please use typing.NamedTuple to declare multiple' - ' outputs.') - for output_name, output_type_annotation in return_ann.items(): - output_type_struct = type_utils._annotation_to_type_struct( - output_type_annotation) - output_spec = structures.OutputSpec(type=output_type_struct) - outputs[name] = output_spec - elif signature.return_annotation is not None and signature.return_annotation != inspect.Parameter.empty: - output_name = _maybe_make_unique(SINGLE_OUTPUT_NAME, output_names) - # Fixes exotic, but possible collision: - # `def func(output_path: OutputPath()) -> str: ...` - output_names.add(output_name) - return_ann = signature.return_annotation - if type_annotations.is_list_of_artifacts(return_ann): - artifact_cls = return_ann.__args__[0] - output_spec = structures.OutputSpec( - type=type_utils.create_bundled_artifact_type( - artifact_cls.schema_title, artifact_cls.schema_version), - is_artifact_list=True) - elif type_annotations.is_artifact_class(return_ann): - output_spec = structures.OutputSpec( - type=type_utils.create_bundled_artifact_type( - return_ann.schema_title, return_ann.schema_version), - is_artifact_list=False) - else: - type_struct = type_utils._annotation_to_type_struct(return_ann) - output_spec = structures.OutputSpec(type=type_struct) - - outputs[output_name] = output_spec - elif return_ann != inspect.Parameter.empty and return_ann != structures.ContainerSpec: - raise TypeError( - 'Return annotation should be either ContainerSpec or omitted for container components.' +def make_output_spec(annotation: Any) -> structures.OutputSpec: + annotation = canonicalize_annotation(annotation) + args = make_input_output_spec_args(annotation) + return structures.OutputSpec(**args) + + +def make_input_spec(annotation: Any, + inspect_param: inspect.Parameter) -> structures.InputSpec: + """Makes an InputSpec from a cleaned output annotation.""" + annotation = canonicalize_annotation(annotation) + input_output_spec_args = make_input_output_spec_args(annotation) + + if (type_annotations.issubclass_of_artifact(annotation) or + input_output_spec_args['is_artifact_list'] + ) and inspect_param.default not in {None, inspect._empty}: + raise ValueError( + f'Optional Input artifacts may only have default value None. Got: {inspect_param.default}.' ) - component_name = name or _python_function_name_to_component_name( - func.__name__) + default = None if inspect_param.default == inspect.Parameter.empty or type_annotations.issubclass_of_artifact( + annotation) else inspect_param.default + + optional = inspect_param.default is not inspect.Parameter.empty or type_utils.is_task_final_status_type( + getattr(inspect_param.annotation, '__name__', '')) + return structures.InputSpec( + **input_output_spec_args, + default=default, + optional=optional, + ) + + +def extract_component_interface( + func: Callable, + containerized: bool = False, + description: Optional[str] = None, + name: Optional[str] = None, +) -> structures.ComponentSpec: def assign_descriptions( inputs_or_outputs: Mapping[str, Union[structures.InputSpec, @@ -417,23 +417,32 @@ def parse_docstring_with_return_as_args( return None - assign_descriptions(inputs, parsed_docstring.params) + signature = inspect.signature(func) + name_to_input_spec, name_to_output_spec = get_name_to_specs( + signature, containerized) + original_docstring = inspect.getdoc(func) + parsed_docstring = docstring_parser.parse(original_docstring) + + assign_descriptions(name_to_input_spec, parsed_docstring.params) modified_parsed_docstring = parse_docstring_with_return_as_args( original_docstring) if modified_parsed_docstring is not None: - assign_descriptions(outputs, modified_parsed_docstring.params) + assign_descriptions(name_to_output_spec, + modified_parsed_docstring.params) description = get_pipeline_description( decorator_description=description, docstring=parsed_docstring, ) + component_name = name or _python_function_name_to_component_name( + func.__name__) return structures.ComponentSpec( name=component_name, description=description, - inputs=inputs or None, - outputs=outputs or None, + inputs=name_to_input_spec or None, + outputs=name_to_output_spec or None, implementation=structures.Implementation(), ) @@ -573,7 +582,7 @@ def make_input_for_parameterized_container_component_function( Type[artifact_types.Artifact]] ) -> Union[placeholders.Placeholder, container_component_artifact_channel .ContainerComponentArtifactChannel]: - if type_annotations.is_input_artifact(annotation): + if type_annotations.is_artifact_wrapped_in_Input(annotation): if type_annotations.is_list_of_artifacts(annotation.__origin__): return placeholders.InputListOfArtifactsPlaceholder(name) @@ -581,7 +590,7 @@ def make_input_for_parameterized_container_component_function( return container_component_artifact_channel.ContainerComponentArtifactChannel( io_type='input', var_name=name) - elif type_annotations.is_output_artifact(annotation): + elif type_annotations.is_artifact_wrapped_in_Output(annotation): if type_annotations.is_list_of_artifacts(annotation.__origin__): return placeholders.OutputListOfArtifactsPlaceholder(name) diff --git a/sdk/python/kfp/dsl/executor.py b/sdk/python/kfp/dsl/executor.py index 7429c0de2b3..87d20e43c85 100644 --- a/sdk/python/kfp/dsl/executor.py +++ b/sdk/python/kfp/dsl/executor.py @@ -16,6 +16,7 @@ import os import re from typing import Any, Callable, Dict, List, Optional, Union +import warnings from kfp import dsl from kfp.dsl import task_final_status @@ -39,6 +40,12 @@ def __init__( self.func = function_to_execute self.executor_input = executor_input + self.executor_output_path = self.executor_input['outputs']['outputFile'] + + # drop executor_output.json part from the outputFile path + artifact_types.CONTAINER_TASK_ROOT = os.path.split( + self.executor_output_path)[0] + self.input_artifacts: Dict[str, Union[dsl.Artifact, List[dsl.Artifact]]] = {} self.output_artifacts: Dict[str, dsl.Artifact] = {} @@ -55,9 +62,14 @@ def assign_input_and_output_artifacts(self) -> None: if list_of_artifact_proto_structs: annotation = self.func.__annotations__[name] # InputPath has no attribute __origin__ and also should be handled as a single artifact - if type_annotations.is_Input_Output_artifact_annotation( - annotation) and type_annotations.is_list_of_artifacts( - annotation.__origin__): + annotation = type_annotations.maybe_strip_optional_from_annotation( + annotation) + is_list_of_artifacts = ( + type_annotations.is_Input_Output_artifact_annotation( + annotation) and + type_annotations.is_list_of_artifacts(annotation.__origin__) + ) or type_annotations.is_list_of_artifacts(annotation) + if is_list_of_artifacts: self.input_artifacts[name] = [ self.make_artifact( msg, @@ -129,7 +141,7 @@ def get_output_parameter_path(self, parameter_name: str) -> Optional[str]: path = parameter.get('outputFile', None) if path: - os.makedirs(os.path.dirname(path), exist_ok=True) + makedirs_recursively(path) return path def get_output_artifact_path(self, artifact_name: str) -> str: @@ -189,8 +201,29 @@ def handle_single_return_value(self, output_name: str, annotation_type: Any, f'Function `{self.func.__name__}` returned value of type {type(return_value)}; want type {origin_type}' ) self.write_output_parameter_value(output_name, return_value) + elif is_artifact(annotation_type): - self.write_output_artifact_payload(output_name, return_value) + if isinstance(return_value, artifact_types.Artifact): + # for -> Artifact annotations, where the user returns an artifact + artifact_name = self.executor_input['outputs']['artifacts'][ + output_name]['artifacts'][0]['name'] + # users should not override the name for Vertex Pipelines + # if empty string, replace + # else provide descriptive warning and prefer letting backend throw exception + running_on_vertex = 'VERTEX_AI_PIPELINES_RUN_LABELS' in os.environ + if running_on_vertex: + if return_value.name == '': + return_value.name = artifact_name + else: + # prefer letting the backend throw the runtime exception + warnings.warn( + f'If you are running your pipeline Vertex AI Pipelines, you should not provide a name for your artifact. It will be set to the Vertex artifact resource name {artifact_name} by default. Got value for name: {return_value.name}.', + RuntimeWarning, + stacklevel=2) + self.output_artifacts[output_name] = return_value + else: + # for -> Artifact annotations, where the user returns some data that the executor should serialize + self.write_output_artifact_payload(output_name, return_value) else: raise RuntimeError( f'Unknown return type: {annotation_type}. Must be one of the supported data types: https://www.kubeflow.org/docs/components/pipelines/v2/data-types/' @@ -209,18 +242,6 @@ def write_executor_output(self, Returns: Optional[str]: Returns the location of the executor_output file as a string if the file is written. Else, None. """ - if self.output_artifacts: - self.excutor_output['artifacts'] = {} - - for name, artifact in self.output_artifacts.items(): - runtime_artifact = { - 'name': artifact.name, - 'uri': artifact.uri, - 'metadata': artifact.metadata, - } - artifacts_list = {'artifacts': [runtime_artifact]} - - self.excutor_output['artifacts'][name] = artifacts_list if func_output is not None: if is_parameter(self.return_annotation) or is_artifact( @@ -248,6 +269,19 @@ def write_executor_output(self, f'Unknown return type: {self.return_annotation}. Must be one of `str`, `int`, `float`, a subclass of `Artifact`, or a NamedTuple collection of these types.' ) + if self.output_artifacts: + self.excutor_output['artifacts'] = {} + + for name, artifact in self.output_artifacts.items(): + runtime_artifact = { + 'name': artifact.name, + 'uri': artifact.uri, + 'metadata': artifact.metadata, + } + artifacts_list = {'artifacts': [runtime_artifact]} + + self.excutor_output['artifacts'][name] = artifacts_list + # This check is to ensure only one worker (in a mirrored, distributed training/compute strategy) attempts to write to the same executor output file at the same time using gcsfuse, which enforces immutability of files. write_file = True @@ -259,12 +293,10 @@ def write_executor_output(self, write_file = cluster_spec['task']['type'] in CHIEF_NODE_LABELS if write_file: - executor_output_path = self.executor_input['outputs']['outputFile'] - os.makedirs(os.path.dirname(executor_output_path), exist_ok=True) - with open(executor_output_path, 'w') as f: + makedirs_recursively(self.executor_output_path) + with open(self.executor_output_path, 'w') as f: f.write(json.dumps(self.excutor_output)) - return executor_output_path - + return self.executor_output_path return None def execute(self) -> Optional[str]: @@ -300,17 +332,23 @@ def execute(self) -> Optional[str]: error_message=value.get('error').get('message', None), ) + elif type_annotations.is_list_of_artifacts(v): + func_kwargs[k] = self.get_input_artifact(k) + elif is_parameter(v): value = self.get_input_parameter_value(k) if value is not None: func_kwargs[k] = value elif type_annotations.is_Input_Output_artifact_annotation(v): - if type_annotations.is_input_artifact(v): + if type_annotations.is_artifact_wrapped_in_Input(v): func_kwargs[k] = self.get_input_artifact(k) - if type_annotations.is_output_artifact(v): + if type_annotations.is_artifact_wrapped_in_Output(v): func_kwargs[k] = self.get_output_artifact(k) + elif is_artifact(v): + func_kwargs[k] = self.get_input_artifact(k) + elif isinstance(v, type_annotations.OutputPath): if is_parameter(v.type): func_kwargs[k] = self.get_output_parameter_path(k) diff --git a/sdk/python/kfp/dsl/executor_test.py b/sdk/python/kfp/dsl/executor_test.py index 4cc59693447..8b799d2c5a8 100644 --- a/sdk/python/kfp/dsl/executor_test.py +++ b/sdk/python/kfp/dsl/executor_test.py @@ -13,6 +13,7 @@ # limitations under the License. """Tests for kfp.dsl.executor.""" +import contextlib import json import os import tempfile @@ -41,10 +42,21 @@ class ExecutorTest(parameterized.TestCase): def setUp(cls): cls.maxDiff = None cls._test_dir = tempfile.mkdtemp() + + cls.prev_gcs_prefix = artifact_types._GCS_LOCAL_MOUNT_PREFIX + cls.prev_minio_prefix = artifact_types._MINIO_LOCAL_MOUNT_PREFIX + cls.prev_s3_prefix = artifact_types._S3_LOCAL_MOUNT_PREFIX + artifact_types._GCS_LOCAL_MOUNT_PREFIX = cls._test_dir + '/' artifact_types._MINIO_LOCAL_MOUNT_PREFIX = cls._test_dir + '/minio/' artifact_types._S3_LOCAL_MOUNT_PREFIX = cls._test_dir + '/s3/' + @classmethod + def tearDown(cls): + artifact_types._GCS_LOCAL_MOUNT_PREFIX = cls.prev_gcs_prefix + artifact_types._MINIO_LOCAL_MOUNT_PREFIX = cls.prev_minio_prefix + artifact_types._S3_LOCAL_MOUNT_PREFIX = cls.prev_s3_prefix + def execute(self, func: Callable, executor_input: str) -> None: executor_input_dict = json.loads(executor_input % {'test_dir': self._test_dir}) @@ -52,12 +64,12 @@ def execute(self, func: Callable, executor_input: str) -> None: executor.Executor( executor_input=executor_input_dict, function_to_execute=func).execute() + return executor_input_dict['outputs']['outputFile'] def execute_and_load_output_metadata(self, func: Callable, executor_input: str) -> dict: - self.execute(func, executor_input) - with open(os.path.join(self._test_dir, 'output_metadata.json'), - 'r') as f: + output_file = self.execute(func, executor_input) + with open(output_file) as f: return json.loads(f.read()) def test_input_and_output_parameters(self): @@ -1153,6 +1165,76 @@ def test_func(input_artifact: Input[Artifact]): input_artifact.name, 'projects/123/locations/us-central1/metadataStores/default/artifacts/input_artifact' ) + + output_metadata = self.execute_and_load_output_metadata( + test_func, executor_input) + + self.assertDictEqual(output_metadata, {}) + + def test_single_artifact_input_pythonic(self): + executor_input = """\ + { + "inputs": { + "artifacts": { + "input_artifact": { + "artifacts": [ + { + "metadata": {}, + "name": "projects/123/locations/us-central1/metadataStores/default/artifacts/input_artifact", + "type": { + "schemaTitle": "system.Artifact" + }, + "uri": "gs://some-bucket/output/input_artifact" + } + ] + } + } + }, + "outputs": { + "outputFile": "%(test_dir)s/output_metadata.json" + } + } + """ + + def test_func(input_artifact: Artifact): + self.assertIsInstance(input_artifact, Artifact) + self.assertEqual( + input_artifact.name, + 'projects/123/locations/us-central1/metadataStores/default/artifacts/input_artifact' + ) + + output_metadata = self.execute_and_load_output_metadata( + test_func, executor_input) + + self.assertDictEqual(output_metadata, {}) + + def test_single_artifact_input_pythonic_with_optional(self): + executor_input = """\ + { + "inputs": { + "artifacts": { + "input_artifact": { + "artifacts": [ + { + "metadata": {}, + "name": "projects/123/locations/us-central1/metadataStores/default/artifacts/input_artifact", + "type": { + "schemaTitle": "system.Artifact" + }, + "uri": "gs://some-bucket/output/input_artifact" + } + ] + } + } + }, + "outputs": { + "outputFile": "%(test_dir)s/output_metadata.json" + } + } + """ + + def test_func(input_artifact: Optional[Artifact] = None): + self.assertIsInstance(input_artifact, Artifact) self.assertEqual( input_artifact.name, 'projects/123/locations/us-central1/metadataStores/default/artifacts/input_artifact' @@ -1163,6 +1245,224 @@ def test_func(input_artifact: Input[Artifact]): self.assertDictEqual(output_metadata, {}) + def test_single_artifact_output_pythonic(self): + executor_input = """\ + { + "inputs": {}, + "outputs": { + "artifacts": { + "Output": { + "artifacts": [ + { + "metadata": {}, + "name": "projects/123/locations/us-central1/metadataStores/default/artifacts/123", + "type": { + "schemaTitle": "system.Artifact" + }, + "uri": "gs://some-bucket/output" + } + ] + } + }, + "outputFile": "%(test_dir)s/output_metadata.json" + } + } + """ + + def test_func() -> Artifact: + return Artifact( + uri='gs://manually_specified_bucket/foo', + metadata={'data': 123}, + ) + + with temporary_envvar('VERTEX_AI_PIPELINES_RUN_LABELS', '12325'): + output_metadata = self.execute_and_load_output_metadata( + test_func, executor_input) + + self.assertDictEqual( + output_metadata, { + 'artifacts': { + 'Output': { + 'artifacts': [{ + 'name': + 'projects/123/locations/us-central1/metadataStores/default/artifacts/123', + 'uri': + 'gs://manually_specified_bucket/foo', + 'metadata': { + 'data': 123 + } + }] + } + }, + }) + + def test_single_artifact_output_pythonic_with_get_uri(self): + executor_input = """\ + { + "inputs": {}, + "outputs": { + "artifacts": { + "Output": { + "artifacts": [ + { + "metadata": {}, + "name": "projects/123/locations/us-central1/metadataStores/default/artifacts/123", + "type": { + "schemaTitle": "system.Artifact" + }, + "uri": "gs://some-bucket/output" + } + ] + } + }, + "outputFile": "%(test_dir)s/another_bucket/output_metadata.json" + } + } + """ + + def test_func() -> Artifact: + return Artifact( + uri=dsl.get_uri(suffix='my_artifact'), + metadata={'data': 123}, + ) + + with temporary_envvar('VERTEX_AI_PIPELINES_RUN_LABELS', '12325'): + output_metadata = self.execute_and_load_output_metadata( + test_func, executor_input) + + self.assertDictEqual( + output_metadata, { + 'artifacts': { + 'Output': { + 'artifacts': [{ + 'name': + 'projects/123/locations/us-central1/metadataStores/default/artifacts/123', + 'uri': + 'gs://another_bucket/my_artifact', + 'metadata': { + 'data': 123 + } + }] + } + }, + }) + + def test_multiple_artifact_output_pythonic_with_get_uri(self): + executor_input = """\ + { + "inputs": {}, + "outputs": { + "artifacts": { + "a": { + "artifacts": [ + { + "metadata": {}, + "name": "projects/123/locations/us-central1/metadataStores/default/artifacts/123", + "type": { + "schemaTitle": "system.Artifact" + }, + "uri": "gs://some-bucket/output" + } + ] + }, + "d": { + "artifacts": [ + { + "metadata": {}, + "name": "projects/123/locations/us-central1/metadataStores/default/artifacts/321", + "type": { + "schemaTitle": "system.Dataset" + }, + "uri": "gs://some-bucket/output" + } + ] + } + }, + "outputFile": "%(test_dir)s/another_bucket/output_metadata.json" + } + } + """ + + def test_func() -> NamedTuple('outputs', a=Artifact, d=Dataset): + outputs = NamedTuple('outputs', a=Artifact, d=Dataset) + return outputs( + a=Artifact( + uri=dsl.get_uri(suffix='artifact'), + metadata={'data': 123}, + ), + d=Dataset( + uri=dsl.get_uri(suffix='dataset'), + metadata={}, + )) + + with temporary_envvar('VERTEX_AI_PIPELINES_RUN_LABELS', '12325'): + output_metadata = self.execute_and_load_output_metadata( + test_func, executor_input) + + self.assertDictEqual( + output_metadata, { + 'artifacts': { + 'a': { + 'artifacts': [{ + 'name': + 'projects/123/locations/us-central1/metadataStores/default/artifacts/123', + 'uri': + 'gs://another_bucket/artifact', + 'metadata': { + 'data': 123 + } + }] + }, + 'd': { + 'artifacts': [{ + 'name': + 'projects/123/locations/us-central1/metadataStores/default/artifacts/321', + 'uri': + 'gs://another_bucket/dataset', + 'metadata': {} + }] + } + }, + }) + + def test_warns_if_artifact_name_for_vertex(self): + executor_input = """\ + { + "inputs": {}, + "outputs": { + "artifacts": { + "Output": { + "artifacts": [ + { + "metadata": {}, + "name": "projects/123/locations/us-central1/metadataStores/default/artifacts/123", + "type": { + "schemaTitle": "system.Artifact" + }, + "uri": "gs://some-bucket/output" + } + ] + } + }, + "outputFile": "%(test_dir)s/output_metadata.json" + } + } + """ + + def test_func() -> Artifact: + return Artifact( + name='illegal_custom_name', + uri='gs://manually_specified_bucket/foo', + metadata={'data': 123}, + ) + + with temporary_envvar('VERTEX_AI_PIPELINES_RUN_LABELS', '12325'): + with self.assertWarnsRegex( + RuntimeWarning, + r'If you are running your pipeline Vertex AI Pipelines, you should not provide a name for your artifact\. It will be set to the Vertex artifact resource name projects/123/locations/us-central1/metadataStores/default/artifacts/123 by default\. Got value for name: illegal_custom_name\.' + ): + self.execute_and_load_output_metadata(test_func, executor_input) + def test_list_of_artifacts_input(self): executor_input = """\ { @@ -1212,6 +1512,104 @@ def test_func(input_list: Input[List[Artifact]]): self.assertDictEqual(output_metadata, {}) + def test_list_of_artifacts_input_pythonic(self): + executor_input = """\ + { + "inputs": { + "artifacts": { + "input_list": { + "artifacts": [ + { + "metadata": {}, + "name": "projects/123/locations/us-central1/metadataStores/default/artifacts/input_list/0", + "type": { + "schemaTitle": "system.Artifact" + }, + "uri": "gs://some-bucket/output/input_list/0" + }, + { + "metadata": {}, + "name": "projects/123/locations/us-central1/metadataStores/default/artifacts/input_list/1", + "type": { + "schemaTitle": "system.Artifact" + }, + "uri": "gs://some-bucket/output/input_list/1" + } + ] + } + } + }, + "outputs": { + "outputFile": "%(test_dir)s/output_metadata.json" + } + } + """ + + def test_func(input_list: List[Artifact]): + self.assertEqual(len(input_list), 2) + self.assertEqual( + input_list[0].name, + 'projects/123/locations/us-central1/metadataStores/default/artifacts/input_list/0' + ) + self.assertEqual( + input_list[1].name, + 'projects/123/locations/us-central1/metadataStores/default/artifacts/input_list/1' + ) + + output_metadata = self.execute_and_load_output_metadata( + test_func, executor_input) + + self.assertDictEqual(output_metadata, {}) + + def test_list_of_artifacts_input_pythonic_with_optional(self): + executor_input = """\ + { + "inputs": { + "artifacts": { + "input_list": { + "artifacts": [ + { + "metadata": {}, + "name": "projects/123/locations/us-central1/metadataStores/default/artifacts/input_list/0", + "type": { + "schemaTitle": "system.Artifact" + }, + "uri": "gs://some-bucket/output/input_list/0" + }, + { + "metadata": {}, + "name": "projects/123/locations/us-central1/metadataStores/default/artifacts/input_list/1", + "type": { + "schemaTitle": "system.Artifact" + }, + "uri": "gs://some-bucket/output/input_list/1" + } + ] + } + } + }, + "outputs": { + "outputFile": "%(test_dir)s/output_metadata.json" + } + } + """ + + def test_func(input_list: List[Artifact] = None): + self.assertEqual(len(input_list), 2) + self.assertEqual( + input_list[0].name, + 'projects/123/locations/us-central1/metadataStores/default/artifacts/input_list/0' + ) + self.assertEqual( + input_list[1].name, + 'projects/123/locations/us-central1/metadataStores/default/artifacts/input_list/1' + ) + + output_metadata = self.execute_and_load_output_metadata( + test_func, executor_input) + + self.assertDictEqual(output_metadata, {}) + class TestDictToArtifact(parameterized.TestCase): @@ -1329,5 +1727,23 @@ def test_dict_to_artifact_kfp_artifact( executor.create_artifact_instance(runtime_artifact), expected_type) +@contextlib.contextmanager +def temporary_envvar(key: str, value: str) -> None: + # Save the old value if it exists + old_value = os.environ.get(key, None) + + # Set the new value + os.environ[key] = value + + try: + yield + finally: + # Restore the old value or delete the key if it didn't exist before + if old_value is not None: + os.environ[key] = old_value + else: + del os.environ[key] + + if __name__ == '__main__': unittest.main() diff --git a/sdk/python/kfp/dsl/types/artifact_types.py b/sdk/python/kfp/dsl/types/artifact_types.py index 2c6999c2d83..f7a676573de 100644 --- a/sdk/python/kfp/dsl/types/artifact_types.py +++ b/sdk/python/kfp/dsl/types/artifact_types.py @@ -11,9 +11,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Classes for input/output Artifacts in KFP SDK.""" +"""Classes and utilities for using and creating artifacts in components.""" +import os from typing import Dict, List, Optional, Type +import warnings _GCS_LOCAL_MOUNT_PREFIX = '/gcs/' _MINIO_LOCAL_MOUNT_PREFIX = '/minio/' @@ -90,13 +92,17 @@ def _get_path(self) -> Optional[str]: return None def _set_path(self, path: str) -> None: - if path.startswith(_GCS_LOCAL_MOUNT_PREFIX): - path = 'gs://' + path[len(_GCS_LOCAL_MOUNT_PREFIX):] - elif path.startswith(_MINIO_LOCAL_MOUNT_PREFIX): - path = 'minio://' + path[len(_MINIO_LOCAL_MOUNT_PREFIX):] - elif path.startswith(_S3_LOCAL_MOUNT_PREFIX): - path = 's3://' + path[len(_S3_LOCAL_MOUNT_PREFIX):] - self.uri = path + self.uri = convert_local_path_to_remote_path(path) + + +def convert_local_path_to_remote_path(path: str) -> str: + if path.startswith(_GCS_LOCAL_MOUNT_PREFIX): + return 'gs://' + path[len(_GCS_LOCAL_MOUNT_PREFIX):] + elif path.startswith(_MINIO_LOCAL_MOUNT_PREFIX): + return 'minio://' + path[len(_MINIO_LOCAL_MOUNT_PREFIX):] + elif path.startswith(_S3_LOCAL_MOUNT_PREFIX): + return 's3://' + path[len(_S3_LOCAL_MOUNT_PREFIX):] + return path class Model(Artifact): @@ -470,3 +476,38 @@ class Markdown(Artifact): Markdown, ] } + +CONTAINER_TASK_ROOT: Optional[str] = None + + +# suffix default of 'Output' should be the same key as the default key for a +# single output component, but use value not variable for reference docs +def get_uri(suffix: str = 'Output') -> str: + """Gets the task root URI, a unique object storage URI associated with the + current task. This function may only be called at task runtime. + + Returns an empty string if the task root cannot be inferred from the runtime environment. + + Args: + suffix: A suffix to append to the URI. This is a helpful for creating unique subdirectories when the component has multiple outputs. + + Returns: + The URI or empty string. + """ + if CONTAINER_TASK_ROOT is None: + raise RuntimeError( + f"'dsl.{get_uri.__name__}' can only be called at task runtime. The task root is unknown in the current environment." + ) + UNSUPPORTED_KFP_PATH = '/tmp/kfp_outputs' + if CONTAINER_TASK_ROOT == UNSUPPORTED_KFP_PATH: + warnings.warn( + f'dsl.{get_uri.__name__} is not yet supported by the KFP backend. Please specify a URI explicitly.', + RuntimeWarning, + stacklevel=2, + ) + # return empty string, not None, to conform with logic in artifact + # constructor which immediately converts uri=None to uri='' + # this way the .path property can worry about handling fewer input types + return '' + remote_task_root = convert_local_path_to_remote_path(CONTAINER_TASK_ROOT) + return os.path.join(remote_task_root, suffix) diff --git a/sdk/python/kfp/dsl/types/artifact_types_test.py b/sdk/python/kfp/dsl/types/artifact_types_test.py index 917ad95a456..c34f4a6bba0 100644 --- a/sdk/python/kfp/dsl/types/artifact_types_test.py +++ b/sdk/python/kfp/dsl/types/artifact_types_test.py @@ -13,18 +13,20 @@ # limitations under the License. """Tests for kfp.components.types.artifact_types.""" +import contextlib import json import os import unittest from absl.testing import parameterized +from kfp import dsl from kfp.dsl.types import artifact_types -class ArtifactsTest(parameterized.TestCase): +class ArtifactsTest(unittest.TestCase): def test_complex_metrics(self): - metrics = artifact_types.ClassificationMetrics() + metrics = dsl.ClassificationMetrics() metrics.log_roc_data_point(threshold=0.1, tpr=98.2, fpr=96.2) metrics.log_roc_data_point(threshold=24.3, tpr=24.5, fpr=98.4) metrics.set_confusion_matrix_categories(['dog', 'cat', 'horses']) @@ -41,7 +43,7 @@ def test_complex_metrics(self): self.assertEqual(expected_json, metrics.metadata) def test_complex_metrics_bulk_loading(self): - metrics = artifact_types.ClassificationMetrics() + metrics = dsl.ClassificationMetrics() metrics.log_roc_curve( fpr=[85.1, 85.1, 85.1], tpr=[52.6, 52.6, 52.6], @@ -57,5 +59,92 @@ def test_complex_metrics_bulk_loading(self): self.assertEqual(expected_json, metrics.metadata) +@contextlib.contextmanager +def set_temporary_task_root(task_root: str): + artifact_types.CONTAINER_TASK_ROOT = task_root + try: + yield + finally: + artifact_types.CONTAINER_TASK_ROOT = None + + +class TestGetUri(unittest.TestCase): + + def test_raise_if_no_env_var(self): + + with self.assertRaisesRegex( + RuntimeError, + r"'dsl\.get_uri' can only be called at task runtime\. The task root is unknown in the current environment\." + ): + dsl.get_uri() + + def test_default_gcs(self): + with set_temporary_task_root( + '/gcs/my_bucket/123456789/abc-09-14-2023-14-21-53/foo_123456789' + ): + self.assertEqual( + 'gs://my_bucket/123456789/abc-09-14-2023-14-21-53/foo_123456789/Output', + dsl.get_uri()) + + def test_default_s3(self): + with set_temporary_task_root( + '/s3/my_bucket/123456789/abc-09-14-2023-14-21-53/foo_123456789' + ): + self.assertEqual( + 's3://my_bucket/123456789/abc-09-14-2023-14-21-53/foo_123456789/Output', + dsl.get_uri()) + + def test_default_minio(self): + with set_temporary_task_root( + '/minio/my_bucket/123456789/abc-09-14-2023-14-21-53/foo_123456789' + ): + self.assertEqual( + 'minio://my_bucket/123456789/abc-09-14-2023-14-21-53/foo_123456789/Output', + dsl.get_uri()) + + def test_suffix_arg_gcs(self): + with set_temporary_task_root( + '/gcs/my_bucket/123456789/abc-09-14-2023-14-21-53/foo_123456789' + ): + self.assertEqual( + 'gs://my_bucket/123456789/abc-09-14-2023-14-21-53/foo_123456789/model', + dsl.get_uri('model')) + + def test_suffix_arg_tmp_no_suffix(self): + with set_temporary_task_root('/tmp/kfp_outputs'): + with self.assertWarnsRegex( + RuntimeWarning, + r'dsl\.get_uri is not yet supported by the KFP backend\. Please specify a URI explicitly\.' + ): + actual = dsl.get_uri('model') + self.assertEqual('', actual) + + def test_suffix_arg_tmp_with_suffix(self): + with set_temporary_task_root('/tmp/kfp_outputs'): + with self.assertWarnsRegex( + RuntimeWarning, + r'dsl\.get_uri is not yet supported by the KFP backend\. Please specify a URI explicitly\.' + ): + actual = dsl.get_uri('model') + self.assertEqual('', actual) + + +class TestConvertLocalPathToRemotePath(parameterized.TestCase): + + @parameterized.parameters([{ + 'local_path': local_path, + 'expected': expected + } for local_path, expected in [ + ('/gcs/foo/bar', 'gs://foo/bar'), + ('/minio/foo/bar', 'minio://foo/bar'), + ('/s3/foo/bar', 's3://foo/bar'), + ('/tmp/kfp_outputs', '/tmp/kfp_outputs'), + ('/some/random/path', '/some/random/path'), + ]]) + def test_gcs(self, local_path, expected): + actual = artifact_types.convert_local_path_to_remote_path(local_path) + self.assertEqual(actual, expected) + + if __name__ == '__main__': unittest.main() diff --git a/sdk/python/kfp/dsl/types/type_annotations.py b/sdk/python/kfp/dsl/types/type_annotations.py index 1d9e2f2b0e7..cd6adb89d8b 100644 --- a/sdk/python/kfp/dsl/types/type_annotations.py +++ b/sdk/python/kfp/dsl/types/type_annotations.py @@ -135,7 +135,7 @@ def is_Input_Output_artifact_annotation(typ) -> bool: return True -def is_input_artifact(typ) -> bool: +def is_artifact_wrapped_in_Input(typ: Any) -> bool: """Returns True if typ is of type Input[T].""" if not is_Input_Output_artifact_annotation(typ): return False @@ -143,7 +143,7 @@ def is_input_artifact(typ) -> bool: return typ.__metadata__[0] == InputAnnotation -def is_output_artifact(typ) -> bool: +def is_artifact_wrapped_in_Output(typ: Any) -> bool: """Returns True if typ is of type Output[T].""" if not is_Input_Output_artifact_annotation(typ): return False @@ -160,14 +160,19 @@ def get_io_artifact_class(typ): return None # extract inner type from list of artifacts - inner = typ.__args__[0] + inner = strip_Input_or_Output_marker(typ) if hasattr(inner, '__origin__') and inner.__origin__ == list: return inner.__args__[0] return inner -def get_io_artifact_annotation(typ): +def strip_Input_or_Output_marker(typ: Any) -> artifact_types.Artifact: + return typ.__args__[0] + + +def get_input_or_output_marker( + typ) -> Optional[Union[InputAnnotation, OutputAnnotation]]: if not is_Input_Output_artifact_annotation(typ): return None diff --git a/sdk/python/kfp/dsl/types/type_annotations_test.py b/sdk/python/kfp/dsl/types/type_annotations_test.py index b57e2540824..df34682ce06 100644 --- a/sdk/python/kfp/dsl/types/type_annotations_test.py +++ b/sdk/python/kfp/dsl/types/type_annotations_test.py @@ -58,21 +58,24 @@ def test_is_not_artifact_annotation(self, annotation): Input, ]) def test_is_input_artifact(self, annotation): - self.assertTrue(type_annotations.is_input_artifact(annotation)) + self.assertTrue( + type_annotations.is_artifact_wrapped_in_Input(annotation)) @parameterized.parameters([ Output[Model], Output, ]) def test_is_not_input_artifact(self, annotation): - self.assertFalse(type_annotations.is_input_artifact(annotation)) + self.assertFalse( + type_annotations.is_artifact_wrapped_in_Input(annotation)) @parameterized.parameters([ Output[Model], Output[List[Model]], ]) def test_is_output_artifact(self, annotation): - self.assertTrue(type_annotations.is_output_artifact(annotation)) + self.assertTrue( + type_annotations.is_artifact_wrapped_in_Output(annotation)) @parameterized.parameters([ Input[Model], @@ -80,7 +83,8 @@ def test_is_output_artifact(self, annotation): Input, ]) def test_is_not_output_artifact(self, annotation): - self.assertFalse(type_annotations.is_output_artifact(annotation)) + self.assertFalse( + type_annotations.is_artifact_wrapped_in_Output(annotation)) def test_get_io_artifact_class(self): self.assertEqual( @@ -97,26 +101,26 @@ def test_get_io_artifact_class(self): def test_get_io_artifact_annotation(self): self.assertEqual( - type_annotations.get_io_artifact_annotation(Output[Model]), + type_annotations.get_input_or_output_marker(Output[Model]), OutputAnnotation) self.assertEqual( - type_annotations.get_io_artifact_annotation(Output[List[Model]]), + type_annotations.get_input_or_output_marker(Output[List[Model]]), OutputAnnotation) self.assertEqual( - type_annotations.get_io_artifact_annotation(Input[Model]), + type_annotations.get_input_or_output_marker(Input[Model]), InputAnnotation) self.assertEqual( - type_annotations.get_io_artifact_annotation(Input[List[Model]]), + type_annotations.get_input_or_output_marker(Input[List[Model]]), InputAnnotation) self.assertEqual( - type_annotations.get_io_artifact_annotation(Input), InputAnnotation) + type_annotations.get_input_or_output_marker(Input), InputAnnotation) self.assertEqual( - type_annotations.get_io_artifact_annotation(Output), + type_annotations.get_input_or_output_marker(Output), OutputAnnotation) self.assertEqual( - type_annotations.get_io_artifact_annotation(Model), None) - self.assertEqual(type_annotations.get_io_artifact_annotation(str), None) + type_annotations.get_input_or_output_marker(Model), None) + self.assertEqual(type_annotations.get_input_or_output_marker(str), None) @parameterized.parameters( { diff --git a/sdk/python/test_data/pipelines/pythonic_artifact_with_single_return.py b/sdk/python/test_data/pipelines/pythonic_artifact_with_single_return.py new file mode 100644 index 00000000000..31353e852b7 --- /dev/null +++ b/sdk/python/test_data/pipelines/pythonic_artifact_with_single_return.py @@ -0,0 +1,58 @@ +# Copyright 2023 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kfp import dsl +from kfp.dsl import Dataset +from kfp.dsl import Model + + +@dsl.component(packages_to_install=['dill==0.3.7']) +def make_language_model(text_dataset: Dataset) -> Model: + # dill allows pickling objects belonging to a function's local namespace + import dill + + with open(text_dataset.path) as f: + text = f.read() + + # insert train on text here # + + def dummy_model(x: str) -> str: + return x + + model = Model( + uri=dsl.get_uri(suffix='model'), + metadata={'data': text_dataset.name}, + ) + + with open(model.path, 'wb') as f: + dill.dump(dummy_model, f) + + return model + + +@dsl.pipeline +def make_language_model_pipeline() -> Model: + importer = dsl.importer( + artifact_uri='gs://ml-pipeline-playground/shakespeare1.txt', + artifact_class=Dataset, + reimport=False, + metadata={'key': 'value'}) + return make_language_model(text_dataset=importer.output).output + + +if __name__ == '__main__': + from kfp import compiler + compiler.Compiler().compile( + pipeline_func=make_language_model_pipeline, + package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/test_data/pipelines/pythonic_artifact_with_single_return.yaml b/sdk/python/test_data/pipelines/pythonic_artifact_with_single_return.yaml new file mode 100644 index 00000000000..d010d50a6ba --- /dev/null +++ b/sdk/python/test_data/pipelines/pythonic_artifact_with_single_return.yaml @@ -0,0 +1,123 @@ +# PIPELINE DEFINITION +# Name: make-language-model-pipeline +# Outputs: +# Output: system.Model +components: + comp-importer: + executorLabel: exec-importer + inputDefinitions: + parameters: + uri: + parameterType: STRING + outputDefinitions: + artifacts: + artifact: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-make-language-model: + executorLabel: exec-make-language-model + inputDefinitions: + artifacts: + text_dataset: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + outputDefinitions: + artifacts: + Output: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 +deploymentSpec: + executors: + exec-importer: + importer: + artifactUri: + constant: gs://ml-pipeline-playground/shakespeare1.txt + metadata: + key: value + typeSchema: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + exec-make-language-model: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - make_language_model + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.2.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'dill==0.3.7'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef make_language_model(text_dataset: Dataset) -> Model:\n # dill\ + \ allows pickling objects belonging to a function's local namespace\n \ + \ import dill\n\n with open(text_dataset.path) as f:\n text =\ + \ f.read()\n\n # insert train on text here #\n\n def dummy_model(x:\ + \ str) -> str:\n return x\n\n model = Model(\n uri=dsl.get_uri(suffix='model'),\n\ + \ metadata={'data': text_dataset.name},\n )\n\n with open(model.path,\ + \ 'wb') as f:\n dill.dump(dummy_model, f)\n\n return model\n\n" + image: python:3.7 +pipelineInfo: + name: make-language-model-pipeline +root: + dag: + outputs: + artifacts: + Output: + artifactSelectors: + - outputArtifactKey: Output + producerSubtask: make-language-model + tasks: + importer: + cachingOptions: + enableCache: true + componentRef: + name: comp-importer + inputs: + parameters: + uri: + runtimeValue: + constant: gs://ml-pipeline-playground/shakespeare1.txt + taskInfo: + name: importer + make-language-model: + cachingOptions: + enableCache: true + componentRef: + name: comp-make-language-model + dependentTasks: + - importer + inputs: + artifacts: + text_dataset: + taskOutputArtifact: + outputArtifactKey: artifact + producerTask: importer + taskInfo: + name: make-language-model + outputDefinitions: + artifacts: + Output: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 +schemaVersion: 2.1.0 +sdkVersion: kfp-2.2.0 diff --git a/sdk/python/test_data/pipelines/pythonic_artifacts_with_list_of_artifacts.py b/sdk/python/test_data/pipelines/pythonic_artifacts_with_list_of_artifacts.py new file mode 100644 index 00000000000..899bc483df4 --- /dev/null +++ b/sdk/python/test_data/pipelines/pythonic_artifacts_with_list_of_artifacts.py @@ -0,0 +1,52 @@ +# Copyright 2023 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List + +from kfp import dsl +from kfp.dsl import Dataset + + +@dsl.component +def make_dataset(text: str) -> Dataset: + dataset = Dataset(uri=dsl.get_uri(), metadata={'length': len(text)}) + with open(dataset.path, 'w') as f: + f.write(text) + return dataset + + +@dsl.component +def join_datasets(datasets: List[Dataset]) -> Dataset: + texts = [] + for dataset in datasets: + with open(dataset.path, 'r') as f: + texts.append(f.read()) + + return ''.join(texts) + + +@dsl.pipeline +def make_and_join_datasets( + texts: List[str] = ['Hello', ',', ' ', 'world!']) -> Dataset: + with dsl.ParallelFor(texts) as text: + t1 = make_dataset(text=text) + + return join_datasets(datasets=dsl.Collected(t1.output)).output + + +if __name__ == '__main__': + from kfp import compiler + compiler.Compiler().compile( + pipeline_func=make_and_join_datasets, + package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/test_data/pipelines/pythonic_artifacts_with_list_of_artifacts.yaml b/sdk/python/test_data/pipelines/pythonic_artifacts_with_list_of_artifacts.yaml new file mode 100644 index 00000000000..53b5fb17f8a --- /dev/null +++ b/sdk/python/test_data/pipelines/pythonic_artifacts_with_list_of_artifacts.yaml @@ -0,0 +1,187 @@ +# PIPELINE DEFINITION +# Name: make-and-join-datasets +# Inputs: +# texts: list [Default: ['Hello', ',', ' ', 'world!']] +# Outputs: +# Output: system.Dataset +components: + comp-for-loop-1: + dag: + outputs: + artifacts: + pipelinechannel--make-dataset-Output: + artifactSelectors: + - outputArtifactKey: Output + producerSubtask: make-dataset + tasks: + make-dataset: + cachingOptions: + enableCache: true + componentRef: + name: comp-make-dataset + inputs: + parameters: + text: + componentInputParameter: pipelinechannel--texts-loop-item + taskInfo: + name: make-dataset + inputDefinitions: + parameters: + pipelinechannel--texts: + parameterType: LIST + pipelinechannel--texts-loop-item: + parameterType: STRING + outputDefinitions: + artifacts: + pipelinechannel--make-dataset-Output: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + isArtifactList: true + comp-join-datasets: + executorLabel: exec-join-datasets + inputDefinitions: + artifacts: + datasets: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + isArtifactList: true + outputDefinitions: + artifacts: + Output: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-make-dataset: + executorLabel: exec-make-dataset + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + artifacts: + Output: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 +deploymentSpec: + executors: + exec-join-datasets: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - join_datasets + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.2.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef join_datasets(datasets: List[Dataset]) -> Dataset:\n texts\ + \ = []\n for dataset in datasets:\n with open(dataset.path, 'r')\ + \ as f:\n texts.append(f.read())\n\n return ''.join(texts)\n\ + \n" + image: python:3.7 + exec-make-dataset: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - make_dataset + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.2.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef make_dataset(text: str) -> Dataset:\n dataset = Dataset(uri=dsl.get_uri(),\ + \ metadata={'length': len(text)})\n with open(dataset.path, 'w') as f:\n\ + \ f.write(text)\n return dataset\n\n" + image: python:3.7 +pipelineInfo: + name: make-and-join-datasets +root: + dag: + outputs: + artifacts: + Output: + artifactSelectors: + - outputArtifactKey: Output + producerSubtask: join-datasets + tasks: + for-loop-1: + componentRef: + name: comp-for-loop-1 + inputs: + parameters: + pipelinechannel--texts: + componentInputParameter: texts + parameterIterator: + itemInput: pipelinechannel--texts-loop-item + items: + inputParameter: pipelinechannel--texts + taskInfo: + name: for-loop-1 + join-datasets: + cachingOptions: + enableCache: true + componentRef: + name: comp-join-datasets + dependentTasks: + - for-loop-1 + inputs: + artifacts: + datasets: + taskOutputArtifact: + outputArtifactKey: pipelinechannel--make-dataset-Output + producerTask: for-loop-1 + taskInfo: + name: join-datasets + inputDefinitions: + parameters: + texts: + defaultValue: + - Hello + - ',' + - ' ' + - world! + isOptional: true + parameterType: LIST + outputDefinitions: + artifacts: + Output: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 +schemaVersion: 2.1.0 +sdkVersion: kfp-2.2.0 diff --git a/sdk/python/test_data/pipelines/pythonic_artifacts_with_multiple_returns.py b/sdk/python/test_data/pipelines/pythonic_artifacts_with_multiple_returns.py new file mode 100644 index 00000000000..845ebb74728 --- /dev/null +++ b/sdk/python/test_data/pipelines/pythonic_artifacts_with_multiple_returns.py @@ -0,0 +1,93 @@ +# Copyright 2023 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import NamedTuple + +from kfp import dsl +from kfp.dsl import Artifact +from kfp.dsl import Dataset + + +@dsl.component +def dataset_splitter( + in_dataset: Dataset +) -> NamedTuple( + 'outputs', + dataset1=Dataset, + dataset2=Dataset, +): + + with open(in_dataset.path) as f: + in_data = f.read() + + out_data1, out_data2 = in_data[:len(in_data) // 2], in_data[len(in_data) // + 2:] + + dataset1 = Dataset( + uri=dsl.get_uri(suffix='dataset1'), + metadata={'original_data': in_dataset.name}, + ) + with open(dataset1.path, 'w') as f: + f.write(out_data1) + + dataset2 = Dataset( + uri=dsl.get_uri(suffix='dataset2'), + metadata={'original_data': in_dataset.name}, + ) + with open(dataset2.path, 'w') as f: + f.write(out_data2) + + outputs = NamedTuple( + 'outputs', + dataset1=Dataset, + dataset2=Dataset, + ) + return outputs(dataset1=dataset1, dataset2=dataset2) + + +outputs = NamedTuple( + 'outputs', + dataset1=Dataset, + dataset2=Dataset, +) + + +@dsl.pipeline +def splitter_pipeline(in_dataset: Dataset) -> outputs: + task = dataset_splitter(in_dataset=in_dataset) + return outputs( + task.outputs['dataset1'], + task.outputs['dataset1'], + ) + + +@dsl.component +def make_dataset() -> Artifact: + artifact = Artifact(uri=dsl.get_uri('dataset')) + with open(artifact.path, 'w') as f: + f.write('Hello, world') + return artifact + + +@dsl.pipeline +def split_datasets_and_return_first() -> Dataset: + t1 = make_dataset() + return splitter_pipeline(in_dataset=t1.output).outputs['dataset1'] + + +if __name__ == '__main__': + from kfp import compiler + compiler.Compiler().compile( + pipeline_func=split_datasets_and_return_first, + package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/test_data/pipelines/pythonic_artifacts_with_multiple_returns.yaml b/sdk/python/test_data/pipelines/pythonic_artifacts_with_multiple_returns.yaml new file mode 100644 index 00000000000..2f655b097cf --- /dev/null +++ b/sdk/python/test_data/pipelines/pythonic_artifacts_with_multiple_returns.yaml @@ -0,0 +1,184 @@ +# PIPELINE DEFINITION +# Name: split-datasets-and-return-first +# Outputs: +# Output: system.Dataset +components: + comp-dataset-splitter: + executorLabel: exec-dataset-splitter + inputDefinitions: + artifacts: + in_dataset: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + outputDefinitions: + artifacts: + dataset1: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + dataset2: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-make-dataset: + executorLabel: exec-make-dataset + outputDefinitions: + artifacts: + Output: + artifactType: + schemaTitle: system.Artifact + schemaVersion: 0.0.1 + comp-splitter-pipeline: + dag: + outputs: + artifacts: + dataset1: + artifactSelectors: + - outputArtifactKey: dataset1 + producerSubtask: dataset-splitter + dataset2: + artifactSelectors: + - outputArtifactKey: dataset1 + producerSubtask: dataset-splitter + tasks: + dataset-splitter: + cachingOptions: + enableCache: true + componentRef: + name: comp-dataset-splitter + inputs: + artifacts: + in_dataset: + componentInputArtifact: in_dataset + taskInfo: + name: dataset-splitter + inputDefinitions: + artifacts: + in_dataset: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + outputDefinitions: + artifacts: + dataset1: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + dataset2: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 +deploymentSpec: + executors: + exec-dataset-splitter: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - dataset_splitter + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.2.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef dataset_splitter(\n in_dataset: Dataset\n) -> NamedTuple(\n\ + \ 'outputs',\n dataset1=Dataset,\n dataset2=Dataset,\n\ + ):\n\n with open(in_dataset.path) as f:\n in_data = f.read()\n\ + \n out_data1, out_data2 = in_data[:len(in_data) // 2], in_data[len(in_data)\ + \ //\n 2:]\n\ + \n dataset1 = Dataset(\n uri=dsl.get_uri(suffix='dataset1'),\n\ + \ metadata={'original_data': in_dataset.name},\n )\n with open(dataset1.path,\ + \ 'w') as f:\n f.write(out_data1)\n\n dataset2 = Dataset(\n \ + \ uri=dsl.get_uri(suffix='dataset2'),\n metadata={'original_data':\ + \ in_dataset.name},\n )\n with open(dataset2.path, 'w') as f:\n \ + \ f.write(out_data2)\n\n outputs = NamedTuple(\n 'outputs',\n\ + \ dataset1=Dataset,\n dataset2=Dataset,\n )\n return\ + \ outputs(dataset1=dataset1, dataset2=dataset2)\n\n" + image: python:3.7 + exec-make-dataset: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - make_dataset + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.2.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef make_dataset() -> Artifact:\n artifact = Artifact(uri=dsl.get_uri('dataset'))\n\ + \ with open(artifact.path, 'w') as f:\n f.write('Hello, world')\n\ + \ return artifact\n\n" + image: python:3.7 +pipelineInfo: + name: split-datasets-and-return-first +root: + dag: + outputs: + artifacts: + Output: + artifactSelectors: + - outputArtifactKey: dataset1 + producerSubtask: splitter-pipeline + tasks: + make-dataset: + cachingOptions: + enableCache: true + componentRef: + name: comp-make-dataset + taskInfo: + name: make-dataset + splitter-pipeline: + cachingOptions: + enableCache: true + componentRef: + name: comp-splitter-pipeline + dependentTasks: + - make-dataset + inputs: + artifacts: + in_dataset: + taskOutputArtifact: + outputArtifactKey: Output + producerTask: make-dataset + taskInfo: + name: splitter-pipeline + outputDefinitions: + artifacts: + Output: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 +schemaVersion: 2.1.0 +sdkVersion: kfp-2.2.0 diff --git a/sdk/python/test_data/test_data_config.yaml b/sdk/python/test_data/test_data_config.yaml index d64d7a1aea3..7421c83303e 100644 --- a/sdk/python/test_data/test_data_config.yaml +++ b/sdk/python/test_data/test_data_config.yaml @@ -180,6 +180,15 @@ pipelines: - module: if_elif_else_with_oneof_parameters name: outer_pipeline execute: false + - module: pythonic_artifact_with_single_return + name: make_language_model_pipeline + execute: false + - module: pythonic_artifacts_with_multiple_returns + name: split_datasets_and_return_first + execute: false + - module: pythonic_artifacts_with_list_of_artifacts + name: make_and_join_datasets + execute: false components: test_data_dir: sdk/python/test_data/components read: true diff --git a/test/sdk-execution-tests/requirements.txt b/test/sdk-execution-tests/requirements.txt index d9df3d9858d..bf44f120498 100644 --- a/test/sdk-execution-tests/requirements.txt +++ b/test/sdk-execution-tests/requirements.txt @@ -1,4 +1,3 @@ sdk/python pytest==7.1.3 pytest-asyncio-cooperative==0.28.0 -pytest-mock==3.8.2 diff --git a/test/sdk-execution-tests/sdk_execution_tests.py b/test/sdk-execution-tests/sdk_execution_tests.py index 1613cb627b7..b05f185acbb 100644 --- a/test/sdk-execution-tests/sdk_execution_tests.py +++ b/test/sdk-execution-tests/sdk_execution_tests.py @@ -65,7 +65,9 @@ def create_test_case_parameters() -> List[TestCase]: return parameters -def wait(run_result: client.client.RunPipelineResult) -> kfp_server_api.V2beta1Run: +def wait( + run_result: client.client.RunPipelineResult +) -> kfp_server_api.V2beta1Run: return kfp_client.wait_for_run_completion( run_id=run_result.run_id, timeout=int(TIMEOUT_SECONDS)) @@ -104,16 +106,14 @@ def get_kfp_package_path() -> str: return path -partial_component_decorator = functools.partial( +dsl.component = functools.partial( dsl.component, kfp_package_path=get_kfp_package_path()) @pytest.mark.asyncio_cooperative @pytest.mark.parametrize('test_case', create_test_case_parameters()) -async def test(test_case: TestCase, mocker) -> None: +async def test(test_case: TestCase) -> None: """Asynchronously runs all samples and test that they succeed.""" - mocker.patch.object(dsl, 'component', partial_component_decorator) - event_loop = asyncio.get_running_loop() try: run_url, run_result = run(test_case) @@ -123,3 +123,7 @@ async def test(test_case: TestCase, mocker) -> None: api_run = await event_loop.run_in_executor(None, wait, run_result) assert api_run.state == 'SUCCEEDED', f'Pipeline {test_case.name} ended with incorrect status: {api_run.state}. More info: {run_url}' + + +if __name__ == '__main__': + pytest.main()