diff --git a/samples/test/config.yaml b/samples/test/config.yaml index 1c6cce44acc..fe273050eef 100644 --- a/samples/test/config.yaml +++ b/samples/test/config.yaml @@ -1,4 +1,4 @@ -# Copyright 2021 The Kubeflow Authors +# Copyright 2021-2022 The Kubeflow Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -122,6 +122,8 @@ path: samples.v2.producer_consumer_param_test - name: pipeline_with_importer path: samples.v2.pipeline_with_importer_test +- name: pipeline_container_no_input + path: samples.v2.pipeline_container_no_input_test # TODO(capri-xiyue): Re-enable after figuring out V2 Engine # and protobuf.Value support. # - name: cache_v2 diff --git a/samples/v2/pipeline_container_no_input.py b/samples/v2/pipeline_container_no_input.py new file mode 100644 index 00000000000..d9e7c24c5c5 --- /dev/null +++ b/samples/v2/pipeline_container_no_input.py @@ -0,0 +1,38 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os + +from kfp import compiler +from kfp import dsl + + +@dsl.container_component +def container_no_input(): + return dsl.ContainerSpec( + image='python:3.7', + command=['echo', 'hello world'], + args=[], + ) + + +@dsl.pipeline(name='v2-container-component-no-input') +def pipeline_container_no_input(): + container_no_input() + + +if __name__ == '__main__': + # execute only if run as a script + compiler.Compiler().compile( + pipeline_func=pipeline_container_no_input, + package_path='pipeline_container_no_input.yaml') diff --git a/samples/v2/pipeline_container_no_input_test.py b/samples/v2/pipeline_container_no_input_test.py new file mode 100644 index 00000000000..f77572e53fe --- /dev/null +++ b/samples/v2/pipeline_container_no_input_test.py @@ -0,0 +1,52 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Pipeline container no input v2 engine pipeline.""" + +from __future__ import annotations + +import unittest + +import kfp.deprecated as kfp +import kfp_server_api +from ml_metadata.proto import Execution + +from kfp.samples.test.utils import KfpTask, TaskInputs, TaskOutputs, TestCase, run_pipeline_func +from .pipeline_container_no_input import pipeline_container_no_input + + +def verify(t: unittest.TestCase, run: kfp_server_api.ApiRun, + tasks: dict[str, KfpTask], **kwargs): + t.assertEqual(run.status, 'Succeeded') + t.assertEqual( + { + 'container-no-input': + KfpTask( + name='container-no-input', + type='system.ContainerExecution', + state=Execution.State.COMPLETE, + inputs=TaskInputs(parameters={}, artifacts=[]), + outputs=TaskOutputs(parameters={}, artifacts=[])) + }, + tasks, + ) + + +if __name__ == '__main__': + run_pipeline_func([ + TestCase( + pipeline_func=pipeline_container_no_input, + verify_func=verify, + mode=kfp.dsl.PipelineExecutionMode.V2_ENGINE, + ), + ]) diff --git a/samples/v2/two_step_pipeline_containerized.py b/samples/v2/two_step_pipeline_containerized.py new file mode 100644 index 00000000000..2cee711ed7d --- /dev/null +++ b/samples/v2/two_step_pipeline_containerized.py @@ -0,0 +1,55 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Two step pipeline using dsl.container_component decorator.""" +import os + +from kfp import compiler +from kfp.dsl import container_component +from kfp.dsl import ContainerSpec +from kfp.dsl import Dataset +from kfp.dsl import Input +from kfp.dsl import Output +from kfp.dsl import pipeline + + +@container_component +def component1(text: str, output_gcs: Output[Dataset]): + return ContainerSpec( + image='google/cloud-sdk:slim', + command=[ + 'sh -c | set -e -x', 'echo', text, '| gsutil cp -', output_gcs.uri + ]) + + +@container_component +def component2(input_gcs: Input[Dataset]): + return ContainerSpec( + image='google/cloud-sdk:slim', + command=['sh', '-c', '|', 'set -e -x gsutil cat'], + args=[input_gcs.path]) + + +@pipeline(name='two-step-pipeline-containerized') +def two_step_pipeline_containerized(): + component_1 = component1(text='hi').set_display_name('Producer') + component_2 = component2(input_gcs=component_1.outputs['output_gcs']) + component_2.set_display_name('Consumer') + + +if __name__ == '__main__': + # execute only if run as a script + + compiler.Compiler().compile( + pipeline_func=two_step_pipeline_containerized, + package_path='two_step_pipeline_containerized.yaml') diff --git a/samples/v2/two_step_pipeline_containerized_test.py b/samples/v2/two_step_pipeline_containerized_test.py new file mode 100644 index 00000000000..9aa76556070 --- /dev/null +++ b/samples/v2/two_step_pipeline_containerized_test.py @@ -0,0 +1,86 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Two step pipeline containerized v2 engine pipeline.""" + +from __future__ import annotations + +import unittest + +import kfp.deprecated as kfp +from kfp.samples.test.utils import KfpTask +from kfp.samples.test.utils import run_pipeline_func +from kfp.samples.test.utils import TaskInputs +from kfp.samples.test.utils import TaskOutputs +from kfp.samples.test.utils import TestCase +import kfp_server_api +from ml_metadata.proto import Execution + +from .two_step_pipeline_containerized import two_step_pipeline_containerized + + +def verify(t: unittest.TestCase, run: kfp_server_api.ApiRun, + tasks: dict[str, KfpTask], **kwargs): + t.assertEqual(run.status, 'Succeeded') + component1_dict = tasks['component1'].get_dict() + component2_dict = tasks['component2'].get_dict() + for artifact in component1_dict.get('outputs').get('artifacts'): + # pop metadata here because a re-imported artifact may carry metadata with non-deterministic values + if artifact.get('metadata') is not None: + artifact.pop('metadata') + for artifact in component2_dict.get('inputs').get('artifacts'): + # pop metadata here because a re-imported artifact may carry metadata with non-deterministic values + if artifact.get('metadata') is not None: + artifact.pop('metadata') + + t.assertEqual( + { + 'name': 'component1', + 'inputs': { + 'parameters': { + 'text': 'hi' + } + }, + 'outputs': { + 'artifacts': [{ + 'name': 'output_gcs', + 'type': 'system.Dataset' + }], + }, + 'type': 'system.ContainerExecution', + 'state': Execution.State.COMPLETE, + }, component1_dict) + + t.assertEqual( + { + 'name': 'component2', + 'inputs': { + 'artifacts': [{ + 'name': 'input_gcs', + 'type': 'system.Dataset' + }], + }, + 'outputs': {}, + 'type': 'system.ContainerExecution', + 'state': Execution.State.COMPLETE, + }, component2_dict) + + +if __name__ == '__main__': + run_pipeline_func([ + TestCase( + pipeline_func=two_step_pipeline_containerized, + verify_func=verify, + mode=kfp.dsl.PipelineExecutionMode.V2_ENGINE, + ), + ]) diff --git a/sdk/python/kfp/compiler/_read_write_test_config.py b/sdk/python/kfp/compiler/_read_write_test_config.py index 05c79bcef01..dbe62c71122 100644 --- a/sdk/python/kfp/compiler/_read_write_test_config.py +++ b/sdk/python/kfp/compiler/_read_write_test_config.py @@ -42,6 +42,8 @@ 'pipeline_with_task_final_status', 'pipeline_with_task_final_status_yaml', 'component_with_pip_index_urls', + 'container_component_with_no_inputs', + 'two_step_pipeline_containerized', ], 'test_data_dir': 'sdk/python/kfp/compiler/test_data/pipelines', 'config': { @@ -60,6 +62,9 @@ 'nested_return', 'output_metrics', 'preprocess', + 'container_no_input', + 'container_io', + 'container_with_artifact_output', ], 'test_data_dir': 'sdk/python/kfp/compiler/test_data/components', 'config': { diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 8418e66773b..c24e3bdf7a8 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -803,6 +803,104 @@ def hello_world(text: str) -> str: pipeline_spec['root']['inputDefinitions']['parameters']['text'] 
['defaultValue'], 'override_string') + def test_compile_container_component_simple(self): + + @dsl.container_component + def hello_world_container() -> dsl.ContainerSpec: + """Hello world component.""" + return dsl.ContainerSpec( + image='python:3.7', + command=['echo', 'hello world'], + args=[], + ) + + with tempfile.TemporaryDirectory() as tempdir: + output_json = os.path.join(tempdir, 'component.yaml') + compiler.Compiler().compile( + pipeline_func=hello_world_container, + package_path=output_json, + pipeline_name='hello-world-container') + with open(output_json, 'r') as f: + pipeline_spec = yaml.safe_load(f) + self.assertEqual( + pipeline_spec['deploymentSpec']['executors'] + ['exec-hello-world-container']['container']['command'], + ['echo', 'hello world']) + + def test_compile_container_with_simple_io(self): + + @dsl.container_component + def container_simple_io(text: str, output_path: dsl.OutputPath(str)): + return dsl.ContainerSpec( + image='python:3.7', + command=['my_program', text], + args=['--output_path', output_path]) + + with tempfile.TemporaryDirectory() as tempdir: + output_json = os.path.join(tempdir, 'component.yaml') + compiler.Compiler().compile( + pipeline_func=container_simple_io, + package_path=output_json, + pipeline_name='container-simple-io') + with open(output_json, 'r') as f: + pipeline_spec = yaml.safe_load(f) + self.assertEqual( + pipeline_spec['components']['comp-container-simple-io'] + ['inputDefinitions']['parameters']['text']['parameterType'], + 'STRING') + self.assertEqual( + pipeline_spec['components']['comp-container-simple-io'] + ['outputDefinitions']['parameters']['output_path']['parameterType'], + 'STRING') + + def test_compile_container_with_artifact_output(self): + + @dsl.container_component + def container_with_artifact_output( + num_epochs: int, # also as an input + model: dsl.Output[dsl.Model], + model_config_path: dsl.OutputPath(str), + ): + return dsl.ContainerSpec( + image='gcr.io/my-image', + command=['sh', 'run.sh'], + args=[ + '--epochs', + num_epochs, + '--model_path', + model.uri, + '--model_config_path', + model_config_path, + ]) + + with tempfile.TemporaryDirectory() as tempdir: + output_yaml = os.path.join(tempdir, 'component.yaml') + compiler.Compiler().compile( + pipeline_func=container_with_artifact_output, + package_path=output_yaml, + pipeline_name='container-with-artifact-output') + with open(output_yaml, 'r') as f: + pipeline_spec = yaml.safe_load(f) + self.assertEqual( + pipeline_spec['components']['comp-container-with-artifact-output'] + ['inputDefinitions']['parameters']['num_epochs']['parameterType'], + 'NUMBER_INTEGER') + self.assertEqual( + pipeline_spec['components']['comp-container-with-artifact-output'] + ['outputDefinitions']['artifacts']['model']['artifactType'] + ['schemaTitle'], 'system.Model') + self.assertEqual( + pipeline_spec['components']['comp-container-with-artifact-output'] + ['outputDefinitions']['parameters']['model_config_path'] + ['parameterType'], 'STRING') + args_to_check = pipeline_spec['deploymentSpec']['executors'][ + 'exec-container-with-artifact-output']['container']['args'] + self.assertEqual(args_to_check[3], + "{{$.outputs.artifacts['model'].uri}}") + self.assertEqual( + args_to_check[5], + "{{$.outputs.parameters['model_config_path'].output_file}}") + class TestCompileBadInput(unittest.TestCase): diff --git a/sdk/python/kfp/compiler/test_data/components/container_io.py b/sdk/python/kfp/compiler/test_data/components/container_io.py new file mode 100644 index 00000000000..1b64bbc0b1b --- 
/dev/null +++ b/sdk/python/kfp/compiler/test_data/components/container_io.py @@ -0,0 +1,31 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from kfp.dsl import container_component +from kfp.dsl import ContainerSpec +from kfp.dsl import OutputPath + + +@container_component +def container_io(text: str, output_path: OutputPath(str)): + return ContainerSpec( + image='python:3.7', + command=['my_program', text], + args=['--output_path', output_path]) + + +if __name__ == '__main__': + from kfp import compiler + compiler.Compiler().compile( + pipeline_func=container_io, + package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/kfp/compiler/test_data/components/container_io.yaml b/sdk/python/kfp/compiler/test_data/components/container_io.yaml new file mode 100644 index 00000000000..61a83d02535 --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/components/container_io.yaml @@ -0,0 +1,44 @@ +components: + comp-container-io: + executorLabel: exec-container-io + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + parameters: + output_path: + parameterType: STRING +deploymentSpec: + executors: + exec-container-io: + container: + args: + - --output_path + - '{{$.outputs.parameters[''output_path''].output_file}}' + command: + - my_program + - '{{$.inputs.parameters[''text'']}}' + image: python:3.7 +pipelineInfo: + name: container-io +root: + dag: + tasks: + container-io: + cachingOptions: + enableCache: true + componentRef: + name: comp-container-io + inputs: + parameters: + text: + componentInputParameter: text + taskInfo: + name: container-io + inputDefinitions: + parameters: + text: + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.0.0-beta.1 diff --git a/sdk/python/kfp/compiler/test_data/components/container_no_input.py b/sdk/python/kfp/compiler/test_data/components/container_no_input.py new file mode 100644 index 00000000000..1a35eb3c47f --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/components/container_no_input.py @@ -0,0 +1,31 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from kfp.dsl import container_component +from kfp.dsl import ContainerSpec + + +@container_component +def container_no_input(): + return ContainerSpec( + image='python:3.7', + command=['echo', 'hello world'], + args=[], + ) + + +if __name__ == '__main__': + from kfp import compiler + compiler.Compiler().compile( + pipeline_func=container_no_input, + package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/kfp/compiler/test_data/components/container_no_input.yaml b/sdk/python/kfp/compiler/test_data/components/container_no_input.yaml new file mode 100644 index 00000000000..984dd5d3888 --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/components/container_no_input.yaml @@ -0,0 +1,25 @@ +components: + comp-container-no-input: + executorLabel: exec-container-no-input +deploymentSpec: + executors: + exec-container-no-input: + container: + command: + - echo + - hello world + image: python:3.7 +pipelineInfo: + name: container-no-input +root: + dag: + tasks: + container-no-input: + cachingOptions: + enableCache: true + componentRef: + name: comp-container-no-input + taskInfo: + name: container-no-input +schemaVersion: 2.1.0 +sdkVersion: kfp-2.0.0-beta.1 diff --git a/sdk/python/kfp/compiler/test_data/components/container_with_artifact_output.py b/sdk/python/kfp/compiler/test_data/components/container_with_artifact_output.py new file mode 100644 index 00000000000..92fb142e564 --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/components/container_with_artifact_output.py @@ -0,0 +1,44 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from kfp.dsl import container_component +from kfp.dsl import ContainerSpec +from kfp.dsl import Model +from kfp.dsl import Output +from kfp.dsl import OutputPath + + +@container_component +def container_with_artifact_output( + num_epochs: int, # built-in types are parsed as inputs + model: Output[Model], + model_config_path: OutputPath(str), +): + return ContainerSpec( + image='gcr.io/my-image', + command=['sh', 'run.sh'], + args=[ + '--epochs', + num_epochs, + '--model_path', + model.uri, + '--model_config_path', + model_config_path, + ]) + + +if __name__ == '__main__': + from kfp import compiler + compiler.Compiler().compile( + pipeline_func=container_with_artifact_output, + package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/kfp/compiler/test_data/components/container_with_artifact_output.yaml b/sdk/python/kfp/compiler/test_data/components/container_with_artifact_output.yaml new file mode 100644 index 00000000000..15763b9c626 --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/components/container_with_artifact_output.yaml @@ -0,0 +1,53 @@ +components: + comp-container-with-artifact-output: + executorLabel: exec-container-with-artifact-output + inputDefinitions: + parameters: + num_epochs: + parameterType: NUMBER_INTEGER + outputDefinitions: + artifacts: + model: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 + parameters: + model_config_path: + parameterType: STRING +deploymentSpec: + executors: + exec-container-with-artifact-output: + container: + args: + - --epochs + - '{{$.inputs.parameters[''num_epochs'']}}' + - --model_path + - '{{$.outputs.artifacts[''model''].uri}}' + - --model_config_path + - '{{$.outputs.parameters[''model_config_path''].output_file}}' + command: + - sh + - run.sh + image: gcr.io/my-image +pipelineInfo: + name: container-with-artifact-output +root: + dag: + tasks: + container-with-artifact-output: + cachingOptions: + enableCache: true + componentRef: + name: comp-container-with-artifact-output + inputs: + parameters: + num_epochs: + componentInputParameter: num_epochs + taskInfo: + name: container-with-artifact-output + inputDefinitions: + parameters: + num_epochs: + parameterType: NUMBER_INTEGER +schemaVersion: 2.1.0 +sdkVersion: kfp-2.0.0-beta.1 diff --git a/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.py b/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.py new file mode 100644 index 00000000000..724b701dba1 --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.py @@ -0,0 +1,35 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from kfp import compiler +from kfp import dsl + + +@dsl.container_component +def hello_world_container(): + return dsl.ContainerSpec( + image='python:3.7', + command=['echo', 'hello world'], + args=[], + ) + + +@dsl.pipeline(name='v2-container-component-no-input') +def pipeline(): + hello_world_container() + + +if __name__ == '__main__': + compiler.Compiler().compile( + pipeline_func=pipeline, package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.yaml b/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.yaml new file mode 100644 index 00000000000..6ceab5e57a0 --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/pipelines/container_component_with_no_inputs.yaml @@ -0,0 +1,25 @@ +components: + comp-hello-world-container: + executorLabel: exec-hello-world-container +deploymentSpec: + executors: + exec-hello-world-container: + container: + command: + - echo + - hello world + image: python:3.7 +pipelineInfo: + name: v2-container-component-no-input +root: + dag: + tasks: + hello-world-container: + cachingOptions: + enableCache: true + componentRef: + name: comp-hello-world-container + taskInfo: + name: hello-world-container +schemaVersion: 2.1.0 +sdkVersion: kfp-2.0.0-beta.0 diff --git a/sdk/python/kfp/compiler/test_data/pipelines/two_step_pipeline_containerized.py b/sdk/python/kfp/compiler/test_data/pipelines/two_step_pipeline_containerized.py new file mode 100644 index 00000000000..f85f0507c2b --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/pipelines/two_step_pipeline_containerized.py @@ -0,0 +1,46 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from kfp import compiler +from kfp import dsl + + +@dsl.container_component +def component1(text: str, output_gcs: dsl.Output[dsl.Dataset]): + return dsl.ContainerSpec( + image='google/cloud-sdk:slim', + command=[ + 'sh -c | set -e -x', 'echo', text, '| gsutil cp -', output_gcs.uri + ]) + + +@dsl.container_component +def component2(input_gcs: dsl.Input[dsl.Dataset]): + return dsl.ContainerSpec( + image='google/cloud-sdk:slim', + command=['sh', '-c', '|', 'set -e -x gsutil cat'], + args=[input_gcs.uri]) + + +@dsl.pipeline(name='containerized-two-step-pipeline') +def my_pipeline(text: str): + component_1 = component1(text=text).set_display_name('Producer') + component_2 = component2(input_gcs=component_1.outputs['output_gcs']) + component_2.set_display_name('Consumer') + + +if __name__ == '__main__': + compiler.Compiler().compile( + pipeline_func=my_pipeline, + package_path=__file__.replace('.py', '.yaml')) diff --git a/sdk/python/kfp/compiler/test_data/pipelines/two_step_pipeline_containerized.yaml b/sdk/python/kfp/compiler/test_data/pipelines/two_step_pipeline_containerized.yaml new file mode 100644 index 00000000000..38f2a3e6c90 --- /dev/null +++ b/sdk/python/kfp/compiler/test_data/pipelines/two_step_pipeline_containerized.yaml @@ -0,0 +1,79 @@ +components: + comp-component1: + executorLabel: exec-component1 + inputDefinitions: + parameters: + text: + parameterType: STRING + outputDefinitions: + artifacts: + output_gcs: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-component2: + executorLabel: exec-component2 + inputDefinitions: + artifacts: + input_gcs: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 +deploymentSpec: + executors: + exec-component1: + container: + command: + - sh -c | set -e -x + - echo + - '{{$.inputs.parameters[''text'']}}' + - '| gsutil cp -' + - '{{$.outputs.artifacts[''output_gcs''].uri}}' + image: google/cloud-sdk:slim + exec-component2: + container: + args: + - '{{$.inputs.artifacts[''input_gcs''].uri}}' + command: + - sh + - -c + - '|' + - set -e -x gsutil cat + image: google/cloud-sdk:slim +pipelineInfo: + name: containerized-two-step-pipeline +root: + dag: + tasks: + component1: + cachingOptions: + enableCache: true + componentRef: + name: comp-component1 + inputs: + parameters: + text: + componentInputParameter: text + taskInfo: + name: Producer + component2: + cachingOptions: + enableCache: true + componentRef: + name: comp-component2 + dependentTasks: + - component1 + inputs: + artifacts: + input_gcs: + taskOutputArtifact: + outputArtifactKey: output_gcs + producerTask: component1 + taskInfo: + name: Consumer + inputDefinitions: + parameters: + text: + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.0.0-beta.1 diff --git a/sdk/python/kfp/components/__init__.py b/sdk/python/kfp/components/__init__.py index dd507b2b788..02ee9300c21 100644 --- a/sdk/python/kfp/components/__init__.py +++ b/sdk/python/kfp/components/__init__.py @@ -20,10 +20,12 @@ 'load_component_from_url', 'PythonComponent', 'BaseComponent', + 'ContainerComponent', 'YamlComponent', ] from kfp.components.base_component import BaseComponent +from kfp.components.container_component import ContainerComponent from kfp.components.python_component import PythonComponent from kfp.components.yaml_component import load_component_from_file from kfp.components.yaml_component import load_component_from_text diff --git a/sdk/python/kfp/components/component_factory.py b/sdk/python/kfp/components/component_factory.py index 
9dbd920adee..f239f377921 100644 --- a/sdk/python/kfp/components/component_factory.py +++ b/sdk/python/kfp/components/component_factory.py @@ -17,10 +17,11 @@ import pathlib import re import textwrap -from typing import Callable, List, Optional, Tuple +from typing import Callable, List, Optional, Tuple, Union import warnings import docstring_parser +from kfp.components import container_component from kfp.components import placeholders from kfp.components import python_component from kfp.components import structures @@ -47,6 +48,33 @@ class ComponentInfo(): base_image: str = _DEFAULT_BASE_IMAGE +class ContainerComponentArtifactChannel(): + """A class for passing in placeholders into container_component decorated + function.""" + + def __init__(self, io_type: str, var_name: str): + self._io_type = io_type + self._var_name = var_name + + def __getattr__( + self, _name: str + ) -> Union[placeholders.InputUriPlaceholder, placeholders + .InputPathPlaceholder, placeholders.OutputUriPlaceholder, + placeholders.OutputPathPlaceholder]: + if _name not in ['uri', 'path']: + raise AttributeError(f'Cannot access artifact attribute "{_name}".') + if self._io_type == 'input': + if _name == 'uri': + return placeholders.InputUriPlaceholder(self._var_name) + elif _name == 'path': + return placeholders.InputPathPlaceholder(self._var_name) + elif self._io_type == 'output': + if _name == 'uri': + return placeholders.OutputUriPlaceholder(self._var_name) + elif _name == 'path': + return placeholders.OutputPathPlaceholder(self._var_name) + + # A map from function_name to components. This is always populated when a # module containing KFP components is loaded. Primarily used by KFP CLI # component builder to package components in a file into containers. @@ -171,7 +199,9 @@ def _maybe_make_unique(name: str, names: List[str]): raise RuntimeError('Too many arguments with the name {}'.format(name)) -def extract_component_interface(func: Callable) -> structures.ComponentSpec: +def extract_component_interface( + func: Callable, + containerized: bool = False) -> structures.ComponentSpec: single_output_name_const = 'Output' signature = inspect.signature(func) @@ -248,44 +278,51 @@ def extract_component_interface(func: Callable) -> structures.ComponentSpec: #Analyzing the return type annotations. return_ann = signature.return_annotation - if hasattr(return_ann, '_fields'): #NamedTuple - # Getting field type annotations. - # __annotations__ does not exist in python 3.5 and earlier - # _field_types does not exist in python 3.9 and later - field_annotations = getattr(return_ann, - '__annotations__', None) or getattr( - return_ann, '_field_types', None) - for field_name in return_ann._fields: - type_struct = None - if field_annotations: - type_struct = _annotation_to_type_struct( - field_annotations.get(field_name, None)) - - output_name = _maybe_make_unique(field_name, output_names) + if not containerized: + if hasattr(return_ann, '_fields'): #NamedTuple + # Getting field type annotations. 
+ # __annotations__ does not exist in python 3.5 and earlier + # _field_types does not exist in python 3.9 and later + field_annotations = getattr(return_ann, '__annotations__', + None) or getattr( + return_ann, '_field_types', None) + for field_name in return_ann._fields: + type_struct = None + if field_annotations: + type_struct = _annotation_to_type_struct( + field_annotations.get(field_name, None)) + + output_name = _maybe_make_unique(field_name, output_names) + output_names.add(output_name) + output_spec = structures.OutputSpec(type=type_struct) + outputs[output_name] = output_spec + # Deprecated dict-based way of declaring multiple outputs. Was only used by + # the @component decorator + elif isinstance(return_ann, dict): + warnings.warn( + 'The ability to specify multiple outputs using the dict syntax' + ' has been deprecated. It will be removed soon after release' + ' 0.1.32. Please use typing.NamedTuple to declare multiple' + ' outputs.') + for output_name, output_type_annotation in return_ann.items(): + output_type_struct = _annotation_to_type_struct( + output_type_annotation) + output_spec = structures.OutputSpec(type=output_type_struct) + outputs[name] = output_spec + elif signature.return_annotation is not None and signature.return_annotation != inspect.Parameter.empty: + output_name = _maybe_make_unique(single_output_name_const, + output_names) + # Fixes exotic, but possible collision: + # `def func(output_path: OutputPath()) -> str: ...` output_names.add(output_name) + type_struct = _annotation_to_type_struct( + signature.return_annotation) output_spec = structures.OutputSpec(type=type_struct) outputs[output_name] = output_spec - # Deprecated dict-based way of declaring multiple outputs. Was only used by - # the @component decorator - elif isinstance(return_ann, dict): - warnings.warn( - 'The ability to specify multiple outputs using the dict syntax' - ' has been deprecated. It will be removed soon after release' - ' 0.1.32. Please use typing.NamedTuple to declare multiple' - ' outputs.') - for output_name, output_type_annotation in return_ann.items(): - output_type_struct = _annotation_to_type_struct( - output_type_annotation) - output_spec = structures.OutputSpec(type=output_type_struct) - outputs[name] = output_spec - elif signature.return_annotation is not None and signature.return_annotation != inspect.Parameter.empty: - output_name = _maybe_make_unique(single_output_name_const, output_names) - # Fixes exotic, but possible collision: - # `def func(output_path: OutputPath()) -> str: ...` - output_names.add(output_name) - type_struct = _annotation_to_type_struct(signature.return_annotation) - output_spec = structures.OutputSpec(type=type_struct) - outputs[output_name] = output_spec + elif return_ann != inspect.Parameter.empty and return_ann != structures.ContainerSpec: + raise TypeError( + 'Return annotation should be either ContainerSpec or omitted for container components.' + ) # Component name and description are derived from the function's name and # docstring. The name can be overridden by setting setting func.__name__ @@ -435,3 +472,44 @@ def create_component_from_func(func: Callable, return python_component.PythonComponent( component_spec=component_spec, python_func=func) + + +def create_container_component_from_func( + func: Callable) -> container_component.ContainerComponent: + """Implementation for the @container_component decorator. + + The decorator is defined under container_component_decorator.py. 
See + the decorator for the canonical documentation for this function. + """ + + component_spec = extract_component_interface(func, containerized=True) + arg_list = [] + signature = inspect.signature(func) + parameters = list(signature.parameters.values()) + for parameter in parameters: + parameter_type = type_annotations.maybe_strip_optional_from_annotation( + parameter.annotation) + io_name = parameter.name + if type_annotations.is_input_artifact(parameter_type): + arg_list.append( + ContainerComponentArtifactChannel( + io_type='input', var_name=io_name)) + elif type_annotations.is_output_artifact(parameter_type): + arg_list.append( + ContainerComponentArtifactChannel( + io_type='output', var_name=io_name)) + elif isinstance( + parameter_type, + (type_annotations.OutputAnnotation, type_annotations.OutputPath)): + arg_list.append(placeholders.OutputParameterPlaceholder(io_name)) + else: # parameter is an input value + arg_list.append(placeholders.InputValuePlaceholder(io_name)) + + container_spec = func(*arg_list) + for arg in (container_spec.command or []) + (container_spec.args or []): + if isinstance(arg, ContainerComponentArtifactChannel): + raise TypeError( + 'Cannot access artifact by itself in the container definition. Please use .uri or .path instead to access the artifact.' + ) + component_spec.implementation = structures.Implementation(container_spec) + return container_component.ContainerComponent(component_spec, func) diff --git a/sdk/python/kfp/components/component_factory_test.py b/sdk/python/kfp/components/component_factory_test.py index 6b984b6962b..18f528672f8 100644 --- a/sdk/python/kfp/components/component_factory_test.py +++ b/sdk/python/kfp/components/component_factory_test.py @@ -14,7 +14,9 @@ import unittest +from kfp import dsl from kfp.components import component_factory +from kfp.components import placeholders class TestGetPackagesToInstallCommand(unittest.TestCase): @@ -44,3 +46,39 @@ def test_with_packages_to_install_with_pip_index_url(self): concat_command = ' '.join(command) for package in packages_to_install + pip_index_urls: self.assertTrue(package in concat_command) + + +class TestContainerComponentArtifactChannel(unittest.TestCase): + + def test_correct_placeholder_and_attribute_error(self): + in_channel = component_factory.ContainerComponentArtifactChannel( + 'input', 'my_dataset') + out_channel = component_factory.ContainerComponentArtifactChannel( + 'output', 'my_result') + self.assertEqual(in_channel.uri, + placeholders.InputUriPlaceholder('my_dataset')) + self.assertEqual(out_channel.path, + placeholders.OutputPathPlaceholder('my_result')) + self.assertRaisesRegex(AttributeError, + r'Cannot access artifact attribute "name"', + lambda: in_channel.name) + self.assertRaisesRegex(AttributeError, + r'Cannot access artifact attribute "channel"', + lambda: out_channel.channel) + + +class TestContainerComponentFactory(unittest.TestCase): + + def test_raise_error_if_access_artifact_by_itself(self): + + def comp_with_artifact_input(dataset: dsl.Input[dsl.Dataset]): + return dsl.ContainerSpec( + image='gcr.io/my-image', + command=['sh', 'run.sh'], + args=[dataset]) + + self.assertRaisesRegex( + TypeError, + r'Cannot access artifact by itself in the container definition.', + component_factory.create_container_component_from_func, + comp_with_artifact_input) diff --git a/sdk/python/kfp/components/container_component.py b/sdk/python/kfp/components/container_component.py new file mode 100644 index 00000000000..d9e0773bd1f --- /dev/null +++ 
b/sdk/python/kfp/components/container_component.py @@ -0,0 +1,38 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Container-based component.""" + +from typing import Callable + +from kfp.components import base_component +from kfp.components import structures + + +class ContainerComponent(base_component.BaseComponent): + """Component defined via pre-built container. + + Attributes: + pipeline_func: The function that becomes the implementation of this component. + """ + + def __init__(self, component_spec: structures.ComponentSpec, + pipeline_func: Callable) -> None: + super().__init__(component_spec=component_spec) + self.pipeline_func = pipeline_func + + def execute(self, **kwargs): + # As its name suggests, this class backs (custom) container components. + # Local execution of `execute()` (for example, via `docker run`) is not + # implemented yet. + raise NotImplementedError diff --git a/sdk/python/kfp/components/container_component_decorator.py b/sdk/python/kfp/components/container_component_decorator.py new file mode 100644 index 00000000000..95cb334eb80 --- /dev/null +++ b/sdk/python/kfp/components/container_component_decorator.py @@ -0,0 +1,53 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Callable + +from kfp.components import component_factory +from kfp.components import container_component + + +def container_component( + func: Callable) -> container_component.ContainerComponent: + """Decorator for container-based components in KFP v2. + + Args: + func: The Python function to create a component from. The function + should have type annotations for all its arguments, indicating how + it is intended to be used (e.g. as an input/output Artifact object, + a plain parameter, or a path to a file). 
+ + Example: + :: + + from kfp.dsl import container_component, ContainerSpec, Dataset, InputPath, Model, Output, OutputPath + + @container_component + def my_component( + dataset_path: InputPath(Dataset), + model: Output[Model], + num_epochs: int, + output_parameter: OutputPath(str), + ): + return ContainerSpec( + image='gcr.io/my-image', + command=['sh', 'my_component.sh'], + args=[ + '--dataset_path', dataset_path, + '--model_path', model.path, + '--output_parameter_path', output_parameter, + ] + ) + """ + return component_factory.create_container_component_from_func(func) diff --git a/sdk/python/kfp/components/container_component_decorator_test.py b/sdk/python/kfp/components/container_component_decorator_test.py new file mode 100644 index 00000000000..12f8c8a7226 --- /dev/null +++ b/sdk/python/kfp/components/container_component_decorator_test.py @@ -0,0 +1,77 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from kfp import dsl +from kfp.components import container_component + + +class TestContainerComponentDecorator(unittest.TestCase): + + def test_func_with_no_arg(self): + + @dsl.container_component + def hello_world() -> dsl.ContainerSpec: + """Hello world component.""" + return dsl.ContainerSpec( + image='python:3.7', + command=['echo', 'hello world'], + args=[], + ) + + self.assertIsInstance(hello_world, + container_component.ContainerComponent) + self.assertIsNone(hello_world.component_spec.inputs) + + def test_func_with_simple_io(self): + + @dsl.container_component + def hello_world_io( + text: str, + text_output_path: dsl.OutputPath(str)) -> dsl.ContainerSpec: + """Hello world component with input and output.""" + return dsl.ContainerSpec( + image='python:3.7', + command=['echo'], + args=['--text', text, '--output_path', text_output_path]) + + self.assertIsInstance(hello_world_io, + container_component.ContainerComponent) + + def test_func_with_artifact_io(self): + + @dsl.container_component + def container_comp_with_artifacts( + dataset: dsl.Input[dsl.Dataset], + num_epochs: int, # also as an input + model: dsl.Output[dsl.Model], + model_config_path: dsl.OutputPath(str), + ): + return dsl.ContainerSpec( + image='gcr.io/my-image', + command=['sh', 'run.sh'], + args=[ + '--dataset_location', + dataset.path, + '--epochs', + num_epochs, + '--model_path', + model.uri, + '--model_config_path', + model_config_path, + ]) + + self.assertIsInstance(container_comp_with_artifacts, + container_component.ContainerComponent) diff --git a/sdk/python/kfp/components/structures.py b/sdk/python/kfp/components/structures.py index d1483673607..30ffcd8282c 100644 --- a/sdk/python/kfp/components/structures.py +++ b/sdk/python/kfp/components/structures.py @@ -180,18 +180,44 @@ class ResourceSpec(base_model.BaseModel): class ContainerSpec(base_model.BaseModel): """Container implementation definition. - Attributes: - image: The container image. - command (optional): the container entrypoint. - args (optional): the arguments to the container entrypoint. 
- env (optional): the environment variables to be passed to the container. - resources (optional): the specification on the resource requirements. + This is only used by pipeline authors when constructing a containerized component + using the @container_component decorator. + + Examples: + :: + + @container_component + def container_with_artifact_output( + num_epochs: int, # built-in types are parsed as inputs + model: Output[Model], + model_config_path: OutputPath(str), + ): + return ContainerSpec( + image='gcr.io/my-image', + command=['sh', 'run.sh'], + args=[ + '--epochs', + num_epochs, + '--model_path', + model.uri, + '--model_config_path', + model_config_path, + ]) """ image: str + """Container image.""" + command: Optional[List[placeholders.CommandLineElement]] = None + """Container entrypoint.""" + args: Optional[List[placeholders.CommandLineElement]] = None + """Arguments to the container entrypoint.""" + env: Optional[Mapping[str, placeholders.CommandLineElement]] = None + """Environment variables to be passed to the container.""" + resources: Optional[ResourceSpec] = None + """Specification on the resource requirements.""" def transform_command(self) -> None: """Use None instead of empty list for command.""" diff --git a/sdk/python/kfp/dsl/__init__.py b/sdk/python/kfp/dsl/__init__.py index a83f239cd4a..1f21b2b8754 100644 --- a/sdk/python/kfp/dsl/__init__.py +++ b/sdk/python/kfp/dsl/__init__.py @@ -16,6 +16,8 @@ __all__ = [ 'component', + 'container_component', + 'ContainerSpec', 'importer', 'pipeline', 'PipelineTask', @@ -43,9 +45,11 @@ ] from kfp.components.component_decorator import component +from kfp.components.container_component_decorator import container_component from kfp.components.importer_node import importer from kfp.components.pipeline_context import pipeline from kfp.components.pipeline_task import PipelineTask +from kfp.components.structures import ContainerSpec from kfp.components.task_final_status import PipelineTaskFinalStatus from kfp.components.tasks_group import Condition from kfp.components.tasks_group import ExitHandler
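
For reference, end-user authoring against the public surface added above (dsl.container_component and dsl.ContainerSpec exported from kfp.dsl) follows the same pattern as the samples in this change. A minimal sketch, assuming the KFP v2 SDK from this change is installed; the image, shell command, and the echo_component / echo_pipeline names are illustrative only and not part of the diff:

from kfp import compiler
from kfp import dsl


@dsl.container_component
def echo_component(text: str, output_path: dsl.OutputPath(str)):
    # Everything the container needs is declared in ContainerSpec; the SDK
    # swaps `text` and `output_path` for runtime placeholders when compiling.
    return dsl.ContainerSpec(
        image='alpine:3.16',
        command=[
            'sh', '-c',
            'mkdir -p "$(dirname "$1")" && echo "$0" | tee "$1"'
        ],
        args=[text, output_path],
    )


@dsl.pipeline(name='echo-pipeline')
def echo_pipeline(message: str = 'hello world'):
    echo_component(text=message)


if __name__ == '__main__':
    # Compilation runs locally and only emits the pipeline spec YAML; the
    # container itself is pulled and executed by the backend at run time.
    compiler.Compiler().compile(
        pipeline_func=echo_pipeline,
        package_path='echo_pipeline.yaml')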