Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add some typing #18

Merged
merged 2 commits into from
Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions brainglobe_napari_io/brainreg/reader_dir.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import json
import os
from pathlib import Path
from typing import Callable, List, Optional, Union

import tifffile
from bg_atlasapi.bg_atlas import BrainGlobeAtlas
from napari.types import LayerDataTuple

from .utils import is_brainreg_dir, load_additional_downsampled_channels

PathOrPaths = Union[List[os.PathLike], os.PathLike]


# Assume this is more used
def brainreg_read_dir(path):
def brainreg_read_dir(path: PathOrPaths) -> Optional[Callable]:
"""A basic implementation of the napari_get_reader hook specification.

Parameters
Expand All @@ -26,11 +30,12 @@ def brainreg_read_dir(path):

if isinstance(path, str) and is_brainreg_dir(path):
return reader_function
else:
return None


def reader_function(path):
"""Take a path or list of paths and return a list of LayerData tuples.

def reader_function(path: os.PathLike) -> List[LayerDataTuple]:
"""
Readers are expected to return data as a list of tuples, where each tuple
is (data, [add_kwargs, [layer_type]]), "add_kwargs" and "layer_type" are
both optional.
Expand Down Expand Up @@ -59,7 +64,7 @@ def reader_function(path):
atlas = BrainGlobeAtlas(metadata["atlas"])
metadata["atlas_class"] = atlas

layers = []
layers: List[LayerDataTuple] = []
layers = load_additional_downsampled_channels(path, layers)

layers.append(
Expand Down
19 changes: 13 additions & 6 deletions brainglobe_napari_io/brainreg/reader_dir_standard_space.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import json
import os
from pathlib import Path
from typing import Callable, List, Optional, Union

import tifffile
from bg_atlasapi.bg_atlas import BrainGlobeAtlas
from napari.types import LayerDataTuple

from .utils import is_brainreg_dir, load_additional_downsampled_channels

PathOrPaths = Union[List[os.PathLike], os.PathLike]

def brainreg_read_dir_standard_space(path):

def brainreg_read_dir_standard_space(path: PathOrPaths) -> Optional[Callable]:
"""A basic implementation of the napari_get_reader hook specification.

Parameters
Expand All @@ -25,11 +29,12 @@ def brainreg_read_dir_standard_space(path):

if isinstance(path, str) and is_brainreg_dir(path):
return reader_function
else:
return None


def reader_function(path):
"""Take a path or list of paths and return a list of LayerData tuples.

def reader_function(path: os.PathLike) -> List[LayerDataTuple]:
"""
Readers are expected to return data as a list of tuples, where each tuple
is (data, [add_kwargs, [layer_type]]), "add_kwargs" and "layer_type" are
both optional.
Expand Down Expand Up @@ -57,7 +62,7 @@ def reader_function(path):

atlas = BrainGlobeAtlas(metadata["atlas"])
metadata["atlas_class"] = atlas
layers = []
layers: List[LayerDataTuple] = []
layers = load_additional_downsampled_channels(
path,
layers,
Expand All @@ -76,7 +81,9 @@ def reader_function(path):
return layers


def load_atlas(atlas, layers):
def load_atlas(
atlas: BrainGlobeAtlas, layers: List[LayerDataTuple]
) -> List[LayerDataTuple]:
atlas_image = atlas.annotation
layers.append(
(
Expand Down
31 changes: 12 additions & 19 deletions brainglobe_napari_io/brainreg/utils.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@
import os
from pathlib import Path
from typing import List

import tifffile
from napari.types import LayerDataTuple


def is_brainreg_dir(path):
"""Determines whether a path is to a brainreg output directory

Parameters
----------
path : str
Path to file.

Returns
-------
function or None
If the path is a recognized format, return a function that accepts the
same path or list of paths, and returns a list of layer data tuples.
def is_brainreg_dir(path: os.PathLike) -> bool:
"""
Determines whether a path is to a brainreg output directory.
"""
path = os.path.abspath(path)
if os.path.isdir(path):
Expand All @@ -28,12 +21,12 @@ def is_brainreg_dir(path):


def load_additional_downsampled_channels(
path,
layers,
extension=".tiff",
search_string="downsampled_",
exlusion_string="downsampled_standard",
):
path: Path,
layers: List[LayerDataTuple],
extension: str = ".tiff",
search_string: str = "downsampled_",
exlusion_string: str = "downsampled_standard",
) -> List[LayerDataTuple]:

# Get additional downsampled channels, but not main one, and not those
# in standard space
Expand Down
69 changes: 41 additions & 28 deletions brainglobe_napari_io/cellfinder/reader_dir.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,20 @@
import os
import sys
from pathlib import Path
from typing import Callable, List, Optional, Tuple, Union

import bg_space as bgs
from napari.typing import LayerDataTuple

from ..brainreg.reader_dir import reader_function as brainreg_reader
from .utils import load_cells

PathOrPaths = Union[List[os.PathLike], os.PathLike]

def is_cellfinder_dir(path):
"""Determines whether a path is to a brainreg output directory

Parameters
----------
path : str
Path to file.

Returns
-------
function or None
If the path is a recognized format, return a function that accepts the
same path or list of paths, and returns a list of layer data tuples.
def is_cellfinder_dir(path: os.PathLike) -> bool:
"""
Determines whether a path is to a cellfinder output directory.
"""
path = os.path.abspath(path)
if os.path.isdir(path):
Expand All @@ -44,7 +38,7 @@ def is_cellfinder_dir(path):
return False


def cellfinder_read_dir(path):
def cellfinder_read_dir(path: PathOrPaths) -> Optional[Callable]:
"""A basic implementation of the napari_get_reader hook specification.

Parameters
Expand All @@ -60,9 +54,13 @@ def cellfinder_read_dir(path):
"""
if isinstance(path, str) and is_cellfinder_dir(path):
return reader_function
else:
return None


def reader_function(path, point_size=15, opacity=0.6, symbol="ring"):
def reader_function(
path: os.PathLike, point_size: int = 15, opacity: float = 0.6, symbol: str = "ring"
) -> List[LayerDataTuple]:
"""Take a path or list of paths and return a list of LayerData tuples.

Readers are expected to return data as a list of tuples, where each tuple
Expand Down Expand Up @@ -91,7 +89,7 @@ def reader_function(path, point_size=15, opacity=0.6, symbol="ring"):
with open(path / "cellfinder.json") as json_file:
metadata = json.load(json_file)

layers = []
layers: List[LayerDataTuple] = []

registration_directory = path / "registration"
if registration_directory.exists():
Expand All @@ -115,7 +113,14 @@ def reader_function(path, point_size=15, opacity=0.6, symbol="ring"):
return layers


def load_cells_from_file(path, layers, point_size, opacity, symbol, channel=None):
def load_cells_from_file(
path: Path,
layers: List[LayerDataTuple],
point_size: int,
opacity: float,
symbol: str,
channel=None,
) -> List[LayerDataTuple]:
classified_cells_path = path / "points" / "cell_classification.xml"
layers = load_cells(
layers,
Expand All @@ -130,7 +135,9 @@ def load_cells_from_file(path, layers, point_size, opacity, symbol, channel=None
return layers


def load_registration(layers, registration_directory, metadata):
def load_registration(
layers: List[LayerDataTuple], registration_directory: os.PathLike, metadata
) -> List[LayerDataTuple]:
registration_layers = brainreg_reader(registration_directory)
registration_layers = remove_downsampled_images(registration_layers)
atlas = get_atlas(registration_layers)
Expand All @@ -140,28 +147,30 @@ def load_registration(layers, registration_directory, metadata):
return layers


def get_atlas(layers):
def get_atlas(layers: List[LayerDataTuple]):
for layer in layers:
atlas = layer[1]["metadata"]["atlas_class"]
if atlas:
return atlas


def remove_downsampled_images(layers):
def remove_downsampled_images(layers: List[LayerDataTuple]) -> List[LayerDataTuple]:
# assumes the atlas annotations and boundaries are the last two layers
layers = list(layers)
layers = layers[-2:]
layers = tuple(layers)
return layers
return layers[-2:]


def scale_reorient_layers(layers, atlas, metadata):
def scale_reorient_layers(
layers: List[LayerDataTuple], atlas, metadata
) -> List[LayerDataTuple]:
layers = reorient_registration_layers(layers, atlas, metadata)
layers = scale_registration_layers(layers, atlas, metadata)
return layers


def reorient_registration_layers(layers, atlas, metadata):
def reorient_registration_layers(
layers: List[LayerDataTuple], atlas, metadata
) -> List[LayerDataTuple]:
# TODO: do this with napari affine transforms, rather than transforming
# the stack in memory
atlas_orientation = atlas.orientation
Expand All @@ -175,14 +184,18 @@ def reorient_registration_layers(layers, atlas, metadata):
return new_layers


def reorient_registration_layer(layer, atlas_orientation, raw_data_orientation):
def reorient_registration_layer(
layer: LayerDataTuple, atlas_orientation, raw_data_orientation
) -> LayerDataTuple:
layer = list(layer)
layer[0] = bgs.map_stack_to(atlas_orientation, raw_data_orientation, layer[0])
layer = tuple(layer)
return layer


def scale_registration_layers(layers, atlas, metadata):
def scale_registration_layers(
layers: List[LayerDataTuple], atlas, metadata
) -> List[LayerDataTuple]:
new_layers = []
scale = get_scale(atlas, metadata)
for layer in layers:
Expand All @@ -191,7 +204,7 @@ def scale_registration_layers(layers, atlas, metadata):
return new_layers


def get_scale(atlas, metadata, scaling_rounding_decimals=5):
def get_scale(atlas, metadata, scaling_rounding_decimals: int = 5) -> Tuple[int, ...]:
source_space = bgs.AnatomicalSpace(metadata["orientation"])
scaling = []
for idx, axis in enumerate(atlas.space.axes_order):
Expand All @@ -207,7 +220,7 @@ def get_scale(atlas, metadata, scaling_rounding_decimals=5):
return tuple(scaling)


def scale_registration_layer(layer, scale):
def scale_registration_layer(layer: LayerDataTuple, scale) -> LayerDataTuple:
layer = list(layer)
layer[1]["scale"] = scale
layer = tuple(layer)
Expand Down
Loading