From 3104ddca9e407dc4c37fbf5126c6f5eefae5a45e Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Wed, 21 Feb 2024 13:54:57 +0100 Subject: [PATCH] gh-104090: Add exit code to multiprocessing ResourceTracker (GH-115410) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This builds on https://github.com/python/cpython/pull/106807, which adds a return code to ResourceTracker, to make future debugging easier. Testing this “in situ” proved difficult, since the global ResourceTracker is involved in test infrastructure. So, the tests here create a new instance and feed it fake data. --------- Co-authored-by: Yonatan Bitton Co-authored-by: Yonatan Bitton Co-authored-by: Antoine Pitrou --- Lib/multiprocessing/resource_tracker.py | 38 ++++++++++++++++--- Lib/test/_test_multiprocessing.py | 35 ++++++++++++++++- Lib/test/test_concurrent_futures/test_init.py | 26 +++++++++++++ ...-07-16-15-02-47.gh-issue-104090.oMjNa9.rst | 2 + 4 files changed, 94 insertions(+), 7 deletions(-) create mode 100644 Misc/NEWS.d/next/Core and Builtins/2023-07-16-15-02-47.gh-issue-104090.oMjNa9.rst diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index 8e41f461cc934e0..20ddd9c50e3d884 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -29,8 +29,12 @@ _HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask') _IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM) +def cleanup_noop(name): + raise RuntimeError('noop should never be registered or cleaned up') + _CLEANUP_FUNCS = { - 'noop': lambda: None, + 'noop': cleanup_noop, + 'dummy': lambda name: None, # Dummy resource used in tests } if os.name == 'posix': @@ -61,6 +65,7 @@ def __init__(self): self._lock = threading.RLock() self._fd = None self._pid = None + self._exitcode = None def _reentrant_call_error(self): # gh-109629: this happens if an explicit call to the ResourceTracker @@ -84,9 +89,16 @@ def _stop(self): os.close(self._fd) self._fd = None - os.waitpid(self._pid, 0) + _, status = os.waitpid(self._pid, 0) + self._pid = None + try: + self._exitcode = os.waitstatus_to_exitcode(status) + except ValueError: + # os.waitstatus_to_exitcode may raise an exception for invalid values + self._exitcode = None + def getfd(self): self.ensure_running() return self._fd @@ -119,6 +131,7 @@ def ensure_running(self): pass self._fd = None self._pid = None + self._exitcode = None warnings.warn('resource_tracker: process died unexpectedly, ' 'relaunching. Some resources might leak.') @@ -221,6 +234,8 @@ def main(fd): pass cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()} + exit_code = 0 + try: # keep track of registered/unregistered resources with open(fd, 'rb') as f: @@ -242,6 +257,7 @@ def main(fd): else: raise RuntimeError('unrecognized command %r' % cmd) except Exception: + exit_code = 3 try: sys.excepthook(*sys.exc_info()) except: @@ -251,10 +267,17 @@ def main(fd): for rtype, rtype_cache in cache.items(): if rtype_cache: try: - warnings.warn( - f'resource_tracker: There appear to be {len(rtype_cache)} ' - f'leaked {rtype} objects to clean up at shutdown: {rtype_cache}' - ) + exit_code = 1 + if rtype == 'dummy': + # The test 'dummy' resource is expected to leak. + # We skip the warning (and *only* the warning) for it. + pass + else: + warnings.warn( + f'resource_tracker: There appear to be ' + f'{len(rtype_cache)} leaked {rtype} objects to ' + f'clean up at shutdown: {rtype_cache}' + ) except Exception: pass for name in rtype_cache: @@ -265,6 +288,9 @@ def main(fd): try: _CLEANUP_FUNCS[rtype](name) except Exception as e: + exit_code = 2 warnings.warn('resource_tracker: %r: %s' % (name, e)) finally: pass + + sys.exit(exit_code) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 94ce85cac754ae5..e4183eea959dd58 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -5609,8 +5609,9 @@ def create_and_register_resource(rtype): ''' for rtype in resource_tracker._CLEANUP_FUNCS: with self.subTest(rtype=rtype): - if rtype == "noop": + if rtype in ("noop", "dummy"): # Artefact resource type used by the resource_tracker + # or tests continue r, w = os.pipe() p = subprocess.Popen([sys.executable, @@ -5730,6 +5731,38 @@ def test_too_long_name_resource(self): with self.assertRaises(ValueError): resource_tracker.register(too_long_name_resource, rtype) + def _test_resource_tracker_leak_resources(self, cleanup): + # We use a separate instance for testing, since the main global + # _resource_tracker may be used to watch test infrastructure. + from multiprocessing.resource_tracker import ResourceTracker + tracker = ResourceTracker() + tracker.ensure_running() + self.assertTrue(tracker._check_alive()) + + self.assertIsNone(tracker._exitcode) + tracker.register('somename', 'dummy') + if cleanup: + tracker.unregister('somename', 'dummy') + expected_exit_code = 0 + else: + expected_exit_code = 1 + + self.assertTrue(tracker._check_alive()) + self.assertIsNone(tracker._exitcode) + tracker._stop() + self.assertEqual(tracker._exitcode, expected_exit_code) + + def test_resource_tracker_exit_code(self): + """ + Test the exit code of the resource tracker. + + If no leaked resources were found, exit code should be 0, otherwise 1 + """ + for cleanup in [True, False]: + with self.subTest(cleanup=cleanup): + self._test_resource_tracker_leak_resources( + cleanup=cleanup, + ) class TestSimpleQueue(unittest.TestCase): diff --git a/Lib/test/test_concurrent_futures/test_init.py b/Lib/test/test_concurrent_futures/test_init.py index ce01e0ff0f287a0..d79a6367701fb4b 100644 --- a/Lib/test/test_concurrent_futures/test_init.py +++ b/Lib/test/test_concurrent_futures/test_init.py @@ -3,6 +3,7 @@ import queue import time import unittest +import sys from concurrent.futures._base import BrokenExecutor from logging.handlers import QueueHandler @@ -109,6 +110,31 @@ def _assert_logged(self, msg): create_executor_tests(globals(), FailingInitializerMixin) +@unittest.skipIf(sys.platform == "win32", "Resource Tracker doesn't run on Windows") +class FailingInitializerResourcesTest(unittest.TestCase): + """ + Source: https://github.com/python/cpython/issues/104090 + """ + + def _test(self, test_class): + runner = unittest.TextTestRunner() + runner.run(test_class('test_initializer')) + + # GH-104090: + # Stop resource tracker manually now, so we can verify there are not leaked resources by checking + # the process exit code + from multiprocessing.resource_tracker import _resource_tracker + _resource_tracker._stop() + + self.assertEqual(_resource_tracker._exitcode, 0) + + def test_spawn(self): + self._test(ProcessPoolSpawnFailingInitializerTest) + + def test_forkserver(self): + self._test(ProcessPoolForkserverFailingInitializerTest) + + def setUpModule(): setup_module() diff --git a/Misc/NEWS.d/next/Core and Builtins/2023-07-16-15-02-47.gh-issue-104090.oMjNa9.rst b/Misc/NEWS.d/next/Core and Builtins/2023-07-16-15-02-47.gh-issue-104090.oMjNa9.rst new file mode 100644 index 000000000000000..e581d291d047aee --- /dev/null +++ b/Misc/NEWS.d/next/Core and Builtins/2023-07-16-15-02-47.gh-issue-104090.oMjNa9.rst @@ -0,0 +1,2 @@ +The multiprocessing resource tracker now exits with non-zero status code if a resource +leak was detected. It still exits with status code 0 otherwise.