Skip to content

Commit

Permalink
feat(sdk): add local execution logging #localexecution (#10326)
Browse files Browse the repository at this point in the history
  • Loading branch information
connor-mccarthy authored Dec 18, 2023
1 parent d0da0ad commit 7849272
Show file tree
Hide file tree
Showing 6 changed files with 493 additions and 81 deletions.
55 changes: 0 additions & 55 deletions sdk/python/kfp/local/e2e_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
These can be thought of as local runner conformance tests. The test results should be the same irrespective of the runner.
"""
import io
import sys
from typing import NamedTuple
import unittest

Expand Down Expand Up @@ -323,58 +321,5 @@ def my_comp(out_param: dsl.OutputPath(str),) -> int:
self.assertEqual(task.outputs['Output'], 1)


@parameterized.parameters(ALL_RUNNERS)
class TestExceptionHandling(testing_utilities.LocalRunnerEnvironmentTestCase):
    """Checks how failures in user component code are surfaced locally.

    Each test is parameterized over ALL_RUNNERS and inspects what was
    written to stdout while the failing component ran.
    """

    def setUp(self):
        """Swaps stdout for an in-memory buffer for the duration of a test."""
        super().setUp()
        # capture logs on a test-by-test basis; remember the stream being
        # replaced so tearDown restores exactly that stream (which may itself
        # be a redirection installed by the test runner)
        self._original_stdout = sys.stdout
        self.captured_stdout = io.StringIO()
        sys.stdout = self.captured_stdout

    def tearDown(self):
        """Restores stdout and runs the parent class teardown."""
        # reset stdout
        sys.stdout = self._original_stdout
        # the parent's tearDown (not setUp) must run here so that whatever the
        # parent set up is actually cleaned up
        super().tearDown()

    def test_user_code_throws_exception_if_raise_on_error(self, runner):
        local.init(runner=runner, raise_on_error=True)

        @dsl.component
        def fail_comp():
            raise Exception('String to match on')

        # use end of line anchor $, since the user code error should be the last thing surfaced to the user
        with self.assertRaisesRegex(
                RuntimeError,
                r'Local execution exited with status FAILURE\.$',
        ):
            fail_comp()

        self.assertIn(
            'Exception: String to match on',
            self.captured_stdout.getvalue(),
        )

    def test_user_code_no_exception_if_not_raise_on_error(self, runner):
        local.init(runner=runner, raise_on_error=False)

        @dsl.component
        def fail_comp():
            raise Exception('String to match on')

        # with raise_on_error=False the call returns a task instead of raising
        task = fail_comp()
        self.assertDictEqual(task.outputs, {})

        self.assertIn(
            'Local execution exited with status FAILURE.',
            self.captured_stdout.getvalue(),
        )
        self.assertIn(
            'Exception: String to match on',
            self.captured_stdout.getvalue(),
        )


# Run the tests in this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()
135 changes: 135 additions & 0 deletions sdk/python/kfp/local/logging_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for formatting, coloring, and controlling the output of logs."""
import builtins
import contextlib
import datetime
import logging
from typing import Any, Dict, Generator, List

from kfp import dsl


class Color:
    """Namespace of terminal escape sequences used to color log text."""

    # foreground colors
    CYAN = '\033[96m'
    GREEN = '\033[92m'
    RED = '\033[91m'

    # appended after colored text to switch the color off again
    RESET = '\033[0m'


class MillisecondFormatter(logging.Formatter):
    """Log formatter whose timestamps can carry millisecond precision.

    When the date format ends with the ``%f`` directive (microseconds), the
    rendered timestamp is shortened by three digits so it ends in
    milliseconds.
    """

    def formatTime(
        self,
        record: logging.LogRecord,
        datefmt: str = None,
    ) -> str:
        """Renders the creation time of ``record``.

        Args:
            record: The log record whose ``created`` timestamp is rendered.
            datefmt: A ``strftime`` format string. If None, the standard
                ``logging.Formatter`` time formatting is used.

        Returns:
            The formatted timestamp.
        """
        if datefmt is None:
            # strftime() rejects None, so defer to the base class default
            return super().formatTime(record, datefmt)

        created = datetime.datetime.fromtimestamp(record.created)
        s = created.strftime(datefmt)

        # truncate microseconds to milliseconds, but only when the rendered
        # string really ends with the microsecond field; otherwise slicing
        # would remove unrelated characters
        microseconds = f'{created.microsecond:06d}'
        if datefmt.endswith('%f') and s.endswith(microseconds):
            return s[:-3]
        return s


@contextlib.contextmanager
def local_logger_context() -> Generator[None, None, None]:
    """Context manager that temporarily reconfigures the root logger.

    On entry, the root logger's handlers are replaced by one stream handler
    using a millisecond-precision timestamp format, and its level is set to
    INFO. On exit (including when the body raises), the previous level and
    handlers are put back.
    """
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(
        MillisecondFormatter(
            fmt='%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%H:%M:%S.%f',
        ))

    root_logger = logging.getLogger()
    # snapshot the current configuration so it can be restored afterwards
    saved_level = root_logger.level
    saved_handlers = list(root_logger.handlers)

    root_logger.handlers.clear()
    root_logger.addHandler(stream_handler)
    root_logger.setLevel(logging.INFO)

    try:
        yield
    finally:
        root_logger.setLevel(saved_level)
        root_logger.handlers.clear()
        for saved_handler in saved_handlers:
            root_logger.addHandler(saved_handler)


@contextlib.contextmanager
def indented_print(num_spaces: int = 4) -> Generator[None, None, None]:
    """Context manager to indent all print statements in its scope by
    num_spaces.

    Useful for visually separating a subprocess logs from the outer
    process logs.

    While the context is active, ``builtins.print`` is replaced by a wrapper
    that writes the indentation before delegating to the real ``print``. The
    original ``print`` is reinstated on exit, even if the body raises.

    Args:
        num_spaces: Number of spaces written before each print call.
    """
    original_print = builtins.print

    def indented_print_function(*args, **kwargs):
        # write the indentation to the same stream as the message; without
        # forwarding `file`, a call such as print(..., file=sys.stderr) would
        # put the spaces on stdout and the text on stderr
        original_print(' ' * num_spaces, end='', file=kwargs.get('file'))
        return original_print(*args, **kwargs)

    builtins.print = indented_print_function
    try:
        yield
    finally:
        builtins.print = original_print


def color_text(text: str, color: Color) -> str:
    """Returns text prefixed by color and followed by Color.RESET."""
    return '{}{}{}'.format(color, text, Color.RESET)


def make_log_lines_for_artifact(artifact: dsl.Artifact,) -> List[str]:
    """Builds the log lines describing one artifact output.

    The first line opens with the artifact's class name and a parenthesis and
    carries the name; the uri and metadata lines are padded so their fields
    line up under the name field.
    """
    opener = f'{artifact.__class__.__name__}( '
    # pad continuation lines by the width of the opener
    continuation_pad = ' ' * len(opener)
    return [
        f'{opener}name={artifact.name},',
        f'{continuation_pad}uri={artifact.uri},',
        f'{continuation_pad}metadata={artifact.metadata} )',
    ]


def make_log_lines_for_outputs(outputs: Dict[str, Any]) -> List[str]:
    """Builds the log lines that represent the outputs of a task.

    Every output starts a line of the form ``    <key>: <value>``. Artifact
    values span several lines; their continuation lines are padded to start
    in the column where the artifact text began.
    """
    indent = ' ' * 4
    separator = ': '
    lines = []
    for name, output in outputs.items():
        prefix = indent + name + separator

        # parameters fit on a single line
        if not isinstance(output, dsl.Artifact):
            lines.append('{}{}'.format(prefix, output))
            continue

        # artifacts: first line follows the key, the rest are aligned with the
        # first char of the artifact to visually separate output keys
        rendered = make_log_lines_for_artifact(output)
        lines.append(prefix + rendered[0])
        padding = ' ' * len(prefix)
        lines.extend(padding + line for line in rendered[1:])

    return lines
Loading

0 comments on commit 7849272

Please sign in to comment.