-
-
Notifications
You must be signed in to change notification settings - Fork 1k
/
lock.py
168 lines (132 loc) · 4.62 KB
/
lock.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
"""holds locking functionality that works across processes"""
from __future__ import absolute_import, unicode_literals
import logging
import os
from abc import ABCMeta, abstractmethod
from contextlib import contextmanager
from threading import Lock, RLock
from filelock import FileLock, Timeout
from six import add_metaclass
from virtualenv.util.path import Path
class _CountedFileLock(FileLock):
def __init__(self, lock_file):
parent = os.path.dirname(lock_file)
if not os.path.isdir(parent):
try:
os.makedirs(parent)
except OSError:
pass
super(_CountedFileLock, self).__init__(lock_file)
self.count = 0
self.thread_safe = RLock()
def acquire(self, timeout=None, poll_intervall=0.05):
with self.thread_safe:
if self.count == 0:
super(_CountedFileLock, self).acquire(timeout=timeout, poll_intervall=poll_intervall)
self.count += 1
def release(self, force=False):
with self.thread_safe:
if self.count == 1:
super(_CountedFileLock, self).release(force=force)
self.count = max(self.count - 1, 0)
_lock_store = {}
_store_lock = Lock()
@add_metaclass(ABCMeta)
class PathLockBase(object):
def __init__(self, folder):
path = Path(folder)
self.path = path.resolve() if path.exists() else path
def __repr__(self):
return "{}({})".format(self.__class__.__name__, self.path)
def __div__(self, other):
return type(self)(self.path / other)
def __truediv__(self, other):
return self.__div__(other)
@abstractmethod
def __enter__(self):
raise NotImplementedError
@abstractmethod
def __exit__(self, exc_type, exc_val, exc_tb):
raise NotImplementedError
@abstractmethod
@contextmanager
def lock_for_key(self, name, no_block=False):
raise NotImplementedError
@abstractmethod
@contextmanager
def non_reentrant_lock_for_key(name):
raise NotImplementedError
class ReentrantFileLock(PathLockBase):
def __init__(self, folder):
super(ReentrantFileLock, self).__init__(folder)
self._lock = None
def _create_lock(self, name=""):
lock_file = str(self.path / "{}.lock".format(name))
with _store_lock:
if lock_file not in _lock_store:
_lock_store[lock_file] = _CountedFileLock(lock_file)
return _lock_store[lock_file]
@staticmethod
def _del_lock(lock):
with _store_lock:
if lock is not None:
with lock.thread_safe:
if lock.count == 0:
_lock_store.pop(lock.lock_file, None)
def __del__(self):
self._del_lock(self._lock)
def __enter__(self):
self._lock = self._create_lock()
self._lock_file(self._lock)
def __exit__(self, exc_type, exc_val, exc_tb):
self._release(self._lock)
def _lock_file(self, lock, no_block=False):
# multiple processes might be trying to get a first lock... so we cannot check if this directory exist without
# a lock, but that lock might then become expensive, and it's not clear where that lock should live.
# Instead here we just ignore if we fail to create the directory.
try:
os.makedirs(str(self.path))
except OSError:
pass
try:
lock.acquire(0.0001)
except Timeout:
if no_block:
raise
logging.debug("lock file %s present, will block until released", lock.lock_file)
lock.release() # release the acquire try from above
lock.acquire()
@staticmethod
def _release(lock):
lock.release()
@contextmanager
def lock_for_key(self, name, no_block=False):
lock = self._create_lock(name)
try:
try:
self._lock_file(lock, no_block)
yield
finally:
self._release(lock)
finally:
self._del_lock(lock)
@contextmanager
def non_reentrant_lock_for_key(self, name):
with _CountedFileLock(str(self.path / "{}.lock".format(name))):
yield
class NoOpFileLock(PathLockBase):
def __enter__(self):
raise NotImplementedError
def __exit__(self, exc_type, exc_val, exc_tb):
raise NotImplementedError
@contextmanager
def lock_for_key(self, name, no_block=False):
yield
@contextmanager
def non_reentrant_lock_for_key(self, name):
yield
__all__ = (
"NoOpFileLock",
"ReentrantFileLock",
"Timeout",
)