Skip to content

Commit

Permalink
BUG: Do not create global multiprocessing RLock
Browse files Browse the repository at this point in the history
This addresses:

> /opt/conda/lib/python3.6/multiprocessing/semaphore_tracker.py:143: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown len(cache))

Reported in Project-MONAI/MONAI#1629

Here we adopt the approach in tqdm:

  tqdm/tqdm#617

to address the similar issues with PyTorch:

  tqdm/tqdm#611

but for our use case. A global set of locks is added to the class on
first use.
  • Loading branch information
thewtex committed May 26, 2021
1 parent 7b402ba commit cfdb502
Showing 1 changed file with 69 additions and 13 deletions.
82 changes: 69 additions & 13 deletions Wrapping/Generators/Python/itk/support/lazy.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,6 @@
# use of the builtin set
from builtins import set as _builtin_set

# Need to use a recursive lock for thread ownership
# within the given thread you can acquire a RLock as often as you like.
# Other threads need to wait until this thread releases the resource again.
from multiprocessing import RLock as _mp_RLock

# A single lock is needed for all lazy loading. This lock blocks
# across all threads until this thread has completed all its imports
# and dependancies. The complex inter-relationship, and the recursive
# nature of imports, makes a more fine-grained locking very difficult
# to implement robustly.
_gbl_lazy_load_lock: _mp_RLock = _mp_RLock()

not_loaded: str = "not loaded"


Expand All @@ -45,6 +33,62 @@ def _lazy_itk_module_reconstructor(module_name, state):

return lazy_module

def _ThreadingRLock(*args, **kwargs):
"""threading RLock"""
try:
from threading import RLock
return RLock(*args, **kwargs)
except (ImportError, OSError):
pass

class ITKLazyLoadLock(object):
"""Need to use a recursive lock for thread ownership
within the given thread you can acquire a RLock as often as you like.
Other threads need to wait until this thread releases the resource again.
A single lock is needed for all lazy loading. This lock blocks
across all threads until this thread has completed all its imports
and dependancies. The complex inter-relationship, and the recursive
nature of imports, makes a more fine-grained locking very difficult
to implement robustly."""

# global thread lock so no setup required for multithreading.
# NB: Do not create multiprocessing lock as it sets the multiprocessing
# context, disallowing `spawn()`/`forkserver()`
th_lock = _ThreadingRLock()

def __init__(self):
cls = type(self)
root_lock = cls.th_lock
if root_lock is not None:
root_lock.acquire()
cls.create_mp_lock()
self.locks = [lk for lk in [cls.mp_lock, cls.th_lock] if lk is not None]
if root_lock is not None:
root_lock.release()

def acquire(self, *a, **k):
for lock in self.locks:
lock.acquire(*a, **k)

def release(self):
for lock in self.locks[::-1]: # Release in inverse order of acquisition
lock.release()

def __enter__(self):
self.acquire()

def __exit__(self, *exc):
self.release()

@classmethod
def create_mp_lock(cls):
if not hasattr(cls, 'mp_lock'):
try:
from multiprocessing import RLock
cls.mp_lock = RLock()
except (ImportError, OSError):
cls.mp_lock = None

class LazyITKModule(types.ModuleType):

Expand All @@ -65,10 +109,22 @@ def __init__(self, name, lazy_attributes):
setattr(self, "itk_base_global_lazy_attributes", lazy_attributes)
setattr(self, "loaded_lazy_modules", _builtin_set())

@classmethod
def set_lock(cls, lock):
"""Set the global lock."""
cls._lock = lock

@classmethod
def get_lock(cls):
"""Get the global lock. Construct it if it does not exist."""
if not hasattr(cls, '_lock'):
cls._lock = ITKLazyLoadLock()
return cls._lock

def __getattribute__(self, attr):
value = types.ModuleType.__getattribute__(self, attr)
if value is not_loaded:
with _gbl_lazy_load_lock: # All but one thread will block here.
with type(self).get_lock(): # All but one thread will block here.
if value is not_loaded:
# Only the first thread needs to run this code, all other blocked threads skip
module = self.__belong_lazy_attributes[attr]
Expand Down

0 comments on commit cfdb502

Please sign in to comment.