diff --git a/flytekit/bin/entrypoint.py b/flytekit/bin/entrypoint.py index 86c94fa2d7..5e5c463738 100644 --- a/flytekit/bin/entrypoint.py +++ b/flytekit/bin/entrypoint.py @@ -5,6 +5,7 @@ import pathlib import random as _random import traceback as _traceback +from sys import exc_info as _exc_info from typing import List import click as _click @@ -14,9 +15,9 @@ from flytekit.common import constants as _constants from flytekit.common import utils as _common_utils from flytekit.common import utils as _utils -from flytekit.common.exceptions import scopes as _scoped_exceptions from flytekit.common.exceptions import scopes as _scopes from flytekit.common.exceptions import system as _system_exceptions +from flytekit.common.exceptions import user as _user_exceptions from flytekit.common.tasks.sdk_runnable import ExecutionParameters from flytekit.configuration import TemporaryConfiguration as _TemporaryConfiguration from flytekit.configuration import internal as _internal_config @@ -91,25 +92,32 @@ def _dispatch_execute(ctx: FlyteContext, task_def: PythonTask, inputs_path: str, input_proto = _utils.load_proto_from_file(_literals_pb2.LiteralMap, local_inputs_file) idl_input_literals = _literal_models.LiteralMap.from_flyte_idl(input_proto) # Step2 - outputs = task_def.dispatch_execute(ctx, idl_input_literals) - # Step3a - if isinstance(outputs, VoidPromise): - _logging.getLogger().warning("Task produces no outputs") - output_file_dict = {_constants.OUTPUT_FILE_NAME: _literal_models.LiteralMap(literals={})} - elif isinstance(outputs, _literal_models.LiteralMap): - output_file_dict = {_constants.OUTPUT_FILE_NAME: outputs} - elif isinstance(outputs, _dynamic_job.DynamicJobSpec): - output_file_dict = {_constants.FUTURES_FILE_NAME: outputs} - else: - _logging.getLogger().error(f"SystemError: received unknown outputs from task {outputs}") - output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument( - _error_models.ContainerError( - "UNKNOWN_OUTPUT", - f"Type of output received not handled {type(outputs)} outputs: {outputs}", - _error_models.ContainerError.Kind.RECOVERABLE, + # TODO: This try mocks the user_entry_point. Remove this and decorate where appropriate after migrating + # scopes off of six + try: + outputs = task_def.dispatch_execute(ctx, idl_input_literals) + # Step3a + if isinstance(outputs, VoidPromise): + _logging.getLogger().warning("Task produces no outputs") + output_file_dict = {_constants.OUTPUT_FILE_NAME: _literal_models.LiteralMap(literals={})} + elif isinstance(outputs, _literal_models.LiteralMap): + output_file_dict = {_constants.OUTPUT_FILE_NAME: outputs} + elif isinstance(outputs, _dynamic_job.DynamicJobSpec): + output_file_dict = {_constants.FUTURES_FILE_NAME: outputs} + else: + _logging.getLogger().error(f"SystemError: received unknown outputs from task {outputs}") + output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument( + _error_models.ContainerError( + "UNKNOWN_OUTPUT", + f"Type of output received not handled {type(outputs)} outputs: {outputs}", + _error_models.ContainerError.Kind.RECOVERABLE, + ) ) - ) - except _scoped_exceptions.FlyteScopedException as e: + except _user_exceptions.FlyteUserException as ex: + raise _scopes.FlyteScopedUserException(*_exc_info()) + except _system_exceptions.FlyteSystemException as ex: + raise _scopes.FlyteScopedSystemException(*_exc_info()) + except _scopes.FlyteScopedException as e: _logging.error("!! Begin Error Captured by Flyte !!") output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument( _error_models.ContainerError(e.error_code, e.verbose_message, e.kind)