From 693b5838af8775e327974fd7ca0e6b17e8918c03 Mon Sep 17 00:00:00 2001 From: Josh Ogbonda Date: Wed, 2 Nov 2022 15:22:49 -0700 Subject: [PATCH 01/16] unblock valid topology --- sdk/python/kfp/components/pipeline_task.py | 7 ++++- .../kfp/components/pipeline_task_test.py | 26 ++++++++++++++++++- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/sdk/python/kfp/components/pipeline_task.py b/sdk/python/kfp/components/pipeline_task.py index 2225d6407fa..fc96ab5593d 100644 --- a/sdk/python/kfp/components/pipeline_task.py +++ b/sdk/python/kfp/components/pipeline_task.py @@ -563,8 +563,13 @@ def my_pipeline(): task1 = my_component(text='1st task') task2 = my_component(text='2nd task').after(task1) """ + from kfp.components.tasks_group import TasksGroupType + for task in tasks: - if task.parent_task_group is not self.parent_task_group: + if task.parent_task_group is not self.parent_task_group and task.parent_task_group.group_type in [ + TasksGroupType.CONDITION, TasksGroupType.FOR_LOOP, + TasksGroupType.EXIT_HANDLER + ]: raise ValueError( f'Cannot use .after() across inner pipelines or DSL control flow features. Tried to set {self.name} after {task.name}, but these tasks do not belong to the same pipeline or are not enclosed in the same control flow content manager.' ) diff --git a/sdk/python/kfp/components/pipeline_task_test.py b/sdk/python/kfp/components/pipeline_task_test.py index 57be44f33fa..b7bf3f0d73e 100644 --- a/sdk/python/kfp/components/pipeline_task_test.py +++ b/sdk/python/kfp/components/pipeline_task_test.py @@ -352,7 +352,7 @@ def my_pipeline(): second_exit_task = print_op(message='Second exit task.') with dsl.ExitHandler(second_exit_task): x = print_op(message='Inside second exit handler.') - x.after(first_exit_task) + x.after(first_print_op) with tempfile.TemporaryDirectory() as tempdir: package_path = os.path.join(tempdir, 'pipeline.yaml') @@ -411,6 +411,30 @@ def my_pipeline(): compiler.Compiler().compile( pipeline_func=my_pipeline, package_path=package_path) + def test_outside_of_condition_exception_permitted(self): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.component + def return_1() -> int: + return 1 + + @dsl.pipeline(name='pipeline-with-multiple-exit-handlers') + def my_pipeline(): + return_1_task = return_1() + + one = print_op(message='1') + with dsl.Condition(return_1_task.output == 1): + two = print_op(message='2') + three = print_op(message='3').after(one) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + def test_inside_of_condition_permitted(self): @dsl.component From 02ba99af3a4b156f8f174808aa86e1ed4318dadc Mon Sep 17 00:00:00 2001 From: Josh Ogbonda Date: Wed, 2 Nov 2022 16:45:49 -0700 Subject: [PATCH 02/16] add more tests --- .../kfp/components/pipeline_task_test.py | 50 ++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/sdk/python/kfp/components/pipeline_task_test.py b/sdk/python/kfp/components/pipeline_task_test.py index b7bf3f0d73e..2f77d99d06f 100644 --- a/sdk/python/kfp/components/pipeline_task_test.py +++ b/sdk/python/kfp/components/pipeline_task_test.py @@ -411,7 +411,55 @@ def my_pipeline(): compiler.Compiler().compile( pipeline_func=my_pipeline, package_path=package_path) - def test_outside_of_condition_exception_permitted(self): + def test_outside_of_for_loop_blocked(self): + with self.assertRaisesRegex(ValueError, + r'Cannot use 
\.after\(\) across'): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.component + def return_list() -> int: + return [1, 2, 3] + + @dsl.pipeline(name='pipeline-with-multiple-exit-handlers') + def my_pipeline(): + return_list_task = return_list() + + with dsl.ParallelFor(return_list_task.output) as item: + one = print_op(message='1') + two = print_op(message='2') + three = print_op(message='3').after(one) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + def test_outside_of_exit_handler_blocked(self): + with self.assertRaisesRegex(ValueError, + r'Cannot use \.after\(\) across'): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.pipeline(name='pipeline-with-multiple-exit-handlers') + def my_pipeline(): + exit_task = print_op(message='exit task.') + + with dsl.ExitHandler(exit_task): + one = print_op(message='1') + two = print_op(message='2') + three = print_op(message='3').after(one) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + def test_valid_outside_of_condition_topology_permitted(self): @dsl.component def print_op(message: str): From a23f9aed0f96c6eb8025187dfc5586baa08ca4ce Mon Sep 17 00:00:00 2001 From: Josh Ogbonda Date: Mon, 7 Nov 2022 14:17:27 -0800 Subject: [PATCH 03/16] handle git fork --- sdk/python/kfp/components/pipeline_task.py | 9 --------- sdk/python/kfp/components/pipeline_task_test.py | 10 +++++----- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/sdk/python/kfp/components/pipeline_task.py b/sdk/python/kfp/components/pipeline_task.py index fc96ab5593d..e02fdbf7c9b 100644 --- a/sdk/python/kfp/components/pipeline_task.py +++ b/sdk/python/kfp/components/pipeline_task.py @@ -563,15 +563,6 @@ def my_pipeline(): task1 = my_component(text='1st task') task2 = my_component(text='2nd task').after(task1) """ - from kfp.components.tasks_group import TasksGroupType - for task in tasks: - if task.parent_task_group is not self.parent_task_group and task.parent_task_group.group_type in [ - TasksGroupType.CONDITION, TasksGroupType.FOR_LOOP, - TasksGroupType.EXIT_HANDLER - ]: - raise ValueError( - f'Cannot use .after() across inner pipelines or DSL control flow features. Tried to set {self.name} after {task.name}, but these tasks do not belong to the same pipeline or are not enclosed in the same control flow content manager.' 
- ) self._task_spec.dependent_tasks.append(task.name) return self diff --git a/sdk/python/kfp/components/pipeline_task_test.py b/sdk/python/kfp/components/pipeline_task_test.py index 2f77d99d06f..a84a276f555 100644 --- a/sdk/python/kfp/components/pipeline_task_test.py +++ b/sdk/python/kfp/components/pipeline_task_test.py @@ -397,7 +397,7 @@ def print_op(message: str): def return_1() -> int: return 1 - @dsl.pipeline(name='pipeline-with-multiple-exit-handlers') + @dsl.pipeline() def my_pipeline(): return_1_task = return_1() @@ -423,7 +423,7 @@ def print_op(message: str): def return_list() -> int: return [1, 2, 3] - @dsl.pipeline(name='pipeline-with-multiple-exit-handlers') + @dsl.pipeline() def my_pipeline(): return_list_task = return_list() @@ -445,7 +445,7 @@ def test_outside_of_exit_handler_blocked(self): def print_op(message: str): print(message) - @dsl.pipeline(name='pipeline-with-multiple-exit-handlers') + @dsl.pipeline() def my_pipeline(): exit_task = print_op(message='exit task.') @@ -469,7 +469,7 @@ def print_op(message: str): def return_1() -> int: return 1 - @dsl.pipeline(name='pipeline-with-multiple-exit-handlers') + @dsl.pipeline() def my_pipeline(): return_1_task = return_1() @@ -493,7 +493,7 @@ def print_op(message: str): def return_1() -> int: return 1 - @dsl.pipeline(name='pipeline-with-multiple-exit-handlers') + @dsl.pipeline() def my_pipeline(): return_1_task = return_1() From f73e94d2b2eb6e7179e497b5d1e512266a53d28f Mon Sep 17 00:00:00 2001 From: Josh Ogbonda Date: Mon, 7 Nov 2022 15:21:31 -0800 Subject: [PATCH 04/16] sample_test_cases --- sdk/python/kfp/compiler/compiler_test.py | 285 +++++++++++++++++++++ sdk/python/kfp/components/pipeline_task.py | 4 - 2 files changed, 285 insertions(+), 4 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 0cb49775b51..9c0df0dc4df 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -1356,5 +1356,290 @@ def my_pipeline(): print_op(message='Inside second exit handler.') +class ValidLegalTopologies(unittest.TestCase): + + def test_inside_of_root_group_permitted(self): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.component + def return_1() -> int: + return 1 + + @dsl.pipeline() + def my_pipeline(): + return_1_task = return_1() + + one = print_op(message='1') + two = print_op(message='2') + three = print_op(message='3').after(one) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + def test_upstream_inside_deeper_condition_blocked(self): + with self.assertRaisesRegex(ValueError, + r'Cannot use \.after\(\) across'): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.component + def return_1() -> int: + return 1 + + @dsl.pipeline() + def my_pipeline(): + return_1_task = return_1() + + one = print_op(message='1') + with dsl.Condition(return_1_task.output == 1): + two = print_op(message='2') + + three = print_op(message='3').after(two) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + def test_upstream_in_the_same_condition_permitted(self): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.component + def return_1() -> int: + return 1 + + @dsl.pipeline() + def 
my_pipeline(): + return_1_task = return_1() + + with dsl.Condition(return_1_task.output == 1): + one = print_op(message='1') + two = print_op(message='2') + three = print_op(message='3').after(one) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + def test_downstream_inside_deeper_condition_permitted(self): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.component + def return_1() -> int: + return 1 + + @dsl.pipeline() + def my_pipeline(): + return_1_task = return_1() + + one = print_op(message='1') + with dsl.Condition(return_1_task.output == 1): + two = print_op(message='2') + three = print_op(message='3').after(one) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + def test_downstream_and_upstream_in_different_condition_on_same_level_blocked( + self): + with self.assertRaisesRegex(ValueError, + r'Cannot use \.after\(\) across'): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.component + def return_1() -> int: + return 1 + + @dsl.pipeline() + def my_pipeline(): + return_1_task = return_1() + + one = print_op(message='1') + with dsl.Condition(return_1_task.output == 1): + two = print_op(message='2') + + with dsl.Condition(return_1_task.output == 1): + three = print_op(message='3').after(two) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + def test_downstream_inside_deeper_nested_condition_permitted(self): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.component + def return_1() -> int: + return 1 + + @dsl.pipeline() + def my_pipeline(): + return_1_task = return_1() + + with dsl.Condition(return_1_task.output == 1): + one = print_op(message='1') + with dsl.Condition(return_1_task.output == 1): + two = print_op(message='2') + three = print_op(message='3').after(one) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + def test_upstream_inside_deeper_nested_condition_blocked(self): + with self.assertRaisesRegex(ValueError, + r'Cannot use \.after\(\) across'): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.component + def return_1() -> int: + return 1 + + @dsl.pipeline() + def my_pipeline(): + return_1_task = return_1() + + with dsl.Condition(return_1_task.output == 1): + one = print_op(message='1') + with dsl.Condition(return_1_task.output == 1): + two = print_op(message='2') + three = print_op(message='3').after(two) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + def test_upstream_in_same_for_loop_with_downstream_permitted(self): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.component + def args_generator_op() -> List[Dict[str, str]]: + return [{'A_a': '1', 'B_b': '2'}, {'A_a': '10', 'B_b': '20'}] + + @dsl.pipeline() + def my_pipeline(): + args_generator = args_generator_op() + + with dsl.ParallelFor(args_generator.output): + 
one = print_op(message='1') + two = print_op(message='3').after(one) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + def test_downstream_not_in_same_for_loop_with_upstream_blocked(self): + with self.assertRaisesRegex(ValueError, + r'Cannot use \.after\(\) across'): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.component + def args_generator_op() -> List[Dict[str, str]]: + return [{'A_a': '1', 'B_b': '2'}, {'A_a': '10', 'B_b': '20'}] + + @dsl.pipeline() + def my_pipeline(): + args_generator = args_generator_op() + + with dsl.ParallelFor(args_generator.output): + one = print_op(message='1') + two = print_op(message='3').after(one) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + def test_downstream_not_in_same_for_loop_with_upstream_seperate_blocked( + self): + with self.assertRaisesRegex(ValueError, + r'Cannot use \.after\(\) across'): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.component + def args_generator_op() -> List[Dict[str, str]]: + return [{'A_a': '1', 'B_b': '2'}, {'A_a': '10', 'B_b': '20'}] + + @dsl.pipeline() + def my_pipeline(): + args_generator = args_generator_op() + + with dsl.ParallelFor(args_generator.output): + one = print_op(message='1') + + with dsl.ParallelFor(args_generator.output): + two = print_op(message='3').after(one) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + def test_downstream_not_in_same_for_loop_with_upstream_nested_blocked(self): + with self.assertRaisesRegex(ValueError, + r'Cannot use \.after\(\) across'): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.component + def args_generator_op() -> List[Dict[str, str]]: + return [{'A_a': '1', 'B_b': '2'}, {'A_a': '10', 'B_b': '20'}] + + @dsl.pipeline() + def my_pipeline(): + args_generator = args_generator_op() + + with dsl.ParallelFor(args_generator.output): + one = print_op(message='1') + + with dsl.ParallelFor(args_generator.output): + two = print_op(message='3').after(one) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + if __name__ == '__main__': unittest.main() diff --git a/sdk/python/kfp/components/pipeline_task.py b/sdk/python/kfp/components/pipeline_task.py index 2225d6407fa..e02fdbf7c9b 100644 --- a/sdk/python/kfp/components/pipeline_task.py +++ b/sdk/python/kfp/components/pipeline_task.py @@ -564,9 +564,5 @@ def my_pipeline(): task2 = my_component(text='2nd task').after(task1) """ for task in tasks: - if task.parent_task_group is not self.parent_task_group: - raise ValueError( - f'Cannot use .after() across inner pipelines or DSL control flow features. Tried to set {self.name} after {task.name}, but these tasks do not belong to the same pipeline or are not enclosed in the same control flow content manager.' 
- ) self._task_spec.dependent_tasks.append(task.name) return self From 8b3b633b3585d6b92cc96083f9f8eddbd764cfe5 Mon Sep 17 00:00:00 2001 From: Josh Ogbonda Date: Thu, 10 Nov 2022 11:31:56 -0800 Subject: [PATCH 05/16] main --- sdk/python/kfp/compiler/compiler_test.py | 38 ++++++++++--------- sdk/python/kfp/compiler/compiler_utils.py | 27 ++++++++++--- .../kfp/components/pipeline_task_test.py | 20 ++++++---- 3 files changed, 55 insertions(+), 30 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 9c0df0dc4df..a81228ebf7f 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -437,7 +437,8 @@ def dummy_op(msg: str = ''): with self.assertRaisesRegex( RuntimeError, - 'Task dummy-op cannot dependent on any task inside the group:'): + 'Task dummy-op cannot dependent on any task inside a ParralelFor' + ): @dsl.pipeline(name='test-pipeline') def my_pipeline(val: bool): @@ -479,7 +480,8 @@ def dummy_op(msg: str = ''): with self.assertRaisesRegex( RuntimeError, - 'Task dummy-op cannot dependent on any task inside the group:'): + 'Task dummy-op cannot dependent on any task inside a Condition that is not a common ancestor of both tasks' + ): @dsl.pipeline(name='test-pipeline') def my_pipeline(val: bool): @@ -521,7 +523,8 @@ def dummy_op(msg: str = ''): with self.assertRaisesRegex( RuntimeError, - 'Task dummy-op cannot dependent on any task inside the group:'): + 'Task dummy-op cannot dependent on any task inside a Exithandler that is not a common ancestor of both tasks' + ): @dsl.pipeline(name='test-pipeline') def my_pipeline(val: bool): @@ -1382,8 +1385,8 @@ def my_pipeline(): pipeline_func=my_pipeline, package_path=package_path) def test_upstream_inside_deeper_condition_blocked(self): - with self.assertRaisesRegex(ValueError, - r'Cannot use \.after\(\) across'): + + with self.assertRaisesRegex(RuntimeError, r'Task'): @dsl.component def print_op(message: str): @@ -1440,7 +1443,7 @@ def print_op(message: str): @dsl.component def return_1() -> int: - return 1 + return @dsl.pipeline() def my_pipeline(): @@ -1458,8 +1461,8 @@ def my_pipeline(): def test_downstream_and_upstream_in_different_condition_on_same_level_blocked( self): - with self.assertRaisesRegex(ValueError, - r'Cannot use \.after\(\) across'): + + with self.assertRaisesRegex(RuntimeError, r'Task'): @dsl.component def print_op(message: str): @@ -1498,10 +1501,11 @@ def return_1() -> int: @dsl.pipeline() def my_pipeline(): return_1_task = return_1() + return_1_task2 = return_1() with dsl.Condition(return_1_task.output == 1): one = print_op(message='1') - with dsl.Condition(return_1_task.output == 1): + with dsl.Condition(return_1_task2.output == 1): two = print_op(message='2') three = print_op(message='3').after(one) @@ -1511,8 +1515,8 @@ def my_pipeline(): pipeline_func=my_pipeline, package_path=package_path) def test_upstream_inside_deeper_nested_condition_blocked(self): - with self.assertRaisesRegex(ValueError, - r'Cannot use \.after\(\) across'): + + with self.assertRaisesRegex(RuntimeError, r'Task'): @dsl.component def print_op(message: str): @@ -1561,8 +1565,8 @@ def my_pipeline(): pipeline_func=my_pipeline, package_path=package_path) def test_downstream_not_in_same_for_loop_with_upstream_blocked(self): - with self.assertRaisesRegex(ValueError, - r'Cannot use \.after\(\) across'): + + with self.assertRaisesRegex(RuntimeError, r'Task'): @dsl.component def print_op(message: str): @@ -1587,8 +1591,8 @@ def my_pipeline(): def 
test_downstream_not_in_same_for_loop_with_upstream_seperate_blocked( self): - with self.assertRaisesRegex(ValueError, - r'Cannot use \.after\(\) across'): + + with self.assertRaisesRegex(RuntimeError, r'Task'): @dsl.component def print_op(message: str): @@ -1614,8 +1618,8 @@ def my_pipeline(): pipeline_func=my_pipeline, package_path=package_path) def test_downstream_not_in_same_for_loop_with_upstream_nested_blocked(self): - with self.assertRaisesRegex(ValueError, - r'Cannot use \.after\(\) across'): + + with self.assertRaisesRegex(RuntimeError, r'Task'): @dsl.component def print_op(message: str): diff --git a/sdk/python/kfp/compiler/compiler_utils.py b/sdk/python/kfp/compiler/compiler_utils.py index 18e329afcde..cb3d6440899 100644 --- a/sdk/python/kfp/compiler/compiler_utils.py +++ b/sdk/python/kfp/compiler/compiler_utils.py @@ -427,14 +427,29 @@ def get_dependencies( task2=task, ) - # a task cannot depend on a task created in a for loop group since individual PipelineTask variables are reassigned after each loop iteration + # ParralelFor Check + for parent in task_name_to_parent_groups[upstream_task.name]: + parent = group_name_to_group.get(parent, None) + if isinstance( + parent, + tasks_group.ParallelFor) and task not in parent.tasks: + raise RuntimeError( + f'Task {task.name} cannot dependent on any task inside a ParralelFor' + ) + + # Condition check + dependent_group = group_name_to_group.get(upstream_groups[0], None) + if isinstance(dependent_group, tasks_group.Condition): + raise RuntimeError( + f'Task {task.name} cannot dependent on any task inside a Condition that is not a common ancestor of both tasks' + ) + + # ExitHandler check dependent_group = group_name_to_group.get(upstream_groups[0], None) - if isinstance(dependent_group, - (tasks_group.ParallelFor, tasks_group.Condition, - tasks_group.ExitHandler)): + if isinstance(dependent_group, tasks_group.ExitHandler): raise RuntimeError( - f'Task {task.name} cannot dependent on any task inside' - f' the group: {upstream_groups[0]}.') + f'Task {task.name} cannot dependent on any task inside a Exithandler that is not a common ancestor of both tasks' + ) dependencies[downstream_groups[0]].add(upstream_groups[0]) diff --git a/sdk/python/kfp/components/pipeline_task_test.py b/sdk/python/kfp/components/pipeline_task_test.py index 57be44f33fa..f860b143ac6 100644 --- a/sdk/python/kfp/components/pipeline_task_test.py +++ b/sdk/python/kfp/components/pipeline_task_test.py @@ -308,8 +308,10 @@ def test_set_display_name(self): class TestCannotUseAfterCrossDAG(unittest.TestCase): def test_inner_task_prevented(self): - with self.assertRaisesRegex(ValueError, - r'Cannot use \.after\(\) across'): + with self.assertRaisesRegex( + RuntimeError, + r'Task print-op-4 cannot dependent on any task inside a Exithandler that is not a common ancestor of both tasks' + ): @dsl.component def print_op(message: str): @@ -334,8 +336,10 @@ def my_pipeline(): pipeline_func=my_pipeline, package_path=package_path) def test_exit_handler_task_prevented(self): - with self.assertRaisesRegex(ValueError, - r'Cannot use \.after\(\) across'): + with self.assertRaisesRegex( + RuntimeError, + r'Task print-op-4 cannot dependent on any task inside a Exithandler that is not a common ancestor of both tasks' + ): @dsl.component def print_op(message: str): @@ -352,7 +356,7 @@ def my_pipeline(): second_exit_task = print_op(message='Second exit task.') with dsl.ExitHandler(second_exit_task): x = print_op(message='Inside second exit handler.') - x.after(first_exit_task) + 
x.after(first_print_op) with tempfile.TemporaryDirectory() as tempdir: package_path = os.path.join(tempdir, 'pipeline.yaml') @@ -386,8 +390,10 @@ def my_pipeline(): pipeline_func=my_pipeline, package_path=package_path) def test_outside_of_condition_blocked(self): - with self.assertRaisesRegex(ValueError, - r'Cannot use \.after\(\) across'): + with self.assertRaisesRegex( + RuntimeError, + r'Task print-op-3 cannot dependent on any task inside a Condition that is not a common ancestor of both tasks' + ): @dsl.component def print_op(message: str): From 472bc020587fc35a0a830c83c8d03c19150c1855 Mon Sep 17 00:00:00 2001 From: Josh Ogbonda Date: Thu, 10 Nov 2022 11:54:13 -0800 Subject: [PATCH 06/16] restore to master --- sdk/python/kfp/components/pipeline_task.py | 4 + .../kfp/components/pipeline_task_test.py | 78 +------------------ 2 files changed, 7 insertions(+), 75 deletions(-) diff --git a/sdk/python/kfp/components/pipeline_task.py b/sdk/python/kfp/components/pipeline_task.py index e02fdbf7c9b..2225d6407fa 100644 --- a/sdk/python/kfp/components/pipeline_task.py +++ b/sdk/python/kfp/components/pipeline_task.py @@ -564,5 +564,9 @@ def my_pipeline(): task2 = my_component(text='2nd task').after(task1) """ for task in tasks: + if task.parent_task_group is not self.parent_task_group: + raise ValueError( + f'Cannot use .after() across inner pipelines or DSL control flow features. Tried to set {self.name} after {task.name}, but these tasks do not belong to the same pipeline or are not enclosed in the same control flow content manager.' + ) self._task_spec.dependent_tasks.append(task.name) return self diff --git a/sdk/python/kfp/components/pipeline_task_test.py b/sdk/python/kfp/components/pipeline_task_test.py index a84a276f555..57be44f33fa 100644 --- a/sdk/python/kfp/components/pipeline_task_test.py +++ b/sdk/python/kfp/components/pipeline_task_test.py @@ -352,7 +352,7 @@ def my_pipeline(): second_exit_task = print_op(message='Second exit task.') with dsl.ExitHandler(second_exit_task): x = print_op(message='Inside second exit handler.') - x.after(first_print_op) + x.after(first_exit_task) with tempfile.TemporaryDirectory() as tempdir: package_path = os.path.join(tempdir, 'pipeline.yaml') @@ -397,7 +397,7 @@ def print_op(message: str): def return_1() -> int: return 1 - @dsl.pipeline() + @dsl.pipeline(name='pipeline-with-multiple-exit-handlers') def my_pipeline(): return_1_task = return_1() @@ -411,78 +411,6 @@ def my_pipeline(): compiler.Compiler().compile( pipeline_func=my_pipeline, package_path=package_path) - def test_outside_of_for_loop_blocked(self): - with self.assertRaisesRegex(ValueError, - r'Cannot use \.after\(\) across'): - - @dsl.component - def print_op(message: str): - print(message) - - @dsl.component - def return_list() -> int: - return [1, 2, 3] - - @dsl.pipeline() - def my_pipeline(): - return_list_task = return_list() - - with dsl.ParallelFor(return_list_task.output) as item: - one = print_op(message='1') - two = print_op(message='2') - three = print_op(message='3').after(one) - - with tempfile.TemporaryDirectory() as tempdir: - package_path = os.path.join(tempdir, 'pipeline.yaml') - compiler.Compiler().compile( - pipeline_func=my_pipeline, package_path=package_path) - - def test_outside_of_exit_handler_blocked(self): - with self.assertRaisesRegex(ValueError, - r'Cannot use \.after\(\) across'): - - @dsl.component - def print_op(message: str): - print(message) - - @dsl.pipeline() - def my_pipeline(): - exit_task = print_op(message='exit task.') - - with dsl.ExitHandler(exit_task): 
- one = print_op(message='1') - two = print_op(message='2') - three = print_op(message='3').after(one) - - with tempfile.TemporaryDirectory() as tempdir: - package_path = os.path.join(tempdir, 'pipeline.yaml') - compiler.Compiler().compile( - pipeline_func=my_pipeline, package_path=package_path) - - def test_valid_outside_of_condition_topology_permitted(self): - - @dsl.component - def print_op(message: str): - print(message) - - @dsl.component - def return_1() -> int: - return 1 - - @dsl.pipeline() - def my_pipeline(): - return_1_task = return_1() - - one = print_op(message='1') - with dsl.Condition(return_1_task.output == 1): - two = print_op(message='2') - three = print_op(message='3').after(one) - - with tempfile.TemporaryDirectory() as tempdir: - package_path = os.path.join(tempdir, 'pipeline.yaml') - compiler.Compiler().compile( - pipeline_func=my_pipeline, package_path=package_path) - def test_inside_of_condition_permitted(self): @dsl.component @@ -493,7 +421,7 @@ def print_op(message: str): def return_1() -> int: return 1 - @dsl.pipeline() + @dsl.pipeline(name='pipeline-with-multiple-exit-handlers') def my_pipeline(): return_1_task = return_1() From 4d8b6b4c4f17a5c6d567e525113fba22fd3128c0 Mon Sep 17 00:00:00 2001 From: Josh Ogbonda Date: Fri, 11 Nov 2022 11:28:17 -0800 Subject: [PATCH 07/16] resolve comments on PR --- sdk/python/kfp/compiler/compiler_test.py | 39 +++++++++++++------ sdk/python/kfp/compiler/compiler_utils.py | 30 ++++++++++---- .../kfp/components/pipeline_task_test.py | 15 ++----- 3 files changed, 52 insertions(+), 32 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index a81228ebf7f..7d95131f04a 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -435,10 +435,7 @@ def producer_op() -> str: def dummy_op(msg: str = ''): pass - with self.assertRaisesRegex( - RuntimeError, - 'Task dummy-op cannot dependent on any task inside a ParralelFor' - ): + with self.assertRaisesRegex(RuntimeError, r'Task'): @dsl.pipeline(name='test-pipeline') def my_pipeline(val: bool): @@ -478,10 +475,7 @@ def producer_op() -> str: def dummy_op(msg: str = ''): pass - with self.assertRaisesRegex( - RuntimeError, - 'Task dummy-op cannot dependent on any task inside a Condition that is not a common ancestor of both tasks' - ): + with self.assertRaisesRegex(RuntimeError, r'Task'): @dsl.pipeline(name='test-pipeline') def my_pipeline(val: bool): @@ -521,10 +515,7 @@ def producer_op() -> str: def dummy_op(msg: str = ''): pass - with self.assertRaisesRegex( - RuntimeError, - 'Task dummy-op cannot dependent on any task inside a Exithandler that is not a common ancestor of both tasks' - ): + with self.assertRaisesRegex(RuntimeError, r'Task'): @dsl.pipeline(name='test-pipeline') def my_pipeline(val: bool): @@ -1644,6 +1635,30 @@ def my_pipeline(): compiler.Compiler().compile( pipeline_func=my_pipeline, package_path=package_path) + def test_downstream_in_condition_nested_in_a_for_loop(self): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.component + def return_1() -> int: + return 1 + + @dsl.pipeline() + def my_pipeline(): + return_1_task = return_1() + + with dsl.ParallelFor([1, 2, 3]): + one = print_op(message='1') + with dsl.Condition(return_1_task.output == 1): + two = print_op(message='2').after(one) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + 
pipeline_func=my_pipeline, package_path=package_path)
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/sdk/python/kfp/compiler/compiler_utils.py b/sdk/python/kfp/compiler/compiler_utils.py
index cb3d6440899..583bb90d079 100644
--- a/sdk/python/kfp/compiler/compiler_utils.py
+++ b/sdk/python/kfp/compiler/compiler_utils.py
@@ -430,25 +430,39 @@ def get_dependencies(
             # ParralelFor Check
             for parent in task_name_to_parent_groups[upstream_task.name]:
                 parent = group_name_to_group.get(parent, None)
-                if isinstance(
-                        parent,
-                        tasks_group.ParallelFor) and task not in parent.tasks:
-                    raise RuntimeError(
-                        f'Task {task.name} cannot dependent on any task inside a ParralelFor'
-                    )
+                if isinstance(parent, tasks_group.ParallelFor):
+                    exception = True
+                    if parent.name in task_name_to_parent_groups[task.name]:
+                        exception = False
+                        idx = task_name_to_parent_groups[task.name].index(
+                            parent.name)
+                        cnt = 0
+                        for ancestors in task_name_to_parent_groups[
+                                task.name][idx:]:
+                            ancestors = group_name_to_group.get(ancestors, None)
+                            if isinstance(ancestors, tasks_group.ParallelFor):
+                                cnt += 1
+                            if cnt > 1:
+                                exception = True
+                                break
+
+                    if exception:
+                        raise RuntimeError(
+                            f'Tasks cannot depend on upstream tasks inside a ParallelFor. Task {task.name} depends on upstream task {upstream_task.name}.'
+                        )
 
             # Condition check
             dependent_group = group_name_to_group.get(upstream_groups[0], None)
             if isinstance(dependent_group, tasks_group.Condition):
                 raise RuntimeError(
-                    f'Task {task.name} cannot dependent on any task inside a Condition that is not a common ancestor of both tasks'
+                    f'Tasks cannot depend on upstream tasks inside a Condition that is not a common ancestor of both tasks. Task {task.name} depends on upstream task {upstream_task.name}.'
                 )
 
             # ExitHandler check
             dependent_group = group_name_to_group.get(upstream_groups[0], None)
             if isinstance(dependent_group, tasks_group.ExitHandler):
                 raise RuntimeError(
-                    f'Task {task.name} cannot dependent on any task inside a Exithandler that is not a common ancestor of both tasks'
+                    f'Tasks cannot depend on upstream tasks inside an ExitHandler that is not a common ancestor of both tasks. Task {task.name} depends on upstream task {upstream_task.name}.'
) dependencies[downstream_groups[0]].add(upstream_groups[0]) diff --git a/sdk/python/kfp/components/pipeline_task_test.py b/sdk/python/kfp/components/pipeline_task_test.py index f860b143ac6..6c7070ba091 100644 --- a/sdk/python/kfp/components/pipeline_task_test.py +++ b/sdk/python/kfp/components/pipeline_task_test.py @@ -308,10 +308,7 @@ def test_set_display_name(self): class TestCannotUseAfterCrossDAG(unittest.TestCase): def test_inner_task_prevented(self): - with self.assertRaisesRegex( - RuntimeError, - r'Task print-op-4 cannot dependent on any task inside a Exithandler that is not a common ancestor of both tasks' - ): + with self.assertRaisesRegex(RuntimeError, r'Task'): @dsl.component def print_op(message: str): @@ -336,10 +333,7 @@ def my_pipeline(): pipeline_func=my_pipeline, package_path=package_path) def test_exit_handler_task_prevented(self): - with self.assertRaisesRegex( - RuntimeError, - r'Task print-op-4 cannot dependent on any task inside a Exithandler that is not a common ancestor of both tasks' - ): + with self.assertRaisesRegex(RuntimeError, r'Task'): @dsl.component def print_op(message: str): @@ -390,10 +384,7 @@ def my_pipeline(): pipeline_func=my_pipeline, package_path=package_path) def test_outside_of_condition_blocked(self): - with self.assertRaisesRegex( - RuntimeError, - r'Task print-op-3 cannot dependent on any task inside a Condition that is not a common ancestor of both tasks' - ): + with self.assertRaisesRegex(RuntimeError, r'Task'): @dsl.component def print_op(message: str): From 586ef5a7e0c3adfebbe98512cdd255839b03bd66 Mon Sep 17 00:00:00 2001 From: Josh Ogbonda Date: Tue, 15 Nov 2022 10:26:17 -0800 Subject: [PATCH 08/16] resolve conflicts --- sdk/python/kfp/compiler/compiler_test.py | 134 +++++++++++++++++++++++ 1 file changed, 134 insertions(+) diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 7d95131f04a..4bf3f879c95 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -1660,5 +1660,139 @@ def my_pipeline(): pipeline_func=my_pipeline, package_path=package_path) +class TestBoolInputParameterWithDefaultSerializesCorrectly(unittest.TestCase): + # test with default = True, may have false test successes due to protocol buffer boolean default of False + def test_python_component(self): + + @dsl.component + def comp(boolean: bool = True) -> bool: + return boolean + + # test inner component interface + self.assertEqual( + comp.pipeline_spec.components['comp-comp'].input_definitions + .parameters['boolean'].default_value.bool_value, True) + + # test outer pipeline "wrapper" interface + self.assertEqual( + comp.pipeline_spec.root.input_definitions.parameters['boolean'] + .default_value.bool_value, True) + + def test_python_component_with_overrides(self): + + @dsl.component + def comp(boolean: bool = False) -> bool: + return boolean + + with tempfile.TemporaryDirectory() as tmpdir: + pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') + compiler.Compiler().compile( + comp, pipeline_spec_path, pipeline_parameters={'boolean': True}) + pipeline_spec = pipeline_spec_from_file(pipeline_spec_path) + + # test outer pipeline "wrapper" interface + self.assertEqual( + pipeline_spec.root.input_definitions.parameters['boolean'] + .default_value.bool_value, True) + + def test_container_component(self): + + @dsl.container_component + def comp(boolean: bool = True): + return dsl.ContainerSpec(image='alpine', command=['echo', boolean]) + + # test inner component interface + 
self.assertEqual( + comp.pipeline_spec.components['comp-comp'].input_definitions + .parameters['boolean'].default_value.bool_value, True) + + # test pipeline "wrapper" interface + self.assertEqual( + comp.pipeline_spec.root.input_definitions.parameters['boolean'] + .default_value.bool_value, True) + + def test_container_component_with_overrides(self): + + @dsl.container_component + def comp(boolean: bool = True): + return dsl.ContainerSpec(image='alpine', command=['echo', boolean]) + + with tempfile.TemporaryDirectory() as tmpdir: + pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') + compiler.Compiler().compile( + comp, pipeline_spec_path, pipeline_parameters={'boolean': True}) + pipeline_spec = pipeline_spec_from_file(pipeline_spec_path) + + # test outer pipeline "wrapper" interface + self.assertEqual( + pipeline_spec.root.input_definitions.parameters['boolean'] + .default_value.bool_value, True) + + def test_pipeline_no_input(self): + + @dsl.component + def comp(boolean: bool = True) -> bool: + return boolean + + @dsl.pipeline + def pipeline_no_input(): + comp() + + # test inner component interface + self.assertEqual( + pipeline_no_input.pipeline_spec.components['comp-comp'] + .input_definitions.parameters['boolean'].default_value.bool_value, + True) + + def test_pipeline_with_input(self): + + @dsl.component + def comp(boolean: bool = True) -> bool: + return boolean + + @dsl.pipeline + def pipeline_with_input(boolean: bool = True): + comp(boolean=boolean) + + # test inner component interface + self.assertEqual( + pipeline_with_input.pipeline_spec.components['comp-comp'] + .input_definitions.parameters['boolean'].default_value.bool_value, + True) + + # test pipeline interface + self.assertEqual( + pipeline_with_input.pipeline_spec.root.input_definitions + .parameters['boolean'].default_value.bool_value, True) + + def test_pipeline_with_with_overrides(self): + + @dsl.component + def comp(boolean: bool = True) -> bool: + return boolean + + @dsl.pipeline + def pipeline_with_input(boolean: bool = False): + comp(boolean=boolean) + + with tempfile.TemporaryDirectory() as tmpdir: + pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') + compiler.Compiler().compile( + pipeline_with_input, + pipeline_spec_path, + pipeline_parameters={'boolean': True}) + pipeline_spec = pipeline_spec_from_file(pipeline_spec_path) + + # test inner component interface + self.assertEqual( + pipeline_spec.components['comp-comp'].input_definitions + .parameters['boolean'].default_value.bool_value, True) + + # test pipeline interface + self.assertEqual( + pipeline_spec.root.input_definitions.parameters['boolean'] + .default_value.bool_value, True) + + if __name__ == '__main__': unittest.main() From 2eb19cf1cb8597f39470d93f1e76823a194fb4c1 Mon Sep 17 00:00:00 2001 From: Josh Ogbonda Date: Tue, 15 Nov 2022 10:30:46 -0800 Subject: [PATCH 09/16] resolve conflict 2 --- sdk/python/kfp/compiler/compiler_test.py | 268 +++++++++++------------ 1 file changed, 134 insertions(+), 134 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 4bf3f879c95..fe7b2511991 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -1350,6 +1350,140 @@ def my_pipeline(): print_op(message='Inside second exit handler.') +class TestBoolInputParameterWithDefaultSerializesCorrectly(unittest.TestCase): + # test with default = True, may have false test successes due to protocol buffer boolean default of False + def 
test_python_component(self): + + @dsl.component + def comp(boolean: bool = True) -> bool: + return boolean + + # test inner component interface + self.assertEqual( + comp.pipeline_spec.components['comp-comp'].input_definitions + .parameters['boolean'].default_value.bool_value, True) + + # test outer pipeline "wrapper" interface + self.assertEqual( + comp.pipeline_spec.root.input_definitions.parameters['boolean'] + .default_value.bool_value, True) + + def test_python_component_with_overrides(self): + + @dsl.component + def comp(boolean: bool = False) -> bool: + return boolean + + with tempfile.TemporaryDirectory() as tmpdir: + pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') + compiler.Compiler().compile( + comp, pipeline_spec_path, pipeline_parameters={'boolean': True}) + pipeline_spec = pipeline_spec_from_file(pipeline_spec_path) + + # test outer pipeline "wrapper" interface + self.assertEqual( + pipeline_spec.root.input_definitions.parameters['boolean'] + .default_value.bool_value, True) + + def test_container_component(self): + + @dsl.container_component + def comp(boolean: bool = True): + return dsl.ContainerSpec(image='alpine', command=['echo', boolean]) + + # test inner component interface + self.assertEqual( + comp.pipeline_spec.components['comp-comp'].input_definitions + .parameters['boolean'].default_value.bool_value, True) + + # test pipeline "wrapper" interface + self.assertEqual( + comp.pipeline_spec.root.input_definitions.parameters['boolean'] + .default_value.bool_value, True) + + def test_container_component_with_overrides(self): + + @dsl.container_component + def comp(boolean: bool = True): + return dsl.ContainerSpec(image='alpine', command=['echo', boolean]) + + with tempfile.TemporaryDirectory() as tmpdir: + pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') + compiler.Compiler().compile( + comp, pipeline_spec_path, pipeline_parameters={'boolean': True}) + pipeline_spec = pipeline_spec_from_file(pipeline_spec_path) + + # test outer pipeline "wrapper" interface + self.assertEqual( + pipeline_spec.root.input_definitions.parameters['boolean'] + .default_value.bool_value, True) + + def test_pipeline_no_input(self): + + @dsl.component + def comp(boolean: bool = True) -> bool: + return boolean + + @dsl.pipeline + def pipeline_no_input(): + comp() + + # test inner component interface + self.assertEqual( + pipeline_no_input.pipeline_spec.components['comp-comp'] + .input_definitions.parameters['boolean'].default_value.bool_value, + True) + + def test_pipeline_with_input(self): + + @dsl.component + def comp(boolean: bool = True) -> bool: + return boolean + + @dsl.pipeline + def pipeline_with_input(boolean: bool = True): + comp(boolean=boolean) + + # test inner component interface + self.assertEqual( + pipeline_with_input.pipeline_spec.components['comp-comp'] + .input_definitions.parameters['boolean'].default_value.bool_value, + True) + + # test pipeline interface + self.assertEqual( + pipeline_with_input.pipeline_spec.root.input_definitions + .parameters['boolean'].default_value.bool_value, True) + + def test_pipeline_with_with_overrides(self): + + @dsl.component + def comp(boolean: bool = True) -> bool: + return boolean + + @dsl.pipeline + def pipeline_with_input(boolean: bool = False): + comp(boolean=boolean) + + with tempfile.TemporaryDirectory() as tmpdir: + pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') + compiler.Compiler().compile( + pipeline_with_input, + pipeline_spec_path, + pipeline_parameters={'boolean': True}) + pipeline_spec = 
pipeline_spec_from_file(pipeline_spec_path) + + # test inner component interface + self.assertEqual( + pipeline_spec.components['comp-comp'].input_definitions + .parameters['boolean'].default_value.bool_value, True) + + # test pipeline interface + self.assertEqual( + pipeline_spec.root.input_definitions.parameters['boolean'] + .default_value.bool_value, True) + + class ValidLegalTopologies(unittest.TestCase): def test_inside_of_root_group_permitted(self): @@ -1660,139 +1794,5 @@ def my_pipeline(): pipeline_func=my_pipeline, package_path=package_path) -class TestBoolInputParameterWithDefaultSerializesCorrectly(unittest.TestCase): - # test with default = True, may have false test successes due to protocol buffer boolean default of False - def test_python_component(self): - - @dsl.component - def comp(boolean: bool = True) -> bool: - return boolean - - # test inner component interface - self.assertEqual( - comp.pipeline_spec.components['comp-comp'].input_definitions - .parameters['boolean'].default_value.bool_value, True) - - # test outer pipeline "wrapper" interface - self.assertEqual( - comp.pipeline_spec.root.input_definitions.parameters['boolean'] - .default_value.bool_value, True) - - def test_python_component_with_overrides(self): - - @dsl.component - def comp(boolean: bool = False) -> bool: - return boolean - - with tempfile.TemporaryDirectory() as tmpdir: - pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') - compiler.Compiler().compile( - comp, pipeline_spec_path, pipeline_parameters={'boolean': True}) - pipeline_spec = pipeline_spec_from_file(pipeline_spec_path) - - # test outer pipeline "wrapper" interface - self.assertEqual( - pipeline_spec.root.input_definitions.parameters['boolean'] - .default_value.bool_value, True) - - def test_container_component(self): - - @dsl.container_component - def comp(boolean: bool = True): - return dsl.ContainerSpec(image='alpine', command=['echo', boolean]) - - # test inner component interface - self.assertEqual( - comp.pipeline_spec.components['comp-comp'].input_definitions - .parameters['boolean'].default_value.bool_value, True) - - # test pipeline "wrapper" interface - self.assertEqual( - comp.pipeline_spec.root.input_definitions.parameters['boolean'] - .default_value.bool_value, True) - - def test_container_component_with_overrides(self): - - @dsl.container_component - def comp(boolean: bool = True): - return dsl.ContainerSpec(image='alpine', command=['echo', boolean]) - - with tempfile.TemporaryDirectory() as tmpdir: - pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') - compiler.Compiler().compile( - comp, pipeline_spec_path, pipeline_parameters={'boolean': True}) - pipeline_spec = pipeline_spec_from_file(pipeline_spec_path) - - # test outer pipeline "wrapper" interface - self.assertEqual( - pipeline_spec.root.input_definitions.parameters['boolean'] - .default_value.bool_value, True) - - def test_pipeline_no_input(self): - - @dsl.component - def comp(boolean: bool = True) -> bool: - return boolean - - @dsl.pipeline - def pipeline_no_input(): - comp() - - # test inner component interface - self.assertEqual( - pipeline_no_input.pipeline_spec.components['comp-comp'] - .input_definitions.parameters['boolean'].default_value.bool_value, - True) - - def test_pipeline_with_input(self): - - @dsl.component - def comp(boolean: bool = True) -> bool: - return boolean - - @dsl.pipeline - def pipeline_with_input(boolean: bool = True): - comp(boolean=boolean) - - # test inner component interface - self.assertEqual( - 
pipeline_with_input.pipeline_spec.components['comp-comp'] - .input_definitions.parameters['boolean'].default_value.bool_value, - True) - - # test pipeline interface - self.assertEqual( - pipeline_with_input.pipeline_spec.root.input_definitions - .parameters['boolean'].default_value.bool_value, True) - - def test_pipeline_with_with_overrides(self): - - @dsl.component - def comp(boolean: bool = True) -> bool: - return boolean - - @dsl.pipeline - def pipeline_with_input(boolean: bool = False): - comp(boolean=boolean) - - with tempfile.TemporaryDirectory() as tmpdir: - pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') - compiler.Compiler().compile( - pipeline_with_input, - pipeline_spec_path, - pipeline_parameters={'boolean': True}) - pipeline_spec = pipeline_spec_from_file(pipeline_spec_path) - - # test inner component interface - self.assertEqual( - pipeline_spec.components['comp-comp'].input_definitions - .parameters['boolean'].default_value.bool_value, True) - - # test pipeline interface - self.assertEqual( - pipeline_spec.root.input_definitions.parameters['boolean'] - .default_value.bool_value, True) - - if __name__ == '__main__': unittest.main() From 867bc863d9466183db0ece4373d4a4b5704303fe Mon Sep 17 00:00:00 2001 From: Josh Ogbonda Date: Tue, 15 Nov 2022 10:41:39 -0800 Subject: [PATCH 10/16] revert conflict fix --- sdk/python/kfp/compiler/compiler_test.py | 134 ----------------------- 1 file changed, 134 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index fe7b2511991..7d95131f04a 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -1350,140 +1350,6 @@ def my_pipeline(): print_op(message='Inside second exit handler.') -class TestBoolInputParameterWithDefaultSerializesCorrectly(unittest.TestCase): - # test with default = True, may have false test successes due to protocol buffer boolean default of False - def test_python_component(self): - - @dsl.component - def comp(boolean: bool = True) -> bool: - return boolean - - # test inner component interface - self.assertEqual( - comp.pipeline_spec.components['comp-comp'].input_definitions - .parameters['boolean'].default_value.bool_value, True) - - # test outer pipeline "wrapper" interface - self.assertEqual( - comp.pipeline_spec.root.input_definitions.parameters['boolean'] - .default_value.bool_value, True) - - def test_python_component_with_overrides(self): - - @dsl.component - def comp(boolean: bool = False) -> bool: - return boolean - - with tempfile.TemporaryDirectory() as tmpdir: - pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') - compiler.Compiler().compile( - comp, pipeline_spec_path, pipeline_parameters={'boolean': True}) - pipeline_spec = pipeline_spec_from_file(pipeline_spec_path) - - # test outer pipeline "wrapper" interface - self.assertEqual( - pipeline_spec.root.input_definitions.parameters['boolean'] - .default_value.bool_value, True) - - def test_container_component(self): - - @dsl.container_component - def comp(boolean: bool = True): - return dsl.ContainerSpec(image='alpine', command=['echo', boolean]) - - # test inner component interface - self.assertEqual( - comp.pipeline_spec.components['comp-comp'].input_definitions - .parameters['boolean'].default_value.bool_value, True) - - # test pipeline "wrapper" interface - self.assertEqual( - comp.pipeline_spec.root.input_definitions.parameters['boolean'] - .default_value.bool_value, True) - - def test_container_component_with_overrides(self): - - 
@dsl.container_component - def comp(boolean: bool = True): - return dsl.ContainerSpec(image='alpine', command=['echo', boolean]) - - with tempfile.TemporaryDirectory() as tmpdir: - pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') - compiler.Compiler().compile( - comp, pipeline_spec_path, pipeline_parameters={'boolean': True}) - pipeline_spec = pipeline_spec_from_file(pipeline_spec_path) - - # test outer pipeline "wrapper" interface - self.assertEqual( - pipeline_spec.root.input_definitions.parameters['boolean'] - .default_value.bool_value, True) - - def test_pipeline_no_input(self): - - @dsl.component - def comp(boolean: bool = True) -> bool: - return boolean - - @dsl.pipeline - def pipeline_no_input(): - comp() - - # test inner component interface - self.assertEqual( - pipeline_no_input.pipeline_spec.components['comp-comp'] - .input_definitions.parameters['boolean'].default_value.bool_value, - True) - - def test_pipeline_with_input(self): - - @dsl.component - def comp(boolean: bool = True) -> bool: - return boolean - - @dsl.pipeline - def pipeline_with_input(boolean: bool = True): - comp(boolean=boolean) - - # test inner component interface - self.assertEqual( - pipeline_with_input.pipeline_spec.components['comp-comp'] - .input_definitions.parameters['boolean'].default_value.bool_value, - True) - - # test pipeline interface - self.assertEqual( - pipeline_with_input.pipeline_spec.root.input_definitions - .parameters['boolean'].default_value.bool_value, True) - - def test_pipeline_with_with_overrides(self): - - @dsl.component - def comp(boolean: bool = True) -> bool: - return boolean - - @dsl.pipeline - def pipeline_with_input(boolean: bool = False): - comp(boolean=boolean) - - with tempfile.TemporaryDirectory() as tmpdir: - pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') - compiler.Compiler().compile( - pipeline_with_input, - pipeline_spec_path, - pipeline_parameters={'boolean': True}) - pipeline_spec = pipeline_spec_from_file(pipeline_spec_path) - - # test inner component interface - self.assertEqual( - pipeline_spec.components['comp-comp'].input_definitions - .parameters['boolean'].default_value.bool_value, True) - - # test pipeline interface - self.assertEqual( - pipeline_spec.root.input_definitions.parameters['boolean'] - .default_value.bool_value, True) - - class ValidLegalTopologies(unittest.TestCase): def test_inside_of_root_group_permitted(self): From 95932031699e86ff9b5bfc588e9550ded07e81ae Mon Sep 17 00:00:00 2001 From: Josh Ogbonda Date: Thu, 17 Nov 2022 10:58:32 -0800 Subject: [PATCH 11/16] fix changes --- sdk/python/kfp/compiler/compiler_test.py | 444 +++++++++++++++++++++++ 1 file changed, 444 insertions(+) diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 5b59e17586e..a1a68206a2e 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -1350,5 +1350,449 @@ def my_pipeline(): print_op(message='Inside second exit handler.') +class TestBoolInputParameterWithDefaultSerializesCorrectly(unittest.TestCase): + # test with default = True, may have false test successes due to protocol buffer boolean default of False + def test_python_component(self): + + @dsl.component + def comp(boolean: bool = True) -> bool: + return boolean + + # test inner component interface + self.assertEqual( + comp.pipeline_spec.components['comp-comp'].input_definitions + .parameters['boolean'].default_value.bool_value, True) + + # test outer pipeline "wrapper" interface + self.assertEqual( + 
comp.pipeline_spec.root.input_definitions.parameters['boolean'] + .default_value.bool_value, True) + + def test_python_component_with_overrides(self): + + @dsl.component + def comp(boolean: bool = False) -> bool: + return boolean + + with tempfile.TemporaryDirectory() as tmpdir: + pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') + compiler.Compiler().compile( + comp, pipeline_spec_path, pipeline_parameters={'boolean': True}) + pipeline_spec = pipeline_spec_from_file(pipeline_spec_path) + + # test outer pipeline "wrapper" interface + self.assertEqual( + pipeline_spec.root.input_definitions.parameters['boolean'] + .default_value.bool_value, True) + + def test_container_component(self): + + @dsl.container_component + def comp(boolean: bool = True): + return dsl.ContainerSpec(image='alpine', command=['echo', boolean]) + + # test inner component interface + self.assertEqual( + comp.pipeline_spec.components['comp-comp'].input_definitions + .parameters['boolean'].default_value.bool_value, True) + + # test pipeline "wrapper" interface + self.assertEqual( + comp.pipeline_spec.root.input_definitions.parameters['boolean'] + .default_value.bool_value, True) + + def test_container_component_with_overrides(self): + + @dsl.container_component + def comp(boolean: bool = True): + return dsl.ContainerSpec(image='alpine', command=['echo', boolean]) + + with tempfile.TemporaryDirectory() as tmpdir: + pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') + compiler.Compiler().compile( + comp, pipeline_spec_path, pipeline_parameters={'boolean': True}) + pipeline_spec = pipeline_spec_from_file(pipeline_spec_path) + + # test outer pipeline "wrapper" interface + self.assertEqual( + pipeline_spec.root.input_definitions.parameters['boolean'] + .default_value.bool_value, True) + + def test_pipeline_no_input(self): + + @dsl.component + def comp(boolean: bool = True) -> bool: + return boolean + + @dsl.pipeline + def pipeline_no_input(): + comp() + + # test inner component interface + self.assertEqual( + pipeline_no_input.pipeline_spec.components['comp-comp'] + .input_definitions.parameters['boolean'].default_value.bool_value, + True) + + def test_pipeline_with_input(self): + + @dsl.component + def comp(boolean: bool = True) -> bool: + return boolean + + @dsl.pipeline + def pipeline_with_input(boolean: bool = True): + comp(boolean=boolean) + + # test inner component interface + self.assertEqual( + pipeline_with_input.pipeline_spec.components['comp-comp'] + .input_definitions.parameters['boolean'].default_value.bool_value, + True) + + # test pipeline interface + self.assertEqual( + pipeline_with_input.pipeline_spec.root.input_definitions + .parameters['boolean'].default_value.bool_value, True) + + def test_pipeline_with_with_overrides(self): + + @dsl.component + def comp(boolean: bool = True) -> bool: + return boolean + + @dsl.pipeline + def pipeline_with_input(boolean: bool = False): + comp(boolean=boolean) + + with tempfile.TemporaryDirectory() as tmpdir: + pipeline_spec_path = os.path.join(tmpdir, 'output.yaml') + compiler.Compiler().compile( + pipeline_with_input, + pipeline_spec_path, + pipeline_parameters={'boolean': True}) + pipeline_spec = pipeline_spec_from_file(pipeline_spec_path) + + # test inner component interface + self.assertEqual( + pipeline_spec.components['comp-comp'].input_definitions + .parameters['boolean'].default_value.bool_value, True) + + # test pipeline interface + self.assertEqual( + pipeline_spec.root.input_definitions.parameters['boolean'] + .default_value.bool_value, True) + + 
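+# The class below exercises the dependency-topology rule enforced in
+# compiler_utils.get_dependencies(): a task may only depend on an upstream
+# task when every dsl.Condition or dsl.ExitHandler enclosing the upstream
+# task is a common ancestor of both tasks, and a dependency may not reach
+# into a dsl.ParallelFor from outside of it. A minimal sketch of a blocked
+# topology, mirroring test_upstream_inside_deeper_condition_blocked below:
+#
+#     with dsl.Condition(return_1_task.output == 1):
+#         two = print_op(message='2')
+#     three = print_op(message='3').after(two)  # RuntimeError at compile time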
+class ValidLegalTopologies(unittest.TestCase):
+
+    def test_inside_of_root_group_permitted(self):
+
+        @dsl.component
+        def print_op(message: str):
+            print(message)
+
+        @dsl.component
+        def return_1() -> int:
+            return 1
+
+        @dsl.pipeline()
+        def my_pipeline():
+            return_1_task = return_1()
+
+            one = print_op(message='1')
+            two = print_op(message='2')
+            three = print_op(message='3').after(one)
+
+        with tempfile.TemporaryDirectory() as tempdir:
+            package_path = os.path.join(tempdir, 'pipeline.yaml')
+            compiler.Compiler().compile(
+                pipeline_func=my_pipeline, package_path=package_path)
+
+    def test_upstream_inside_deeper_condition_blocked(self):
+
+        with self.assertRaisesRegex(RuntimeError, r'Task'):
+
+            @dsl.component
+            def print_op(message: str):
+                print(message)
+
+            @dsl.component
+            def return_1() -> int:
+                return 1
+
+            @dsl.pipeline()
+            def my_pipeline():
+                return_1_task = return_1()
+
+                one = print_op(message='1')
+                with dsl.Condition(return_1_task.output == 1):
+                    two = print_op(message='2')
+
+                three = print_op(message='3').after(two)
+
+            with tempfile.TemporaryDirectory() as tempdir:
+                package_path = os.path.join(tempdir, 'pipeline.yaml')
+                compiler.Compiler().compile(
+                    pipeline_func=my_pipeline, package_path=package_path)
+
+    def test_upstream_in_the_same_condition_permitted(self):
+
+        @dsl.component
+        def print_op(message: str):
+            print(message)
+
+        @dsl.component
+        def return_1() -> int:
+            return 1
+
+        @dsl.pipeline()
+        def my_pipeline():
+            return_1_task = return_1()
+
+            with dsl.Condition(return_1_task.output == 1):
+                one = print_op(message='1')
+                two = print_op(message='2')
+                three = print_op(message='3').after(one)
+
+        with tempfile.TemporaryDirectory() as tempdir:
+            package_path = os.path.join(tempdir, 'pipeline.yaml')
+            compiler.Compiler().compile(
+                pipeline_func=my_pipeline, package_path=package_path)
+
+    def test_downstream_inside_deeper_condition_permitted(self):
+
+        @dsl.component
+        def print_op(message: str):
+            print(message)
+
+        @dsl.component
+        def return_1() -> int:
+            return 1
+
+        @dsl.pipeline()
+        def my_pipeline():
+            return_1_task = return_1()
+
+            one = print_op(message='1')
+            with dsl.Condition(return_1_task.output == 1):
+                two = print_op(message='2')
+                three = print_op(message='3').after(one)
+
+        with tempfile.TemporaryDirectory() as tempdir:
+            package_path = os.path.join(tempdir, 'pipeline.yaml')
+            compiler.Compiler().compile(
+                pipeline_func=my_pipeline, package_path=package_path)
+
+    def test_downstream_and_upstream_in_different_condition_on_same_level_blocked(
+            self):
+
+        with self.assertRaisesRegex(RuntimeError, r'Task'):
+
+            @dsl.component
+            def print_op(message: str):
+                print(message)
+
+            @dsl.component
+            def return_1() -> int:
+                return 1
+
+            @dsl.pipeline()
+            def my_pipeline():
+                return_1_task = return_1()
+
+                one = print_op(message='1')
+                with dsl.Condition(return_1_task.output == 1):
+                    two = print_op(message='2')
+
+                with dsl.Condition(return_1_task.output == 1):
+                    three = print_op(message='3').after(two)
+
+            with tempfile.TemporaryDirectory() as tempdir:
+                package_path = os.path.join(tempdir, 'pipeline.yaml')
+                compiler.Compiler().compile(
+                    pipeline_func=my_pipeline, package_path=package_path)
+
+    def test_downstream_inside_deeper_nested_condition_permitted(self):
+
+        @dsl.component
+        def print_op(message: str):
+            print(message)
+
+        @dsl.component
+        def return_1() -> int:
+            return 1
+
+        @dsl.pipeline()
+        def my_pipeline():
+            return_1_task = return_1()
+            return_1_task2 = return_1()
+
+            with dsl.Condition(return_1_task.output == 1):
+                one = 
print_op(message='1') + with dsl.Condition(return_1_task2.output == 1): + two = print_op(message='2') + three = print_op(message='3').after(one) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + def test_upstream_inside_deeper_nested_condition_blocked(self): + + with self.assertRaisesRegex(RuntimeError, r'Task'): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.component + def return_1() -> int: + return 1 + + @dsl.pipeline() + def my_pipeline(): + return_1_task = return_1() + + with dsl.Condition(return_1_task.output == 1): + one = print_op(message='1') + with dsl.Condition(return_1_task.output == 1): + two = print_op(message='2') + three = print_op(message='3').after(two) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + def test_upstream_in_same_for_loop_with_downstream_permitted(self): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.component + def args_generator_op() -> List[Dict[str, str]]: + return [{'A_a': '1', 'B_b': '2'}, {'A_a': '10', 'B_b': '20'}] + + @dsl.pipeline() + def my_pipeline(): + args_generator = args_generator_op() + + with dsl.ParallelFor(args_generator.output): + one = print_op(message='1') + two = print_op(message='3').after(one) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + def test_downstream_not_in_same_for_loop_with_upstream_blocked(self): + + with self.assertRaisesRegex(RuntimeError, r'Task'): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.component + def args_generator_op() -> List[Dict[str, str]]: + return [{'A_a': '1', 'B_b': '2'}, {'A_a': '10', 'B_b': '20'}] + + @dsl.pipeline() + def my_pipeline(): + args_generator = args_generator_op() + + with dsl.ParallelFor(args_generator.output): + one = print_op(message='1') + two = print_op(message='3').after(one) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + def test_downstream_not_in_same_for_loop_with_upstream_seperate_blocked( + self): + + with self.assertRaisesRegex(RuntimeError, r'Task'): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.component + def args_generator_op() -> List[Dict[str, str]]: + return [{'A_a': '1', 'B_b': '2'}, {'A_a': '10', 'B_b': '20'}] + + @dsl.pipeline() + def my_pipeline(): + args_generator = args_generator_op() + + with dsl.ParallelFor(args_generator.output): + one = print_op(message='1') + + with dsl.ParallelFor(args_generator.output): + two = print_op(message='3').after(one) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + def test_downstream_not_in_same_for_loop_with_upstream_nested_blocked(self): + + with self.assertRaisesRegex(RuntimeError, r'Task'): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.component + def args_generator_op() -> List[Dict[str, str]]: + return [{'A_a': '1', 'B_b': '2'}, {'A_a': '10', 
'B_b': '20'}]
+
+            @dsl.pipeline()
+            def my_pipeline():
+                args_generator = args_generator_op()
+
+                with dsl.ParallelFor(args_generator.output):
+                    one = print_op(message='1')
+
+                    with dsl.ParallelFor(args_generator.output):
+                        two = print_op(message='3').after(one)
+
+            with tempfile.TemporaryDirectory() as tempdir:
+                package_path = os.path.join(tempdir, 'pipeline.yaml')
+                compiler.Compiler().compile(
+                    pipeline_func=my_pipeline, package_path=package_path)
+
+    def test_downstream_in_condition_nested_in_a_for_loop(self):
+
+        @dsl.component
+        def print_op(message: str):
+            print(message)
+
+        @dsl.component
+        def return_1() -> int:
+            return 1
+
+        @dsl.pipeline()
+        def my_pipeline():
+            return_1_task = return_1()
+
+            with dsl.ParallelFor([1, 2, 3]):
+                one = print_op(message='1')
+                with dsl.Condition(return_1_task.output == 1):
+                    two = print_op(message='2').after(one)
+
+        with tempfile.TemporaryDirectory() as tempdir:
+            package_path = os.path.join(tempdir, 'pipeline.yaml')
+            compiler.Compiler().compile(
+                pipeline_func=my_pipeline, package_path=package_path)
+
+
 if __name__ == '__main__':
     unittest.main()

From f01e25e206fe3e599a34b454c7dd7f0df195dc2d Mon Sep 17 00:00:00 2001
From: Josh Ogbonda
Date: Tue, 29 Nov 2022 11:25:58 -0800
Subject: [PATCH 12/16] address comments

---
 sdk/python/kfp/compiler/compiler_test.py  | 167 ++++++++--------------
 sdk/python/kfp/compiler/compiler_utils.py |  64 ++++-----
 2 files changed, 89 insertions(+), 142 deletions(-)

diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py
index a1a68206a2e..6de19112034 100644
--- a/sdk/python/kfp/compiler/compiler_test.py
+++ b/sdk/python/kfp/compiler/compiler_test.py
@@ -435,7 +435,8 @@ def producer_op() -> str:
         def dummy_op(msg: str = ''):
             pass
 
-        with self.assertRaisesRegex(RuntimeError, r'Task'):
+        with self.assertRaisesRegex(
+                RuntimeError, r'Tasks cannot depend on upstream tasks inside'):
 
             @dsl.pipeline(name='test-pipeline')
             def my_pipeline(val: bool):
@@ -475,7 +476,8 @@ def producer_op() -> str:
         def dummy_op(msg: str = ''):
             pass
 
-        with self.assertRaisesRegex(RuntimeError, r'Task'):
+        with self.assertRaisesRegex(
+                RuntimeError, r'Tasks cannot depend on upstream tasks inside'):
 
             @dsl.pipeline(name='test-pipeline')
             def my_pipeline(val: bool):
@@ -515,7 +517,8 @@ def producer_op() -> str:
         def dummy_op(msg: str = ''):
             pass
 
-        with self.assertRaisesRegex(RuntimeError, r'Task'):
+        with self.assertRaisesRegex(
+                RuntimeError, r'Tasks cannot depend on upstream tasks inside'):
 
             @dsl.pipeline(name='test-pipeline')
             def my_pipeline(val: bool):
@@ -1484,17 +1487,25 @@ def pipeline_with_input(boolean: bool = False):
         .default_value.bool_value, True)
 
 
-class ValidLegalTopologies(unittest.TestCase):
+# helper component definitions for the ValidLegalTopologies tests
+@dsl.component
+def print_op(message: str):
+    print(message)
 
-    def test_inside_of_root_group_permitted(self):
 
-        @dsl.component
-        def print_op(message: str):
-            print(message)
+@dsl.component
+def return_1() -> int:
+    return 1
 
-        @dsl.component
-        def return_1() -> int:
-            return 1
+
+@dsl.component
+def args_generator_op() -> List[Dict[str, str]]:
+    return [{'A_a': '1', 'B_b': '2'}, {'A_a': '10', 'B_b': '20'}]
+
+
+class ValidLegalTopologies(unittest.TestCase):
+
+    def test_inside_of_root_group_permitted(self):
 
         @dsl.pipeline()
         def my_pipeline():
@@ -1502,7 +1513,7 @@ def my_pipeline():
 
             one = print_op(message='1')
             two = print_op(message='2')
-            three = print_op(message='3').after(one)
+            three = print_op(message=str(return_1_task.output))
 
         with tempfile.TemporaryDirectory() as tempdir:
             package_path = os.path.join(tempdir, 'pipeline.yaml')
@@ -1511,15 +1522,8 @@ def my_pipeline():
 
     def test_upstream_inside_deeper_condition_blocked(self):
 
-        with self.assertRaisesRegex(RuntimeError, r'Task'):
-
-            @dsl.component
-            def print_op(message: str):
-                print(message)
-
-            @dsl.component
-            def return_1() -> int:
-                return 1
+        with self.assertRaisesRegex(
+                RuntimeError, r'Tasks cannot depend on upstream tasks inside'):
 
             @dsl.pipeline()
             def my_pipeline():
@@ -1538,22 +1542,14 @@ def my_pipeline():
 
     def test_upstream_in_the_same_condition_permitted(self):
 
-        @dsl.component
-        def print_op(message: str):
-            print(message)
-
-        @dsl.component
-        def return_1() -> int:
-            return 1
-
         @dsl.pipeline()
         def my_pipeline():
             return_1_task = return_1()
 
             with dsl.Condition(return_1_task.output == 1):
-                one = print_op(message='1')
+                one = return_1()
                 two = print_op(message='2')
-                three = print_op(message='3').after(one)
+                three = print_op(message=str(one.output))
 
         with tempfile.TemporaryDirectory() as tempdir:
             package_path = os.path.join(tempdir, 'pipeline.yaml')
@@ -1562,14 +1558,6 @@ def my_pipeline():
 
     def test_downstream_inside_deeper_condition_permitted(self):
 
-        @dsl.component
-        def print_op(message: str):
-            print(message)
-
-        @dsl.component
-        def return_1() -> int:
-            return 1
-
         @dsl.pipeline()
         def my_pipeline():
             return_1_task = return_1()
@@ -1587,15 +1575,8 @@ def my_pipeline():
     def test_downstream_and_upstream_in_different_condition_on_same_level_blocked(
             self):
 
-        with self.assertRaisesRegex(RuntimeError, r'Task'):
-
-            @dsl.component
-            def print_op(message: str):
-                print(message)
-
-            @dsl.component
-            def return_1() -> int:
-                return 1
+        with self.assertRaisesRegex(
+                RuntimeError, r'Tasks cannot depend on upstream tasks inside'):
 
             @dsl.pipeline()
             def my_pipeline():
@@ -1615,24 +1596,16 @@ def my_pipeline():
 
     def test_downstream_inside_deeper_nested_condition_permitted(self):
 
-        @dsl.component
-        def print_op(message: str):
-            print(message)
-
-        @dsl.component
-        def return_1() -> int:
-            return 1
-
         @dsl.pipeline()
         def my_pipeline():
             return_1_task = return_1()
             return_1_task2 = return_1()
 
             with dsl.Condition(return_1_task.output == 1):
-                one = print_op(message='1')
+                one = return_1()
                 with dsl.Condition(return_1_task2.output == 1):
                     two = print_op(message='2')
-                    three = print_op(message='3').after(one)
+                    three = print_op(message=str(one.output))
 
         with tempfile.TemporaryDirectory() as tempdir:
             package_path = os.path.join(tempdir, 'pipeline.yaml')
@@ -1641,15 +1614,8 @@ def my_pipeline():
 
     def test_upstream_inside_deeper_nested_condition_blocked(self):
 
-        with self.assertRaisesRegex(RuntimeError, r'Task'):
-
-            @dsl.component
-            def print_op(message: str):
-                print(message)
-
-            @dsl.component
-            def return_1() -> int:
-                return 1
+        with self.assertRaisesRegex(
+                RuntimeError, r'Tasks cannot depend on upstream tasks inside'):
 
             @dsl.pipeline()
             def my_pipeline():
@@ -1668,14 +1634,6 @@ def my_pipeline():
 
     def test_upstream_in_same_for_loop_with_downstream_permitted(self):
 
-        @dsl.component
-        def print_op(message: str):
-            print(message)
-
-        @dsl.component
-        def args_generator_op() -> List[Dict[str, str]]:
-            return [{'A_a': '1', 'B_b': '2'}, {'A_a': '10', 'B_b': '20'}]
-
         @dsl.pipeline()
         def my_pipeline():
             args_generator = args_generator_op()
@@ -1691,15 +1649,8 @@ def my_pipeline():
 
     def test_downstream_not_in_same_for_loop_with_upstream_blocked(self):
 
-        with self.assertRaisesRegex(RuntimeError, r'Task'):
-
-            @dsl.component
-            def print_op(message: str):
-                print(message)
-
-            @dsl.component
-            def 
args_generator_op() -> List[Dict[str, str]]: - return [{'A_a': '1', 'B_b': '2'}, {'A_a': '10', 'B_b': '20'}] + with self.assertRaisesRegex( + RuntimeError, r'Tasks cannot depend on upstream tasks inside'): @dsl.pipeline() def my_pipeline(): @@ -1717,15 +1668,8 @@ def my_pipeline(): def test_downstream_not_in_same_for_loop_with_upstream_seperate_blocked( self): - with self.assertRaisesRegex(RuntimeError, r'Task'): - - @dsl.component - def print_op(message: str): - print(message) - - @dsl.component - def args_generator_op() -> List[Dict[str, str]]: - return [{'A_a': '1', 'B_b': '2'}, {'A_a': '10', 'B_b': '20'}] + with self.assertRaisesRegex( + RuntimeError, r'Tasks cannot depend on upstream tasks inside'): @dsl.pipeline() def my_pipeline(): @@ -1744,15 +1688,10 @@ def my_pipeline(): def test_downstream_not_in_same_for_loop_with_upstream_nested_blocked(self): - with self.assertRaisesRegex(RuntimeError, r'Task'): - - @dsl.component - def print_op(message: str): - print(message) - - @dsl.component - def args_generator_op() -> List[Dict[str, str]]: - return [{'A_a': '1', 'B_b': '2'}, {'A_a': '10', 'B_b': '20'}] + with self.assertRaisesRegex( + RuntimeError, + r'Downstream task cannot depend on an upstream task while in a nested' + ): @dsl.pipeline() def my_pipeline(): @@ -1771,14 +1710,6 @@ def my_pipeline(): def test_downstream_in_condition_nested_in_a_for_loop(self): - @dsl.component - def print_op(message: str): - print(message) - - @dsl.component - def return_1() -> int: - return 1 - @dsl.pipeline() def my_pipeline(): return_1_task = return_1() @@ -1793,6 +1724,22 @@ def my_pipeline(): compiler.Compiler().compile( pipeline_func=my_pipeline, package_path=package_path) + def test_downstream_in_a_for_loop_nested_in_a_condition(self): + + @dsl.pipeline() + def my_pipeline(): + return_1_task = return_1() + + with dsl.Condition(return_1_task.output == 1): + one = print_op(message='1') + with dsl.ParallelFor([1, 2, 3]): + two = print_op(message='2').after(one) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + if __name__ == '__main__': unittest.main() diff --git a/sdk/python/kfp/compiler/compiler_utils.py b/sdk/python/kfp/compiler/compiler_utils.py index 583bb90d079..bb5588cb31a 100644 --- a/sdk/python/kfp/compiler/compiler_utils.py +++ b/sdk/python/kfp/compiler/compiler_utils.py @@ -14,6 +14,7 @@ """Utility methods for compiler implementation that is IR-agnostic.""" import collections +from copy import deepcopy from typing import Dict, List, Mapping, Set, Tuple, Union from kfp.components import for_loop @@ -427,44 +428,43 @@ def get_dependencies( task2=task, ) - # ParralelFor Check - for parent in task_name_to_parent_groups[upstream_task.name]: - parent = group_name_to_group.get(parent, None) - if isinstance(parent, tasks_group.ParallelFor): - exception = True - if parent.name in task_name_to_parent_groups[task.name]: - exception = False - idx = task_name_to_parent_groups[task.name].index( - parent.name) - cnt = 0 - for ancestors in task_name_to_parent_groups[ - task.name][idx:]: - ancestors = group_name_to_group.get(ancestors, None) - if isinstance(ancestors, tasks_group.ParallelFor): - cnt += 1 - if cnt > 1: - exception = True - break + # uncommon upstream ancestor check + uncommon_upstream_groups = deepcopy(upstream_groups) + uncommon_upstream_groups.remove( + upstream_task.name + ) # because a task's `upstream_groups` contains the task's name + if 
uncommon_upstream_groups:
+                dependent_group = group_name_to_group.get(
+                    upstream_groups[0], None)
+                if isinstance(dependent_group, tasks_group.ExitHandler):
+                    task_group_type = 'an ' + tasks_group.ExitHandler.__name__
 
+                elif isinstance(dependent_group, tasks_group.Condition):
+                    task_group_type = 'a ' + tasks_group.Condition.__name__
 
-            # Condition check
-            dependent_group = group_name_to_group.get(upstream_groups[0], None)
-            if isinstance(dependent_group, tasks_group.Condition):
-                raise RuntimeError(
-                    f'Tasks cannot depend on upstream tasks inside a Condition that is not a common ancestor of both tasks. Task {task.name} depends on upstream task {upstream_task.name}.'
-                )
+                else:
+                    task_group_type = 'a ' + tasks_group.ParallelFor.__name__
 
-            # ExitHandler check
-            dependent_group = group_name_to_group.get(upstream_groups[0], None)
-            if isinstance(dependent_group, tasks_group.ExitHandler):
                 raise RuntimeError(
-                    f'Tasks cannot depend on upstream tasks inside a Exithandler that is not a common ancestor of both tasks. Task {task.name} depends on upstream task {upstream_task.name}.'
+                    f'Tasks cannot depend on upstream tasks inside {task_group_type} that is not a common ancestor of both tasks. Task {task.name} depends on upstream task {upstream_task.name}.'
                 )
 
+            # ParralelFor Nested Check
+            # If a ParallelFor group appears among the upstream task's parent groups and another ParallelFor appears among the downstream task's uncommon ancestor groups, the dependency crosses a nested for loop in the DAG.
+            upstream_parent_tasks = task_name_to_parent_groups[
+                upstream_task.name]
+            for group in downstream_groups:
+                if isinstance(
+                        group_name_to_group.get(group, None),
+                        tasks_group.ParallelFor):
+                    for parent_task in upstream_parent_tasks:
+                        if isinstance(
+                                group_name_to_group.get(parent_task, None),
+                                tasks_group.ParallelFor):
+                            raise RuntimeError(
+                                f'Downstream task cannot depend on an upstream task while in a nested {tasks_group.ParallelFor.__name__} group. 
Task {task.name} depends on upstream task {upstream_task.name}, while {group} is nested in {parent_task}' + ) + dependencies[downstream_groups[0]].add(upstream_groups[0]) return dependencies From 0a0d15b6195093861420757b4b12f54cd8bc4bf3 Mon Sep 17 00:00:00 2001 From: Josh Ogbonda Date: Wed, 30 Nov 2022 11:03:09 -0800 Subject: [PATCH 13/16] review --- sdk/python/kfp/compiler/compiler_test.py | 172 +++++++++++++++++- sdk/python/kfp/compiler/compiler_utils.py | 6 +- .../kfp/components/pipeline_task_test.py | 132 -------------- 3 files changed, 165 insertions(+), 145 deletions(-) diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index 6de19112034..a1c1d272ba1 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -436,7 +436,8 @@ def dummy_op(msg: str = ''): pass with self.assertRaisesRegex( - RuntimeError, r'Tasks cannot depend on upstream tasks inside'): + RuntimeError, + r'Tasks cannot depend on an upstream task inside'): @dsl.pipeline(name='test-pipeline') def my_pipeline(val: bool): @@ -477,7 +478,8 @@ def dummy_op(msg: str = ''): pass with self.assertRaisesRegex( - RuntimeError, r'Tasks cannot depend on upstream tasks inside'): + RuntimeError, + r'Tasks cannot depend on an upstream task inside'): @dsl.pipeline(name='test-pipeline') def my_pipeline(val: bool): @@ -518,7 +520,8 @@ def dummy_op(msg: str = ''): pass with self.assertRaisesRegex( - RuntimeError, r'Tasks cannot depend on upstream tasks inside'): + RuntimeError, + r'Tasks cannot depend on an upstream task inside'): @dsl.pipeline(name='test-pipeline') def my_pipeline(val: bool): @@ -1503,7 +1506,7 @@ def args_generator_op() -> List[Dict[str, str]]: return [{'A_a': '1', 'B_b': '2'}, {'A_a': '10', 'B_b': '20'}] -class ValidLegalTopologies(unittest.TestCase): +class TestValidLegalTopologies(unittest.TestCase): def test_inside_of_root_group_permitted(self): @@ -1523,7 +1526,8 @@ def my_pipeline(): def test_upstream_inside_deeper_condition_blocked(self): with self.assertRaisesRegex( - RuntimeError, r'Tasks cannot depend on upstream tasks inside'): + RuntimeError, + r'Tasks cannot depend on an upstream task inside'): @dsl.pipeline() def my_pipeline(): @@ -1576,7 +1580,8 @@ def test_downstream_and_upstream_in_different_condition_on_same_level_blocked( self): with self.assertRaisesRegex( - RuntimeError, r'Tasks cannot depend on upstream tasks inside'): + RuntimeError, + r'Tasks cannot depend on an upstream task inside'): @dsl.pipeline() def my_pipeline(): @@ -1615,7 +1620,8 @@ def my_pipeline(): def test_upstream_inside_deeper_nested_condition_blocked(self): with self.assertRaisesRegex( - RuntimeError, r'Tasks cannot depend on upstream tasks inside'): + RuntimeError, + r'Tasks cannot depend on an upstream task inside'): @dsl.pipeline() def my_pipeline(): @@ -1650,7 +1656,8 @@ def my_pipeline(): def test_downstream_not_in_same_for_loop_with_upstream_blocked(self): with self.assertRaisesRegex( - RuntimeError, r'Tasks cannot depend on upstream tasks inside'): + RuntimeError, + r'Tasks cannot depend on an upstream task inside'): @dsl.pipeline() def my_pipeline(): @@ -1669,7 +1676,8 @@ def test_downstream_not_in_same_for_loop_with_upstream_seperate_blocked( self): with self.assertRaisesRegex( - RuntimeError, r'Tasks cannot depend on upstream tasks inside'): + RuntimeError, + r'Tasks cannot depend on an upstream task inside'): @dsl.pipeline() def my_pipeline(): @@ -1690,7 +1698,7 @@ def 
test_downstream_not_in_same_for_loop_with_upstream_nested_blocked(self): with self.assertRaisesRegex( RuntimeError, - r'Downstream task cannot depend on an upstream task while in a nested' + r'Downstream tasks in a nested ParallelFor group cannot depend on an upstream task in a shallower ParallelFor group.' ): @dsl.pipeline() @@ -1740,6 +1748,150 @@ def my_pipeline(): compiler.Compiler().compile( pipeline_func=my_pipeline, package_path=package_path) + def test_downstream_in_a_nested_for_loop_not_related_to_upstream(self): + + @dsl.pipeline() + def my_pipeline(): + return_1_task = return_1() + + with dsl.ParallelFor([1, 2, 3]): + one = print_op(message='1') + with dsl.ParallelFor([1, 2, 3]): + two = print_op(message='2').after(return_1_task) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + +class TestCannotUseAfterCrossDAG(unittest.TestCase): + + def test_inner_task_prevented(self): + with self.assertRaisesRegex(RuntimeError, r'Task'): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.pipeline(name='pipeline-with-multiple-exit-handlers') + def my_pipeline(): + first_exit_task = print_op(message='First exit task.') + + with dsl.ExitHandler(first_exit_task): + first_print_op = print_op( + message='Inside first exit handler.') + + second_exit_task = print_op(message='Second exit task.') + with dsl.ExitHandler(second_exit_task): + print_op(message='Inside second exit handler.').after( + first_print_op) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + def test_exit_handler_task_prevented(self): + with self.assertRaisesRegex(RuntimeError, r'Task'): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.pipeline(name='pipeline-with-multiple-exit-handlers') + def my_pipeline(): + first_exit_task = print_op(message='First exit task.') + + with dsl.ExitHandler(first_exit_task): + first_print_op = print_op( + message='Inside first exit handler.') + + second_exit_task = print_op(message='Second exit task.') + with dsl.ExitHandler(second_exit_task): + x = print_op(message='Inside second exit handler.') + x.after(first_print_op) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + def test_within_same_exit_handler_permitted(self): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.pipeline(name='pipeline-with-multiple-exit-handlers') + def my_pipeline(): + first_exit_task = print_op(message='First exit task.') + + with dsl.ExitHandler(first_exit_task): + first_print_op = print_op( + message='First task inside first exit handler.') + second_print_op = print_op( + message='Second task inside first exit handler.').after( + first_print_op) + + second_exit_task = print_op(message='Second exit task.') + with dsl.ExitHandler(second_exit_task): + print_op(message='Inside second exit handler.') + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + def test_outside_of_condition_blocked(self): + with self.assertRaisesRegex(RuntimeError, r'Task'): + + @dsl.component + def 
print_op(message: str): + print(message) + + @dsl.component + def return_1() -> int: + return 1 + + @dsl.pipeline(name='pipeline-with-multiple-exit-handlers') + def my_pipeline(): + return_1_task = return_1() + + with dsl.Condition(return_1_task.output == 1): + one = print_op(message='1') + two = print_op(message='2') + three = print_op(message='3').after(one) + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + + def test_inside_of_condition_permitted(self): + + @dsl.component + def print_op(message: str): + print(message) + + @dsl.component + def return_1() -> int: + return 1 + + @dsl.pipeline(name='pipeline-with-multiple-exit-handlers') + def my_pipeline(): + return_1_task = return_1() + + with dsl.Condition(return_1_task.output == '1'): + one = print_op(message='1') + two = print_op(message='2').after(one) + three = print_op(message='3') + + with tempfile.TemporaryDirectory() as tempdir: + package_path = os.path.join(tempdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=package_path) + if __name__ == '__main__': unittest.main() diff --git a/sdk/python/kfp/compiler/compiler_utils.py b/sdk/python/kfp/compiler/compiler_utils.py index bb5588cb31a..6efd9617069 100644 --- a/sdk/python/kfp/compiler/compiler_utils.py +++ b/sdk/python/kfp/compiler/compiler_utils.py @@ -435,7 +435,7 @@ def get_dependencies( ) # because a task's `upstream_groups` contains the task's name if uncommon_upstream_groups: dependent_group = group_name_to_group.get( - upstream_groups[0], None) + uncommon_upstream_groups[0], None) if isinstance(dependent_group, tasks_group.ExitHandler): task_group_type = 'an ' + tasks_group.ExitHandler.__name__ @@ -446,7 +446,7 @@ def get_dependencies( task_group_type = 'a ' + tasks_group.ParallelFor.__name__ raise RuntimeError( - f'Tasks cannot depend on upstream tasks inside {task_group_type} that is not a common ancestor of both tasks. Task {task.name} depends on upstream task {upstream_task.name}.' + f'Tasks cannot depend on an upstream task inside {task_group_type} that is not a common ancestor of both tasks. Task {task.name} depends on upstream task {upstream_task.name}.' ) # ParralelFor Nested Check @@ -462,7 +462,7 @@ def get_dependencies( group_name_to_group.get(parent_task, None), tasks_group.ParallelFor): raise RuntimeError( - f'Downstream task cannot depend on an upstream task while in a nested {tasks_group.ParallelFor.__name__} group. Task {task.name} depends on upstream task {upstream_task.name}, while {group} is nested in {parent_task}' + f'Downstream tasks in a nested {tasks_group.ParallelFor.__name__} group cannot depend on an upstream task in a shallower {tasks_group.ParallelFor.__name__} group. Task {task.name} depends on upstream task {upstream_task.name}, while {group} is nested in {parent_task}.' ) dependencies[downstream_groups[0]].add(upstream_groups[0]) diff --git a/sdk/python/kfp/components/pipeline_task_test.py b/sdk/python/kfp/components/pipeline_task_test.py index e87314837c4..672fb85d43f 100644 --- a/sdk/python/kfp/components/pipeline_task_test.py +++ b/sdk/python/kfp/components/pipeline_task_test.py @@ -13,14 +13,10 @@ # limitations under the License. 
"""Tests for kfp.components.pipeline_task.""" -import os -import tempfile import textwrap import unittest from absl.testing import parameterized -from kfp import compiler -from kfp import dsl from kfp.components import pipeline_task from kfp.components import placeholders from kfp.components import structures @@ -297,133 +293,5 @@ def test_set_display_name(self): self.assertEqual('test_name', task._task_spec.display_name) -class TestCannotUseAfterCrossDAG(unittest.TestCase): - - def test_inner_task_prevented(self): - with self.assertRaisesRegex(RuntimeError, r'Task'): - - @dsl.component - def print_op(message: str): - print(message) - - @dsl.pipeline(name='pipeline-with-multiple-exit-handlers') - def my_pipeline(): - first_exit_task = print_op(message='First exit task.') - - with dsl.ExitHandler(first_exit_task): - first_print_op = print_op( - message='Inside first exit handler.') - - second_exit_task = print_op(message='Second exit task.') - with dsl.ExitHandler(second_exit_task): - print_op(message='Inside second exit handler.').after( - first_print_op) - - with tempfile.TemporaryDirectory() as tempdir: - package_path = os.path.join(tempdir, 'pipeline.yaml') - compiler.Compiler().compile( - pipeline_func=my_pipeline, package_path=package_path) - - def test_exit_handler_task_prevented(self): - with self.assertRaisesRegex(RuntimeError, r'Task'): - - @dsl.component - def print_op(message: str): - print(message) - - @dsl.pipeline(name='pipeline-with-multiple-exit-handlers') - def my_pipeline(): - first_exit_task = print_op(message='First exit task.') - - with dsl.ExitHandler(first_exit_task): - first_print_op = print_op( - message='Inside first exit handler.') - - second_exit_task = print_op(message='Second exit task.') - with dsl.ExitHandler(second_exit_task): - x = print_op(message='Inside second exit handler.') - x.after(first_print_op) - - with tempfile.TemporaryDirectory() as tempdir: - package_path = os.path.join(tempdir, 'pipeline.yaml') - compiler.Compiler().compile( - pipeline_func=my_pipeline, package_path=package_path) - - def test_within_same_exit_handler_permitted(self): - - @dsl.component - def print_op(message: str): - print(message) - - @dsl.pipeline(name='pipeline-with-multiple-exit-handlers') - def my_pipeline(): - first_exit_task = print_op(message='First exit task.') - - with dsl.ExitHandler(first_exit_task): - first_print_op = print_op( - message='First task inside first exit handler.') - second_print_op = print_op( - message='Second task inside first exit handler.').after( - first_print_op) - - second_exit_task = print_op(message='Second exit task.') - with dsl.ExitHandler(second_exit_task): - print_op(message='Inside second exit handler.') - - with tempfile.TemporaryDirectory() as tempdir: - package_path = os.path.join(tempdir, 'pipeline.yaml') - compiler.Compiler().compile( - pipeline_func=my_pipeline, package_path=package_path) - - def test_outside_of_condition_blocked(self): - with self.assertRaisesRegex(RuntimeError, r'Task'): - - @dsl.component - def print_op(message: str): - print(message) - - @dsl.component - def return_1() -> int: - return 1 - - @dsl.pipeline(name='pipeline-with-multiple-exit-handlers') - def my_pipeline(): - return_1_task = return_1() - - with dsl.Condition(return_1_task.output == 1): - one = print_op(message='1') - two = print_op(message='2') - three = print_op(message='3').after(one) - - with tempfile.TemporaryDirectory() as tempdir: - package_path = os.path.join(tempdir, 'pipeline.yaml') - compiler.Compiler().compile( - 
pipeline_func=my_pipeline, package_path=package_path) - - def test_inside_of_condition_permitted(self): - - @dsl.component - def print_op(message: str): - print(message) - - @dsl.component - def return_1() -> int: - return 1 - - @dsl.pipeline(name='pipeline-with-multiple-exit-handlers') - def my_pipeline(): - return_1_task = return_1() - - with dsl.Condition(return_1_task.output == '1'): - one = print_op(message='1') - two = print_op(message='2').after(one) - three = print_op(message='3') - - with tempfile.TemporaryDirectory() as tempdir: - package_path = os.path.join(tempdir, 'pipeline.yaml') - compiler.Compiler().compile( - pipeline_func=my_pipeline, package_path=package_path) - - if __name__ == '__main__': unittest.main() From ecbd172c7ef25dc4dedf408ec5268b1fcb898c6f Mon Sep 17 00:00:00 2001 From: Josh Ogbonda Date: Thu, 1 Dec 2022 12:03:51 -0800 Subject: [PATCH 14/16] docformatter presubmit --- test/presubmit-docformatter-sdk.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/presubmit-docformatter-sdk.sh b/test/presubmit-docformatter-sdk.sh index 9856b32aae3..6eb877b631a 100755 --- a/test/presubmit-docformatter-sdk.sh +++ b/test/presubmit-docformatter-sdk.sh @@ -18,4 +18,4 @@ source_root=$(pwd) python3 -m pip install --upgrade pip python3 -m pip install $(grep 'docformatter==' sdk/python/requirements-dev.txt) -docformatter --check --recursive "${source_root}/sdk/python/" --exclude "${source_root}/sdk/python/kfp/deprecated" +docformatter --check --recursive "${source_root}/sdk/python/" --exclude "compiler_test.py" From 6ea91c0ed1d85222be54cdf692152e21a7a493db Mon Sep 17 00:00:00 2001 From: Josh Ogbonda Date: Thu, 1 Dec 2022 12:48:32 -0800 Subject: [PATCH 15/16] revert docformatter --- test/presubmit-docformatter-sdk.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/presubmit-docformatter-sdk.sh b/test/presubmit-docformatter-sdk.sh index 6eb877b631a..9856b32aae3 100755 --- a/test/presubmit-docformatter-sdk.sh +++ b/test/presubmit-docformatter-sdk.sh @@ -18,4 +18,4 @@ source_root=$(pwd) python3 -m pip install --upgrade pip python3 -m pip install $(grep 'docformatter==' sdk/python/requirements-dev.txt) -docformatter --check --recursive "${source_root}/sdk/python/" --exclude "compiler_test.py" +docformatter --check --recursive "${source_root}/sdk/python/" --exclude "${source_root}/sdk/python/kfp/deprecated" From 35930362c02f967cd5222f80f76b0f9adb5227a7 Mon Sep 17 00:00:00 2001 From: Josh Ogbonda Date: Thu, 1 Dec 2022 13:48:11 -0800 Subject: [PATCH 16/16] update release.md --- sdk/RELEASE.md | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/RELEASE.md b/sdk/RELEASE.md index ee38796c735..2b8442cafb4 100644 --- a/sdk/RELEASE.md +++ b/sdk/RELEASE.md @@ -7,6 +7,7 @@ ## Deprecations ## Bug fixes and other changes +* Unblock valid topologies [\#8416](https://github.com/kubeflow/pipelines/pull/8416) ## Documentation updates
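
Illustrative usage note: the sketch below is not part of the patch series; it is a minimal demonstration of the topology rule these patches land on, assuming an SDK build that includes PATCH 13 and the kfp v2 `dsl`/`compiler` API used in the tests above. The file name `topology_demo.py` and the component bodies are hypothetical; the error text quoted in the comments is the message introduced in PATCH 13.

# topology_demo.py -- hypothetical standalone script
from kfp import compiler, dsl


@dsl.component
def print_op(message: str):
    print(message)


@dsl.component
def return_1() -> int:
    return 1


@dsl.pipeline(name='permitted-topology')
def permitted_topology():
    flag = return_1()
    upstream = print_op(message='upstream task in the root group')
    with dsl.Condition(flag.output == 1):
        # Permitted: the downstream task sits in a deeper group than the
        # upstream task it depends on, mirroring
        # test_downstream_inside_deeper_condition_permitted.
        print_op(message='downstream task').after(upstream)


@dsl.pipeline(name='blocked-topology')
def blocked_topology():
    flag = return_1()
    with dsl.Condition(flag.output == 1):
        upstream = print_op(message='upstream task inside the condition')
    # Blocked: the upstream task lives inside a Condition that is not a
    # common ancestor of both tasks, so compiling raises RuntimeError
    # ('Tasks cannot depend on an upstream task inside a Condition ...'),
    # mirroring test_outside_of_condition_blocked.
    print_op(message='downstream task').after(upstream)


compiler.Compiler().compile(permitted_topology, 'permitted.yaml')  # succeeds
# compiler.Compiler().compile(blocked_topology, 'blocked.yaml')  # raises RuntimeError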