From 94eaa875b372ab07e572ffe1740a84079131751e Mon Sep 17 00:00:00 2001 From: Connor McCarthy Date: Thu, 17 Mar 2022 21:00:40 -0600 Subject: [PATCH] chore(sdk): make kfp v2 hermetic (#7428) * move v1 files to v2 * format with yapf * remove unused imports * clean up * update copyright * move imports to module top level * apply some pylint changes * loosen pylintrc * remove v2_yaml_utils.py --- .pylintrc | 15 +- sdk/python/kfp/components/structures.py | 8 +- sdk/python/kfp/components/types/type_utils.py | 14 +- .../kfp/components/types/type_utils_test.py | 17 +- sdk/python/kfp/components/v1_components.py | 44 + sdk/python/kfp/components/v1_modelbase.py | 394 ++++++++ sdk/python/kfp/components/v1_structures.py | 855 ++++++++++++++++++ 7 files changed, 1318 insertions(+), 29 deletions(-) create mode 100644 sdk/python/kfp/components/v1_components.py create mode 100644 sdk/python/kfp/components/v1_modelbase.py create mode 100644 sdk/python/kfp/components/v1_structures.py diff --git a/.pylintrc b/.pylintrc index 82084297119d..e4c6f40f7503 100644 --- a/.pylintrc +++ b/.pylintrc @@ -66,7 +66,7 @@ confidence= # no Warning level messages displayed, use"--disable=all --enable=classes # --disable=W" # TODO(numerology): enable missing-module-docstring after finish the effort. -disable=missing-module-docstring, unspecified-encoding +disable=missing-module-docstring, unspecified-encoding, missing-function-docstring [REPORTS] @@ -99,10 +99,6 @@ evaluation=10.0 - ((float(5 * error + warning + refactor + convention) / stateme [BASIC] -# Good variable names which should always be accepted, separated by a comma -# s3 is whitelisted for its special meaning. -good-names=i,j,k,f,ex,Run,_,s3 - # Bad variable names which should always be refused, separated by a comma bad-names=foo,bar,baz,toto,tutu,tata @@ -117,9 +113,6 @@ include-naming-hint=no # to this list to register other decorators that produce valid properties. 
property-classes=abc.abstractproperty -# Regular expression matching correct variable names -variable-rgx=[a-z_][a-z0-9_]{2,30}$ - # Naming hint for variable names variable-name-hint=[a-z_][a-z0-9_]{2,30}$ @@ -185,6 +178,7 @@ no-docstring-rgx=^test_ # ones are exempt. docstring-min-length=-1 +disable=invalid-name [ELIF] @@ -296,6 +290,7 @@ generated-members=set_shape,np.float32 # produce valid context managers. contextmanager-decorators=contextlib.contextmanager +disable=redefined-builtin [VARIABLES] @@ -318,6 +313,8 @@ callbacks=cb_,_cb # builtins. redefining-builtins-modules=six.moves,future.builtins +disable=unused-argument + [CLASSES] @@ -332,7 +329,7 @@ valid-metaclass-classmethod-first-arg=mcs # List of member names, which should be excluded from the protected access # warning. -exclude-protected=_asdict,_fields,_replace,_source,_make +disable=protected-access [DESIGN] diff --git a/sdk/python/kfp/components/structures.py b/sdk/python/kfp/components/structures.py index d6db20885fb5..e52fe9d52331 100644 --- a/sdk/python/kfp/components/structures.py +++ b/sdk/python/kfp/components/structures.py @@ -1,4 +1,4 @@ -# Copyright 2021 The Kubeflow Authors +# Copyright 2021-2022 The Kubeflow Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -21,8 +21,8 @@ import pydantic import yaml from kfp.components import utils -from kfp.deprecated.components import _components -from kfp.deprecated.components import structures as v1_structures +from kfp.components import v1_components +from kfp.components import v1_structures class BaseModel(pydantic.BaseModel): @@ -587,7 +587,7 @@ def load_from_component_yaml(cls, component_yaml: str) -> 'ComponentSpec': try: return ComponentSpec.parse_obj(json_component) except (pydantic.ValidationError, AttributeError): - v1_component = _components._load_component_spec_from_component_text( + v1_component = v1_components._load_component_spec_from_component_text( component_yaml) return cls.from_v1_component_spec(v1_component) diff --git a/sdk/python/kfp/components/types/type_utils.py b/sdk/python/kfp/components/types/type_utils.py index 1943e08b9455..ab2587ad562b 100644 --- a/sdk/python/kfp/components/types/type_utils.py +++ b/sdk/python/kfp/components/types/type_utils.py @@ -1,4 +1,4 @@ -# Copyright 2020 The Kubeflow Authors +# Copyright 2020-2022 The Kubeflow Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,17 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. 
"""Utilities for component I/O type mapping.""" -import json import inspect import re import warnings -from typing import Dict, List, Optional, Type, Union +from typing import List, Optional, Type, Union -from kfp.deprecated.components import structures -from kfp.pipeline_spec import pipeline_spec_pb2 +import kfp from kfp.components import task_final_status +from kfp.components import v1_structures from kfp.components.types import artifact_types from kfp.components.types import type_annotations +from kfp.pipeline_spec import pipeline_spec_pb2 PARAMETER_TYPES = Union[str, int, float, bool, dict, list] @@ -179,7 +179,7 @@ def get_parameter_type_field_name(type_name: Optional[str]) -> str: def get_input_artifact_type_schema( input_name: str, - inputs: List[structures.InputSpec], + inputs: List[v1_structures.InputSpec], ) -> Optional[str]: """Find the input artifact type by input name. @@ -249,7 +249,7 @@ def verify_type_compatibility( error_text = error_message_prefix + ( 'Argument type "{}" is incompatible with the input type "{}"' ).format(str(given_type), str(expected_type)) - import kfp.deprecated as kfp + if kfp.TYPE_CHECK: raise InconsistentTypeException(error_text) else: diff --git a/sdk/python/kfp/components/types/type_utils_test.py b/sdk/python/kfp/components/types/type_utils_test.py index f7db405fed81..69dd5af2d211 100644 --- a/sdk/python/kfp/components/types/type_utils_test.py +++ b/sdk/python/kfp/components/types/type_utils_test.py @@ -1,4 +1,4 @@ -# Copyright 2020 The Kubeflow Authors +# Copyright 2020-2022 The Kubeflow Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,16 +11,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- -import sys import unittest from typing import Any, Dict, List, Union from absl.testing import parameterized -from kfp.deprecated.components import structures -from kfp.pipeline_spec import pipeline_spec_pb2 as pb -from kfp.components.types import artifact_types, type_utils +from kfp.components import v1_structures +from kfp.components.types import artifact_types +from kfp.components.types import type_utils from kfp.components.types.type_utils import InconsistentTypeException +from kfp.pipeline_spec import pipeline_spec_pb2 as pb _PARAMETER_TYPES = [ 'String', @@ -302,9 +301,9 @@ def test_get_parameter_type_invalid(self): def test_get_input_artifact_type_schema(self): input_specs = [ - structures.InputSpec(name='input1', type='String'), - structures.InputSpec(name='input2', type='Model'), - structures.InputSpec(name='input3', type=None), + v1_structures.InputSpec(name='input1', type='String'), + v1_structures.InputSpec(name='input2', type='Model'), + v1_structures.InputSpec(name='input3', type=None), ] # input not found. with self.assertRaises(AssertionError) as cm: diff --git a/sdk/python/kfp/components/v1_components.py b/sdk/python/kfp/components/v1_components.py new file mode 100644 index 000000000000..3c85b17c5bd2 --- /dev/null +++ b/sdk/python/kfp/components/v1_components.py @@ -0,0 +1,44 @@ +# Copyright 2018-2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import hashlib +import warnings + +import yaml +from kfp.components import v1_structures + + +def _load_component_spec_from_component_text( + text) -> v1_structures.ComponentSpec: + component_dict = yaml.safe_load(text) + component_spec = v1_structures.ComponentSpec.from_dict(component_dict) + + if isinstance(component_spec.implementation, + v1_structures.ContainerImplementation) and ( + component_spec.implementation.container.command is None): + warnings.warn( + 'Container component must specify command to be compatible with KFP ' + 'v2 compatible mode and emissary executor, which will be the default' + ' executor for KFP v2.' + 'https://www.kubeflow.org/docs/components/pipelines/installation/choose-executor/', + category=FutureWarning, + ) + + # Calculating hash digest for the component + data = text if isinstance(text, bytes) else text.encode('utf-8') + data = data.replace(b'\r\n', b'\n') # Normalizing line endings + digest = hashlib.sha256(data).hexdigest() + component_spec._digest = digest + + return component_spec diff --git a/sdk/python/kfp/components/v1_modelbase.py b/sdk/python/kfp/components/v1_modelbase.py new file mode 100644 index 000000000000..0091aff7ea88 --- /dev/null +++ b/sdk/python/kfp/components/v1_modelbase.py @@ -0,0 +1,394 @@ +# Copyright 2018-2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import inspect +from collections import OrderedDict +from collections import abc +from typing import (Any, Dict, List, Mapping, MutableMapping, MutableSequence, + Sequence, Type, TypeVar, Union, cast, get_type_hints) + +T = TypeVar('T') + + +def verify_object_against_type(x: Any, typ: Type[T]) -> T: + """Verifies that the object is compatible to the specified type (types from + the typing package can be used).""" + #TODO: Merge with parse_object_from_struct_based_on_type which has almost the same code + if typ is type(None): + if x is None: + return x + else: + raise TypeError('Error: Object "{}" is not None.'.format(x)) + + if typ is Any or type(typ) is TypeVar: + return x + + try: #isinstance can fail for generics + if isinstance(x, typ): + return cast(typ, x) + except Exception: + pass + + if hasattr(typ, '__origin__'): #Handling generic types + if typ.__origin__ is Union: #Optional == Union + exception_map = {} + possible_types = typ.__args__ + if type( + None + ) in possible_types and x is None: #Shortcut for Optional[] tests. Can be removed, but the exceptions will be more noisy. 
+ return x + for possible_type in possible_types: + try: + verify_object_against_type(x, possible_type) + return x + except Exception as ex: + exception_map[possible_type] = ex + pass + #exception_lines = ['Exception for type {}: {}.'.format(t, e) for t, e in exception_map.items()] + exception_lines = [str(e) for t, e in exception_map.items()] + exception_lines.append( + 'Error: Object "{}" is incompatible with type "{}".'.format( + x, typ)) + raise TypeError('\n'.join(exception_lines)) + + #not Union => not None + if x is None: + raise TypeError( + 'Error: None object is incompatible with type {}'.format(typ)) + + #assert isinstance(x, typ.__origin__) + generic_type = typ.__origin__ or getattr( + typ, '__extra__', None + ) #In python <3.7 typing.List.__origin__ == None; Python 3.7 has working __origin__, but no __extra__ TODO: Remove the __extra__ once we move to Python 3.7 + if generic_type in [ + list, List, abc.Sequence, abc.MutableSequence, Sequence, + MutableSequence + ] and type(x) is not str: #! str is also Sequence + if not isinstance(x, generic_type): + raise TypeError( + 'Error: Object "{}" is incompatible with type "{}"'.format( + x, typ)) + # In Python <3.7 Mapping.__args__ is None. + # In Python 3.9 typ.__args__ does not exist when the generic type does not have subscripts + type_args = typ.__args__ if getattr( + typ, '__args__', None) is not None else (Any, Any) + inner_type = type_args[0] + for item in x: + verify_object_against_type(item, inner_type) + return x + + elif generic_type in [ + dict, Dict, abc.Mapping, abc.MutableMapping, Mapping, + MutableMapping, OrderedDict + ]: + if not isinstance(x, generic_type): + raise TypeError( + 'Error: Object "{}" is incompatible with type "{}"'.format( + x, typ)) + # In Python <3.7 Mapping.__args__ is None. 
+ # In Python 3.9 typ.__args__ does not exist when the generic type does not have subscripts + type_args = typ.__args__ if getattr( + typ, '__args__', None) is not None else (Any, Any) + inner_key_type = type_args[0] + inner_value_type = type_args[1] + for k, v in x.items(): + verify_object_against_type(k, inner_key_type) + verify_object_against_type(v, inner_value_type) + return x + + else: + raise TypeError( + 'Error: Unsupported generic type "{}". type.__origin__ or type.__extra__ == "{}"' + .format(typ, generic_type)) + + raise TypeError('Error: Object "{}" is incompatible with type "{}"'.format( + x, typ)) + + +def parse_object_from_struct_based_on_type(struct: Any, typ: Type[T]) -> T: + """Constructs an object from structure (usually dict) based on type. + + Supports list and dict types from the typing package plus Optional[] + and Union[] types. If some type is a class that has .from_dict class + method, that method is used for object construction. + """ + if typ is type(None): + if struct is None: + return None + else: + raise TypeError('Error: Structure "{}" is not None.'.format(struct)) + + if typ is Any or type(typ) is TypeVar: + return struct + + try: #isinstance can fail for generics + #if (isinstance(struct, typ) + # and not (typ is Sequence and type(struct) is str) #! str is also Sequence + # and not (typ is int and type(struct) is bool) #! 
bool is int + #): + if type(struct) is typ: + return struct + except: + pass + if hasattr(typ, 'from_dict'): + try: #More informative errors + return typ.from_dict(struct) + except Exception as ex: + raise TypeError( + 'Error: {}.from_dict(struct={}) failed with exception:\n{}' + .format(typ.__name__, struct, str(ex))) + if hasattr(typ, '__origin__'): #Handling generic types + if typ.__origin__ is Union: #Optional == Union + results = {} + exception_map = {} + # In Python 3.9 typ.__args__ does not exist when the generic type does not have subscripts + # Union without subscripts seems useless, but semantically it should be the same as Any. + possible_types = list(getattr(typ, '__args__', [Any])) + #if type(None) in possible_types and struct is None: #Shortcut for Optional[] tests. Can be removed, but the exceptions will be more noisy. + # return None + #Hack for Python <3.7 which for some reason "simplifies" Union[bool, int, ...] to just Union[int, ...] + if int in possible_types: + possible_types = possible_types + [bool] + for possible_type in possible_types: + try: + obj = parse_object_from_struct_based_on_type( + struct, possible_type) + results[possible_type] = obj + except Exception as ex: + if isinstance(ex, TypeError): + exception_map[possible_type] = ex + else: + exception_map[ + possible_type] = 'Unexpected exception when trying to convert structure "{}" to type "{}": {}: {}'.format( + struct, typ, type(ex), ex) + pass + + #Single successful parsing. + if len(results) == 1: + return list(results.values())[0] + + if len(results) > 1: + raise TypeError( + 'Error: Structure "{}" is ambiguous. It can be parsed to multiple types: {}.' + .format(struct, list(results.keys()))) + + exception_lines = [str(e) for t, e in exception_map.items()] + exception_lines.append( + 'Error: Structure "{}" is incompatible with type "{}" - none of the types in Union are compatible.' 
+ .format(struct, typ)) + raise TypeError('\n'.join(exception_lines)) + #not Union => not None + if struct is None: + raise TypeError( + 'Error: None structure is incompatible with type {}'.format( + typ)) + + #assert isinstance(x, typ.__origin__) + generic_type = typ.__origin__ or getattr( + typ, '__extra__', None + ) #In python <3.7 typing.List.__origin__ == None; Python 3.7 has working __origin__, but no __extra__ TODO: Remove the __extra__ once we move to Python 3.7 + if generic_type in [ + list, List, abc.Sequence, abc.MutableSequence, Sequence, + MutableSequence + ] and type(struct) is not str: #! str is also Sequence + if not isinstance(struct, generic_type): + raise TypeError( + 'Error: Structure "{}" is incompatible with type "{}" - it does not have list type.' + .format(struct, typ)) + # In Python <3.7 Mapping.__args__ is None. + # In Python 3.9 typ.__args__ does not exist when the generic type does not have subscripts + type_args = typ.__args__ if getattr( + typ, '__args__', None) is not None else (Any, Any) + inner_type = type_args[0] + return [ + parse_object_from_struct_based_on_type(item, inner_type) + for item in struct + ] + + elif generic_type in [ + dict, Dict, abc.Mapping, abc.MutableMapping, Mapping, + MutableMapping, OrderedDict + ]: #in Python <3.7 there is a difference between abc.Mapping and typing.Mapping + if not isinstance(struct, generic_type): + raise TypeError( + 'Error: Structure "{}" is incompatible with type "{}" - it does not have dict type.' + .format(struct, typ)) + # In Python <3.7 Mapping.__args__ is None. 
+ # In Python 3.9 typ.__args__ does not exist when the generic type does not have subscripts + type_args = typ.__args__ if getattr( + typ, '__args__', None) is not None else (Any, Any) + inner_key_type = type_args[0] + inner_value_type = type_args[1] + return { + parse_object_from_struct_based_on_type(k, inner_key_type): + parse_object_from_struct_based_on_type(v, inner_value_type) + for k, v in struct.items() + } + + else: + raise TypeError( + 'Error: Unsupported generic type "{}". type.__origin__ or type.__extra__ == "{}"' + .format(typ, generic_type)) + + raise TypeError( + 'Error: Structure "{}" is incompatible with type "{}". Structure is not the instance of the type, the type does not have .from_dict method and is not generic.' + .format(struct, typ)) + + +def convert_object_to_struct(obj, serialized_names: Mapping[str, str] = {}): + """Converts an object to structure (usually a dict). + + Serializes all properties that do not start with underscores. If the + type of some property is a class that has .to_dict class method, + that method is used for conversion. Used by the ModelBase class. 
+ """ + signature = inspect.signature(obj.__init__) #Needed for default values + result = {} + for python_name in signature.parameters: #TODO: Make it possible to specify the field ordering regardless of the presence of default values + value = getattr(obj, python_name) + if python_name.startswith('_'): + continue + attr_name = serialized_names.get(python_name, python_name) + if hasattr(value, "to_dict"): + result[attr_name] = value.to_dict() + elif isinstance(value, list): + result[attr_name] = [ + (x.to_dict() if hasattr(x, 'to_dict') else x) for x in value + ] + elif isinstance(value, dict): + result[attr_name] = { + k: (v.to_dict() if hasattr(v, 'to_dict') else v) + for k, v in value.items() + } + else: + param = signature.parameters.get(python_name, None) + if param is None or param.default == inspect.Parameter.empty or value != param.default: + result[attr_name] = value + + return result + + +def parse_object_from_struct_based_on_class_init( + cls: Type[T], + struct: Mapping, + serialized_names: Mapping[str, str] = {}) -> T: + """Constructs an object of specified class from structure (usually dict) + using the class.__init__ method. Converts all constructor arguments to + appropriate types based on the __init__ type hints. Used by the ModelBase + class. + + Arguments: + + serialized_names: specifies the mapping between __init__ parameter names and the structure key names for cases where these names are different (due to language syntax clashes or style differences). + """ + parameter_types = get_type_hints( + cls.__init__) #Properlty resolves forward references + + serialized_names_to_pythonic = {v: k for k, v in serialized_names.items()} + #If a pythonic name has a different original name, we forbid the pythonic name in the structure. 
Otherwise, this function would accept "python-styled" structures that should be invalid + forbidden_struct_keys = set( + serialized_names_to_pythonic.values()).difference( + serialized_names_to_pythonic.keys()) + args = {} + for original_name, value in struct.items(): + if original_name in forbidden_struct_keys: + raise ValueError( + 'Use "{}" key instead of pythonic key "{}" in the structure: {}.' + .format(serialized_names[original_name], original_name, struct)) + python_name = serialized_names_to_pythonic.get(original_name, + original_name) + param_type = parameter_types.get(python_name, None) + if param_type is not None: + args[python_name] = parse_object_from_struct_based_on_type( + value, param_type) + else: + args[python_name] = value + + return cls(**args) + + +class ModelBase: + """Base class for types that can be converted to JSON-like dict structures + or constructed from such structures. The object fields, their types and + default values are taken from the __init__ method arguments. Override the + _serialized_names mapping to control the key names of the serialized + structures. + + The derived class objects will have the .from_dict and .to_dict methods for conversion to or from structure. The base class constructor accepts the arguments map, checks the argument types and sets the object field values. + + Example derived class: + + class TaskSpec(ModelBase): + _serialized_names = { + 'component_ref': 'componentRef', + 'is_enabled': 'isEnabled', + } + + def __init__(self, + component_ref: ComponentReference, + arguments: Optional[Mapping[str, ArgumentType]] = None, + is_enabled: Optional[Union[ArgumentType, EqualsPredicate, NotEqualsPredicate]] = None, #Optional property with default value + ): + super().__init__(locals()) #Calling the ModelBase constructor to check the argument types and set the object field values. 
+ + task_spec = TaskSpec.from_dict("{'componentRef': {...}, 'isEnabled: {'and': {...}}}") # = instance of TaskSpec + task_struct = task_spec.to_dict() #= "{'componentRef': {...}, 'isEnabled: {'and': {...}}}" + """ + _serialized_names = {} + + def __init__(self, args): + parameter_types = get_type_hints(self.__class__.__init__) + field_values = { + k: v + for k, v in args.items() + if k != 'self' and not k.startswith('_') + } + for k, v in field_values.items(): + parameter_type = parameter_types.get(k, None) + if parameter_type is not None: + try: + verify_object_against_type(v, parameter_type) + except Exception as e: + raise TypeError( + 'Argument for {} is not compatible with type "{}". Exception: {}' + .format(k, parameter_type, e)) + self.__dict__.update(field_values) + + @classmethod + def from_dict(cls: Type[T], struct: Mapping) -> T: + return parse_object_from_struct_based_on_class_init( + cls, struct, serialized_names=cls._serialized_names) + + def to_dict(self) -> Mapping: + return convert_object_to_struct( + self, serialized_names=self._serialized_names) + + def _get_field_names(self): + return list(inspect.signature(self.__init__).parameters) + + def __repr__(self): + return self.__class__.__name__ + '(' + ', '.join( + param + '=' + repr(getattr(self, param)) + for param in self._get_field_names()) + ')' + + def __eq__(self, other): + return self.__class__ == other.__class__ and { + k: getattr(self, k) for k in self._get_field_names() + } == {k: getattr(other, k) for k in other._get_field_names()} + + def __ne__(self, other): + return not self == other + + def __hash__(self): + return hash(repr(self)) diff --git a/sdk/python/kfp/components/v1_structures.py b/sdk/python/kfp/components/v1_structures.py new file mode 100644 index 000000000000..66c9726729c2 --- /dev/null +++ b/sdk/python/kfp/components/v1_structures.py @@ -0,0 +1,855 @@ +# Copyright 2018-2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you 
may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections import OrderedDict +from typing import Any, Dict, List, Mapping, Optional, Union + +import yaml + +from .v1_modelbase import ModelBase + +PrimitiveTypes = Union[str, int, float, bool] +PrimitiveTypesIncludingNone = Optional[PrimitiveTypes] + +TypeSpecType = Union[str, Dict, List] + + +class InputSpec(ModelBase): + """Describes the component input specification.""" + + def __init__( + self, + name: str, + type: Optional[TypeSpecType] = None, + description: Optional[str] = None, + default: Optional[PrimitiveTypes] = None, + optional: Optional[bool] = False, + annotations: Optional[Dict[str, Any]] = None, + ): + super().__init__(locals()) + + +class OutputSpec(ModelBase): + """Describes the component output specification.""" + + def __init__( + self, + name: str, + type: Optional[TypeSpecType] = None, + description: Optional[str] = None, + annotations: Optional[Dict[str, Any]] = None, + ): + super().__init__(locals()) + + +class InputValuePlaceholder(ModelBase): #Non-standard attr names + """Represents the command-line argument placeholder that will be replaced + at run-time by the input argument value.""" + _serialized_names = { + 'input_name': 'inputValue', + } + + def __init__( + self, + input_name: str, + ): + super().__init__(locals()) + + +class InputPathPlaceholder(ModelBase): #Non-standard attr names + """Represents the command-line argument placeholder that will be replaced + at run-time by a local file path pointing to a file containing the input + argument value.""" + 
_serialized_names = { + 'input_name': 'inputPath', + } + + def __init__( + self, + input_name: str, + ): + super().__init__(locals()) + + +class OutputPathPlaceholder(ModelBase): #Non-standard attr names + """Represents the command-line argument placeholder that will be replaced + at run-time by a local file path pointing to a file where the program + should write its output data.""" + _serialized_names = { + 'output_name': 'outputPath', + } + + def __init__( + self, + output_name: str, + ): + super().__init__(locals()) + + +class InputUriPlaceholder(ModelBase): # Non-standard attr names + """Represents a placeholder for the URI of an input artifact. + + Represents the command-line argument placeholder that will be + replaced at run-time by the URI of the input artifact argument. + """ + _serialized_names = { + 'input_name': 'inputUri', + } + + def __init__( + self, + input_name: str, + ): + super().__init__(locals()) + + +class OutputUriPlaceholder(ModelBase): # Non-standard attr names + """Represents a placeholder for the URI of an output artifact. + + Represents the command-line argument placeholder that will be + replaced at run-time by a URI of the output artifac where the + program should write its output data. + """ + _serialized_names = { + 'output_name': 'outputUri', + } + + def __init__( + self, + output_name: str, + ): + super().__init__(locals()) + + +class InputMetadataPlaceholder(ModelBase): # Non-standard attr names + """Represents the file path to an input artifact metadata. + + During runtime, this command-line argument placeholder will be + replaced by the path where the metadata file associated with this + artifact has been written to. Currently only supported in v2 + components. 
+ """ + _serialized_names = { + 'input_name': 'inputMetadata', + } + + def __init__(self, input_name: str): + super().__init__(locals()) + + +class InputOutputPortNamePlaceholder(ModelBase): # Non-standard attr names + """Represents the output port name of an input artifact. + + During compile time, this command-line argument placeholder will be + replaced by the actual output port name used by the producer task. + Currently only supported in v2 components. + """ + _serialized_names = { + 'input_name': 'inputOutputPortName', + } + + def __init__(self, input_name: str): + super().__init__(locals()) + + +class OutputMetadataPlaceholder(ModelBase): # Non-standard attr names + """Represents the output metadata JSON file location of this task. + + This file will encode the metadata information produced by this task: + - Artifacts metadata, but not the content of the artifact, and + - output parameters. + + Only supported in v2 components. + """ + _serialized_names = { + 'output_metadata': 'outputMetadata', + } + + def __init__(self, output_metadata: type(None) = None): + if output_metadata: + raise RuntimeError( + 'Output metadata placeholder cannot be associated with key') + super().__init__(locals()) + + def to_dict(self) -> Mapping[str, Any]: + # Override parent implementation. Otherwise it always returns {}. + return {'outputMetadata': None} + + +class ExecutorInputPlaceholder(ModelBase): # Non-standard attr names + """Represents the serialized ExecutorInput message at runtime. + + This placeholder will be replaced by a serialized + [ExecutorInput](https://github.com/kubeflow/pipelines/blob/61f9c2c328d245d89c9d9b8c923f24dbbd08cdc9/api/v2alpha1/pipeline_spec.proto#L730) + proto message at runtime, which includes parameters of the task, artifact + URIs and metadata. 
+ """ + _serialized_names = { + 'executor_input': 'executorInput', + } + + def __init__(self, executor_input: type(None) = None): + if executor_input: + raise RuntimeError( + 'Executor input placeholder cannot be associated with input key' + '. Got %s' % executor_input) + super().__init__(locals()) + + def to_dict(self) -> Mapping[str, Any]: + # Override parent implementation. Otherwise it always returns {}. + return {'executorInput': None} + + +CommandlineArgumentType = Union[str, InputValuePlaceholder, + InputPathPlaceholder, OutputPathPlaceholder, + InputUriPlaceholder, OutputUriPlaceholder, + InputMetadataPlaceholder, + InputOutputPortNamePlaceholder, + OutputMetadataPlaceholder, + ExecutorInputPlaceholder, 'ConcatPlaceholder', + 'IfPlaceholder',] + + +class ConcatPlaceholder(ModelBase): #Non-standard attr names + """Represents the command-line argument placeholder that will be replaced + at run-time by the concatenated values of its items.""" + _serialized_names = { + 'items': 'concat', + } + + def __init__( + self, + items: List[CommandlineArgumentType], + ): + super().__init__(locals()) + + +class IsPresentPlaceholder(ModelBase): #Non-standard attr names + """Represents the command-line argument placeholder that will be replaced + at run-time by a boolean value specifying whether the caller has passed an + argument for the specified optional input.""" + _serialized_names = { + 'input_name': 'isPresent', + } + + def __init__( + self, + input_name: str, + ): + super().__init__(locals()) + + +IfConditionArgumentType = Union[bool, str, IsPresentPlaceholder, + InputValuePlaceholder] + + +class IfPlaceholderStructure(ModelBase): #Non-standard attr names + '''Used in by the IfPlaceholder - the command-line argument placeholder that will be replaced at run-time by the expanded value of either "then_value" or "else_value" depending on the submissio-time resolved value of the "cond" predicate.''' + _serialized_names = { + 'condition': 'cond', + 'then_value': 'then', + 
'else_value': 'else', + } + + def __init__( + self, + condition: IfConditionArgumentType, + then_value: Union[CommandlineArgumentType, + List[CommandlineArgumentType]], + else_value: Optional[Union[CommandlineArgumentType, + List[CommandlineArgumentType]]] = None, + ): + super().__init__(locals()) + + +class IfPlaceholder(ModelBase): #Non-standard attr names + """Represents the command-line argument placeholder that will be replaced + at run-time by the expanded value of either "then_value" or "else_value" + depending on the submission-time resolved value of the "cond" predicate.""" + _serialized_names = { + 'if_structure': 'if', + } + + def __init__( + self, + if_structure: IfPlaceholderStructure, + ): + super().__init__(locals()) + + +class ContainerSpec(ModelBase): + """Describes the container component implementation.""" + _serialized_names = { + 'file_outputs': + 'fileOutputs', #TODO: rename to something like legacy_unconfigurable_output_paths + } + + def __init__( + self, + image: str, + command: Optional[List[CommandlineArgumentType]] = None, + args: Optional[List[CommandlineArgumentType]] = None, + env: Optional[Mapping[str, str]] = None, + file_outputs: + Optional[Mapping[ + str, + str]] = None, #TODO: rename to something like legacy_unconfigurable_output_paths + ): + super().__init__(locals()) + + +class ContainerImplementation(ModelBase): + """Represents the container component implementation.""" + + def __init__( + self, + container: ContainerSpec, + ): + super().__init__(locals()) + + +ImplementationType = Union[ContainerImplementation, 'GraphImplementation'] + + +class MetadataSpec(ModelBase): + + def __init__( + self, + annotations: Optional[Dict[str, str]] = None, + labels: Optional[Dict[str, str]] = None, + ): + super().__init__(locals()) + + +class ComponentSpec(ModelBase): + """Component specification. 
+ + Describes the metadata (name, description, annotations and labels), + the interface (inputs and outputs) and the implementation of the + component. + """ + + def __init__( + self, + name: Optional[str] = None, #? Move to metadata? + description: Optional[str] = None, #? Move to metadata? + metadata: Optional[MetadataSpec] = None, + inputs: Optional[List[InputSpec]] = None, + outputs: Optional[List[OutputSpec]] = None, + implementation: Optional[ImplementationType] = None, + version: Optional[str] = 'google.com/cloud/pipelines/component/v1', + #tags: Optional[Set[str]] = None, + ): + super().__init__(locals()) + self._post_init() + + def _post_init(self): + #Checking input names for uniqueness + self._inputs_dict = {} + if self.inputs: + for input in self.inputs: + if input.name in self._inputs_dict: + raise ValueError('Non-unique input name "{}"'.format( + input.name)) + self._inputs_dict[input.name] = input + + #Checking output names for uniqueness + self._outputs_dict = {} + if self.outputs: + for output in self.outputs: + if output.name in self._outputs_dict: + raise ValueError('Non-unique output name "{}"'.format( + output.name)) + self._outputs_dict[output.name] = output + + if isinstance(self.implementation, ContainerImplementation): + container = self.implementation.container + + if container.file_outputs: + for output_name, path in container.file_outputs.items(): + if output_name not in self._outputs_dict: + raise TypeError( + 'Unconfigurable output entry "{}" references non-existing output.' 
+ .format({output_name: path})) + + def verify_arg(arg): + if arg is None: + pass + elif isinstance( + arg, (str, int, float, bool, OutputMetadataPlaceholder, + ExecutorInputPlaceholder)): + pass + elif isinstance(arg, list): + for arg2 in arg: + verify_arg(arg2) + elif isinstance( + arg, + (InputUriPlaceholder, InputValuePlaceholder, + InputPathPlaceholder, IsPresentPlaceholder, + InputMetadataPlaceholder, InputOutputPortNamePlaceholder)): + if arg.input_name not in self._inputs_dict: + raise TypeError( + 'Argument "{}" references non-existing input.' + .format(arg)) + elif isinstance(arg, + (OutputUriPlaceholder, OutputPathPlaceholder)): + if arg.output_name not in self._outputs_dict: + raise TypeError( + 'Argument "{}" references non-existing output.' + .format(arg)) + elif isinstance(arg, ConcatPlaceholder): + for arg2 in arg.items: + verify_arg(arg2) + elif isinstance(arg, IfPlaceholder): + verify_arg(arg.if_structure.condition) + verify_arg(arg.if_structure.then_value) + verify_arg(arg.if_structure.else_value) + else: + raise TypeError('Unexpected argument "{}"'.format(arg)) + + verify_arg(container.command) + verify_arg(container.args) + + if isinstance(self.implementation, GraphImplementation): + graph = self.implementation.graph + + if graph.output_values is not None: + for output_name, argument in graph.output_values.items(): + if output_name not in self._outputs_dict: + raise TypeError( + 'Graph output argument entry "{}" references non-existing output.' + .format({output_name: argument})) + + if graph.tasks is not None: + for task in graph.tasks.values(): + if task.arguments is not None: + for argument in task.arguments.values(): + if isinstance( + argument, GraphInputArgument + ) and argument.graph_input.input_name not in self._inputs_dict: + raise TypeError( + 'Argument "{}" references non-existing input.' + .format(argument)) + + def save(self, file_path: str): + """Saves the component definition to file. 
+ + It can be shared online and later loaded using the + load_component function. + """ + + component_yaml = yaml.dump(self.to_dict(), sort_keys=False) + with open(file_path, 'w') as f: + f.write(component_yaml) + + +class ComponentReference(ModelBase): + """Component reference. + + Contains information that can be used to locate and load a component + by name, digest or URL + """ + + def __init__( + self, + name: Optional[str] = None, + digest: Optional[str] = None, + tag: Optional[str] = None, + url: Optional[str] = None, + spec: Optional[ComponentSpec] = None, + ): + super().__init__(locals()) + self._post_init() + + def _post_init(self) -> None: + if not any([self.name, self.digest, self.tag, self.url, self.spec]): + raise TypeError('Need at least one argument.') + + +class GraphInputReference(ModelBase): + """References the input of the graph (the scope is a single graph).""" + _serialized_names = { + 'input_name': 'inputName', + } + + def __init__( + self, + input_name: str, + type: + Optional[ + TypeSpecType] = None, # Can be used to override the reference data type + ): + super().__init__(locals()) + + def as_argument(self) -> 'GraphInputArgument': + return GraphInputArgument(graph_input=self) + + def with_type(self, type_spec: TypeSpecType) -> 'GraphInputReference': + return GraphInputReference( + input_name=self.input_name, + type=type_spec, + ) + + def without_type(self) -> 'GraphInputReference': + return self.with_type(None) + + +class GraphInputArgument(ModelBase): + """Represents the component argument value that comes from the graph + component input.""" + _serialized_names = { + 'graph_input': 'graphInput', + } + + def __init__( + self, + graph_input: GraphInputReference, + ): + super().__init__(locals()) + + +class TaskOutputReference(ModelBase): + """References the output of some task (the scope is a single graph).""" + _serialized_names = { + 'task_id': 'taskId', + 'output_name': 'outputName', + } + + def __init__( + self, + output_name: str, + 
task_id: + Optional[ + str] = None, # Used for linking to the upstream task in serialized component file. + task: + Optional[ + 'TaskSpec'] = None, # Used for linking to the upstream task in runtime since Task does not have an ID until inserted into a graph. + type: + Optional[ + TypeSpecType] = None, # Can be used to override the reference data type + ): + super().__init__(locals()) + if self.task_id is None and self.task is None: + raise TypeError('task_id and task cannot be None at the same time.') + + def with_type(self, type_spec: TypeSpecType) -> 'TaskOutputReference': + return TaskOutputReference( + output_name=self.output_name, + task_id=self.task_id, + task=self.task, + type=type_spec, + ) + + def without_type(self) -> 'TaskOutputReference': + return self.with_type(None) + + +class TaskOutputArgument(ModelBase + ): #Has additional constructor for convenience + """Represents the component argument value that comes from the output of + another task.""" + _serialized_names = { + 'task_output': 'taskOutput', + } + + def __init__( + self, + task_output: TaskOutputReference, + ): + super().__init__(locals()) + + @staticmethod + def construct( + task_id: str, + output_name: str, + ) -> 'TaskOutputArgument': + return TaskOutputArgument( + TaskOutputReference( + task_id=task_id, + output_name=output_name, + )) + + def with_type(self, type_spec: TypeSpecType) -> 'TaskOutputArgument': + return TaskOutputArgument( + task_output=self.task_output.with_type(type_spec),) + + def without_type(self) -> 'TaskOutputArgument': + return self.with_type(None) + + +ArgumentType = Union[PrimitiveTypes, GraphInputArgument, TaskOutputArgument] + + +class TwoOperands(ModelBase): + + def __init__( + self, + op1: ArgumentType, + op2: ArgumentType, + ): + super().__init__(locals()) + + +class BinaryPredicate(ModelBase): #abstract base type + + def __init__(self, operands: TwoOperands): + super().__init__(locals()) + + +class EqualsPredicate(BinaryPredicate): + """Represents the "equals" 
comparison predicate.""" + _serialized_names = {'operands': '=='} + + +class NotEqualsPredicate(BinaryPredicate): + """Represents the "not equals" comparison predicate.""" + _serialized_names = {'operands': '!='} + + +class GreaterThanPredicate(BinaryPredicate): + """Represents the "greater than" comparison predicate.""" + _serialized_names = {'operands': '>'} + + +class GreaterThanOrEqualPredicate(BinaryPredicate): + """Represents the "greater than or equal" comparison predicate.""" + _serialized_names = {'operands': '>='} + + +class LessThenPredicate(BinaryPredicate): + """Represents the "less than" comparison predicate.""" + _serialized_names = {'operands': '<'} + + +class LessThenOrEqualPredicate(BinaryPredicate): + """Represents the "less than or equal" comparison predicate.""" + _serialized_names = {'operands': '<='} + + +PredicateType = Union[ArgumentType, EqualsPredicate, NotEqualsPredicate, + GreaterThanPredicate, GreaterThanOrEqualPredicate, + LessThenPredicate, LessThenOrEqualPredicate, + 'NotPredicate', 'AndPredicate', 'OrPredicate',] + + +class TwoBooleanOperands(ModelBase): + + def __init__( + self, + op1: PredicateType, + op2: PredicateType, + ): + super().__init__(locals()) + + +class NotPredicate(ModelBase): + """Represents the "not" logical operation.""" + _serialized_names = {'operand': 'not'} + + def __init__(self, operand: PredicateType): + super().__init__(locals()) + + +class AndPredicate(ModelBase): + """Represents the "and" logical operation.""" + _serialized_names = {'operands': 'and'} + + def __init__(self, operands: TwoBooleanOperands): + super().__init__(locals()) + + +class OrPredicate(ModelBase): + """Represents the "or" logical operation.""" + _serialized_names = {'operands': 'or'} + + def __init__(self, operands: TwoBooleanOperands): + super().__init__(locals()) + + +class RetryStrategySpec(ModelBase): + _serialized_names = { + 'max_retries': 'maxRetries', + } + + def __init__( + self, + max_retries: int, + ): + 
super().__init__(locals()) + + +class CachingStrategySpec(ModelBase): + _serialized_names = { + 'max_cache_staleness': 'maxCacheStaleness', + } + + def __init__( + self, + max_cache_staleness: Optional[ + str] = None, # RFC3339 compliant duration: P30DT1H22M3S + ): + super().__init__(locals()) + + +class ExecutionOptionsSpec(ModelBase): + _serialized_names = { + 'retry_strategy': 'retryStrategy', + 'caching_strategy': 'cachingStrategy', + } + + def __init__( + self, + retry_strategy: Optional[RetryStrategySpec] = None, + caching_strategy: Optional[CachingStrategySpec] = None, + ): + super().__init__(locals()) + + +class TaskSpec(ModelBase): + """Task specification. + + Task is a "configured" component - a component supplied with arguments and other applied configuration changes. + """ + _serialized_names = { + 'component_ref': 'componentRef', + 'is_enabled': 'isEnabled', + 'execution_options': 'executionOptions' + } + + def __init__( + self, + component_ref: ComponentReference, + arguments: Optional[Mapping[str, ArgumentType]] = None, + is_enabled: Optional[PredicateType] = None, + execution_options: Optional[ExecutionOptionsSpec] = None, + annotations: Optional[Dict[str, Any]] = None, + ): + super().__init__(locals()) + #TODO: If component_ref is resolved to component spec, then check that the arguments correspond to the inputs + + def _init_outputs(self): + #Adding output references to the task + if self.component_ref.spec is None: + return + task_outputs = OrderedDict() + for output in self.component_ref.spec.outputs or []: + task_output_ref = TaskOutputReference( + output_name=output.name, + task=self, + type=output. + type, # TODO: Resolve type expressions. E.g. 
type: {TypeOf: Input 1} + ) + task_output_arg = TaskOutputArgument(task_output=task_output_ref) + task_outputs[output.name] = task_output_arg + + self.outputs = task_outputs + if len(task_outputs) == 1: + self.output = list(task_outputs.values())[0] + + +class GraphSpec(ModelBase): + """Describes the graph component implementation. + + It represents a graph of component tasks connected to the upstream + sources of data using the argument specifications. It also describes + the sources of graph output values. + """ + _serialized_names = { + 'output_values': 'outputValues', + } + + def __init__( + self, + tasks: Mapping[str, TaskSpec], + output_values: Mapping[str, ArgumentType] = None, + ): + super().__init__(locals()) + self._post_init() + + def _post_init(self): + #Checking task output references and preparing the dependency table + task_dependencies = {} + for task_id, task in self.tasks.items(): + dependencies = set() + task_dependencies[task_id] = dependencies + if task.arguments is not None: + for argument in task.arguments.values(): + if isinstance(argument, TaskOutputArgument): + dependencies.add(argument.task_output.task_id) + if argument.task_output.task_id not in self.tasks: + raise TypeError( + 'Argument "{}" references non-existing task.' 
+ .format(argument)) + + #Topologically sorting tasks to detect cycles + task_dependents = {k: set() for k in task_dependencies.keys()} + for task_id, dependencies in task_dependencies.items(): + for dependency in dependencies: + task_dependents[dependency].add(task_id) + task_number_of_remaining_dependencies = { + k: len(v) for k, v in task_dependencies.items() + } + sorted_tasks = OrderedDict() + + def process_task(task_id): + if task_number_of_remaining_dependencies[ + task_id] == 0 and task_id not in sorted_tasks: + sorted_tasks[task_id] = self.tasks[task_id] + for dependent_task in task_dependents[task_id]: + task_number_of_remaining_dependencies[ + dependent_task] = task_number_of_remaining_dependencies[ + dependent_task] - 1 + process_task(dependent_task) + + for task_id in task_dependencies.keys(): + process_task(task_id) + if len(sorted_tasks) != len(task_dependencies): + tasks_with_unsatisfied_dependencies = { + k: v + for k, v in task_number_of_remaining_dependencies.items() + if v > 0 + } + task_wth_minimal_number_of_unsatisfied_dependencies = min( + tasks_with_unsatisfied_dependencies.keys(), + key=lambda task_id: tasks_with_unsatisfied_dependencies[task_id] + ) + raise ValueError('Task "{}" has cyclical dependency.'.format( + task_wth_minimal_number_of_unsatisfied_dependencies)) + + self._toposorted_tasks = sorted_tasks + + +class GraphImplementation(ModelBase): + """Represents the graph component implementation.""" + + def __init__( + self, + graph: GraphSpec, + ): + super().__init__(locals()) + + +class PipelineRunSpec(ModelBase): + """The object that can be sent to the backend to start a new Run.""" + _serialized_names = { + 'root_task': 'rootTask', + #'on_exit_task': 'onExitTask', + } + + def __init__( + self, + root_task: TaskSpec, + #on_exit_task: Optional[TaskSpec] = None, + ): + super().__init__(locals())