Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): supporting bring your own container for arbitrary input and outputs #8066

Merged
merged 25 commits into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
2df8d19
support container_component decorator for function with no inputs
zichuan-scott-xu Jun 29, 2022
101b615
resolve review comments
zichuan-scott-xu Jul 1, 2022
56ed6d5
add sample tests for milestone 1
zichuan-scott-xu Jul 6, 2022
40c9b81
modify compiler test data
zichuan-scott-xu Jul 7, 2022
e9a8bf1
resolve reviews
zichuan-scott-xu Jul 7, 2022
56ac809
resolve reviews
zichuan-scott-xu Jul 7, 2022
702f3ef
WIP
zichuan-scott-xu Jul 11, 2022
b6eb5e2
implementation of function of no inputs
zichuan-scott-xu Jul 13, 2022
4f7745a
fixed sample test
zichuan-scott-xu Jul 14, 2022
8275381
re-fix sample test
zichuan-scott-xu Jul 14, 2022
7b92223
fix rebase merge conflict
zichuan-scott-xu Jul 15, 2022
dc064b5
resolve formatting
zichuan-scott-xu Jul 15, 2022
eb31781
resolve isort error for test data
zichuan-scott-xu Jul 15, 2022
8db6d89
resolve comments
zichuan-scott-xu Jul 18, 2022
b146ae0
fix nit
zichuan-scott-xu Jul 18, 2022
bb9484e
resolve nit
zichuan-scott-xu Jul 18, 2022
920a30f
add implementation for placeholders i/o, sample and compiler tests
zichuan-scott-xu Jul 22, 2022
85cb271
Merge branch 'kubeflow:master' into cuj3-milestone2
zichuan-scott-xu Jul 22, 2022
4199c72
resolve comments and merge logic for constructing container component
zichuan-scott-xu Jul 27, 2022
48ec85b
Merge branch 'cuj3-milestone2' of https://github.com/zichuan-scott-xu…
zichuan-scott-xu Jul 27, 2022
97a1e1a
resolve comments
zichuan-scott-xu Jul 28, 2022
2dc74c4
resolve comments
zichuan-scott-xu Aug 1, 2022
e8f393c
fix assertion messages
zichuan-scott-xu Aug 1, 2022
bb282dc
add error handling for accessing artifact by itself
zichuan-scott-xu Aug 4, 2022
fb26f91
add test for raising error for accessing artifact by itself
zichuan-scott-xu Aug 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion samples/test/config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2021 The Kubeflow Authors
# Copyright 2021-2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -122,6 +122,8 @@
path: samples.v2.producer_consumer_param_test
- name: pipeline_with_importer
path: samples.v2.pipeline_with_importer_test
- name: pipeline_container_no_input
path: samples.v2.pipeline_container_no_input_test
# TODO(capri-xiyue): Re-enable after figuring out V2 Engine
# and protobuf.Value support.
# - name: cache_v2
Expand Down
38 changes: 38 additions & 0 deletions samples/v2/pipeline_container_no_input.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright 2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

from kfp import compiler
from kfp import dsl


# Container component that takes no inputs and produces no outputs; it only
# echoes a fixed greeting from a stock Python image.
@dsl.container_component
def container_no_input():
    # Build the spec as a named value first, then hand it back to the DSL.
    spec = dsl.ContainerSpec(
        image='python:3.7',
        command=['echo', 'hello world'],
        args=[],
    )
    return spec


# Pipeline named 'v2-container-component-no-input' whose only step is a single
# invocation of the `container_no_input` component.
@dsl.pipeline(name='v2-container-component-no-input')
def pipeline_container_no_input():
    # The component takes no arguments and its result is not consumed.
    container_no_input()


if __name__ == '__main__':
    # Compile the pipeline only when this file is run as a script.
    output_package = 'pipeline_container_no_input.yaml'
    compiler.Compiler().compile(
        pipeline_func=pipeline_container_no_input,
        package_path=output_package,
    )
52 changes: 52 additions & 0 deletions samples/v2/pipeline_container_no_input_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright 2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pipeline container no input v2 engine pipeline."""

from __future__ import annotations

import unittest

import kfp.deprecated as kfp
import kfp_server_api
from ml_metadata.proto import Execution

from kfp.samples.test.utils import KfpTask, TaskInputs, TaskOutputs, TestCase, run_pipeline_func
from .pipeline_container_no_input import pipeline_container_no_input


def verify(t: unittest.TestCase, run: kfp_server_api.ApiRun,
           tasks: dict[str, KfpTask], **kwargs):
    """Assert that the run succeeded with exactly the expected task.

    Args:
        t: Test case whose assertion methods are used.
        run: The pipeline run; its status must equal 'Succeeded'.
        tasks: Observed tasks keyed by task name.
        **kwargs: Accepted for compatibility with the caller; unused here.
    """
    t.assertEqual(run.status, 'Succeeded')

    # The component has no inputs or outputs, so every collection is empty.
    expected_task = KfpTask(
        name='container-no-input',
        type='system.ContainerExecution',
        state=Execution.State.COMPLETE,
        inputs=TaskInputs(parameters={}, artifacts=[]),
        outputs=TaskOutputs(parameters={}, artifacts=[]),
    )
    expected_tasks = {'container-no-input': expected_task}
    t.assertEqual(expected_tasks, tasks)


if __name__ == '__main__':
    # Single test case: run the no-input pipeline in V2 engine mode and check
    # the result with `verify`.
    no_input_case = TestCase(
        pipeline_func=pipeline_container_no_input,
        verify_func=verify,
        mode=kfp.dsl.PipelineExecutionMode.V2_ENGINE,
    )
    run_pipeline_func([no_input_case])
55 changes: 55 additions & 0 deletions samples/v2/two_step_pipeline_containerized.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright 2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Two step pipeline using dsl.container_component decorator."""
import os

from kfp import compiler
from kfp.dsl import container_component
from kfp.dsl import ContainerSpec
from kfp.dsl import Dataset
from kfp.dsl import Input
from kfp.dsl import Output
from kfp.dsl import pipeline


# Producer step: writes the `text` input to the storage location given by the
# URI of the `output_gcs` artifact, using gsutil.
@container_component
def component1(text: str, output_gcs: Output[Dataset]):
    return ContainerSpec(
        image='google/cloud-sdk:slim',
        command=[
            # Every list item becomes one argv entry and nothing parses the
            # list as shell text, so the shell, its flag and the script must
            # be separate items and the pipe must live inside the script.
            'sh',
            '-c',
            # With `sh -c <script> a b`, "$0" is `a` and "$1" is `b`.
            'set -e -x\necho "$0" | gsutil cp - "$1"',
            text,
            output_gcs.uri,
        ])


# Consumer step: prints the contents of the `input_gcs` artifact with gsutil.
@container_component
def component2(input_gcs: Input[Dataset]):
    return ContainerSpec(
        image='google/cloud-sdk:slim',
        # The script is a single argv entry following `-c`; a literal '|'
        # item would be executed as the whole script and fail to parse.
        command=['sh', '-c', 'set -e -x\ngsutil cat "$0"'],
        # Appended after the command, so the path is what "$0" expands to.
        args=[input_gcs.path])


# Two-step pipeline: a producer writes a dataset artifact and a consumer reads
# it back.
@pipeline(name='two-step-pipeline-containerized')
def two_step_pipeline_containerized():
    producer = component1(text='hi').set_display_name('Producer')
    # The consumer task is not referenced again, so it is configured inline.
    component2(input_gcs=producer.outputs['output_gcs']).set_display_name(
        'Consumer')


if __name__ == '__main__':
    # Compile the pipeline only when this file is run as a script.
    output_package = 'two_step_pipeline_containerized.yaml'
    compiler.Compiler().compile(
        pipeline_func=two_step_pipeline_containerized,
        package_path=output_package,
    )
86 changes: 86 additions & 0 deletions samples/v2/two_step_pipeline_containerized_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Copyright 2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pipeline container no input v2 engine pipeline."""

from __future__ import annotations

import unittest

import kfp.deprecated as kfp
from kfp.samples.test.utils import KfpTask
from kfp.samples.test.utils import run_pipeline_func
from kfp.samples.test.utils import TaskInputs
from kfp.samples.test.utils import TaskOutputs
from kfp.samples.test.utils import TestCase
import kfp_server_api
from ml_metadata.proto import Execution

from .two_step_pipeline_containerized import two_step_pipeline_containerized


def verify(t: unittest.TestCase, run: kfp_server_api.ApiRun,
           tasks: dict[str, KfpTask], **kwargs):
    """Assert that the run succeeded and both tasks look as expected.

    Args:
        t: Test case whose assertion methods are used.
        run: The pipeline run; its status must equal 'Succeeded'.
        tasks: Observed tasks keyed by name; 'component1' and 'component2'
            are looked up.
        **kwargs: Accepted for compatibility with the caller; unused here.
    """

    def drop_present_metadata(artifacts):
        # Pop metadata because an artifact which got re-imported may carry
        # metadata with uncertain data. Only non-None metadata is removed.
        for entry in artifacts:
            if entry.get('metadata') is not None:
                entry.pop('metadata')

    t.assertEqual(run.status, 'Succeeded')
    component1_dict = tasks['component1'].get_dict()
    component2_dict = tasks['component2'].get_dict()
    drop_present_metadata(component1_dict.get('outputs').get('artifacts'))
    drop_present_metadata(component2_dict.get('inputs').get('artifacts'))

    expected_component1 = {
        'name': 'component1',
        'inputs': {
            'parameters': {
                'text': 'hi'
            }
        },
        'outputs': {
            'artifacts': [{
                'name': 'output_gcs',
                'type': 'system.Dataset'
            }],
        },
        'type': 'system.ContainerExecution',
        'state': Execution.State.COMPLETE,
    }
    t.assertEqual(expected_component1, component1_dict)

    expected_component2 = {
        'name': 'component2',
        'inputs': {
            'artifacts': [{
                'name': 'input_gcs',
                'type': 'system.Dataset'
            }],
        },
        'outputs': {},
        'type': 'system.ContainerExecution',
        'state': Execution.State.COMPLETE,
    }
    t.assertEqual(expected_component2, component2_dict)


if __name__ == '__main__':
run_pipeline_func([
TestCase(
pipeline_func=two_step_pipeline_containerized,
verify_func=verify,
mode=kfp.dsl.PipelineExecutionMode.V2_ENGINE,
),
])
5 changes: 5 additions & 0 deletions sdk/python/kfp/compiler/_read_write_test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
'pipeline_with_task_final_status',
'pipeline_with_task_final_status_yaml',
'component_with_pip_index_urls',
'container_component_with_no_inputs',
'two_step_pipeline_containerized',
],
'test_data_dir': 'sdk/python/kfp/compiler/test_data/pipelines',
'config': {
Expand All @@ -60,6 +62,9 @@
'output_artifact',
'output_metrics',
'preprocess',
'container_no_input',
'container_io',
'container_with_artifact_output',
],
'test_data_dir': 'sdk/python/kfp/compiler/test_data/components',
'config': {
Expand Down
98 changes: 98 additions & 0 deletions sdk/python/kfp/compiler/compiler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,104 @@ def hello_world(text: str) -> str:
pipeline_spec['root']['inputDefinitions']['parameters']['text']
['defaultValue'], 'override_string')

def test_compile_container_component_simple(self):
    # Compile a container component without inputs and check that its command
    # is carried over into the executor section of the compiled spec.

    @dsl.container_component
    def hello_world_container() -> dsl.ContainerSpec:
        """Hello world component."""
        return dsl.ContainerSpec(
            image='python:3.7',
            command=['echo', 'hello world'],
            args=[],
        )

    with tempfile.TemporaryDirectory() as workdir:
        spec_path = os.path.join(workdir, 'component.yaml')
        compiler.Compiler().compile(
            pipeline_func=hello_world_container,
            package_path=spec_path,
            pipeline_name='hello-world-container')
        with open(spec_path, 'r') as spec_file:
            compiled_spec = yaml.safe_load(spec_file)

    executors = compiled_spec['deploymentSpec']['executors']
    container = executors['exec-hello-world-container']['container']
    self.assertEqual(container['command'], ['echo', 'hello world'])

def test_compile_container_with_simple_io(self):
    # Compile a container component with one string input and one output path
    # and check that both are recorded as STRING parameters.

    @dsl.container_component
    def container_simple_io(text: str, output_path: dsl.OutputPath(str)):
        return dsl.ContainerSpec(
            image='python:3.7',
            command=['my_program', text],
            args=['--output_path', output_path])

    with tempfile.TemporaryDirectory() as workdir:
        spec_path = os.path.join(workdir, 'component.yaml')
        compiler.Compiler().compile(
            pipeline_func=container_simple_io,
            package_path=spec_path,
            pipeline_name='container-simple-io')
        with open(spec_path, 'r') as spec_file:
            compiled_spec = yaml.safe_load(spec_file)

    component = compiled_spec['components']['comp-container-simple-io']
    self.assertEqual(
        component['inputDefinitions']['parameters']['text']['parameterType'],
        'STRING')
    self.assertEqual(
        component['outputDefinitions']['parameters']['output_path']
        ['parameterType'], 'STRING')

def test_compile_container_with_artifact_output(self):
    # Compile a container component that mixes an integer input, an output
    # artifact and an output path, then check the recorded interface types
    # and the placeholders written into the container args.

    @dsl.container_component
    def container_with_artifact_output(
            num_epochs: int,  # also as an input
            model: dsl.Output[dsl.Model],
            model_config_path: dsl.OutputPath(str),
    ):
        return dsl.ContainerSpec(
            image='gcr.io/my-image',
            command=['sh', 'run.sh'],
            args=[
                '--epochs',
                num_epochs,
                '--model_path',
                model.uri,
                '--model_config_path',
                model_config_path,
            ])

    with tempfile.TemporaryDirectory() as workdir:
        spec_path = os.path.join(workdir, 'component.yaml')
        compiler.Compiler().compile(
            pipeline_func=container_with_artifact_output,
            package_path=spec_path,
            pipeline_name='container-with-artifact-output')
        with open(spec_path, 'r') as spec_file:
            compiled_spec = yaml.safe_load(spec_file)

    component = compiled_spec['components'][
        'comp-container-with-artifact-output']
    self.assertEqual(
        component['inputDefinitions']['parameters']['num_epochs']
        ['parameterType'], 'NUMBER_INTEGER')
    self.assertEqual(
        component['outputDefinitions']['artifacts']['model']['artifactType']
        ['schemaTitle'], 'system.Model')
    self.assertEqual(
        component['outputDefinitions']['parameters']['model_config_path']
        ['parameterType'], 'STRING')

    # Indices 3 and 5 are the values following '--model_path' and
    # '--model_config_path' in the args list above.
    container_args = compiled_spec['deploymentSpec']['executors'][
        'exec-container-with-artifact-output']['container']['args']
    self.assertEqual(container_args[3],
                     "{{$.outputs.artifacts['model'].uri}}")
    self.assertEqual(
        container_args[5],
        "{{$.outputs.parameters['model_config_path'].output_file}}")


class TestCompileBadInput(unittest.TestCase):

Expand Down
Loading