-
Notifications
You must be signed in to change notification settings - Fork 2
/
bids_utils.py
315 lines (245 loc) · 11.1 KB
/
bids_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
import json
import logging
import os
import shutil
from collections import defaultdict
import pydicom
from xnat_tools.xnat_utils import download, get
_logger = logging.getLogger(__name__)
def insert_intended_for_fmap(bids_dir, sub_list):
    """Insert the IntendedFor field into fieldmap JSON sidecars.

    For every subject and session, collect the functional ``.nii.gz`` files
    and write their session-relative paths into the ``"IntendedFor"`` key of
    each JSON sidecar found in the session's ``fmap`` directory, as required
    by the BIDS specification.

    Parameters
    ----------
    bids_dir : str
        Root of the BIDS dataset (the directory containing ``sub-*``).
    sub_list : iterable of str
        Subject labels (without the ``sub-`` prefix) to process.
    """
    log = logging.getLogger(__name__)  # same logger instance as module-level _logger
    for subj in sub_list:
        subj_path = f"{bids_dir}/sub-{subj}"
        log.info(f"Processing participant {subj} at path {subj_path}")
        sess_list = [x for x in os.listdir(subj_path) if x.startswith("ses-")]
        log.info(f"List of sessions sub-directories {sess_list}")
        for sess in sess_list:
            fmap_path = f"{bids_dir}/sub-{subj}/{sess}/fmap"
            func_path = f"{bids_dir}/sub-{subj}/{sess}/func"
            # Robustness fix: sessions without a fieldmap or functional
            # directory used to crash os.listdir with FileNotFoundError;
            # skip them instead.
            if not (os.path.isdir(fmap_path) and os.path.isdir(func_path)):
                log.info(f"Skipping {sess}: no fmap and/or func directory")
                continue
            json_files = [
                os.path.join(fmap_path, f)
                for f in os.listdir(fmap_path)
                if f.endswith(".json")
            ]
            log.info(f"List of JSON files to amend {json_files}")
            # IntendedFor paths are relative to the subject directory per BIDS
            nii_files = [
                f"{sess}/func/{name}"
                for name in os.listdir(func_path)
                if name.endswith(".nii.gz")
            ]
            log.info("List of NII files")
            for file in json_files:
                # Exported sidecars may be read-only; make them writable first
                os.chmod(file, 0o664)
                # Bug fix: the original called `f.close` without parentheses —
                # a no-op attribute access. The `with` blocks already close
                # the file handles correctly, so no explicit close is needed.
                with open(file, "r") as f:
                    log.info(f"Processing file {f}")
                    data = json.load(f)
                data["IntendedFor"] = nii_files
                with open(file, "w") as f:
                    json.dump(data, f, indent=4, sort_keys=True)
                log.info("Done with re-write")
def prepare_bids_prefixes(project, subject, session):
    """Build BIDS-style path prefixes from XNAT identifiers.

    The project is expected to be named ``<PI>_<STUDY>``; underscores are
    stripped from subject and session labels.

    Returns a tuple ``(pi_prefix, study_prefix, subject_prefix, session_prefix)``.
    """
    project_parts = project.lower().split("_")
    # PI name is the first underscore-separated token of the project name
    pi_prefix = project_parts[0]
    # Paths to export source data in a BIDS friendly way
    study_prefix = "study-" + project_parts[1]
    subject_prefix = "sub-" + subject.lower().replace("_", "")
    session_prefix = "ses-" + session.lower().replace("_", "")
    return pi_prefix, study_prefix, subject_prefix, session_prefix
def prepare_export_output_path(
    bids_root_dir,
    pi_prefix,
    study_prefix,
    subject_prefix,
    session_prefix,
    overwrite=False,
):
    """Create (and optionally reset) the xnat-export session directory.

    Builds ``<root>/<pi>/<study>/xnat-export/<subject>/<session>``, removing
    any existing session directory first when ``overwrite`` is True, and
    creating it if it does not exist.

    Returns the session directory path.
    """
    log = logging.getLogger(__name__)  # same logger instance as module-level _logger
    session_dir = os.path.join(
        bids_root_dir,
        pi_prefix,
        study_prefix,
        "xnat-export",
        subject_prefix,
        session_prefix,
    )
    # Reset the working directory when requested
    if overwrite and os.path.exists(session_dir):
        log.info("Removing existing xnat-export session directory %s" % session_dir)
        shutil.rmtree(session_dir, ignore_errors=True)
    if not os.path.isdir(session_dir):
        log.info("Making output xnat-export session directory %s" % session_dir)
        os.makedirs(session_dir)
    return session_dir
def prepare_heudi_prefixes(project, subject, session):
    """Build BIDS-style path prefixes for the heudiconv output tree.

    The project is expected to be named ``<PI>_<STUDY>``; underscores are
    stripped from subject and session labels.

    Returns a tuple ``(pi_prefix, study_prefix, subject_prefix, session_prefix)``.
    """
    # PI name and study name come from the underscore-separated project name
    pi, study = project.lower().split("_")[0], project.lower().split("_")[1]
    # Paths to export source data in a BIDS friendly way
    sub = "".join(subject.lower().split("_"))
    ses = "".join(session.lower().split("_"))
    return pi, f"study-{study}", f"sub-{sub}", f"ses-{ses}"
def prepare_heudiconv_output_path(
    bids_root_dir,
    pi_prefix,
    study_prefix,
    subject_prefix,
    session_prefix,
    overwrite=False,
):
    """Create (and optionally reset) the heudiconv BIDS output directory.

    Builds ``<root>/<pi>/<study>/bids`` and, when ``overwrite`` is True,
    removes the existing ``<subject>/<session>`` directory beneath it.

    Returns the ``bids`` output directory (NOT the session directory).
    """
    log = logging.getLogger(__name__)  # same logger instance as module-level _logger
    heudi_study_dir = os.path.join(bids_root_dir, pi_prefix, study_prefix)
    heudi_output_dir = os.path.join(heudi_study_dir, "bids")
    subject_dir = os.path.join(heudi_output_dir, subject_prefix)
    session_dir = os.path.join(subject_dir, session_prefix)
    # Consistency fix: use the logger instead of bare print(), matching
    # prepare_export_output_path; also guard the rmtree with an existence
    # check like the sibling function does.
    if overwrite and os.path.exists(session_dir):
        log.info("Overwrite - Removing existing heudi session directory %s" % session_dir)
        shutil.rmtree(session_dir, ignore_errors=True)
    if not os.path.isdir(heudi_output_dir):
        log.info("Making output BIDS Session directory %s" % heudi_output_dir)
        os.makedirs(heudi_output_dir)
    return heudi_output_dir
def bidsmap_scans(scans, bidsmap=None):
    """Rename series descriptions according to the bidsmap file.

    Each ``(scan_id, series_description)`` pair is mapped through the bidsmap
    (a list of ``{"series_description": ..., "bidsname": ...}`` entries),
    normalized for known scanner quirks, and any ``run+`` placeholder is
    expanded to a zero-padded per-description run counter (``run-01`` ...).

    Returns the list of ``(scan_id, new_series_description)`` pairs.
    """
    # NOTE (BNR): We could break these down into smaller functions, one for
    #             bidsmap, one for scanner exceptions, one for run+, but that
    #             would add an extra loop per function. One loop here strikes
    #             a reasonable balance.
    # Flatten the bidsmap into a plain lookup table; an absent/None bidsmap
    # behaves as an empty mapping.
    name_map = {
        entry["series_description"]: entry["bidsname"] for entry in (bidsmap or [])
    }
    # Per-description counter used to expand the run+ placeholder; keyed on
    # the description as it looks *before* replacement.
    run_counts = defaultdict(int)
    renamed = []
    for scan_id, description in scans:
        description = name_map.get(description, description)
        description = handle_scanner_exceptions(description)
        if "run+" in description:
            run_counts[description] += 1
            description = description.replace(
                "run+", f"run-{run_counts[description]:02}"
            )
        renamed.append((scan_id, description))
    return renamed
def handle_scanner_exceptions(match):
    """Normalize scanner-generated series descriptions for BIDS naming.

    Applies a fixed sequence of literal substitutions and returns the
    normalized string.
    """
    substitutions = (
        ("t1w", "T1w"),          # T1w/T2w need upper-case W-casing
        ("t2w", "T2w"),
        ("_MPR_sag", "MPRsag"),  # collapse the aascout plane suffixes
        ("_MPR_cor", "MPRcor"),
        ("_MPR_tra", "MPRtra"),
        (" RMS", "RMS"),         # mprage RMS: drop the embedded space
    )
    for old, new in substitutions:
        match = match.replace(old, new)
    return match
def bidsify_dicom_headers(filename, series_description):
    """Update the DICOM headers on disk to match the new series_description.

    Parameters
    ----------
    filename : str
        Path to the DICOM file to rewrite in place.
    series_description : str
        BIDS-friendly name written into ProtocolName and SeriesDescription.
    """
    dataset = pydicom.dcmread(filename)
    if "ProtocolName" not in dataset:
        # No ProtocolName tag at all — nothing to rename
        return
    if dataset.data_element("ProtocolName").value != series_description:
        dataset.data_element("ProtocolName").value = series_description
        # NOTE(review): assumes SeriesDescription is present whenever
        # ProtocolName is — data_element() returns None for a missing tag,
        # which would raise AttributeError here; confirm against the data.
        dataset.data_element("SeriesDescription").value = series_description
        # Bug fix: the original modified the dataset only in memory and never
        # persisted it, so the header rewrite was silently lost on disk.
        dataset.save_as(filename)
def scan_contains_dicom(connection, host, session, scanid):
    """Check whether a scan has suitable DICOM files for BIDS conversion.

    Queries the scan's resource listing on the XNAT server and returns True
    only when the scan exposes exactly one DICOM resource whose file count is
    nonzero (or blank, in which case the scan is kept with a warning).
    """
    resp = get(
        connection,
        host + "/data/experiments/%s/scans/%s/resources" % (session, scanid),
        params={"format": "json"},
    )
    dicom_resources = [
        r for r in resp.json()["ResultSet"]["Result"] if r["label"] == "DICOM"
    ]
    # NOTE (BNR): A scan contains multiple resources (think: folders). We only
    #             want exactly one DICOM folder — zero means no DICOM images,
    #             more than one is weird; skip the scan in both cases.
    if len(dicom_resources) != 1:
        return False
    file_count = dicom_resources[0].get("file_count")
    # NOTE (BNR): Sometimes the file_count field is empty and we process
    #             anyway, even though that might make things break later.
    if file_count is None:
        _logger.warning(
            'DICOM resources for scan %s have a blank "file_count". '
            "I cannot check to see if there are no files. "
            "I am not skipping the scan. "
            "This may lead to errors later if there are no DICOM files in the scan.",
            scanid,
        )
        return True
    return int(file_count) != 0
def assign_bids_name(
    connection,
    host,
    subject,
    session,
    scans,
    build_dir,
    bids_session_dir,
):
    """
    Download each scan's DICOM files into a per-scan BIDS-named directory and
    rewrite the DICOM headers to match the BIDS series description.

    connection: live XNAT connection object passed through to get()/download()
    host: base URL of the XNAT server, prepended to REST paths
    subject: Subject to process  # NOTE(review): not referenced in the body
    session: XNAT experiment id interpolated into the REST URLs
    scans: Tuple of scan id and series descriptions
    build_dir: build director. What is this?
               # NOTE(review): only used to chdir back after each scan's downloads
    study_bids_dir: BIDS directory to copy simlinks to. Typically the RESOURCES/BIDS
    """
    for scanid, seriesdesc in scans:
        # Skip scans that do not expose a single usable DICOM resource
        if not scan_contains_dicom(connection, host, session, scanid):
            continue
        # BIDS sourcedatadirectory for this scan
        _logger.info(f"bids_session_dir: {bids_session_dir}")
        _logger.info(f"BIDSNAME: {seriesdesc}")
        bids_scan_directory = os.path.join(bids_session_dir, seriesdesc)
        if not os.path.isdir(bids_scan_directory):
            _logger.info("Making scan DICOM directory %s." % bids_scan_directory)
            os.mkdir(bids_scan_directory)
        else:
            # Repeated series descriptions land in the same directory
            _logger.warning(
                f"{bids_scan_directory} already exists. \
                See documentation to understand behavior for repeated sequences."
            )
        filesURL = host + "/data/experiments/%s/scans/%s/resources/DICOM/files" % (
            session,
            scanid,
        )
        r = get(connection, filesURL, params={"format": "json"})
        # Build a dict keyed off file name
        dicomFileDict = {
            dicom["Name"]: {"URI": host + dicom["URI"]}
            for dicom in r.json()["ResultSet"]["Result"]
        }
        # Have to manually add absolutePath with a separate request
        r = get(
            connection, filesURL, params={"format": "json", "locator": "absolutePath"}
        )
        for dicom in r.json()["ResultSet"]["Result"]:
            dicomFileDict[dicom["Name"]]["absolutePath"] = dicom["absolutePath"]
        # Download DICOMs
        _logger.info("Downloading files")
        # download() appears to write into the current working directory,
        # hence the chdir into the scan directory here and back to build_dir
        # below — TODO confirm against xnat_utils.download
        os.chdir(bids_scan_directory)
        # The first file is downloaded separately so its headers can be
        # bidsified before the bulk of the transfer proceeds
        dicomFileList = list(dicomFileDict.items())
        (name, pathDict) = dicomFileList[0]
        download(connection, name, pathDict)
        bidsify_dicom_headers(name, seriesdesc)
        # Download remaining DICOMs
        for name, pathDict in dicomFileList[1:]:
            download(connection, name, pathDict)
            bidsify_dicom_headers(name, seriesdesc)
        os.chdir(build_dir)
        _logger.info("Done.")
        _logger.info("---------------------------------")