-
Notifications
You must be signed in to change notification settings - Fork 2
/
load.py
72 lines (64 loc) · 2 KB
/
load.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2023 Scipp contributors (https://github.com/scipp)
import os
import pandas as pd
import scipp as sc
import scippnexus as sx
from typing import Tuple
import warnings
def load_nexus(path: str) -> Tuple[dict, object]:
    """
    Load the event columns and simulation parameters from a SANS nexus file.

    The file is expected to be named ``mccode.h5`` and to live inside ``path``.

    Parameters
    ----------
    path:
        Directory that contains the ``mccode.h5`` file.

    Returns
    -------
    :
        A tuple ``(events, meta)``. ``events`` is a dict with the keys
        ``p``, ``x``, ``y``, ``n``, ``id`` and ``t``; each value is a copy of
        one slice of the ``entry1/data/detector_signal_event_dat`` data with
        its ``dim_0`` dimension renamed to ``event``. ``meta`` is the
        ``entry1/simulation/Param`` entry of the file, returned as loaded.
    """
    fname = os.path.join(path, "mccode.h5")

    # Every warning emitted while reading the file is deliberately silenced.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        with sx.File(fname) as f:
            dg = f[...]

    raw = dg["entry1"]["data"]["detector_signal_event_dat"].data
    slices = sc.collapse(raw, keep="dim_0")
    meta = dg["entry1"]["simulation"]["Param"]

    # Column names are assigned purely by position: the i-th slice produced by
    # ``sc.collapse`` receives the i-th name below.
    # NOTE(review): this assumes the slices come out in the file's column
    # order -- confirm against the McStas output layout.
    columns = ["p", "x", "y", "n", "id", "t"]
    events = {
        name: var.rename_dims(dim_0="event").copy()
        for name, var in zip(columns, slices.values())
    }
    return events, meta
def _load_header(fname: str, comment: str = "#", maxlines: int = 100) -> dict:
    """
    Parse the leading comment block of a text file into a dict.

    Lines are read from the top of the file until the first line that does
    not start with ``comment`` (or until ``maxlines`` lines have been read).
    Each collected line has its leading spaces/comment characters and its
    surrounding whitespace removed, and is then interpreted as follows:

    - ``Param: key=value`` lines are stored as ``header[key] = value``.
      Only the first ``:`` and the first ``=`` act as separators, so values
      may themselves contain ``:`` or ``=``.
    - Any other line is split on ``:``. The first piece is the key. If there
      are exactly two pieces the value is the stripped second piece,
      otherwise the value is the list of all remaining (unstripped) pieces,
      which is an empty list when the line contains no ``:``.

    Parameters
    ----------
    fname:
        Path of the file to read.
    comment:
        Prefix that marks a header line.
    maxlines:
        Upper bound on the number of lines inspected.

    Returns
    -------
    :
        Mapping from header keys to their values.
    """
    lines = []
    with open(fname, "r") as f:
        for _ in range(maxlines):
            line = f.readline()
            # An empty string (end of file) also fails this check and stops
            # the scan.
            if not line.startswith(comment):
                break
            lines.append(line.lstrip(f" {comment}").strip())

    header = {}
    for text in lines:
        if text.startswith("Param"):
            # partition() splits on the first separator only, so a value
            # such as "C:/data" or "a=b" is kept intact.
            _, colon, assignment = text.partition(":")
            key, equals, value = assignment.partition("=")
            if colon and equals:
                header[key.strip()] = value.strip()
                continue
            # Not of the form "Param: key=value": handle it like any other
            # header line below instead of raising.
        pieces = text.split(":")
        if len(pieces) == 2:
            value = pieces[1].strip()
        else:
            value = pieces[1:]
        header[pieces[0].strip()] = value
    return header
def load_ascii(
    filename: str,
) -> Tuple[dict, dict]:
    """
    Load the event columns and the header of a space-delimited ASCII file.

    The header is parsed with ``_load_header``. The table is read with
    pandas, skipping ``#`` comments, with the columns named ``p``, ``x``,
    ``y``, ``n``, ``id`` and ``t``, and is then converted with
    ``sc.compat.from_pandas``.

    Parameters
    ----------
    filename:
        Path of the ASCII file to read.

    Returns
    -------
    :
        A tuple ``(events, meta)``. ``events`` is a dict keyed by column
        name whose values are the column data with the ``row`` dimension
        renamed to ``event``. ``meta`` is the parsed header dict.
    """
    meta = _load_header(fname=filename)
    table = pd.read_csv(
        filename,
        delimiter=" ",
        comment="#",
        names=["p", "x", "y", "n", "id", "t"],
        index_col=False,
    )
    ds = sc.compat.from_pandas(table)
    events = {
        name: column.data.rename_dims(row="event") for name, column in ds.items()
    }
    return events, meta