Skip to content

Commit

Permalink
Logging updates (#775)
Browse files Browse the repository at this point in the history
Signed-off-by: Yee Hing Tong <[email protected]>
  • Loading branch information
wild-endeavor authored Dec 14, 2021
1 parent 9e77e62 commit 94cf315
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 43 deletions.
59 changes: 30 additions & 29 deletions flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import contextlib
import datetime as _datetime
import importlib as _importlib
import logging as _logging
import logging as python_logging
import os as _os
import pathlib
import random as _random
Expand Down Expand Up @@ -37,6 +37,7 @@
from flytekit.interfaces import random as _flyte_random
from flytekit.interfaces.data import data_proxy as _data_proxy
from flytekit.interfaces.stats.taggable import get_stats as _get_stats
from flytekit.loggers import entrypoint_logger as logger
from flytekit.models import dynamic_job as _dynamic_job
from flytekit.models import literals as _literal_models
from flytekit.models.core import errors as _error_models
Expand Down Expand Up @@ -95,6 +96,7 @@ def _dispatch_execute(
c: OR if an unhandled exception is retrieved - record it as an errors.pb
"""
output_file_dict = {}
logger.debug(f"Starting _dispatch_execute for {task_def.name}")
try:
# Step1
local_inputs_file = _os.path.join(ctx.execution_state.working_dir, "inputs.pb")
Expand All @@ -108,14 +110,14 @@ def _dispatch_execute(
outputs = _scoped_exceptions.system_entry_point(task_def.dispatch_execute)(ctx, idl_input_literals)
# Step3a
if isinstance(outputs, VoidPromise):
_logging.getLogger().warning("Task produces no outputs")
logger.warning("Task produces no outputs")
output_file_dict = {_constants.OUTPUT_FILE_NAME: _literal_models.LiteralMap(literals={})}
elif isinstance(outputs, _literal_models.LiteralMap):
output_file_dict = {_constants.OUTPUT_FILE_NAME: outputs}
elif isinstance(outputs, _dynamic_job.DynamicJobSpec):
output_file_dict = {_constants.FUTURES_FILE_NAME: outputs}
else:
_logging.getLogger().error(f"SystemError: received unknown outputs from task {outputs}")
logger.error(f"SystemError: received unknown outputs from task {outputs}")
output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument(
_error_models.ContainerError(
"UNKNOWN_OUTPUT",
Expand All @@ -128,30 +130,30 @@ def _dispatch_execute(
# Handle user-scoped errors
except _scoped_exceptions.FlyteScopedUserException as e:
if isinstance(e.value, IgnoreOutputs):
_logging.warning(f"User-scoped IgnoreOutputs received! Outputs.pb will not be uploaded. reason {e}!!")
logger.warning(f"User-scoped IgnoreOutputs received! Outputs.pb will not be uploaded. reason {e}!!")
return
output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument(
_error_models.ContainerError(
e.error_code, e.verbose_message, e.kind, _execution_models.ExecutionError.ErrorKind.USER
)
)
_logging.error("!! Begin User Error Captured by Flyte !!")
_logging.error(e.verbose_message)
_logging.error("!! End Error Captured by Flyte !!")
logger.error("!! Begin User Error Captured by Flyte !!")
logger.error(e.verbose_message)
logger.error("!! End Error Captured by Flyte !!")

# Handle system-scoped errors
except _scoped_exceptions.FlyteScopedSystemException as e:
if isinstance(e.value, IgnoreOutputs):
_logging.warning(f"System-scoped IgnoreOutputs received! Outputs.pb will not be uploaded. reason {e}!!")
logger.warning(f"System-scoped IgnoreOutputs received! Outputs.pb will not be uploaded. reason {e}!!")
return
output_file_dict[_constants.ERROR_FILE_NAME] = _error_models.ErrorDocument(
_error_models.ContainerError(
e.error_code, e.verbose_message, e.kind, _execution_models.ExecutionError.ErrorKind.SYSTEM
)
)
_logging.error("!! Begin System Error Captured by Flyte !!")
_logging.error(e.verbose_message)
_logging.error("!! End Error Captured by Flyte !!")
logger.error("!! Begin System Error Captured by Flyte !!")
logger.error(e.verbose_message)
logger.error("!! End Error Captured by Flyte !!")

# Interpret all other exceptions (some of which may be caused by the code in the try block outside of
# dispatch_execute) as recoverable system exceptions.
Expand All @@ -166,16 +168,17 @@ def _dispatch_execute(
_execution_models.ExecutionError.ErrorKind.SYSTEM,
)
)
_logging.error(f"Exception when executing task {task_def.name or task_def.id.name}, reason {str(e)}")
_logging.error("!! Begin Unknown System Error Captured by Flyte !!")
_logging.error(exc_str)
_logging.error("!! End Error Captured by Flyte !!")
logger.error(f"Exception when executing task {task_def.name or task_def.id.name}, reason {str(e)}")
logger.error("!! Begin Unknown System Error Captured by Flyte !!")
logger.error(exc_str)
logger.error("!! End Error Captured by Flyte !!")

for k, v in output_file_dict.items():
_common_utils.write_proto_to_file(v.to_flyte_idl(), _os.path.join(ctx.execution_state.engine_dir, k))

ctx.file_access.put_data(ctx.execution_state.engine_dir, output_prefix, is_multipart=True)
_logging.info(f"Engine folder written successfully to the output prefix {output_prefix}")
logger.info(f"Engine folder written successfully to the output prefix {output_prefix}")
logger.debug("Finished _dispatch_execute")


@contextlib.contextmanager
Expand All @@ -184,14 +187,11 @@ def setup_execution(
dynamic_addl_distro: str = None,
dynamic_dest_dir: str = None,
):
log_level = _internal_config.LOGGING_LEVEL.get() or _sdk_config.LOGGING_LEVEL.get()
_logging.getLogger().setLevel(log_level)

ctx = FlyteContextManager.current_context()

# Create directories
user_workspace_dir = ctx.file_access.get_random_local_directory()
_click.echo(f"Using user directory {user_workspace_dir}")
logger.info(f"Using user directory {user_workspace_dir}")
pathlib.Path(user_workspace_dir).mkdir(parents=True, exist_ok=True)
from flytekit import __version__ as _api_version

Expand Down Expand Up @@ -219,7 +219,7 @@ def setup_execution(
"api_version": _api_version,
},
),
logging=_logging,
logging=python_logging,
tmp_dir=user_workspace_dir,
)

Expand All @@ -231,7 +231,7 @@ def setup_execution(
raw_output_prefix=raw_output_data_prefix,
)
except TypeError: # would be thrown from DataPersistencePlugins.find_plugin
_logging.error(f"No data plugin found for raw output prefix {raw_output_data_prefix}")
logger.error(f"No data plugin found for raw output prefix {raw_output_data_prefix}")
raise
else:
raise Exception("No raw output prefix detected. Please upgrade your version of Propeller to 0.4.0 or later.")
Expand Down Expand Up @@ -280,7 +280,6 @@ def _handle_annotated_task(
"""
Entrypoint for all PythonTask extensions
"""
_click.echo("Running native-typed task")
_dispatch_execute(ctx, task_def, inputs, output_prefix)


Expand Down Expand Up @@ -366,7 +365,7 @@ def _execute_task(
# Use the resolver to load the actual task object
_task_def = resolver_obj.load_task(loader_args=resolver_args)
if test:
_click.echo(
logger.info(
f"Test detected, returning. Args were {inputs} {output_prefix} {raw_output_data_prefix} {resolver} {resolver_args}"
)
return
Expand Down Expand Up @@ -401,7 +400,7 @@ def _execute_map_task(
output_prefix = _os.path.join(output_prefix, str(task_index))

if test:
_click.echo(
logger.info(
f"Test detected, returning. Inputs: {inputs} Computed task index: {task_index} "
f"New output prefix: {output_prefix} Raw output path: {raw_output_data_prefix} "
f"Resolver and args: {resolver} {resolver_args}"
Expand Down Expand Up @@ -443,7 +442,9 @@ def execute_task_cmd(
resolver,
resolver_args,
):
_click.echo(_utils.get_version_message())
logger.info(_utils.get_version_message())
# We get weird errors if there are no click echo messages at all, so emit an empty string so that unit tests pass.
_click.echo("")
# Backwards compatibility - if Propeller hasn't filled this in, then it'll come through here as the original
# template string, so let's explicitly set it to None so that the downstream functions will know to fall back
# to the original shard formatter/prefix config.
Expand All @@ -455,10 +456,10 @@ def execute_task_cmd(
# The addition of a new top-level command seemed out of scope at the time of this writing to pursue given how
# pervasive this top level command already (plugins mostly).
if not resolver:
_click.echo("No resolver found, assuming legacy API task...")
logger.info("No resolver found, assuming legacy API task...")
_legacy_execute_task(task_module, task_name, inputs, output_prefix, raw_output_data_prefix, test)
else:
_click.echo(f"Attempting to run with {resolver}...")
logger.debug(f"Running task execution with resolver {resolver}...")
_execute_task(
inputs,
output_prefix,
Expand Down Expand Up @@ -527,7 +528,7 @@ def map_execute_task_cmd(
resolver,
resolver_args,
):
_click.echo(_utils.get_version_message())
logger.info(_utils.get_version_message())

_execute_map_task(
inputs,
Expand Down
3 changes: 2 additions & 1 deletion flytekit/configuration/images.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import typing

from flytekit.configuration import common as _config_common
from flytekit.loggers import logger


def get_specified_images() -> typing.Dict[str, str]:
Expand All @@ -21,7 +22,7 @@ def get_specified_images() -> typing.Dict[str, str]:
try:
image_names = _config_common.CONFIGURATION_SINGLETON.config.options("images")
except configparser.NoSectionError:
print("No images specified, will use the default image")
logger.info("No images specified, will use the default image")
image_names = None
if image_names:
for i in image_names:
Expand Down
57 changes: 44 additions & 13 deletions flytekit/loggers.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,60 @@
import logging as _logging
import os as _os
import logging
import os

from pythonjsonlogger import jsonlogger

logger = _logging.getLogger("flytekit")
# Always set the root logger to debug until we can add more user based controls
logger.setLevel(_logging.WARNING)
# Note:
# The environment variable controls exposed to affect the individual loggers should be considered to be beta.
# The ux/api may change in the future.
# At time of writing, the code was written to preserve existing default behavior
# For now, assume this is the environment variable whose usage will remain unchanged and controls output for all
# loggers defined in this file.
LOGGING_ENV_VAR = "FLYTE_SDK_LOGGING_LEVEL"

# By default, the root flytekit logger to debug so everything is logged, but enable fine-tuning
logger = logging.getLogger("flytekit")
# Root logger control
flytekit_root_env_var = f"{LOGGING_ENV_VAR}_ROOT"
if os.getenv(flytekit_root_env_var) is not None:
logger.setLevel(int(os.getenv(flytekit_root_env_var)))
else:
logger.setLevel(logging.DEBUG)

# Stop propagation so that configuration is isolated to this file (so that it doesn't matter what the
# global Python root logger is set to).
logger.propagate = False

# Child loggers
auth_logger = logger.getChild("auth")
cli_logger = logger.getChild("cli")
remote_logger = logger.getChild("remote")
child_loggers = {
"auth": logger.getChild("auth"),
"cli": logger.getChild("cli"),
"remote": logger.getChild("remote"),
"entrypoint": logger.getChild("entrypoint"),
}
auth_logger = child_loggers["auth"]
cli_logger = child_loggers["cli"]
remote_logger = child_loggers["remote"]
entrypoint_logger = child_loggers["entrypoint"]

# create console handler and set level to debug
ch = _logging.StreamHandler()
# create console handler
ch = logging.StreamHandler()

# Don't want to import the configuration library since that will cause all sorts of circular imports, let's
# just use the environment variable if it's defined. Decide in the future when we implement better controls
# if we should control with the channel or with the logger level.
logging_env_var = "FLYTE_SDK_LOGGING_LEVEL"
level_from_env = _os.getenv(logging_env_var)
# The handler log level controls whether log statements will actually print to the screen
level_from_env = os.getenv(LOGGING_ENV_VAR)
if level_from_env is not None:
ch.setLevel(int(level_from_env))
else:
ch.setLevel(_logging.WARNING)
ch.setLevel(logging.WARNING)

# Consider this API to be beta
for log_name, child_logger in child_loggers.items():
env_var = f"{LOGGING_ENV_VAR}_{log_name.upper()}"
level_from_env = os.getenv(env_var)
if level_from_env is not None:
child_logger.setLevel(int(level_from_env))

# create formatter
formatter = jsonlogger.JsonFormatter(fmt="%(asctime)s %(name)s %(levelname)s %(message)s")
Expand Down

0 comments on commit 94cf315

Please sign in to comment.