diff --git a/sdk/python/kfp/components/_components.py b/sdk/python/kfp/components/_components.py index 325a8f672b1..1cc670e4ede 100644 --- a/sdk/python/kfp/components/_components.py +++ b/sdk/python/kfp/components/_components.py @@ -30,6 +30,8 @@ from .structures import * from ._data_passing import serialize_value, get_canonical_type_for_type_name +from kfp.v2.components.types import type_utils + _default_component_name = 'Component' @@ -124,7 +126,8 @@ def load_component_from_text(text): def load_component_from_spec(component_spec): - """Loads component from a ComponentSpec and creates a task factory function. + """Loads component from a ComponentSpec and creates a task factory + function. Args: component_spec: A ComponentSpec containing the component definition. @@ -136,7 +139,7 @@ def load_component_from_spec(component_spec): if component_spec is None: raise TypeError return _create_task_factory_from_component_spec( - component_spec=component_spec) + component_spec=component_spec) def _fix_component_uri(uri: str) -> str: @@ -416,7 +419,9 @@ def component_default_to_func_default(component_default: str, port.type) if port.type else inspect.Parameter.empty), default=component_default_to_func_default(port.default, port.optional), - ) for port in reordered_input_list + ) + for port in reordered_input_list + if not type_utils.is_task_final_status_type(port.type) ] factory_function_parameters = input_parameters #Outputs are no longer part of the task factory function signature. The paths are always generated by the system. @@ -504,7 +509,8 @@ def expand_command_part(arg) -> Union[str, List[str], None]: inputs_consumed_by_value[input_name] = serialized_argument return serialized_argument else: - if input_spec.optional: + if input_spec.optional or type_utils.is_task_final_status_type( + input_spec.type): return None else: raise ValueError( diff --git a/sdk/python/kfp/dsl/_component_bridge.py b/sdk/python/kfp/dsl/_component_bridge.py index a7277fe3518..cb374d319d9 100644 --- a/sdk/python/kfp/dsl/_component_bridge.py +++ b/sdk/python/kfp/dsl/_component_bridge.py @@ -433,15 +433,15 @@ def _resolve_ir_placeholders_v2( if isinstance(arg, _structures.InputValuePlaceholder): input_name = arg.input_name input_value = arguments.get(input_name, None) - if input_value is not None: + input_spec = inputs_dict[input_name] + if input_value is not None or type_utils.is_task_final_status_type( + input_spec.type): return _input_parameter_placeholder(input_name) + elif input_spec.optional: + return None else: - input_spec = inputs_dict[input_name] - if input_spec.optional: - return None - else: - raise ValueError( - 'No value provided for input {}'.format(input_name)) + raise ValueError( + 'No value provided for input {}'.format(input_name)) elif isinstance(arg, _structures.InputUriPlaceholder): input_name = arg.input_name diff --git a/sdk/python/kfp/dsl/_ops_group.py b/sdk/python/kfp/dsl/_ops_group.py index a74ebb6c8ca..e3cd4fc1016 100644 --- a/sdk/python/kfp/dsl/_ops_group.py +++ b/sdk/python/kfp/dsl/_ops_group.py @@ -11,7 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import Union + +from typing import Optional, Union import uuid from kfp.dsl import _for_loop, _pipeline_param @@ -144,8 +145,16 @@ class ExitHandler(OpsGroup): op2 = ContainerOp(...) 
""" - def __init__(self, exit_op: _container_op.ContainerOp): + def __init__( + self, + exit_op: _container_op.ContainerOp, + name: Optional[str] = None, + ): super(ExitHandler, self).__init__('exit_handler') + + # Use user specified name as display name directly, instead of passing + # it to the super class constructor. + self.display_name = name if exit_op.dependent_names: raise ValueError('exit_op cannot depend on any other ops.') diff --git a/sdk/python/kfp/dsl/component_spec.py b/sdk/python/kfp/dsl/component_spec.py index 756849e8574..6170900f368 100644 --- a/sdk/python/kfp/dsl/component_spec.py +++ b/sdk/python/kfp/dsl/component_spec.py @@ -93,9 +93,17 @@ def build_component_spec_from_structure( result.executor_label = executor_label for input_spec in component_spec.inputs or []: + + # Special handling for PipelineTaskFinalStatus first. + if type_utils.is_task_final_status_type(input_spec.type): + result.input_definitions.parameters[ + input_spec.name].type = pipeline_spec_pb2.PrimitiveType.STRING + continue + # skip inputs not present if input_spec.name not in actual_inputs: continue + if type_utils.is_parameter_type(input_spec.type): result.input_definitions.parameters[ input_spec.name].type = type_utils.get_parameter_type( diff --git a/sdk/python/kfp/v2/compiler/compiler.py b/sdk/python/kfp/v2/compiler/compiler.py index a6696be4705..01dd46bbde3 100644 --- a/sdk/python/kfp/v2/compiler/compiler.py +++ b/sdk/python/kfp/v2/compiler/compiler.py @@ -36,6 +36,7 @@ from kfp.pipeline_spec import pipeline_spec_pb2 from kfp.v2.compiler import compiler_utils from kfp.v2.components import component_factory +from kfp.v2.components import task_final_status from kfp.v2.components.types import artifact_types, type_utils _GroupOrOp = Union[dsl.OpsGroup, dsl.BaseOp] @@ -728,6 +729,13 @@ def _group_to_dag_spec( deployment_config.executors[ importer_exec_label].importer.CopyFrom( subgroup.importer_spec) + else: + for input_spec in subgroup._metadata.inputs or []: + if type_utils.is_task_final_status_type( + input_spec.type): + raise ValueError( + f'{task_final_status.PipelineTaskFinalStatus.__name__}' + ' can only be used in an exit task.') # Task level caching option. 
subgroup.task_spec.caching_options.enable_cache = subgroup.enable_caching @@ -1045,7 +1053,14 @@ def _create_pipeline_spec( task_name = exit_handler_op.name display_name = exit_handler_op.display_name - exit_handler_op.task_spec.task_info.name = display_name or task_name + exit_handler_op.task_spec.task_info.name = ( + display_name or task_name) + for input_spec in exit_handler_op._metadata.inputs or []: + if type_utils.is_task_final_status_type(input_spec.type): + exit_handler_op.task_spec.inputs.parameters[ + input_spec.name].task_final_status.producer_task = ( + first_group.name) + exit_handler_op.task_spec.dependent_tasks.extend( pipeline_spec.root.dag.tasks.keys()) exit_handler_op.task_spec.trigger_policy.strategy = ( diff --git a/sdk/python/kfp/v2/compiler/compiler_test.py b/sdk/python/kfp/v2/compiler/compiler_test.py index e0033bf5dd0..d291b626786 100644 --- a/sdk/python/kfp/v2/compiler/compiler_test.py +++ b/sdk/python/kfp/v2/compiler/compiler_test.py @@ -21,6 +21,7 @@ from kfp import components from kfp.v2 import compiler from kfp.v2 import dsl +from kfp.v2.dsl import PipelineTaskFinalStatus from kfp.dsl import types VALID_PRODUCER_COMPONENT_SAMPLE = components.load_component_from_text(""" @@ -451,6 +452,46 @@ def my_pipeline(): compiler.Compiler().compile( pipeline_func=my_pipeline, package_path='result.json') + def test_use_task_final_status_in_non_exit_op(self): + + @dsl.component + def print_op(status: PipelineTaskFinalStatus): + return status + + @dsl.pipeline(name='test-pipeline') + def my_pipeline(): + print_op() + + with self.assertRaisesRegex( + ValueError, + 'PipelineTaskFinalStatus can only be used in an exit task.'): + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path='result.json') + + def test_use_task_final_status_in_non_exit_op_yaml(self): + + print_op = components.load_component_from_text(""" +name: Print Op +inputs: +- {name: message, type: PipelineTaskFinalStatus} +implementation: + container: + image: python:3.7 + command: + - echo + - {inputValue: message} +""") + + @dsl.pipeline(name='test-pipeline') + def my_pipeline(): + print_op() + + with self.assertRaisesRegex( + ValueError, + 'PipelineTaskFinalStatus can only be used in an exit task.'): + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path='result.json') + if __name__ == '__main__': unittest.main() diff --git a/sdk/python/kfp/v2/compiler_cli_tests/compiler_cli_tests.py b/sdk/python/kfp/v2/compiler_cli_tests/compiler_cli_tests.py index 60f77a9fb9f..d846779dc22 100644 --- a/sdk/python/kfp/v2/compiler_cli_tests/compiler_cli_tests.py +++ b/sdk/python/kfp/v2/compiler_cli_tests/compiler_cli_tests.py @@ -202,6 +202,12 @@ def test_experimental_v2_component(self): def test_pipeline_with_gcpc_types(self): self._test_compile_py_to_json('pipeline_with_gcpc_types') + def test_pipeline_with_task_final_status(self): + self._test_compile_py_to_json('pipeline_with_task_final_status') + + def test_pipeline_with_task_final_status_yaml(self): + self._test_compile_py_to_json('pipeline_with_task_final_status_yaml') + def test_v2_component_with_pip_index_urls(self): self._test_compile_py_to_json('v2_component_with_pip_index_urls') diff --git a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_task_final_status.json b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_task_final_status.json new file mode 100644 index 00000000000..eacd1d1a4cb --- /dev/null +++ b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_task_final_status.json @@ -0,0 +1,222 @@ +{ + 
"pipelineSpec": { + "components": { + "comp-exit-handler-1": { + "dag": { + "tasks": { + "fail-op": { + "cachingOptions": { + "enableCache": true + }, + "componentRef": { + "name": "comp-fail-op" + }, + "inputs": { + "parameters": { + "message": { + "runtimeValue": { + "constantValue": { + "stringValue": "Task failed." + } + } + } + } + }, + "taskInfo": { + "name": "fail-op" + } + }, + "print-op": { + "cachingOptions": { + "enableCache": true + }, + "componentRef": { + "name": "comp-print-op" + }, + "inputs": { + "parameters": { + "message": { + "componentInputParameter": "pipelineparam--message" + } + } + }, + "taskInfo": { + "name": "print-op" + } + } + } + }, + "inputDefinitions": { + "parameters": { + "pipelineparam--message": { + "type": "STRING" + } + } + } + }, + "comp-exit-op": { + "executorLabel": "exec-exit-op", + "inputDefinitions": { + "parameters": { + "status": { + "type": "STRING" + }, + "user_input": { + "type": "STRING" + } + } + } + }, + "comp-fail-op": { + "executorLabel": "exec-fail-op", + "inputDefinitions": { + "parameters": { + "message": { + "type": "STRING" + } + } + } + }, + "comp-print-op": { + "executorLabel": "exec-print-op", + "inputDefinitions": { + "parameters": { + "message": { + "type": "STRING" + } + } + } + } + }, + "deploymentSpec": { + "executors": { + "exec-exit-op": { + "container": { + "args": [ + "--executor_input", + "{{$}}", + "--function_to_execute", + "exit_op" + ], + "command": [ + "sh", + "-c", + "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.11' && \"$0\" \"$@\"\n", + "sh", + "-ec", + "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", + "\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef exit_op(user_input: str, status: PipelineTaskFinalStatus):\n \"\"\"Checks pipeline run status.\"\"\"\n print('Pipeline status: ', status.state)\n print('Job resource name: ', status.pipeline_job_resource_name)\n print('Pipeline task name: ', status.pipeline_task_name)\n print('Error code: ', status.error_code)\n print('Error message: ', status.error_message)\n\n" + ], + "image": "python:3.7" + } + }, + "exec-fail-op": { + "container": { + "args": [ + "--executor_input", + "{{$}}", + "--function_to_execute", + "fail_op" + ], + "command": [ + "sh", + "-c", + "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.11' && \"$0\" \"$@\"\n", + "sh", + "-ec", + "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", + "\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef fail_op(message: str):\n \"\"\"Fails.\"\"\"\n import sys\n print(message)\n sys.exit(1)\n\n" + ], + "image": "python:3.7" + } + }, + "exec-print-op": { + "container": { + "args": [ + "--executor_input", + "{{$}}", + "--function_to_execute", + "print_op" + ], + "command": [ + "sh", + "-c", + "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.11' && \"$0\" \"$@\"\n", + "sh", + "-ec", + "program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n", + "\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef print_op(message: str):\n \"\"\"Prints a message.\"\"\"\n print(message)\n\n" + ], + "image": "python:3.7" + } + } + } + }, + "pipelineInfo": { + "name": "pipeline-with-task-final-status" + }, + "root": { + "dag": { + "tasks": { + "exit-handler-1": { + "componentRef": { + "name": "comp-exit-handler-1" + }, + "inputs": { + "parameters": { + "pipelineparam--message": { + "componentInputParameter": "message" + } + } + }, + "taskInfo": { + "name": "my-pipeline" + } + }, + "exit-op": { + "componentRef": { + "name": "comp-exit-op" + }, + "dependentTasks": [ + "exit-handler-1" + ], + "inputs": { + "parameters": { + "status": { + "taskFinalStatus": { + "producerTask": "exit-handler-1" + } + }, + "user_input": { + "componentInputParameter": "message" + } + } + }, + "taskInfo": { + "name": "exit-op" + }, + "triggerPolicy": { + "strategy": "ALL_UPSTREAM_TASKS_COMPLETED" + } + } + } + }, + "inputDefinitions": { + "parameters": { + "message": { + "type": "STRING" + } + } + } + }, + "schemaVersion": "2.0.0", + "sdkVersion": "kfp-1.8.11" + }, + "runtimeConfig": { + "parameters": { + "message": { + "stringValue": "Hello World!" + } + } + } +} \ No newline at end of file diff --git a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_task_final_status.py b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_task_final_status.py new file mode 100644 index 00000000000..7c5b2269931 --- /dev/null +++ b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_task_final_status.py @@ -0,0 +1,59 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""Pipeline using ExitHandler with PipelineTaskFinalStatus.""" + +from kfp.v2 import dsl +from kfp.v2 import compiler +from kfp.v2.dsl import component + +from kfp.v2.dsl import PipelineTaskFinalStatus + + +@component +def exit_op(user_input: str, status: PipelineTaskFinalStatus): + """Checks pipeline run status.""" + print('Pipeline status: ', status.state) + print('Job resource name: ', status.pipeline_job_resource_name) + print('Pipeline task name: ', status.pipeline_task_name) + print('Error code: ', status.error_code) + print('Error message: ', status.error_message) + + +@component +def print_op(message: str): + """Prints a message.""" + print(message) + + +@component +def fail_op(message: str): + """Fails.""" + import sys + print(message) + sys.exit(1) + + +@dsl.pipeline(name='pipeline-with-task-final-status') +def my_pipeline(message: str = 'Hello World!'): + exit_task = exit_op(user_input=message) + + with dsl.ExitHandler(exit_task, name='my-pipeline'): + print_op(message=message) + fail_op(message='Task failed.') + + +if __name__ == '__main__': + compiler.Compiler().compile( + pipeline_func=my_pipeline, + package_path=__file__.replace('.py', '.json')) diff --git a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_task_final_status_yaml.json b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_task_final_status_yaml.json new file mode 100644 index 00000000000..302de190af7 --- /dev/null +++ b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_task_final_status_yaml.json @@ -0,0 +1,151 @@ +{ + "pipelineSpec": { + "components": { + "comp-exit-handler-1": { + "dag": { + "tasks": { + "print-op": { + "cachingOptions": { + "enableCache": true + }, + "componentRef": { + "name": "comp-print-op" + }, + "inputs": { + "parameters": { + "message": { + "componentInputParameter": "pipelineparam--message" + } + } + }, + "taskInfo": { + "name": "print-op" + } + } + } + }, + "inputDefinitions": { + "parameters": { + "pipelineparam--message": { + "type": "STRING" + } + } + } + }, + "comp-exit-op": { + "executorLabel": "exec-exit-op", + "inputDefinitions": { + "parameters": { + "status": { + "type": "STRING" + }, + "user_input": { + "type": "STRING" + } + } + } + }, + "comp-print-op": { + "executorLabel": "exec-print-op", + "inputDefinitions": { + "parameters": { + "message": { + "type": "STRING" + } + } + } + } + }, + "deploymentSpec": { + "executors": { + "exec-exit-op": { + "container": { + "command": [ + "echo", + "user input:", + "{{$.inputs.parameters['user_input']}}", + "pipeline status:", + "{{$.inputs.parameters['status']}}" + ], + "image": "python:3.7" + } + }, + "exec-print-op": { + "container": { + "command": [ + "echo", + "{{$.inputs.parameters['message']}}" + ], + "image": "python:3.7" + } + } + } + }, + "pipelineInfo": { + "name": "pipeline-with-task-final-status-yaml" + }, + "root": { + "dag": { + "tasks": { + "exit-handler-1": { + "componentRef": { + "name": "comp-exit-handler-1" + }, + "inputs": { + "parameters": { + "pipelineparam--message": { + "componentInputParameter": "message" + } + } + }, + "taskInfo": { + "name": "my-pipeline" + } + }, + "exit-op": { + "componentRef": { + "name": "comp-exit-op" + }, + "dependentTasks": [ + "exit-handler-1" + ], + "inputs": { + "parameters": { + "status": { + "taskFinalStatus": { + "producerTask": "exit-handler-1" + } + }, + "user_input": { + "componentInputParameter": "message" + } + } + }, + "taskInfo": { + "name": "exit-op" + }, + "triggerPolicy": { + "strategy": "ALL_UPSTREAM_TASKS_COMPLETED" + } + } + } + 
}, + "inputDefinitions": { + "parameters": { + "message": { + "type": "STRING" + } + } + } + }, + "schemaVersion": "2.0.0", + "sdkVersion": "kfp-1.8.11" + }, + "runtimeConfig": { + "parameters": { + "message": { + "stringValue": "Hello World!" + } + } + } +} \ No newline at end of file diff --git a/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_task_final_status_yaml.py b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_task_final_status_yaml.py new file mode 100644 index 00000000000..5b9d5bfde50 --- /dev/null +++ b/sdk/python/kfp/v2/compiler_cli_tests/test_data/pipeline_with_task_final_status_yaml.py @@ -0,0 +1,61 @@ +# Copyright 2022 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Pipeline using ExitHandler with PipelineTaskFinalStatus (YAML).""" + +from kfp import components +from kfp.v2 import dsl +from kfp.v2 import compiler +from kfp.v2.dsl import component + +exit_op = components.load_component_from_text(""" +name: Exit Op +inputs: +- {name: user_input, type: String} +- {name: status, type: PipelineTaskFinalStatus} +implementation: + container: + image: python:3.7 + command: + - echo + - "user input:" + - {inputValue: user_input} + - "pipeline status:" + - {inputValue: status} +""") + +print_op = components.load_component_from_text(""" +name: Print Op +inputs: +- {name: message, type: String} +implementation: + container: + image: python:3.7 + command: + - echo + - {inputValue: message} +""") + + +@dsl.pipeline(name='pipeline-with-task-final-status-yaml') +def my_pipeline(message: str = 'Hello World!'): + exit_task = exit_op(user_input=message) + + with dsl.ExitHandler(exit_task, name='my-pipeline'): + print_op(message=message) + + +if __name__ == '__main__': + compiler.Compiler().compile( + pipeline_func=my_pipeline, + package_path=__file__.replace('.py', '.json')) diff --git a/sdk/python/kfp/v2/components/executor.py b/sdk/python/kfp/v2/components/executor.py index 2e0a57f06ec..4c5355d40d3 100644 --- a/sdk/python/kfp/v2/components/executor.py +++ b/sdk/python/kfp/v2/components/executor.py @@ -16,6 +16,7 @@ from typing import Any, Callable, Dict, List, Optional, Union from kfp.v2.components.types import artifact_types, type_annotations +from kfp.v2.components import task_final_status class Executor(): @@ -273,8 +274,23 @@ def execute(self): # `Optional[]` to get the actual parameter type. 
            v = type_annotations.maybe_strip_optional_from_annotation(v)

-            if self._is_parameter(v):
-                func_kwargs[k] = self._get_input_parameter_value(k, v)
+            if v is task_final_status.PipelineTaskFinalStatus:
+                value = self._get_input_parameter_value(k, v)
+                func_kwargs[k] = task_final_status.PipelineTaskFinalStatus(
+                    state=value.get('state'),
+                    pipeline_job_resource_name=value.get(
+                        'pipelineJobResourceName'),
+                    # pipelineTaskName won't be None once the Vertex Pipelines
+                    # backend change is rolled out.
+                    pipeline_task_name=value.get('pipelineTaskName', None),
+                    error_code=value.get('error', {}).get('code', None),
+                    error_message=value.get('error', {}).get('message', None),
+                )
+
+            elif self._is_parameter(v):
+                value = self._get_input_parameter_value(k, v)
+                if value is not None:
+                    func_kwargs[k] = value

            if type_annotations.is_artifact_annotation(v):
                if type_annotations.is_input_artifact(v):
diff --git a/sdk/python/kfp/v2/components/executor_test.py b/sdk/python/kfp/v2/components/executor_test.py
index 607f9a7fa2e..9af2023b4ea 100644
--- a/sdk/python/kfp/v2/components/executor_test.py
+++ b/sdk/python/kfp/v2/components/executor_test.py
@@ -25,6 +25,7 @@
                                          Model)
 from kfp.v2.components.types.type_annotations import (Input, InputPath, Output,
                                                       OutputPath)
+from kfp.v2.components.task_final_status import PipelineTaskFinalStatus

 _EXECUTOR_INPUT = """\
 {
@@ -78,10 +79,10 @@
        },
        "parameters": {
          "output_parameter_path": {
-            "outputFile": "gs://some-bucket/some_task/nested/output_parameter"
+            "outputFile": "%(test_dir)s/gcs/some-bucket/some_task/nested/output_parameter"
          }
        },
-        "outputFile": "%s/output_metadata.json"
+        "outputFile": "%(test_dir)s/output_metadata.json"
      }
    }
    """
@@ -89,13 +90,13 @@

 class ExecutorTest(unittest.TestCase):

-    def setUp(self):
-        self.maxDiff = None
-        self._test_dir = tempfile.mkdtemp()
-        artifact_types._GCS_LOCAL_MOUNT_PREFIX = self._test_dir + '/'
-        artifact_types._MINIO_LOCAL_MOUNT_PREFIX = self._test_dir + '/minio/'
-        artifact_types._S3_LOCAL_MOUNT_PREFIX = self._test_dir + '/s3/'
-        return super().setUp()
+    @classmethod
+    def setUp(cls):
+        cls.maxDiff = None
+        cls._test_dir = tempfile.mkdtemp()
+        artifact_types._GCS_LOCAL_MOUNT_PREFIX = cls._test_dir + '/'
+        artifact_types._MINIO_LOCAL_MOUNT_PREFIX = cls._test_dir + '/minio/'
+        artifact_types._S3_LOCAL_MOUNT_PREFIX = cls._test_dir + '/s3/'

     def _get_executor(
         self,
@@ -104,7 +105,8 @@ def _get_executor(
         if executor_input is None:
             executor_input = _EXECUTOR_INPUT

-        executor_input_dict = json.loads(executor_input % self._test_dir)
+        executor_input_dict = json.loads(executor_input %
+                                         {'test_dir': self._test_dir})

         return executor.Executor(
             executor_input=executor_input_dict, function_to_execute=func)
@@ -147,11 +149,9 @@ def test_output_parameter(self):
         def test_func(output_parameter_path: OutputPath(str)):
             # Test that output parameters just use the passed in filename.
             self.assertEqual(
-                output_parameter_path,
-                'gs://some-bucket/some_task/nested/output_parameter')
+                output_parameter_path, self._test_dir +
+                '/gcs/some-bucket/some_task/nested/output_parameter')

-            # Test writing to the path succeeds. This fails if parent directories
-            # don't exist.
with open(output_parameter_path, 'w') as f: f.write('Hello, World!') @@ -235,7 +235,7 @@ def test_function_string_output(self): "outputFile": "gs://some-bucket/output" } }, - "outputFile": "%s/output_metadata.json" + "outputFile": "%(test_dir)s/output_metadata.json" } } """ @@ -278,7 +278,7 @@ def test_function_with_int_output(self): "outputFile": "gs://some-bucket/output" } }, - "outputFile": "%s/output_metadata.json" + "outputFile": "%(test_dir)s/output_metadata.json" } } """ @@ -317,7 +317,7 @@ def test_function_with_float_output(self): "outputFile": "gs://some-bucket/output" } }, - "outputFile": "%s/output_metadata.json" + "outputFile": "%(test_dir)s/output_metadata.json" } } """ @@ -356,7 +356,7 @@ def test_function_with_list_output(self): "outputFile": "gs://some-bucket/output" } }, - "outputFile": "%s/output_metadata.json" + "outputFile": "%(test_dir)s/output_metadata.json" } } """ @@ -395,7 +395,7 @@ def test_function_with_dict_output(self): "outputFile": "gs://some-bucket/output" } }, - "outputFile": "%s/output_metadata.json" + "outputFile": "%(test_dir)s/output_metadata.json" } } """ @@ -407,13 +407,14 @@ def test_func(first: int, second: int) -> Dict: with open(os.path.join(self._test_dir, 'output_metadata.json'), 'r') as f: output_metadata = json.loads(f.read()) - self.assertDictEqual(output_metadata, { - "parameters": { - "Output": { - "stringValue": "{\"first\": 40, \"second\": 2}" - } - }, - }) + self.assertDictEqual( + output_metadata, { + "parameters": { + "Output": { + "stringValue": "{\"first\": 40, \"second\": 2}" + } + }, + }) def test_function_with_typed_list_output(self): executor_input = """\ @@ -434,7 +435,7 @@ def test_function_with_typed_list_output(self): "outputFile": "gs://some-bucket/output" } }, - "outputFile": "%s/output_metadata.json" + "outputFile": "%(test_dir)s/output_metadata.json" } } """ @@ -473,7 +474,7 @@ def test_function_with_typed_dict_output(self): "outputFile": "gs://some-bucket/output" } }, - "outputFile": "%s/output_metadata.json" + "outputFile": "%(test_dir)s/output_metadata.json" } } """ @@ -485,13 +486,14 @@ def test_func(first: int, second: int) -> Dict[str, int]: with open(os.path.join(self._test_dir, 'output_metadata.json'), 'r') as f: output_metadata = json.loads(f.read()) - self.assertDictEqual(output_metadata, { - "parameters": { - "Output": { - "stringValue": "{\"first\": 40, \"second\": 2}" - } - }, - }) + self.assertDictEqual( + output_metadata, { + "parameters": { + "Output": { + "stringValue": "{\"first\": 40, \"second\": 2}" + } + }, + }) def test_artifact_output(self): executor_input = """\ @@ -520,7 +522,7 @@ def test_artifact_output(self): ] } }, - "outputFile": "%s/output_metadata.json" + "outputFile": "%(test_dir)s/output_metadata.json" } } """ @@ -574,7 +576,7 @@ def test_named_tuple_output(self): "outputFile": "gs://some-bucket/output_string" } }, - "outputFile": "%s/output_metadata.json" + "outputFile": "%(test_dir)s/output_metadata.json" } } """ @@ -652,7 +654,7 @@ def test_function_with_optional_inputs(self): "outputFile": "gs://some-bucket/output" } }, - "outputFile": "%s/output_metadata.json" + "outputFile": "%(test_dir)s/output_metadata.json" } } """ @@ -681,6 +683,52 @@ def test_func( }, }) + def test_function_with_pipeline_task_final_status(self): + executor_input = """\ + { + "inputs": { + "parameters": { + "status": { + "stringValue": "{\\"error\\":{\\"code\\":9,\\"message\\":\\"The DAG failed because some tasks failed. 
The failed tasks are: [fail-op].\\"},\\"pipelineJobResourceName\\":\\"projects/123/locations/us-central1/pipelineJobs/pipeline-456\\",\\"pipelineTaskName\\":\\"upstream-task\\",\\"state\\":\\"FAILED\\"}"
+          }
+        }
+      },
+      "outputs": {
+        "parameters": {
+          "output": {
+            "outputFile": "gs://some-bucket/output"
+          }
+        },
+        "outputFile": "%(test_dir)s/output_metadata.json"
+      }
+    }
+    """
+
+        def test_func(status: PipelineTaskFinalStatus) -> str:
+            return (f'Pipeline status: {status.state}\n'
+                    f'Job resource name: {status.pipeline_job_resource_name}\n'
+                    f'Pipeline task name: {status.pipeline_task_name}\n'
+                    f'Error code: {status.error_code}\n'
+                    f'Error message: {status.error_message}')
+
+        self._get_executor(test_func, executor_input).execute()
+        with open(os.path.join(self._test_dir, 'output_metadata.json'),
+                  'r') as f:
+            output_metadata = json.loads(f.read())
+        self.assertDictEqual(
+            output_metadata, {
+                'parameters': {
+                    'Output': {
+                        'stringValue':
+                            'Pipeline status: FAILED\n'
+                            'Job resource name: projects/123/locations/us-central1/pipelineJobs/pipeline-456\n'
+                            'Pipeline task name: upstream-task\n'
+                            'Error code: 9\n'
+                            'Error message: The DAG failed because some tasks failed. The failed tasks are: [fail-op].'
+                    }
+                },
+            })
+

 if __name__ == '__main__':
     unittest.main()
diff --git a/sdk/python/kfp/v2/components/task_final_status.py b/sdk/python/kfp/v2/components/task_final_status.py
new file mode 100644
index 00000000000..938eafbb105
--- /dev/null
+++ b/sdk/python/kfp/v2/components/task_final_status.py
@@ -0,0 +1,43 @@
+# Copyright 2022 The Kubeflow Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Definition for PipelineTaskFinalStatus."""
+
+import dataclasses
+from typing import Optional
+
+
+@dataclasses.dataclass
+class PipelineTaskFinalStatus:
+    """The final status of a pipeline task.
+
+    This is the Python representation of the proto: PipelineTaskFinalStatus
+    https://github.com/kubeflow/pipelines/blob/1c3e2768e6177d5d6e3f4b8eff8fafb9a3b76c1f/api/v2alpha1/pipeline_spec.proto#L886
+
+    Attributes:
+      state: The final state of the task. The value could be one of
+        'SUCCEEDED', 'FAILED' or 'CANCELLED'.
+      pipeline_job_resource_name: The pipeline job resource name, in the format
+        of `projects/{project}/locations/{location}/pipelineJobs/{pipeline_job}`.
+      pipeline_task_name: The pipeline task that produces this status.
+      error_code: In case of error, the google.rpc.Code
+        https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto
+        If state is 'SUCCEEDED', this is None.
+      error_message: In case of error, the detailed error message.
+        If state is 'SUCCEEDED', this is None.
+ """ + state: str + pipeline_job_resource_name: str + pipeline_task_name: str + error_code: Optional[int] + error_message: Optional[str] diff --git a/sdk/python/kfp/v2/components/types/type_utils.py b/sdk/python/kfp/v2/components/types/type_utils.py index 911e00369a9..b02590c597b 100644 --- a/sdk/python/kfp/v2/components/types/type_utils.py +++ b/sdk/python/kfp/v2/components/types/type_utils.py @@ -19,6 +19,7 @@ from kfp.components import structures, type_annotation_utils from kfp.pipeline_spec import pipeline_spec_pb2 +from kfp.v2.components import task_final_status from kfp.v2.components.types import artifact_types PARAMETER_TYPES = Union[str, int, float, bool, dict, list] @@ -65,6 +66,19 @@ } +def is_task_final_status_type(type_name: Optional[Union[str, dict]]) -> bool: + """Check if a ComponentSpec I/O type is PipelineTaskFinalStatus. + + Args: + type_name: type name of the ComponentSpec I/O type. + + Returns: + True if the type name is 'PipelineTaskFinalStatus'. + """ + return isinstance(type_name, str) and ( + type_name == task_final_status.PipelineTaskFinalStatus.__name__) + + def is_parameter_type(type_name: Optional[Union[str, dict]]) -> bool: """Check if a ComponentSpec I/O type is considered as a parameter type. @@ -81,7 +95,8 @@ def is_parameter_type(type_name: Optional[Union[str, dict]]) -> bool: else: return False - return type_name.lower() in _PARAMETER_TYPES_MAPPING + return type_name.lower( + ) in _PARAMETER_TYPES_MAPPING or is_task_final_status_type(type_name) def get_artifact_type_schema( diff --git a/sdk/python/kfp/v2/components/types/type_utils_test.py b/sdk/python/kfp/v2/components/types/type_utils_test.py index 2fa1fbc7b6b..377e8d99f1c 100644 --- a/sdk/python/kfp/v2/components/types/type_utils_test.py +++ b/sdk/python/kfp/v2/components/types/type_utils_test.py @@ -40,6 +40,7 @@ 'data_type': 'proto:tfx.components.trainer.TrainArgs' } }, + 'PipelineTaskFinalStatus', ] _KNOWN_ARTIFACT_TYPES = ['Model', 'Dataset', 'Schema', 'Metrics'] _UNKNOWN_ARTIFACT_TYPES = [None, 'Arbtrary Model', 'dummy'] @@ -391,6 +392,24 @@ def test_verify_type_compatibility( error_message_prefix='', ) + @parameterized.parameters( + { + 'given_type': 'PipelineTaskFinalStatus', + 'expected_result': True, + }, + { + 'given_type': 'pipelineTaskFinalstatus', + 'expected_result': False, + }, + { + 'given_type': int, + 'expected_result': False, + }, + ) + def test_is_task_final_statu_type(self, given_type, expected_result): + self.assertEqual(expected_result, + type_utils.is_task_final_status_type(given_type)) + if __name__ == '__main__': unittest.main() diff --git a/sdk/python/kfp/v2/dsl/__init__.py b/sdk/python/kfp/v2/dsl/__init__.py index 5dfc3958639..b5e5f9e19b6 100644 --- a/sdk/python/kfp/v2/dsl/__init__.py +++ b/sdk/python/kfp/v2/dsl/__init__.py @@ -43,8 +43,10 @@ ParallelFor, ) +from kfp.v2.components.task_final_status import PipelineTaskFinalStatus + PIPELINE_JOB_NAME_PLACEHOLDER = '{{$.pipeline_job_name}}' PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER = '{{$.pipeline_job_resource_name}}' PIPELINE_JOB_ID_PLACEHOLDER = '{{$.pipeline_job_uuid}}' PIPELINE_TASK_NAME_PLACEHOLDER = '{{$.pipeline_task_name}}' -PIPELINE_TASK_ID_PLACEHOLDER = '{{$.pipeline_task_uuid}}' \ No newline at end of file +PIPELINE_TASK_ID_PLACEHOLDER = '{{$.pipeline_task_uuid}}' diff --git a/sdk/python/requirements.in b/sdk/python/requirements.in index 01373cc98f2..9d592db7f82 100644 --- a/sdk/python/requirements.in +++ b/sdk/python/requirements.in @@ -32,7 +32,7 @@ typer>=0.3.2,<1.0 # kfp.v2 absl-py>=0.9,<=0.11 
-kfp-pipeline-spec>=0.1.13,<0.2.0 +kfp-pipeline-spec>=0.1.14,<0.2.0 fire>=0.3.1,<1 google-api-python-client>=1.7.8,<2 pydantic>=1.8.2,<2 diff --git a/sdk/python/requirements.txt b/sdk/python/requirements.txt index 72057e5ba62..ddbcb13c5c6 100644 --- a/sdk/python/requirements.txt +++ b/sdk/python/requirements.txt @@ -66,7 +66,7 @@ idna==3.2 # via requests jsonschema==3.2.0 # via -r requirements.in -kfp-pipeline-spec==0.1.13 +kfp-pipeline-spec==0.1.14 # via -r requirements.in kfp-server-api==1.7.0 # via -r requirements.in diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 20a38add1ab..d7a614c7da3 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -50,7 +50,7 @@ 'Deprecated>=1.2.7,<2', 'strip-hints>=0.1.8,<1', 'docstring-parser>=0.7.3,<1', - 'kfp-pipeline-spec>=0.1.13,<0.2.0', + 'kfp-pipeline-spec>=0.1.14,<0.2.0', 'fire>=0.3.1,<1', 'protobuf>=3.13.0,<4', 'uritemplate>=3.0.1,<4',
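
Note on the runtime flow (not part of the diff): the backend serializes the exit task's final status as a JSON parameter, and the executor change in kfp/v2/components/executor.py above deserializes it into the new PipelineTaskFinalStatus dataclass. The snippet below is a minimal standalone sketch of that construction, using the payload copied from the executor_test.py fixture; the local dataclass mirrors kfp/v2/components/task_final_status.py so the snippet runs without a KFP install.

    import dataclasses
    import json
    from typing import Optional


    @dataclasses.dataclass
    class PipelineTaskFinalStatus:
        # Mirrors sdk/python/kfp/v2/components/task_final_status.py above.
        state: str
        pipeline_job_resource_name: str
        pipeline_task_name: str
        error_code: Optional[int]
        error_message: Optional[str]


    # Serialized status as passed by the backend (copied from the
    # executor_test.py fixture in this diff).
    raw = json.loads(
        '{"error":{"code":9,"message":"The DAG failed because some tasks '
        'failed. The failed tasks are: [fail-op]."},'
        '"pipelineJobResourceName":'
        '"projects/123/locations/us-central1/pipelineJobs/pipeline-456",'
        '"pipelineTaskName":"upstream-task","state":"FAILED"}')

    # Same construction the executor performs. The `.get('error', {})` default
    # matters: a SUCCEEDED run carries no 'error' field, and the dataclass
    # docstring promises error_code/error_message are None in that case.
    status = PipelineTaskFinalStatus(
        state=raw.get('state'),
        pipeline_job_resource_name=raw.get('pipelineJobResourceName'),
        pipeline_task_name=raw.get('pipelineTaskName', None),
        error_code=raw.get('error', {}).get('code', None),
        error_message=raw.get('error', {}).get('message', None),
    )

    assert status.state == 'FAILED'
    assert status.error_code == 9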