From ec9d0b5aeee5b9b47867b637a788eb87e4269de9 Mon Sep 17 00:00:00 2001 From: connor-mccarthy Date: Thu, 28 Apr 2022 14:56:35 -0600 Subject: [PATCH] use basemodel for structures --- sdk/python/kfp/components/base_component.py | 2 +- sdk/python/kfp/components/pipeline_task.py | 7 +- sdk/python/kfp/components/structures.py | 598 ++++++++++--------- sdk/python/kfp/components/structures_test.py | 181 +++--- 4 files changed, 400 insertions(+), 388 deletions(-) diff --git a/sdk/python/kfp/components/base_component.py b/sdk/python/kfp/components/base_component.py index 8ccf38dc860b..a6491a530c5b 100644 --- a/sdk/python/kfp/components/base_component.py +++ b/sdk/python/kfp/components/base_component.py @@ -74,7 +74,7 @@ def __call__(self, *args, **kwargs) -> pipeline_task.PipelineTask: missing_arguments = [ input_name for input_name, input_spec in ( self.component_spec.inputs or {}).items() - if input_name not in task_inputs and not input_spec.optional and + if input_name not in task_inputs and not input_spec._optional and not type_utils.is_task_final_status_type(input_spec.type) ] if missing_arguments: diff --git a/sdk/python/kfp/components/pipeline_task.py b/sdk/python/kfp/components/pipeline_task.py index 916c0ddb1c6c..c1291b13a8d2 100644 --- a/sdk/python/kfp/components/pipeline_task.py +++ b/sdk/python/kfp/components/pipeline_task.py @@ -13,13 +13,13 @@ # limitations under the License. """Pipeline task class and operations.""" -import re import copy +import re from typing import Any, List, Mapping, Optional, Union from kfp.components import constants -from kfp.components import placeholders from kfp.components import pipeline_channel +from kfp.components import placeholders from kfp.components import structures from kfp.components.types import type_utils @@ -116,6 +116,7 @@ def __init__( self.container_spec = None if component_spec.implementation.container is not None: + self.container_spec = self._resolve_command_line_and_arguments( component_spec=component_spec, args=args, @@ -266,7 +267,7 @@ def expand_command_part(arg) -> Union[str, List[str], None]: return input_path else: input_spec = inputs_dict[input_name] - if input_spec.optional: + if input_spec._optional: return None else: raise ValueError( diff --git a/sdk/python/kfp/components/structures.py b/sdk/python/kfp/components/structures.py index 71743f9e93fa..290d104f978b 100644 --- a/sdk/python/kfp/components/structures.py +++ b/sdk/python/kfp/components/structures.py @@ -13,51 +13,52 @@ # limitations under the License. """Definitions for component spec.""" -import dataclasses +import ast +import functools import itertools -from typing import Any, Dict, Mapping, Optional, Sequence, Union +from typing import Any, Dict, List, Mapping, Optional, Union -import pydantic import yaml +from kfp.components import base_model from kfp.components import utils from kfp.components import v1_components from kfp.components import v1_structures from kfp.utils import ir_utils -class BaseModel(pydantic.BaseModel): +class InputSpec_(base_model.BaseModel): + """Component input definitions. (Inner class). - class Config: - allow_population_by_field_name = True - arbitrary_types_allowed = True + Attributes: + type: The type of the input. + default (optional): the default value for the input. + description: Optional: the user description of the input. + """ + type: Union[str, dict] + default: Union[Any, None] = None + description: Optional[str] = None -class InputSpec(BaseModel): +# Hack to allow access to __init__ arguments for setting _optional value +class InputSpec(InputSpec_, base_model.BaseModel): """Component input definitions. Attributes: type: The type of the input. - default: Optional; the default value for the input. + default (optional): the default value for the input. description: Optional: the user description of the input. - optional: Wether the input is optional. An input is optional when it has - an explicit default value. + _optional: Wether the input is optional. An input is optional when it has an explicit default value. """ - type: Union[str, dict] - default: Optional[Any] = None - description: Optional[str] = None - _optional: bool = pydantic.PrivateAttr() - def __init__(self, **data): - super().__init__(**data) - # An input is optional if a default value is explicitly specified. - self._optional = 'default' in data + @functools.wraps(InputSpec_.__init__) + def __init__(self, *args, **kwargs): + if args: + raise ValueError('InputSpec does not accept positional arguments.') + super().__init__(*args, **kwargs) + self._optional = 'default' in kwargs - @property - def optional(self) -> bool: - return self._optional - -class OutputSpec(BaseModel): +class OutputSpec(base_model.BaseModel): """Component output definitions. Attributes: @@ -68,55 +69,54 @@ class OutputSpec(BaseModel): description: Optional[str] = None -class BasePlaceholder(BaseModel): - """Base class for placeholders that could appear in container cmd and - args.""" - pass - - -class InputValuePlaceholder(BasePlaceholder): +class InputValuePlaceholder(base_model.BaseModel): """Class that holds input value for conditional cases. Attributes: input_name: name of the input. """ - input_name: str = pydantic.Field(alias='inputValue') + input_name: str + _aliases = {'input_name': 'inputValue'} -class InputPathPlaceholder(BasePlaceholder): +class InputPathPlaceholder(base_model.BaseModel): """Class that holds input path for conditional cases. Attributes: input_name: name of the input. """ - input_name: str = pydantic.Field(alias='inputPath') + input_name: str + _aliases = {'input_name': 'inputPath'} -class InputUriPlaceholder(BasePlaceholder): +class InputUriPlaceholder(base_model.BaseModel): """Class that holds input uri for conditional cases. Attributes: input_name: name of the input. """ - input_name: str = pydantic.Field(alias='inputUri') + input_name: str + _aliases = {'input_name': 'inputUri'} -class OutputPathPlaceholder(BasePlaceholder): +class OutputPathPlaceholder(base_model.BaseModel): """Class that holds output path for conditional cases. Attributes: output_name: name of the output. """ - output_name: str = pydantic.Field(alias='outputPath') + output_name: str + _aliases = {'output_name': 'outputPath'} -class OutputUriPlaceholder(BasePlaceholder): +class OutputUriPlaceholder(base_model.BaseModel): """Class that holds output uri for conditional cases. Attributes: output_name: name of the output. """ - output_name: str = pydantic.Field(alias='outputUri') + output_name: str + _aliases = {'output_name': 'outputUri'} ValidCommandArgs = Union[str, InputValuePlaceholder, InputPathPlaceholder, @@ -125,16 +125,17 @@ class OutputUriPlaceholder(BasePlaceholder): 'ConcatPlaceholder'] -class ConcatPlaceholder(BasePlaceholder): +class ConcatPlaceholder(base_model.BaseModel): """Class that extends basePlaceholders for concatenation. Attributes: items: string or ValidCommandArgs for concatenation. """ - items: Sequence[ValidCommandArgs] = pydantic.Field(alias='concat') + items: List[ValidCommandArgs] + _aliases = {'items': 'concat'} -class IfPresentPlaceholderStructure(BaseModel): +class IfPresentPlaceholderStructure(base_model.BaseModel): """Class that holds structure for conditional cases. Attributes: @@ -146,43 +147,35 @@ class IfPresentPlaceholderStructure(BaseModel): the command-line argument will be replaced at run-time by the expanded value of otherwise. """ - input_name: str = pydantic.Field(alias='inputName') - then: Sequence[ValidCommandArgs] - otherwise: Optional[Sequence[ValidCommandArgs]] = pydantic.Field( - None, alias='else') + input_name: str + then: List[ValidCommandArgs] + otherwise: Optional[List[ValidCommandArgs]] = None + _aliases = {'input_name': 'inputName', 'otherwise': 'else'} - @pydantic.validator('otherwise', allow_reuse=True) - def empty_otherwise_sequence(cls, v): - if v == []: - return None - return v + def transform_otherwise(self) -> None: + """Use None instead of empty list for optional.""" + self.otherwise = None if self.otherwise == [] else self.otherwise -class IfPresentPlaceholder(BasePlaceholder): +class IfPresentPlaceholder(base_model.BaseModel): """Class that extends basePlaceholders for conditional cases. Attributes: if_present (ifPresent): holds structure for conditional cases. """ - if_structure: IfPresentPlaceholderStructure = pydantic.Field( - alias='ifPresent') - - -IfPresentPlaceholderStructure.update_forward_refs() -IfPresentPlaceholder.update_forward_refs() -ConcatPlaceholder.update_forward_refs() + if_structure: IfPresentPlaceholderStructure + _aliases = {'if_structure': 'ifPresent'} -@dataclasses.dataclass -class ResourceSpec: +class ResourceSpec(base_model.BaseModel): """The resource requirements of a container execution. Attributes: - cpu_limit: Optional; the limit of the number of vCPU cores. - memory_limit: Optional; the memory limit in GB. - accelerator_type: Optional; the type of accelerators attached to the + cpu_limit (optional): the limit of the number of vCPU cores. + memory_limit (optional): the memory limit in GB. + accelerator_type (optional): the type of accelerators attached to the container. - accelerator_count: Optional; the number of accelerators attached. + accelerator_count (optional): the number of accelerators attached. """ cpu_limit: Optional[float] = None memory_limit: Optional[float] = None @@ -190,36 +183,36 @@ class ResourceSpec: accelerator_count: Optional[int] = None -class ContainerSpec(BaseModel): +class ContainerSpec(base_model.BaseModel): """Container implementation definition. Attributes: image: The container image. - command: Optional; the container entrypoint. - args: Optional; the arguments to the container entrypoint. - env: Optional; the environment variables to be passed to the container. - resources: Optional; the specification on the resource requirements. + command (optional): the container entrypoint. + args (optional): the arguments to the container entrypoint. + env (optional): the environment variables to be passed to the container. + resources (optional): the specification on the resource requirements. """ image: str - command: Optional[Sequence[ValidCommandArgs]] = None - args: Optional[Sequence[ValidCommandArgs]] = None + command: Optional[List[ValidCommandArgs]] = None + args: Optional[List[ValidCommandArgs]] = None env: Optional[Mapping[str, ValidCommandArgs]] = None resources: Optional[ResourceSpec] = None - @pydantic.validator('command', 'args', allow_reuse=True) - def empty_sequence(cls, v): - if v == []: - return None - return v + def transform_command(self) -> None: + """Use None instead of empty list for command.""" + self.command = None if self.command == [] else self.command - @pydantic.validator('env', allow_reuse=True) - def empty_map(cls, v): - if v == {}: - return None - return v + def transform_args(self) -> None: + """Use None instead of empty list for args.""" + self.args = None if self.args == [] else self.args + def transform_env(self) -> None: + """Use None instead of empty dict for env.""" + self.env = None if self.env == {} else self.env -class TaskSpec(BaseModel): + +class TaskSpec(base_model.BaseModel): """The spec of a pipeline task. Attributes: @@ -227,23 +220,23 @@ class TaskSpec(BaseModel): inputs: The sources of task inputs. Constant values or PipelineParams. dependent_tasks: The list of upstream tasks. component_ref: The name of a component spec this task is based on. - trigger_condition: Optional; an expression which will be evaluated into + trigger_condition (optional): an expression which will be evaluated into a boolean value. True to trigger the task to run. - trigger_strategy: Optional; when the task will be ready to be triggered. + trigger_strategy (optional): when the task will be ready to be triggered. Valid values include: "TRIGGER_STRATEGY_UNSPECIFIED", "ALL_UPSTREAM_TASKS_SUCCEEDED", and "ALL_UPSTREAM_TASKS_COMPLETED". - iterator_items: Optional; the items to iterate on. A constant value or + iterator_items (optional): the items to iterate on. A constant value or a PipelineParam. - iterator_item_input: Optional; the name of the input which has the item + iterator_item_input (optional): the name of the input which has the item from the [items][] collection. - enable_caching: Optional; whether or not to enable caching for the task. + enable_caching (optional): whether or not to enable caching for the task. Default is True. - display_name: Optional; the display name of the task. If not specified, + display_name (optional): the display name of the task. If not specified, the task name will be used as the display name. """ name: str inputs: Mapping[str, Any] - dependent_tasks: Sequence[str] + dependent_tasks: List[str] component_ref: str trigger_condition: Optional[str] = None trigger_strategy: Optional[str] = None @@ -253,7 +246,7 @@ class TaskSpec(BaseModel): display_name: Optional[str] = None -class DagSpec(BaseModel): +class DagSpec(base_model.BaseModel): """DAG(graph) implementation definition. Attributes: @@ -261,11 +254,10 @@ class DagSpec(BaseModel): outputs: Defines how the outputs of the dag are linked to the sub tasks. """ tasks: Mapping[str, TaskSpec] - # TODO(chensun): revisit if we need a DagOutputsSpec class. outputs: Mapping[str, Any] -class ImporterSpec(BaseModel): +class ImporterSpec(base_model.BaseModel): """ImporterSpec definition. Attributes: @@ -273,7 +265,7 @@ class ImporterSpec(BaseModel): type_schema: The type of the artifact. reimport: Whether or not import an artifact regardless it has been imported before. - metadata: Optional; the properties of the artifact. + metadata (optional): the properties of the artifact. """ artifact_uri: str type_schema: str @@ -281,7 +273,7 @@ class ImporterSpec(BaseModel): metadata: Optional[Mapping[str, Any]] = None -class Implementation(BaseModel): +class Implementation(base_model.BaseModel): """Implementation definition. Attributes: @@ -294,97 +286,211 @@ class Implementation(BaseModel): importer: Optional[ImporterSpec] = None -class ComponentSpec(BaseModel): +def try_to_get_dict_from_string(element: str) -> Union[dict, str]: + try: + res = ast.literal_eval(element) + except (ValueError, SyntaxError): + return element + + if not isinstance(res, dict): + return element + return res + + +def convert_str_or_dict_to_placeholder( + element: Union[str, dict, + ValidCommandArgs]) -> Union[str, ValidCommandArgs]: + """Converts command and args elements to a placholder type based on value + of the key of the placeholder string, else returns the input. + + Args: + element (Union[str, dict, ValidCommandArgs]): A ContainerSpec.command or ContainerSpec.args element. + + Raises: + TypeError: If `element` is invalid. + + Returns: + Union[str, ValidCommandArgs]: Possibly converted placeholder or original input. + """ + + if not isinstance(element, (dict, str)): + return element + + elif isinstance(element, str): + res = try_to_get_dict_from_string(element) + if not isinstance(res, dict): + return element + + elif isinstance(element, dict): + res = element + else: + raise TypeError( + f'Invalid type for arg: {type(element)}. Expected str or dict.') + + has_one_entry = len(res) == 1 + + if not has_one_entry: + raise ValueError( + f"Got unexpected dictionary {res}. Expected a dictionary with one entry." + ) + + first_key = list(res.keys())[0] + first_value = list(res.values())[0] + if first_key == 'inputValue': + return InputValuePlaceholder( + input_name=utils.sanitize_input_name(first_value)) + + elif first_key == 'inputPath': + return InputPathPlaceholder( + input_name=utils.sanitize_input_name(first_value)) + + elif first_key == 'inputUri': + return InputUriPlaceholder( + input_name=utils.sanitize_input_name(first_value)) + + elif first_key == 'outputPath': + return OutputPathPlaceholder( + output_name=utils.sanitize_input_name(first_value)) + + elif first_key == 'outputUri': + return OutputUriPlaceholder( + output_name=utils.sanitize_input_name(first_value)) + + elif first_key == 'ifPresent': + structure_kwargs = res['ifPresent'] + structure_kwargs['input_name'] = structure_kwargs.pop('inputName') + structure_kwargs['otherwise'] = structure_kwargs.pop('else') + structure_kwargs['then'] = [ + convert_str_or_dict_to_placeholder(e) + for e in structure_kwargs['then'] + ] + structure_kwargs['otherwise'] = [ + convert_str_or_dict_to_placeholder(e) + for e in structure_kwargs['otherwise'] + ] + if_structure = IfPresentPlaceholderStructure(**structure_kwargs) + + return IfPresentPlaceholder(if_structure=if_structure) + + elif first_key == 'concat': + return ConcatPlaceholder(items=[ + convert_str_or_dict_to_placeholder(e) for e in res['concat'] + ]) + + else: + raise TypeError( + f'Unexpected command/argument type: "{element}" of type "{type(element)}".' + ) + + +def _check_valid_placeholder_reference(valid_inputs: List[str], + valid_outputs: List[str], + placeholder: ValidCommandArgs) -> None: + """Validates input/output placeholders refer to an existing input/output. + + Args: + valid_inputs: The existing input names. + valid_outputs: The existing output names. + arg: The placeholder argument for checking. + + Raises: + ValueError: if any placeholder references a non-existing input or + output. + TypeError: if any argument is neither a str nor a placeholder + instance. + """ + if isinstance( + placeholder, + (InputValuePlaceholder, InputPathPlaceholder, InputUriPlaceholder)): + if placeholder.input_name not in valid_inputs: + raise ValueError( + f'Argument "{placeholder}" references non-existing input.') + elif isinstance(placeholder, (OutputPathPlaceholder, OutputUriPlaceholder)): + if placeholder.output_name not in valid_outputs: + raise ValueError( + f'Argument "{placeholder}" references non-existing output.') + elif isinstance(placeholder, IfPresentPlaceholder): + if placeholder.if_structure.input_name not in valid_inputs: + raise ValueError( + f'Argument "{placeholder}" references non-existing input.') + for placeholder in itertools.chain( + placeholder.if_structure.then or [], + placeholder.if_structure.otherwise or []): + _check_valid_placeholder_reference(valid_inputs, valid_outputs, + placeholder) + elif isinstance(placeholder, ConcatPlaceholder): + for placeholder in placeholder.items: + _check_valid_placeholder_reference(valid_inputs, valid_outputs, + placeholder) + elif not isinstance(placeholder, str): + raise TypeError( + f'Unexpected argument "{placeholder}" of type {type(placeholder)}.') + + +ValidCommandArgTypes = (str, InputValuePlaceholder, InputPathPlaceholder, + InputUriPlaceholder, OutputPathPlaceholder, + OutputUriPlaceholder, IfPresentPlaceholder, + ConcatPlaceholder) + + +class ComponentSpec(base_model.BaseModel): """The definition of a component. Attributes: name: The name of the component. - description: Optional; the description of the component. - inputs: Optional; the input definitions of the component. - outputs: Optional; the output definitions of the component. + description (optional): the description of the component. + inputs (optional): the input definitions of the component. + outputs (optional): the output definitions of the component. implementation: The implementation of the component. Either an executor (container, importer) or a DAG consists of other components. """ name: str + implementation: Implementation description: Optional[str] = None inputs: Optional[Dict[str, InputSpec]] = None outputs: Optional[Dict[str, OutputSpec]] = None - implementation: Implementation - @pydantic.validator('inputs', 'outputs', allow_reuse=True) - def empty_map(cls, v): - if v == {}: - return None - return v + def transform_inputs(self) -> None: + """Use None instead of empty list for inputs.""" + self.inputs = None if self.inputs == {} else self.inputs - @pydantic.root_validator(allow_reuse=True) - def validate_placeholders(cls, values): - if values.get('implementation').container is None: - return values - containerSpec: ContainerSpec = values.get('implementation').container + def transform_outputs(self) -> None: + """Use None instead of empty list for outputs.""" + self.outputs = None if self.outputs == {} else self.outputs - try: - valid_inputs = values.get('inputs').keys() - except AttributeError: - valid_inputs = [] + def transform_command_input_placeholders(self) -> None: + """Converts command and args elements to a placholder type where + applicable.""" + if self.implementation.container is not None: - try: - valid_outputs = values.get('outputs').keys() - except AttributeError: - valid_outputs = [] + if self.implementation.container.command is not None: + self.implementation.container.command = [ + convert_str_or_dict_to_placeholder(e) + for e in self.implementation.container.command + ] - for arg in itertools.chain((containerSpec.command or []), - (containerSpec.args or [])): - cls._check_valid_placeholder_reference(valid_inputs, valid_outputs, - arg) + if self.implementation.container.args is not None: + self.implementation.container.args = [ + convert_str_or_dict_to_placeholder(e) + for e in self.implementation.container.args + ] - return values + def validate_placeholders(self): + """Validates that input/output placeholders refer to an existing + input/output.""" + implementation = self.implementation + if getattr(implementation, 'container', None) is None: + return - @classmethod - def _check_valid_placeholder_reference(cls, valid_inputs: Sequence[str], - valid_outputs: Sequence[str], - arg: ValidCommandArgs) -> None: - """Validates placeholder reference existing input/output names. + containerSpec: ContainerSpec = implementation.container - Args: - valid_inputs: The existing input names. - valid_outputs: The existing output names. - arg: The placeholder argument for checking. + valid_inputs = [] if self.inputs is None else list(self.inputs.keys()) + valid_outputs = [] if self.outputs is None else list( + self.outputs.keys()) - Raises: - ValueError: if any placeholder references a non-existing input or - output. - TypeError: if any argument is neither a str nor a placeholder - instance. - """ - - if isinstance( - arg, - (InputValuePlaceholder, InputPathPlaceholder, InputUriPlaceholder)): - if arg.input_name not in valid_inputs: - raise ValueError( - f'Argument "{arg}" references non-existing input.') - elif isinstance(arg, (OutputPathPlaceholder, OutputUriPlaceholder)): - if arg.output_name not in valid_outputs: - raise ValueError( - f'Argument "{arg}" references non-existing output.') - elif isinstance(arg, IfPresentPlaceholder): - if arg.if_structure.input_name not in valid_inputs: - raise ValueError( - f'Argument "{arg}" references non-existing input.') - for placeholder in itertools.chain(arg.if_structure.then or [], - arg.if_structure.otherwise or - []): - cls._check_valid_placeholder_reference(valid_inputs, - valid_outputs, - placeholder) - elif isinstance(arg, ConcatPlaceholder): - for placeholder in arg.items: - cls._check_valid_placeholder_reference(valid_inputs, - valid_outputs, - placeholder) - elif not isinstance(arg, str): - raise TypeError(f'Unexpected argument "{arg}".') + for arg in itertools.chain((containerSpec.command or []), + (containerSpec.args or [])): + _check_valid_placeholder_reference(valid_inputs, valid_outputs, arg) @classmethod def from_v1_component_spec( @@ -405,27 +511,18 @@ def from_v1_component_spec( component_dict = v1_component_spec.to_dict() if component_dict.get('implementation') is None: raise ValueError('Implementation field not found') - if 'container' not in component_dict.get('implementation'): + + if 'container' not in component_dict.get( + 'implementation'): # type: ignore raise NotImplementedError - def _transform_arg(arg: Union[str, Dict[str, str]]) -> ValidCommandArgs: + def convert_v1_if_present_placholder_to_v2( + arg: Dict[str, Any]) -> Union[Dict[str, Any], ValidCommandArgs]: if isinstance(arg, str): + arg = try_to_get_dict_from_string(arg) + if not isinstance(arg, dict): return arg - if 'inputValue' in arg: - return InputValuePlaceholder( - input_name=utils.sanitize_input_name(arg['inputValue'])) - if 'inputPath' in arg: - return InputPathPlaceholder( - input_name=utils.sanitize_input_name(arg['inputPath'])) - if 'inputUri' in arg: - return InputUriPlaceholder( - input_name=utils.sanitize_input_name(arg['inputUri'])) - if 'outputPath' in arg: - return OutputPathPlaceholder( - output_name=utils.sanitize_input_name(arg['outputPath'])) - if 'outputUri' in arg: - return OutputUriPlaceholder( - output_name=utils.sanitize_input_name(arg['outputUri'])) + if 'if' in arg: if_placeholder_values = arg['if'] if_placeholder_values_then = list(if_placeholder_values['then']) @@ -434,62 +531,53 @@ def _transform_arg(arg: Union[str, Dict[str, str]]) -> ValidCommandArgs: if_placeholder_values['else']) except KeyError: if_placeholder_values_else = [] - - IfPresentPlaceholderStructure.update_forward_refs() return IfPresentPlaceholder( if_structure=IfPresentPlaceholderStructure( input_name=utils.sanitize_input_name( if_placeholder_values['cond']['isPresent']), - then=list( - _transform_arg(val) - for val in if_placeholder_values_then), - otherwise=list( - _transform_arg(val) - for val in if_placeholder_values_else))) - if 'concat' in arg: - ConcatPlaceholder.update_forward_refs() - - return ConcatPlaceholder( - concat=list(_transform_arg(val) for val in arg['concat'])) - raise ValueError( - f'Unexpected command/argument type: "{arg}" of type "{type(arg)}".' - ) + then=[ + convert_str_or_dict_to_placeholder(val) + for val in if_placeholder_values_then + ], + otherwise=[ + convert_str_or_dict_to_placeholder(val) + for val in if_placeholder_values_else + ])) + + elif 'concat' in arg: + + return ConcatPlaceholder(items=[ + convert_str_or_dict_to_placeholder(val) + for val in arg['concat'] + ]) + elif isinstance(arg, (ValidCommandArgTypes, dict)): + return arg + else: + raise TypeError( + f"Unexpected argument {arg} of type {type(arg)}.") implementation = component_dict['implementation']['container'] implementation['command'] = [ - _transform_arg(command) + convert_v1_if_present_placholder_to_v2(command) for command in implementation.pop('command', []) ] implementation['args'] = [ - _transform_arg(command) + convert_v1_if_present_placholder_to_v2(command) for command in implementation.pop('args', []) ] implementation['env'] = { - key: _transform_arg(command) + key: convert_v1_if_present_placholder_to_v2(command) for key, command in implementation.pop('env', {}).items() } - container_spec = ContainerSpec(image=implementation['image']) - # Workaround for https://github.com/samuelcolvin/pydantic/issues/2079 - def _copy_model(obj): - if isinstance(obj, BaseModel): - return obj.copy(deep=True) - return obj - # Must assign these after the constructor call, otherwise it won't work. if implementation['command']: - container_spec.command = [ - _copy_model(cmd) for cmd in implementation['command'] - ] + container_spec.command = implementation['command'] if implementation['args']: - container_spec.args = [ - _copy_model(arg) for arg in implementation['args'] - ] + container_spec.args = implementation['args'] if implementation['env']: - container_spec.env = { - k: _copy_model(v) for k, v in implementation['env'] - } + container_spec.env = implementation['env'] return ComponentSpec( name=component_dict.get('name', 'name'), @@ -507,69 +595,6 @@ def _copy_model(obj): for spec in component_dict.get('outputs', []) }) - def to_v1_component_spec(self) -> v1_structures.ComponentSpec: - """Converts to v1 ComponentSpec. - - Returns: - Component spec in the form of V1 ComponentSpec. - - Needed until downstream accept new ComponentSpec. - """ - - def _transform_arg(arg: ValidCommandArgs) -> Any: - if isinstance(arg, str): - return arg - if isinstance(arg, InputValuePlaceholder): - return v1_structures.InputValuePlaceholder(arg.input_name) - if isinstance(arg, InputPathPlaceholder): - return v1_structures.InputPathPlaceholder(arg.input_name) - if isinstance(arg, InputUriPlaceholder): - return v1_structures.InputUriPlaceholder(arg.input_name) - if isinstance(arg, OutputPathPlaceholder): - return v1_structures.OutputPathPlaceholder(arg.output_name) - if isinstance(arg, OutputUriPlaceholder): - return v1_structures.OutputUriPlaceholder(arg.output_name) - if isinstance(arg, IfPresentPlaceholder): - return v1_structures.IfPlaceholder(arg.if_structure) - if isinstance(arg, ConcatPlaceholder): - return v1_structures.ConcatPlaceholder(arg.concat) - raise ValueError( - f'Unexpected command/argument type: "{arg}" of type "{type(arg)}".' - ) - - return v1_structures.ComponentSpec( - name=self.name, - inputs=[ - v1_structures.InputSpec( - name=name, - type=input_spec.type, - default=input_spec.default, - ) for name, input_spec in self.inputs.items() - ], - outputs=[ - v1_structures.OutputSpec( - name=name, - type=output_spec.type, - ) for name, output_spec in self.outputs.items() - ], - implementation=v1_structures.ContainerImplementation( - container=v1_structures.ContainerSpec( - image=self.implementation.container.image, - command=[ - _transform_arg(cmd) - for cmd in self.implementation.container.command or [] - ], - args=[ - _transform_arg(arg) - for arg in self.implementation.container.args or [] - ], - env={ - name: _transform_arg(value) for name, value in - self.implementation.container.env or {} - }, - )), - ) - @classmethod def load_from_component_yaml(cls, component_yaml: str) -> 'ComponentSpec': """Loads V1 or V2 component yaml into ComponentSpec. @@ -580,19 +605,18 @@ def load_from_component_yaml(cls, component_yaml: str) -> 'ComponentSpec': Returns: Component spec in the form of V2 ComponentSpec. """ - json_component = yaml.safe_load(component_yaml) try: - return ComponentSpec.parse_obj(json_component) - except (pydantic.ValidationError, AttributeError): + return ComponentSpec.from_dict(json_component, by_alias=True) + except AttributeError: v1_component = v1_components._load_component_spec_from_component_text( component_yaml) return cls.from_v1_component_spec(v1_component) def save_to_component_yaml(self, output_file: str) -> None: - """Saves ComponentSpec into yaml file. + """Saves ComponentSpec into YAML file. Args: output_file: File path to store the component yaml. """ - ir_utils._write_ir_to_file(self.dict(), output_file) + ir_utils._write_ir_to_file(self.to_dict(by_alias=True), output_file) diff --git a/sdk/python/kfp/components/structures_test.py b/sdk/python/kfp/components/structures_test.py index d2ae378b9b2e..405d3b0b56f7 100644 --- a/sdk/python/kfp/components/structures_test.py +++ b/sdk/python/kfp/components/structures_test.py @@ -13,11 +13,11 @@ # limitations under the License. """Tests for kfp.components.structures.""" +import os +import tempfile import textwrap import unittest -from unittest import mock -import pydantic from absl.testing import parameterized from kfp.components import structures @@ -33,7 +33,7 @@ - default then: - --arg1 - - {inputValue: optional_input_1} + - {inputUri: optional_input_1} image: alpine inputs: - {name: optional_input_1, optional: true, type: String} @@ -49,7 +49,7 @@ inputName: optional_input_1 then: - --arg1 - - {inputValue: optional_input_1} + - {inputUri: optional_input_1} image: alpine inputs: optional_input_1: {default: null, type: String} @@ -67,7 +67,7 @@ input_name='optional_input_1', then=[ '--arg1', - structures.InputValuePlaceholder( + structures.InputUriPlaceholder( input_name='optional_input_1'), ], otherwise=[ @@ -110,7 +110,7 @@ container=structures.ContainerSpec( image='alpine', args=[ - structures.ConcatPlaceholder(concat=[ + structures.ConcatPlaceholder(items=[ '--arg1', structures.InputValuePlaceholder(input_name='input_prefix'), ]) @@ -147,7 +147,7 @@ container=structures.ContainerSpec( image='alpine', args=[ - structures.ConcatPlaceholder(concat=[ + structures.ConcatPlaceholder(items=[ '--arg1', structures.IfPresentPlaceholder( if_structure=structures.IfPresentPlaceholderStructure( @@ -160,7 +160,7 @@ otherwise=[ '--arg2', 'default', - structures.ConcatPlaceholder(concat=[ + structures.ConcatPlaceholder(items=[ '--arg1', structures.InputValuePlaceholder( input_name='input_prefix'), @@ -177,8 +177,9 @@ class StructuresTest(parameterized.TestCase): def test_component_spec_with_placeholder_referencing_nonexisting_input_output( self): with self.assertRaisesRegex( - pydantic.ValidationError, 'Argument "input_name=\'input000\'" ' - 'references non-existing input.'): + ValueError, + r'^Argument \"InputValuePlaceholder[\s\S]*\'input000\'[\s\S]*references non-existing input.' + ): structures.ComponentSpec( name='component_1', implementation=structures.Implementation( @@ -199,9 +200,9 @@ def test_component_spec_with_placeholder_referencing_nonexisting_input_output( ) with self.assertRaisesRegex( - pydantic.ValidationError, - 'Argument "output_name=\'output000\'" ' - 'references non-existing output.'): + ValueError, + r'^Argument \"OutputPathPlaceholder[\s\S]*\'output000\'[\s\S]*references non-existing output.' + ): structures.ComponentSpec( name='component_1', implementation=structures.Implementation( @@ -222,28 +223,10 @@ def test_component_spec_with_placeholder_referencing_nonexisting_input_output( ) def test_simple_component_spec_save_to_component_yaml(self): - open_mock = mock.mock_open() - expected_yaml = textwrap.dedent("""\ - implementation: - container: - command: - - sh - - -c - - 'set -ex - - echo "$0" > "$1"' - - {inputValue: input1} - - {outputPath: output1} - image: alpine - inputs: - input1: {type: String} - name: component_1 - outputs: - output1: {type: String} - """) - - with mock.patch("builtins.open", open_mock, create=True): - structures.ComponentSpec( + # tests writing old style (less verbose) and reading in new style (more verbose) + with tempfile.TemporaryDirectory() as tempdir: + output_path = os.path.join(tempdir, 'component.yaml') + original_component_spec = structures.ComponentSpec( name='component_1', implementation=structures.Implementation( container=structures.ContainerSpec( @@ -258,64 +241,17 @@ def test_simple_component_spec_save_to_component_yaml(self): output_name='output1'), ], )), - inputs={ - 'input1': structures.InputSpec(type='String') - }, - outputs={ - 'output1': structures.OutputSpec(type='String') - }, - ).save_to_component_yaml('test_save_file.yaml') - - open_mock.assert_called_once_with('test_save_file.yaml', 'w') - - def test_simple_component_spec_save_to_component_yaml(self): - open_mock = mock.mock_open() - expected_yaml = textwrap.dedent("""\ - implementation: - container: - command: - - sh - - -c - - 'set -ex - - echo "$0" > "$1"' - - {inputValue: input1} - - {outputPath: output1} - image: alpine - inputs: - input1: {type: String} - name: component_1 - outputs: - output1: {type: String} - """) + inputs={'input1': structures.InputSpec(type='String')}, + outputs={'output1': structures.OutputSpec(type='String')}, + ) + original_component_spec.save_to_component_yaml(output_path) - with mock.patch( - "builtins.open", open_mock, create=True), self.assertWarnsRegex( - DeprecationWarning, r"Compiling to JSON is deprecated"): - structures.ComponentSpec( - name='component_1', - implementation=structures.Implementation( - container=structures.ContainerSpec( - image='alpine', - command=[ - 'sh', - '-c', - 'set -ex\necho "$0" > "$1"', - structures.InputValuePlaceholder( - input_name='input1'), - structures.OutputPathPlaceholder( - output_name='output1'), - ], - )), - inputs={ - 'input1': structures.InputSpec(type='String') - }, - outputs={ - 'output1': structures.OutputSpec(type='String') - }, - ).save_to_component_yaml('test_save_file.json') + # test that it can be read back correctly + with open(output_path, 'r') as f: + new_component_spec = structures.ComponentSpec.load_from_component_yaml( + f.read()) - open_mock.assert_called_once_with('test_save_file.json', 'w') + self.assertEqual(original_component_spec, new_component_spec) @parameterized.parameters( { @@ -333,12 +269,16 @@ def test_simple_component_spec_save_to_component_yaml(self): ) def test_component_spec_placeholder_save_to_component_yaml( self, expected_yaml, component): - open_mock = mock.mock_open() - - with mock.patch("builtins.open", open_mock, create=True): - component.save_to_component_yaml('test_save_file.yaml') + with tempfile.TemporaryDirectory() as tempdir: + output_path = os.path.join(tempdir, 'component.yaml') + component.save_to_component_yaml(output_path) + with open(output_path, 'r') as f: + contents = f.read() - open_mock.assert_called_once_with('test_save_file.yaml', 'w') + # test that what was written can be reloaded correctly + new_component_spec = structures.ComponentSpec.load_from_component_yaml( + contents) + self.assertEqual(new_component_spec, component) def test_simple_component_spec_load_from_v2_component_yaml(self): component_yaml_v2 = textwrap.dedent("""\ @@ -469,9 +409,56 @@ def test_component_spec_load_from_v1_component_yaml(self): 'output_1': structures.OutputSpec(type='Artifact'), 'output_2': structures.OutputSpec(type='Artifact'), }) - self.assertEqual(generated_spec, expected_spec) +class TestValidators(unittest.TestCase): + + def test_IfPresentPlaceholderStructure_otherwise(self): + obj = structures.IfPresentPlaceholderStructure( + then='then', input_name='input_name', otherwise=['something']) + self.assertEqual(obj.otherwise, ['something']) + + obj = structures.IfPresentPlaceholderStructure( + then='then', input_name='input_name', otherwise=[]) + self.assertEqual(obj.otherwise, None) + + def test_ContainerSpec_command_and_args(self): + obj = structures.ContainerSpec( + image='image', command=['command'], args=['args']) + self.assertEqual(obj.command, ['command']) + self.assertEqual(obj.args, ['args']) + + obj = structures.ContainerSpec(image='image', command=[], args=[]) + self.assertEqual(obj.command, None) + self.assertEqual(obj.args, None) + + def test_ContainerSpec_env(self): + obj = structures.ContainerSpec( + image='image', + command=['command'], + args=['args'], + env={'env': 'env'}) + self.assertEqual(obj.env, {'env': 'env'}) + + obj = structures.ContainerSpec( + image='image', command=[], args=[], env={}) + self.assertEqual(obj.env, None) + + def test_ComponentSpec_inputs(self): + obj = structures.ComponentSpec( + name='name', + implementation=structures.Implementation(container=None), + inputs={}) + self.assertEqual(obj.inputs, None) + + def test_ComponentSpec_outputs(self): + obj = structures.ComponentSpec( + name='name', + implementation=structures.Implementation(container=None), + outputs={}) + self.assertEqual(obj.outputs, None) + + if __name__ == '__main__': unittest.main()