import os
import importlib.util
import platform
import shutil
import numpy as np
import pytest
import h5py
import libertem.api as lt
from libertem.executor.inline import InlineJobExecutor
from libertem.io.dataset.hdf5 import H5DataSet
from libertem.io.dataset.raw import RawFileDataSet
from libertem.io.dataset.memory import MemoryDataSet
from libertem.executor.dask import DaskJobExecutor


# A bit of gymnastics to import the test utilities, since this conftest.py
# is shared between the doctests and the unit tests and lives outside the package:
basedir = os.path.dirname(__file__)
location = os.path.join(basedir, "tests/utils.py")
spec = importlib.util.spec_from_file_location("utils", location)
utils = importlib.util.module_from_spec(spec)
spec.loader.exec_module(utils)


def get_or_create_hdf5(tmpdir_factory, filename, *args, **kwargs):
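    """
    Open ``filename`` read-only if it already exists; otherwise create it with a
    single "data" dataset built from ``*args``/``**kwargs`` and re-open it
    read-only. Shared helper for the session-scoped HDF5 fixtures below.
    """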
    datadir = tmpdir_factory.mktemp('data')
    filename = os.path.join(datadir, filename)
    try:
        with h5py.File(filename, 'r') as f:
            yield f
    except OSError:
        with h5py.File(filename, "w") as f:
            f.create_dataset("data", *args, **kwargs)
        with h5py.File(filename, 'r') as f:
            yield f


@pytest.fixture(scope='session')
def hdf5(tmpdir_factory):
    yield from get_or_create_hdf5(tmpdir_factory, "hdf5-test.h5", data=np.ones((5, 5, 16, 16)))


@pytest.fixture(scope='session')
def hdf5_3d(tmpdir_factory):
    yield from get_or_create_hdf5(tmpdir_factory, "hdf5-test-3d.h5", data=np.ones((17, 16, 16)))


@pytest.fixture(scope='session')
def hdf5_5d(tmpdir_factory):
    yield from get_or_create_hdf5(tmpdir_factory, "hdf5-test-5d.h5",
                                  data=np.ones((3, 5, 9, 16, 16)))


@pytest.fixture(scope='session')
def random_hdf5(tmpdir_factory):
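    # note: despite the fixture name, the file currently holds constant (np.ones) data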
    yield from get_or_create_hdf5(tmpdir_factory, "hdf5-test-random.h5",
                                  data=np.ones((5, 5, 16, 16)))


@pytest.fixture(scope='session')
def chunked_hdf5(tmpdir_factory):
    yield from get_or_create_hdf5(tmpdir_factory, "hdf5-test-chunked.h5",
                                  data=np.ones((5, 5, 16, 16)),
                                  chunks=(1, 2, 16, 16))


@pytest.fixture(scope='session')
def empty_hdf5(tmpdir_factory):
    datadir = tmpdir_factory.mktemp('data')
    filename = datadir + '/hdf5-empty.h5'
    try:
        with h5py.File(filename, 'r') as f:
            yield f
    except OSError:
        with h5py.File(filename, "w") as f:
            pass
        with h5py.File(filename, 'r') as f:
            yield f
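

# The hdf5_ds_* fixtures wrap the HDF5 files from above in H5DataSet objects,
# initialized with the single-process InlineJobExecutor.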
@pytest.fixture
def hdf5_ds_1(hdf5):
    ds = H5DataSet(
        path=hdf5.filename, ds_path="data", tileshape=(1, 5, 16, 16)
    )
    ds = ds.initialize(InlineJobExecutor())
    return ds


@pytest.fixture
def hdf5_ds_2(random_hdf5):
    ds = H5DataSet(
        path=random_hdf5.filename, ds_path="data", tileshape=(1, 5, 16, 16)
    )
    ds = ds.initialize(InlineJobExecutor())
    return ds


@pytest.fixture
def hdf5_ds_3d(hdf5_3d):
    ds = H5DataSet(
        path=hdf5_3d.filename, ds_path="data", tileshape=(1, 16, 16)
    )
    ds = ds.initialize(InlineJobExecutor())
    return ds


@pytest.fixture
def hdf5_ds_5d(hdf5_5d):
    ds = H5DataSet(
        path=hdf5_5d.filename, ds_path="data", tileshape=(1, 1, 1, 16, 16)
    )
    ds = ds.initialize(InlineJobExecutor())
    return ds


@pytest.fixture
def ds_complex():
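    # in-memory dataset of complex64 frames, drawn from a small fixed set of values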
    data = np.random.choice(
        a=[0, 1, 0+1j, 0-1j, 1+1j, 1-1j], size=(16, 16, 16, 16)
    ).astype('complex64')
    dataset = MemoryDataSet(
        data=data,
        tileshape=(1, 16, 16),
        num_partitions=2,
        sig_dims=2,
    )
    return dataset


@pytest.fixture(scope='session')
def default_raw(tmpdir_factory):
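    # a 16x16 scan of 128x128 float32 frames, written once per session as a
    # flat binary file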
    datadir = tmpdir_factory.mktemp('data')
    filename = datadir + '/raw-test-default'
    data = utils._mk_random(size=(16, 16, 128, 128), dtype='float32')
    data.tofile(str(filename))
    del data
    ds = RawFileDataSet(
        path=str(filename),
        scan_size=(16, 16),
        dtype="float32",
        detector_size=(128, 128),
    )
    ds.set_num_cores(2)
    ds = ds.initialize(InlineJobExecutor())
    yield ds


@pytest.fixture
def raw_on_workers(dist_ctx, tmpdir_factory):
    """
    Copy the raw test dataset to each worker host.
    """
    datadir = tmpdir_factory.mktemp('data')
    filename = str(datadir + '/raw-test-on-workers')
    data = utils._mk_random(size=(16, 16, 128, 128), dtype='float32')
    tmpdirpath = os.path.dirname(filename)

    def _make_example_raw():
        # workers don't automatically have the pytest tmp directory, create it:
        if not os.path.exists(tmpdirpath):
            os.makedirs(tmpdirpath)
        print("creating %s" % filename)
        data.tofile(filename)
        print("created %s" % filename)
        return tmpdirpath, os.listdir(tmpdirpath)
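
    # local sanity check: make sure the closure above can be (cloud)pickled
    # before we hand it to the workers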
    import cloudpickle
    import pickle
    dumped = cloudpickle.dumps(_make_example_raw)
    pickle.loads(dumped)

    print("raw_on_workers _make_example_raw: %s" %
          (dist_ctx.executor.run_each_host(_make_example_raw),))
    ds = dist_ctx.load("raw",
                       path=str(filename),
                       scan_size=(16, 16),
                       detector_size=(128, 128),
                       dtype="float32")
    yield ds

    def _cleanup():
        # FIXME: this may litter /tmp/ with empty directories, as we only remove our own
        # tmpdirpath, but as we run these tests in docker containers, they are eventually
        # cleaned up anyway:
        files = os.listdir(tmpdirpath)
        shutil.rmtree(tmpdirpath, ignore_errors=True)
        print("removed %s" % tmpdirpath)
        return tmpdirpath, files

    print("raw_on_workers cleanup: %s" % (dist_ctx.executor.run_each_host(_cleanup),))


@pytest.fixture(scope='session')
def large_raw(tmpdir_factory):
    datadir = tmpdir_factory.mktemp('data')
    filename = datadir + '/raw-test-large-sparse'
    shape = (100, 100, 1216, 1216)
    dtype = np.uint16
    size = np.prod(np.int64(shape)) * np.dtype(dtype).itemsize
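    # create the file sparsely, so the roughly 30 GB of zeros never actually
    # hit the disk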
    if platform.system() == "Windows":
        os.system('FSUtil File CreateNew "%s" 0x%X' % (filename, size))
        os.system('FSUtil Sparse SetFlag "%s"' % filename)
        os.system('FSUtil Sparse SetRange "%s" 0 0x%X' % (filename, size))
    else:
        with open(filename, 'wb') as f:
            f.truncate(size)
        stat = os.stat(filename)
        assert stat.st_blocks == 0
    ds = RawFileDataSet(
        path=str(filename),
        scan_size=shape[:2],
        dtype=dtype,
        detector_size=shape[2:],
    )
    ds.set_num_cores(2)
    ds = ds.initialize(InlineJobExecutor())
    yield ds


@pytest.fixture(scope='session')
def uint16_raw(tmpdir_factory):
    datadir = tmpdir_factory.mktemp('data')
    filename = datadir + '/raw-test-default'
    data = utils._mk_random(size=(16, 16, 128, 128), dtype='uint16')
    data.tofile(str(filename))
    del data
    ds = RawFileDataSet(
        path=str(filename),
        scan_size=(16, 16),
        dtype="uint16",
        detector_size=(128, 128),
    )
    ds = ds.initialize(InlineJobExecutor())
    yield ds


@pytest.fixture
def dist_ctx():
    """
    This Context needs to have an external dask cluster running, with the
    following assumptions:

     - two workers: hostnames worker-1 and worker-2
     - one scheduler node
     - data availability TBD
     - the address of the dask scheduler is passed in as DASK_SCHEDULER_ADDRESS
    """
    scheduler_addr = os.environ['DASK_SCHEDULER_ADDRESS']
    executor = DaskJobExecutor.connect(scheduler_addr)
    with lt.Context(executor=executor) as ctx:
        yield ctx
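

# The autouse fixtures below pre-populate the doctest namespace, so the
# documentation examples can refer to ``ctx``, ``dataset``, ``np``, the
# ``libertem`` modules and the paths to the test files without constructing
# or importing them explicitly.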
@pytest.fixture(autouse=True)
def auto_ctx(doctest_namespace):
    ctx = lt.Context(executor=InlineJobExecutor())
    doctest_namespace["ctx"] = ctx


@pytest.fixture(autouse=True)
def auto_ds(doctest_namespace):
    dataset = MemoryDataSet(datashape=[16, 16, 16, 16])
    doctest_namespace["dataset"] = dataset


@pytest.fixture(autouse=True)
def auto_libs(doctest_namespace):
    doctest_namespace["np"] = np


@pytest.fixture(autouse=True)
def auto_libertem(doctest_namespace):
    import libertem
    import libertem.utils
    import libertem.utils.generate
    import libertem.udf.blobfinder
    import libertem.masks
    import libertem.api
    doctest_namespace["libertem"] = libertem
    doctest_namespace["libertem.utils"] = libertem.utils
    doctest_namespace["libertem.utils.generate"] = libertem.utils.generate
    doctest_namespace["libertem.udf.blobfinder"] = libertem.udf.blobfinder
    doctest_namespace["libertem.masks"] = libertem.masks
    doctest_namespace["libertem.api"] = libertem.api


@pytest.fixture(autouse=True)
def auto_files(doctest_namespace, hdf5, default_raw):
    doctest_namespace["path_to_hdf5"] = hdf5.filename
    doctest_namespace["path_to_raw"] = default_raw._path