Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): support optional artifact inputs #8623

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

## Bug fixes and other changes
* Fully support optional parameter inputs by witing `isOptional` field to IR [\#8612](https://github.com/kubeflow/pipelines/pull/8612)
* Add support for optional artifact inputs (toward feature parity with KFP SDK v1) [\#8623](https://github.com/kubeflow/pipelines/pull/8623)

## Documentation updates
# 2.0.0-beta.9
Expand Down
208 changes: 208 additions & 0 deletions sdk/python/kfp/compiler/compiler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from kfp.cli import cli
from kfp.compiler import compiler
from kfp.components.types import type_utils
from kfp.dsl import Artifact
from kfp.dsl import ContainerSpec
from kfp.dsl import Input
from kfp.dsl import Model
Expand Down Expand Up @@ -2443,6 +2444,213 @@ def my_pipeline():
my_pipeline.pipeline_spec.components['comp-inner-pipeline']
.input_definitions.parameters['x'].is_optional)

def test__component__artifact(self):

@dsl.component
def comp(x: Optional[Input[Artifact]] = None):
print(x)

@dsl.pipeline
def my_pipeline():
comp()

self.assertTrue(my_pipeline.pipeline_spec.components['comp-comp']
.input_definitions.artifacts['x'].is_optional)

with tempfile.TemporaryDirectory() as tmpdir:
path = os.path.join(tmpdir, 'comp.yaml')
compiler.Compiler().compile(comp, path)
loaded_comp = components.load_component_from_file(path)

@dsl.pipeline
def my_pipeline():
loaded_comp()

self.assertTrue(my_pipeline.pipeline_spec.components['comp-comp']
.input_definitions.artifacts['x'].is_optional)

def test__pipeline__artifact(self):

@dsl.component
def comp(x: Optional[Input[Artifact]] = None):
print(x)

@dsl.pipeline
def inner_pipeline(x: Optional[Input[Artifact]] = None):
comp(x=x)

@dsl.pipeline
def my_pipeline():
inner_pipeline()

self.assertTrue(my_pipeline.pipeline_spec.components['comp-comp']
.input_definitions.artifacts['x'].is_optional)

with tempfile.TemporaryDirectory() as tmpdir:
path = os.path.join(tmpdir, 'comp.yaml')
compiler.Compiler().compile(comp, path)
loaded_comp = components.load_component_from_file(path)

@dsl.pipeline
def my_pipeline():
loaded_comp()

self.assertTrue(my_pipeline.pipeline_spec.components['comp-comp']
.input_definitions.artifacts['x'].is_optional)


class TestCompileOptionalArtifacts(unittest.TestCase):

def test_python_comp(self):

@dsl.component
def comp(x: Optional[Input[Artifact]] = None):
print(x)

artifact_spec_from_root = comp.pipeline_spec.root.input_definitions.artifacts[
'x']
self.assertTrue(artifact_spec_from_root.is_optional)

artifact_spec_from_comp = comp.pipeline_spec.components[
'comp-comp'].input_definitions.artifacts['x']
self.assertTrue(artifact_spec_from_comp.is_optional)

def test_python_comp_with_model(self):

@dsl.component
def comp(x: Optional[Input[Model]] = None):
print(x)

artifact_spec_from_root = comp.pipeline_spec.root.input_definitions.artifacts[
'x']
self.assertTrue(artifact_spec_from_root.is_optional)

artifact_spec_from_comp = comp.pipeline_spec.components[
'comp-comp'].input_definitions.artifacts['x']
self.assertTrue(artifact_spec_from_comp.is_optional)

def test_python_comp_without_optional_type_modifier(self):

@dsl.component
def comp(x: Input[Model] = None):
print(x)

artifact_spec_from_root = comp.pipeline_spec.root.input_definitions.artifacts[
'x']
self.assertTrue(artifact_spec_from_root.is_optional)

artifact_spec_from_comp = comp.pipeline_spec.components[
'comp-comp'].input_definitions.artifacts['x']
self.assertTrue(artifact_spec_from_comp.is_optional)

def test_container_comp(self):

@dsl.container_component
def comp(x: Optional[Input[Artifact]] = None):
return dsl.ContainerSpec(
image='alpine',
command=[
dsl.IfPresentPlaceholder(
input_name='x',
then=['echo', x.uri],
else_=['echo', 'No artifact provided!'])
])

artifact_spec_from_root = comp.pipeline_spec.root.input_definitions.artifacts[
'x']
self.assertTrue(artifact_spec_from_root.is_optional)

artifact_spec_from_comp = comp.pipeline_spec.components[
'comp-comp'].input_definitions.artifacts['x']
self.assertTrue(artifact_spec_from_comp.is_optional)

def test_pipeline(self):

@dsl.component
def comp():
print('hello')

@dsl.pipeline
def my_pipeline(x: Optional[Input[Artifact]] = None):
comp()

artifact_spec_from_root = my_pipeline.pipeline_spec.root.input_definitions.artifacts[
'x']
self.assertTrue(artifact_spec_from_root.is_optional)

def test_pipeline_without_optional_type_modifier(self):

@dsl.component
def comp():
print('hello')

@dsl.pipeline
def my_pipeline(x: Input[Artifact] = None):
comp()

artifact_spec_from_root = my_pipeline.pipeline_spec.root.input_definitions.artifacts[
'x']
self.assertTrue(artifact_spec_from_root.is_optional)

def test_pipeline_and_inner_component_together(self):

@dsl.component
def comp(x: Optional[Input[Model]] = None):
print(x)

@dsl.pipeline
def my_pipeline(x: Optional[Input[Artifact]] = None):
comp()

artifact_spec_from_root = my_pipeline.pipeline_spec.root.input_definitions.artifacts[
'x']
self.assertTrue(artifact_spec_from_root.is_optional)

artifact_spec_from_comp = my_pipeline.pipeline_spec.components[
'comp-comp'].input_definitions.artifacts['x']
self.assertTrue(artifact_spec_from_comp.is_optional)

def test_invalid_default_comp(self):
with self.assertRaisesRegex(
ValueError,
'Optional Input artifacts may only have default value None'):

@dsl.component
def comp(x: Optional[Input[Model]] = 1):
print(x)

with self.assertRaisesRegex(
ValueError,
'Optional Input artifacts may only have default value None'):

@dsl.component
def comp(x: Optional[Input[Model]] = Model(
name='', uri='', metadata={})):
print(x)

def test_invalid_default_pipeline(self):

@dsl.component
def comp():
print('hello')

with self.assertRaisesRegex(
ValueError,
'Optional Input artifacts may only have default value None'):

@dsl.pipeline
def my_pipeline(x: Input[Artifact] = 1):
comp()

with self.assertRaisesRegex(
ValueError,
'Optional Input artifacts may only have default value None'):

@dsl.pipeline
def my_pipeline(x: Input[Artifact] = Artifact(
name='', uri='', metadata={})):
comp()


if __name__ == '__main__':
unittest.main()
21 changes: 5 additions & 16 deletions sdk/python/kfp/compiler/pipeline_spec_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,34 +337,20 @@ def build_component_spec_for_task(
f'PipelineTaskFinalStatus can only be used in an exit task. Parameter {input_name} of a non exit task has type PipelineTaskFinalStatus.'
)

unprovided_artifact_inputs = []
for input_name, input_spec in (task.component_spec.inputs or {}).items():
if not type_utils.is_parameter_type(
input_spec.type) and input_name not in task.inputs:
unprovided_artifact_inputs.append(input_name)

component_spec = _build_component_spec_from_component_spec_structure(
task.component_spec, unprovided_artifact_inputs)
task.component_spec)
component_spec.executor_label = utils.sanitize_executor_label(task.name)
return component_spec


def _build_component_spec_from_component_spec_structure(
component_spec_struct: structures.ComponentSpec,
unprovided_artifact_inputs: Optional[List[str]] = None,
component_spec_struct: structures.ComponentSpec
) -> pipeline_spec_pb2.ComponentSpec:
"""Builds ComponentSpec proto from ComponentSpec structure."""
# TODO: remove unprovided_artifact_inputs from interface and all downstream logic when supporting optional artifact inputs
unprovided_artifact_inputs = unprovided_artifact_inputs or []

component_spec = pipeline_spec_pb2.ComponentSpec()

for input_name, input_spec in (component_spec_struct.inputs or {}).items():

# skip inputs not present, as a workaround to support optional inputs.
if input_name in unprovided_artifact_inputs and input_spec.default is None:
continue

# Special handling for PipelineTaskFinalStatus first.
if type_utils.is_task_final_status_type(input_spec.type):
component_spec.input_definitions.parameters[
Expand All @@ -390,6 +376,9 @@ def _build_component_spec_from_component_spec_structure(
input_name].artifact_type.CopyFrom(
type_utils.bundled_artifact_to_artifact_proto(
input_spec.type))
if input_spec.optional:
component_spec.input_definitions.artifacts[
input_name].is_optional = True

for output_name, output_spec in (component_spec_struct.outputs or
{}).items():
Expand Down
18 changes: 14 additions & 4 deletions sdk/python/kfp/components/component_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,18 @@ def extract_component_interface(
)

if parameter.default is not inspect.Parameter.empty:
raise ValueError(
'Default values for Input/Output artifacts are not supported.'
)
if passing_style in [
type_annotations.OutputAnnotation,
type_annotations.OutputPath,
]:
raise ValueError(
'Default values for Output artifacts are not supported.'
)
elif parameter.default is not None:
raise ValueError(
f'Optional Input artifacts may only have default value None. Got: {parameter.default}.'
)

elif isinstance(
parameter_type,
(type_annotations.InputPath, type_annotations.OutputPath)):
Expand Down Expand Up @@ -229,7 +238,8 @@ def extract_component_interface(
type_struct, parameter_type.schema_version
) if type_annotations.is_artifact_class(
parameter_type) else type_struct
default = None if parameter.default == inspect.Parameter.empty else parameter.default
default = None if parameter.default == inspect.Parameter.empty or type_annotations.is_artifact_class(
parameter_type) else parameter.default
optional = parameter.default is not inspect.Parameter.empty or type_utils.is_task_final_status_type(
type_struct)
input_spec = structures.InputSpec(
Expand Down
15 changes: 15 additions & 0 deletions sdk/python/kfp/components/executor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,21 @@ def test_func(
},
})

def test_function_with_optional_input_artifact(self):
executor_input = """\
{
"inputs": {},
"outputs": {
"outputFile": "%(test_dir)s/output_metadata.json"
}
}
"""

def test_func(a: Optional[Input[Artifact]] = None):
self.assertIsNone(a)

self.execute(test_func, executor_input)

def test_function_with_pipeline_task_final_status(self):
executor_input = """\
{
Expand Down
6 changes: 5 additions & 1 deletion sdk/python/kfp/components/structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,13 @@ def from_ir_component_inputs_dict(
type_ = ir_component_inputs_dict['artifactType']['schemaTitle']
schema_version = ir_component_inputs_dict['artifactType'][
'schemaVersion']
# TODO: would be better to extract these fields from the proto
# message, as False default would be preserved
optional = ir_component_inputs_dict.get('isOptional', False)
return InputSpec(
type=type_utils.create_bundled_artifact_type(
type_, schema_version))
type_, schema_version),
optional=optional)

def __eq__(self, other: Any) -> bool:
"""Equality comparison for InputSpec. Robust to different type
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ google-cloud-storage>=2.2.1,<3
# NOTE: Maintainers, please do not require google-auth>=2.x.x
# Until this issue is closed
# https://github.com/googleapis/google-cloud-python/issues/10566
kfp-pipeline-spec>=0.1.16,<0.2.0
kfp-pipeline-spec>=0.1.17,<0.2.0
# Update the upper version whenever a new major version of the
# kfp-server-api package is released.
# Update the lower version when kfp sdk depends on new apis/fields in
Expand Down
6 changes: 3 additions & 3 deletions sdk/python/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#
# This file is autogenerated by pip-compile with python 3.7
# To update, run:
# This file is autogenerated by pip-compile with Python 3.7
# by the following command:
#
# pip-compile --no-emit-index-url requirements.in
#
Expand Down Expand Up @@ -59,7 +59,7 @@ importlib-metadata==4.12.0
# jsonschema
jsonschema==3.2.0
# via -r requirements.in
kfp-pipeline-spec==0.1.16
kfp-pipeline-spec==0.1.17
# via -r requirements.in
kfp-server-api==2.0.0a4
# via -r requirements.in
Expand Down
Loading