Skip to content

Commit

Permalink
Merge branch 'pythongh-104090-leaked-semaphore-objects-in-test_concur…
Browse files Browse the repository at this point in the history
…rent_futures-debug-session' into pythongh-104090-fix-leaked-semaphors-on-test_concurrent_futures
  • Loading branch information
bityob committed Jul 15, 2023
2 parents 09e72be + 9923f46 commit edcafcf
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 5 deletions.
1 change: 1 addition & 0 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,7 @@ def __init__(self, max_workers=None, mp_context=None,
# tracebacks in the queue's own worker thread. But we detect killed
# processes anyway, so silence the tracebacks.
self._call_queue._ignore_epipe = True
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] ProcessPoolExecutor.__init__: mp_context.SimpleQueue()")
self._result_queue = mp_context.SimpleQueue()
self._work_ids = queue.Queue()

Expand Down
4 changes: 4 additions & 0 deletions Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ def __init__(self, maxsize=0, *, ctx):
from .synchronize import SEM_VALUE_MAX as maxsize
self._maxsize = maxsize
self._reader, self._writer = connection.Pipe(duplex=False)
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] Queue __init__: Set self._rlock = ctx.Lock()")
self._rlock = ctx.Lock()
self._opid = os.getpid()
if sys.platform == 'win32':
self._wlock = None
else:
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] Queue __init__: Set self._wlock = ctx.Lock()")
self._wlock = ctx.Lock()
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] Queue __init__: Set ctx.BoundedSemaphore(maxsize)")
self._sem = ctx.BoundedSemaphore(maxsize)
# For use by concurrent.futures
self._ignore_epipe = False
Expand Down Expand Up @@ -345,6 +348,7 @@ def __init__(self, *, ctx):
if sys.platform == 'win32':
self._wlock = None
else:
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] SimpleQueue.__init__: self._wlock = ctx.Lock()")
self._wlock = ctx.Lock()

def close(self):
Expand Down
32 changes: 30 additions & 2 deletions Lib/multiprocessing/resource_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@
'noop': lambda: None,
}

def semwrapper():
def inner(name):
with open("/dev/stdin", "w") as sys.stdin:
with open("/dev/stdout", "w") as sys.stdout:
# import traceback
# traceback.print_stack()
_multiprocessing.sem_unlink(name)
return inner


if os.name == 'posix':
import _multiprocessing
import _posixshmem
Expand All @@ -43,8 +53,11 @@
# absence of POSIX named semaphores. In that case, no named semaphores were
# ever opened, so no cleanup would be necessary.
if hasattr(_multiprocessing, 'sem_unlink'):
# import traceback
# traceback.print_stack()
# print("add _CLEANUP_FUNCS for semaphore")
_CLEANUP_FUNCS.update({
'semaphore': _multiprocessing.sem_unlink,
'semaphore': semwrapper(), #_multiprocessing.sem_unlink,
})
_CLEANUP_FUNCS.update({
'shared_memory': _posixshmem.shm_unlink,
Expand All @@ -65,6 +78,8 @@ def _stop(self):
return

# closing the "alive" file descriptor stops main()
import os, threading
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] ResourceTracker._stop: os.close(self._fd)")
os.close(self._fd)
self._fd = None

Expand Down Expand Up @@ -151,11 +166,13 @@ def _check_alive(self):
return True

def register(self, name, rtype):
# print(f"send UNREGISTER {name} {rtype}")
'''Register name of resource with resource tracker.'''
self._send('REGISTER', name, rtype)

def unregister(self, name, rtype):
'''Unregister name of resource with resource tracker.'''
# print(f"send UNREGISTER {name} {rtype}")
self._send('UNREGISTER', name, rtype)

def _send(self, cmd, name, rtype):
Expand All @@ -177,6 +194,9 @@ def _send(self, cmd, name, rtype):
getfd = _resource_tracker.getfd

def main(fd):
# import pdb; pdb.set_trace()
# import traceback
# traceback.print_stack()
'''Run resource tracker.'''
# protect the process from ^C and "killall python" etc
signal.signal(signal.SIGINT, signal.SIG_IGN)
Expand All @@ -197,15 +217,21 @@ def main(fd):
for line in f:
try:
cmd, name, rtype = line.strip().decode('ascii').split(':')
with open("/dev/stdin", "w") as sys.stdin, open("/dev/stdout", "w") as sys.stdout:
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] {cmd=}, {name=}, {rtype=}")
cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
if cleanup_func is None:
raise ValueError(
f'Cannot register {name} for automatic cleanup: '
f'unknown resource type {rtype}')

if cmd == 'REGISTER':
# warnings.warn(f"REGISTER:{rtype}:{name}:{fd}")
cache[rtype].add(name)
elif cmd == 'UNREGISTER':
# import traceback
# traceback.print_stack()
# warnings.warn(f"UNREGISTER:{rtype}:{name}:{fd}")
cache[rtype].remove(name)
elif cmd == 'PROBE':
pass
Expand All @@ -225,13 +251,15 @@ def main(fd):
'leaked %s objects to clean up at shutdown' %
(len(rtype_cache), rtype))
except Exception:
pass
raise
for name in rtype_cache:
# For some reason the process which created and registered this
# resource has failed to unregister it. Presumably it has
# died. We therefore unlink it.
try:
try:
with open("/dev/stdin", "w") as sys.stdin, open("/dev/stdout", "w") as sys.stdout:
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] call _CLEANUP_FUNCS {rtype} function for {name} file")
_CLEANUP_FUNCS[rtype](name)
except Exception as e:
warnings.warn('resource_tracker: %r: %s' % (name, e))
Expand Down
31 changes: 31 additions & 0 deletions Lib/multiprocessing/synchronize.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@ class SemLock(object):
_rand = tempfile._RandomNameSequence()

def __init__(self, kind, value, maxvalue, *, ctx):
# import pdb; pdb.set_trace()
# print(f"new semlock {self.__class__.__name__=} {kind=} {value=} {maxvalue=} {ctx=}")
if ctx is None:
ctx = context._default_context.get_context()
name = ctx.get_start_method()
# print(f"new semlock {name=}")
unlink_now = sys.platform == 'win32' or name == 'fork'
for i in range(100):
try:
Expand All @@ -70,9 +73,12 @@ def __init__(self, kind, value, maxvalue, *, ctx):
if sys.platform != 'win32':
def _after_fork(obj):
obj._semlock._after_fork()
# print(f"Add semlock register_after_fork {self._semlock.name=}, {self.__class__.__name__=}")
util.register_after_fork(self, _after_fork)

if self._semlock.name is not None:
import os, threading
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] Add semlock to register and finalize {self._semlock.name=}, {self.__class__.__name__=}")
# We only get here if we are on Unix with forking
# disabled. When the object is garbage collected or the
# process shuts down we unlink the semaphore name
Expand All @@ -84,6 +90,29 @@ def _after_fork(obj):
@staticmethod
def _cleanup(name):
from .resource_tracker import unregister
# import pdb; pdb.set_trace()
import traceback, os
tt="".join(traceback.format_stack())
# import pdb; pdb.set_trace()
if "_callTearDown" in tt:
# traceback.print_stack()
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] case.py(638)run() -> self._callTearDown() -> run sem_unlink(name) on {name}")
elif "_handleModuleTearDown" in tt:
# traceback.print_stack()
# import pdb; pdb.set_trace()
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] _run_finalizers() -> finalizer() -> run sem_unlink(name) on {name}")
elif "suite.py\", line 122" in tt or "suite.py\", line 123" in tt or "suite.py\", line 124" in tt or "suite.py\", line 125" in tt or "suite.py\", line 126" in tt:
# import pdb; pdb.set_trace()
# import signal
# os.kill(os.getpid(), signal.SIGUSR1)
# traceback.print_stack()
# breakpoint()
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] suite.py(122)test(result) -> suite.py(84)return self.run(*args, **kwds) -> run sem_unlink(name) on {name}")
# pass
else:
# import pdb; pdb.set_trace()
# traceback.print_stack()
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] ERROR: UNKNOEN TRACE: run sem_unlink(name) on {name}")
sem_unlink(name)
unregister(name, "semaphore")

Expand All @@ -92,9 +121,11 @@ def _make_methods(self):
self.release = self._semlock.release

def __enter__(self):
print(f"enter semlock {self.__class__.__name__}")
return self._semlock.__enter__()

def __exit__(self, *args):
print(f"exit semlock {self.__class__.__name__}")
return self._semlock.__exit__(*args)

def __getstate__(self):
Expand Down
17 changes: 16 additions & 1 deletion Lib/multiprocessing/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ def _run_after_forkers():
info('after forker raised exception %s', e)

def register_after_fork(obj, func):
# print(f"register_after_fork: {obj=}, {func=}, {id(obj)=}")
_afterfork_registry[(next(_afterfork_counter), id(obj), func)] = obj

#
Expand Down Expand Up @@ -221,6 +222,16 @@ def __call__(self, wr=None,
else:
sub_debug('finalizer calling %s with args %s and kwargs %s',
self._callback, self._args, self._kwargs)
if str(self._callback.__name__) == "_cleanup":
import os, threading
print(
f"[pid={os.getpid()}, tid={threading.get_native_id()}] Finalize called on {self._callback=} {self._args=} {self._kwargs=}")
# import pdb; pdb.set_trace()
import os, threading, _multiprocessing, gc, sys, multiprocessing
# for sl in [x for x in gc.get_objects() if isinstance(x, multiprocessing.queues.Queue)]:
# print(sl, sys.getrefcount(sl))

# import pdb; pdb.set_trace()
res = self._callback(*self._args, **self._kwargs)
self._weakref = self._callback = self._args = \
self._kwargs = self._key = None
Expand Down Expand Up @@ -295,6 +306,7 @@ def _run_finalizers(minpriority=None):
finalizer = _finalizer_registry.get(key)
# key may have been removed from the registry
if finalizer is not None:
# import pdb; pdb.set_trace()
sub_debug('calling %s', finalizer)
try:
finalizer()
Expand Down Expand Up @@ -465,7 +477,6 @@ def close_fds(*fds):
for fd in fds:
os.close(fd)


def _cleanup_tests():
"""Cleanup multiprocessing resources when multiprocessing tests
completed."""
Expand All @@ -479,13 +490,17 @@ def _cleanup_tests():
from multiprocessing import forkserver
forkserver._forkserver._stop()

# import pdb; pdb.set_trace()

# Stop the ResourceTracker process if it's running
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] Stop the ResourceTracker process if it's running")
from multiprocessing import resource_tracker
resource_tracker._resource_tracker._stop()

# bpo-37421: Explicitly call _run_finalizers() to remove immediately
# temporary directories created by multiprocessing.util.get_temp_dir().
_run_finalizers()

support.gc_collect()

support.reap_children()
27 changes: 26 additions & 1 deletion Lib/test/test_concurrent_futures.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@


from test import support
from test.support import import_helper
from test.support import threading_helper
Expand Down Expand Up @@ -120,6 +122,8 @@ class ExecutorMixin:
executor_kwargs = {}

def setUp(self):
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] ###########")
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] ExecutorMixin.__setUp start")
super().setUp()

self.t1 = time.monotonic()
Expand All @@ -132,8 +136,12 @@ def setUp(self):
self.executor = self.executor_type(
max_workers=self.worker_count,
**self.executor_kwargs)
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] ExecutorMixin.__setUp end")
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] ###########")

def tearDown(self):
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] ###########")
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] ExecutorMixin.tearDown start")
self.executor.shutdown(wait=True)
self.executor = None

Expand All @@ -143,6 +151,8 @@ def tearDown(self):
self.assertLess(dt, 300, "synchronization issue: test lasted too long")

super().tearDown()
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] ExecutorMixin.tearDown end")
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] ###########")

def get_context(self):
return mp.get_context(self.ctx)
Expand Down Expand Up @@ -237,7 +247,10 @@ def setUp(self):
if hasattr(self, "ctx"):
# Pass a queue to redirect the child's logging output
self.mp_context = self.get_context()
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] ################")
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] FailingInitializerMixin.setUp: Calling self.mp_context.Queue()")
self.log_queue = self.mp_context.Queue()
print(f"[pid={os.getpid()}, tid={threading.get_native_id()}] ################")
self.executor_kwargs = dict(initializer=init_fail,
initargs=(self.log_queue,))
else:
Expand All @@ -248,6 +261,14 @@ def setUp(self):
self.executor_kwargs = dict(initializer=init_fail)
super().setUp()

# def tearDown(self):
# print("###########")
# print("FailingInitializerMixin.tearDown start")
# del self.log_queue
# super().tearDown()
# print("FailingInitializerMixin.tearDown end")
# print("###########")

def test_initializer(self):
with self._assert_logged('ValueError: error in initializer'):
try:
Expand Down Expand Up @@ -275,8 +296,12 @@ def _assert_logged(self, msg):
yield
output = []
try:
# print("############## _assert_logged: while True start")
while True:
output.append(self.log_queue.get_nowait().getMessage())
new_msg = self.log_queue.get_nowait().getMessage()
# print(f"_assert_logged.get_Message: {new_msg}")
output.append(new_msg)
# print("############## _assert_logged: while True end")
except queue.Empty:
pass
else:
Expand Down
2 changes: 1 addition & 1 deletion Lib/unittest/__main__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""Main entry point"""

breakpoint()
import sys
if sys.argv[0].endswith("__main__.py"):
import os.path
Expand Down
Loading

0 comments on commit edcafcf

Please sign in to comment.