Skip to content

Commit

Permalink
fix(sdk.v2): Support dict, list, bool typed input parameters from con…
Browse files Browse the repository at this point in the history
…stant values and pipeline inputs. (#6523)

* fix dict list typed inputs support

* Support passing dict, list, bool typed parameter via client

* update release note

* fix test
  • Loading branch information
chensun authored Sep 8, 2021
1 parent 439d8c8 commit 0fba85c
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 42 deletions.
1 change: 1 addition & 0 deletions sdk/RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
## Bug Fixes and Other Changes

* Remove dead code on importer check in v1. [\#6508](https://github.com/kubeflow/pipelines/pull/6508)
* Fix issue where dict, list, bool typed input parameters don't accept constant values or pipeline inputs. [\#6523](https://github.com/kubeflow/pipelines/pull/6523)
* Depends on `kfp-pipeline-spec>=0.1.10,<0.2.0` [\#6515](https://github.com/kubeflow/pipelines/pull/6515)

## Documentation Updates
Expand Down
22 changes: 16 additions & 6 deletions sdk/python/kfp/dsl/_component_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import collections
import copy
import inspect
import json
import pathlib
from typing import Any, Mapping, Optional

Expand Down Expand Up @@ -577,20 +578,29 @@ def _resolve_ir_placeholders_v2(
elif isinstance(argument_value, int):
argument_type = 'Integer'
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant_value.int_value = argument_value
input_name].runtime_value.constant_value.int_value = (
argument_value)
elif isinstance(argument_value, float):
argument_type = 'Float'
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant_value.double_value = argument_value
input_name].runtime_value.constant_value.double_value = (
argument_value)
elif isinstance(argument_value,
(dict, list, bool)) and kfp.COMPILING_FOR_V2:
argument_type = type(argument_value).__name__
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant_value.string_value = (
json.dumps(argument_value))
elif isinstance(argument_value, _container_op.ContainerOp):
raise TypeError(
'ContainerOp object {} was passed to component as an input argument. '
'Pass a single output instead.'.format(input_name))
f'ContainerOp object {input_name} was passed to component as an '
'input argument. Pass a single output instead.')
else:
if kfp.COMPILING_FOR_V2:
raise NotImplementedError(
'Input argument supports only the following types: PipelineParam'
', str, int, float. Got: "{}".'.format(argument_value))
'Input argument supports only the following types: '
'PipelineParam, str, int, float, bool, dict, and list. Got: '
f'"{argument_value}".')

argument_is_parameter_type = type_utils.is_parameter_type(argument_type)
input_is_parameter_type = type_utils.is_parameter_type(input_type)
Expand Down
23 changes: 12 additions & 11 deletions sdk/python/kfp/v2/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,20 @@
import json
import uuid
import warnings
from typing import Any, Callable, Dict, List, Mapping, Optional, Set, Tuple, Union

from google.protobuf import json_format
from typing import (Any, Callable, Dict, List, Mapping, Optional, Set, Tuple,
Union)

import kfp
from kfp.compiler._k8s_helper import sanitize_k8s_name
from google.protobuf import json_format
from kfp import dsl
from kfp.compiler._k8s_helper import sanitize_k8s_name
from kfp.dsl import _for_loop
from kfp.v2.compiler import compiler_utils
from kfp.dsl import component_spec as dsl_component_spec
from kfp.dsl import dsl_utils
from kfp.pipeline_spec import pipeline_spec_pb2
from kfp.v2.components.types import artifact_types, type_utils
from kfp.v2.compiler import compiler_utils
from kfp.v2.components import component_factory
from kfp.v2.components.types import artifact_types, type_utils

_GroupOrOp = Union[dsl.OpsGroup, dsl.BaseOp]

Expand Down Expand Up @@ -1094,11 +1094,12 @@ def _create_pipeline_v2(
break
if not type_utils.is_parameter_type(arg_type):
raise TypeError(
'The pipeline argument "{arg_name}" is viewed as an artifact due to '
'its type "{arg_type}". And we currently do not support passing '
'artifacts as pipeline inputs. Consider type annotating the argument'
' with a primitive type, such as "str", "int", and "float".'
.format(arg_name=arg_name, arg_type=arg_type))
'The pipeline argument "{arg_name}" is viewed as an artifact'
' due to its type "{arg_type}". And we currently do not '
'support passing artifacts as pipeline inputs. Consider type'
' annotating the argument with a primitive type, such as '
'"str", "int", "float", "bool", "dict", and "list".'.format(
arg_name=arg_name, arg_type=arg_type))
args_list.append(
dsl.PipelineParam(
sanitize_k8s_name(arg_name, True), param_type=arg_type))
Expand Down
9 changes: 5 additions & 4 deletions sdk/python/kfp/v2/compiler/compiler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,11 @@ def my_pipeline(input1):

with self.assertRaisesRegex(
TypeError,
'The pipeline argument \"input1\" is viewed as an artifact due to its '
'type \"None\". And we currently do not support passing artifacts as '
'pipeline inputs. Consider type annotating the argument with a primitive'
' type, such as \"str\", \"int\", and \"float\".'):
'The pipeline argument \"input1\" is viewed as an artifact due '
'to its type \"None\". And we currently do not support passing '
'artifacts as pipeline inputs. Consider type annotating the '
'argument with a primitive type, such as \"str\", \"int\", '
'\"float\", \"bool\", \"dict\", and \"list\".'):
compiler.Compiler().compile(
pipeline_func=my_pipeline, package_path='output.json')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@
"executorLabel": "exec-preprocess",
"inputDefinitions": {
"parameters": {
"input_dict_parameter": {
"type": "STRING"
},
"input_list_parameter": {
"type": "STRING"
},
"message": {
"type": "STRING"
}
Expand Down Expand Up @@ -101,11 +107,11 @@
"command": [
"sh",
"-c",
"(python3 -m ensurepip || python3 -m ensurepip --user) && (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.7.2' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.7.2' --user) && \"$0\" \"$@\"",
"(python3 -m ensurepip || python3 -m ensurepip --user) && (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.0' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.0' --user) && \"$0\" \"$@\"",
"sh",
"-ec",
"program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n",
"\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef preprocess(\n # An input parameter of type string.\n message: str,\n # Use Output[T] to get a metadata-rich handle to the output artifact\n # of type `Dataset`.\n output_dataset_one: Output[Dataset],\n # A locally accessible filepath for another output artifact of type\n # `Dataset`.\n output_dataset_two_path: OutputPath('Dataset'),\n # A locally accessible filepath for an output parameter of type string.\n output_parameter_path: OutputPath(str),\n # A locally accessible filepath for an output parameter of type bool.\n output_bool_parameter_path: OutputPath(bool),\n # A locally accessible filepath for an output parameter of type dict.\n output_dict_parameter_path: OutputPath(Dict[str, int]),\n # A locally accessible filepath for an output parameter of type list.\n output_list_parameter_path: OutputPath(List[str]),\n):\n \"\"\"Dummy preprocessing step.\"\"\"\n\n # Use Dataset.path to access a local file path for writing.\n # One can also use Dataset.uri to access the actual URI file path.\n with open(output_dataset_one.path, 'w') as f:\n f.write(message)\n\n # OutputPath is used to just pass the local file path of the output artifact\n # to the function.\n with open(output_dataset_two_path, 'w') as f:\n f.write(message)\n\n with open(output_parameter_path, 'w') as f:\n f.write(message)\n\n with open(output_bool_parameter_path, 'w') as f:\n f.write(\n str(True)) # use either `str()` or `json.dumps()` for bool values.\n\n import json\n with open(output_dict_parameter_path, 'w') as f:\n f.write(json.dumps({'A': 1, 'B': 2}))\n\n with open(output_list_parameter_path, 'w') as f:\n f.write(json.dumps(['a', 'b', 'c']))\n\n"
"\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef preprocess(\n # An input parameter of type string.\n message: str,\n # An input parameter of type dict.\n input_dict_parameter: Dict[str, int],\n # An input parameter of type list.\n input_list_parameter: List[str],\n # Use Output[T] to get a metadata-rich handle to the output artifact\n # of type `Dataset`.\n output_dataset_one: Output[Dataset],\n # A locally accessible filepath for another output artifact of type\n # `Dataset`.\n output_dataset_two_path: OutputPath('Dataset'),\n # A locally accessible filepath for an output parameter of type string.\n output_parameter_path: OutputPath(str),\n # A locally accessible filepath for an output parameter of type bool.\n output_bool_parameter_path: OutputPath(bool),\n # A locally accessible filepath for an output parameter of type dict.\n output_dict_parameter_path: OutputPath(Dict[str, int]),\n # A locally accessible filepath for an output parameter of type list.\n output_list_parameter_path: OutputPath(List[str]),\n):\n \"\"\"Dummy preprocessing step.\"\"\"\n\n # Use Dataset.path to access a local file path for writing.\n # One can also use Dataset.uri to access the actual URI file path.\n with open(output_dataset_one.path, 'w') as f:\n f.write(message)\n\n # OutputPath is used to just pass the local file path of the output artifact\n # to the function.\n with open(output_dataset_two_path, 'w') as f:\n f.write(message)\n\n with open(output_parameter_path, 'w') as f:\n f.write(message)\n\n with open(output_bool_parameter_path, 'w') as f:\n f.write(\n str(True)) # use either `str()` or `json.dumps()` for bool values.\n\n import json\n with open(output_dict_parameter_path, 'w') as f:\n f.write(json.dumps(input_dict_parameter))\n\n with open(output_list_parameter_path, 'w') as f:\n f.write(json.dumps(input_list_parameter))\n\n"
],
"image": "python:3.7"
}
Expand All @@ -121,7 +127,7 @@
"command": [
"sh",
"-c",
"(python3 -m ensurepip || python3 -m ensurepip --user) && (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.7.2' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.7.2' --user) && \"$0\" \"$@\"",
"(python3 -m ensurepip || python3 -m ensurepip --user) && (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.0' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.0' --user) && \"$0\" \"$@\"",
"sh",
"-ec",
"program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n",
Expand All @@ -147,6 +153,16 @@
},
"inputs": {
"parameters": {
"input_dict_parameter": {
"componentInputParameter": "input_dict"
},
"input_list_parameter": {
"runtimeValue": {
"constantValue": {
"stringValue": "[\"a\", \"b\", \"c\"]"
}
}
},
"message": {
"componentInputParameter": "message"
}
Expand Down Expand Up @@ -223,16 +239,24 @@
},
"inputDefinitions": {
"parameters": {
"input_dict": {
"type": "STRING"
},
"message": {
"type": "STRING"
}
}
}
},
"schemaVersion": "2.0.0",
"sdkVersion": "kfp-1.7.2"
"sdkVersion": "kfp-1.8.0"
},
"runtimeConfig": {
"gcsOutputDirectory": "dummy_root"
"gcsOutputDirectory": "dummy_root",
"parameters": {
"input_dict": {
"stringValue": "{\"A\": 1, \"B\": 2}"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,20 @@
"""Sample pipeline for passing data in KFP v2."""
from typing import Dict, List

from kfp.v2 import dsl
from kfp.v2.dsl import Input, InputPath, Output, OutputPath, Dataset, Model, component
import kfp.v2.compiler as compiler
from kfp.v2 import dsl
from kfp.v2.dsl import (Dataset, Input, InputPath, Model, Output, OutputPath,
component)


@component
def preprocess(
# An input parameter of type string.
message: str,
# An input parameter of type dict.
input_dict_parameter: Dict[str, int],
# An input parameter of type list.
input_list_parameter: List[str],
# Use Output[T] to get a metadata-rich handle to the output artifact
# of type `Dataset`.
output_dataset_one: Output[Dataset],
Expand Down Expand Up @@ -59,10 +64,10 @@ def preprocess(

import json
with open(output_dict_parameter_path, 'w') as f:
f.write(json.dumps({'A': 1, 'B': 2}))
f.write(json.dumps(input_dict_parameter))

with open(output_list_parameter_path, 'w') as f:
f.write(json.dumps(['a', 'b', 'c']))
f.write(json.dumps(input_list_parameter))


@component
Expand Down Expand Up @@ -111,9 +116,13 @@ def train(


@dsl.pipeline(pipeline_root='dummy_root', name='my-test-pipeline-beta')
def pipeline(message: str):
def pipeline(message: str, input_dict: Dict[str, int] = {'A': 1, 'B': 2}):

preprocess_task = preprocess(message=message)
preprocess_task = preprocess(
message=message,
input_dict_parameter=input_dict,
input_list_parameter=['a', 'b', 'c'],
)
train_task = train(
dataset_one_path=preprocess_task.outputs['output_dataset_one'],
dataset_two=preprocess_task.outputs['output_dataset_two_path'],
Expand Down
17 changes: 10 additions & 7 deletions sdk/python/kfp/v2/google/client/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,12 @@
import datetime
import json
import os
from typing import Any, Dict
import unittest
from typing import Any, Dict
from unittest import mock

from googleapiclient import discovery
from googleapiclient import http

from kfp.v2.google.client import client
from kfp.v2.google.client import client_utils
from googleapiclient import discovery, http
from kfp.v2.google.client import client, client_utils

# Mock response for get job request.
_EXPECTED_GET_RESPONSE = 'good job spec'
Expand Down Expand Up @@ -124,7 +121,10 @@ def test_job_id_parameters_override(self, mock_load_json,
job_spec_path='path/to/pipeline_job.json',
job_id='my-new-id',
pipeline_root='gs://bucket/new-blob',
parameter_values={'text': 'Hello test!'})
parameter_values={
'text': 'Hello test!',
'list': [1, 2, 3],
})

golden = _load_test_data('pipeline_job.json')
golden['name'] = ('projects/test-project/locations/us-central1/'
Expand All @@ -135,6 +135,9 @@ def test_job_id_parameters_override(self, mock_load_json,
golden['runtimeConfig']['parameters']['text'] = {
'stringValue': 'Hello test!'
}
golden['runtimeConfig']['parameters']['list'] = {
'stringValue': '[1, 2, 3]'
}
mock_submit.assert_called_once_with(
job_spec=golden, job_id='my-new-id')

Expand Down
5 changes: 5 additions & 0 deletions sdk/python/kfp/v2/google/client/runtime_config_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""Builder for CAIP pipelines Pipeline level proto spec."""

import copy
import json
from typing import Any, Dict, Mapping, Optional, Union


Expand Down Expand Up @@ -79,6 +80,10 @@ def update_runtime_parameters(
Args:
parameter_values: The mapping from runtime parameter names to its values.
"""
if parameter_values:
for k, v in parameter_values.items():
if isinstance(v, (dict, list, bool)):
parameter_values[k] = json.dumps(v)
if parameter_values:
self._parameter_values.update(parameter_values)

Expand Down
29 changes: 26 additions & 3 deletions sdk/python/kfp/v2/google/client/runtime_config_builder_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
# limitations under the License.
"""Tests for kfp.v2.google.client.runtime_config_builder."""

import frozendict
import unittest

import frozendict
from kfp.v2.google.client import runtime_config_builder


Expand All @@ -37,7 +37,16 @@ class RuntimeConfigBuilderTest(unittest.TestCase):
},
'new_param': {
'type': 'STRING'
}
},
'bool_param': {
'type': 'STRING'
},
'dict_param': {
'type': 'STRING'
},
'list_param': {
'type': 'STRING'
},
}
}
}
Expand Down Expand Up @@ -103,7 +112,12 @@ def testBuildRuntimeConfigWithMergeUpdates(self):
my_builder.update_pipeline_root('path/to/my/new/root')
my_builder.update_runtime_parameters({
'int_param': 888,
'new_param': 'new-string'
'new_param': 'new-string',
'dict_param': {
'a': 1
},
'list_param': [1, 2, 3],
'bool_param': True,
})
actual_runtime_config = my_builder.build()

Expand All @@ -122,6 +136,15 @@ def testBuildRuntimeConfigWithMergeUpdates(self):
'new_param': {
'stringValue': 'new-string'
},
'dict_param': {
'stringValue': '{"a": 1}'
},
'list_param': {
'stringValue': '[1, 2, 3]'
},
'bool_param': {
'stringValue': 'true'
},
}
}
self.assertEqual(expected_runtime_config, actual_runtime_config)
Expand Down
3 changes: 3 additions & 0 deletions sdk/python/kfp/v2/google/client/testdata/pipeline_job.json
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@
"parameters": {
"text": {
"type": "STRING"
},
"list": {
"type": "STRING"
}
}
}
Expand Down

0 comments on commit 0fba85c

Please sign in to comment.