From 218039275dc475f7f1e595be8666257a8e5c7a9e Mon Sep 17 00:00:00 2001
From: David Stansby
Date: Thu, 23 Jun 2022 19:00:48 +0100
Subject: [PATCH 1/2] Add typing to brainreg code

---
 brainglobe_napari_io/brainreg/reader_dir.py          | 15 ++++++---
 .../brainreg/reader_dir_standard_space.py            | 19 ++++++++----
 brainglobe_napari_io/brainreg/utils.py               | 31 +++++++------------
 3 files changed, 35 insertions(+), 30 deletions(-)

diff --git a/brainglobe_napari_io/brainreg/reader_dir.py b/brainglobe_napari_io/brainreg/reader_dir.py
index a01c90e..2453ba2 100644
--- a/brainglobe_napari_io/brainreg/reader_dir.py
+++ b/brainglobe_napari_io/brainreg/reader_dir.py
@@ -1,15 +1,19 @@
 import json
 import os
 from pathlib import Path
+from typing import Callable, List, Optional, Union
 
 import tifffile
 from bg_atlasapi.bg_atlas import BrainGlobeAtlas
+from napari.types import LayerDataTuple
 
 from .utils import is_brainreg_dir, load_additional_downsampled_channels
 
+PathOrPaths = Union[List[os.PathLike], os.PathLike]
+
 
 # Assume this is more used
-def brainreg_read_dir(path):
+def brainreg_read_dir(path: PathOrPaths) -> Optional[Callable]:
     """A basic implementation of the napari_get_reader hook specification.
 
     Parameters
     ----------
     path : str
         Path to file.
 
     Returns
     -------
     function or None
         If the path is a recognized format, return a function that accepts the
         same path or list of paths, and returns a list of layer data tuples.
     """
     if isinstance(path, str) and is_brainreg_dir(path):
         return reader_function
+    else:
+        return None
 
 
-def reader_function(path):
-    """Take a path or list of paths and return a list of LayerData tuples.
-
+def reader_function(path: os.PathLike) -> List[LayerDataTuple]:
+    """
     Readers are expected to return data as a list of tuples, where each tuple
     is (data, [add_kwargs, [layer_type]]), "add_kwargs" and "layer_type" are
     both optional.
@@ -59,7 +64,7 @@ def reader_function(path):
     atlas = BrainGlobeAtlas(metadata["atlas"])
     metadata["atlas_class"] = atlas
 
-    layers = []
+    layers: List[LayerDataTuple] = []
 
     layers = load_additional_downsampled_channels(path, layers)
     layers.append(
diff --git a/brainglobe_napari_io/brainreg/reader_dir_standard_space.py b/brainglobe_napari_io/brainreg/reader_dir_standard_space.py
index 8cb7451..557e320 100644
--- a/brainglobe_napari_io/brainreg/reader_dir_standard_space.py
+++ b/brainglobe_napari_io/brainreg/reader_dir_standard_space.py
@@ -1,14 +1,18 @@
 import json
 import os
 from pathlib import Path
+from typing import Callable, List, Optional, Union
 
 import tifffile
 from bg_atlasapi.bg_atlas import BrainGlobeAtlas
+from napari.types import LayerDataTuple
 
 from .utils import is_brainreg_dir, load_additional_downsampled_channels
 
+PathOrPaths = Union[List[os.PathLike], os.PathLike]
 
-def brainreg_read_dir_standard_space(path):
+
+def brainreg_read_dir_standard_space(path: PathOrPaths) -> Optional[Callable]:
     """A basic implementation of the napari_get_reader hook specification.
 
     Parameters
     ----------
     path : str
         Path to file.
 
     Returns
     -------
     function or None
         If the path is a recognized format, return a function that accepts the
         same path or list of paths, and returns a list of layer data tuples.
     """
     if isinstance(path, str) and is_brainreg_dir(path):
         return reader_function
+    else:
+        return None
 
 
-def reader_function(path):
-    """Take a path or list of paths and return a list of LayerData tuples.
-
+def reader_function(path: os.PathLike) -> List[LayerDataTuple]:
+    """
     Readers are expected to return data as a list of tuples, where each tuple
     is (data, [add_kwargs, [layer_type]]), "add_kwargs" and "layer_type" are
     both optional.
@@ -57,7 +62,7 @@ def reader_function(path):
     atlas = BrainGlobeAtlas(metadata["atlas"])
     metadata["atlas_class"] = atlas
 
-    layers = []
+    layers: List[LayerDataTuple] = []
 
     layers = load_additional_downsampled_channels(
         path,
         layers,
@@ -76,7 +81,9 @@ def reader_function(path):
     return layers
 
 
-def load_atlas(atlas, layers):
+def load_atlas(
+    atlas: BrainGlobeAtlas, layers: List[LayerDataTuple]
+) -> List[LayerDataTuple]:
     atlas_image = atlas.annotation
     layers.append(
         (
diff --git a/brainglobe_napari_io/brainreg/utils.py b/brainglobe_napari_io/brainreg/utils.py
index e0e78ff..f8da639 100644
--- a/brainglobe_napari_io/brainreg/utils.py
+++ b/brainglobe_napari_io/brainreg/utils.py
@@ -1,21 +1,14 @@
 import os
+from pathlib import Path
+from typing import List
 
 import tifffile
+from napari.types import LayerDataTuple
 
 
-def is_brainreg_dir(path):
-    """Determines whether a path is to a brainreg output directory
-
-    Parameters
-    ----------
-    path : str
-        Path to file.
-
-    Returns
-    -------
-    function or None
-        If the path is a recognized format, return a function that accepts the
-        same path or list of paths, and returns a list of layer data tuples.
+def is_brainreg_dir(path: os.PathLike) -> bool:
+    """
+    Determines whether a path is to a brainreg output directory.
     """
     path = os.path.abspath(path)
     if os.path.isdir(path):
@@ -28,12 +21,12 @@ def is_brainreg_dir(path):
 
 
 def load_additional_downsampled_channels(
-    path,
-    layers,
-    extension=".tiff",
-    search_string="downsampled_",
-    exlusion_string="downsampled_standard",
-):
+    path: Path,
+    layers: List[LayerDataTuple],
+    extension: str = ".tiff",
+    search_string: str = "downsampled_",
+    exlusion_string: str = "downsampled_standard",
+) -> List[LayerDataTuple]:
 
     # Get additional downsampled channels, but not main one, and not those
     # in standard space

From 0473562b886a0039b5d8096b282a05f5723f96a1 Mon Sep 17 00:00:00 2001
From: David Stansby
Date: Fri, 24 Jun 2022 13:04:51 +0100
Subject: [PATCH 2/2] Add some cellfinder typing

---
 brainglobe_napari_io/cellfinder/reader_dir.py | 69 +++++++++++--------
 brainglobe_napari_io/cellfinder/utils.py      | 42 ++++++-----
 2 files changed, 66 insertions(+), 45 deletions(-)

diff --git a/brainglobe_napari_io/cellfinder/reader_dir.py b/brainglobe_napari_io/cellfinder/reader_dir.py
index 2f24a67..943c411 100644
--- a/brainglobe_napari_io/cellfinder/reader_dir.py
+++ b/brainglobe_napari_io/cellfinder/reader_dir.py
@@ -13,26 +13,20 @@
 import os
 import sys
 from pathlib import Path
+from typing import Callable, List, Optional, Tuple, Union
 
 import bg_space as bgs
+from napari.typing import LayerDataTuple
 
 from ..brainreg.reader_dir import reader_function as brainreg_reader
 from .utils import load_cells
 
+PathOrPaths = Union[List[os.PathLike], os.PathLike]
 
-def is_cellfinder_dir(path):
-    """Determines whether a path is to a brainreg output directory
-
-    Parameters
-    ----------
-    path : str
-        Path to file.
-
-    Returns
-    -------
-    function or None
-        If the path is a recognized format, return a function that accepts the
-        same path or list of paths, and returns a list of layer data tuples.
+
+def is_cellfinder_dir(path: os.PathLike) -> bool:
+    """
+    Determines whether a path is to a cellfinder output directory.
     """
     path = os.path.abspath(path)
     if os.path.isdir(path):
@@ -44,7 +38,7 @@ def is_cellfinder_dir(path):
     return False
 
 
-def cellfinder_read_dir(path):
+def cellfinder_read_dir(path: PathOrPaths) -> Optional[Callable]:
     """A basic implementation of the napari_get_reader hook specification.
     Parameters
     ----------
     path : str
         Path to file.
 
     Returns
     -------
     function or None
         If the path is a recognized format, return a function that accepts the
         same path or list of paths, and returns a list of layer data tuples.
     """
     if isinstance(path, str) and is_cellfinder_dir(path):
         return reader_function
+    else:
+        return None
 
 
-def reader_function(path, point_size=15, opacity=0.6, symbol="ring"):
+def reader_function(
+    path: os.PathLike, point_size: int = 15, opacity: float = 0.6, symbol: str = "ring"
+) -> List[LayerDataTuple]:
     """Take a path or list of paths and return a list of LayerData tuples.
 
     Readers are expected to return data as a list of tuples, where each tuple
     is (data, [add_kwargs, [layer_type]]), "add_kwargs" and "layer_type" are
     both optional.
@@ -91,7 +89,7 @@ def reader_function(path, point_size=15, opacity=0.6, symbol="ring"):
     with open(path / "cellfinder.json") as json_file:
         metadata = json.load(json_file)
 
-    layers = []
+    layers: List[LayerDataTuple] = []
 
     registration_directory = path / "registration"
     if registration_directory.exists():
@@ -115,7 +113,14 @@ def reader_function(path, point_size=15, opacity=0.6, symbol="ring"):
     return layers
 
 
-def load_cells_from_file(path, layers, point_size, opacity, symbol, channel=None):
+def load_cells_from_file(
+    path: Path,
+    layers: List[LayerDataTuple],
+    point_size: int,
+    opacity: float,
+    symbol: str,
+    channel=None,
+) -> List[LayerDataTuple]:
     classified_cells_path = path / "points" / "cell_classification.xml"
     layers = load_cells(
         layers,
@@ -130,7 +135,9 @@ def load_cells_from_file(path, layers, point_size, opacity, symbol, channel=None
     return layers
 
 
-def load_registration(layers, registration_directory, metadata):
+def load_registration(
+    layers: List[LayerDataTuple], registration_directory: os.PathLike, metadata
+) -> List[LayerDataTuple]:
     registration_layers = brainreg_reader(registration_directory)
     registration_layers = remove_downsampled_images(registration_layers)
     atlas = get_atlas(registration_layers)
@@ -140,28 +147,30 @@ def load_registration(layers, registration_directory, metadata):
     return layers
 
 
-def get_atlas(layers):
+def get_atlas(layers: List[LayerDataTuple]):
     for layer in layers:
         atlas = layer[1]["metadata"]["atlas_class"]
         if atlas:
             return atlas
 
 
-def remove_downsampled_images(layers):
+def remove_downsampled_images(layers: List[LayerDataTuple]) -> List[LayerDataTuple]:
     # assumes the atlas annotations and boundaries are the last two layers
     layers = list(layers)
-    layers = layers[-2:]
-    layers = tuple(layers)
-    return layers
+    return layers[-2:]
 
 
-def scale_reorient_layers(layers, atlas, metadata):
+def scale_reorient_layers(
+    layers: List[LayerDataTuple], atlas, metadata
+) -> List[LayerDataTuple]:
     layers = reorient_registration_layers(layers, atlas, metadata)
     layers = scale_registration_layers(layers, atlas, metadata)
     return layers
 
 
-def reorient_registration_layers(layers, atlas, metadata):
+def reorient_registration_layers(
+    layers: List[LayerDataTuple], atlas, metadata
+) -> List[LayerDataTuple]:
     # TODO: do this with napari affine transforms, rather than transforming
     # the stack in memory
     atlas_orientation = atlas.orientation
@@ -175,14 +184,18 @@ def reorient_registration_layers(layers, atlas, metadata):
     return new_layers
 
 
-def reorient_registration_layer(layer, atlas_orientation, raw_data_orientation):
+def reorient_registration_layer(
+    layer: LayerDataTuple, atlas_orientation, raw_data_orientation
+) -> LayerDataTuple:
     layer = list(layer)
     layer[0] = bgs.map_stack_to(atlas_orientation, raw_data_orientation, layer[0])
     layer = tuple(layer)
     return layer
 
 
-def scale_registration_layers(layers, atlas, metadata):
+def scale_registration_layers(
+    layers: List[LayerDataTuple], atlas, metadata
+) -> List[LayerDataTuple]:
     new_layers = []
     scale = get_scale(atlas, metadata)
     for layer in layers:
@@ -191,7 +204,7 @@ def scale_registration_layers(layers, atlas, metadata):
     return new_layers
 
 
-def get_scale(atlas, metadata, scaling_rounding_decimals=5):
+def get_scale(atlas, metadata, scaling_rounding_decimals: int = 5) -> Tuple[int, ...]:
     source_space = bgs.AnatomicalSpace(metadata["orientation"])
     scaling = []
     for idx, axis in enumerate(atlas.space.axes_order):
@@ -207,7 +220,7 @@ def get_scale(atlas, metadata, scaling_rounding_decimals=5):
     return tuple(scaling)
 
 
-def scale_registration_layer(layer, scale):
+def scale_registration_layer(layer: LayerDataTuple, scale) -> LayerDataTuple:
     layer = list(layer)
     layer[1]["scale"] = scale
     layer = tuple(layer)
diff --git a/brainglobe_napari_io/cellfinder/utils.py b/brainglobe_napari_io/cellfinder/utils.py
index 450b350..4356e90 100644
--- a/brainglobe_napari_io/cellfinder/utils.py
+++ b/brainglobe_napari_io/cellfinder/utils.py
@@ -1,24 +1,32 @@
+import os
+from pathlib import Path
+from typing import Dict, List, Tuple
+
+import numpy as np
 import pandas as pd
 from imlib.cells.cells import Cell
 from imlib.IO.cells import cells_xml_to_df
+from napari.types import LayerDataTuple
 
 
-def cells_df_as_np(cells_df, new_order=[2, 1, 0], type_column="type"):
+def cells_df_as_np(
+    cells_df: pd.DataFrame, new_order: List[int] = [2, 1, 0], type_column: str = "type"
+) -> np.ndarray:
     cells_df = cells_df.drop(columns=[type_column])
     cells = cells_df[cells_df.columns[new_order]]
     cells = cells.to_numpy()
     return cells
 
 
-def cells_to_array(cells):
+def cells_to_array(cells) -> Tuple[np.ndarray, np.ndarray]:
     df = pd.DataFrame([c.to_dict() for c in cells])
     points = cells_df_as_np(df[df["type"] == Cell.CELL])
     rejected = cells_df_as_np(df[df["type"] == Cell.UNKNOWN])
     return points, rejected
 
 
-def get_cell_arrays(cells_file):
-    df = cells_xml_to_df(cells_file)
+def get_cell_arrays(cells_file: os.PathLike) -> Tuple[np.ndarray, np.ndarray]:
+    df = cells_xml_to_df(str(cells_file))
     non_cells = df[df["type"] == Cell.UNKNOWN]
     cells = df[df["type"] == Cell.CELL]
 
@@ -28,31 +36,31 @@ def get_cell_arrays(cells_file):
     return cells, non_cells
 
 
-def convert_layer_to_cells(layer_data, cells=True):
+def convert_layer_to_cells(layer_data, cells: bool = True) -> List[Cell]:
     cells_to_save = []
     if cells:
-        type = Cell.CELL
+        cell_type = Cell.CELL
     else:
-        type = Cell.UNKNOWN
+        cell_type = Cell.UNKNOWN
 
     for idx, point in enumerate(layer_data):
-        cell = Cell([point[2], point[1], point[0]], type)
+        cell = Cell([point[2], point[1], point[0]], cell_type)
         cells_to_save.append(cell)
 
     return cells_to_save
 
 
 def load_cells(
-    layers,
-    classified_cells_path,
-    point_size,
-    opacity,
-    symbol,
-    cell_color,
-    non_cell_color,
+    layers: List[LayerDataTuple],
+    classified_cells_path: Path,
+    point_size: int,
+    opacity: float,
+    symbol: str,
+    cell_color: str,
+    non_cell_color: str,
     channel=None,
-):
-    cells, non_cells = get_cell_arrays(str(classified_cells_path))
+) -> List[LayerDataTuple]:
+    cells, non_cells = get_cell_arrays(classified_cells_path)
     if channel is not None:
         channel_base = f"channel_{channel}: "
     else:
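
Below is a short, hypothetical usage sketch (not part of either commit) showing how the typed napari reader hook introduced in these patches is consumed: the hook returns a reader callable only when it recognises the path, and that callable returns a list of napari LayerDataTuple objects. The directory path and the printed keys are assumptions for illustration only.

# Hypothetical usage sketch; not part of the patch. Assumes napari,
# brainglobe-napari-io and a brainreg output directory are available.
from brainglobe_napari_io.brainreg.reader_dir import brainreg_read_dir

sample_dir = "/data/my_brainreg_output"  # hypothetical brainreg output directory

# The hook returns Optional[Callable]: None when the path is not recognised.
reader = brainreg_read_dir(sample_dir)
if reader is not None:
    layers = reader(sample_dir)  # List[LayerDataTuple]
    for layer in layers:
        # layer[0] holds the array; layer[1] is the add_kwargs dict (the
        # cellfinder reader above reads layer[1]["metadata"] and sets
        # layer[1]["scale"]); the last element is the napari layer type.
        print(layer[1].get("name"), layer[-1])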