From 6a6af5a5693216b88487a51afd5037c1af3296f0 Mon Sep 17 00:00:00 2001
From: Ben Dichter
Date: Mon, 12 Aug 2024 21:12:46 -0400
Subject: [PATCH 1/6] Add a docs page in the user guide about adding trials

---
 docs/user_guide/adding_trials.rst  | 36 ++++++++++++++++++++++++++++++
 docs/user_guide/datainterfaces.rst | 25 +++++++++++++++++++--
 docs/user_guide/index.rst          |  1 +
 docs/user_guide/nwbconverter.rst   | 17 ++++++++++----
 4 files changed, 73 insertions(+), 6 deletions(-)
 create mode 100644 docs/user_guide/adding_trials.rst

diff --git a/docs/user_guide/adding_trials.rst b/docs/user_guide/adding_trials.rst
new file mode 100644
index 000000000..7fbc63c61
--- /dev/null
+++ b/docs/user_guide/adding_trials.rst
@@ -0,0 +1,36 @@
+.. _adding_trials:
+
+Adding Trials to NWB Files
+==========================
+
+NWB allows you to store information about time intervals in a structured way. These structures are often used to store
+information about trials, epochs, or other time intervals in the data.
+You can add time intervals to an NWBFile object before writing it using PyNWB,
+including custom columns that describe each interval.
+Here is how you would add trials to an NWB file:
+
+.. code-block:: python
+
+    # you can add custom columns to the trials table
+    nwbfile.add_trial_column(name="trial_type", description="the type of trial")
+
+    nwbfile.add_trial(start_time=0.0, stop_time=1.0, trial_type="go")
+    nwbfile.add_trial(start_time=1.0, stop_time=2.0, trial_type="nogo")
+
+You can also add epochs or other types of time intervals to an NWB file. See
+`PyNWB Annotating Time Intervals `_
+for more information.
+
+Once this information is added, you can write the NWB file to disk:
+
+.. code-block:: python
+
+    from neuroconv.tools.nwb_helpers import configure_and_write_nwbfile
+
+    configure_and_write_nwbfile(nwbfile, save_path="path/to/destination.nwb", backend="hdf5")
+
+.. note::
+
+    NWB generally recommends storing the full continuous stream of data in the NWB file, and then adding trials or
+    epochs as time intervals. Trial-aligning the data is then done on-the-fly when reading the file. This allows for
+    more flexibility in the analysis of the data.
\ No newline at end of file
diff --git a/docs/user_guide/datainterfaces.rst b/docs/user_guide/datainterfaces.rst
index 8752bd387..3fa1d07c3 100644
--- a/docs/user_guide/datainterfaces.rst
+++ b/docs/user_guide/datainterfaces.rst
@@ -143,8 +143,8 @@ Here we can see that ``metadata["Ecephys"]["ElectrodeGroup"][0]["location"]`` is
Use ``.get_metadata_schema()`` to get the schema of the metadata dictionary. This schema is a JSON-schema-like
dictionary that specifies required and optional fields in the metadata dictionary. See :ref:`metadata schema ` for more information.

-4. Run conversion
-~~~~~~~~~~~~~~~~~
+4a. Run conversion
+~~~~~~~~~~~~~~~~~~

The ``.run_conversion`` method takes the (edited) metadata dictionary and
the path of an NWB file, and launches the actual data conversion into NWB.
@@ -159,3 +159,24 @@ This method reads and writes large datasets piece-by-piece, so you
can convert large datasets without overloading the computer's available RAM.
It also uses good defaults for data chunking and lossless compression, reducing
the file size of the output NWB file and optimizing the file for cloud compute.
+
+4b. Create an in-memory NWB file
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+If you want to create an in-memory NWB file, you can use the ``.create_nwbfile`` method.
+
+.. code-block:: python
+
+    nwbfile = spikeglx_interface.create_nwbfile(metadata=metadata)
+
+This is useful for adding data such as trials, epochs, or other time intervals to the NWB file. See
+:ref:`Adding Time Intervals to NWB Files ` for more information.
+
+This does not load large datasets into memory. Those remain in the source files and are read piece-by-piece during the
+write process. Once you make all the modifications you want to the NWBFile, you can save it to disk. The following code
+automatically optimizes datasets for cloud compute and writes the file to disk.
+
+.. code-block:: python
+
+    from neuroconv.tools.nwb_helpers import configure_and_write_nwbfile
+
+    configure_and_write_nwbfile(nwbfile, save_path="path/to/destination.nwb", backend="hdf5")
\ No newline at end of file
diff --git a/docs/user_guide/index.rst b/docs/user_guide/index.rst
index e8c0827c8..4077f49be 100644
--- a/docs/user_guide/index.rst
+++ b/docs/user_guide/index.rst
@@ -20,6 +20,7 @@ and synchronize data across multiple sources.

   datainterfaces
   nwbconverter
+   adding_trials
   temporal_alignment
   csvs
   expand_path
diff --git a/docs/user_guide/nwbconverter.rst b/docs/user_guide/nwbconverter.rst
index 2360905e6..e1db63945 100644
--- a/docs/user_guide/nwbconverter.rst
+++ b/docs/user_guide/nwbconverter.rst
@@ -44,21 +44,30 @@ keys of``data_interface_classes``.
This creates an :py:class:`.NWBConverter` object that can aggregate and
distribute across the data interfaces. To fetch metadata across all of the interfaces and merge
-them together, call::
+them together, call:
+
+.. code-block:: python

    metadata = converter.get_metadata()

-The metadata can then be manually modified with any additional user-input, just like ``DataInterface`` objects::
+The metadata can then be manually modified with any additional user-input, just like ``DataInterface`` objects:
+
+.. code-block:: python

    metadata["NWBFile"]["session_description"] = "NeuroConv tutorial."
    metadata["NWBFile"]["experimenter"] = "My name"
    metadata["Subject"]["subject_id"] = "ID of experimental subject"

-The final metadata dictionary should follow the form defined by
-``converter.get_metadata_schema()``. Now run the entire conversion with::
+The final metadata dictionary should follow the form defined by :meth:`.NWBConverter.get_metadata_schema`.
+Now run the entire conversion with:
+
+.. code-block:: python

    converter.run_conversion(metadata=metadata, nwbfile_path="my_nwbfile.nwb")

+Like ``DataInterface`` objects, :py:class:`.NWBConverter` objects can output an in-memory NWBFile object by
+calling :meth:`.NWBConverter.create_nwbfile`. This can be useful for debugging or for further processing.
+
Though this example was only for two data streams (recording and spike-sorted data), it can easily extend to
any number of sources, including video of a subject, extracted position estimates, stimuli, or any other data
source.
From 91dedef273e8dcf0d793ad02a555d64aa513a8ac Mon Sep 17 00:00:00 2001 From: Ben Dichter Date: Wed, 21 Aug 2024 10:08:37 -0400 Subject: [PATCH 2/6] improve typehints --- src/neuroconv/basedatainterface.py | 6 +- .../behavior/audio/audiointerface.py | 6 +- .../behavior/deeplabcut/_dlc_utils.py | 4 +- .../deeplabcut/deeplabcutdatainterface.py | 4 +- .../lightningpose/lightningposeconverter.py | 6 +- .../lightningposedatainterface.py | 2 +- .../behavior/neuralynx/nvt_utils.py | 8 +-- .../behavior/video/video_utils.py | 2 +- .../behavior/video/videodatainterface.py | 12 ++-- .../baserecordingextractorinterface.py | 10 +-- .../ecephys/basesortingextractorinterface.py | 8 +-- .../neuralynx/neuralynxdatainterface.py | 10 +-- .../neuroscope/neuroscopedatainterface.py | 2 +- .../openephys/openephysbinarydatainterface.py | 4 +- .../openephys/openephyslegacydatainterface.py | 4 +- .../ecephys/phy/phydatainterface.py | 2 +- .../ecephys/spikeglx/spikeglxconverter.py | 6 +- .../ecephys/spikeglx/spikeglxnidqinterface.py | 3 +- .../icephys/abf/abfdatainterface.py | 3 +- .../icephys/baseicephysinterface.py | 2 +- .../brukertiff/brukertiffdatainterface.py | 6 +- .../text/timeintervalsinterface.py | 6 +- src/neuroconv/nwbconverter.py | 18 ++--- .../tools/aws/_submit_aws_batch_job.py | 10 +-- src/neuroconv/tools/data_transfers/_dandi.py | 4 +- src/neuroconv/tools/data_transfers/_globus.py | 14 ++-- src/neuroconv/tools/hdmf.py | 10 +-- src/neuroconv/tools/importing.py | 8 +-- src/neuroconv/tools/neo/neo.py | 10 +-- .../_configuration_models/_base_backend.py | 16 ++--- .../_configuration_models/_base_dataset_io.py | 24 +++---- .../_configuration_models/_hdf5_backend.py | 4 +- .../_configuration_models/_hdf5_dataset_io.py | 6 +- .../_configuration_models/_zarr_backend.py | 4 +- .../_configuration_models/_zarr_dataset_io.py | 12 ++-- src/neuroconv/tools/optogenetics.py | 2 +- src/neuroconv/tools/path_expansion.py | 6 +- .../imagingextractordatachunkiterator.py | 2 +- .../tools/roiextractors/roiextractors.py | 6 +- .../tools/spikeinterface/spikeinterface.py | 66 +++++++++---------- ...pikeinterfacerecordingdatachunkiterator.py | 4 +- .../testing/_mock/_mock_dataset_models.py | 24 +++---- .../tools/testing/data_interface_mixins.py | 6 +- .../tools/testing/mock_interfaces.py | 6 +- src/neuroconv/tools/testing/mock_probes.py | 4 +- src/neuroconv/tools/text.py | 6 +- src/neuroconv/utils/json_schema.py | 12 ++-- 47 files changed, 197 insertions(+), 203 deletions(-) diff --git a/src/neuroconv/basedatainterface.py b/src/neuroconv/basedatainterface.py index 9de06e827..de6d0bad0 100644 --- a/src/neuroconv/basedatainterface.py +++ b/src/neuroconv/basedatainterface.py @@ -3,7 +3,7 @@ import uuid from abc import ABC, abstractmethod from pathlib import Path -from typing import Literal, Optional, Tuple, Union +from typing import Literal, Optional, Union from jsonschema.validators import validate from pynwb import NWBFile @@ -29,8 +29,8 @@ class BaseDataInterface(ABC): """Abstract class defining the structure of all DataInterfaces.""" display_name: Union[str, None] = None - keywords: Tuple[str] = tuple() - associated_suffixes: Tuple[str] = tuple() + keywords: tuple[str] = tuple() + associated_suffixes: tuple[str] = tuple() info: Union[str, None] = None @classmethod diff --git a/src/neuroconv/datainterfaces/behavior/audio/audiointerface.py b/src/neuroconv/datainterfaces/behavior/audio/audiointerface.py index 6054a65b9..d038916e7 100644 --- a/src/neuroconv/datainterfaces/behavior/audio/audiointerface.py +++ 
b/src/neuroconv/datainterfaces/behavior/audio/audiointerface.py @@ -1,6 +1,6 @@ import json from pathlib import Path -from typing import List, Literal, Optional +from typing import Literal, Optional import numpy as np import scipy @@ -104,7 +104,7 @@ def get_original_timestamps(self) -> np.ndarray: def get_timestamps(self) -> Optional[np.ndarray]: raise NotImplementedError("The AudioInterface does not yet support timestamps.") - def set_aligned_timestamps(self, aligned_timestamps: List[np.ndarray]): + def set_aligned_timestamps(self, aligned_timestamps: list[np.ndarray]): raise NotImplementedError("The AudioInterface does not yet support timestamps.") def set_aligned_starting_time(self, aligned_starting_time: float): @@ -131,7 +131,7 @@ def set_aligned_starting_time(self, aligned_starting_time: float): "Please set them using 'set_aligned_segment_starting_times'." ) - def set_aligned_segment_starting_times(self, aligned_segment_starting_times: List[float]): + def set_aligned_segment_starting_times(self, aligned_segment_starting_times: list[float]): """ Align the individual starting time for each audio file in this interface relative to the common session start time. diff --git a/src/neuroconv/datainterfaces/behavior/deeplabcut/_dlc_utils.py b/src/neuroconv/datainterfaces/behavior/deeplabcut/_dlc_utils.py index ddea0751d..fd1f3078f 100644 --- a/src/neuroconv/datainterfaces/behavior/deeplabcut/_dlc_utils.py +++ b/src/neuroconv/datainterfaces/behavior/deeplabcut/_dlc_utils.py @@ -2,7 +2,7 @@ import pickle import warnings from pathlib import Path -from typing import List, Optional, Union +from typing import Optional, Union import numpy as np import pandas as pd @@ -306,7 +306,7 @@ def add_subject_to_nwbfile( h5file: FilePathType, individual_name: str, config_file: FilePathType, - timestamps: Optional[Union[List, np.ndarray]] = None, + timestamps: Optional[Union[list, np.ndarray]] = None, pose_estimation_container_kwargs: Optional[dict] = None, ) -> NWBFile: """ diff --git a/src/neuroconv/datainterfaces/behavior/deeplabcut/deeplabcutdatainterface.py b/src/neuroconv/datainterfaces/behavior/deeplabcut/deeplabcutdatainterface.py index a0bfa3fb4..9333c92e7 100644 --- a/src/neuroconv/datainterfaces/behavior/deeplabcut/deeplabcutdatainterface.py +++ b/src/neuroconv/datainterfaces/behavior/deeplabcut/deeplabcutdatainterface.py @@ -1,5 +1,5 @@ from pathlib import Path -from typing import List, Optional, Union +from typing import Optional, Union import numpy as np from pynwb.file import NWBFile @@ -76,7 +76,7 @@ def get_timestamps(self) -> np.ndarray: "Unable to retrieve timestamps for this interface! Define the `get_timestamps` method for this interface." 
) - def set_aligned_timestamps(self, aligned_timestamps: Union[List, np.ndarray]): + def set_aligned_timestamps(self, aligned_timestamps: Union[list, np.ndarray]): """ Set aligned timestamps vector for DLC data with user defined timestamps diff --git a/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposeconverter.py b/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposeconverter.py index 3907134a6..d8b7045ba 100644 --- a/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposeconverter.py +++ b/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposeconverter.py @@ -1,5 +1,5 @@ from copy import deepcopy -from typing import List, Optional +from typing import Optional from pynwb import NWBFile @@ -106,8 +106,8 @@ def add_to_nwbfile( reference_frame: Optional[str] = None, confidence_definition: Optional[str] = None, external_mode: bool = True, - starting_frames_original_videos: Optional[List[int]] = None, - starting_frames_labeled_videos: Optional[List[int]] = None, + starting_frames_original_videos: Optional[list[int]] = None, + starting_frames_labeled_videos: Optional[list[int]] = None, stub_test: bool = False, ): original_video_interface = self.data_interface_objects["OriginalVideo"] diff --git a/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposedatainterface.py b/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposedatainterface.py index 28c93db9c..e15425785 100644 --- a/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposedatainterface.py +++ b/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposedatainterface.py @@ -116,7 +116,7 @@ def _load_source_data(self): pose_estimation_data = pd.read_csv(self.file_path, header=[0, 1, 2]) return pose_estimation_data - def _get_original_video_shape(self) -> Tuple[int, int]: + def _get_original_video_shape(self) -> tuple[int, int]: with self._vc(file_path=str(self.original_video_file_path)) as video: video_shape = video.get_frame_shape() # image size of the original video is in height x width diff --git a/src/neuroconv/datainterfaces/behavior/neuralynx/nvt_utils.py b/src/neuroconv/datainterfaces/behavior/neuralynx/nvt_utils.py index 278433b5c..ddc7a7f16 100644 --- a/src/neuroconv/datainterfaces/behavior/neuralynx/nvt_utils.py +++ b/src/neuroconv/datainterfaces/behavior/neuralynx/nvt_utils.py @@ -5,7 +5,7 @@ import os from datetime import datetime from shutil import copy -from typing import Dict, List, Union +from typing import Union import numpy as np @@ -28,7 +28,7 @@ ] -def read_header(filename: str) -> Dict[str, Union[str, datetime, float, int, List[int]]]: +def read_header(filename: str) -> dict[str, Union[str, datetime, float, int, list[int]]]: """ Parses a Neuralynx NVT File Header and returns it as a dictionary. @@ -85,7 +85,7 @@ def parse_bool(x): return out -def read_data(filename: str) -> Dict[str, np.ndarray]: +def read_data(filename: str) -> dict[str, np.ndarray]: """ Reads a NeuroLynx NVT file and returns its data. @@ -99,7 +99,7 @@ def read_data(filename: str) -> Dict[str, np.ndarray]: Returns ------- - Dict[str, np.ndarray] + dict[str, np.ndarray] Dictionary containing the parsed data. 
Raises diff --git a/src/neuroconv/datainterfaces/behavior/video/video_utils.py b/src/neuroconv/datainterfaces/behavior/video/video_utils.py index 5000c468b..fa1ec3e36 100644 --- a/src/neuroconv/datainterfaces/behavior/video/video_utils.py +++ b/src/neuroconv/datainterfaces/behavior/video/video_utils.py @@ -224,7 +224,7 @@ def _get_frame_details(self): min_frame_size_mb = (math.prod(frame_shape) * self._get_dtype().itemsize) / 1e6 return min_frame_size_mb, frame_shape - def _get_data(self, selection: Tuple[slice]) -> np.ndarray: + def _get_data(self, selection: tuple[slice]) -> np.ndarray: start_frame = selection[0].start end_frame = selection[0].stop frames = np.empty(shape=[end_frame - start_frame, *self._maxshape[1:]]) diff --git a/src/neuroconv/datainterfaces/behavior/video/videodatainterface.py b/src/neuroconv/datainterfaces/behavior/video/videodatainterface.py index 56880b820..13dec9f5d 100644 --- a/src/neuroconv/datainterfaces/behavior/video/videodatainterface.py +++ b/src/neuroconv/datainterfaces/behavior/video/videodatainterface.py @@ -1,7 +1,7 @@ import warnings from copy import deepcopy from pathlib import Path -from typing import List, Literal, Optional +from typing import Literal, Optional import numpy as np import psutil @@ -103,7 +103,7 @@ def get_metadata(self): return metadata - def get_original_timestamps(self, stub_test: bool = False) -> List[np.ndarray]: + def get_original_timestamps(self, stub_test: bool = False) -> list[np.ndarray]: """ Retrieve the original unaltered timestamps for the data in this interface. @@ -158,7 +158,7 @@ def get_timing_type(self) -> Literal["starting_time and rate", "timestamps"]: "Please specify the temporal alignment of each video." ) - def get_timestamps(self, stub_test: bool = False) -> List[np.ndarray]: + def get_timestamps(self, stub_test: bool = False) -> list[np.ndarray]: """ Retrieve the timestamps for the data in this interface. @@ -175,7 +175,7 @@ def get_timestamps(self, stub_test: bool = False) -> List[np.ndarray]: """ return self._timestamps or self.get_original_timestamps(stub_test=stub_test) - def set_aligned_timestamps(self, aligned_timestamps: List[np.ndarray]): + def set_aligned_timestamps(self, aligned_timestamps: list[np.ndarray]): """ Replace all timestamps for this interface with those aligned to the common session start time. @@ -220,7 +220,7 @@ def set_aligned_starting_time(self, aligned_starting_time: float, stub_test: boo else: raise ValueError("There are no timestamps or starting times set to shift by a common value!") - def set_aligned_segment_starting_times(self, aligned_segment_starting_times: List[float], stub_test: bool = False): + def set_aligned_segment_starting_times(self, aligned_segment_starting_times: list[float], stub_test: bool = False): """ Align the individual starting time for each video (segment) in this interface relative to the common session start time. 
@@ -263,7 +263,7 @@ def add_to_nwbfile( metadata: Optional[dict] = None, stub_test: bool = False, external_mode: bool = True, - starting_frames: Optional[List[int]] = None, + starting_frames: Optional[list[int]] = None, chunk_data: bool = True, module_name: Optional[str] = None, module_description: Optional[str] = None, diff --git a/src/neuroconv/datainterfaces/ecephys/baserecordingextractorinterface.py b/src/neuroconv/datainterfaces/ecephys/baserecordingextractorinterface.py index b21210d77..354d78f80 100644 --- a/src/neuroconv/datainterfaces/ecephys/baserecordingextractorinterface.py +++ b/src/neuroconv/datainterfaces/ecephys/baserecordingextractorinterface.py @@ -1,4 +1,4 @@ -from typing import List, Literal, Optional, Union +from typing import Literal, Optional, Union import numpy as np from pynwb import NWBFile @@ -106,7 +106,7 @@ def get_metadata(self) -> DeepDict: return metadata - def get_original_timestamps(self) -> Union[np.ndarray, List[np.ndarray]]: + def get_original_timestamps(self) -> Union[np.ndarray, list[np.ndarray]]: """ Retrieve the original unaltered timestamps for the data in this interface. @@ -128,7 +128,7 @@ def get_original_timestamps(self) -> Union[np.ndarray, List[np.ndarray]]: for segment_index in range(self._number_of_segments) ] - def get_timestamps(self) -> Union[np.ndarray, List[np.ndarray]]: + def get_timestamps(self) -> Union[np.ndarray, list[np.ndarray]]: """ Retrieve the timestamps for the data in this interface. @@ -152,7 +152,7 @@ def set_aligned_timestamps(self, aligned_timestamps: np.ndarray): self.recording_extractor.set_times(times=aligned_timestamps) - def set_aligned_segment_timestamps(self, aligned_segment_timestamps: List[np.ndarray]): + def set_aligned_segment_timestamps(self, aligned_segment_timestamps: list[np.ndarray]): """ Replace all timestamps for all segments in this interface with those aligned to the common session start time. @@ -185,7 +185,7 @@ def set_aligned_starting_time(self, aligned_starting_time: float): ] ) - def set_aligned_segment_starting_times(self, aligned_segment_starting_times: List[float]): + def set_aligned_segment_starting_times(self, aligned_segment_starting_times: list[float]): """ Align the starting time for each segment in this interface relative to the common session start time. diff --git a/src/neuroconv/datainterfaces/ecephys/basesortingextractorinterface.py b/src/neuroconv/datainterfaces/ecephys/basesortingextractorinterface.py index 9c3c06849..b3cd25d24 100644 --- a/src/neuroconv/datainterfaces/ecephys/basesortingextractorinterface.py +++ b/src/neuroconv/datainterfaces/ecephys/basesortingextractorinterface.py @@ -1,5 +1,5 @@ from copy import deepcopy -from typing import List, Literal, Optional, Union +from typing import Literal, Optional, Union import numpy as np from pynwb import NWBFile @@ -83,7 +83,7 @@ def get_original_timestamps(self) -> np.ndarray: "Unable to fetch original timestamps for a SortingInterface since it relies upon an attached recording." 
) - def get_timestamps(self) -> Union[np.ndarray, List[np.ndarray]]: + def get_timestamps(self) -> Union[np.ndarray, list[np.ndarray]]: if not self.sorting_extractor.has_recording(): raise NotImplementedError( "In order to align timestamps for a SortingInterface, it must have a recording " @@ -138,7 +138,7 @@ def set_aligned_timestamps(self, aligned_timestamps: np.ndarray): times=aligned_timestamps[segment_index], segment_index=segment_index ) - def set_aligned_segment_timestamps(self, aligned_segment_timestamps: List[np.ndarray]): + def set_aligned_segment_timestamps(self, aligned_segment_timestamps: list[np.ndarray]): """ Replace all timestamps for all segments in this interface with those aligned to the common session start time. @@ -182,7 +182,7 @@ def set_aligned_starting_time(self, aligned_starting_time: float): else: sorting_segment._t_start += aligned_starting_time - def set_aligned_segment_starting_times(self, aligned_segment_starting_times: List[float]): + def set_aligned_segment_starting_times(self, aligned_segment_starting_times: list[float]): """ Align the starting time for each segment in this interface relative to the common session start time. diff --git a/src/neuroconv/datainterfaces/ecephys/neuralynx/neuralynxdatainterface.py b/src/neuroconv/datainterfaces/ecephys/neuralynx/neuralynxdatainterface.py index b999944e1..da0573249 100644 --- a/src/neuroconv/datainterfaces/ecephys/neuralynx/neuralynxdatainterface.py +++ b/src/neuroconv/datainterfaces/ecephys/neuralynx/neuralynxdatainterface.py @@ -1,5 +1,5 @@ import json -from typing import List, Optional +from typing import Optional import numpy as np from pydantic import DirectoryPath @@ -18,7 +18,7 @@ class NeuralynxRecordingInterface(BaseRecordingExtractorInterface): info = "Interface for Neuralynx recording data." 
@classmethod - def get_stream_names(cls, folder_path: DirectoryPath) -> List[str]: + def get_stream_names(cls, folder_path: DirectoryPath) -> list[str]: from spikeinterface.extractors import NeuralynxRecordingExtractor stream_names, _ = NeuralynxRecordingExtractor.get_streams(folder_path=folder_path) @@ -158,16 +158,16 @@ def extract_neo_header_metadata(neo_reader) -> dict: return common_header -def _dict_intersection(dict_list: List) -> dict: +def _dict_intersection(dict_list: list[dict]) -> dict: """ Intersect dict_list and return only common keys and values Parameters ---------- - dict_list: list of dicitionaries each representing a header + dict_list: list of dictionaries each representing a header Returns ------- dict: - Dictionary containing key-value pairs common to all input dicitionary_list + Dictionary containing key-value pairs common to all input dictionary_list """ # Collect keys appearing in all dictionaries diff --git a/src/neuroconv/datainterfaces/ecephys/neuroscope/neuroscopedatainterface.py b/src/neuroconv/datainterfaces/ecephys/neuroscope/neuroscopedatainterface.py index cfb09e009..49d907ca6 100644 --- a/src/neuroconv/datainterfaces/ecephys/neuroscope/neuroscopedatainterface.py +++ b/src/neuroconv/datainterfaces/ecephys/neuroscope/neuroscopedatainterface.py @@ -269,7 +269,7 @@ def __init__( self, folder_path: FolderPathType, keep_mua_units: bool = True, - exclude_shanks: Optional[list] = None, + exclude_shanks: Optional[list[int]] = None, xml_file_path: Optional[FilePathType] = None, verbose: bool = True, ): diff --git a/src/neuroconv/datainterfaces/ecephys/openephys/openephysbinarydatainterface.py b/src/neuroconv/datainterfaces/ecephys/openephys/openephysbinarydatainterface.py index 88a700dd9..1de555489 100644 --- a/src/neuroconv/datainterfaces/ecephys/openephys/openephysbinarydatainterface.py +++ b/src/neuroconv/datainterfaces/ecephys/openephys/openephysbinarydatainterface.py @@ -1,4 +1,4 @@ -from typing import List, Optional +from typing import Optional from ..baserecordingextractorinterface import BaseRecordingExtractorInterface from ....utils import FolderPathType, get_schema_from_method_signature @@ -18,7 +18,7 @@ class OpenEphysBinaryRecordingInterface(BaseRecordingExtractorInterface): ExtractorName = "OpenEphysBinaryRecordingExtractor" @classmethod - def get_stream_names(cls, folder_path: FolderPathType) -> List[str]: + def get_stream_names(cls, folder_path: FolderPathType) -> list[str]: from spikeinterface.extractors import OpenEphysBinaryRecordingExtractor stream_names, _ = OpenEphysBinaryRecordingExtractor.get_streams(folder_path=folder_path) diff --git a/src/neuroconv/datainterfaces/ecephys/openephys/openephyslegacydatainterface.py b/src/neuroconv/datainterfaces/ecephys/openephys/openephyslegacydatainterface.py index 5d680f93b..2e836838c 100644 --- a/src/neuroconv/datainterfaces/ecephys/openephys/openephyslegacydatainterface.py +++ b/src/neuroconv/datainterfaces/ecephys/openephys/openephyslegacydatainterface.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import List, Optional +from typing import Optional from warnings import warn from ..baserecordingextractorinterface import BaseRecordingExtractorInterface @@ -18,7 +18,7 @@ class OpenEphysLegacyRecordingInterface(BaseRecordingExtractorInterface): info = "Interface for converting legacy OpenEphys recording data." 
@classmethod - def get_stream_names(cls, folder_path: FolderPathType) -> List[str]: + def get_stream_names(cls, folder_path: FolderPathType) -> list[str]: from spikeinterface.extractors import OpenEphysLegacyRecordingExtractor stream_names, _ = OpenEphysLegacyRecordingExtractor.get_streams(folder_path=folder_path) diff --git a/src/neuroconv/datainterfaces/ecephys/phy/phydatainterface.py b/src/neuroconv/datainterfaces/ecephys/phy/phydatainterface.py index cac24faa2..b6ffec1d7 100644 --- a/src/neuroconv/datainterfaces/ecephys/phy/phydatainterface.py +++ b/src/neuroconv/datainterfaces/ecephys/phy/phydatainterface.py @@ -26,7 +26,7 @@ def get_source_schema(cls) -> dict: def __init__( self, folder_path: FolderPathType, - exclude_cluster_groups: Optional[list] = None, + exclude_cluster_groups: Optional[list[str]] = None, verbose: bool = True, ): """ diff --git a/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxconverter.py b/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxconverter.py index eccd430c4..2e47e05b3 100644 --- a/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxconverter.py +++ b/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxconverter.py @@ -1,5 +1,5 @@ from pathlib import Path -from typing import List, Optional +from typing import Optional from .spikeglxdatainterface import SpikeGLXRecordingInterface from .spikeglxnidqinterface import SpikeGLXNIDQInterface @@ -26,7 +26,7 @@ def get_source_schema(cls): return source_schema @classmethod - def get_streams(cls, folder_path: FolderPathType) -> List[str]: + def get_streams(cls, folder_path: FolderPathType) -> list[str]: from spikeinterface.extractors import SpikeGLXRecordingExtractor return SpikeGLXRecordingExtractor.get_streams(folder_path=folder_path)[0] @@ -34,7 +34,7 @@ def get_streams(cls, folder_path: FolderPathType) -> List[str]: def __init__( self, folder_path: FolderPathType, - streams: Optional[List[str]] = None, + streams: Optional[list[str]] = None, verbose: bool = False, ): """ diff --git a/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxnidqinterface.py b/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxnidqinterface.py index 61bf6b056..d6391e0de 100644 --- a/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxnidqinterface.py +++ b/src/neuroconv/datainterfaces/ecephys/spikeglx/spikeglxnidqinterface.py @@ -1,5 +1,4 @@ from pathlib import Path -from typing import List import numpy as np @@ -92,7 +91,7 @@ def get_metadata(self) -> dict: ] = "Raw acquisition traces from the NIDQ (.nidq.bin) channels." 
return metadata - def get_channel_names(self) -> List[str]: + def get_channel_names(self) -> list[str]: """Return a list of channel names as set in the recording extractor.""" return list(self.recording_extractor.get_channel_ids()) diff --git a/src/neuroconv/datainterfaces/icephys/abf/abfdatainterface.py b/src/neuroconv/datainterfaces/icephys/abf/abfdatainterface.py index 3b9457901..c3a7a7a84 100644 --- a/src/neuroconv/datainterfaces/icephys/abf/abfdatainterface.py +++ b/src/neuroconv/datainterfaces/icephys/abf/abfdatainterface.py @@ -1,7 +1,6 @@ import json from datetime import datetime, timedelta from pathlib import Path -from typing import List from warnings import warn from ..baseicephysinterface import BaseIcephysInterface @@ -157,7 +156,7 @@ def set_aligned_starting_time(self, aligned_starting_time: float): reader._t_starts[segment_index] += aligned_starting_time def set_aligned_segment_starting_times( - self, aligned_segment_starting_times: List[List[float]], stub_test: bool = False + self, aligned_segment_starting_times: list[list[float]], stub_test: bool = False ): """ Align the individual starting time for each video in this interface relative to the common session start time. diff --git a/src/neuroconv/datainterfaces/icephys/baseicephysinterface.py b/src/neuroconv/datainterfaces/icephys/baseicephysinterface.py index 341fc4c0d..d574bdabf 100644 --- a/src/neuroconv/datainterfaces/icephys/baseicephysinterface.py +++ b/src/neuroconv/datainterfaces/icephys/baseicephysinterface.py @@ -92,7 +92,7 @@ def add_to_nwbfile( nwbfile: NWBFile, metadata: dict = None, icephys_experiment_type: str = "voltage_clamp", - skip_electrodes: Tuple[int] = (), + skip_electrodes: tuple[int] = (), ): """ Primary function for converting raw (unprocessed) intracellular data to the NWB standard. diff --git a/src/neuroconv/datainterfaces/ophys/brukertiff/brukertiffdatainterface.py b/src/neuroconv/datainterfaces/ophys/brukertiff/brukertiffdatainterface.py index 2c663d7e6..de555cd16 100644 --- a/src/neuroconv/datainterfaces/ophys/brukertiff/brukertiffdatainterface.py +++ b/src/neuroconv/datainterfaces/ophys/brukertiff/brukertiffdatainterface.py @@ -1,4 +1,4 @@ -from typing import List, Literal, Optional +from typing import Literal, Optional from dateutil.parser import parse @@ -64,7 +64,7 @@ def __init__( self._stream_name = self.imaging_extractor.stream_name.replace("_", "") self._image_size = self.imaging_extractor.get_image_size() - def _determine_position_current(self) -> List[float]: + def _determine_position_current(self) -> list[float]: """ Returns y, x, and z position values. The unit of values is in the microscope reference frame. """ @@ -222,7 +222,7 @@ def __init__( self._stream_name = self.imaging_extractor.stream_name.replace("_", "") self._image_size = self.imaging_extractor.get_image_size() - def _determine_position_current(self) -> List[float]: + def _determine_position_current(self) -> list[float]: """ Returns y, x, and z position values. The unit of values is in the microscope reference frame. 
""" diff --git a/src/neuroconv/datainterfaces/text/timeintervalsinterface.py b/src/neuroconv/datainterfaces/text/timeintervalsinterface.py index dd7f491f8..05195aa33 100644 --- a/src/neuroconv/datainterfaces/text/timeintervalsinterface.py +++ b/src/neuroconv/datainterfaces/text/timeintervalsinterface.py @@ -1,6 +1,6 @@ from abc import abstractmethod from pathlib import Path -from typing import Dict, Optional +from typing import Optional import numpy as np from pynwb import NWBFile @@ -120,8 +120,8 @@ def add_to_nwbfile( nwbfile: NWBFile, metadata: Optional[dict] = None, tag: str = "trials", - column_name_mapping: Dict[str, str] = None, - column_descriptions: Dict[str, str] = None, + column_name_mapping: dict[str, str] = None, + column_descriptions: dict[str, str] = None, ) -> NWBFile: """ Run the NWB conversion for the instantiated data interface. diff --git a/src/neuroconv/nwbconverter.py b/src/neuroconv/nwbconverter.py index 689ac6050..e052259bf 100644 --- a/src/neuroconv/nwbconverter.py +++ b/src/neuroconv/nwbconverter.py @@ -4,7 +4,7 @@ import warnings from collections import Counter from pathlib import Path -from typing import Dict, List, Literal, Optional, Tuple, Union +from typing import Literal, Optional, Union from jsonschema import validate from pynwb import NWBFile @@ -35,8 +35,8 @@ class NWBConverter: """Primary class for all NWB conversion classes.""" display_name: Union[str, None] = None - keywords: Tuple[str] = tuple() - associated_suffixes: Tuple[str] = tuple() + keywords: tuple[str] = tuple() + associated_suffixes: tuple[str] = tuple() info: Union[str, None] = None data_interface_classes = None @@ -56,11 +56,11 @@ def get_source_schema(cls) -> dict: return source_schema @classmethod - def validate_source(cls, source_data: Dict[str, dict], verbose: bool = True): + def validate_source(cls, source_data: dict[str, dict], verbose: bool = True): """Validate source_data against Converter source_schema.""" cls._validate_source_data(source_data=source_data, verbose=verbose) - def _validate_source_data(self, source_data: Dict[str, dict], verbose: bool = True): + def _validate_source_data(self, source_data: dict[str, dict], verbose: bool = True): encoder = NWBSourceDataEncoder() # The encoder produces a serialized object, so we deserialized it for comparison @@ -72,7 +72,7 @@ def _validate_source_data(self, source_data: Dict[str, dict], verbose: bool = Tr if verbose: print("Source data is valid!") - def __init__(self, source_data: Dict[str, dict], verbose: bool = True): + def __init__(self, source_data: dict[str, dict], verbose: bool = True): """Validate source_data against source_schema and initialize all data interfaces.""" self.verbose = verbose self._validate_source_data(source_data=source_data, verbose=self.verbose) @@ -101,7 +101,7 @@ def get_metadata(self) -> DeepDict: metadata = dict_deep_update(metadata, interface_metadata) return metadata - def validate_metadata(self, metadata: Dict[str, dict], append_mode: bool = False): + def validate_metadata(self, metadata: dict[str, dict], append_mode: bool = False): """Validate metadata against Converter metadata_schema.""" encoder = NWBMetaDataEncoder() # The encoder produces a serialized object, so we deserialized it for comparison @@ -134,7 +134,7 @@ def get_conversion_options_schema(self) -> dict: return conversion_options_schema - def validate_conversion_options(self, conversion_options: Dict[str, dict]): + def validate_conversion_options(self, conversion_options: dict[str, dict]): """Validate conversion_options against Converter 
conversion_options_schema.""" validate(instance=conversion_options or {}, schema=self.get_conversion_options_schema()) if self.verbose: @@ -288,7 +288,7 @@ def get_source_schema(cls) -> dict: def validate_source(cls): raise NotImplementedError("Source data not available with previously initialized classes.") - def __init__(self, data_interfaces: Union[List[BaseDataInterface], Dict[str, BaseDataInterface]], verbose=True): + def __init__(self, data_interfaces: Union[list[BaseDataInterface], dict[str, BaseDataInterface]], verbose=True): self.verbose = verbose if isinstance(data_interfaces, list): # Create unique names for each interface diff --git a/src/neuroconv/tools/aws/_submit_aws_batch_job.py b/src/neuroconv/tools/aws/_submit_aws_batch_job.py index 9b4dfe81a..0d36bee7f 100644 --- a/src/neuroconv/tools/aws/_submit_aws_batch_job.py +++ b/src/neuroconv/tools/aws/_submit_aws_batch_job.py @@ -4,7 +4,7 @@ import os import time from datetime import datetime -from typing import Dict, List, Optional +from typing import Optional from uuid import uuid4 @@ -12,9 +12,9 @@ def submit_aws_batch_job( *, job_name: str, docker_image: str, - commands: Optional[List[str]] = None, - environment_variables: Optional[Dict[str, str]] = None, - job_dependencies: Optional[List[Dict[str, str]]] = None, + commands: Optional[list[str]] = None, + environment_variables: Optional[dict[str, str]] = None, + job_dependencies: Optional[list[dict[str, str]]] = None, status_tracker_table_name: str = "neuroconv_batch_status_tracker", iam_role_name: str = "neuroconv_batch_role", compute_environment_name: str = "neuroconv_batch_environment", @@ -24,7 +24,7 @@ def submit_aws_batch_job( minimum_worker_cpus: int = 4, submission_id: Optional[str] = None, region: Optional[str] = None, -) -> Dict[str, str]: +) -> dict[str, str]: """ Submit a job to AWS Batch for processing. diff --git a/src/neuroconv/tools/data_transfers/_dandi.py b/src/neuroconv/tools/data_transfers/_dandi.py index 291cb5e11..f77c83fc9 100644 --- a/src/neuroconv/tools/data_transfers/_dandi.py +++ b/src/neuroconv/tools/data_transfers/_dandi.py @@ -4,7 +4,7 @@ from pathlib import Path from shutil import rmtree from tempfile import mkdtemp -from typing import List, Optional, Union +from typing import Optional, Union from warnings import warn from pynwb import NWBHDF5IO @@ -21,7 +21,7 @@ def automatic_dandi_upload( cleanup: bool = False, number_of_jobs: Union[int, None] = None, number_of_threads: Union[int, None] = None, -) -> List[Path]: +) -> list[Path]: """ Fully automated upload of NWB files to a Dandiset. diff --git a/src/neuroconv/tools/data_transfers/_globus.py b/src/neuroconv/tools/data_transfers/_globus.py index 3429127f1..62bb654ed 100644 --- a/src/neuroconv/tools/data_transfers/_globus.py +++ b/src/neuroconv/tools/data_transfers/_globus.py @@ -4,7 +4,7 @@ import re from pathlib import Path from time import sleep, time -from typing import Dict, List, Tuple, Union +from typing import Union from pydantic import DirectoryPath from tqdm import tqdm @@ -15,7 +15,7 @@ def get_globus_dataset_content_sizes( globus_endpoint_id: str, path: str, recursive: bool = True, timeout: float = 120.0 -) -> Dict[str, int]: # pragma: no cover +) -> dict[str, int]: # pragma: no cover """ May require external login via 'globus login' from CLI. 
@@ -35,13 +35,13 @@ def get_globus_dataset_content_sizes( def transfer_globus_content( source_endpoint_id: str, - source_files: Union[str, List[List[str]]], + source_files: Union[str, list[list[str]]], destination_endpoint_id: str, destination_folder: DirectoryPath, display_progress: bool = True, progress_update_rate: float = 60.0, progress_update_timeout: float = 600.0, -) -> Tuple[bool, List[str]]: # pragma: no cover +) -> tuple[bool, list[str]]: # pragma: no cover """ Track progress for transferring content from source_endpoint_id to destination_endpoint_id:destination_folder. @@ -81,10 +81,10 @@ def transfer_globus_content( def _submit_transfer_request( source_endpoint_id: str, - source_files: Union[str, List[List[str]]], + source_files: Union[str, list[list[str]]], destination_endpoint_id: str, destination_folder_path: Path, - ) -> Dict[str, int]: + ) -> dict[str, int]: """Send transfer request to Globus.""" folder_content_sizes = dict() task_total_sizes = dict() @@ -134,7 +134,7 @@ def _submit_transfer_request( return task_total_sizes def _track_transfer( - task_total_sizes: Dict[str, int], + task_total_sizes: dict[str, int], display_progress: bool = True, progress_update_rate: float = 60.0, progress_update_timeout: float = 600.0, diff --git a/src/neuroconv/tools/hdmf.py b/src/neuroconv/tools/hdmf.py index 64db7b66e..a6ddb7a07 100644 --- a/src/neuroconv/tools/hdmf.py +++ b/src/neuroconv/tools/hdmf.py @@ -9,14 +9,14 @@ class GenericDataChunkIterator(HDMFGenericDataChunkIterator): - def _get_default_buffer_shape(self, buffer_gb: float = 1.0) -> Tuple[int]: + def _get_default_buffer_shape(self, buffer_gb: float = 1.0) -> tuple[int]: return self.estimate_default_buffer_shape( buffer_gb=buffer_gb, chunk_shape=self.chunk_shape, maxshape=self.maxshape, dtype=self.dtype ) # TODO: move this to the core iterator in HDMF so it can be easily swapped out as well as run on its own @staticmethod - def estimate_default_chunk_shape(chunk_mb: float, maxshape: Tuple[int, ...], dtype: np.dtype) -> Tuple[int, ...]: + def estimate_default_chunk_shape(chunk_mb: float, maxshape: tuple[int, ...], dtype: np.dtype) -> tuple[int, ...]: """ Select chunk shape with size in MB less than the threshold of chunk_mb. @@ -47,8 +47,8 @@ def estimate_default_chunk_shape(chunk_mb: float, maxshape: Tuple[int, ...], dty # TODO: move this to the core iterator in HDMF so it can be easily swapped out as well as run on its own @staticmethod def estimate_default_buffer_shape( - buffer_gb: float, chunk_shape: Tuple[int, ...], maxshape: Tuple[int, ...], dtype: np.dtype - ) -> Tuple[int, ...]: + buffer_gb: float, chunk_shape: tuple[int, ...], maxshape: tuple[int, ...], dtype: np.dtype + ) -> tuple[int, ...]: # Elevate any overflow warnings to trigger error. # This is usually an indicator of something going terribly wrong with the estimation calculations and should be # avoided at all costs. 
@@ -149,5 +149,5 @@ def _get_dtype(self) -> np.dtype: def _get_maxshape(self) -> tuple: return self.data.shape - def _get_data(self, selection: Tuple[slice]) -> np.ndarray: + def _get_data(self, selection: tuple[slice]) -> np.ndarray: return self.data[selection] diff --git a/src/neuroconv/tools/importing.py b/src/neuroconv/tools/importing.py index 3b4e67b9d..04a3cbf21 100644 --- a/src/neuroconv/tools/importing.py +++ b/src/neuroconv/tools/importing.py @@ -6,7 +6,7 @@ from importlib.util import find_spec from platform import processor, python_version from types import ModuleType -from typing import Dict, List, Optional, Tuple, Union +from typing import Optional, Union from packaging import version @@ -41,8 +41,8 @@ def is_package_installed(package_name: str) -> bool: def get_package( package_name: str, installation_instructions: Optional[str] = None, - excluded_python_versions: Optional[List[str]] = None, - excluded_platforms_and_python_versions: Optional[Dict[str, Union[List[str], Dict[str, List[str]]]]] = None, + excluded_python_versions: Optional[list[str]] = None, + excluded_platforms_and_python_versions: Optional[dict[str, Union[list[str], dict[str, list[str]]]]] = None, ) -> ModuleType: """ Check if package is installed and return module if so. @@ -128,7 +128,7 @@ def get_package( ) -def get_format_summaries() -> Dict[str, Dict[str, Union[str, Tuple[str, ...], None]]]: +def get_format_summaries() -> dict[str, dict[str, Union[str, tuple[str, ...], None]]]: """Simple helper function for compiling high level summaries of all format interfaces and converters.""" # Local scope import to avoid circularity from ..converters import converter_list diff --git a/src/neuroconv/tools/neo/neo.py b/src/neuroconv/tools/neo/neo.py index bcb27a40b..2fa9274fc 100644 --- a/src/neuroconv/tools/neo/neo.py +++ b/src/neuroconv/tools/neo/neo.py @@ -10,7 +10,7 @@ import pynwb from ..nwb_helpers import add_device_from_metadata -from ...utils import OptionalFilePathType +from ...utils import FilePathType response_classes = dict( voltage_clamp=pynwb.icephys.VoltageClampSeries, @@ -65,7 +65,7 @@ def get_number_of_segments(neo_reader, block: int = 0) -> int: return neo_reader.header["nb_segment"][block] -def get_command_traces(neo_reader, segment: int = 0, cmd_channel: int = 0) -> Tuple[list, str, str]: +def get_command_traces(neo_reader, segment: int = 0, cmd_channel: int = 0) -> tuple[list, str, str]: """ Get command traces (e.g. voltage clamp command traces). @@ -213,7 +213,7 @@ def add_icephys_recordings( metadata: dict = None, icephys_experiment_type: str = "voltage_clamp", stimulus_type: str = "not described", - skip_electrodes: Tuple[int] = (), + skip_electrodes: tuple[int] = (), compression: Optional[str] = None, # TODO: remove completely after 10/1/2024 ): """ @@ -383,7 +383,7 @@ def add_neo_to_nwb( compression: Optional[str] = None, # TODO: remove completely after 10/1/2024 icephys_experiment_type: str = "voltage_clamp", stimulus_type: Optional[str] = None, - skip_electrodes: Tuple[int] = (), + skip_electrodes: tuple[int] = (), ): """ Auxiliary static method for nwbextractor. 
@@ -439,7 +439,7 @@ def add_neo_to_nwb( def write_neo_to_nwb( neo_reader: neo.io.baseio.BaseIO, - save_path: OptionalFilePathType = None, # pragma: no cover + save_path: Optional[FilePathType] = None, # pragma: no cover overwrite: bool = False, nwbfile=None, metadata: dict = None, diff --git a/src/neuroconv/tools/nwb_helpers/_configuration_models/_base_backend.py b/src/neuroconv/tools/nwb_helpers/_configuration_models/_base_backend.py index 2c07a1bb0..2b0cc507e 100644 --- a/src/neuroconv/tools/nwb_helpers/_configuration_models/_base_backend.py +++ b/src/neuroconv/tools/nwb_helpers/_configuration_models/_base_backend.py @@ -1,6 +1,6 @@ """Base Pydantic models for DatasetInfo and DatasetConfiguration.""" -from typing import Any, ClassVar, Dict, Literal, Type +from typing import Any, ClassVar, Literal, Type from hdmf.container import DataIO from pydantic import BaseModel, ConfigDict, Field @@ -21,7 +21,7 @@ class BackendConfiguration(BaseModel): model_config = ConfigDict(validate_assignment=True) # Re-validate model on mutation - dataset_configurations: Dict[str, DatasetIOConfiguration] = Field( + dataset_configurations: dict[str, DatasetIOConfiguration] = Field( description=( "A mapping from object locations (e.g. `acquisition/TestElectricalSeriesAP/data`) " "to their DatasetConfiguration specification that contains all information " @@ -42,15 +42,15 @@ def __str__(self) -> str: # Pydantic models have several API calls for retrieving the schema - override all of them to work @classmethod - def schema(cls, **kwargs) -> Dict[str, Any]: + def schema(cls, **kwargs) -> dict[str, Any]: return cls.model_json_schema(**kwargs) @classmethod - def schema_json(cls, **kwargs) -> Dict[str, Any]: + def schema_json(cls, **kwargs) -> dict[str, Any]: return cls.model_json_schema(**kwargs) @classmethod - def model_json_schema(cls, **kwargs) -> Dict[str, Any]: + def model_json_schema(cls, **kwargs) -> dict[str, Any]: assert "mode" not in kwargs, "The 'mode' of this method is fixed to be 'validation' and cannot be changed." assert "schema_generator" not in kwargs, "The 'schema_generator' of this method cannot be changed." return super().model_json_schema(mode="validation", schema_generator=PureJSONSchemaGenerator, **kwargs) @@ -65,7 +65,7 @@ def from_nwbfile(cls, nwbfile: NWBFile) -> Self: return cls(dataset_configurations=dataset_configurations) - def find_locations_requiring_remapping(self, nwbfile: NWBFile) -> Dict[str, DatasetIOConfiguration]: + def find_locations_requiring_remapping(self, nwbfile: NWBFile) -> dict[str, DatasetIOConfiguration]: """ Find locations of objects with mismatched IDs in the file. @@ -80,7 +80,7 @@ def find_locations_requiring_remapping(self, nwbfile: NWBFile) -> Dict[str, Data Returns ------- - Dict[str, DatasetIOConfiguration] + dict[str, DatasetIOConfiguration] A dictionary where: * Keys: Locations in the NWB of objects with mismatched IDs. * Values: New `DatasetIOConfiguration` objects corresponding to the updated object IDs. @@ -127,7 +127,7 @@ def find_locations_requiring_remapping(self, nwbfile: NWBFile) -> Dict[str, Data def build_remapped_backend( self, - locations_to_remap: Dict[str, DatasetIOConfiguration], + locations_to_remap: dict[str, DatasetIOConfiguration], ) -> Self: """ Build a remapped backend configuration by updating mismatched object IDs. 
diff --git a/src/neuroconv/tools/nwb_helpers/_configuration_models/_base_dataset_io.py b/src/neuroconv/tools/nwb_helpers/_configuration_models/_base_dataset_io.py index 01e291034..8b40e9a9e 100644 --- a/src/neuroconv/tools/nwb_helpers/_configuration_models/_base_dataset_io.py +++ b/src/neuroconv/tools/nwb_helpers/_configuration_models/_base_dataset_io.py @@ -2,7 +2,7 @@ import math from abc import ABC, abstractmethod -from typing import Any, Dict, List, Literal, Tuple, Union +from typing import Any, Literal, Union import h5py import numcodecs @@ -56,7 +56,7 @@ def _find_location_in_memory_nwbfile(neurodata_object: Container, field_name: st return _recursively_find_location_in_memory_nwbfile(current_location=field_name, neurodata_object=neurodata_object) -def _infer_dtype_of_list(list_: List[Union[int, float, list]]) -> np.dtype: +def _infer_dtype_of_list(list_: list[Union[int, float, list]]) -> np.dtype: """ Attempt to infer the dtype of values in an arbitrarily sized and nested list. @@ -103,16 +103,16 @@ class DatasetIOConfiguration(BaseModel, ABC): ) dataset_name: Literal["data", "timestamps"] = Field(description="The reference name of the dataset.", frozen=True) dtype: InstanceOf[np.dtype] = Field(description="The data type of elements of this dataset.", frozen=True) - full_shape: Tuple[int, ...] = Field(description="The maximum shape of the entire dataset.", frozen=True) + full_shape: tuple[int, ...] = Field(description="The maximum shape of the entire dataset.", frozen=True) # User specifiable fields - chunk_shape: Union[Tuple[PositiveInt, ...], None] = Field( + chunk_shape: Union[tuple[PositiveInt, ...], None] = Field( description=( "The specified shape to use when chunking the dataset. " "For optimized streaming speeds, a total size of around 10 MB is recommended." ), ) - buffer_shape: Union[Tuple[int, ...], None] = Field( + buffer_shape: Union[tuple[int, ...], None] = Field( description=( "The specified shape to use when iteratively loading data into memory while writing the dataset. " "For optimized writing speeds and minimal RAM usage, a total size of around 1 GB is recommended." @@ -123,12 +123,12 @@ class DatasetIOConfiguration(BaseModel, ABC): ] = Field( description="The specified compression method to apply to this dataset. Set to `None` to disable compression.", ) - compression_options: Union[Dict[str, Any], None] = Field( + compression_options: Union[dict[str, Any], None] = Field( default=None, description="The optional parameters to use for the specified compression method." ) @abstractmethod - def get_data_io_kwargs(self) -> Dict[str, Any]: + def get_data_io_kwargs(self) -> dict[str, Any]: """ Fetch the properly structured dictionary of input arguments. @@ -142,7 +142,7 @@ def __str__(self) -> str: Reason being two-fold; a standard `repr` is intended to be slightly more machine-readable / a more basic representation of the true object state. But then also because an iterable of these objects, such as a - `List[DatasetConfiguration]`, would print out the nested representations, which only look good when using the + `list[DatasetConfiguration]`, would print out the nested representations, which only look good when using the basic `repr` (that is, this fancy string print-out does not look good when nested in another container). 
""" size_in_bytes = math.prod(self.full_shape) * self.dtype.itemsize @@ -174,7 +174,7 @@ def __str__(self) -> str: return string @model_validator(mode="before") - def validate_all_shapes(cls, values: Dict[str, Any]) -> Dict[str, Any]: + def validate_all_shapes(cls, values: dict[str, Any]) -> dict[str, Any]: location_in_file = values["location_in_file"] dataset_name = values["dataset_name"] @@ -231,15 +231,15 @@ def validate_all_shapes(cls, values: Dict[str, Any]) -> Dict[str, Any]: # Pydantic models have several API calls for retrieving the schema - override all of them to work @classmethod - def schema(cls, **kwargs) -> Dict[str, Any]: + def schema(cls, **kwargs) -> dict[str, Any]: return cls.model_json_schema(**kwargs) @classmethod - def schema_json(cls, **kwargs) -> Dict[str, Any]: + def schema_json(cls, **kwargs) -> dict[str, Any]: return cls.model_json_schema(**kwargs) @classmethod - def model_json_schema(cls, **kwargs) -> Dict[str, Any]: + def model_json_schema(cls, **kwargs) -> dict[str, Any]: assert "mode" not in kwargs, "The 'mode' of this method is fixed to be 'validation' and cannot be changed." assert "schema_generator" not in kwargs, "The 'schema_generator' of this method cannot be changed." return super().model_json_schema(mode="validation", schema_generator=PureJSONSchemaGenerator, **kwargs) diff --git a/src/neuroconv/tools/nwb_helpers/_configuration_models/_hdf5_backend.py b/src/neuroconv/tools/nwb_helpers/_configuration_models/_hdf5_backend.py index f85d388b7..011b2e26d 100644 --- a/src/neuroconv/tools/nwb_helpers/_configuration_models/_hdf5_backend.py +++ b/src/neuroconv/tools/nwb_helpers/_configuration_models/_hdf5_backend.py @@ -1,6 +1,6 @@ """Base Pydantic models for the HDF5DatasetConfiguration.""" -from typing import ClassVar, Dict, Literal, Type +from typing import ClassVar, Literal, Type from pydantic import Field from pynwb import H5DataIO @@ -16,7 +16,7 @@ class HDF5BackendConfiguration(BackendConfiguration): pretty_backend_name: ClassVar[Literal["HDF5"]] = "HDF5" data_io_class: ClassVar[Type[H5DataIO]] = H5DataIO - dataset_configurations: Dict[str, HDF5DatasetIOConfiguration] = Field( + dataset_configurations: dict[str, HDF5DatasetIOConfiguration] = Field( description=( "A mapping from object locations to their HDF5DatasetConfiguration specification that contains all " "information for writing the datasets to disk using the HDF5 backend." diff --git a/src/neuroconv/tools/nwb_helpers/_configuration_models/_hdf5_dataset_io.py b/src/neuroconv/tools/nwb_helpers/_configuration_models/_hdf5_dataset_io.py index 828a37998..44c7660ab 100644 --- a/src/neuroconv/tools/nwb_helpers/_configuration_models/_hdf5_dataset_io.py +++ b/src/neuroconv/tools/nwb_helpers/_configuration_models/_hdf5_dataset_io.py @@ -1,6 +1,6 @@ """Base Pydantic models for the HDF5DatasetConfiguration.""" -from typing import Any, Dict, Literal, Union +from typing import Any, Literal, Union import h5py from pydantic import Field, InstanceOf @@ -45,11 +45,11 @@ class HDF5DatasetIOConfiguration(DatasetIOConfiguration): ) # TODO: actually provide better schematic rendering of options. Only support defaults in GUIDE for now. # Looks like they'll have to be hand-typed however... Can try parsing the google docstrings - no annotation typing. - compression_options: Union[Dict[str, Any], None] = Field( + compression_options: Union[dict[str, Any], None] = Field( default=None, description="The optional parameters to use for the specified compression method." 
) - def get_data_io_kwargs(self) -> Dict[str, Any]: + def get_data_io_kwargs(self) -> dict[str, Any]: if is_package_installed(package_name="hdf5plugin"): import hdf5plugin diff --git a/src/neuroconv/tools/nwb_helpers/_configuration_models/_zarr_backend.py b/src/neuroconv/tools/nwb_helpers/_configuration_models/_zarr_backend.py index abd5bdd67..ee26e0553 100644 --- a/src/neuroconv/tools/nwb_helpers/_configuration_models/_zarr_backend.py +++ b/src/neuroconv/tools/nwb_helpers/_configuration_models/_zarr_backend.py @@ -1,6 +1,6 @@ """Base Pydantic models for the ZarrDatasetConfiguration.""" -from typing import ClassVar, Dict, Literal, Type +from typing import ClassVar, Literal, Type import psutil from hdmf_zarr import ZarrDataIO @@ -17,7 +17,7 @@ class ZarrBackendConfiguration(BackendConfiguration): pretty_backend_name: ClassVar[Literal["Zarr"]] = "Zarr" data_io_class: ClassVar[Type[ZarrDataIO]] = ZarrDataIO - dataset_configurations: Dict[str, ZarrDatasetIOConfiguration] = Field( + dataset_configurations: dict[str, ZarrDatasetIOConfiguration] = Field( description=( "A mapping from object locations to their ZarrDatasetConfiguration specification that contains all " "information for writing the datasets to disk using the Zarr backend." diff --git a/src/neuroconv/tools/nwb_helpers/_configuration_models/_zarr_dataset_io.py b/src/neuroconv/tools/nwb_helpers/_configuration_models/_zarr_dataset_io.py index c070a20e9..48b7c070b 100644 --- a/src/neuroconv/tools/nwb_helpers/_configuration_models/_zarr_dataset_io.py +++ b/src/neuroconv/tools/nwb_helpers/_configuration_models/_zarr_dataset_io.py @@ -1,6 +1,6 @@ """Base Pydantic models for the ZarrDatasetConfiguration.""" -from typing import Any, Dict, List, Literal, Union +from typing import Any, Literal, Union import numcodecs import zarr @@ -58,11 +58,11 @@ class ZarrDatasetIOConfiguration(DatasetIOConfiguration): ) # TODO: actually provide better schematic rendering of options. Only support defaults in GUIDE for now. # Looks like they'll have to be hand-typed however... Can try parsing the numpy docstrings - no annotation typing. - compression_options: Union[Dict[str, Any], None] = Field( + compression_options: Union[dict[str, Any], None] = Field( default=None, description="The optional parameters to use for the specified compression method." ) filter_methods: Union[ - List[Union[Literal[tuple(AVAILABLE_ZARR_COMPRESSION_METHODS.keys())], InstanceOf[numcodecs.abc.Codec]]], None + list[Union[Literal[tuple(AVAILABLE_ZARR_COMPRESSION_METHODS.keys())], InstanceOf[numcodecs.abc.Codec]]], None ] = Field( default=None, description=( @@ -72,7 +72,7 @@ class ZarrDatasetIOConfiguration(DatasetIOConfiguration): "Set to `None` to disable filtering." ), ) - filter_options: Union[List[Dict[str, Any]], None] = Field( + filter_options: Union[list[dict[str, Any]], None] = Field( default=None, description="The optional parameters to use for each specified filter method." ) @@ -88,7 +88,7 @@ def __str__(self) -> str: # Inherited docstring from parent. 
noqa: D105 return string @model_validator(mode="before") - def validate_filter_methods_and_options_length_match(cls, values: Dict[str, Any]): + def validate_filter_methods_and_options_length_match(cls, values: dict[str, Any]): filter_methods = values.get("filter_methods", None) filter_options = values.get("filter_options", None) @@ -110,7 +110,7 @@ def validate_filter_methods_and_options_length_match(cls, values: Dict[str, Any] return values - def get_data_io_kwargs(self) -> Dict[str, Any]: + def get_data_io_kwargs(self) -> dict[str, Any]: filters = None if self.filter_methods: filters = list() diff --git a/src/neuroconv/tools/optogenetics.py b/src/neuroconv/tools/optogenetics.py index c4249f3ee..9ec2b7870 100644 --- a/src/neuroconv/tools/optogenetics.py +++ b/src/neuroconv/tools/optogenetics.py @@ -10,7 +10,7 @@ def create_optogenetic_stimulation_timeseries( frequency: float, pulse_width: float, power: float, -) -> Tuple[np.ndarray, np.ndarray]: +) -> tuple[np.ndarray, np.ndarray]: """Create a continuous stimulation time series from stimulation onset times and parameters. In the resulting data array, the offset time of each pulse is represented by a 0 power value. diff --git a/src/neuroconv/tools/path_expansion.py b/src/neuroconv/tools/path_expansion.py index b2dd367f4..427a33a9e 100644 --- a/src/neuroconv/tools/path_expansion.py +++ b/src/neuroconv/tools/path_expansion.py @@ -4,7 +4,7 @@ import os from datetime import date, datetime from pathlib import Path -from typing import Dict, Iterable, List +from typing import Iterable from parse import parse from pydantic import DirectoryPath, FilePath @@ -34,7 +34,7 @@ def extract_metadata(self, base_directory: DirectoryPath, format_: str): Yields ------ - Tuple[Path, Dict[str, Any]] + tuple[Path, dict[str, Any]] A tuple containing the file path as a `Path` object and a dictionary of the named metadata extracted from the file path. """ @@ -67,7 +67,7 @@ def list_directory(self, base_directory: DirectoryPath) -> Iterable[FilePath]: """ pass - def expand_paths(self, source_data_spec: Dict[str, dict]) -> List[DeepDict]: + def expand_paths(self, source_data_spec: dict[str, dict]) -> list[DeepDict]: """ Match paths in a directory to specs and extract metadata from the paths. 
diff --git a/src/neuroconv/tools/roiextractors/imagingextractordatachunkiterator.py b/src/neuroconv/tools/roiextractors/imagingextractordatachunkiterator.py index 98001dd8e..3ef022da2 100644 --- a/src/neuroconv/tools/roiextractors/imagingextractordatachunkiterator.py +++ b/src/neuroconv/tools/roiextractors/imagingextractordatachunkiterator.py @@ -138,7 +138,7 @@ def _get_maxshape(self) -> tuple: video_shape += (depth,) return video_shape - def _get_data(self, selection: Tuple[slice]) -> np.ndarray: + def _get_data(self, selection: tuple[slice]) -> np.ndarray: data = self.imaging_extractor.get_video( start_frame=selection[0].start, end_frame=selection[0].stop, diff --git a/src/neuroconv/tools/roiextractors/roiextractors.py b/src/neuroconv/tools/roiextractors/roiextractors.py index 3b8cac2ac..096d68942 100644 --- a/src/neuroconv/tools/roiextractors/roiextractors.py +++ b/src/neuroconv/tools/roiextractors/roiextractors.py @@ -35,7 +35,7 @@ from ..nwb_helpers import get_default_nwbfile_metadata, get_module, make_or_load_nwbfile from ...utils import ( DeepDict, - OptionalFilePathType, + FilePathType, calculate_regular_series_rate, dict_deep_update, ) @@ -570,7 +570,7 @@ def add_imaging( def write_imaging( imaging: ImagingExtractor, - nwbfile_path: OptionalFilePathType = None, + nwbfile_path: Optional[FilePathType] = None, nwbfile: Optional[NWBFile] = None, metadata: Optional[dict] = None, overwrite: bool = False, @@ -1447,7 +1447,7 @@ def add_segmentation( def write_segmentation( segmentation_extractor: SegmentationExtractor, - nwbfile_path: OptionalFilePathType = None, + nwbfile_path: Optional[FilePathType] = None, nwbfile: Optional[NWBFile] = None, metadata: Optional[dict] = None, overwrite: bool = False, diff --git a/src/neuroconv/tools/spikeinterface/spikeinterface.py b/src/neuroconv/tools/spikeinterface/spikeinterface.py index 48a102bc5..670b03dc6 100644 --- a/src/neuroconv/tools/spikeinterface/spikeinterface.py +++ b/src/neuroconv/tools/spikeinterface/spikeinterface.py @@ -1,7 +1,7 @@ import uuid import warnings from collections import defaultdict -from typing import Any, List, Literal, Optional, Union +from typing import Any, Literal, Optional, Union import numpy as np import psutil @@ -261,7 +261,7 @@ def _get_group_name(recording: BaseRecording) -> np.ndarray: return group_names -def _get_electrodes_table_global_ids(nwbfile: pynwb.NWBFile) -> List[str]: +def _get_electrodes_table_global_ids(nwbfile: pynwb.NWBFile) -> list[str]: """ Generate a list of global identifiers for channels in the electrode table of an NWB file. @@ -274,7 +274,7 @@ def _get_electrodes_table_global_ids(nwbfile: pynwb.NWBFile) -> List[str]: Returns ------- - List[str] + list[str] A list of unique keys, each representing a combination of channel name and group name from the electrodes table. If the electrodes table or the necessary columns are not present, an empty list is returned. @@ -293,7 +293,7 @@ def _get_electrodes_table_global_ids(nwbfile: pynwb.NWBFile) -> List[str]: return unique_keys -def _get_electrode_table_indices_for_recording(recording: BaseRecording, nwbfile: pynwb.NWBFile) -> List[int]: +def _get_electrode_table_indices_for_recording(recording: BaseRecording, nwbfile: pynwb.NWBFile) -> list[int]: """ Get the indices of the electrodes in the NWBFile that correspond to the channels in the recording. 
@@ -311,7 +311,7 @@ def _get_electrode_table_indices_for_recording(recording: BaseRecording, nwbfile Returns ------- - List[int] + list[int] A list of indices corresponding to the positions in the NWBFile's electrodes table that match the channels in the recording. """ @@ -1316,9 +1316,9 @@ def write_recording_to_nwbfile( def add_units_table( sorting: BaseSorting, nwbfile: pynwb.NWBFile, - unit_ids: Optional[List[Union[str, int]]] = None, + unit_ids: Optional[list[Union[str, int]]] = None, property_descriptions: Optional[dict] = None, - skip_properties: Optional[List[str]] = None, + skip_properties: Optional[list[str]] = None, units_table_name: str = "units", unit_table_description: str = "Autogenerated by neuroconv.", write_in_processing_module: bool = False, @@ -1355,9 +1355,9 @@ def add_units_table( def add_units_table_to_nwbfile( sorting: BaseSorting, nwbfile: pynwb.NWBFile, - unit_ids: Optional[List[Union[str, int]]] = None, + unit_ids: Optional[list[Union[str, int]]] = None, property_descriptions: Optional[dict] = None, - skip_properties: Optional[List[str]] = None, + skip_properties: Optional[list[str]] = None, units_table_name: str = "units", unit_table_description: Optional[str] = None, write_in_processing_module: bool = False, @@ -1614,10 +1614,10 @@ def add_units_table_to_nwbfile( def add_sorting( sorting: BaseSorting, nwbfile: Optional[pynwb.NWBFile] = None, - unit_ids: Optional[Union[List[str], List[int]]] = None, + unit_ids: Optional[Union[list[str], list[int]]] = None, property_descriptions: Optional[dict] = None, - skip_properties: Optional[List[str]] = None, - skip_features: Optional[List[str]] = None, + skip_properties: Optional[list[str]] = None, + skip_features: Optional[list[str]] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", units_description: str = "Autogenerated by neuroconv.", @@ -1653,10 +1653,10 @@ def add_sorting( def add_sorting_to_nwbfile( sorting: BaseSorting, nwbfile: Optional[pynwb.NWBFile] = None, - unit_ids: Optional[Union[List[str], List[int]]] = None, + unit_ids: Optional[Union[list[str], list[int]]] = None, property_descriptions: Optional[dict] = None, - skip_properties: Optional[List[str]] = None, - skip_features: Optional[List[str]] = None, + skip_properties: Optional[list[str]] = None, + skip_features: Optional[list[str]] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", units_description: str = "Autogenerated by neuroconv.", @@ -1735,10 +1735,10 @@ def write_sorting( metadata: Optional[dict] = None, overwrite: bool = False, verbose: bool = True, - unit_ids: Optional[List[Union[str, int]]] = None, + unit_ids: Optional[list[Union[str, int]]] = None, property_descriptions: Optional[dict] = None, - skip_properties: Optional[List[str]] = None, - skip_features: Optional[List[str]] = None, + skip_properties: Optional[list[str]] = None, + skip_features: Optional[list[str]] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", units_description: str = "Autogenerated by neuroconv.", @@ -1782,10 +1782,10 @@ def write_sorting_to_nwbfile( metadata: Optional[dict] = None, overwrite: bool = False, verbose: bool = True, - unit_ids: Optional[List[Union[str, int]]] = None, + unit_ids: Optional[list[Union[str, int]]] = None, property_descriptions: Optional[dict] = None, - skip_properties: Optional[List[str]] = None, - skip_features: Optional[List[str]] = None, + skip_properties: Optional[list[str]] = None, + skip_features: Optional[list[str]] = None, 
write_as: Literal["units", "processing"] = "units", units_name: str = "units", units_description: str = "Autogenerated by neuroconv.", @@ -1868,8 +1868,8 @@ def add_sorting_analyzer( nwbfile: Optional[pynwb.NWBFile] = None, metadata: Optional[dict] = None, recording: Optional[BaseRecording] = None, - unit_ids: Optional[Union[List[str], List[int]]] = None, - skip_properties: Optional[List[str]] = None, + unit_ids: Optional[Union[list[str], list[int]]] = None, + skip_properties: Optional[list[str]] = None, property_descriptions: Optional[dict] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", @@ -1903,8 +1903,8 @@ def add_sorting_analyzer_to_nwbfile( nwbfile: Optional[pynwb.NWBFile] = None, metadata: Optional[dict] = None, recording: Optional[BaseRecording] = None, - unit_ids: Optional[Union[List[str], List[int]]] = None, - skip_properties: Optional[List[str]] = None, + unit_ids: Optional[Union[list[str], list[int]]] = None, + skip_properties: Optional[list[str]] = None, property_descriptions: Optional[dict] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", @@ -2017,10 +2017,10 @@ def write_sorting_analyzer( overwrite: bool = False, recording: Optional[BaseRecording] = None, verbose: bool = True, - unit_ids: Optional[Union[List[str], List[int]]] = None, + unit_ids: Optional[Union[list[str], list[int]]] = None, write_electrical_series: bool = False, add_electrical_series_kwargs: Optional[dict] = None, - skip_properties: Optional[List[str]] = None, + skip_properties: Optional[list[str]] = None, property_descriptions: Optional[dict] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", @@ -2062,10 +2062,10 @@ def write_sorting_analyzer_to_nwbfile( overwrite: bool = False, recording: Optional[BaseRecording] = None, verbose: bool = True, - unit_ids: Optional[Union[List[str], List[int]]] = None, + unit_ids: Optional[Union[list[str], list[int]]] = None, write_electrical_series: bool = False, add_electrical_series_kwargs: Optional[dict] = None, - skip_properties: Optional[List[str]] = None, + skip_properties: Optional[list[str]] = None, property_descriptions: Optional[dict] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", @@ -2169,10 +2169,10 @@ def write_waveforms( overwrite: bool = False, recording: Optional[BaseRecording] = None, verbose: bool = True, - unit_ids: Optional[List[Union[str, int]]] = None, + unit_ids: Optional[list[Union[str, int]]] = None, write_electrical_series: bool = False, add_electrical_series_kwargs: Optional[dict] = None, - skip_properties: Optional[List[str]] = None, + skip_properties: Optional[list[str]] = None, property_descriptions: Optional[dict] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", @@ -2196,8 +2196,8 @@ def add_waveforms( nwbfile: Optional[pynwb.NWBFile] = None, metadata: Optional[dict] = None, recording: Optional[BaseRecording] = None, - unit_ids: Optional[List[Union[str, int]]] = None, - skip_properties: Optional[List[str]] = None, + unit_ids: Optional[list[Union[str, int]]] = None, + skip_properties: Optional[list[str]] = None, property_descriptions: Optional[dict] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", diff --git a/src/neuroconv/tools/spikeinterface/spikeinterfacerecordingdatachunkiterator.py b/src/neuroconv/tools/spikeinterface/spikeinterfacerecordingdatachunkiterator.py index e6909a2e5..309fee56c 100644 --- 
a/src/neuroconv/tools/spikeinterface/spikeinterfacerecordingdatachunkiterator.py +++ b/src/neuroconv/tools/spikeinterface/spikeinterfacerecordingdatachunkiterator.py @@ -77,7 +77,7 @@ def __init__( progress_bar_options=progress_bar_options, ) - def _get_default_chunk_shape(self, chunk_mb: float = 10.0) -> Tuple[int, int]: + def _get_default_chunk_shape(self, chunk_mb: float = 10.0) -> tuple[int, int]: assert chunk_mb > 0, f"chunk_mb ({chunk_mb}) must be greater than zero!" chunk_channels = min( @@ -91,7 +91,7 @@ def _get_default_chunk_shape(self, chunk_mb: float = 10.0) -> Tuple[int, int]: return (chunk_frames, chunk_channels) - def _get_data(self, selection: Tuple[slice]) -> Iterable: + def _get_data(self, selection: tuple[slice]) -> Iterable: return self.recording.get_traces( segment_index=self.segment_index, channel_ids=self.channel_ids[selection[1]], diff --git a/src/neuroconv/tools/testing/_mock/_mock_dataset_models.py b/src/neuroconv/tools/testing/_mock/_mock_dataset_models.py index 77901f220..6f69f9e93 100644 --- a/src/neuroconv/tools/testing/_mock/_mock_dataset_models.py +++ b/src/neuroconv/tools/testing/_mock/_mock_dataset_models.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, Iterable, Literal, Tuple, Union +from typing import Any, Iterable, Literal, Union import h5py import numcodecs @@ -18,14 +18,14 @@ def mock_HDF5DatasetIOConfiguration( object_id: str = "481a0860-3a0c-40ec-b931-df4a3e9b101f", location_in_file: str = "acquisition/TestElectricalSeries/data", dataset_name: Literal["data", "timestamps"] = "data", - full_shape: Tuple[int, ...] = (60 * 30_000, 384), # ~1 minute of v1 NeuroPixels probe + full_shape: tuple[int, ...] = (60 * 30_000, 384), # ~1 minute of v1 NeuroPixels probe dtype: np.dtype = np.dtype("int16"), - chunk_shape: Tuple[int, ...] = (78_125, 64), # ~10 MB - buffer_shape: Tuple[int, ...] = (1_250_000, 384), # ~1 GB + chunk_shape: tuple[int, ...] = (78_125, 64), # ~10 MB + buffer_shape: tuple[int, ...] = (1_250_000, 384), # ~1 GB compression_method: Union[ Literal[tuple(AVAILABLE_HDF5_COMPRESSION_METHODS.keys())], h5py._hl.filters.FilterRefBase, None ] = "gzip", - compression_options: Union[Dict[str, Any], None] = None, + compression_options: Union[dict[str, Any], None] = None, ) -> HDF5DatasetIOConfiguration: """Mock object of a HDF5DatasetIOConfiguration with NeuroPixel-like values to show chunk/buffer recommendations.""" return HDF5DatasetIOConfiguration( @@ -45,18 +45,18 @@ def mock_ZarrDatasetIOConfiguration( object_id: str = "481a0860-3a0c-40ec-b931-df4a3e9b101f", location_in_file: str = "acquisition/TestElectricalSeries/data", dataset_name: Literal["data", "timestamps"] = "data", - full_shape: Tuple[int, ...] = (60 * 30_000, 384), # ~1 minute of v1 NeuroPixels probe + full_shape: tuple[int, ...] = (60 * 30_000, 384), # ~1 minute of v1 NeuroPixels probe dtype: np.dtype = np.dtype("int16"), - chunk_shape: Tuple[int, ...] = (78_125, 64), # ~10 MB - buffer_shape: Tuple[int, ...] = (1_250_000, 384), # ~1 GB + chunk_shape: tuple[int, ...] = (78_125, 64), # ~10 MB + buffer_shape: tuple[int, ...] 
= (1_250_000, 384), # ~1 GB compression_method: Union[ Literal[tuple(AVAILABLE_ZARR_COMPRESSION_METHODS.keys())], numcodecs.abc.Codec, None ] = "gzip", - compression_options: Union[Dict[str, Any]] = None, + compression_options: Union[dict[str, Any]] = None, filter_methods: Iterable[ Union[Literal[tuple(AVAILABLE_ZARR_COMPRESSION_METHODS.keys())], numcodecs.abc.Codec, None] ] = None, - filter_options: Union[Iterable[Dict[str, Any]], None] = None, + filter_options: Union[Iterable[dict[str, Any]], None] = None, ) -> ZarrDatasetIOConfiguration: """Mock object of a ZarrDatasetIOConfiguration with NeuroPixel-like values to show chunk/buffer recommendations.""" return ZarrDatasetIOConfiguration( @@ -76,7 +76,7 @@ def mock_ZarrDatasetIOConfiguration( def mock_HDF5BackendConfiguration() -> HDF5BackendConfiguration: """Mock instance of a HDF5BackendConfiguration with two NeuroPixel-like datasets.""" - dataset_configurations: Dict[str, HDF5DatasetIOConfiguration] = { + dataset_configurations: dict[str, HDF5DatasetIOConfiguration] = { "acquisition/TestElectricalSeriesAP/data": mock_HDF5DatasetIOConfiguration( location_in_file="acquisition/TestElectricalSeriesAP/data", dataset_name="data" ), @@ -97,7 +97,7 @@ def mock_HDF5BackendConfiguration() -> HDF5BackendConfiguration: def mock_ZarrBackendConfiguration() -> ZarrBackendConfiguration: """Mock instance of a HDF5BackendConfiguration with several NeuroPixel-like datasets.""" - dataset_configurations: Dict[str, ZarrDatasetIOConfiguration] = { + dataset_configurations: dict[str, ZarrDatasetIOConfiguration] = { "acquisition/TestElectricalSeriesAP/data": mock_ZarrDatasetIOConfiguration( location_in_file="acquisition/TestElectricalSeriesAP/data", dataset_name="data", diff --git a/src/neuroconv/tools/testing/data_interface_mixins.py b/src/neuroconv/tools/testing/data_interface_mixins.py index 578f3688c..b923851c2 100644 --- a/src/neuroconv/tools/testing/data_interface_mixins.py +++ b/src/neuroconv/tools/testing/data_interface_mixins.py @@ -5,7 +5,7 @@ from copy import deepcopy from datetime import datetime from pathlib import Path -from typing import List, Literal, Optional, Type, Union +from typing import Literal, Optional, Type, Union import numpy as np from hdmf.testing import TestCase as HDMFTestCase @@ -63,7 +63,7 @@ class DataInterfaceTestMixin: """ data_interface_cls: Type[BaseDataInterface] - interface_kwargs: Union[dict, List[dict]] + interface_kwargs: Union[dict, list[dict]] save_directory: Path = Path(tempfile.mkdtemp()) conversion_options: dict = dict() maxDiff = None @@ -260,7 +260,7 @@ class TemporalAlignmentMixin: """ data_interface_cls: Type[BaseDataInterface] - interface_kwargs: Union[dict, List[dict]] + interface_kwargs: Union[dict, list[dict]] maxDiff = None def setUpFreshInterface(self): diff --git a/src/neuroconv/tools/testing/mock_interfaces.py b/src/neuroconv/tools/testing/mock_interfaces.py index 902e805f4..6f91e775f 100644 --- a/src/neuroconv/tools/testing/mock_interfaces.py +++ b/src/neuroconv/tools/testing/mock_interfaces.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import List, Literal, Optional +from typing import Literal, Optional import numpy as np from pynwb import NWBFile @@ -65,7 +65,7 @@ def get_source_schema(cls) -> dict: return source_schema def __init__( - self, signal_duration: float = 7.0, ttl_times: Optional[List[List[float]]] = None, ttl_duration: float = 1.0 + self, signal_duration: float = 7.0, ttl_times: Optional[list[list[float]]] = None, ttl_duration: float = 1.0 ): """ Define a mock 
SpikeGLXNIDQInterface by overriding the recording extractor to be a mock TTL signal. @@ -128,7 +128,7 @@ def __init__( self, num_channels: int = 4, sampling_frequency: float = 30_000.0, - # durations: Tuple[float] = (1.0,), # Uncomment when pydantic is integrated for schema validation + # durations: tuple[float] = (1.0,), # Uncomment when pydantic is integrated for schema validation durations: tuple = (1.0,), seed: int = 0, verbose: bool = True, diff --git a/src/neuroconv/tools/testing/mock_probes.py b/src/neuroconv/tools/testing/mock_probes.py index f5a3cea88..8b41d0f9c 100644 --- a/src/neuroconv/tools/testing/mock_probes.py +++ b/src/neuroconv/tools/testing/mock_probes.py @@ -1,5 +1,3 @@ -from typing import List - import numpy as np @@ -7,7 +5,7 @@ def generate_mock_probe(num_channels: int, num_shanks: int = 3): import probeinterface as pi # The shank ids will be 0, 0, 0, ..., 1, 1, 1, ..., 2, 2, 2, ... - shank_ids: List[int] = [] + shank_ids: list[int] = [] positions = np.zeros((num_channels, 2)) # ceil division channels_per_shank = (num_channels + num_shanks - 1) // num_shanks diff --git a/src/neuroconv/tools/text.py b/src/neuroconv/tools/text.py index 8c5b84410..b47ff2215 100644 --- a/src/neuroconv/tools/text.py +++ b/src/neuroconv/tools/text.py @@ -1,5 +1,3 @@ -from typing import Dict - import numpy as np import pandas as pd from pynwb.epoch import TimeIntervals @@ -9,8 +7,8 @@ def convert_df_to_time_intervals( df: pd.DataFrame, table_name: str = "trials", table_description: str = "experimental trials", - column_name_mapping: Dict[str, str] = None, - column_descriptions: Dict[str, str] = None, + column_name_mapping: dict[str, str] = None, + column_descriptions: dict[str, str] = None, ) -> TimeIntervals: """ Convert a dataframe to a TimeIntervals object. diff --git a/src/neuroconv/utils/json_schema.py b/src/neuroconv/utils/json_schema.py index e31a71c58..73aa97bdf 100644 --- a/src/neuroconv/utils/json_schema.py +++ b/src/neuroconv/utils/json_schema.py @@ -4,7 +4,7 @@ import warnings from datetime import datetime from pathlib import Path -from typing import Any, Callable, Dict, List, Optional +from typing import Any, Callable, Optional import docstring_parser import hdmf.data_utils @@ -48,8 +48,8 @@ def get_base_schema( tag: Optional[str] = None, root: bool = False, id_: Optional[str] = None, - required: Optional[List] = None, - properties: Optional[Dict] = None, + required: Optional[list[str]] = None, + properties: Optional[dict] = None, **kwargs, ) -> dict: """Return the base schema used for all other schemas.""" @@ -69,7 +69,7 @@ def get_base_schema( return base_schema -def get_schema_from_method_signature(method: Callable, exclude: Optional[List[str]] = None) -> dict: +def get_schema_from_method_signature(method: Callable, exclude: Optional[list[str]] = None) -> dict: """Deprecated version of `get_json_schema_from_method_signature`.""" message = ( "The method `get_schema_from_method_signature` is now named `get_json_schema_from_method_signature`." @@ -80,7 +80,7 @@ def get_schema_from_method_signature(method: Callable, exclude: Optional[List[st return get_json_schema_from_method_signature(method=method, exclude=exclude) -def get_json_schema_from_method_signature(method: Callable, exclude: Optional[List[str]] = None) -> dict: +def get_json_schema_from_method_signature(method: Callable, exclude: Optional[list[str]] = None) -> dict: """ Get the equivalent JSON schema for a signature of a method. 
@@ -326,7 +326,7 @@ def get_metadata_schema_for_icephys(): return schema -def validate_metadata(metadata: Dict[str, dict], schema: Dict[str, dict], verbose: bool = False): +def validate_metadata(metadata: dict[str, dict], schema: dict[str, dict], verbose: bool = False): """Validate metadata against a schema.""" encoder = NWBMetaDataEncoder() # The encoder produces a serialized object, so we deserialized it for comparison From 0d27ada035dba212e8bd345f4e28c60a6f4eeb10 Mon Sep 17 00:00:00 2001 From: Ben Dichter Date: Wed, 21 Aug 2024 10:44:30 -0400 Subject: [PATCH 3/6] Update CHANGELOG.md --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6952f7e71..315635934 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,7 +30,7 @@ * Add tqdm with warning to DeepLabCut interface [PR #1006](https://github.com/catalystneuro/neuroconv/pull/1006) * `BaseRecordingInterface` now calls default metadata when metadata is not passing mimicking `run_conversion` behavior. [PR #1012](https://github.com/catalystneuro/neuroconv/pull/1012) * Added `get_json_schema_from_method_signature` which constructs Pydantic models automatically from the signature of any function with typical annotation types used throughout NeuroConv. [PR #1016](https://github.com/catalystneuro/neuroconv/pull/1016) - +* Changed typehint collections (e.g. `List`) to standard collections (e.g. `list`). [PR #1021](https://github.com/catalystneuro/neuroconv/pull/1021) ## v0.5.0 (July 17, 2024) From 8ecd6c3488c9f134845f6195e6afd6a421f39ad1 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 21 Aug 2024 14:46:05 +0000 Subject: [PATCH 4/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../behavior/lightningpose/lightningposedatainterface.py | 2 +- src/neuroconv/datainterfaces/icephys/baseicephysinterface.py | 1 - src/neuroconv/tools/hdmf.py | 1 - src/neuroconv/tools/neo/neo.py | 2 +- src/neuroconv/tools/optogenetics.py | 1 - .../tools/roiextractors/imagingextractordatachunkiterator.py | 2 +- .../spikeinterface/spikeinterfacerecordingdatachunkiterator.py | 2 +- 7 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposedatainterface.py b/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposedatainterface.py index e15425785..c9064271e 100644 --- a/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposedatainterface.py +++ b/src/neuroconv/datainterfaces/behavior/lightningpose/lightningposedatainterface.py @@ -2,7 +2,7 @@ from copy import deepcopy from datetime import datetime from pathlib import Path -from typing import Optional, Tuple +from typing import Optional import numpy as np from pynwb import NWBFile diff --git a/src/neuroconv/datainterfaces/icephys/baseicephysinterface.py b/src/neuroconv/datainterfaces/icephys/baseicephysinterface.py index d574bdabf..ff82ba391 100644 --- a/src/neuroconv/datainterfaces/icephys/baseicephysinterface.py +++ b/src/neuroconv/datainterfaces/icephys/baseicephysinterface.py @@ -1,5 +1,4 @@ import importlib.util -from typing import Tuple import numpy as np from pynwb import NWBFile diff --git a/src/neuroconv/tools/hdmf.py b/src/neuroconv/tools/hdmf.py index a6ddb7a07..e8dfff294 100644 --- a/src/neuroconv/tools/hdmf.py +++ b/src/neuroconv/tools/hdmf.py @@ -2,7 +2,6 @@ import math import warnings -from typing import Tuple import numpy as np from 
hdmf.data_utils import GenericDataChunkIterator as HDMFGenericDataChunkIterator diff --git a/src/neuroconv/tools/neo/neo.py b/src/neuroconv/tools/neo/neo.py index 2fa9274fc..816e4c90a 100644 --- a/src/neuroconv/tools/neo/neo.py +++ b/src/neuroconv/tools/neo/neo.py @@ -3,7 +3,7 @@ import warnings from copy import deepcopy from pathlib import Path -from typing import Optional, Tuple +from typing import Optional import neo.io.baseio import numpy as np diff --git a/src/neuroconv/tools/optogenetics.py b/src/neuroconv/tools/optogenetics.py index 9ec2b7870..718f57c43 100644 --- a/src/neuroconv/tools/optogenetics.py +++ b/src/neuroconv/tools/optogenetics.py @@ -1,4 +1,3 @@ -from typing import Tuple import numpy as np diff --git a/src/neuroconv/tools/roiextractors/imagingextractordatachunkiterator.py b/src/neuroconv/tools/roiextractors/imagingextractordatachunkiterator.py index 3ef022da2..0792e2caf 100644 --- a/src/neuroconv/tools/roiextractors/imagingextractordatachunkiterator.py +++ b/src/neuroconv/tools/roiextractors/imagingextractordatachunkiterator.py @@ -1,7 +1,7 @@ """General purpose iterator for all ImagingExtractor data.""" import math -from typing import Optional, Tuple +from typing import Optional import numpy as np from hdmf.data_utils import GenericDataChunkIterator diff --git a/src/neuroconv/tools/spikeinterface/spikeinterfacerecordingdatachunkiterator.py b/src/neuroconv/tools/spikeinterface/spikeinterfacerecordingdatachunkiterator.py index 309fee56c..95b14fc23 100644 --- a/src/neuroconv/tools/spikeinterface/spikeinterfacerecordingdatachunkiterator.py +++ b/src/neuroconv/tools/spikeinterface/spikeinterfacerecordingdatachunkiterator.py @@ -1,4 +1,4 @@ -from typing import Iterable, Optional, Tuple +from typing import Iterable, Optional from hdmf.data_utils import GenericDataChunkIterator from spikeinterface import BaseRecording From b11e278d243eb020f86b061012dfca8e10be3526 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 21 Aug 2024 18:36:06 +0000 Subject: [PATCH 5/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/neuroconv/tools/optogenetics.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/neuroconv/tools/optogenetics.py b/src/neuroconv/tools/optogenetics.py index 718f57c43..f8ed27015 100644 --- a/src/neuroconv/tools/optogenetics.py +++ b/src/neuroconv/tools/optogenetics.py @@ -1,4 +1,3 @@ - import numpy as np From ec0f9c564c69672e51d892ef6b69b237a50fe16b Mon Sep 17 00:00:00 2001 From: CodyCBakerPhD Date: Wed, 21 Aug 2024 14:51:56 -0400 Subject: [PATCH 6/6] fixes post-merge --- src/neuroconv/datainterfaces/behavior/audio/audiointerface.py | 2 +- .../datainterfaces/behavior/video/videodatainterface.py | 2 +- src/neuroconv/datainterfaces/icephys/abf/abfdatainterface.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/neuroconv/datainterfaces/behavior/audio/audiointerface.py b/src/neuroconv/datainterfaces/behavior/audio/audiointerface.py index 043687e4a..d61cdf18b 100644 --- a/src/neuroconv/datainterfaces/behavior/audio/audiointerface.py +++ b/src/neuroconv/datainterfaces/behavior/audio/audiointerface.py @@ -28,7 +28,7 @@ class AudioInterface(BaseTemporalAlignmentInterface): associated_suffixes = (".wav",) info = "Interface for writing audio recordings to an NWB file." 
- def __init__(self, file_paths: List[FilePath], verbose: bool = False): + def __init__(self, file_paths: list[FilePath], verbose: bool = False): """ Data interface for writing acoustic recordings to an NWB file. diff --git a/src/neuroconv/datainterfaces/behavior/video/videodatainterface.py b/src/neuroconv/datainterfaces/behavior/video/videodatainterface.py index ef9c63a9e..dfb22deba 100644 --- a/src/neuroconv/datainterfaces/behavior/video/videodatainterface.py +++ b/src/neuroconv/datainterfaces/behavior/video/videodatainterface.py @@ -30,7 +30,7 @@ class VideoInterface(BaseDataInterface): def __init__( self, - file_paths: List[FilePath], + file_paths: list[FilePath], verbose: bool = False, *, metadata_key_name: str = "Videos", diff --git a/src/neuroconv/datainterfaces/icephys/abf/abfdatainterface.py b/src/neuroconv/datainterfaces/icephys/abf/abfdatainterface.py index 1091597ee..5336b6116 100644 --- a/src/neuroconv/datainterfaces/icephys/abf/abfdatainterface.py +++ b/src/neuroconv/datainterfaces/icephys/abf/abfdatainterface.py @@ -51,7 +51,7 @@ def get_source_schema(cls) -> dict: return source_schema def __init__( - self, file_paths: List[FilePath], icephys_metadata: dict = None, icephys_metadata_file_path: FilePath = None + self, file_paths: list[FilePath], icephys_metadata: dict = None, icephys_metadata_file_path: FilePath = None ): """ ABF IcephysInterface based on Neo AxonIO.