Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up scopes and exception handling for new tasks #543

Merged
merged 10 commits into from
Jul 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions docs/source/design/authoring.rst
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,31 @@ There is also only one :py:class:`LaunchPlan <flytekit.core.launch_plan.LaunchPl
.. autoclass:: flytekit.core.launch_plan.LaunchPlan
:noindex:

******************
Exception Handling
******************
Exception handling is done along two dimensions

* System vs User: We try to differentiate between user exceptions and flytekit/system level exceptions. For instance, if flytekit
fails to upload its outputs, that's a system exception. If you the user raise a ``ValueError`` because of unexpected input
in the task code, that's a user exception.
* Recoverable vs Non-recoverable: Recoverable errors will be retried and count against your task's retry count. Non-recoverable errors will just fail. System exceptions are by default recoverable (since there's a good chance it was just a blip).

This is the user exception tree. Feel free to raise any of these exception classes. Note that the ``FlyteRecoverableException`` is the only recoverable one. All others, along with all non-flytekit defined exceptions, are non-recoverable.

.. inheritance-diagram:: flytekit.common.exceptions.user.FlyteValidationException flytekit.common.exceptions.user.FlyteEntityAlreadyExistsException flytekit.common.exceptions.user.FlyteValueException flytekit.common.exceptions.user.FlyteTimeout flytekit.common.exceptions.user.FlyteAuthenticationException flytekit.common.exceptions.user.FlyteRecoverableException
:parts: 1
:top-classes: Exception

Implementation
==============
For those that want to dig a bit deeper, take a look at the :py:class:`flytekit.common.exceptions.scopes.FlyteScopedException` classes.
There are also two decorators which you'll find interspersed througout the codebase.

.. autofunction:: flytekit.common.exceptions.scopes.system_entry_point

.. autofunction:: flytekit.common.exceptions.scopes.user_entry_point

**************
Call Patterns
**************
Expand Down
45 changes: 35 additions & 10 deletions flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from flytekit.models import dynamic_job as _dynamic_job
from flytekit.models import literals as _literal_models
from flytekit.models.core import errors as _error_models
from flytekit.models.core import execution as _execution_models
from flytekit.models.core import identifier as _identifier
from flytekit.tools.fast_registration import download_distribution as _download_distribution
from flytekit.tools.module_loader import load_object_from_module
Expand Down Expand Up @@ -104,7 +105,9 @@ def _dispatch_execute(
idl_input_literals = _literal_models.LiteralMap.from_flyte_idl(input_proto)

# Step2
outputs = task_def.dispatch_execute(ctx, idl_input_literals)
# Decorate the dispatch execute function before calling it, this wraps all exceptions into one
# of the FlyteScopedExceptions
outputs = _scoped_exceptions.system_entry_point(task_def.dispatch_execute)(ctx, idl_input_literals)
Copy link
Contributor

@kumare3 kumare3 Jul 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

omg, this is weird?
why not a simple

try:
   outputs = ....
except :
  ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is effectively what the decorator does. The reason the decorator exists is mostly for two reasons:

  1. We enter and leave user-code in multiple places,
  2. The excpetions that we, flytekit authors, throw do not conform to any hierarchy easily distinguishable from potential user exceptions. After we complete [Housekeeping] [flytekit] Exception cleanup flyte#1033, and all flytekit exceptions have a base class that users will never use, we can get rid of these scoped exceptions and just use a few try catches to distinguish things.

I think it's preferable to get rid of these scoped exceptions actually, I just don't want to go through everything and update them right now. Maybe at 1.0

# Step3a
if isinstance(outputs, VoidPromise):
_logging.getLogger().warning("Task produces no outputs")
Expand All @@ -120,31 +123,53 @@ def _dispatch_execute(
"UNKNOWN_OUTPUT",
f"Type of output received not handled {type(outputs)} outputs: {outputs}",
_error_models.ContainerError.Kind.RECOVERABLE,
_execution_models.ExecutionError.ErrorKind.SYSTEM,
)
)
except _scoped_exceptions.FlyteScopedException as e:
_logging.error("!! Begin Error Captured by Flyte !!")

# Handle user-scoped errors
except _scoped_exceptions.FlyteScopedUserException as e:
if isinstance(e.value, IgnoreOutputs):
_logging.warning(f"User-scoped IgnoreOutputs received! Outputs.pb will not be uploaded. reason {e}!!")
return
output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument(
_error_models.ContainerError(e.error_code, e.verbose_message, e.kind)
_error_models.ContainerError(
e.error_code, e.verbose_message, e.kind, _execution_models.ExecutionError.ErrorKind.USER
)
)
_logging.error("!! Begin User Error Captured by Flyte !!")
_logging.error(e.verbose_message)
_logging.error("!! End Error Captured by Flyte !!")
except Exception as e:
if isinstance(e, IgnoreOutputs):
# Step 3b
_logging.warning(f"IgnoreOutputs received! Outputs.pb will not be uploaded. reason {e}")

# Handle system-scoped errors
except _scoped_exceptions.FlyteScopedSystemException as e:
if isinstance(e.value, IgnoreOutputs):
_logging.warning(f"System-scoped IgnoreOutputs received! Outputs.pb will not be uploaded. reason {e}!!")
return
output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument(
_error_models.ContainerError(
e.error_code, e.verbose_message, e.kind, _execution_models.ExecutionError.ErrorKind.SYSTEM
)
)
_logging.error("!! Begin System Error Captured by Flyte !!")
_logging.error(e.verbose_message)
_logging.error("!! End Error Captured by Flyte !!")

# Interpret all other exceptions (some of which may be caused by the code in the try block outside of
# dispatch_execute) as recoverable system exceptions.
except Exception as e:
# Step 3c
_logging.error(f"Exception when executing task {task_def.name or task_def.id.name}, reason {str(e)}")
_logging.error("!! Begin Unknown System Error Captured by Flyte !!")
exc_str = _traceback.format_exc()
output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument(
_error_models.ContainerError(
"SYSTEM:Unknown",
exc_str,
_error_models.ContainerError.Kind.RECOVERABLE,
_execution_models.ExecutionError.ErrorKind.SYSTEM,
)
)
_logging.error(f"Exception when executing task {task_def.name or task_def.id.name}, reason {str(e)}")
_logging.error("!! Begin Unknown System Error Captured by Flyte !!")
_logging.error(exc_str)
_logging.error("!! End Error Captured by Flyte !!")

Expand Down
74 changes: 34 additions & 40 deletions flytekit/common/exceptions/scopes.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from sys import exc_info as _exc_info
from traceback import format_tb as _format_tb

from six import reraise as _reraise
from wrapt import decorator as _decorator

from flytekit.common.exceptions import base as _base_exceptions
Expand Down Expand Up @@ -77,7 +76,7 @@ def error_code(self):
return "{}:Unknown".format(self._context)

@property
def kind(self):
def kind(self) -> int:
"""
:rtype: int
"""
Expand Down Expand Up @@ -138,45 +137,51 @@ def _is_base_context():
@_decorator
def system_entry_point(wrapped, instance, args, kwargs):
"""
Decorator for wrapping functions that enter a system context. This should decorate every method a user might
call. This will allow us to add differentiation between what is a user error and what is a system failure.
Furthermore, we will clean the exception trace so as to make more sense to the user--allowing them to know if they
should take action themselves or pass on to the platform owners. We will dispatch metrics and such appropriately.
The reason these two (see the user one below) decorators exist is to categorize non-Flyte exceptions at arbitrary
locations. For example, while there is a separate ecosystem of Flyte-defined user and system exceptions
(see the FlyteException hierarchy), and we can easily understand and categorize those, if flytekit comes upon
a random ``ValueError`` or other non-flytekit defined error, how would we know if it was a bug in flytekit versus an
error with user code or something the user called? The purpose of these decorators is to categorize those (see
the last case in the nested try/catch below.

Decorator for wrapping functions that enter a system context. This should decorate every method that may invoke some
user code later on down the line. This will allow us to add differentiation between what is a user error and
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

user code later on down the line. This will allow us to add differentiation between what is a
WHY not at the point where the user code is called? i.e. only the last parent of the user code function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment above.

what is a system failure. Furthermore, we will clean the exception trace so as to make more sense to the
user -- allowing them to know if they should take action themselves or pass on to the platform owners.
We will dispatch metrics and such appropriately.
"""
try:
_CONTEXT_STACK.append(_SYSTEM_CONTEXT)
if _is_base_context():
# If this is the first time either of this decorator, or the one below is called, then we unwrap the
# exception. The first time these decorators are used is currently in the entrypoint.py file. The scoped
# exceptions are unwrapped because at that point, we want to return the underlying error to the user.
try:
return wrapped(*args, **kwargs)
except FlyteScopedException as ex:
_reraise(ex.type, ex.value, ex.traceback)
raise ex.value
else:
try:
return wrapped(*args, **kwargs)
except FlyteScopedException:
# Just pass-on the exception that is already wrapped and scoped
_reraise(*_exc_info())
except FlyteScopedException as scoped:
raise scoped
except _user_exceptions.FlyteUserException:
# Re-raise from here.
_reraise(
FlyteScopedUserException,
FlyteScopedUserException(*_exc_info()),
_exc_info()[2],
)
raise FlyteScopedUserException(*_exc_info())
except Exception:
# This is why this function exists - arbitrary exceptions that we don't know what to do with are
# interpreted as system errors.
# System error, raise full stack-trace all the way up the chain.
_reraise(
FlyteScopedSystemException,
FlyteScopedSystemException(*_exc_info(), kind=_error_model.ContainerError.Kind.RECOVERABLE),
_exc_info()[2],
)
raise FlyteScopedSystemException(*_exc_info(), kind=_error_model.ContainerError.Kind.RECOVERABLE)
finally:
_CONTEXT_STACK.pop()


@_decorator
def user_entry_point(wrapped, instance, args, kwargs):
"""
See the comment for the system_entry_point above as well.

Decorator for wrapping functions that enter into a user context. This will help us differentiate user-created
failures even when it is re-entrant into system code.

Expand All @@ -188,35 +193,24 @@ def user_entry_point(wrapped, instance, args, kwargs):
try:
_CONTEXT_STACK.append(_USER_CONTEXT)
if _is_base_context():
# See comment at this location for system_entry_point
try:
return wrapped(*args, **kwargs)
except FlyteScopedException as ex:
_reraise(ex.type, ex.value, ex.traceback)
raise ex.value
else:
try:
return wrapped(*args, **kwargs)
except FlyteScopedException:
# Just pass on the already wrapped and scoped exception
_reraise(*_exc_info())
except FlyteScopedException as scoped:
raise scoped
except _user_exceptions.FlyteUserException:
_reraise(
FlyteScopedUserException,
FlyteScopedUserException(*_exc_info()),
_exc_info()[2],
)
raise FlyteScopedUserException(*_exc_info())
except _system_exceptions.FlyteSystemException:
_reraise(
FlyteScopedSystemException,
FlyteScopedSystemException(*_exc_info()),
_exc_info()[2],
)
raise FlyteScopedSystemException(*_exc_info())
except Exception:
# Any non-platform raised exception is a user exception.
# This is why this function exists - arbitrary exceptions that we don't know what to do with are
# interpreted as user exceptions.
# This will also catch FlyteUserException re-raised by the system_entry_point handler
_reraise(
FlyteScopedUserException,
FlyteScopedUserException(*_exc_info()),
_exc_info()[2],
)
raise FlyteScopedUserException(*_exc_info())
finally:
_CONTEXT_STACK.pop()
6 changes: 4 additions & 2 deletions flytekit/core/map_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
Flytekit map tasks specify how to run a single task across a list of inputs. Map tasks themselves are constructed with
a reference task as well as run-time parameters that limit execution concurrency and failure tolerations.
"""

import os
from contextlib import contextmanager
from itertools import count
from typing import Any, Dict, List, Optional, Type

from flytekit.common.constants import SdkTaskType
from flytekit.common.exceptions import scopes as exception_scopes
from flytekit.core.base_task import PythonTask
from flytekit.core.context_manager import ExecutionState, FlyteContext, FlyteContextManager, SerializationSettings
from flytekit.core.interface import transform_interface_to_list_interface
Expand Down Expand Up @@ -168,7 +170,7 @@ def _execute_map_task(self, ctx: FlyteContext, **kwargs) -> Any:
map_task_inputs = {}
for k in self.interface.inputs.keys():
map_task_inputs[k] = kwargs[k][task_index]
return self._run_task.execute(**map_task_inputs)
return exception_scopes.user_entry_point(self._run_task.execute)(**map_task_inputs)

def _raw_execute(self, **kwargs) -> Any:
"""
Expand All @@ -190,7 +192,7 @@ def _raw_execute(self, **kwargs) -> Any:
single_instance_inputs = {}
for k in self.interface.inputs.keys():
single_instance_inputs[k] = kwargs[k][i]
o = self._run_task.execute(**single_instance_inputs)
o = exception_scopes.user_entry_point(self._run_task.execute)(**single_instance_inputs)
if outputs_expected:
outputs.append(o)

Expand Down
5 changes: 3 additions & 2 deletions flytekit/core/python_function_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from enum import Enum
from typing import Any, Callable, List, Optional, TypeVar, Union

from flytekit.common.exceptions import scopes as exception_scopes
from flytekit.core.base_task import TaskResolverMixin
from flytekit.core.context_manager import (
ExecutionState,
Expand Down Expand Up @@ -156,7 +157,7 @@ def execute(self, **kwargs) -> Any:
handle dynamic tasks or you will no longer be able to use the task as a dynamic task generator.
"""
if self.execution_mode == self.ExecutionBehavior.DEFAULT:
return self._task_function(**kwargs)
return exception_scopes.user_entry_point(self._task_function)(**kwargs)
elif self.execution_mode == self.ExecutionBehavior.DYNAMIC:
return self.dynamic_execute(self._task_function, **kwargs)

Expand Down Expand Up @@ -267,7 +268,7 @@ def dynamic_execute(self, task_function: Callable, **kwargs) -> Any:
updated_exec_state = ctx.execution_state.with_params(mode=ExecutionState.Mode.TASK_EXECUTION)
with FlyteContextManager.with_context(ctx.with_execution_state(updated_exec_state)):
logger.info("Executing Dynamic workflow, using raw inputs")
return task_function(**kwargs)
return exception_scopes.user_entry_point(task_function)(**kwargs)

if ctx.execution_state and ctx.execution_state.mode == ExecutionState.Mode.TASK_EXECUTION:
is_fast_execution = bool(
Expand Down
2 changes: 1 addition & 1 deletion flytekit/core/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class Resources(object):

Storage is not currently supported on the Flyte backend.

Please see the :std:ref:`User Guide <cookbook:sphx_glr_auto_deployment_workflow_customizing_resources.py>` for detailed examples.
Please see the :std:ref:`User Guide <cookbook:customizing task resources>` for detailed examples.
Also refer to the `K8s conventions. <https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes>`__
"""

Expand Down
2 changes: 1 addition & 1 deletion flytekit/core/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class CronSchedule(_schedule_models.Schedule):
cron_expression="0 10 * * ? *",
)

See the :std:ref:`User Guide <cookbook:sphx_glr_auto_deployment_workflow_lp_schedules.py>` for further examples.
See the :std:ref:`User Guide <cookbook:cron schedules>` for further examples.
"""

_VALID_CRON_ALIASES = [
Expand Down
19 changes: 15 additions & 4 deletions flytekit/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union

from flytekit.common import constants as _common_constants
from flytekit.common.exceptions import scopes as exception_scopes
from flytekit.common.exceptions.user import FlyteValidationException, FlyteValueException
from flytekit.core.base_task import PythonTask
from flytekit.core.class_based_resolver import ClassStorageTaskResolver
Expand Down Expand Up @@ -384,11 +385,21 @@ class ImperativeWorkflow(WorkflowBase):

.. literalinclude:: ../../../tests/flytekit/unit/core/test_imperative.py
:start-after: # docs_start
:end-before: # docs_start
:end-before: # docs_end
:language: python
:dedent: 4

This workflow would be identical on the backed to the
This workflow would be identical on the back-end to

.. literalinclude:: ../../../tests/flytekit/unit/core/test_imperative.py
:start-after: # docs_equivalent_start
:end-before: # docs_equivalent_end
:language: python
:dedent: 4

Note that the only reason we need the ``NamedTuple`` is so we can name the output the same thing as in the
imperative example. The imperative paradigm makes the naming of workflow outputs easier, but this isn't a big
deal in function-workflows because names tend to not be necessary.
"""

def __init__(
Expand Down Expand Up @@ -668,7 +679,7 @@ def compile(self, **kwargs):
# Construct the default input promise bindings, but then override with the provided inputs, if any
input_kwargs = construct_input_promises([k for k in self.interface.inputs.keys()])
input_kwargs.update(kwargs)
workflow_outputs = self._workflow_function(**input_kwargs)
workflow_outputs = exception_scopes.user_entry_point(self._workflow_function)(**input_kwargs)
all_nodes.extend(comp_ctx.compilation_state.nodes)

# This little loop was added as part of the task resolver change. The task resolver interface itself is
Expand Down Expand Up @@ -740,7 +751,7 @@ def execute(self, **kwargs):
call execute from dispatch_execute which is in _local_execute, workflows should also call an execute inside
_local_execute. This makes mocking cleaner.
"""
return self._workflow_function(**kwargs)
return exception_scopes.user_entry_point(self._workflow_function)(**kwargs)


def workflow(
Expand Down
Loading