Skip to content

Commit

Permalink
feat(sdk): supporting bring your own container for arbitrary input an…
Browse files Browse the repository at this point in the history
…d outputs (#8066)

* support container_component decorator for function with no inputs

* resolve review comments

* add sample tests for milestone 1

* modify compiler test data

* resolve reviews

* resolve reviews

* WIP

* implementation of function of no inputs

* fixed sample test

* re-fix sample test

* fix rebase merge conflict

* resolve formatting

* resolve isort error for test data

* resolve comments

* fix nit

* resolve nit

* add implementation for placeholders i/o, sample and compiler tests

* resolve comments and merge logic for constructing container component

* resolve comments

* resolve comments

* fix assertion messages

* add error handling for accessing artifact by itself

* add test for raising error for accessing artifact by itself
  • Loading branch information
zichuan-scott-xu authored Aug 8, 2022
1 parent e8abec2 commit 04c827f
Show file tree
Hide file tree
Showing 25 changed files with 1,109 additions and 44 deletions.
4 changes: 3 additions & 1 deletion samples/test/config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2021 The Kubeflow Authors
# Copyright 2021-2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -122,6 +122,8 @@
path: samples.v2.producer_consumer_param_test
- name: pipeline_with_importer
path: samples.v2.pipeline_with_importer_test
- name: pipeline_container_no_input
path: samples.v2.pipeline_container_no_input_test
# TODO(capri-xiyue): Re-enable after figuring out V2 Engine
# and protobuf.Value support.
# - name: cache_v2
Expand Down
38 changes: 38 additions & 0 deletions samples/v2/pipeline_container_no_input.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright 2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

from kfp import compiler
from kfp import dsl


@dsl.container_component
def container_no_input():
    # Container component that declares no inputs and no outputs: the
    # container only echoes a fixed greeting.
    # (Plain comments are used instead of a docstring so the compiled
    # component definition is left untouched.)
    echo_spec = dsl.ContainerSpec(
        image='python:3.7',
        command=['echo', 'hello world'],
        args=[],
    )
    return echo_spec


@dsl.pipeline(name='v2-container-component-no-input')
def pipeline_container_no_input():
    # Single-task pipeline: it only invokes container_no_input, which takes
    # no arguments, so there is nothing to wire between tasks.
    container_no_input()


if __name__ == '__main__':
    # Compile the pipeline to YAML only when this module is run directly.
    pipeline_compiler = compiler.Compiler()
    pipeline_compiler.compile(
        pipeline_func=pipeline_container_no_input,
        package_path='pipeline_container_no_input.yaml',
    )
52 changes: 52 additions & 0 deletions samples/v2/pipeline_container_no_input_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright 2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pipeline container no input v2 engine pipeline."""

from __future__ import annotations

import unittest

import kfp.deprecated as kfp
import kfp_server_api
from ml_metadata.proto import Execution

from kfp.samples.test.utils import KfpTask, TaskInputs, TaskOutputs, TestCase, run_pipeline_func
from .pipeline_container_no_input import pipeline_container_no_input


def verify(t: unittest.TestCase, run: kfp_server_api.ApiRun,
           tasks: dict[str, KfpTask], **kwargs):
    """Check that the run succeeded and produced the single expected task.

    Args:
        t: Test case used to make the assertions.
        run: The finished pipeline run; its status must be 'Succeeded'.
        tasks: Observed tasks keyed by task name.
        **kwargs: Accepted and ignored.
    """
    t.assertEqual(run.status, 'Succeeded')

    # The component declares no inputs or outputs, so every parameter and
    # artifact collection on the expected task is empty.
    expected_task = KfpTask(
        name='container-no-input',
        type='system.ContainerExecution',
        state=Execution.State.COMPLETE,
        inputs=TaskInputs(parameters={}, artifacts=[]),
        outputs=TaskOutputs(parameters={}, artifacts=[]),
    )
    t.assertEqual({'container-no-input': expected_task}, tasks)


if __name__ == '__main__':
    # Run the sample with the V2 engine and validate it with verify().
    no_input_case = TestCase(
        pipeline_func=pipeline_container_no_input,
        verify_func=verify,
        mode=kfp.dsl.PipelineExecutionMode.V2_ENGINE,
    )
    run_pipeline_func([no_input_case])
55 changes: 55 additions & 0 deletions samples/v2/two_step_pipeline_containerized.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright 2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Two step pipeline using dsl.container_component decorator."""
import os

from kfp import compiler
from kfp.dsl import container_component
from kfp.dsl import ContainerSpec
from kfp.dsl import Dataset
from kfp.dsl import Input
from kfp.dsl import Output
from kfp.dsl import pipeline


@container_component
def component1(text: str, output_gcs: Output[Dataset]):
    # Producer step: writes `text` to the location given by `output_gcs.uri`
    # using gsutil.
    #
    # `sh -c` takes the whole script as ONE argument; the operands that
    # follow it are exposed to the script as "$0" and "$1". The previous
    # command list put 'sh -c | set -e -x' into a single argv element, so
    # the container tried to execute a program with that literal name and
    # the pipe was never interpreted by a shell.
    return ContainerSpec(
        image='google/cloud-sdk:slim',
        command=[
            'sh',
            '-c',
            'set -e -x\necho "$0" | gsutil cp - "$1"',
            text,
            output_gcs.uri,
        ])


@container_component
def component2(input_gcs: Input[Dataset]):
    # Consumer step: prints the content of the input dataset with gsutil.
    #
    # The script must be the single argument following `-c`. The previous
    # list passed a bare '|' as the script (a shell syntax error) and merged
    # `set -e -x` and `gsutil cat` onto one line, which would have turned
    # `gsutil cat` into positional parameters of `set` instead of a command.
    # NOTE(review): this relies on the container runtime appending `args`
    # after `command`, so the artifact path is what the script sees as "$0".
    return ContainerSpec(
        image='google/cloud-sdk:slim',
        command=['sh', '-c', 'set -e -x\ngsutil cat "$0"'],
        args=[input_gcs.path])


@pipeline(name='two-step-pipeline-containerized')
def two_step_pipeline_containerized():
    # Producer task writes the dataset; the consumer task takes the
    # producer's 'output_gcs' artifact as its input.
    producer = component1(text='hi').set_display_name('Producer')
    consumer = component2(input_gcs=producer.outputs['output_gcs'])
    consumer.set_display_name('Consumer')


if __name__ == '__main__':
    # Compile the pipeline to YAML only when this module is run directly.
    pipeline_compiler = compiler.Compiler()
    pipeline_compiler.compile(
        pipeline_func=two_step_pipeline_containerized,
        package_path='two_step_pipeline_containerized.yaml',
    )
86 changes: 86 additions & 0 deletions samples/v2/two_step_pipeline_containerized_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Copyright 2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Two step containerized pipeline v2 engine test."""

from __future__ import annotations

import unittest

import kfp.deprecated as kfp
from kfp.samples.test.utils import KfpTask
from kfp.samples.test.utils import run_pipeline_func
from kfp.samples.test.utils import TaskInputs
from kfp.samples.test.utils import TaskOutputs
from kfp.samples.test.utils import TestCase
import kfp_server_api
from ml_metadata.proto import Execution

from .two_step_pipeline_containerized import two_step_pipeline_containerized


def verify(t: unittest.TestCase, run: kfp_server_api.ApiRun,
           tasks: dict[str, KfpTask], **kwargs):
    """Check that the run succeeded and both tasks look as expected.

    Args:
        t: Test case used to make the assertions.
        run: The finished pipeline run; its status must be 'Succeeded'.
        tasks: Observed tasks keyed by task name; must contain
            'component1' and 'component2'.
        **kwargs: Accepted and ignored.
    """
    t.assertEqual(run.status, 'Succeeded')
    component1_dict = tasks['component1'].get_dict()
    component2_dict = tasks['component2'].get_dict()

    # Drop artifact metadata before comparing: a re-imported artifact may
    # carry metadata with uncertain data. The producer's outputs are
    # processed first, then the consumer's inputs.
    artifact_sources = (
        (component1_dict, 'outputs'),
        (component2_dict, 'inputs'),
    )
    for task_dict, direction in artifact_sources:
        for artifact in task_dict.get(direction).get('artifacts'):
            if artifact.get('metadata') is not None:
                artifact.pop('metadata')

    expected_component1 = {
        'name': 'component1',
        'inputs': {
            'parameters': {
                'text': 'hi'
            }
        },
        'outputs': {
            'artifacts': [{
                'name': 'output_gcs',
                'type': 'system.Dataset'
            }],
        },
        'type': 'system.ContainerExecution',
        'state': Execution.State.COMPLETE,
    }
    t.assertEqual(expected_component1, component1_dict)

    expected_component2 = {
        'name': 'component2',
        'inputs': {
            'artifacts': [{
                'name': 'input_gcs',
                'type': 'system.Dataset'
            }],
        },
        'outputs': {},
        'type': 'system.ContainerExecution',
        'state': Execution.State.COMPLETE,
    }
    t.assertEqual(expected_component2, component2_dict)


if __name__ == '__main__':
    # Run the sample with the V2 engine and validate it with verify().
    two_step_case = TestCase(
        pipeline_func=two_step_pipeline_containerized,
        verify_func=verify,
        mode=kfp.dsl.PipelineExecutionMode.V2_ENGINE,
    )
    run_pipeline_func([two_step_case])
5 changes: 5 additions & 0 deletions sdk/python/kfp/compiler/_read_write_test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
'pipeline_with_task_final_status',
'pipeline_with_task_final_status_yaml',
'component_with_pip_index_urls',
'container_component_with_no_inputs',
'two_step_pipeline_containerized',
],
'test_data_dir': 'sdk/python/kfp/compiler/test_data/pipelines',
'config': {
Expand All @@ -60,6 +62,9 @@
'nested_return',
'output_metrics',
'preprocess',
'container_no_input',
'container_io',
'container_with_artifact_output',
],
'test_data_dir': 'sdk/python/kfp/compiler/test_data/components',
'config': {
Expand Down
98 changes: 98 additions & 0 deletions sdk/python/kfp/compiler/compiler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,104 @@ def hello_world(text: str) -> str:
pipeline_spec['root']['inputDefinitions']['parameters']['text']
['defaultValue'], 'override_string')

def test_compile_container_component_simple(self):
    # Compiles a container component with no inputs or outputs and checks
    # that its command reaches the executor section of the compiled spec.

    @dsl.container_component
    def hello_world_container() -> dsl.ContainerSpec:
        """Hello world component."""
        return dsl.ContainerSpec(
            image='python:3.7',
            command=['echo', 'hello world'],
            args=[],
        )

    with tempfile.TemporaryDirectory() as tempdir:
        package_path = os.path.join(tempdir, 'component.yaml')
        compiler.Compiler().compile(
            pipeline_func=hello_world_container,
            package_path=package_path,
            pipeline_name='hello-world-container')
        with open(package_path, 'r') as spec_file:
            pipeline_spec = yaml.safe_load(spec_file)

    executors = pipeline_spec['deploymentSpec']['executors']
    container = executors['exec-hello-world-container']['container']
    self.assertEqual(container['command'], ['echo', 'hello world'])

def test_compile_container_with_simple_io(self):
    # Compiles a container component with one string input and one output
    # path, then checks that both are declared as STRING parameters.

    @dsl.container_component
    def container_simple_io(text: str, output_path: dsl.OutputPath(str)):
        return dsl.ContainerSpec(
            image='python:3.7',
            command=['my_program', text],
            args=['--output_path', output_path])

    with tempfile.TemporaryDirectory() as tempdir:
        package_path = os.path.join(tempdir, 'component.yaml')
        compiler.Compiler().compile(
            pipeline_func=container_simple_io,
            package_path=package_path,
            pipeline_name='container-simple-io')
        with open(package_path, 'r') as spec_file:
            pipeline_spec = yaml.safe_load(spec_file)

    component = pipeline_spec['components']['comp-container-simple-io']
    self.assertEqual(
        component['inputDefinitions']['parameters']['text']
        ['parameterType'], 'STRING')
    self.assertEqual(
        component['outputDefinitions']['parameters']['output_path']
        ['parameterType'], 'STRING')

def test_compile_container_with_artifact_output(self):
    # Compiles a container component mixing an integer input, an artifact
    # output and an output path, then checks the declared interface and
    # the placeholders written into the executor args.

    @dsl.container_component
    def container_with_artifact_output(
            num_epochs: int,  # plain input parameter
            model: dsl.Output[dsl.Model],
            model_config_path: dsl.OutputPath(str),
    ):
        return dsl.ContainerSpec(
            image='gcr.io/my-image',
            command=['sh', 'run.sh'],
            args=[
                '--epochs',
                num_epochs,
                '--model_path',
                model.uri,
                '--model_config_path',
                model_config_path,
            ])

    with tempfile.TemporaryDirectory() as tempdir:
        package_path = os.path.join(tempdir, 'component.yaml')
        compiler.Compiler().compile(
            pipeline_func=container_with_artifact_output,
            package_path=package_path,
            pipeline_name='container-with-artifact-output')
        with open(package_path, 'r') as spec_file:
            pipeline_spec = yaml.safe_load(spec_file)

    component = pipeline_spec['components'][
        'comp-container-with-artifact-output']
    input_defs = component['inputDefinitions']
    output_defs = component['outputDefinitions']
    self.assertEqual(input_defs['parameters']['num_epochs']['parameterType'],
                     'NUMBER_INTEGER')
    self.assertEqual(
        output_defs['artifacts']['model']['artifactType']['schemaTitle'],
        'system.Model')
    self.assertEqual(
        output_defs['parameters']['model_config_path']['parameterType'],
        'STRING')

    # Indices 3 and 5 are the values following '--model_path' and
    # '--model_config_path' in the args list above.
    args_to_check = pipeline_spec['deploymentSpec']['executors'][
        'exec-container-with-artifact-output']['container']['args']
    self.assertEqual(args_to_check[3],
                     "{{$.outputs.artifacts['model'].uri}}")
    self.assertEqual(
        args_to_check[5],
        "{{$.outputs.parameters['model_config_path'].output_file}}")


class TestCompileBadInput(unittest.TestCase):

Expand Down
Loading

0 comments on commit 04c827f

Please sign in to comment.