-
-
Notifications
You must be signed in to change notification settings - Fork 13
/
test_functools.py
249 lines (203 loc) · 7.46 KB
/
test_functools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
from __future__ import annotations
import copy
import functools
import itertools
import os
import platform
import random
import time
from typing import Literal, TypeVar
from unittest import mock
import pytest
from jaraco.classes import properties
from jaraco.functools import Throttler, method_cache, retry, retry_call
_T = TypeVar("_T")
class TestThrottler:
@pytest.mark.xfail(
'GITHUB_ACTIONS' in os.environ and platform.system() in ('Darwin', 'Windows'),
reason="Performance is heavily throttled on Github Actions Mac/Windows runs",
)
def test_function_throttled(self) -> None:
"""
Ensure the throttler actually throttles calls.
"""
# set up a function to be called
counter = itertools.count()
# set up a version of `next` that is only called 30 times per second
limited_next = Throttler(next, 30)
# for one second, call next as fast as possible
deadline = time.time() + 1
while time.time() < deadline:
limited_next(counter)
# ensure the counter was advanced about 30 times
assert 28 <= next(counter) <= 32
# ensure that another burst of calls after some idle period will also
# get throttled
time.sleep(1)
deadline = time.time() + 1
counter = itertools.count()
while time.time() < deadline:
limited_next(counter)
assert 28 <= next(counter) <= 32
def test_reconstruct_unwraps(self) -> None:
"""
The throttler should be re-usable - if one wants to throttle a
function that's aready throttled, the original function should be
used.
"""
wrapped = Throttler(next, 30)
wrapped_again = Throttler(wrapped, 60)
assert wrapped_again.func is next
assert wrapped_again.max_rate == 60
def test_throttled_method(self) -> None:
class ThrottledMethodClass:
@Throttler
def echo(self, arg: _T) -> _T:
return arg
tmc = ThrottledMethodClass()
assert tmc.echo('foo') == 'foo'
class TestMethodCache:
bad_vers = '(3, 5, 0) <= sys.version_info < (3, 5, 2)'
@pytest.mark.skipif(bad_vers, reason="https://bugs.python.org/issue25447")
def test_deepcopy(self) -> None:
"""
A deepcopy of an object with a method cache should still
succeed.
"""
class ClassUnderTest:
calls = 0
@method_cache
def method(self, value: _T) -> _T:
self.calls += 1
return value
ob = ClassUnderTest()
copy.deepcopy(ob)
ob.method(1)
copy.deepcopy(ob)
def test_special_methods(self) -> None:
"""
Test method_cache with __getitem__ and __getattr__.
"""
class ClassUnderTest:
getitem_calls = 0
getattr_calls = 0
@method_cache
def __getitem__(self, item: _T) -> _T:
self.getitem_calls += 1
return item
@method_cache
def __getattr__(self, name: _T) -> _T:
self.getattr_calls += 1
return name
ob = ClassUnderTest()
# __getitem__
ob[1] + ob[1]
assert ob.getitem_calls == 1
# __getattr__
ob.one + ob.one # type: ignore[operator] # Using ParamSpec on methods is still limited
assert ob.getattr_calls == 1
@pytest.mark.xfail(reason="can't replace property with cache; #6")
def test_property(self) -> None:
"""
Can a method_cache decorated method also be a property?
"""
class ClassUnderTest:
@property
@method_cache
def mything(self) -> float: # pragma: nocover
return random.random()
ob = ClassUnderTest()
assert ob.mything == ob.mything
@pytest.mark.xfail(reason="can't replace property with cache; #6")
def test_non_data_property(self) -> None:
"""
A non-data property also does not work because the property
gets replaced with a method.
"""
class ClassUnderTest:
@properties.NonDataProperty
@method_cache
def mything(self) -> float:
return random.random()
ob = ClassUnderTest()
assert ob.mything == ob.mything
class TestRetry:
def attempt(self, arg: mock.Mock | None = None) -> Literal['Success']:
if next(self.fails_left):
raise ValueError("Failed!")
if arg:
arg.touch()
return "Success"
def set_to_fail(self, times: int) -> None:
self.fails_left = itertools.count(times, -1)
def test_set_to_fail(self) -> None:
"""
Test this test's internal failure mechanism.
"""
self.set_to_fail(times=2)
with pytest.raises(ValueError):
self.attempt()
with pytest.raises(ValueError):
self.attempt()
assert self.attempt() == 'Success'
def test_retry_call_succeeds(self) -> None:
self.set_to_fail(times=2)
res = retry_call(self.attempt, retries=2, trap=ValueError)
assert res == "Success"
def test_retry_call_fails(self) -> None:
"""
Failing more than the number of retries should
raise the underlying error.
"""
self.set_to_fail(times=3)
with pytest.raises(ValueError) as res:
retry_call(self.attempt, retries=2, trap=ValueError)
assert str(res.value) == 'Failed!'
def test_retry_multiple_exceptions(self) -> None:
self.set_to_fail(times=2)
errors = ValueError, NameError
res = retry_call(self.attempt, retries=2, trap=errors)
assert res == "Success"
def test_retry_exception_superclass(self) -> None:
self.set_to_fail(times=2)
res = retry_call(self.attempt, retries=2, trap=Exception)
assert res == "Success"
def test_default_traps_nothing(self) -> None:
self.set_to_fail(times=1)
with pytest.raises(ValueError):
retry_call(self.attempt, retries=1)
def test_default_does_not_retry(self) -> None:
self.set_to_fail(times=1)
with pytest.raises(ValueError):
retry_call(self.attempt, trap=Exception)
def test_cleanup_called_on_exception(self) -> None:
calls = random.randint(1, 10)
cleanup = mock.Mock()
self.set_to_fail(times=calls)
retry_call(self.attempt, retries=calls, cleanup=cleanup, trap=Exception)
assert cleanup.call_count == calls
cleanup.assert_called_with()
def test_infinite_retries(self) -> None:
self.set_to_fail(times=999)
cleanup = mock.Mock()
retry_call(self.attempt, retries=float('inf'), cleanup=cleanup, trap=Exception)
assert cleanup.call_count == 999
def test_with_arg(self) -> None:
self.set_to_fail(times=0)
arg = mock.Mock()
bound = functools.partial(self.attempt, arg)
res = retry_call(bound)
assert res == 'Success'
assert arg.touch.called
def test_decorator(self) -> None:
self.set_to_fail(times=1)
attempt = retry(retries=1, trap=Exception)(self.attempt)
res = attempt()
assert res == "Success"
def test_decorator_with_arg(self) -> None:
self.set_to_fail(times=0)
attempt = retry()(self.attempt)
arg = mock.Mock()
res = attempt(arg)
assert res == 'Success'
assert arg.touch.called