From cd259ceb0441252f093be7276e12ea041f8140a1 Mon Sep 17 00:00:00 2001 From: connor-mccarthy Date: Sun, 17 Dec 2023 14:25:02 -0500 Subject: [PATCH] feat(sdk): add local execution logging #localexecution --- sdk/python/kfp/local/e2e_test.py | 55 ----- sdk/python/kfp/local/logging_utils.py | 135 ++++++++++++ sdk/python/kfp/local/logging_utils_test.py | 206 ++++++++++++++++++ .../kfp/local/subprocess_task_handler.py | 6 +- sdk/python/kfp/local/task_dispatcher.py | 74 +++++-- sdk/python/kfp/local/task_dispatcher_test.py | 98 +++++++++ 6 files changed, 493 insertions(+), 81 deletions(-) create mode 100644 sdk/python/kfp/local/logging_utils.py create mode 100644 sdk/python/kfp/local/logging_utils_test.py diff --git a/sdk/python/kfp/local/e2e_test.py b/sdk/python/kfp/local/e2e_test.py index cffaf84638e..93d6622f0da 100644 --- a/sdk/python/kfp/local/e2e_test.py +++ b/sdk/python/kfp/local/e2e_test.py @@ -17,8 +17,6 @@ These can be thought of as local runner conformance tests. The test results should be the same irrespective of the runner. """ -import io -import sys from typing import NamedTuple import unittest @@ -323,58 +321,5 @@ def my_comp(out_param: dsl.OutputPath(str),) -> int: self.assertEqual(task.outputs['Output'], 1) -@parameterized.parameters(ALL_RUNNERS) -class TestExceptionHandling(testing_utilities.LocalRunnerEnvironmentTestCase): - - def setUp(self): - super().setUp() - # capture logs on a test-by-test basis - self.captured_stdout = io.StringIO() - sys.stdout = self.captured_stdout - - def tearDown(self): - super().setUp() - # reset stdout - sys.stdout = sys.__stdout__ - - def test_user_code_throws_exception_if_raise_on_error(self, runner): - local.init(runner=runner, raise_on_error=True) - - @dsl.component - def fail_comp(): - raise Exception('String to match on') - - # use end of line anchor $, since the user code error should be the last thing surfaced to the user - with self.assertRaisesRegex( - RuntimeError, - r'Local execution exited with status FAILURE\.$', - ): - fail_comp() - - self.assertIn( - 'Exception: String to match on', - self.captured_stdout.getvalue(), - ) - - def test_user_code_no_exception_if_not_raise_on_error(self, runner): - local.init(runner=runner, raise_on_error=False) - - @dsl.component - def fail_comp(): - raise Exception('String to match on') - - task = fail_comp() - self.assertDictEqual(task.outputs, {}) - - self.assertIn( - 'Local execution exited with status FAILURE.', - self.captured_stdout.getvalue(), - ) - self.assertIn( - 'Exception: String to match on', - self.captured_stdout.getvalue(), - ) - - if __name__ == '__main__': unittest.main() diff --git a/sdk/python/kfp/local/logging_utils.py b/sdk/python/kfp/local/logging_utils.py new file mode 100644 index 00000000000..dd4d2d90843 --- /dev/null +++ b/sdk/python/kfp/local/logging_utils.py @@ -0,0 +1,135 @@ +# Copyright 2023 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Utilitites for formatting, coloring, and controlling the output of logs.""" +import builtins +import contextlib +import datetime +import logging +from typing import Any, Dict, Generator, List + +from kfp import dsl + + +class Color: + CYAN = '\033[96m' + GREEN = '\033[92m' + RED = '\033[91m' + RESET = '\033[0m' + + +class MillisecondFormatter(logging.Formatter): + + def formatTime( + self, + record: logging.LogRecord, + datefmt: str = None, + ) -> str: + created = datetime.datetime.fromtimestamp(record.created) + s = created.strftime(datefmt) + # truncate microseconds to milliseconds + return s[:-3] + + +@contextlib.contextmanager +def local_logger_context() -> Generator[None, None, None]: + """Context manager for creating and reseting the local execution logger.""" + + logger = logging.getLogger() + original_level = logger.level + original_handlers = logger.handlers[:] + formatter = MillisecondFormatter( + fmt='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%H:%M:%S.%f', + ) + handler = logging.StreamHandler() + handler.setFormatter(formatter) + logger.handlers.clear() + logger.addHandler(handler) + logger.setLevel(logging.INFO) + + try: + yield + finally: + logger.setLevel(original_level) + logger.handlers.clear() + for handler in original_handlers: + logger.addHandler(handler) + + +@contextlib.contextmanager +def indented_print(num_spaces: int = 4) -> Generator[None, None, None]: + """Context manager to indent all print statements in its scope by + num_prints. + + Useful for visually separating a subprocess logs from the outer + process logs. + """ + original_print = builtins.print + + def indented_print_function(*args, **kwargs): + original_print(' ' * num_spaces, end='') + return original_print(*args, **kwargs) + + builtins.print = indented_print_function + try: + yield + finally: + builtins.print = original_print + + +def color_text(text: str, color: Color) -> str: + return f'{color}{text}{Color.RESET}' + + +def make_log_lines_for_artifact(artifact: dsl.Artifact,) -> List[str]: + """Returns a list of log lines that represent a single artifact output.""" + artifact_class_name_and_paren = f'{artifact.__class__.__name__}( ' + # name + artifact_lines = [f'{artifact_class_name_and_paren}name={artifact.name},'] + newline_spaces = len(artifact_class_name_and_paren) * ' ' + # uri + artifact_lines.append(f'{newline_spaces}uri={artifact.uri},') + # metadata + artifact_lines.append(f'{newline_spaces}metadata={artifact.metadata} )') + return artifact_lines + + +def make_log_lines_for_outputs(outputs: Dict[str, Any]) -> List[str]: + """Returns a list of log lines to repesent the outputs of a task.""" + INDENT = ' ' * 4 + SEPARATOR = ': ' + output_lines = [] + for key, value in outputs.items(): + key_chars = INDENT + key + SEPARATOR + + # present artifacts + if isinstance(value, dsl.Artifact): + artifact_lines = make_log_lines_for_artifact(value) + + first_artifact_line = artifact_lines[0] + output_lines.append(f'{key_chars}{first_artifact_line}') + + remaining_artifact_lines = artifact_lines[1:] + # indent to align with first char in artifact + # to visually separate output keys + remaining_artifact_lines = [ + len(key_chars) * ' ' + l for l in remaining_artifact_lines + ] + output_lines.extend(remaining_artifact_lines) + + # present params + else: + output_lines.append(f'{key_chars}{value}') + + return output_lines diff --git a/sdk/python/kfp/local/logging_utils_test.py b/sdk/python/kfp/local/logging_utils_test.py new file mode 100644 index 00000000000..9438863e16d --- /dev/null +++ b/sdk/python/kfp/local/logging_utils_test.py @@ -0,0 +1,206 @@ +# Copyright 2023 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for logging_utils.py.""" + +import io +import unittest +from unittest import mock + +from kfp import dsl +from kfp.local import logging_utils + + +class TestIndentedPrint(unittest.TestCase): + + @mock.patch('sys.stdout', new_callable=io.StringIO) + def test(self, mocked_stdout): + with logging_utils.indented_print(num_spaces=6): + print('foo should be indented') + expected = ' foo should be indented\n' + actual = mocked_stdout.getvalue() + self.assertEqual( + actual, + expected, + ) + + +class TestColorText(unittest.TestCase): + + def test_cyan(self): + + actual = logging_utils.color_text( + 'text to color', + logging_utils.Color.CYAN, + ) + expected = '\x1b[91mtext to color\x1b[0m' + self.assertEqual(actual, expected) + + def test_cyan(self): + + actual = logging_utils.color_text( + 'text to color', + logging_utils.Color.RED, + ) + expected = '\x1b[91mtext to color\x1b[0m' + self.assertEqual(actual, expected) + + +class TestRenderArtifact(unittest.TestCase): + + def test_empty(self): + actual = logging_utils.make_log_lines_for_artifact(dsl.Artifact()) + expected = [ + 'Artifact( name=,', + ' uri=,', + ' metadata={} )', + ] + self.assertListEqual(actual, expected) + + def test_contains_value(self): + actual = logging_utils.make_log_lines_for_artifact( + dsl.Model( + name='my_artifact', + uri='/local/foo/bar', + metadata={ + 'dict_field': { + 'baz': 'bat' + }, + 'float_field': 3.14 + })) + expected = [ + 'Model( name=my_artifact,', + ' uri=/local/foo/bar,', + " metadata={'dict_field': {'baz': 'bat'}, 'float_field': 3.14} )", + ] + self.assertListEqual(actual, expected) + + +class TestMakeLogLinesForOutputs(unittest.TestCase): + + def test_empty(self): + actual = logging_utils.make_log_lines_for_outputs(dict()) + expected = [] + self.assertListEqual(actual, expected) + + def test_only_params(self): + actual = logging_utils.make_log_lines_for_outputs({ + 'foo': 'bar', + 'baz': 100, + 'bat': 1.0, + 'brap': True, + 'my_list': [1, 2, 3], + 'my_dict': { + 'foo': 'bar' + } + }) + expected = [ + ' foo: bar', + ' baz: 100', + ' bat: 1.0', + ' brap: True', + ' my_list: [1, 2, 3]', + " my_dict: {'foo': 'bar'}", + ] + self.assertListEqual(actual, expected) + + def test_only_artifacts(self): + actual = logging_utils.make_log_lines_for_outputs({ + 'my_artifact': + dsl.Artifact(name=''), + 'my_model': + dsl.Model( + name='my_artifact', + uri='/local/foo/bar/1234567890/1234567890/1234567890/1234567890/1234567890', + metadata={ + 'dict_field': { + 'baz': 'bat' + }, + 'float_field': 3.14 + }), + 'my_dataset': + dsl.Dataset( + name='my_dataset', + uri='/local/foo/baz', + metadata={}, + ), + }) + expected = [ + ' my_artifact: Artifact( name=,', + ' uri=,', + ' metadata={} )', + ' my_model: Model( name=my_artifact,', + ' uri=/local/foo/bar/1234567890/1234567890/1234567890/1234567890/1234567890,', + " metadata={'dict_field': {'baz': 'bat'}, 'float_field': 3.14} )", + ' my_dataset: Dataset( name=my_dataset,', + ' uri=/local/foo/baz,', + ' metadata={} )', + ] + self.assertListEqual(actual, expected) + + def test_mix_params_and_artifacts(self): + actual = logging_utils.make_log_lines_for_outputs({ + 'foo': + 'bar', + 'baz': + 100, + 'bat': + 1.0, + 'brap': + True, + 'my_list': [1, 2, 3], + 'my_dict': { + 'foo': 'bar' + }, + 'my_artifact': + dsl.Artifact(name=''), + 'my_model': + dsl.Model( + name='my_artifact', + uri='/local/foo/bar/1234567890/1234567890/1234567890/1234567890/1234567890', + metadata={ + 'dict_field': { + 'baz': 'bat' + }, + 'float_field': 3.14 + }), + 'my_dataset': + dsl.Dataset( + name='my_dataset', + uri='/local/foo/baz', + metadata={}, + ), + }) + expected = [ + ' foo: bar', + ' baz: 100', + ' bat: 1.0', + ' brap: True', + ' my_list: [1, 2, 3]', + " my_dict: {'foo': 'bar'}", + ' my_artifact: Artifact( name=,', + ' uri=,', + ' metadata={} )', + ' my_model: Model( name=my_artifact,', + ' uri=/local/foo/bar/1234567890/1234567890/1234567890/1234567890/1234567890,', + " metadata={'dict_field': {'baz': 'bat'}, 'float_field': 3.14} )", + ' my_dataset: Dataset( name=my_dataset,', + ' uri=/local/foo/baz,', + ' metadata={} )', + ] + + self.assertListEqual(actual, expected) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdk/python/kfp/local/subprocess_task_handler.py b/sdk/python/kfp/local/subprocess_task_handler.py index 5821746420d..b22d70142f7 100644 --- a/sdk/python/kfp/local/subprocess_task_handler.py +++ b/sdk/python/kfp/local/subprocess_task_handler.py @@ -95,8 +95,10 @@ def run_local_subprocess(full_command: List[str]) -> int: with subprocess.Popen( full_command, stdout=subprocess.PIPE, - # no change to behavior in terminal for user, - # but allows more seamless capture/testing of subprocess logs + # No change to behavior in terminal for user, + # but inner process logs redirected to stdout. This separates from + # the outer process logs which, per logging module default, go to + # stderr. stderr=subprocess.STDOUT, text=True, # buffer line-by-line diff --git a/sdk/python/kfp/local/task_dispatcher.py b/sdk/python/kfp/local/task_dispatcher.py index 0e73cdfecf5..4c9f96158ca 100644 --- a/sdk/python/kfp/local/task_dispatcher.py +++ b/sdk/python/kfp/local/task_dispatcher.py @@ -12,12 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. """Code for dispatching a local task execution.""" +import logging from typing import Any, Dict from kfp import local from kfp.local import config from kfp.local import executor_input_utils from kfp.local import executor_output_utils +from kfp.local import logging_utils from kfp.local import placeholder_utils from kfp.local import status from kfp.local import subprocess_task_handler @@ -108,33 +110,57 @@ def _run_single_component_implementation( subprocess_task_handler.SubprocessTaskHandler, } TaskHandler = task_handler_map[runner_type] - # TODO: add logging throughout for observability of state, execution progress, outputs, errors, etc. - task_handler = TaskHandler( - image=image, - full_command=full_command, - pipeline_root=pipeline_root, - runner=runner, - ) - task_status = task_handler.run() + with logging_utils.local_logger_context(): + task_name_for_logs = logging_utils.color_text( + f'{task_resource_name!r}', + logging_utils.Color.CYAN, + ) - if task_status == status.Status.SUCCESS: - outputs = executor_output_utils.get_outputs_for_task( - executor_input=executor_input, - component_spec=component_spec, + logging.info(f'Executing task {task_name_for_logs}') + task_handler = TaskHandler( + image=image, + full_command=full_command, + pipeline_root=pipeline_root, + runner=runner, ) - elif task_status == status.Status.FAILURE: - msg = f'Local execution exited with status {task_status.name}.' - if raise_on_error: - raise RuntimeError(msg) - else: - # TODO: replace with robust logging - print(msg) - outputs = {} + # trailing newline helps visually separate subprocess logs + logging.info(f'Streamed logs:\n') + + with logging_utils.indented_print(): + # subprocess logs printed here + task_status = task_handler.run() + + if task_status == status.Status.SUCCESS: + logging.info( + f'Task {task_name_for_logs} finished with status {logging_utils.color_text(task_status.value, logging_utils.Color.GREEN)}' + ) + + outputs = executor_output_utils.get_outputs_for_task( + executor_input=executor_input, + component_spec=component_spec, + ) + if outputs: + output_string = [ + f'Task {task_name_for_logs} outputs:', + *logging_utils.make_log_lines_for_outputs(outputs), + '\n', + ] + logging.info('\n'.join(output_string)) + else: + logging.info(f'Task {task_name_for_logs} has no outputs') + + elif task_status == status.Status.FAILURE: + msg = f'Task {task_name_for_logs} finished with status {logging_utils.color_text(task_status.value, logging_utils.Color.RED)}' + if raise_on_error: + raise RuntimeError(msg) + else: + logging.error(msg) + outputs = {} - else: - # for developers; user should never hit this - raise ValueError(f'Got unknown status: {task_status}') + else: + # for developers; user should never hit this + raise ValueError(f'Got unknown status: {task_status}') - return outputs + return outputs diff --git a/sdk/python/kfp/local/task_dispatcher_test.py b/sdk/python/kfp/local/task_dispatcher_test.py index c158fb25e68..02e94a1dde1 100644 --- a/sdk/python/kfp/local/task_dispatcher_test.py +++ b/sdk/python/kfp/local/task_dispatcher_test.py @@ -19,12 +19,16 @@ irrespective of the runner. While there will inevitably some overlap, we should seek to minimize it. """ +import io import unittest +from unittest import mock from absl.testing import parameterized from kfp import dsl from kfp import local from kfp.dsl import Artifact +from kfp.dsl import Model +from kfp.dsl import Output from kfp.local import testing_utilities ALL_RUNNERS = [ @@ -184,5 +188,99 @@ def identity(x: str) -> str: self.assertEqual(actual, expected) +@parameterized.parameters(ALL_RUNNERS) +class TestExceptionHandlingAndLogging( + testing_utilities.LocalRunnerEnvironmentTestCase): + + @mock.patch('sys.stdout', new_callable=io.StringIO) + def test_user_code_throws_exception_if_raise_on_error( + self, + runner, + mock_stdout, + ): + local.init(runner=runner, raise_on_error=True) + + @dsl.component + def fail_comp(): + raise Exception('String to match on') + + with self.assertRaisesRegex( + RuntimeError, + r"Task \x1b\[96m'fail-comp'\x1b\[0m finished with status \x1b\[91mFAILURE\x1b\[0m", + ): + fail_comp() + + self.assertIn( + 'Exception: String to match on', + mock_stdout.getvalue(), + ) + + @mock.patch('sys.stdout', new_callable=io.StringIO) + @mock.patch('sys.stderr', new_callable=io.StringIO) + def test_user_code_no_exception_if_not_raise_on_error( + self, + runner, + mock_stderr, + mock_stdout, + ): + local.init(runner=runner, raise_on_error=False) + + @dsl.component + def fail_comp(): + raise Exception('String to match on') + + task = fail_comp() + self.assertDictEqual(task.outputs, {}) + + self.assertRegex( + mock_stderr.getvalue(), + r"\d+:\d+:\d+\.\d+ - ERROR - Task \x1b\[96m'fail-comp'\x1b\[0m finished with status \x1b\[91mFAILURE\x1b\[0m", + ) + self.assertIn( + 'Exception: String to match on', + mock_stdout.getvalue(), + ) + + @mock.patch('sys.stdout', new_callable=io.StringIO) + @mock.patch('sys.stderr', new_callable=io.StringIO) + def test_all_logs( + self, + runner, + mock_stderr, + mock_stdout, + ): + local.init(runner=runner) + + @dsl.component + def many_type_component( + num: int, + model: Output[Model], + ) -> str: + print('Inside of my component!') + model.metadata['foo'] = 'bar' + return 'hello' * num + + many_type_component(num=2) + + # outer process logs in stderr + outer_log_regex = ( + r"\d+:\d+:\d+\.\d+ - INFO - Executing task \x1b\[96m'many-type-component'\x1b\[0m\n" + + r'\d+:\d+:\d+\.\d+ - INFO - Streamed logs:\n\n' + + r"\d+:\d+:\d+\.\d+ - INFO - Task \x1b\[96m'many-type-component'\x1b\[0m finished with status \x1b\[92mSUCCESS\x1b\[0m\n" + + + r"\d+:\d+:\d+\.\d+ - INFO - Task \x1b\[96m'many-type-component'\x1b\[0m outputs:\n Output: hellohello\n model: Model\( name=model,\n uri=\./local_outputs/many-type-component-\d+-\d+-\d+-\d+-\d+-\d+-\d+/many-type-component/model,\n metadata={'foo': 'bar'} \)\n\n" + ) + + self.assertRegex( + mock_stderr.getvalue(), + outer_log_regex, + ) + # inner process logs in stdout + self.assertIn('[KFP Executor', mock_stdout.getvalue()) + self.assertIn('Got executor_input:', mock_stdout.getvalue()) + self.assertIn('Inside of my component!', mock_stdout.getvalue()) + self.assertIn('Wrote executor output file to', mock_stdout.getvalue()) + + if __name__ == '__main__': unittest.main()