Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): add local execution logging #localexecution #10326

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 0 additions & 55 deletions sdk/python/kfp/local/e2e_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

These can be thought of as local runner conformance tests. The test results should be the same irrespective of the runner.
"""
import io
import sys
from typing import NamedTuple
import unittest

Expand Down Expand Up @@ -323,58 +321,5 @@ def my_comp(out_param: dsl.OutputPath(str),) -> int:
self.assertEqual(task.outputs['Output'], 1)


@parameterized.parameters(ALL_RUNNERS)
class TestExceptionHandling(testing_utilities.LocalRunnerEnvironmentTestCase):

def setUp(self):
super().setUp()
# capture logs on a test-by-test basis
self.captured_stdout = io.StringIO()
sys.stdout = self.captured_stdout

def tearDown(self):
super().setUp()
# reset stdout
sys.stdout = sys.__stdout__

def test_user_code_throws_exception_if_raise_on_error(self, runner):
local.init(runner=runner, raise_on_error=True)

@dsl.component
def fail_comp():
raise Exception('String to match on')

# use end of line anchor $, since the user code error should be the last thing surfaced to the user
with self.assertRaisesRegex(
RuntimeError,
r'Local execution exited with status FAILURE\.$',
):
fail_comp()

self.assertIn(
'Exception: String to match on',
self.captured_stdout.getvalue(),
)

def test_user_code_no_exception_if_not_raise_on_error(self, runner):
local.init(runner=runner, raise_on_error=False)

@dsl.component
def fail_comp():
raise Exception('String to match on')

task = fail_comp()
self.assertDictEqual(task.outputs, {})

self.assertIn(
'Local execution exited with status FAILURE.',
self.captured_stdout.getvalue(),
)
self.assertIn(
'Exception: String to match on',
self.captured_stdout.getvalue(),
)


if __name__ == '__main__':
unittest.main()
135 changes: 135 additions & 0 deletions sdk/python/kfp/local/logging_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilitites for formatting, coloring, and controlling the output of logs."""
import builtins
import contextlib
import datetime
import logging
from typing import Any, Dict, Generator, List

from kfp import dsl


class Color:
CYAN = '\033[96m'
GREEN = '\033[92m'
RED = '\033[91m'
RESET = '\033[0m'


class MillisecondFormatter(logging.Formatter):

def formatTime(
self,
record: logging.LogRecord,
datefmt: str = None,
) -> str:
created = datetime.datetime.fromtimestamp(record.created)
s = created.strftime(datefmt)
# truncate microseconds to milliseconds
return s[:-3]


@contextlib.contextmanager
def local_logger_context() -> Generator[None, None, None]:
"""Context manager for creating and reseting the local execution logger."""

logger = logging.getLogger()
original_level = logger.level
original_handlers = logger.handlers[:]
formatter = MillisecondFormatter(
fmt='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%H:%M:%S.%f',
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.handlers.clear()
logger.addHandler(handler)
logger.setLevel(logging.INFO)

try:
yield
finally:
logger.setLevel(original_level)
logger.handlers.clear()
for handler in original_handlers:
logger.addHandler(handler)


@contextlib.contextmanager
def indented_print(num_spaces: int = 4) -> Generator[None, None, None]:
"""Context manager to indent all print statements in its scope by
num_prints.

Useful for visually separating a subprocess logs from the outer
process logs.
"""
original_print = builtins.print

def indented_print_function(*args, **kwargs):
original_print(' ' * num_spaces, end='')
return original_print(*args, **kwargs)

builtins.print = indented_print_function
try:
yield
finally:
builtins.print = original_print


def color_text(text: str, color: Color) -> str:
return f'{color}{text}{Color.RESET}'


def make_log_lines_for_artifact(artifact: dsl.Artifact,) -> List[str]:
"""Returns a list of log lines that represent a single artifact output."""
artifact_class_name_and_paren = f'{artifact.__class__.__name__}( '
# name
artifact_lines = [f'{artifact_class_name_and_paren}name={artifact.name},']
newline_spaces = len(artifact_class_name_and_paren) * ' '
# uri
artifact_lines.append(f'{newline_spaces}uri={artifact.uri},')
# metadata
artifact_lines.append(f'{newline_spaces}metadata={artifact.metadata} )')
return artifact_lines


def make_log_lines_for_outputs(outputs: Dict[str, Any]) -> List[str]:
"""Returns a list of log lines to repesent the outputs of a task."""
INDENT = ' ' * 4
SEPARATOR = ': '
output_lines = []
for key, value in outputs.items():
key_chars = INDENT + key + SEPARATOR

# present artifacts
if isinstance(value, dsl.Artifact):
artifact_lines = make_log_lines_for_artifact(value)

first_artifact_line = artifact_lines[0]
output_lines.append(f'{key_chars}{first_artifact_line}')

remaining_artifact_lines = artifact_lines[1:]
# indent to align with first char in artifact
# to visually separate output keys
remaining_artifact_lines = [
len(key_chars) * ' ' + l for l in remaining_artifact_lines
]
output_lines.extend(remaining_artifact_lines)

# present params
else:
output_lines.append(f'{key_chars}{value}')

return output_lines
Loading