Skip to content

Commit

Permalink
refactor(utility): remove multiprocess lock and add pid to cache path
Browse files Browse the repository at this point in the history
The default start method of multiprocessing on Windows is "spawn", which
need to use `main` to protect the entry point. And "fork" is not
supported on Windows.

Beside, the cache path need to add pid to avoid conflict "mv" operation.

PR Closed: #1188
  • Loading branch information
Lee-000 committed Jan 5, 2022
1 parent 9868823 commit f60d22c
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 62 deletions.
70 changes: 11 additions & 59 deletions tensorbay/utility/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@


from collections import defaultdict
from contextlib import contextmanager
from functools import wraps
from multiprocessing import Manager
from threading import Lock
from typing import Any, Callable, DefaultDict, Dict, Iterator, Sequence, Type, TypeVar, Union
from typing import Any, Callable, DefaultDict, Sequence, Type, TypeVar, Union

import numpy as np

Expand Down Expand Up @@ -51,16 +49,7 @@ def __eq__(self, other: object) -> bool:
return self.__dict__ == other.__dict__


@contextmanager
def _acquire(lock: Lock) -> Iterator[bool]:
acquire = lock.acquire(blocking=False)
yield acquire
if not acquire:
lock.acquire()
lock.release()


thread_locks: DefaultDict[int, Lock] = defaultdict(Lock)
locks: DefaultDict[int, Lock] = defaultdict(Lock)


def locked(func: _CallableWithoutReturnValue) -> _CallableWithoutReturnValue:
Expand All @@ -77,52 +66,15 @@ def locked(func: _CallableWithoutReturnValue) -> _CallableWithoutReturnValue:
@wraps(func)
def wrapper(self: Any, *arg: Any, **kwargs: Any) -> None:
key = id(self)
lock = thread_locks[key]
with _acquire(lock) as success:
if success:
lock = locks[key]
acquire = lock.acquire(blocking=False)
try:
if acquire:
func(self, *arg, **kwargs)
del thread_locks[key]
del locks[key]
else:
lock.acquire()
finally:
lock.release()

return wrapper # type: ignore[return-value]


class ProcessLocked: # pylint: disable=too-few-public-methods
"""A decorator to add lock for methods called from different processes.
Arguments:
attr_name: The name of the attr to be taken as the key of the lock.
"""

_manager = Manager()

process_locks: Dict[str, Lock] = _manager.dict()

def __init__(self, attr_name: str) -> None:
self._attr_name = attr_name

def __call__(self, func: _CallableWithoutReturnValue) -> _CallableWithoutReturnValue:
"""Return the locked function.
Arguments:
func: The function to add lock.
Returns:
The locked function.
"""

@wraps(func)
def wrapper(func_self: Any, *arg: Any, **kwargs: Any) -> None:
key = getattr(func_self, self._attr_name)
# https://github.com/PyCQA/pylint/issues/3313
lock = self.process_locks.setdefault(
key, self._manager.Lock() # pylint: disable=no-member
)

with _acquire(lock) as success:
if success:
func(func_self, *arg, **kwargs)
del self.process_locks[key]

return wrapper # type: ignore[return-value]
4 changes: 1 addition & 3 deletions tensorbay/utility/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from _io import BufferedReader

from tensorbay.exception import ResponseError
from tensorbay.utility.common import ProcessLocked
from tensorbay.utility.repr import ReprMixin
from tensorbay.utility.requests import UserResponse, config, get_session

Expand Down Expand Up @@ -180,11 +179,10 @@ def _urlopen(self) -> UserResponse:
)
raise

@ProcessLocked("cache_path")
def _write_cache(self, cache_path: str) -> None:
dirname = os.path.dirname(cache_path)
os.makedirs(dirname, exist_ok=True)
temp_path = f"{cache_path}.tensorbay.downloading"
temp_path = f"{cache_path}.tensorbay.downloading.{os.getpid()}"
with self._urlopen() as fp:
with open(temp_path, "wb") as cache:
cache.write(fp.read())
Expand Down

0 comments on commit f60d22c

Please sign in to comment.