-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
conftest.py
192 lines (151 loc) · 5.07 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
from __future__ import unicode_literals
import os
import mockssh
import pytest
from git import Repo
from git.exc import GitCommandNotFound
from .basic_env import TestDirFixture
from .basic_env import TestDvcGitFixture
from .basic_env import TestGitFixture
from dvc.remote.config import RemoteConfig
from dvc.remote.ssh.connection import SSHConnection
from dvc.repo import Repo as DvcRepo
from dvc.utils.compat import cast_bytes_py2
# Test-wide environment switches, applied at import time:
#   DVC_TEST          - prevent updater and analytics from running
#                       their processes
#   DVC_IGNORE_ISATTY - ensure progress output even when not outputting
#                       to raw sys.stderr console
os.environ.update(
    {
        cast_bytes_py2("DVC_TEST"): cast_bytes_py2("true"),
        cast_bytes_py2("DVC_IGNORE_ISATTY"): cast_bytes_py2("true"),
    }
)
@pytest.fixture(autouse=True)
def reset_loglevel(request, caplog):
    """
    Pin the "dvc" logger level for the duration of each test.

    When pytest was invoked with ``--log-level``, that level (upper-cased)
    is applied to the "dvc" logger through ``caplog.at_level`` while the
    test runs, regardless of dvc.logger.setup(), Repo configs or whatever.
    Without the option the fixture does nothing.
    """
    requested = request.config.getoption("--log-level")
    if not requested:
        # No explicit level requested: leave logging untouched.
        yield
        return

    with caplog.at_level(requested.upper(), logger="dvc"):
        yield
# Expose the class-based TestDirFixture as a regular pytest fixture so the
# set-up/tear-down code does not have to be duplicated.
@pytest.fixture
def repo_dir():
    """Yield a set-up ``TestDirFixture``; it is torn down after the test."""
    fixture = TestDirFixture()
    fixture.setUp()
    try:
        yield fixture
    finally:
        fixture.tearDown()
# NOTE: this duplicates code from GitFixture,
# would fix itself once class-based fixtures are removed
@pytest.fixture
def git(repo_dir):
    """
    Yield a freshly initialised git repo with ``repo_dir.CODE`` committed.

    The repository is created with ``Repo.init()`` and closed after the
    test.
    """
    # NOTE: works around EAGAIN on BSD systems (osx in our case).
    # Without retrying, running tests might fail with:
    #
    #    GitCommandNotFound: Cmd('git') not found due to:
    #        OSError('[Errno 35] Resource temporarily unavailable')
    attempts_left = 5
    while True:
        try:
            repo = Repo.init()
        except GitCommandNotFound:
            attempts_left -= 1
            if attempts_left == 0:
                # Out of retries: propagate the last failure.
                raise
        else:
            break

    try:
        repo.index.add([repo_dir.CODE])
        repo.index.commit("add code")
        yield repo
    finally:
        repo.close()
@pytest.fixture
def dvc_repo(repo_dir):
    """Yield a DVC repo initialised (with ``no_scm=True``) in ``repo_dir``."""
    initialised = DvcRepo.init(repo_dir._root_dir, no_scm=True)
    yield initialised
# Absolute path of the directory containing this file.
here = os.path.dirname(os.path.abspath(__file__))
# User name used for the mock SSH server credentials.
user = "user"
# Key file for `user`, looked up next to this file.
key_path = os.path.join(here, "{0}.key".format(user))
@pytest.fixture
def ssh_server():
    """
    Yield a running ``mockssh.Server`` that accepts ``user`` with ``key_path``.

    The connection parameters for that server are attached to it as
    ``test_creds`` (host, port, username, key_filename).
    """
    with mockssh.Server({user: key_path}) as server:
        server.test_creds = {
            "host": server.host,
            "port": server.port,
            "username": user,
            "key_filename": key_path,
        }
        yield server
@pytest.fixture
def ssh(ssh_server):
    """Yield an ``SSHConnection`` built from ``ssh_server.test_creds``."""
    creds = ssh_server.test_creds
    yield SSHConnection(**creds)
@pytest.fixture
def temporary_windows_drive(repo_dir):
    """
    Yield the root path of a temporary DOS drive mapped onto a temp dir.

    The first uppercase letter not reported by
    ``win32api.GetLogicalDriveStrings()`` is mapped with
    ``DefineDosDeviceW`` onto a directory from ``repo_dir.mkdtemp()``;
    the mapping is removed again after the test.

    Raises:
        RuntimeError: if defining or removing the drive mapping fails.
    """
    import string
    import win32api
    from ctypes import windll
    from win32con import DDD_REMOVE_DEFINITION

    define_dos_device = windll.kernel32.DefineDosDeviceW

    # Drive letters that are already in use on this machine.
    used_letters = [
        entry[0].upper()
        for entry in win32api.GetLogicalDriveStrings().split("\000")
        if entry
    ]
    free_letters = [
        candidate
        for candidate in string.ascii_uppercase
        if candidate not in used_letters
    ]
    new_drive = "{}:".format(free_letters[0])
    target_path = repo_dir.mkdtemp()

    if define_dos_device(0, new_drive, target_path) == 0:
        raise RuntimeError("Failed to mount windows drive!")

    # NOTE: new_drive has form of `A:` and joining it with some relative
    # path might result in non-existing path (A:path\\to)
    yield os.path.join(new_drive, os.sep)

    removed = define_dos_device(DDD_REMOVE_DEFINITION, new_drive, target_path)
    if removed == 0:
        raise RuntimeError("Could not unmount windows drive!")
@pytest.fixture
def erepo(repo_dir):
    """
    Yield a prepared ``TestDvcGitFixture`` repository.

    The repo gets, in order:
      * FOO, BAR and DATA_DIR added to dvc and their stage files committed;
      * a default remote "upstream" pointing at the local cache dir,
        with the config file committed;
      * a dvc-tracked "version" file committed on master with content
        "master" and on a new branch "branch" with content "branch".

    Before yielding, master is checked out again, the scm and git handles
    are closed and the working directory is switched to
    ``repo._saved_dir``. The fixture is torn down afterwards, even if the
    preparation itself fails.
    """
    repo = TestDvcGitFixture()
    repo.setUp()

    def _commit_version(label):
        # (Re)create the "version" file, track it with dvc and commit it.
        repo.create("version", label)
        repo.dvc.add("version")
        repo.dvc.scm.add([".gitignore", "version.dvc"])
        repo.dvc.scm.commit(label)

    try:
        stages = [
            repo.dvc.add(target)[0]
            for target in (repo.FOO, repo.BAR, repo.DATA_DIR)
        ]
        repo.dvc.scm.add([stage.path for stage in stages])
        repo.dvc.scm.commit("init repo")

        remotes = RemoteConfig(repo.dvc.config)
        remotes.add("upstream", repo.dvc.cache.local.cache_dir, default=True)
        repo.dvc.scm.add([repo.dvc.config.config_file])
        repo.dvc.scm.commit("add remote")

        _commit_version("master")

        repo.dvc.scm.checkout("branch", create_new=True)
        # Drop the master copy so it can be recreated with branch content.
        os.unlink(os.path.join(repo.root_dir, "version"))
        _commit_version("branch")

        repo.dvc.scm.checkout("master")

        repo.dvc.scm.close()
        repo.git.close()

        os.chdir(repo._saved_dir)
        yield repo
    finally:
        repo.tearDown()
@pytest.fixture(scope="session", autouse=True)
def _close_pools():
    """Call ``dvc.remote.pool.close_pools`` once the test session is over."""
    from dvc.remote.pool import close_pools as shutdown_pools

    yield
    shutdown_pools()
@pytest.fixture
def git_erepo():
    """
    Yield a set-up ``TestGitFixture``; it is torn down after the test.

    Teardown is wrapped in ``try``/``finally`` like the other class-based
    fixture wrappers in this file (``repo_dir``, ``erepo``), so
    ``tearDown`` also runs if the generator is closed at the yield point.
    """
    repo = TestGitFixture()
    repo.setUp()
    try:
        yield repo
    finally:
        repo.tearDown()