From ec26f9e65e2783bf85805075aadb69677514ac88 Mon Sep 17 00:00:00 2001 From: Yaqi Date: Thu, 2 Sep 2021 10:33:41 -0700 Subject: [PATCH 1/2] feat(sdk): load v1 and v2 component spec --- sdk/RELEASE.md | 3 + .../components/experimental/component_spec.py | 157 ++++++++++++++---- .../experimental/component_spec_test.py | 133 +++++++++++++-- 3 files changed, 249 insertions(+), 44 deletions(-) diff --git a/sdk/RELEASE.md b/sdk/RELEASE.md index 5dcc42f81ca..487a6513e74 100644 --- a/sdk/RELEASE.md +++ b/sdk/RELEASE.md @@ -12,6 +12,9 @@ ## Bug Fixes and Other Changes +* Load v1 and v2 component yaml into v2 ComponentSpec and convert v1 component + spec to v2 component spec + ## Documentation Updates # 1.7.2 diff --git a/sdk/python/kfp/v2/components/experimental/component_spec.py b/sdk/python/kfp/v2/components/experimental/component_spec.py index 5556339a70a..f71c0a4e5ea 100644 --- a/sdk/python/kfp/v2/components/experimental/component_spec.py +++ b/sdk/python/kfp/v2/components/experimental/component_spec.py @@ -13,14 +13,15 @@ # limitations under the License. """Definitions for component spec.""" -from dataclasses import dataclass +import dataclasses +import enum +import json from typing import Any, Dict, Mapping, Optional, Sequence, Union + +from kfp.components import _components +from kfp.components import structures import pydantic import yaml -import json - -from kfp.components import _structures as v1_components -from kfp.components import _data_passing class InputSpec(pydantic.BaseModel): @@ -73,7 +74,7 @@ class OutputUriPlaceholder(BasePlaceholder): pass -@dataclass +@dataclasses.dataclass class ResourceSpec: """The resource requirements of a container execution. @@ -107,7 +108,7 @@ class ContainerSpec(pydantic.BaseModel): resources: Optional[ResourceSpec] = None -@dataclass +@dataclasses.dataclass class ImporterSpec: """ImporterSpec definition. @@ -124,7 +125,7 @@ class ImporterSpec: metadata: Optional[Mapping[str, Any]]= None -@dataclass +@dataclasses.dataclass class TaskSpec: """The spec of a pipeline task. @@ -155,7 +156,7 @@ class TaskSpec: iterator_item_input: Optional[str] = None -@dataclass +@dataclasses.dataclass class DagSpec: """DAG(graph) implementation definition. @@ -168,17 +169,24 @@ class DagSpec: outputs: Mapping[str, Any] +class SchemaVersion(str, enum.Enum): + V1 = 'v1' + V2 = 'v2' + + class ComponentSpec(pydantic.BaseModel): """The definition of a component. Attributes: name: The name of the component. - implementation: The implementation of the component. Either an executor (container, importer) or a DAG consists of other components. + implementation: The implementation of the component. Either an executor + (container, importer) or a DAG consists of other components. inputs: Optional; the input definitions of the component. outputs: Optional; the output definitions of the component. description: Optional; the description of the component. annotations: Optional; the annotations of the component as key-value pairs. labels: Optional; the labels of the component as key-value pairs. + schema_version: Internal field for tracking component version. """ name: str @@ -189,6 +197,7 @@ class ComponentSpec(pydantic.BaseModel): description: Optional[str] = None annotations: Optional[Mapping[str, str]] = None labels: Optional[Mapping[str, str]] = None + schema_version: SchemaVersion = SchemaVersion.V2 def _validate_placeholders( self, @@ -231,13 +240,81 @@ def _validate_placeholders( @classmethod def from_v1_component_spec( - cls, v1_component_spec: v1_components.ComponentSpec) -> 'ComponentSpec': - raise NotImplementedError + cls, v1_component_spec: structures.ComponentSpec) -> 'ComponentSpec': + """Converts V1 ComponentSpec to V2 ComponentSpec. + + Args: + v1_component_spec: The V1 ComponentSpec. + + Returns: + Component spec in the form of V2 ComponentSpec. + + Raises: + ValueError: If implementation is not found. + TypeError: if any argument is neither a str nor Dict. + """ + component_dict = v1_component_spec.to_dict() + if component_dict.get('implementation') is None: + raise ValueError('Implementation field not found') + if 'container' not in component_dict.get('implementation'): + raise NotImplementedError + + def _transform_arg( + arg: Union[str, Dict[str, str]]) -> Union[str, BasePlaceholder]: + if isinstance(arg, str): + return arg + elif 'inputValue' in arg: + return InputValuePlaceholder(name=arg['inputValue']) + elif 'inputPath' in arg: + return InputPathPlaceholder(name=arg['inputPath']) + elif 'inputUri' in arg: + return InputUriPlaceholder(name=arg['inputUri']) + elif 'outputPath' in arg: + return OutputPathPlaceholder(name=arg['outputPath']) + elif 'outputUri' in arg: + return OutputUriPlaceholder(name=arg['outputUri']) + else: + raise ValueError( + f'Unexpected command/argument type: "{arg}" of type "{type(arg)}".') - def to_v1_component_spec(self) -> v1_components.ComponentSpec: - """Convert to v1 ComponentSpec. + implementation = component_dict['implementation']['container'] + implementation['commands'] = [ + _transform_arg(command) + for command in implementation.pop('command', []) + ] + implementation['arguments'] = [ + _transform_arg(command) for command in implementation.pop('args', []) + ] + implementation['env'] = { + key: _transform_arg(command) + for key, command in implementation.pop('env', {}).items() + } + container_spec = ContainerSpec.parse_obj(implementation) + + return ComponentSpec( + name=component_dict.get('name', 'name'), + description=component_dict.get('description'), + implementation=container_spec, + inputs={ + spec['name']: InputSpec( + type=spec.get('type', 'Artifact'), + default=spec.get('default', None)) + for spec in component_dict.get('inputs', []) + }, + outputs={ + spec['name']: OutputSpec(type=spec.get('type', 'String')) + for spec in component_dict.get('outputs', []) + }, + schema_version=SchemaVersion.V1) + + def to_v1_component_spec(self) -> structures.ComponentSpec: + """Converts to v1 ComponentSpec. + + Returns: + Component spec in the form of V1 ComponentSpec. - Needed until downstream accept new ComponentSpec.""" + Needed until downstream accept new ComponentSpec. + """ if isinstance(self.implementation, DagSpec): raise NotImplementedError @@ -245,37 +322,37 @@ def _transform_arg(arg: Union[str, BasePlaceholder]) -> Any: if isinstance(arg, str): return arg elif isinstance(arg, InputValuePlaceholder): - return v1_components.InputValuePlaceholder(arg.name) + return structures.InputValuePlaceholder(arg.name) elif isinstance(arg, InputPathPlaceholder): - return v1_components.InputPathPlaceholder(arg.name) + return structures.InputPathPlaceholder(arg.name) elif isinstance(arg, InputUriPlaceholder): - return v1_components.InputUriPlaceholder(arg.name) + return structures.InputUriPlaceholder(arg.name) elif isinstance(arg, OutputPathPlaceholder): - return v1_components.OutputPathPlaceholder(arg.name) + return structures.OutputPathPlaceholder(arg.name) elif isinstance(arg, OutputUriPlaceholder): - return v1_components.OutputUriPlaceholder(arg.name) + return structures.OutputUriPlaceholder(arg.name) else: # TODO(chensun): transform additional placeholders: if, concat, etc.? raise ValueError( f'Unexpected command/argument type: "{arg}" of type "{type(arg)}".') - return v1_components.ComponentSpec( + return structures.ComponentSpec( name=self.name, inputs=[ - v1_components.InputSpec( + structures.InputSpec( name=name, type=input_spec.type, default=input_spec.default, ) for name, input_spec in self.inputs.items() ], outputs=[ - v1_components.OutputSpec( + structures.OutputSpec( name=name, type=output_spec.type, ) for name, output_spec in self.outputs.items() ], - implementation=v1_components.ContainerImplementation( - container=v1_components.ContainerSpec( + implementation=structures.ContainerImplementation( + container=structures.ContainerSpec( image=self.implementation.image, command=[ _transform_arg(cmd) @@ -294,10 +371,32 @@ def _transform_arg(arg: Union[str, BasePlaceholder]) -> Any: @classmethod def load_from_component_yaml(cls, component_yaml: str) -> 'ComponentSpec': - raise NotImplementedError + """Loads V1 or V2 component yaml into ComponentSpec. + + Args: + component_yaml: the component yaml in string format. + + Returns: + Component spec in the form of V2 ComponentSpec. + """ + + json_component = yaml.safe_load(component_yaml) + + if 'schema_version' in json_component and json_component[ + 'schema_version'] == SchemaVersion.V2: + return ComponentSpec.parse_obj(json_component) + + v1_component = _components._load_component_spec_from_component_text( + component_yaml) + return cls.from_v1_component_spec(v1_component) def save_to_component_yaml(self, output_file: str) -> None: + """Saves ComponentSpec into yaml file. + + Args: + output_file: File path to store the component yaml. + """ with open(output_file, 'a') as output_file: - json_component = self.json(exclude_unset=True, exclude_none=True) - yaml_file = yaml.safe_dump(json.loads(json_component)) - output_file.write(yaml_file) + json_component = self.json(exclude_none=True) + yaml_file = yaml.safe_dump(json.loads(json_component)) + output_file.write(yaml_file) diff --git a/sdk/python/kfp/v2/components/experimental/component_spec_test.py b/sdk/python/kfp/v2/components/experimental/component_spec_test.py index 9aa393f136e..07827b8b4b2 100644 --- a/sdk/python/kfp/v2/components/experimental/component_spec_test.py +++ b/sdk/python/kfp/v2/components/experimental/component_spec_test.py @@ -93,32 +93,135 @@ def test_component_spec_save_to_component_yaml(self): outputs: output1: type: String + schema_version: v2 """) with patch("builtins.open", open_mock, create=True): component_spec.ComponentSpec( - name='component_1', - implementation=component_spec.ContainerSpec( - image='alpine', - commands=[ - 'sh', - '-c', - 'set -ex\necho "$0" > "$1"', - component_spec.InputValuePlaceholder(name='input1'), - component_spec.OutputPathPlaceholder(name='output1'), - ], - ), - inputs={ + name='component_1', + implementation=component_spec.ContainerSpec( + image='alpine', + commands=[ + 'sh', + '-c', + 'set -ex\necho "$0" > "$1"', + component_spec.InputValuePlaceholder(name='input1'), + component_spec.OutputPathPlaceholder(name='output1'), + ], + ), + inputs={ 'input1': component_spec.InputSpec(type='String') - }, - outputs={ + }, + outputs={ 'output1': component_spec.OutputSpec(type='String') - }, + }, + schema_version=component_spec.SchemaVersion.V2, ).save_to_component_yaml('test_save_file.txt') open_mock.assert_called_with('test_save_file.txt', 'a') open_mock.return_value.write.assert_called_once_with(expected_yaml) + def test_component_spec_load_from_v2_component_yaml(self): + component_yaml_v2 = textwrap.dedent("""\ + implementation: + commands: + - sh + - -c + - 'set -ex + + echo "$0" > "$1"' + - name: input1 + - name: output1 + image: alpine + inputs: + input1: + type: String + name: component_1 + outputs: + output1: + type: String + schema_version: v2 + """) + + generated_spec = component_spec.ComponentSpec.load_from_component_yaml( + component_yaml_v2) + + expected_spec = component_spec.ComponentSpec( + name='component_1', + implementation=component_spec.ContainerSpec( + image='alpine', + commands=[ + 'sh', + '-c', + 'set -ex\necho "$0" > "$1"', + component_spec.InputValuePlaceholder(name='input1'), + component_spec.OutputPathPlaceholder(name='output1'), + ], + ), + inputs={'input1': component_spec.InputSpec(type='String')}, + outputs={'output1': component_spec.OutputSpec(type='String')}, + schema_version=component_spec.SchemaVersion.V2) + self.assertEqual(generated_spec, expected_spec) + + def test_component_spec_load_from_v1_component_yaml(self): + component_yaml_v1 = textwrap.dedent("""\ + name: Component with 2 inputs and 2 outputs + inputs: + - {name: Input parameter} + - {name: Input artifact} + outputs: + - {name: Output 1} + - {name: Output 2} + implementation: + container: + image: busybox + command: [sh, -c, ' + mkdir -p $(dirname "$2") + mkdir -p $(dirname "$3") + echo "$0" > "$2" + cp "$1" "$3" + ' + ] + args: + - {inputValue: Input parameter} + - {inputPath: Input artifact} + - {outputPath: Output 1} + - {outputPath: Output 2} + """) + + generated_spec = component_spec.ComponentSpec.load_from_component_yaml( + component_yaml_v1) + + expected_spec = component_spec.ComponentSpec( + name='Component with 2 inputs and 2 outputs', + implementation=component_spec.ContainerSpec( + image='busybox', + commands=[ + 'sh', + '-c', + (' mkdir -p $(dirname "$2") mkdir -p $(dirname "$3") ' + 'echo "$0" > "$2" cp "$1" "$3" '), + ], + arguments=[ + component_spec.InputValuePlaceholder(name='Input parameter'), + component_spec.InputPathPlaceholder(name='Input artifact'), + component_spec.OutputPathPlaceholder(name='Output 1'), + component_spec.OutputPathPlaceholder(name='Output 2'), + ], + env={}, + ), + inputs={ + 'Input parameter': component_spec.InputSpec(type='Artifact'), + 'Input artifact': component_spec.InputSpec(type='Artifact') + }, + outputs={ + 'Output 1': component_spec.OutputSpec(type='String'), + 'Output 2': component_spec.OutputSpec(type='String'), + }, + schema_version=component_spec.SchemaVersion.V1) + + self.assertEqual(generated_spec, expected_spec) + if __name__ == '__main__': unittest.main() From b8e11a9ab8966d2f96b289f9f6b9c38afa9bfc30 Mon Sep 17 00:00:00 2001 From: Yaqi Ji Date: Thu, 2 Sep 2021 10:35:38 -0700 Subject: [PATCH 2/2] Update RELEASE.md --- sdk/RELEASE.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/RELEASE.md b/sdk/RELEASE.md index 487a6513e74..ba70b7bde0b 100644 --- a/sdk/RELEASE.md +++ b/sdk/RELEASE.md @@ -13,7 +13,7 @@ ## Bug Fixes and Other Changes * Load v1 and v2 component yaml into v2 ComponentSpec and convert v1 component - spec to v2 component spec + spec to v2 component spec [\#6497](https://github.com/kubeflow/pipelines/issues/6497) ## Documentation Updates