diff --git a/docs/source/v1.6.md.inc b/docs/source/v1.6.md.inc index 01bfd87e4..3c87dac23 100644 --- a/docs/source/v1.6.md.inc +++ b/docs/source/v1.6.md.inc @@ -6,6 +6,8 @@ - Added [`regress_artifact`][mne_bids_pipeline._config.regress_artifact] to allow artifact regression (e.g., of MEG reference sensors in KIT systems) (#837 by @larsoner) - Chosen `reject` parameters are now saved in the generated HTML reports (#839 by @larsoner) +- Added saving of clean raw data in addition to epochs (#840 by @larsoner) +- Added saving of detected blink and cardiac events used to calculate SSP projectors (#840 by @larsoner) [//]: # (### :warning: Behavior changes) @@ -21,6 +23,7 @@ - Fix bug where EEG `reject` params were not used for `ch_types = ["meg", "eeg"]` (#839 by @larsoner) - Fix bug where implicit `mf_reference_run` could change across invocations of `mne_bids_pipeline`, breaking caching (#839 by @larsoner) - Fix bug where `--no-cache` had no effect (#839 by @larsoner) +- Fix bug where raw, empty-room, and custom noise covariances were errantly calculated on data without ICA or SSP applied (#840 by @larsoner) ### :medical_symbol: Code health diff --git a/mne_bids_pipeline/_config.py b/mne_bids_pipeline/_config.py index e3c7626bb..041331da5 100644 --- a/mne_bids_pipeline/_config.py +++ b/mne_bids_pipeline/_config.py @@ -1161,14 +1161,14 @@ ways using the configuration options you can find below. """ -min_ecg_epochs: int = 5 +min_ecg_epochs: Annotated[int, Ge(1)] = 5 """ -Minimal number of ECG epochs needed to compute SSP or ICA rejection. +Minimal number of ECG epochs needed to compute SSP projectors. """ -min_eog_epochs: int = 5 +min_eog_epochs: Annotated[int, Ge(1)] = 5 """ -Minimal number of EOG epochs needed to compute SSP or ICA rejection. +Minimal number of EOG epochs needed to compute SSP projectors. """ diff --git a/mne_bids_pipeline/_config_utils.py b/mne_bids_pipeline/_config_utils.py index 7b555a2a4..784752028 100644 --- a/mne_bids_pipeline/_config_utils.py +++ b/mne_bids_pipeline/_config_utils.py @@ -484,7 +484,7 @@ def get_noise_cov_bids_path( task=cfg.task, acquisition=cfg.acq, run=None, - processing=cfg.proc, + processing="clean", recording=cfg.rec, space=cfg.space, suffix="cov", @@ -638,3 +638,24 @@ def _pl(x, *, non_pl="", pl="s"): """Determine if plural should be used.""" len_x = x if isinstance(x, (int, np.generic)) else len(x) return non_pl if len_x == 1 else pl + + +def _proj_path( + *, + cfg: SimpleNamespace, + subject: str, + session: Optional[str], +) -> BIDSPath: + return BIDSPath( + subject=subject, + session=session, + task=cfg.task, + acquisition=cfg.acq, + recording=cfg.rec, + space=cfg.space, + datatype=cfg.datatype, + root=cfg.deriv_root, + extension=".fif", + suffix="proj", + check=False, + ) diff --git a/mne_bids_pipeline/steps/preprocessing/_05_regress_artifact.py b/mne_bids_pipeline/steps/preprocessing/_05_regress_artifact.py index 5ab1119a6..cb31df04d 100644 --- a/mne_bids_pipeline/steps/preprocessing/_05_regress_artifact.py +++ b/mne_bids_pipeline/steps/preprocessing/_05_regress_artifact.py @@ -82,7 +82,7 @@ def run_regress_artifact( model.apply(raw, copy=False) if projs: raw.add_proj(projs) - raw.save(out_files[in_key], overwrite=True) + raw.save(out_files[in_key], overwrite=True, split_size=cfg._raw_split_size) _update_for_splits(out_files, in_key) model.save(out_files["regress"], overwrite=True) assert len(in_files) == 0, in_files.keys() diff --git a/mne_bids_pipeline/steps/preprocessing/_06b_run_ssp.py b/mne_bids_pipeline/steps/preprocessing/_06b_run_ssp.py index 7aa0e97de..7ec75ef91 100644 --- a/mne_bids_pipeline/steps/preprocessing/_06b_run_ssp.py +++ b/mne_bids_pipeline/steps/preprocessing/_06b_run_ssp.py @@ -7,13 +7,15 @@ from typing import Optional import mne +import numpy as np from mne import compute_proj_epochs, compute_proj_evoked -from mne.preprocessing import create_ecg_epochs, create_eog_epochs +from mne.preprocessing import find_ecg_events, find_eog_events from mne_bids import BIDSPath from ..._config_utils import ( _bids_kwargs, _pl, + _proj_path, get_runs, get_sessions, get_subjects, @@ -25,6 +27,11 @@ from ..._run import _prep_out_files, _update_for_splits, failsafe_run, save_logs +def _find_ecg_events(raw: mne.io.Raw, ch_name: Optional[str]) -> np.ndarray: + """Wrap find_ecg_events to use the same defaults as create_ecg_events.""" + return find_ecg_events(raw, ch_name=ch_name, l_freq=8, h_freq=16)[0] + + def get_input_fnames_run_ssp( *, cfg: SimpleNamespace, @@ -69,14 +76,7 @@ def run_ssp( # compute SSP on all runs of raw raw_fnames = [in_files.pop(f"raw_run-{run}") for run in cfg.runs] - # when saving proj, use run=None - out_files = dict() - out_files["proj"] = ( - raw_fnames[0] - .copy() - .update(run=None, suffix="proj", split=None, processing=None, check=False) - ) - + out_files = dict(proj=_proj_path(cfg=cfg, subject=subject, session=session)) msg = ( f"Input{_pl(raw_fnames)} ({len(raw_fnames)}): " f'{raw_fnames[0].basename}{_pl(raw_fnames, pl=" ...")}' @@ -93,7 +93,7 @@ def run_ssp( projs = dict() proj_kinds = ("ecg", "eog") rate_names = dict(ecg="heart", eog="blink") - epochs_fun = dict(ecg=create_ecg_epochs, eog=create_eog_epochs) + events_fun = dict(ecg=_find_ecg_events, eog=find_eog_events) minimums = dict(ecg=cfg.min_ecg_epochs, eog=cfg.min_eog_epochs) rejects = dict(ecg=cfg.ssp_reject_ecg, eog=cfg.ssp_reject_eog) avg = dict(ecg=cfg.ecg_proj_from_average, eog=cfg.eog_proj_from_average) @@ -111,17 +111,38 @@ def run_ssp( projs[kind] = [] if not any(n_projs[kind].values()): continue - proj_epochs = epochs_fun[kind]( - raw, - ch_name=ch_name[kind], - decim=cfg.epochs_decim, - ) - n_orig = len(proj_epochs.selection) + events = events_fun[kind](raw=raw, ch_name=ch_name[kind]) + n_orig = len(events) rate = n_orig / raw.times[-1] * 60 bpm_msg = f"{rate:5.1f} bpm" msg = f"Detected {rate_names[kind]} rate: {bpm_msg}" logger.info(**gen_log_kwargs(message=msg)) - # Enough to start + # Enough to create epochs + if len(events) < minimums[kind]: + msg = ( + f"No {kind.upper()} projectors computed: got " + f"{len(events)} original events < {minimums[kind]} {bpm_msg}" + ) + logger.warning(**gen_log_kwargs(message=msg)) + continue + out_files[f"events_{kind}"] = ( + out_files["proj"] + .copy() + .update(suffix=f"{kind}-eve", split=None, check=False, extension=".txt") + ) + mne.write_events(out_files[f"events_{kind}"], events, overwrite=True) + proj_epochs = mne.Epochs( + raw, + events=events, + event_id=events[0, 2], + tmin=-0.5, + tmax=0.5, + proj=False, + baseline=(None, None), + reject_by_annotation=True, + preload=True, + decim=cfg.epochs_decim, + ) if len(proj_epochs) >= minimums[kind]: reject_ = _get_reject( subject=subject, @@ -134,7 +155,6 @@ def run_ssp( proj_epochs.drop_bad(reject=reject_) # Still enough after rejection if len(proj_epochs) >= minimums[kind]: - proj_epochs.apply_baseline((None, None)) use = proj_epochs.average() if avg[kind] else proj_epochs fun = compute_proj_evoked if avg[kind] else compute_proj_epochs desc_prefix = ( diff --git a/mne_bids_pipeline/steps/preprocessing/_07_make_epochs.py b/mne_bids_pipeline/steps/preprocessing/_07_make_epochs.py index 0cebb033e..42bf721df 100644 --- a/mne_bids_pipeline/steps/preprocessing/_07_make_epochs.py +++ b/mne_bids_pipeline/steps/preprocessing/_07_make_epochs.py @@ -54,7 +54,7 @@ def get_input_fnames_epochs( extension=".fif", datatype=cfg.datatype, root=cfg.deriv_root, - processing="filt", + processing=cfg.processing, ).update(suffix="raw", check=False) # Generate a list of raw data paths (i.e., paths of individual runs) @@ -276,7 +276,7 @@ def _get_events(cfg, subject, session): acquisition=cfg.acq, recording=cfg.rec, space=cfg.space, - processing="filt", + processing=cfg.processing, suffix="raw", extension=".fif", datatype=cfg.datatype, @@ -322,6 +322,7 @@ def get_config( rest_epochs_overlap=config.rest_epochs_overlap, _epochs_split_size=config._epochs_split_size, runs=get_runs(config=config, subject=subject), + processing="filt" if config.regress_artifact is None else "regress", **_bids_kwargs(config=config), ) return cfg diff --git a/mne_bids_pipeline/steps/preprocessing/_08a_apply_ica.py b/mne_bids_pipeline/steps/preprocessing/_08a_apply_ica.py index f4b999cc8..e53a4758f 100644 --- a/mne_bids_pipeline/steps/preprocessing/_08a_apply_ica.py +++ b/mne_bids_pipeline/steps/preprocessing/_08a_apply_ica.py @@ -21,22 +21,23 @@ from mne_bids import BIDSPath from ..._config_utils import ( - _bids_kwargs, + get_runs_tasks, get_sessions, get_subjects, ) +from ..._import_data import _get_run_rest_noise_path, _import_data_kwargs from ..._logging import gen_log_kwargs, logger from ..._parallel import get_parallel_backend, parallel_func -from ..._report import _agg_backend, _open_report +from ..._report import _add_raw, _agg_backend, _open_report from ..._run import _prep_out_files, _update_for_splits, failsafe_run, save_logs -def get_input_fnames_apply_ica( +def _ica_paths( *, cfg: SimpleNamespace, subject: str, session: Optional[str], -) -> dict: +): bids_basename = BIDSPath( subject=subject, session=session, @@ -53,15 +54,56 @@ def get_input_fnames_apply_ica( in_files["components"] = bids_basename.copy().update( processing="ica", suffix="components", extension=".tsv" ) - in_files["epochs"] = bids_basename.copy().update(suffix="epo", extension=".fif") + return in_files + + +def _read_ica_and_exclude( + in_files: dict, +) -> None: + ica = read_ica(fname=in_files.pop("ica")) + tsv_data = pd.read_csv(in_files.pop("components"), sep="\t") + ica.exclude = tsv_data.loc[tsv_data["status"] == "bad", "component"].to_list() + return ica + + +def get_input_fnames_apply_ica_epochs( + *, + cfg: SimpleNamespace, + subject: str, + session: Optional[str], +) -> dict: + in_files = _ica_paths(cfg=cfg, subject=subject, session=session) + in_files["epochs"] = in_files["ica"].copy().update(suffix="epo", extension=".fif") _update_for_splits(in_files, "epochs", single=True) return in_files +def get_input_fnames_apply_ica_raw( + *, + cfg: SimpleNamespace, + subject: str, + session: Optional[str], + run: str, + task: Optional[str], +) -> dict: + in_files = _get_run_rest_noise_path( + cfg=cfg, + subject=subject, + session=session, + run=run, + task=task, + kind="filt", + mf_reference_run=cfg.mf_reference_run, + ) + assert len(in_files) + in_files.update(_ica_paths(cfg=cfg, subject=subject, session=session)) + return in_files + + @failsafe_run( - get_input_fnames=get_input_fnames_apply_ica, + get_input_fnames=get_input_fnames_apply_ica_epochs, ) -def apply_ica( +def apply_ica_epochs( *, cfg: SimpleNamespace, exec_params: SimpleNamespace, @@ -85,11 +127,7 @@ def apply_ica( # Load ICA. msg = f"Reading ICA: {in_files['ica']}" logger.debug(**gen_log_kwargs(message=msg)) - ica = read_ica(fname=in_files.pop("ica")) - - # Select ICs to remove. - tsv_data = pd.read_csv(in_files.pop("components"), sep="\t") - ica.exclude = tsv_data.loc[tsv_data["status"] == "bad", "component"].to_list() + ica = _read_ica_and_exclude(in_files) # Load epochs. msg = f'Input: {in_files["epochs"].basename}' @@ -168,16 +206,65 @@ def apply_ica( return _prep_out_files(exec_params=exec_params, out_files=out_files) +@failsafe_run( + get_input_fnames=get_input_fnames_apply_ica_raw, +) +def apply_ica_raw( + *, + cfg: SimpleNamespace, + exec_params: SimpleNamespace, + subject: str, + session: Optional[str], + run: str, + task: Optional[str], + in_files: dict, +) -> dict: + ica = _read_ica_and_exclude(in_files) + in_key = list(in_files)[0] + assert in_key.startswith("raw"), in_key + raw_fname = in_files.pop(in_key) + assert len(in_files) == 0, in_files + out_files = dict() + out_files[in_key] = raw_fname.copy().update(processing="clean") + msg = f"Writing {out_files[in_key].basename} …" + logger.info(**gen_log_kwargs(message=msg)) + raw = mne.io.read_raw_fif(raw_fname, preload=True) + ica.apply(raw) + raw.save(out_files[in_key], overwrite=True, split_size=cfg._raw_split_size) + _update_for_splits(out_files, in_key) + # Report + with _open_report( + cfg=cfg, + exec_params=exec_params, + subject=subject, + session=session, + run=run, + task=task, + ) as report: + msg = "Adding cleaned raw data to report" + logger.info(**gen_log_kwargs(message=msg)) + _add_raw( + cfg=cfg, + report=report, + bids_path_in=out_files[in_key], + title="Raw (clean)", + tags=("clean",), + raw=raw, + ) + return _prep_out_files(exec_params=exec_params, out_files=out_files) + + def get_config( *, config: SimpleNamespace, + subject: str, ) -> SimpleNamespace: cfg = SimpleNamespace( baseline=config.baseline, ica_reject=config.ica_reject, - ch_types=config.ch_types, + processing="filt" if config.regress_artifact is None else "regress", _epochs_split_size=config._epochs_split_size, - **_bids_kwargs(config=config), + **_import_data_kwargs(config=config, subject=subject), ) return cfg @@ -190,17 +277,45 @@ def main(*, config: SimpleNamespace) -> None: return with get_parallel_backend(config.exec_params): - parallel, run_func = parallel_func(apply_ica, exec_params=config.exec_params) + # Epochs + parallel, run_func = parallel_func( + apply_ica_epochs, exec_params=config.exec_params + ) logs = parallel( run_func( cfg=get_config( config=config, + subject=subject, + ), + exec_params=config.exec_params, + subject=subject, + session=session, + ) + for subject in get_subjects(config) + for session in get_sessions(config) + ) + # Raw + parallel, run_func = parallel_func( + apply_ica_raw, exec_params=config.exec_params + ) + logs += parallel( + run_func( + cfg=get_config( + config=config, + subject=subject, ), exec_params=config.exec_params, subject=subject, session=session, + run=run, + task=task, ) for subject in get_subjects(config) for session in get_sessions(config) + for run, task in get_runs_tasks( + config=config, + subject=subject, + session=session, + ) ) save_logs(config=config, logs=logs) diff --git a/mne_bids_pipeline/steps/preprocessing/_08b_apply_ssp.py b/mne_bids_pipeline/steps/preprocessing/_08b_apply_ssp.py index b1eda9cd1..e6fad4b8f 100644 --- a/mne_bids_pipeline/steps/preprocessing/_08b_apply_ssp.py +++ b/mne_bids_pipeline/steps/preprocessing/_08b_apply_ssp.py @@ -9,47 +9,37 @@ from typing import Optional import mne -from mne_bids import BIDSPath from ..._config_utils import ( - _bids_kwargs, + _proj_path, + get_runs_tasks, get_sessions, get_subjects, ) +from ..._import_data import _get_run_rest_noise_path, _import_data_kwargs from ..._logging import gen_log_kwargs, logger from ..._parallel import get_parallel_backend, parallel_func +from ..._report import _add_raw, _open_report from ..._run import _prep_out_files, _update_for_splits, failsafe_run, save_logs -def get_input_fnames_apply_ssp( +def get_input_fnames_apply_ssp_epochs( *, cfg: SimpleNamespace, subject: str, session: Optional[str], ) -> dict: - bids_basename = BIDSPath( - subject=subject, - session=session, - task=cfg.task, - acquisition=cfg.acq, - recording=cfg.rec, - space=cfg.space, - datatype=cfg.datatype, - root=cfg.deriv_root, - extension=".fif", - check=False, - ) in_files = dict() - in_files["epochs"] = bids_basename.copy().update(suffix="epo", check=False) + in_files["proj"] = _proj_path(cfg=cfg, subject=subject, session=session) + in_files["epochs"] = in_files["proj"].copy().update(suffix="epo", check=False) _update_for_splits(in_files, "epochs", single=True) - in_files["proj"] = bids_basename.copy().update(suffix="proj", check=False) return in_files @failsafe_run( - get_input_fnames=get_input_fnames_apply_ssp, + get_input_fnames=get_input_fnames_apply_ssp_epochs, ) -def apply_ssp( +def apply_ssp_epochs( *, cfg: SimpleNamespace, exec_params: SimpleNamespace, @@ -81,13 +71,85 @@ def apply_ssp( return _prep_out_files(exec_params=exec_params, out_files=out_files) +def get_input_fnames_apply_ssp_raw( + *, + cfg: SimpleNamespace, + subject: str, + session: Optional[str], + run: str, + task: Optional[str], +) -> dict: + in_files = _get_run_rest_noise_path( + cfg=cfg, + subject=subject, + session=session, + run=run, + task=task, + kind="filt", + mf_reference_run=cfg.mf_reference_run, + ) + assert len(in_files) + in_files["proj"] = _proj_path(cfg=cfg, subject=subject, session=session) + return in_files + + +@failsafe_run( + get_input_fnames=get_input_fnames_apply_ssp_raw, +) +def apply_ssp_raw( + *, + cfg: SimpleNamespace, + exec_params: SimpleNamespace, + subject: str, + session: Optional[str], + run: str, + task: Optional[str], + in_files: dict, +) -> dict: + projs = mne.read_proj(in_files.pop("proj")) + in_key = list(in_files.keys())[0] + assert in_key.startswith("raw"), in_key + raw_fname = in_files.pop(in_key) + assert len(in_files) == 0, in_files.keys() + raw = mne.io.read_raw_fif(raw_fname) + raw.add_proj(projs) + out_files = dict() + out_files[in_key] = raw_fname.copy().update(processing="clean") + msg = f"Writing {out_files[in_key].basename} …" + logger.info(**gen_log_kwargs(message=msg)) + raw.save(out_files[in_key], overwrite=True, split_size=cfg._raw_split_size) + _update_for_splits(out_files, in_key) + # Report + with _open_report( + cfg=cfg, + exec_params=exec_params, + subject=subject, + session=session, + run=run, + task=task, + ) as report: + msg = "Adding cleaned raw data to report" + logger.info(**gen_log_kwargs(message=msg)) + _add_raw( + cfg=cfg, + report=report, + bids_path_in=out_files[in_key], + title="Raw (clean)", + tags=("clean",), + raw=raw, + ) + return _prep_out_files(exec_params=exec_params, out_files=out_files) + + def get_config( *, config: SimpleNamespace, + subject: str, ) -> SimpleNamespace: cfg = SimpleNamespace( + processing="filt" if config.regress_artifact is None else "regress", _epochs_split_size=config._epochs_split_size, - **_bids_kwargs(config=config), + **_import_data_kwargs(config=config, subject=subject), ) return cfg @@ -100,11 +162,15 @@ def main(*, config: SimpleNamespace) -> None: return with get_parallel_backend(config.exec_params): - parallel, run_func = parallel_func(apply_ssp, exec_params=config.exec_params) + # Epochs + parallel, run_func = parallel_func( + apply_ssp_epochs, exec_params=config.exec_params + ) logs = parallel( run_func( cfg=get_config( config=config, + subject=subject, ), exec_params=config.exec_params, subject=subject, @@ -113,4 +179,28 @@ def main(*, config: SimpleNamespace) -> None: for subject in get_subjects(config) for session in get_sessions(config) ) + # Raw + parallel, run_func = parallel_func( + apply_ssp_raw, exec_params=config.exec_params + ) + logs += parallel( + run_func( + cfg=get_config( + config=config, + subject=subject, + ), + exec_params=config.exec_params, + subject=subject, + session=session, + run=run, + task=task, + ) + for subject in get_subjects(config) + for session in get_sessions(config) + for run, task in get_runs_tasks( + config=config, + subject=subject, + session=session, + ) + ) save_logs(config=config, logs=logs) diff --git a/mne_bids_pipeline/steps/preprocessing/_09_ptp_reject.py b/mne_bids_pipeline/steps/preprocessing/_09_ptp_reject.py index 3584aa72f..d08469b3c 100644 --- a/mne_bids_pipeline/steps/preprocessing/_09_ptp_reject.py +++ b/mne_bids_pipeline/steps/preprocessing/_09_ptp_reject.py @@ -187,7 +187,7 @@ def drop_ptp( psd = True else: psd = 30 - tags = ("epochs", "reject") + tags = ("epochs", "clean") kind = cfg.reject if isinstance(cfg.reject, str) else "Rejection" title = "Epochs: after cleaning" with _open_report( diff --git a/mne_bids_pipeline/steps/sensor/_06_make_cov.py b/mne_bids_pipeline/steps/sensor/_06_make_cov.py index a9c211df4..5a210d45f 100644 --- a/mne_bids_pipeline/steps/sensor/_06_make_cov.py +++ b/mne_bids_pipeline/steps/sensor/_06_make_cov.py @@ -71,7 +71,7 @@ def get_input_fnames_cov( run=None, recording=cfg.rec, space=cfg.space, - processing="filt", + processing="clean", suffix="raw", extension=".fif", datatype=cfg.datatype, @@ -173,7 +173,7 @@ def retrieve_custom_cov( task=cfg.task, acquisition=cfg.acq, run=None, - processing=cfg.proc, + processing="clean", recording=cfg.rec, space=cfg.space, suffix="ave", diff --git a/mne_bids_pipeline/steps/sensor/_99_group_average.py b/mne_bids_pipeline/steps/sensor/_99_group_average.py index 7ac19e7de..63e4e6ea2 100644 --- a/mne_bids_pipeline/steps/sensor/_99_group_average.py +++ b/mne_bids_pipeline/steps/sensor/_99_group_average.py @@ -107,7 +107,7 @@ def average_evokeds( task=cfg.task, acquisition=cfg.acq, run=None, - processing=cfg.proc, + processing="clean", recording=cfg.rec, space=cfg.space, suffix="ave", diff --git a/mne_bids_pipeline/tests/configs/config_ds000248_base.py b/mne_bids_pipeline/tests/configs/config_ds000248_base.py index 6ffd9644e..9888e1cee 100644 --- a/mne_bids_pipeline/tests/configs/config_ds000248_base.py +++ b/mne_bids_pipeline/tests/configs/config_ds000248_base.py @@ -23,7 +23,7 @@ def noise_cov(bp): # Use pre-stimulus period as noise source - bp = bp.copy().update(processing="clean", suffix="epo") + bp = bp.copy().update(suffix="epo") if not bp.fpath.exists(): bp.update(split="01") epo = mne.read_epochs(bp)