-
-
Notifications
You must be signed in to change notification settings - Fork 2.7k
/
fixtures.py
1713 lines (1495 loc) · 65.5 KB
/
fixtures.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import dataclasses
import functools
import inspect
import os
import sys
import warnings
from collections import defaultdict
from collections import deque
from contextlib import suppress
from pathlib import Path
from types import TracebackType
from typing import Any
from typing import Callable
from typing import cast
from typing import Dict
from typing import Generator
from typing import Generic
from typing import Iterable
from typing import Iterator
from typing import List
from typing import MutableMapping
from typing import NoReturn
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
from typing import TypeVar
from typing import Union
import _pytest
from _pytest import nodes
from _pytest._code import getfslineno
from _pytest._code.code import FormattedExcinfo
from _pytest._code.code import TerminalRepr
from _pytest._io import TerminalWriter
from _pytest.compat import _format_args
from _pytest.compat import _PytestWrapper
from _pytest.compat import assert_never
from _pytest.compat import final
from _pytest.compat import get_real_func
from _pytest.compat import get_real_method
from _pytest.compat import getfuncargnames
from _pytest.compat import getimfunc
from _pytest.compat import getlocation
from _pytest.compat import is_generator
from _pytest.compat import NOTSET
from _pytest.compat import NotSetType
from _pytest.compat import overload
from _pytest.compat import safe_getattr
from _pytest.config import _PluggyPlugin
from _pytest.config import Config
from _pytest.config.argparsing import Parser
from _pytest.deprecated import check_ispytest
from _pytest.deprecated import YIELD_FIXTURE
from _pytest.mark import Mark
from _pytest.mark import ParameterSet
from _pytest.mark.structures import MarkDecorator
from _pytest.outcomes import fail
from _pytest.outcomes import skip
from _pytest.outcomes import TEST_OUTCOME
from _pytest.pathlib import absolutepath
from _pytest.pathlib import bestrelpath
from _pytest.scope import HIGH_SCOPES
from _pytest.scope import Scope
from _pytest.stash import StashKey
if TYPE_CHECKING:
from typing import Deque
from _pytest.scope import _ScopeName
from _pytest.main import Session
from _pytest.python import CallSpec2
from _pytest.python import Metafunc
# The value of the fixture -- return/yield of the fixture function (type variable).
FixtureValue = TypeVar("FixtureValue")
# The type of the fixture function (type variable).
FixtureFunction = TypeVar("FixtureFunction", bound=Callable[..., object])
# The type of a fixture function (type alias generic in fixture value).
_FixtureFunc = Union[
Callable[..., FixtureValue], Callable[..., Generator[FixtureValue, None, None]]
]
# The type of FixtureDef.cached_result (type alias generic in fixture value).
_FixtureCachedResult = Union[
Tuple[
# The result.
FixtureValue,
# Cache key.
object,
None,
],
Tuple[
None,
# Cache key.
object,
# Exc info if raised.
Tuple[Type[BaseException], BaseException, TracebackType],
],
]
@dataclasses.dataclass(frozen=True)
class PseudoFixtureDef(Generic[FixtureValue]):
cached_result: "_FixtureCachedResult[FixtureValue]"
_scope: Scope
def pytest_sessionstart(session: "Session") -> None:
session._fixturemanager = FixtureManager(session)
def get_scope_package(
node: nodes.Item,
fixturedef: "FixtureDef[object]",
) -> Optional[Union[nodes.Item, nodes.Collector]]:
from _pytest.python import Package
current: Optional[Union[nodes.Item, nodes.Collector]] = node
fixture_package_name = "{}/{}".format(fixturedef.baseid, "__init__.py")
while current and (
not isinstance(current, Package) or fixture_package_name != current.nodeid
):
current = current.parent # type: ignore[assignment]
if current is None:
return node.session
return current
def get_scope_node(
node: nodes.Node, scope: Scope
) -> Optional[Union[nodes.Item, nodes.Collector]]:
import _pytest.python
if scope is Scope.Function:
return node.getparent(nodes.Item)
elif scope is Scope.Class:
return node.getparent(_pytest.python.Class)
elif scope is Scope.Module:
return node.getparent(_pytest.python.Module)
elif scope is Scope.Package:
return node.getparent(_pytest.python.Package)
elif scope is Scope.Session:
return node.getparent(_pytest.main.Session)
else:
assert_never(scope)
# Used for storing artificial fixturedefs for direct parametrization.
name2pseudofixturedef_key = StashKey[Dict[str, "FixtureDef[Any]"]]()
def add_funcarg_pseudo_fixture_def(
collector: nodes.Collector, metafunc: "Metafunc", fixturemanager: "FixtureManager"
) -> None:
# This function will transform all collected calls to functions
# if they use direct funcargs (i.e. direct parametrization)
# because we want later test execution to be able to rely on
# an existing FixtureDef structure for all arguments.
# XXX we can probably avoid this algorithm if we modify CallSpec2
# to directly care for creating the fixturedefs within its methods.
if not metafunc._calls[0].funcargs:
# This function call does not have direct parametrization.
return
# Collect funcargs of all callspecs into a list of values.
arg2params: Dict[str, List[object]] = {}
arg2scope: Dict[str, Scope] = {}
for callspec in metafunc._calls:
for argname, argvalue in callspec.funcargs.items():
assert argname not in callspec.params
callspec.params[argname] = argvalue
arg2params_list = arg2params.setdefault(argname, [])
callspec.indices[argname] = len(arg2params_list)
arg2params_list.append(argvalue)
if argname not in arg2scope:
scope = callspec._arg2scope.get(argname, Scope.Function)
arg2scope[argname] = scope
callspec.funcargs.clear()
# Register artificial FixtureDef's so that later at test execution
# time we can rely on a proper FixtureDef to exist for fixture setup.
arg2fixturedefs = metafunc._arg2fixturedefs
for argname, valuelist in arg2params.items():
# If we have a scope that is higher than function, we need
# to make sure we only ever create an according fixturedef on
# a per-scope basis. We thus store and cache the fixturedef on the
# node related to the scope.
scope = arg2scope[argname]
node = None
if scope is not Scope.Function:
node = get_scope_node(collector, scope)
if node is None:
assert scope is Scope.Class and isinstance(
collector, _pytest.python.Module
)
# Use module-level collector for class-scope (for now).
node = collector
if node is None:
name2pseudofixturedef = None
else:
default: Dict[str, FixtureDef[Any]] = {}
name2pseudofixturedef = node.stash.setdefault(
name2pseudofixturedef_key, default
)
if name2pseudofixturedef is not None and argname in name2pseudofixturedef:
arg2fixturedefs[argname] = [name2pseudofixturedef[argname]]
else:
fixturedef = FixtureDef(
fixturemanager=fixturemanager,
baseid="",
argname=argname,
func=get_direct_param_fixture_func,
scope=arg2scope[argname],
params=valuelist,
unittest=False,
ids=None,
)
arg2fixturedefs[argname] = [fixturedef]
if name2pseudofixturedef is not None:
name2pseudofixturedef[argname] = fixturedef
def getfixturemarker(obj: object) -> Optional["FixtureFunctionMarker"]:
"""Return fixturemarker or None if it doesn't exist or raised
exceptions."""
return cast(
Optional[FixtureFunctionMarker],
safe_getattr(obj, "_pytestfixturefunction", None),
)
# Parametrized fixture key, helper alias for code below.
_Key = Tuple[object, ...]
def get_parametrized_fixture_keys(item: nodes.Item, scope: Scope) -> Iterator[_Key]:
"""Return list of keys for all parametrized arguments which match
the specified scope."""
assert scope is not Scope.Function
try:
callspec = item.callspec # type: ignore[attr-defined]
except AttributeError:
pass
else:
cs: CallSpec2 = callspec
# cs.indices.items() is random order of argnames. Need to
# sort this so that different calls to
# get_parametrized_fixture_keys will be deterministic.
for argname, param_index in sorted(cs.indices.items()):
if cs._arg2scope[argname] != scope:
continue
if scope is Scope.Session:
key: _Key = (argname, param_index)
elif scope is Scope.Package:
key = (argname, param_index, item.path.parent)
elif scope is Scope.Module:
key = (argname, param_index, item.path)
elif scope is Scope.Class:
item_cls = item.cls # type: ignore[attr-defined]
key = (argname, param_index, item.path, item_cls)
else:
assert_never(scope)
yield key
# Algorithm for sorting on a per-parametrized resource setup basis.
# It is called for Session scope first and performs sorting
# down to the lower scopes such as to minimize number of "high scope"
# setups and teardowns.
def reorder_items(items: Sequence[nodes.Item]) -> List[nodes.Item]:
argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[_Key, None]]] = {}
items_by_argkey: Dict[Scope, Dict[_Key, Deque[nodes.Item]]] = {}
for scope in HIGH_SCOPES:
d: Dict[nodes.Item, Dict[_Key, None]] = {}
argkeys_cache[scope] = d
item_d: Dict[_Key, Deque[nodes.Item]] = defaultdict(deque)
items_by_argkey[scope] = item_d
for item in items:
keys = dict.fromkeys(get_parametrized_fixture_keys(item, scope), None)
if keys:
d[item] = keys
for key in keys:
item_d[key].append(item)
items_dict = dict.fromkeys(items, None)
return list(
reorder_items_atscope(items_dict, argkeys_cache, items_by_argkey, Scope.Session)
)
def fix_cache_order(
item: nodes.Item,
argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[_Key, None]]],
items_by_argkey: Dict[Scope, Dict[_Key, "Deque[nodes.Item]"]],
) -> None:
for scope in HIGH_SCOPES:
for key in argkeys_cache[scope].get(item, []):
items_by_argkey[scope][key].appendleft(item)
def reorder_items_atscope(
items: Dict[nodes.Item, None],
argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[_Key, None]]],
items_by_argkey: Dict[Scope, Dict[_Key, "Deque[nodes.Item]"]],
scope: Scope,
) -> Dict[nodes.Item, None]:
if scope is Scope.Function or len(items) < 3:
return items
ignore: Set[Optional[_Key]] = set()
items_deque = deque(items)
items_done: Dict[nodes.Item, None] = {}
scoped_items_by_argkey = items_by_argkey[scope]
scoped_argkeys_cache = argkeys_cache[scope]
while items_deque:
no_argkey_group: Dict[nodes.Item, None] = {}
slicing_argkey = None
while items_deque:
item = items_deque.popleft()
if item in items_done or item in no_argkey_group:
continue
argkeys = dict.fromkeys(
(k for k in scoped_argkeys_cache.get(item, []) if k not in ignore), None
)
if not argkeys:
no_argkey_group[item] = None
else:
slicing_argkey, _ = argkeys.popitem()
# We don't have to remove relevant items from later in the
# deque because they'll just be ignored.
matching_items = [
i for i in scoped_items_by_argkey[slicing_argkey] if i in items
]
for i in reversed(matching_items):
fix_cache_order(i, argkeys_cache, items_by_argkey)
items_deque.appendleft(i)
break
if no_argkey_group:
no_argkey_group = reorder_items_atscope(
no_argkey_group, argkeys_cache, items_by_argkey, scope.next_lower()
)
for item in no_argkey_group:
items_done[item] = None
ignore.add(slicing_argkey)
return items_done
def get_direct_param_fixture_func(request: "FixtureRequest") -> Any:
return request.param
@dataclasses.dataclass
class FuncFixtureInfo:
__slots__ = ("argnames", "initialnames", "names_closure", "name2fixturedefs")
# Original function argument names.
argnames: Tuple[str, ...]
# Argnames that function immediately requires. These include argnames +
# fixture names specified via usefixtures and via autouse=True in fixture
# definitions.
initialnames: Tuple[str, ...]
names_closure: List[str]
name2fixturedefs: Dict[str, Sequence["FixtureDef[Any]"]]
def prune_dependency_tree(self) -> None:
"""Recompute names_closure from initialnames and name2fixturedefs.
Can only reduce names_closure, which means that the new closure will
always be a subset of the old one. The order is preserved.
This method is needed because direct parametrization may shadow some
of the fixtures that were included in the originally built dependency
tree. In this way the dependency tree can get pruned, and the closure
of argnames may get reduced.
"""
closure: Set[str] = set()
working_set = set(self.initialnames)
while working_set:
argname = working_set.pop()
# Argname may be smth not included in the original names_closure,
# in which case we ignore it. This currently happens with pseudo
# FixtureDefs which wrap 'get_direct_param_fixture_func(request)'.
# So they introduce the new dependency 'request' which might have
# been missing in the original tree (closure).
if argname not in closure and argname in self.names_closure:
closure.add(argname)
if argname in self.name2fixturedefs:
working_set.update(self.name2fixturedefs[argname][-1].argnames)
self.names_closure[:] = sorted(closure, key=self.names_closure.index)
class FixtureRequest:
"""A request for a fixture from a test or fixture function.
A request object gives access to the requesting test context and has
an optional ``param`` attribute in case the fixture is parametrized
indirectly.
"""
def __init__(self, pyfuncitem, *, _ispytest: bool = False) -> None:
check_ispytest(_ispytest)
self._pyfuncitem = pyfuncitem
#: Fixture for which this request is being performed.
self.fixturename: Optional[str] = None
self._scope = Scope.Function
self._fixture_defs: Dict[str, FixtureDef[Any]] = {}
fixtureinfo: FuncFixtureInfo = pyfuncitem._fixtureinfo
self._arg2fixturedefs = fixtureinfo.name2fixturedefs.copy()
self._arg2index: Dict[str, int] = {}
self._fixturemanager: FixtureManager = pyfuncitem.session._fixturemanager
# Notes on the type of `param`:
# -`request.param` is only defined in parametrized fixtures, and will raise
# AttributeError otherwise. Python typing has no notion of "undefined", so
# this cannot be reflected in the type.
# - Technically `param` is only (possibly) defined on SubRequest, not
# FixtureRequest, but the typing of that is still in flux so this cheats.
# - In the future we might consider using a generic for the param type, but
# for now just using Any.
self.param: Any
@property
def scope(self) -> "_ScopeName":
"""Scope string, one of "function", "class", "module", "package", "session"."""
return self._scope.value
@property
def fixturenames(self) -> List[str]:
"""Names of all active fixtures in this request."""
result = list(self._pyfuncitem._fixtureinfo.names_closure)
result.extend(set(self._fixture_defs).difference(result))
return result
@property
def node(self):
"""Underlying collection node (depends on current request scope)."""
scope = self._scope
if scope is Scope.Function:
# This might also be a non-function Item despite its attribute name.
node: Optional[Union[nodes.Item, nodes.Collector]] = self._pyfuncitem
elif scope is Scope.Package:
# FIXME: _fixturedef is not defined on FixtureRequest (this class),
# but on FixtureRequest (a subclass).
node = get_scope_package(self._pyfuncitem, self._fixturedef) # type: ignore[attr-defined]
else:
node = get_scope_node(self._pyfuncitem, scope)
if node is None and scope is Scope.Class:
# Fallback to function item itself.
node = self._pyfuncitem
assert node, 'Could not obtain a node for scope "{}" for function {!r}'.format(
scope, self._pyfuncitem
)
return node
def _getnextfixturedef(self, argname: str) -> "FixtureDef[Any]":
fixturedefs = self._arg2fixturedefs.get(argname, None)
if fixturedefs is None:
# We arrive here because of a dynamic call to
# getfixturevalue(argname) usage which was naturally
# not known at parsing/collection time.
assert self._pyfuncitem.parent is not None
parentid = self._pyfuncitem.parent.nodeid
fixturedefs = self._fixturemanager.getfixturedefs(argname, parentid)
# TODO: Fix this type ignore. Either add assert or adjust types.
# Can this be None here?
self._arg2fixturedefs[argname] = fixturedefs # type: ignore[assignment]
# fixturedefs list is immutable so we maintain a decreasing index.
index = self._arg2index.get(argname, 0) - 1
if fixturedefs is None or (-index > len(fixturedefs)):
raise FixtureLookupError(argname, self)
self._arg2index[argname] = index
return fixturedefs[index]
@property
def config(self) -> Config:
"""The pytest config object associated with this request."""
return self._pyfuncitem.config # type: ignore[no-any-return]
@property
def function(self):
"""Test function object if the request has a per-function scope."""
if self.scope != "function":
raise AttributeError(
f"function not available in {self.scope}-scoped context"
)
return self._pyfuncitem.obj
@property
def cls(self):
"""Class (can be None) where the test function was collected."""
if self.scope not in ("class", "function"):
raise AttributeError(f"cls not available in {self.scope}-scoped context")
clscol = self._pyfuncitem.getparent(_pytest.python.Class)
if clscol:
return clscol.obj
@property
def instance(self):
"""Instance (can be None) on which test function was collected."""
# unittest support hack, see _pytest.unittest.TestCaseFunction.
try:
return self._pyfuncitem._testcase
except AttributeError:
function = getattr(self, "function", None)
return getattr(function, "__self__", None)
@property
def module(self):
"""Python module object where the test function was collected."""
if self.scope not in ("function", "class", "module"):
raise AttributeError(f"module not available in {self.scope}-scoped context")
return self._pyfuncitem.getparent(_pytest.python.Module).obj
@property
def path(self) -> Path:
"""Path where the test function was collected."""
if self.scope not in ("function", "class", "module", "package"):
raise AttributeError(f"path not available in {self.scope}-scoped context")
# TODO: Remove ignore once _pyfuncitem is properly typed.
return self._pyfuncitem.path # type: ignore
@property
def keywords(self) -> MutableMapping[str, Any]:
"""Keywords/markers dictionary for the underlying node."""
node: nodes.Node = self.node
return node.keywords
@property
def session(self) -> "Session":
"""Pytest session object."""
return self._pyfuncitem.session # type: ignore[no-any-return]
def addfinalizer(self, finalizer: Callable[[], object]) -> None:
"""Add finalizer/teardown function to be called without arguments after
the last test within the requesting test context finished execution."""
# XXX usually this method is shadowed by fixturedef specific ones.
self.node.addfinalizer(finalizer)
def applymarker(self, marker: Union[str, MarkDecorator]) -> None:
"""Apply a marker to a single test function invocation.
This method is useful if you don't want to have a keyword/marker
on all function invocations.
:param marker:
An object created by a call to ``pytest.mark.NAME(...)``.
"""
self.node.add_marker(marker)
def raiseerror(self, msg: Optional[str]) -> NoReturn:
"""Raise a FixtureLookupError exception.
:param msg:
An optional custom error message.
"""
raise self._fixturemanager.FixtureLookupError(None, self, msg)
def _fillfixtures(self) -> None:
item = self._pyfuncitem
fixturenames = getattr(item, "fixturenames", self.fixturenames)
for argname in fixturenames:
if argname not in item.funcargs:
item.funcargs[argname] = self.getfixturevalue(argname)
def getfixturevalue(self, argname: str) -> Any:
"""Dynamically run a named fixture function.
Declaring fixtures via function argument is recommended where possible.
But if you can only decide whether to use another fixture at test
setup time, you may use this function to retrieve it inside a fixture
or test function body.
This method can be used during the test setup phase or the test run
phase, but during the test teardown phase a fixture's value may not
be available.
:param argname:
The fixture name.
:raises pytest.FixtureLookupError:
If the given fixture could not be found.
"""
fixturedef = self._get_active_fixturedef(argname)
assert fixturedef.cached_result is not None, (
f'The fixture value for "{argname}" is not available. '
"This can happen when the fixture has already been torn down."
)
return fixturedef.cached_result[0]
def _get_active_fixturedef(
self, argname: str
) -> Union["FixtureDef[object]", PseudoFixtureDef[object]]:
try:
return self._fixture_defs[argname]
except KeyError:
try:
fixturedef = self._getnextfixturedef(argname)
except FixtureLookupError:
if argname == "request":
cached_result = (self, [0], None)
return PseudoFixtureDef(cached_result, Scope.Function)
raise
# Remove indent to prevent the python3 exception
# from leaking into the call.
self._compute_fixture_value(fixturedef)
self._fixture_defs[argname] = fixturedef
return fixturedef
def _get_fixturestack(self) -> List["FixtureDef[Any]"]:
current = self
values: List[FixtureDef[Any]] = []
while isinstance(current, SubRequest):
values.append(current._fixturedef) # type: ignore[has-type]
current = current._parent_request
values.reverse()
return values
def _compute_fixture_value(self, fixturedef: "FixtureDef[object]") -> None:
"""Create a SubRequest based on "self" and call the execute method
of the given FixtureDef object.
This will force the FixtureDef object to throw away any previous
results and compute a new fixture value, which will be stored into
the FixtureDef object itself.
"""
# prepare a subrequest object before calling fixture function
# (latter managed by fixturedef)
argname = fixturedef.argname
funcitem = self._pyfuncitem
scope = fixturedef._scope
try:
callspec = funcitem.callspec
except AttributeError:
callspec = None
if callspec is not None and argname in callspec.params:
param = callspec.params[argname]
param_index = callspec.indices[argname]
# If a parametrize invocation set a scope it will override
# the static scope defined with the fixture function.
with suppress(KeyError):
scope = callspec._arg2scope[argname]
else:
param = NOTSET
param_index = 0
has_params = fixturedef.params is not None
fixtures_not_supported = getattr(funcitem, "nofuncargs", False)
if has_params and fixtures_not_supported:
msg = (
"{name} does not support fixtures, maybe unittest.TestCase subclass?\n"
"Node id: {nodeid}\n"
"Function type: {typename}"
).format(
name=funcitem.name,
nodeid=funcitem.nodeid,
typename=type(funcitem).__name__,
)
fail(msg, pytrace=False)
if has_params:
frame = inspect.stack()[3]
frameinfo = inspect.getframeinfo(frame[0])
source_path = absolutepath(frameinfo.filename)
source_lineno = frameinfo.lineno
try:
source_path_str = str(
source_path.relative_to(funcitem.config.rootpath)
)
except ValueError:
source_path_str = str(source_path)
msg = (
"The requested fixture has no parameter defined for test:\n"
" {}\n\n"
"Requested fixture '{}' defined in:\n{}"
"\n\nRequested here:\n{}:{}".format(
funcitem.nodeid,
fixturedef.argname,
getlocation(fixturedef.func, funcitem.config.rootpath),
source_path_str,
source_lineno,
)
)
fail(msg, pytrace=False)
subrequest = SubRequest(
self, scope, param, param_index, fixturedef, _ispytest=True
)
# Check if a higher-level scoped fixture accesses a lower level one.
subrequest._check_scope(argname, self._scope, scope)
try:
# Call the fixture function.
fixturedef.execute(request=subrequest)
finally:
self._schedule_finalizers(fixturedef, subrequest)
def _schedule_finalizers(
self, fixturedef: "FixtureDef[object]", subrequest: "SubRequest"
) -> None:
# If fixture function failed it might have registered finalizers.
subrequest.node.addfinalizer(lambda: fixturedef.finish(request=subrequest))
def _check_scope(
self,
argname: str,
invoking_scope: Scope,
requested_scope: Scope,
) -> None:
if argname == "request":
return
if invoking_scope > requested_scope:
# Try to report something helpful.
text = "\n".join(self._factorytraceback())
fail(
f"ScopeMismatch: You tried to access the {requested_scope.value} scoped "
f"fixture {argname} with a {invoking_scope.value} scoped request object, "
f"involved factories:\n{text}",
pytrace=False,
)
def _factorytraceback(self) -> List[str]:
lines = []
for fixturedef in self._get_fixturestack():
factory = fixturedef.func
fs, lineno = getfslineno(factory)
if isinstance(fs, Path):
session: Session = self._pyfuncitem.session
p = bestrelpath(session.path, fs)
else:
p = fs
args = _format_args(factory)
lines.append("%s:%d: def %s%s" % (p, lineno + 1, factory.__name__, args))
return lines
def __repr__(self) -> str:
return "<FixtureRequest for %r>" % (self.node)
@final
class SubRequest(FixtureRequest):
"""A sub request for handling getting a fixture from a test function/fixture."""
def __init__(
self,
request: "FixtureRequest",
scope: Scope,
param: Any,
param_index: int,
fixturedef: "FixtureDef[object]",
*,
_ispytest: bool = False,
) -> None:
check_ispytest(_ispytest)
self._parent_request = request
self.fixturename = fixturedef.argname
if param is not NOTSET:
self.param = param
self.param_index = param_index
self._scope = scope
self._fixturedef = fixturedef
self._pyfuncitem = request._pyfuncitem
self._fixture_defs = request._fixture_defs
self._arg2fixturedefs = request._arg2fixturedefs
self._arg2index = request._arg2index
self._fixturemanager = request._fixturemanager
def __repr__(self) -> str:
return f"<SubRequest {self.fixturename!r} for {self._pyfuncitem!r}>"
def addfinalizer(self, finalizer: Callable[[], object]) -> None:
"""Add finalizer/teardown function to be called without arguments after
the last test within the requesting test context finished execution."""
self._fixturedef.addfinalizer(finalizer)
def _schedule_finalizers(
self, fixturedef: "FixtureDef[object]", subrequest: "SubRequest"
) -> None:
# If the executing fixturedef was not explicitly requested in the argument list (via
# getfixturevalue inside the fixture call) then ensure this fixture def will be finished
# first.
if fixturedef.argname not in self.fixturenames:
fixturedef.addfinalizer(
functools.partial(self._fixturedef.finish, request=self)
)
super()._schedule_finalizers(fixturedef, subrequest)
@final
class FixtureLookupError(LookupError):
"""Could not return a requested fixture (missing or invalid)."""
def __init__(
self, argname: Optional[str], request: FixtureRequest, msg: Optional[str] = None
) -> None:
self.argname = argname
self.request = request
self.fixturestack = request._get_fixturestack()
self.msg = msg
def formatrepr(self) -> "FixtureLookupErrorRepr":
tblines: List[str] = []
addline = tblines.append
stack = [self.request._pyfuncitem.obj]
stack.extend(map(lambda x: x.func, self.fixturestack))
msg = self.msg
if msg is not None:
# The last fixture raise an error, let's present
# it at the requesting side.
stack = stack[:-1]
for function in stack:
fspath, lineno = getfslineno(function)
try:
lines, _ = inspect.getsourcelines(get_real_func(function))
except (OSError, IndexError, TypeError):
error_msg = "file %s, line %s: source code not available"
addline(error_msg % (fspath, lineno + 1))
else:
addline(f"file {fspath}, line {lineno + 1}")
for i, line in enumerate(lines):
line = line.rstrip()
addline(" " + line)
if line.lstrip().startswith("def"):
break
if msg is None:
fm = self.request._fixturemanager
available = set()
parentid = self.request._pyfuncitem.parent.nodeid
for name, fixturedefs in fm._arg2fixturedefs.items():
faclist = list(fm._matchfactories(fixturedefs, parentid))
if faclist:
available.add(name)
if self.argname in available:
msg = " recursive dependency involving fixture '{}' detected".format(
self.argname
)
else:
msg = f"fixture '{self.argname}' not found"
msg += "\n available fixtures: {}".format(", ".join(sorted(available)))
msg += "\n use 'pytest --fixtures [testpath]' for help on them."
return FixtureLookupErrorRepr(fspath, lineno, tblines, msg, self.argname)
class FixtureLookupErrorRepr(TerminalRepr):
def __init__(
self,
filename: Union[str, "os.PathLike[str]"],
firstlineno: int,
tblines: Sequence[str],
errorstring: str,
argname: Optional[str],
) -> None:
self.tblines = tblines
self.errorstring = errorstring
self.filename = filename
self.firstlineno = firstlineno
self.argname = argname
def toterminal(self, tw: TerminalWriter) -> None:
# tw.line("FixtureLookupError: %s" %(self.argname), red=True)
for tbline in self.tblines:
tw.line(tbline.rstrip())
lines = self.errorstring.split("\n")
if lines:
tw.line(
f"{FormattedExcinfo.fail_marker} {lines[0].strip()}",
red=True,
)
for line in lines[1:]:
tw.line(
f"{FormattedExcinfo.flow_marker} {line.strip()}",
red=True,
)
tw.line()
tw.line("%s:%d" % (os.fspath(self.filename), self.firstlineno + 1))
def fail_fixturefunc(fixturefunc, msg: str) -> NoReturn:
fs, lineno = getfslineno(fixturefunc)
location = f"{fs}:{lineno + 1}"
source = _pytest._code.Source(fixturefunc)
fail(msg + ":\n\n" + str(source.indent()) + "\n" + location, pytrace=False)
def call_fixture_func(
fixturefunc: "_FixtureFunc[FixtureValue]", request: FixtureRequest, kwargs
) -> FixtureValue:
if is_generator(fixturefunc):
fixturefunc = cast(
Callable[..., Generator[FixtureValue, None, None]], fixturefunc
)
generator = fixturefunc(**kwargs)
try:
fixture_result = next(generator)
except StopIteration:
raise ValueError(f"{request.fixturename} did not yield a value") from None
finalizer = functools.partial(_teardown_yield_fixture, fixturefunc, generator)
request.addfinalizer(finalizer)
else:
fixturefunc = cast(Callable[..., FixtureValue], fixturefunc)
fixture_result = fixturefunc(**kwargs)
return fixture_result
def _teardown_yield_fixture(fixturefunc, it) -> None:
"""Execute the teardown of a fixture function by advancing the iterator
after the yield and ensure the iteration ends (if not it means there is
more than one yield in the function)."""
try:
next(it)
except StopIteration:
pass
else:
fail_fixturefunc(fixturefunc, "fixture function has more than one 'yield'")
def _eval_scope_callable(
scope_callable: "Callable[[str, Config], _ScopeName]",
fixture_name: str,
config: Config,
) -> "_ScopeName":
try:
# Type ignored because there is no typing mechanism to specify
# keyword arguments, currently.
result = scope_callable(fixture_name=fixture_name, config=config) # type: ignore[call-arg]
except Exception as e:
raise TypeError(
"Error evaluating {} while defining fixture '{}'.\n"
"Expected a function with the signature (*, fixture_name, config)".format(
scope_callable, fixture_name
)
) from e
if not isinstance(result, str):
fail(
"Expected {} to return a 'str' while defining fixture '{}', but it returned:\n"
"{!r}".format(scope_callable, fixture_name, result),
pytrace=False,
)
return result
@final
class FixtureDef(Generic[FixtureValue]):
"""A container for a fixture definition."""
def __init__(
self,
fixturemanager: "FixtureManager",
baseid: Optional[str],
argname: str,
func: "_FixtureFunc[FixtureValue]",
scope: Union[Scope, "_ScopeName", Callable[[str, Config], "_ScopeName"], None],
params: Optional[Sequence[object]],
unittest: bool = False,
ids: Optional[
Union[Tuple[Optional[object], ...], Callable[[Any], Optional[object]]]
] = None,
) -> None:
self._fixturemanager = fixturemanager
# The "base" node ID for the fixture.
#
# This is a node ID prefix. A fixture is only available to a node (e.g.
# a `Function` item) if the fixture's baseid is a parent of the node's
# nodeid (see the `iterparentnodeids` function for what constitutes a
# "parent" and a "prefix" in this context).
#
# For a fixture found in a Collector's object (e.g. a `Module`s module,
# a `Class`'s class), the baseid is the Collector's nodeid.
#
# For a fixture found in a conftest plugin, the baseid is the conftest's
# directory path relative to the rootdir.
#
# For other plugins, the baseid is the empty string (always matches).
self.baseid = baseid or ""
# Whether the fixture was found from a node or a conftest in the
# collection tree. Will be false for fixtures defined in non-conftest
# plugins.
self.has_location = baseid is not None
# The fixture factory function.
self.func = func
# The name by which the fixture may be requested.
self.argname = argname
if scope is None:
scope = Scope.Function
elif callable(scope):
scope = _eval_scope_callable(scope, argname, fixturemanager.config)
if isinstance(scope, str):
scope = Scope.from_user(
scope, descr=f"Fixture '{func.__name__}'", where=baseid
)
self._scope = scope
# If the fixture is directly parametrized, the parameter values.
self.params: Optional[Sequence[object]] = params
# If the fixture is directly parametrized, a tuple of explicit IDs to
# assign to the parameter values, or a callable to generate an ID given
# a parameter value.
self.ids = ids
# The names requested by the fixtures.
self.argnames = getfuncargnames(func, name=argname, is_method=unittest)