diff --git a/sdk/python/kfp/local/docker_task_handler_test.py b/sdk/python/kfp/local/docker_task_handler_test.py index 8fa7ab5f1d5..06e3e8a18c0 100755 --- a/sdk/python/kfp/local/docker_task_handler_test.py +++ b/sdk/python/kfp/local/docker_task_handler_test.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import os +from typing import Optional import unittest from unittest import mock @@ -159,17 +160,20 @@ def test_latest_tag(self): def test_no_tag(self): actual = docker_task_handler.add_latest_tag_if_not_present( - image='alpine:123') - expected = 'alpine:123' + image='alpine:3.19.0') + expected = 'alpine:3.19.0' self.assertEqual(actual, expected) class TestE2E(DockerMockTestCase, testing_utilities.LocalRunnerEnvironmentTestCase): - def test_python(self): + def setUp(self): + super().setUp() local.init(runner=local.DockerRunner()) + def test_python(self): + @dsl.component def artifact_maker(x: str, a: Output[Artifact]): with open(a.path, 'w') as f: @@ -200,7 +204,6 @@ def artifact_maker(x: str, a: Output[Artifact]): self.assertEqual(kwargs['volumes'][root_vol_key]['mode'], 'rw') def test_empty_container_component(self): - local.init(runner=local.DockerRunner()) @dsl.container_component def comp(): @@ -222,7 +225,6 @@ def comp(): self.assertEqual(kwargs['command'], []) def test_container_component(self): - local.init(runner=local.DockerRunner()) @dsl.container_component def artifact_maker(x: str,): @@ -258,6 +260,272 @@ def artifact_maker(x: str,): self.assertEqual(kwargs['volumes'][root_vol_key]['bind'], root_vol_key) self.assertEqual(kwargs['volumes'][root_vol_key]['mode'], 'rw') + def test_if_present_with_string_omitted(self): + + @dsl.container_component + def comp(x: Optional[str] = None): + return dsl.ContainerSpec( + image='alpine:3.19.0', + command=[ + dsl.IfPresentPlaceholder( + input_name='x', + then=['echo', x], + else_=['echo', 'No input provided!']) + ]) + + comp() + + run_mock = self.mocked_docker_client.containers.run + run_mock.assert_called_once() + kwargs = run_mock.call_args[1] + self.assertEqual( + kwargs['image'], + 'alpine:3.19.0', + ) + self.assertEqual(kwargs['command'], [ + 'echo', + 'No input provided!', + ]) + + def test_if_present_with_string_provided(self): + + @dsl.container_component + def comp(x: Optional[str] = None): + return dsl.ContainerSpec( + image='alpine:3.19.0', + command=[ + dsl.IfPresentPlaceholder( + input_name='x', + then=['echo', x], + else_=['echo', 'No artifact provided!']) + ]) + + comp(x='foo') + + run_mock = self.mocked_docker_client.containers.run + run_mock.assert_called_once() + kwargs = run_mock.call_args[1] + self.assertEqual( + kwargs['image'], + 'alpine:3.19.0', + ) + self.assertEqual(kwargs['command'], [ + 'echo', + 'foo', + ]) + + def test_if_present_single_element_with_string_omitted(self): + + @dsl.container_component + def comp(x: Optional[str] = None): + return dsl.ContainerSpec( + image='alpine:3.19.0', + command=[ + 'echo', + dsl.IfPresentPlaceholder( + input_name='x', + then=x, + else_='No artifact provided!', + ) + ]) + + comp() + + run_mock = self.mocked_docker_client.containers.run + run_mock.assert_called_once() + kwargs = run_mock.call_args[1] + self.assertEqual( + kwargs['image'], + 'alpine:3.19.0', + ) + self.assertEqual(kwargs['command'], [ + 'echo', + 'No artifact provided!', + ]) + + def test_if_present_single_element_with_string_provided(self): + + @dsl.container_component + def comp(x: Optional[str] = None): + return dsl.ContainerSpec( + image='alpine:3.19.0', + command=[ + 'echo', + dsl.IfPresentPlaceholder( + input_name='x', + then=x, + else_='No artifact provided!', + ) + ]) + + comp(x='foo') + + run_mock = self.mocked_docker_client.containers.run + run_mock.assert_called_once() + kwargs = run_mock.call_args[1] + self.assertEqual( + kwargs['image'], + 'alpine:3.19.0', + ) + self.assertEqual(kwargs['command'], [ + 'echo', + 'foo', + ]) + + def test_concat_placeholder(self): + + @dsl.container_component + def comp(x: Optional[str] = None): + return dsl.ContainerSpec( + image='alpine', + command=[dsl.ConcatPlaceholder(['prefix-', x, '-suffix'])]) + + comp() + + run_mock = self.mocked_docker_client.containers.run + run_mock.assert_called_once() + kwargs = run_mock.call_args[1] + self.assertEqual( + kwargs['image'], + 'alpine:latest', + ) + self.assertEqual(kwargs['command'], ['prefix-null-suffix']) + + def test_nested_concat_placeholder(self): + + @dsl.container_component + def comp(x: Optional[str] = None): + return dsl.ContainerSpec( + image='alpine', + command=[ + 'echo', + dsl.ConcatPlaceholder( + ['a', dsl.ConcatPlaceholder(['b', x, 'd'])]) + ]) + + comp(x='c') + + run_mock = self.mocked_docker_client.containers.run + run_mock.assert_called_once() + kwargs = run_mock.call_args[1] + self.assertEqual( + kwargs['image'], + 'alpine:latest', + ) + self.assertEqual(kwargs['command'], ['echo', 'abcd']) + + def test_ifpresent_in_concat_provided(self): + + @dsl.container_component + def comp(x: Optional[str] = None): + return dsl.ContainerSpec( + image='alpine', + command=[ + 'echo', + dsl.ConcatPlaceholder([ + 'there ', + dsl.ConcatPlaceholder([ + 'is ', + dsl.IfPresentPlaceholder( + input_name='x', + then='one thing', + else_='another thing') + ]) + ]) + ]) + + comp(x='c') + + run_mock = self.mocked_docker_client.containers.run + run_mock.assert_called_once() + kwargs = run_mock.call_args[1] + self.assertEqual( + kwargs['image'], + 'alpine:latest', + ) + self.assertEqual(kwargs['command'], ['echo', 'there is one thing']) + + def test_ifpresent_in_concat_omitted(self): + + @dsl.container_component + def comp(x: Optional[str] = None): + return dsl.ContainerSpec( + image='alpine', + command=[ + 'echo', + dsl.ConcatPlaceholder([ + 'there ', + dsl.ConcatPlaceholder([ + 'is ', + dsl.IfPresentPlaceholder( + input_name='x', + then='one thing', + else_='another thing') + ]) + ]) + ]) + + comp() + + run_mock = self.mocked_docker_client.containers.run + run_mock.assert_called_once() + kwargs = run_mock.call_args[1] + self.assertEqual( + kwargs['image'], + 'alpine:latest', + ) + self.assertEqual(kwargs['command'], ['echo', 'there is another thing']) + + def test_concat_in_ifpresent_provided(self): + + @dsl.container_component + def comp(x: Optional[str] = None): + return dsl.ContainerSpec( + image='alpine', + command=[ + 'echo', + dsl.IfPresentPlaceholder( + input_name='x', + then=dsl.ConcatPlaceholder([x]), + else_=dsl.ConcatPlaceholder(['something', ' ', 'else'])) + ]) + + comp(x='something') + + run_mock = self.mocked_docker_client.containers.run + run_mock.assert_called_once() + kwargs = run_mock.call_args[1] + self.assertEqual( + kwargs['image'], + 'alpine:latest', + ) + self.assertEqual(kwargs['command'], ['echo', 'something']) + + def test_concat_in_ifpresent_omitted(self): + + @dsl.container_component + def comp(x: Optional[str] = None): + return dsl.ContainerSpec( + image='alpine', + command=[ + 'echo', + dsl.IfPresentPlaceholder( + input_name='x', + then=dsl.ConcatPlaceholder([x]), + else_=dsl.ConcatPlaceholder(['another', ' ', 'thing'])) + ]) + + comp() + + run_mock = self.mocked_docker_client.containers.run + run_mock.assert_called_once() + kwargs = run_mock.call_args[1] + self.assertEqual( + kwargs['image'], + 'alpine:latest', + ) + self.assertEqual(kwargs['command'], ['echo', 'another thing']) + if __name__ == '__main__': unittest.main() diff --git a/sdk/python/kfp/local/placeholder_utils.py b/sdk/python/kfp/local/placeholder_utils.py index c84422b3968..3333fff6e5e 100644 --- a/sdk/python/kfp/local/placeholder_utils.py +++ b/sdk/python/kfp/local/placeholder_utils.py @@ -15,7 +15,7 @@ import json import random import re -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union from kfp import dsl @@ -35,8 +35,17 @@ def replace_placeholders( """Iterates over each element in the command and replaces placeholders.""" unique_pipeline_id = make_random_id() unique_task_id = make_random_id() - return [ - replace_placeholder_for_element( + provided_inputs = get_provided_inputs(executor_input_dict) + full_command = [ + resolve_struct_placeholders( + placeholder, + provided_inputs, + ) for placeholder in full_command + ] + full_command = flatten_list(full_command) + resolved_command = [] + for el in full_command: + resolved_el = resolve_individual_placeholder( element=el, executor_input_dict=executor_input_dict, pipeline_resource_name=pipeline_resource_name, @@ -44,8 +53,40 @@ def replace_placeholders( pipeline_root=pipeline_root, pipeline_job_id=unique_pipeline_id, pipeline_task_id=unique_task_id, - ) for el in full_command - ] + ) + if resolved_el is None: + continue + elif isinstance(resolved_el, str): + resolved_command.append(resolved_el) + elif isinstance(resolved_el, list): + resolved_command.extend(resolved_el) + else: + raise ValueError( + f'Got unknown command element {resolved_el} of type {type(resolved_el)}.' + ) + return resolved_command + + +def flatten_list(l: List[Union[str, list, None]]) -> List[str]: + """Iteratively flattens arbitrarily deeply nested lists, filtering out + elements that are None.""" + result = [] + stack = l.copy() + while stack: + element = stack.pop(0) + if isinstance(element, list): + stack = element + stack + elif element is not None: + result.append(element) + return result + + +def get_provided_inputs(executor_input_dict: Dict[str, Any]) -> Dict[str, Any]: + params = executor_input_dict.get('inputs', {}).get('parameterValues', {}) + pkeys = [k for k, v in params.items() if v is not None] + artifacts = executor_input_dict.get('inputs', {}).get('artifacts', {}) + akeys = [k for k, v in artifacts.items() if v is not None] + return pkeys + akeys def get_value_using_path( @@ -110,16 +151,74 @@ def resolve_io_placeholders( # e.g., path = ['inputs', 'parameterValues', 'text'] value = get_value_using_path(executor_input, path) - if value is not None: - if not isinstance(value, str): - value = json.dumps(value) - command = command.replace('{{$.' + placeholder + '}}', value) + if not isinstance(value, str): + # even if value is None, should json.dumps to null + # and still resolve placeholder + value = json.dumps(value) + command = command.replace('{{$.' + placeholder + '}}', value) return command -# TODO: support concat and if-present placeholders -def replace_placeholder_for_element( +def resolve_struct_placeholders( + placeholder: str, + provided_inputs: List[str], +) -> List[Any]: + """Resolves IfPresent and Concat placeholders to an arbitrarily deeply + nested list of strings, which may contain None.""" + + # throughout, filter out None for the case where IfPresent False and no else + def filter_none(l: List[Any]) -> List[Any]: + return [e for e in l if e is not None] + + def recursively_resolve_struct(placeholder: Dict[str, Any]) -> str: + if isinstance(placeholder, str): + return placeholder + elif isinstance(placeholder, list): + raise ValueError( + f"You have an incorrectly nested {dsl.IfPresentPlaceholder!r} with a list provided for 'then' or 'else'." + ) + + first_key = list(placeholder.keys())[0] + if first_key == 'Concat': + concat = [ + recursively_resolve_struct(p) for p in placeholder['Concat'] + ] + return ''.join(filter_none(concat)) + elif first_key == 'IfPresent': + inner_struct = placeholder['IfPresent'] + if inner_struct['InputName'] in provided_inputs: + then = inner_struct['Then'] + if isinstance(then, str): + return then + elif isinstance(then, list): + return filter_none( + [recursively_resolve_struct(p) for p in then]) + elif isinstance(then, dict): + return recursively_resolve_struct(then) + else: + else_ = inner_struct.get('Else') + if else_ is None: + return else_ + if isinstance(else_, str): + return else_ + elif isinstance(else_, list): + return filter_none( + [recursively_resolve_struct(p) for p in else_]) + elif isinstance(else_, dict): + return recursively_resolve_struct(else_) + else: + raise ValueError + + if placeholder.startswith('{"Concat": ') or placeholder.startswith( + '{"IfPresent": '): + des_placeholder = json.loads(placeholder) + return recursively_resolve_struct(des_placeholder) + else: + return placeholder + + +def resolve_individual_placeholder( element: str, executor_input_dict: Dict[str, Any], pipeline_resource_name: str, @@ -129,6 +228,7 @@ def replace_placeholder_for_element( pipeline_task_id: str, ) -> str: """Replaces placeholders for a single element.""" + # match on literal for constant placeholders PLACEHOLDERS = { r'{{$.outputs.output_file}}': executor_input_dict['outputs']['outputFile'], @@ -147,10 +247,8 @@ def replace_placeholder_for_element( dsl.PIPELINE_ROOT_PLACEHOLDER: pipeline_root, } - - # match on literal for constant placeholders for placeholder, value in PLACEHOLDERS.items(): element = element.replace(placeholder, value) - # match differently for non-constant placeholders (i.e., have key(s)) + # match non-constant placeholders (i.e., have key(s)) return resolve_io_placeholders(executor_input_dict, element) diff --git a/sdk/python/kfp/local/placeholder_utils_test.py b/sdk/python/kfp/local/placeholder_utils_test.py index 76d4a5d0d36..7ecd71dfa07 100644 --- a/sdk/python/kfp/local/placeholder_utils_test.py +++ b/sdk/python/kfp/local/placeholder_utils_test.py @@ -14,6 +14,7 @@ """Tests for placeholder_utils.py.""" import json +from typing import List, Optional import unittest from absl.testing import parameterized @@ -92,7 +93,7 @@ def test(self): self.assertEqual(actual, expected) -class TestReplacePlaceholderForElement(parameterized.TestCase): +class TestResolveIndividualPlaceholder(parameterized.TestCase): # TODO: consider supporting JSON escape # TODO: update when input artifact constants supported @@ -132,7 +133,7 @@ class TestReplacePlaceholderForElement(parameterized.TestCase): ), ]) def test_constant_placeholders(self, element: str, expected: str): - actual = placeholder_utils.replace_placeholder_for_element( + actual = placeholder_utils.resolve_individual_placeholder( element=element, executor_input_dict=EXECUTOR_INPUT_DICT, pipeline_resource_name='my-pipeline-2023-10-10-13-32-59-420710', @@ -159,7 +160,7 @@ def test_constant_placeholders(self, element: str, expected: str): ]) def test_concatenated_placeholders_resolve(self, element: str, expected: str): - actual = placeholder_utils.replace_placeholder_for_element( + actual = placeholder_utils.resolve_individual_placeholder( element=element, executor_input_dict=EXECUTOR_INPUT_DICT, pipeline_resource_name='my-pipeline-2023-10-10-13-32-59-420710', @@ -175,6 +176,10 @@ def test_concatenated_placeholders_resolve(self, element: str, "{{$.inputs.parameters[''boolean'']}}", json.dumps(False), ), + ( + "{{$.inputs.parameters[''not_present'']}}", + json.dumps(None), + ), ( "{{$.outputs.artifacts[''out_a''].metadata}}", json.dumps({'foo': { @@ -199,7 +204,7 @@ def test_concatenated_placeholders_resolve(self, element: str, ), ]) def test_io_placeholders(self, element: str, expected: str): - actual = placeholder_utils.replace_placeholder_for_element( + actual = placeholder_utils.resolve_individual_placeholder( element=element, executor_input_dict=EXECUTOR_INPUT_DICT, pipeline_resource_name='my-pipeline-2023-10-10-13-32-59-420710', @@ -215,6 +220,10 @@ def test_io_placeholders(self, element: str, expected: str): "my-prefix-{{$.inputs.parameters[''boolean'']}}-suffix", 'my-prefix-false-suffix', ), + ( + "--param={{$.inputs.parameters[''not_present'']}}", + '--param=null', + ), ( "prefix{{$.outputs.parameters[''Output''].output_file}}/suffix", 'prefix/foo/bar/my-pipeline-2023-10-10-13-32-59-420710/comp/Output/suffix', @@ -226,7 +235,7 @@ def test_io_placeholders(self, element: str, expected: str): ]) def test_io_placeholder_with_string_concat(self, element: str, expected: str): - actual = placeholder_utils.replace_placeholder_for_element( + actual = placeholder_utils.resolve_individual_placeholder( element=element, executor_input_dict=EXECUTOR_INPUT_DICT, pipeline_resource_name='my-pipeline-2023-10-10-13-32-59-420710', @@ -268,5 +277,102 @@ def test_empty_path(self): placeholder_utils.get_value_using_path({'a': 20}, []) +class TestResolveStructPlaceholders(parameterized.TestCase): + + @parameterized.parameters([ + ( + """{"Concat": ["a", "b", "c"]}""", + [], + 'abc', + ), + ( + """{"Concat": ["prefix", "-", "{{$.outputs.artifacts[''x''].uri}}"]}""", + [], + "prefix-{{$.outputs.artifacts[''x''].uri}}", + ), + ( + """{"Concat": ["a", {"Concat": ["b", "c"]}]}""", + [], + 'abc', + ), + ( + """{"IfPresent": {"InputName": "x", "Then": ["foo"], "Else": ["bar"]}}""", + [], + ['bar'], + ), + ( + """{"IfPresent": {"InputName": "x", "Then": ["foo"], "Else": ["bar"]}}""", + ['x'], + ['foo'], + ), + ( + """{"Concat": ["a", {"Concat": ["b", {"Concat": ["c", "{{$.inputs.parameters[''input2'']}}"]}]}]}""", + [], + "abc{{$.inputs.parameters[''input2'']}}", + ), + ( + """{"Concat": ["a", {"Concat": ["b", {"IfPresent": {"InputName": "foo", "Then": "c", "Else": "d"}}]}]}""", + [], + 'abd', + ), + ( + """{"Concat": ["--flag", {"Concat": ["=", {"IfPresent": {"InputName": "x", "Then": "thing", "Else": "otherwise"}}]}]}""", + ['x'], + '--flag=thing', + ), + ( + """{"Concat": ["a", {"IfPresent": {"InputName": "foo", "Then": {"Concat": ["--", "flag", "{{$.inputs.artifacts['input2'].path}}"]}, "Else": "b"}}, "c"]}""", + [], + 'abc', + ), + ( + """{"Concat": ["--flag", {"IfPresent": {"InputName": "foo", "Then": {"Concat": ["=", "{{$.inputs.artifacts['input2'].path}}"]}, "Else": "b"}}, "-suffix"]}""", + ['foo'], + "--flag={{$.inputs.artifacts['input2'].path}}-suffix", + ), + ( + """{"Concat": ["a-", {"IfPresent": {"InputName": "foo", "Then": {"Concat": ["--", "flag"]}, "Else": "{{$.inputs.artifacts['input2'].path}}"}}, "-c"]}""", + [], + "a-{{$.inputs.artifacts['input2'].path}}-c", + ), + ( + """{"Concat": ["--", {"IfPresent": {"InputName": "foo", "Then": {"Concat": ["flag"]}, "Else": "{{$.inputs.artifacts['input2'].path}}"}}, "=c"]}""", + ['foo'], + '--flag=c', + ), + ( + """{"Concat": ["--", {"IfPresent": {"InputName": "foo", "Then": {"Concat": ["flag"]}}}, "=c"]}""", + ['foo'], + '--flag=c', + ), + ( + """{"Concat": ["--flag", {"IfPresent": {"InputName": "foo", "Then": {"Concat": ["=", "other", "_val"]}}}, "=foo"]}""", + [], + '--flag=foo', + ), + ( + """{"IfPresent": {"InputName": "foo", "Then": {"Concat": ["--", "flag"]}}}""", + ['foo'], + '--flag', + ), + ( + """{"IfPresent": {"InputName": "foo", "Then": {"Concat": ["--", "flag"]}}}""", + [], + None, + ), + ]) + def test( + self, + placeholder: str, + provided_inputs: List[str], + expected: Optional[None], + ): + actual = placeholder_utils.resolve_struct_placeholders( + placeholder, + provided_inputs, + ) + self.assertEqual(actual, expected) + + if __name__ == '__main__': unittest.main()