From 838fd4c2b5cf1a84ccc49f4f9fff809673772541 Mon Sep 17 00:00:00 2001 From: Sam Lai Date: Wed, 7 Jul 2021 17:17:08 +0100 Subject: [PATCH] New-style tasks/workflows use user exception scope New-style tasks/workflows are currently not wrapped in a user exception scope, so all exceptions in user code are treated as unknown system errors which are automatically retried. Further context - https://flyte-org.slack.com/archives/CREL4QVAQ/p1625581856326400 This PR restores the user exception scope to new-style tasks/workflows so exceptions in user code are not retried unless FlyteRecoverableException is raised. --- flytekit/bin/entrypoint.py | 10 +++++++++- flytekit/core/map_task.py | 5 +++-- flytekit/core/python_function_task.py | 5 +++-- flytekit/core/workflow.py | 5 +++-- 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index 98930d2502..92df3db876 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -96,7 +96,11 @@ def _dispatch_execute( c: OR if an unhandled exception is retrieved - record it as an errors.pb """ output_file_dict = {} - try: + + @_scopes.system_entry_point + def do_dispatch_execute(): + nonlocal output_file_dict + # Step1 local_inputs_file = _os.path.join(ctx.execution_state.working_dir, "inputs.pb") ctx.file_access.get_data(inputs_path, local_inputs_file) @@ -122,6 +126,9 @@ def _dispatch_execute( _error_models.ContainerError.Kind.RECOVERABLE, ) ) + + try: + do_dispatch_execute() except _scoped_exceptions.FlyteScopedException as e: _logging.error("!! Begin Error Captured by Flyte !!") output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument( @@ -130,6 +137,7 @@ def _dispatch_execute( _logging.error(e.verbose_message) _logging.error("!! End Error Captured by Flyte !!") except Exception as e: + # TODO: need to preserve IgnoreOutputs exception from system_entry_point handling if isinstance(e, IgnoreOutputs): # Step 3b _logging.warning(f"IgnoreOutputs received! Outputs.pb will not be uploaded. reason {e}") diff --git a/flytekit/core/map_task.py b/flytekit/core/map_task.py index 89ef4343bd..f8b3094b29 100644 --- a/flytekit/core/map_task.py +++ b/flytekit/core/map_task.py @@ -8,6 +8,7 @@ from typing import Any, Dict, List, Optional, Type from flytekit.common.constants import SdkTaskType +from flytekit.common.exceptions import scopes as _exception_scopes from flytekit.core.base_task import PythonTask from flytekit.core.context_manager import ExecutionState, FlyteContext, FlyteContextManager, SerializationSettings from flytekit.core.interface import transform_interface_to_list_interface @@ -168,7 +169,7 @@ def _execute_map_task(self, ctx: FlyteContext, **kwargs) -> Any: map_task_inputs = {} for k in self.interface.inputs.keys(): map_task_inputs[k] = kwargs[k][task_index] - return self._run_task.execute(**map_task_inputs) + return _exception_scopes.user_entry_point(self._run_task.execute)(**map_task_inputs) def _raw_execute(self, **kwargs) -> Any: """ @@ -190,7 +191,7 @@ def _raw_execute(self, **kwargs) -> Any: single_instance_inputs = {} for k in self.interface.inputs.keys(): single_instance_inputs[k] = kwargs[k][i] - o = self._run_task.execute(**single_instance_inputs) + o = _exception_scopes.user_entry_point(self._run_task.execute)(**single_instance_inputs) if outputs_expected: outputs.append(o) diff --git a/flytekit/core/python_function_task.py b/flytekit/core/python_function_task.py index 43b8e864cb..e621c5c0bb 100644 --- a/flytekit/core/python_function_task.py +++ b/flytekit/core/python_function_task.py @@ -20,6 +20,7 @@ from enum import Enum from typing import Any, Callable, List, Optional, TypeVar, Union +from flytekit.common.exceptions import scopes as _exception_scopes from flytekit.core.base_task import TaskResolverMixin from flytekit.core.context_manager import ( ExecutionState, @@ -156,7 +157,7 @@ def execute(self, **kwargs) -> Any: handle dynamic tasks or you will no longer be able to use the task as a dynamic task generator. """ if self.execution_mode == self.ExecutionBehavior.DEFAULT: - return self._task_function(**kwargs) + return _exception_scopes.user_entry_point(self._task_function)(**kwargs) elif self.execution_mode == self.ExecutionBehavior.DYNAMIC: return self.dynamic_execute(self._task_function, **kwargs) @@ -267,7 +268,7 @@ def dynamic_execute(self, task_function: Callable, **kwargs) -> Any: updated_exec_state = ctx.execution_state.with_params(mode=ExecutionState.Mode.TASK_EXECUTION) with FlyteContextManager.with_context(ctx.with_execution_state(updated_exec_state)): logger.info("Executing Dynamic workflow, using raw inputs") - return task_function(**kwargs) + return _exception_scopes.user_entry_point(task_function)(**kwargs) if ctx.execution_state and ctx.execution_state.mode == ExecutionState.Mode.TASK_EXECUTION: is_fast_execution = bool( diff --git a/flytekit/core/workflow.py b/flytekit/core/workflow.py index e2fba62704..94271519e7 100644 --- a/flytekit/core/workflow.py +++ b/flytekit/core/workflow.py @@ -7,6 +7,7 @@ from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union from flytekit.common import constants as _common_constants +from flytekit.common.exceptions import scopes as _exception_scopes from flytekit.common.exceptions.user import FlyteValidationException, FlyteValueException from flytekit.core.base_task import PythonTask from flytekit.core.class_based_resolver import ClassStorageTaskResolver @@ -668,7 +669,7 @@ def compile(self, **kwargs): # Construct the default input promise bindings, but then override with the provided inputs, if any input_kwargs = construct_input_promises([k for k in self.interface.inputs.keys()]) input_kwargs.update(kwargs) - workflow_outputs = self._workflow_function(**input_kwargs) + workflow_outputs = _exception_scopes.user_entry_point(self._workflow_function)(**input_kwargs) all_nodes.extend(comp_ctx.compilation_state.nodes) # This little loop was added as part of the task resolver change. The task resolver interface itself is @@ -740,7 +741,7 @@ def execute(self, **kwargs): call execute from dispatch_execute which is in _local_execute, workflows should also call an execute inside _local_execute. This makes mocking cleaner. """ - return self._workflow_function(**kwargs) + return _exception_scopes.user_entry_point(self._workflow_function)(**kwargs) def workflow(