diff --git a/.gitignore b/.gitignore index 2bba71b..daf1ea3 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ __pycache__ *.pyc .mypy_cache .nox +.coverage diff --git a/install.sh b/install.sh index 71c53ed..7fc2ae7 100755 --- a/install.sh +++ b/install.sh @@ -1,12 +1,11 @@ #!/usr/bin/env bash if which brew &> /dev/null; then - brew install portaudio libmagic ffmpeg libav + brew install portaudio ffmpeg libav elif which apt-get &> /dev/null; then sudo apt-get update sudo apt-get install \ portaudio19-dev libportaudio2 \ - libmagic-dev \ ffmpeg \ libav-tools else diff --git a/noxfile.py b/noxfile.py index 6c6b986..c7dd2ae 100644 --- a/noxfile.py +++ b/noxfile.py @@ -29,4 +29,4 @@ def lint(session): @nox.session def tests(session): session.install(".[dev]") - session.run("pytest", "ultratrace2") \ No newline at end of file + session.run("pytest", "ultratrace2") diff --git a/requirements.txt b/requirements.txt index f5a67b6..8bc75c2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,7 +9,6 @@ pydicom==1.3.0 pydub==0.23.1 pyparsing==2.4.5 python-dateutil==2.8.1 -python-magic==0.4.15 six==1.13.0 TextGrid==1.5 tqdm==4.40.2 diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..6921bec --- /dev/null +++ b/setup.cfg @@ -0,0 +1,3 @@ +[tool:pytest] +addopts = + --cov=ultratrace2 diff --git a/setup.py b/setup.py index 4a33d44..fc556fe 100644 --- a/setup.py +++ b/setup.py @@ -33,6 +33,7 @@ def get_requirement(line): "mypy", "pytest", "numpy-stubs @ git+https://github.com/numpy/numpy-stubs.git@master", + "pytest-cov", "pytest-mock", ] }, diff --git a/ultratrace2/__main__.py b/ultratrace2/__main__.py index 94a9cf0..a5138f5 100644 --- a/ultratrace2/__main__.py +++ b/ultratrace2/__main__.py @@ -11,11 +11,22 @@ def main(): parser = argparse.ArgumentParser(prog="ultratrace") # noqa: E128 + parser.add_argument( + "--headless", + action="store_true", + default=False, + help="run ultratrace without a GUI interface", + ) parser.add_argument( "path", default=None, help="path (unique to a participant) where subdirectories contain raw data", ) + parser.add_argument( + "theme", # FIXME: not yet supported + default=None, + help="name of Ttk theme to use for widgets", + ) parser.add_argument( "--no-audio", dest="audio", @@ -56,7 +67,8 @@ def main(): args = parser.parse_args() - app = initialize_app(args) + app = initialize_app(headless=args.headless, path=args.path, theme=args.theme) + app.main() diff --git a/ultratrace2/app.py b/ultratrace2/app.py index f3d0a14..32fae47 100644 --- a/ultratrace2/app.py +++ b/ultratrace2/app.py @@ -6,19 +6,24 @@ class App: - def __init__(self, args): # FIXME: be more granular here - - if args.path is None: + def __init__( + self, + headless: bool = False, + path: Optional[str] = None, + theme: Optional[str] = None, + ): + + if path is None and not headless: path = choose_dir() - if not path: - raise ValueError("You must choose a directory to open") - else: - path = args.path + if not path: + raise ValueError("You must choose a directory to open") + + self.project: Project = Project.get_by_path(path) - self.project = Project(path) - self.gui = GUI(theme=args.theme) + if not headless: + self.gui = GUI(theme=theme) - def main(self): + def main(self) -> None: pass @@ -26,7 +31,10 @@ def main(self): app: Optional[App] = None -def initialize_app(args) -> App: # FIXME: be more granular here +def initialize_app( + headless: bool = False, path: Optional[str] = None, theme: Optional[str] = None +) -> App: + global app - app = App(args) + app = App(headless=headless, path=path, theme=theme,) return app diff --git a/ultratrace2/model/files/ADT.py b/ultratrace2/model/files/ADT.py deleted file mode 100644 index 285c119..0000000 --- a/ultratrace2/model/files/ADT.py +++ /dev/null @@ -1,89 +0,0 @@ -import logging -import os - -from abc import ABC, abstractmethod -from collections import OrderedDict -from magic import Magic # type: ignore -from typing import ClassVar, Dict, Optional, Sequence, Type - - -logger = logging.getLogger(__name__) - - -class FileLoadError(Exception): - pass - - -class TypedFile(ABC): - - preferred_impls = ClassVar[Sequence["TypedFileImpl"]] - - impls: Dict[Type["TypedFileImpl"], Optional["TypedFileImpl"]] - impl: Optional["TypedFileImpl"] - - def __new__(cls): - cls.impls = OrderedDict() - for impl_type in cls.preferred_impls: - cls.impls[impl_type] = None - return super().__new__(cls) - - def __init__(self): - self.impl = None - - def has_impl(self) -> bool: - return self.impl is not None - - def interpret(self, path: str) -> bool: - mimetype = Magic(mime=True).from_file(path) - _, extension = os.path.splitext(path.lower()) - recognized = False - for Impl in self.preferred_impls: - if Impl.recognizes(mimetype, extension): - recognized = True - if self.impl is not None: - logger.error( - f"cannot parse {path}: previous {type(Impl).__name__} was: {self.impl.path}, skipping..." - ) - continue - self.impl = Impl(path) - return True - return recognized - - def data(self): # FIXME: add signature - if self.impl is None: - raise ValueError("cannot load: no implementation found") - try: - self.impl.data() - except FileLoadError as e: - logger.error(f"unable to load {self.impl}: {e}") - self.impl = None # FIXME: is this sane behavior? - - def __repr__(self): - return f"{type(self).__name__}({self.impl})" - - -class TypedFileImpl(ABC): - - mimetypes: ClassVar[Sequence[str]] - extensions: ClassVar[Sequence[str]] - - def __init__(self, path: str): - self.path = path - self._data = None - - def data(self): # FIXME: add signature - # lazy load - if self._data is None: - self.load() - return self._data - - @abstractmethod - def load(self): # should throw FileLoadError if something went wrong - pass - - @classmethod - def recognizes(cls, mimetype: str, extension: str) -> bool: - return mimetype in cls.mimetypes or extension in cls.extensions - - def __repr__(self): - return f'{type(self).__name__}("{self.path}")' diff --git a/ultratrace2/model/files/__init__.py b/ultratrace2/model/files/__init__.py index e69de29..b3b2552 100644 --- a/ultratrace2/model/files/__init__.py +++ b/ultratrace2/model/files/__init__.py @@ -0,0 +1,32 @@ +from .registry import register_loader_for_extensions_and_mime_types as __register + +from .loaders import DICOMLoader +from .loaders import FLACLoader +from .loaders import MeasurementLoader +from .loaders import MP3Loader +from .loaders import OggLoader +from .loaders import TextGridLoader +from .loaders import WAVLoader + + +__register( + [".dicom", ".dcm"], ["application/dicom"], DICOMLoader, +) +__register( + [], [], FLACLoader, +) +__register( + [], [], MeasurementLoader, +) +__register( + [], [], MP3Loader, +) +__register( + [], [], OggLoader, +) +__register( + [".textgrid"], ["text/plain"], TextGridLoader, +) +__register( + [".wav"], ["audio/x-wav", "audio/wav"], WAVLoader, +) diff --git a/ultratrace2/model/files/bundle.py b/ultratrace2/model/files/bundle.py index 9a568fb..10216bf 100644 --- a/ultratrace2/model/files/bundle.py +++ b/ultratrace2/model/files/bundle.py @@ -1,70 +1,100 @@ import logging import os -from typing import Dict, List, Set +from typing import Dict, FrozenSet, Optional, Sequence, Type -from .impls import Sound, Alignment, ImageSet +from .loaders.base import ( + AlignmentFileLoader, + ImageSetFileLoader, + SoundFileLoader, + FileLoaderBase, + FileLoadError, +) +from .registry import get_loader_for logger = logging.getLogger(__name__) class FileBundle: - def __init__(self, name: str): + def __init__( + self, + name: str, + alignment_file: Optional[AlignmentFileLoader] = None, + image_set_file: Optional[ImageSetFileLoader] = None, + sound_file: Optional[SoundFileLoader] = None, + ): self.name = name - self.alignment_file = Alignment() - self.image_file = ImageSet() - self.sound_file = Sound() - - def interpret(self, path: str): # FIXME: add signature - return ( - self.alignment_file.interpret(path) - or self.image_file.interpret(path) - or self.sound_file.interpret(path) - ) # noqa: E126 + self.alignment_file = alignment_file + self.image_set_file = image_set_file + self.sound_file = sound_file def has_impl(self) -> bool: - return ( - self.alignment_file.has_impl() - or self.image_file.has_impl() - or self.sound_file.has_impl() + return any( + f is not None + for f in [self.alignment_file, self.image_set_file, self.sound_file] ) + def set_alignment_file(self, alignment_file: AlignmentFileLoader) -> None: + if self.alignment_file is not None: + logger.warning("Overwriting existing alignment file") + self.alignment_file = alignment_file + + def set_image_set_file(self, image_set_file: ImageSetFileLoader) -> None: + if self.image_set_file is not None: + logger.warning("Overwriting existing image-set file") + self.image_set_file = image_set_file + + def set_sound_file(self, sound_file: SoundFileLoader) -> None: + if self.sound_file is not None: + logger.warning("Overwriting existing sound file") + self.sound_file = sound_file + def __repr__(self): - return f'Bundle("{self.name}",{self.alignment_file},{self.image_file},{self.sound_file})' + return f'Bundle("{self.name}",{self.alignment_file},{self.image_set_file},{self.sound_file})' class FileBundleList: - exclude_dirs: Set[str] = set( + exclude_dirs: FrozenSet[str] = frozenset( [ ".git", "node_modules", "__pycache__", + ".ultratrace", # FIXME: add more ignoreable dirs ] ) - def __init__(self, path: str, extra_exclude_dirs: List[str] = []): + def __init__(self, bundles: Dict[str, FileBundle]): - # FIXME: implement `extra_exclude_dirs` as a command-line arg - for extra_exclude_dir in extra_exclude_dirs: - self.exclude_dirs.add(extra_exclude_dir) + self.current_bundle = None + self.bundles: Dict[str, FileBundle] = bundles - self.path = path - self.has_alignment_impl = False - self.has_image_impl = False - self.has_sound_impl = False + self.has_alignment_impl: bool = False + self.has_image_set_impl: bool = False + self.has_sound_impl: bool = False - self.current_bundle = None + for bundle in bundles.values(): + self.has_alignment_impl |= bundle.alignment_file is not None + self.has_image_set_impl |= bundle.image_set_file is not None + self.has_sound_impl |= bundle.sound_file is not None + + @classmethod + def build_from_dir( + cls, root_path: str, extra_exclude_dirs: Sequence[str] = [] + ) -> "FileBundleList": + + # FIXME: implement `extra_exclude_dirs` as a command-line arg + exclude_dirs = cls.exclude_dirs.union(extra_exclude_dirs) bundles: Dict[str, FileBundle] = {} # NB: `topdown=True` increases runtime cost from O(n) -> O(n^2), but it allows us to # modify `dirs` in-place so that we can skip certain directories. For more info, # see https://stackoverflow.com/questions/19859840/excluding-directories-in-os-walk - for path, dirs, filenames in os.walk(path, topdown=True): - dirs[:] = [d for d in dirs if d not in self.exclude_dirs] + for path, dirs, filenames in os.walk(root_path, topdown=True): + dirs[:] = [d for d in dirs if d not in exclude_dirs] for filename in filenames: @@ -77,18 +107,24 @@ def __init__(self, path: str, extra_exclude_dirs: List[str] = []): ) continue - if name not in bundles: - bundles[name] = FileBundle(name) - - if not bundles[name].interpret(filepath): + file_loader: Optional[Type[FileLoaderBase]] = get_loader_for(filepath) + if file_loader is None: logger.warning(f"unrecognized filetype: {filepath}") + continue - # FIXME: do this when we add to our data structure - for filename, bundle in bundles.items(): - # build up self.bundles here - if not self.has_alignment_impl and bundle.alignment_file.has_impl(): - self.has_alignment_impl = True - if not self.has_image_impl and bundle.image_file.has_impl(): - self.has_image_impl = True - if not self.has_sound_impl and bundle.sound_file.has_impl(): - self.has_sound_impl = True + if name not in bundles: + bundles[name] = FileBundle(filepath) + + try: + loaded_file = file_loader.from_file(filepath) + if loaded_file is not None: + if isinstance(loaded_file, AlignmentFileLoader): + bundles[name].set_alignment_file(loaded_file) + elif isinstance(loaded_file, ImageSetFileLoader): + bundles[name].set_image_set_file(loaded_file) + elif isinstance(loaded_file, SoundFileLoader): + bundles[name].set_sound_file(loaded_file) + except FileLoadError as e: + logger.error(e) + + return cls(bundles) diff --git a/ultratrace2/model/files/impls/__init__.py b/ultratrace2/model/files/impls/__init__.py deleted file mode 100644 index 64f73eb..0000000 --- a/ultratrace2/model/files/impls/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .alignment import Alignment # noqa: F401 -from .image import ImageSet # noqa: F401 -from .sound import Sound # noqa: F401 diff --git a/ultratrace2/model/files/impls/alignment.py b/ultratrace2/model/files/impls/alignment.py deleted file mode 100644 index 5218dfd..0000000 --- a/ultratrace2/model/files/impls/alignment.py +++ /dev/null @@ -1,35 +0,0 @@ -from typing import ClassVar, Sequence - -from ..ADT import TypedFile, TypedFileImpl - - -class Alignment(TypedFile): - class TextGrid(TypedFileImpl): - - mimetypes = ["text/plain"] - extensions = [".textgrid"] - - def load(self): - raise NotImplementedError() - - @classmethod - def recognizes(cls, mimetype: str, extension: str) -> bool: - return mimetype in cls.mimetypes and extension in cls.extensions - - class Measurement(TypedFileImpl): - # FIXME: what is this? do we need to support it? - - mimetypes: ClassVar[Sequence[str]] = [] - extensions: ClassVar[Sequence[str]] = [] - - def load(self): - raise NotImplementedError() - - @classmethod - def recognizes(cls, mimetype: str, extension: str) -> bool: - return mimetype in cls.mimetypes and extension in cls.extensions - - preferred_impls = [TextGrid, Measurement] - - def __init__(self): - super().__init__() diff --git a/ultratrace2/model/files/impls/image.py b/ultratrace2/model/files/impls/image.py deleted file mode 100644 index 11aff06..0000000 --- a/ultratrace2/model/files/impls/image.py +++ /dev/null @@ -1,71 +0,0 @@ -import numpy as np -import os -import PIL # type: ignore -import pydicom # type: ignore - -from tqdm import tqdm # type: ignore - -from ..ADT import FileLoadError, TypedFile, TypedFileImpl - - -class ImageSet(TypedFile): - class DICOM(TypedFileImpl): - - mimetypes = ["application/dicom"] - extensions = [".dicom", ".dcm"] - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) # FIXME: make this more granular - self.png_path = f"{self.path}-frames" - - def load(self) -> None: - if os.path.exists(self.png_path): - return - - try: - dicom = pydicom.read_file(self.path) - except pydicom.errors.InvalidDicomError as e: - raise FileLoadError(str(e)) - - pixels: np.ndarray = dicom.pixel_array - - # check encoding, manipulate array if we need to - if len(pixels.shape) == 3: - is_greyscale = True - frames, rows, columns = pixels.shape - - elif len(pixels.shape) == 4: - # full-color RGB - is_greyscale = False - if pixels.shape[0] == 3: - # RGB-first - rgb, frames, rows, columns = pixels.shape - pixels.reshape((frames, rows, columns, rgb)) - elif pixels.shape[3] == 3: - # RGB-last - frames, rows, columns, rgb = pixels.shape - else: - raise FileLoadError( - "Invalid DICOM ({self.path}), unknown shape {pixels.shape}" - ) - - else: - raise FileLoadError( - "Invalid DICOM ({self.path}), unknown shape {pixels.shape}" - ) - - os.mkdir(self.png_path) - for i in tqdm(range(frames), desc="converting to PNG"): - filename = os.path.join(self.png_path, f"{i:06}.png") - arr = pixels[i, :, :] if is_greyscale else pixels[i, :, :, :] - img = PIL.Image.fromarray(arr) - img.save(filename, format="PNG", compress_level=1) - - def convert_to_png(self, *args, **kwargs): - # FIXME: implement this function, signatures, etc. - print("converting") - - preferred_impls = [DICOM] - - def __init__(self): - super().__init__() diff --git a/ultratrace2/model/files/impls/sound.py b/ultratrace2/model/files/impls/sound.py deleted file mode 100644 index a448f86..0000000 --- a/ultratrace2/model/files/impls/sound.py +++ /dev/null @@ -1,39 +0,0 @@ -from typing import ClassVar, Sequence - -from ..ADT import TypedFile, TypedFileImpl - - -class Sound(TypedFile): - class WAV(TypedFileImpl): - - mimetypes = ["audio/x-wav", "audio/wav"] - extensions = ["wav"] - - def load(self): - raise NotImplementedError() - - class FLAC(TypedFileImpl): - mimetypes: ClassVar[Sequence[str]] = [] - extensions: ClassVar[Sequence[str]] = [] - - def load(self): - raise NotImplementedError() - - class Ogg(TypedFileImpl): - mimetypes: ClassVar[Sequence[str]] = [] - extensions: ClassVar[Sequence[str]] = [] - - def load(self): - raise NotImplementedError() - - class MP3(TypedFileImpl): - mimetypes: ClassVar[Sequence[str]] = [] - extensions: ClassVar[Sequence[str]] = [] - - def load(self): - raise NotImplementedError() - - preferred_impls = [WAV, FLAC, Ogg, MP3] - - def __init__(self): - super().__init__() diff --git a/ultratrace2/model/files/loaders/__init__.py b/ultratrace2/model/files/loaders/__init__.py new file mode 100644 index 0000000..c27a3dd --- /dev/null +++ b/ultratrace2/model/files/loaders/__init__.py @@ -0,0 +1,12 @@ +# alignment files +from .measurement import MeasurementLoader # noqa: F401 +from .textgrid import TextGridLoader # noqa: F401 + +# imageset files +from .dicom import DICOMLoader # noqa: F401 + +# sound files +from .flac import FLACLoader # noqa: F401 +from .mp3 import MP3Loader # noqa: F401 +from .ogg import OggLoader # noqa: F401 +from .wav import WAVLoader # noqa: F401 diff --git a/ultratrace2/model/files/loaders/base.py b/ultratrace2/model/files/loaders/base.py new file mode 100644 index 0000000..b98ffb7 --- /dev/null +++ b/ultratrace2/model/files/loaders/base.py @@ -0,0 +1,62 @@ +import logging + +from abc import ABC, abstractmethod +from PIL import Image # type: ignore +from typing import Type, TypeVar + + +logger = logging.getLogger(__name__) + + +class FileLoadError(Exception): + pass + + +Self = TypeVar("Self", bound="FileLoaderBase") + + +class FileLoaderBase(ABC): + @abstractmethod + def get_path(self) -> str: + ... + + @abstractmethod + def set_path(self, path) -> None: + ... + + path = property(get_path, set_path) + + def __repr__(self): + return f"{type(self).__name__}({self.path})" + + @classmethod + @abstractmethod + def from_file(cls: Type[Self], path: str) -> Self: + """Construct an instance from a path. + + NB: If this concrete method fails to load the data at the given path, then + it should throw a `FileLoadError`.""" + + +class AlignmentFileLoader(FileLoaderBase): + pass + + +class ImageSetFileLoader(FileLoaderBase): + @abstractmethod + def __len__(self) -> int: + """ImageSets should have some notion of their length. + + For example, for DICOM files, this is equal to the number of frames. This + number can then be used to "slice up" any accompanying Alignment or Sound + files. + """ + + @abstractmethod + def get_frame(self, i: int) -> Image.Image: + """ImageSets should support random access of frames.""" + pass + + +class SoundFileLoader(FileLoaderBase): + pass diff --git a/ultratrace2/model/files/loaders/dicom.py b/ultratrace2/model/files/loaders/dicom.py new file mode 100644 index 0000000..6ae1dc6 --- /dev/null +++ b/ultratrace2/model/files/loaders/dicom.py @@ -0,0 +1,87 @@ +from PIL import Image # type: ignore + +import numpy as np +import os +import pydicom # type: ignore + +from .base import FileLoadError, ImageSetFileLoader + + +class DICOMLoader(ImageSetFileLoader): + def get_path(self) -> str: + return self._path + + def set_path(self, path) -> None: + self._path = path + + def __init__(self, path: str, pixels: np.ndarray): + """Construct the DICOMLoader from a pixel array. + + The `shape` of the pixel array should be `(n_frames, n_rows, n_columns)` for greyscale + images and `(n_frames, n_rows, n_columns, rgb_data)` for full-color images.""" + self.set_path(path) + self.pixels = pixels + # FIXME: these should be in the `.ultratrace/` dir + self.png_dir = f"{self.path}-frames" + if not os.path.exists(self.png_dir): + os.mkdir(self.png_dir, mode=0o755) + + def is_greyscale(self) -> bool: + return len(self.pixels) == 3 + + def __len__(self) -> int: + return self.pixels.shape[0] + + def get_png_filepath_for_frame(self, i: int) -> str: + return os.path.join(self.png_dir, f"{i:06}.png") + + def get_frame(self, i: int) -> Image.Image: + png_filepath = self.get_png_filepath_for_frame(i) + if os.path.exists(png_filepath): + return Image.open(png_filepath) + else: + arr = ( + self.pixels[i, :, :] if self.is_greyscale() else self.pixels[i, :, :, :] + ) + img = Image.fromarray(arr) + img.save(png_filepath, format="PNG", compress_level=1) + return img + + @classmethod + def from_file(cls, path: str) -> "DICOMLoader": + + if not os.path.exists(path): + raise FileLoadError("Cannot load from path: '{path}': cannot find") + + try: + dicom = pydicom.read_file(path) + except pydicom.errors.InvalidDicomError as e: + raise FileLoadError( + "Invalid DICOM ({path}), unable to read: {str(e)}" + ) from e + + pixels: np.ndarray = dicom.pixel_array + + # check encoding, manipulate array if we need to + if len(pixels.shape) == 3: + n_frames, n_rows, n_columns = pixels.shape + return cls(path, pixels) + + elif len(pixels.shape) == 4: + # full-color RGB + if pixels.shape[0] == 3: + # RGB-first + rgb, n_frames, n_rows, n_columns = pixels.shape + pixels.reshape((n_frames, n_rows, n_columns, rgb)) + return cls(path, pixels) + + elif pixels.shape[3] == 3: + # RGB-last + n_frames, n_rows, n_columns, rgb = pixels.shape + return cls(path, pixels) + + raise FileLoadError("Invalid DICOM ({path}), unknown shape {pixels.shape}") + + def convert_to_png(self, *args, **kwargs): + # FIXME: implement this as a helper function + raise NotImplementedError() diff --git a/ultratrace2/model/files/loaders/flac.py b/ultratrace2/model/files/loaders/flac.py new file mode 100644 index 0000000..bc05a13 --- /dev/null +++ b/ultratrace2/model/files/loaders/flac.py @@ -0,0 +1,16 @@ +from .base import SoundFileLoader + + +class FLACLoader(SoundFileLoader): + def get_path(self) -> str: + return self._path + + def set_path(self, path) -> None: + self._path = path + + def __init__(self, path: str): + self.set_path(path) + + @classmethod + def from_file(cls, path: str) -> "FLACLoader": + raise NotImplementedError() diff --git a/ultratrace2/model/files/loaders/measurement.py b/ultratrace2/model/files/loaders/measurement.py new file mode 100644 index 0000000..f85ba48 --- /dev/null +++ b/ultratrace2/model/files/loaders/measurement.py @@ -0,0 +1,18 @@ +from .base import AlignmentFileLoader + + +class MeasurementLoader(AlignmentFileLoader): + # FIXME: what is this? do we need to support it? + + def get_path(self) -> str: + return self._path + + def set_path(self, path) -> None: + self._path = path + + def __init__(self, path: str): + self.set_path(path) + + @classmethod + def from_file(cls, path: str) -> "MeasurementLoader": + raise NotImplementedError() diff --git a/ultratrace2/model/files/loaders/mp3.py b/ultratrace2/model/files/loaders/mp3.py new file mode 100644 index 0000000..c19df99 --- /dev/null +++ b/ultratrace2/model/files/loaders/mp3.py @@ -0,0 +1,16 @@ +from .base import SoundFileLoader + + +class MP3Loader(SoundFileLoader): + def get_path(self) -> str: + return self._path + + def set_path(self, path) -> None: + self._path = path + + def __init__(self, path: str): + self.set_path(path) + + @classmethod + def from_file(cls, path: str) -> "MP3Loader": + raise NotImplementedError() diff --git a/ultratrace2/model/files/loaders/ogg.py b/ultratrace2/model/files/loaders/ogg.py new file mode 100644 index 0000000..fd52679 --- /dev/null +++ b/ultratrace2/model/files/loaders/ogg.py @@ -0,0 +1,16 @@ +from .base import SoundFileLoader + + +class OggLoader(SoundFileLoader): + def get_path(self) -> str: + return self._path + + def set_path(self, path) -> None: + self._path = path + + def __init__(self, path: str): + self.set_path(path) + + @classmethod + def from_file(cls, path: str) -> "OggLoader": + raise NotImplementedError() diff --git a/ultratrace2/model/files/loaders/textgrid.py b/ultratrace2/model/files/loaders/textgrid.py new file mode 100644 index 0000000..9becb63 --- /dev/null +++ b/ultratrace2/model/files/loaders/textgrid.py @@ -0,0 +1,16 @@ +from .base import AlignmentFileLoader + + +class TextGridLoader(AlignmentFileLoader): + def get_path(self) -> str: + return self._path + + def set_path(self, path) -> None: + self._path = path + + def __init__(self, path: str): + self.set_path(path) + + @classmethod + def from_file(cls, path: str) -> "TextGridLoader": + raise NotImplementedError() diff --git a/ultratrace2/model/files/loaders/wav.py b/ultratrace2/model/files/loaders/wav.py new file mode 100644 index 0000000..2c218ba --- /dev/null +++ b/ultratrace2/model/files/loaders/wav.py @@ -0,0 +1,16 @@ +from .base import SoundFileLoader + + +class WAVLoader(SoundFileLoader): + def get_path(self) -> str: + return self._path + + def set_path(self, path) -> None: + self._path = path + + def __init__(self, path: str): + self.set_path(path) + + @classmethod + def from_file(cls, path: str) -> "WAVLoader": + raise NotImplementedError() diff --git a/ultratrace2/model/files/registry.py b/ultratrace2/model/files/registry.py new file mode 100644 index 0000000..4766fb3 --- /dev/null +++ b/ultratrace2/model/files/registry.py @@ -0,0 +1,61 @@ +import mimetypes +import os + +from collections import defaultdict +from typing import DefaultDict, Optional, Sequence, Set, Type + +from .loaders.base import FileLoaderBase + + +# global maps +__extension_to_loaders_map: DefaultDict[str, Set[Type[FileLoaderBase]]] = defaultdict( + set +) +__mime_type_to_loaders_map: DefaultDict[str, Set[Type[FileLoaderBase]]] = defaultdict( + set +) + + +def register_loader_for_extensions_and_mime_types( + extensions: Sequence[str], + mime_types: Sequence[str], + loader_cls: Type[FileLoaderBase], +) -> None: + """Register a loader which recognizes a file in the format indicated by the given + extensions and MIME types. + + Parameters: + extensions: A set of file extension, e.g. [".wav"], indicating the file format + mime_types: A set of MIME types, e.g. ["audio/x-wav", "audio/wav"], also indicating the file format + loader_cls: A file loader which knows how to load files with the given file extensions and MIME types + """ + + for extension in extensions: + __extension_to_loaders_map[extension].add(loader_cls) + + for mime_type in mime_types: + __mime_type_to_loaders_map[mime_type].add(loader_cls) + + +def get_loader_for(path: str) -> Optional[Type[FileLoaderBase]]: + + _, extension = os.path.splitext(path.lower()) + mime_type, _ = mimetypes.guess_type(path) + if mime_type is None: + # Early return since we can't possibly match anymore + return None + + loader_clses_by_extension = __extension_to_loaders_map[extension] + loader_clses_by_mime_type = __mime_type_to_loaders_map[mime_type] + + # NB: Use set-intersection (could potentially use set-union instead). + loader_clses = loader_clses_by_extension & loader_clses_by_mime_type + + if len(loader_clses) == 0: + return None + elif len(loader_clses) == 1: + return loader_clses.pop() + else: + raise NotImplementedError( + f"Found multiple Loaders for path '{path}': {','.join(map(str, loader_clses))}" + ) diff --git a/ultratrace2/model/project.py b/ultratrace2/model/project.py index 78a8f09..9345ab9 100644 --- a/ultratrace2/model/project.py +++ b/ultratrace2/model/project.py @@ -1,23 +1,70 @@ +import logging import os +import pickle from .trace import TraceList from .files.bundle import FileBundleList +logger = logging.getLogger(__name__) + class Project: - def __init__(self, path: str): - if not os.path.exists(path): - raise ValueError(f"cannot initialize project at {path}") - self.root_path = os.path.realpath(os.path.abspath(path)) # absolute path - self.traces = TraceList() - self.files = FileBundleList(self.root_path) + def __init__(self, traces: TraceList, files: FileBundleList): + """ + Internal function: to construct a Project, either call ::get_by_path() + """ + self.traces = traces + self.files = files def save(self): raise NotImplementedError() @classmethod - def load(cls): - raise NotImplementedError() + def load(cls, save_file: str) -> "Project": + with open(save_file, "rb") as fp: + project = pickle.load(fp) + assert isinstance(project, Project) + return project + + @classmethod + def get_by_path(cls, root_path: str) -> "Project": + + root_path = os.path.realpath(os.path.abspath(root_path)) # absolute path + if not os.path.exists(root_path): + raise ValueError( + f"cannot initialize project at {root_path}: directory does not exist" + ) + + if not os.path.isdir(root_path): + raise ValueError( + f"cannot initialize project at {root_path}: not a directory" + ) + + save_dir = cls.get_save_dir(root_path) + if not os.path.exists(save_dir): + os.mkdir(save_dir, mode=0o755) + + save_file = cls.get_save_file(root_path) + try: + return cls.load(save_file) + except Exception as e: + logger.warning(e) + logger.info( + f"Unable to find existing project at {root_path}, creating new one..." + ) + + traces = TraceList() + file_bundles = FileBundleList.build_from_dir(root_path) + return cls(traces, file_bundles) + + @staticmethod + def get_save_dir(path: str) -> str: + return os.path.join(path, ".ultratrace") + + @staticmethod + def get_save_file(path: str) -> str: + save_dir = Project.get_save_dir(path) + return os.path.join(save_dir, "project.pkl") def filepath(self): raise NotImplementedError() @@ -32,7 +79,7 @@ def has_alignment_impl(self) -> bool: return self.files.has_alignment_impl def has_image_impl(self) -> bool: - return self.files.has_image_impl + return self.files.has_image_set_impl def has_sound_impl(self) -> bool: return self.files.has_sound_impl diff --git a/ultratrace2/model/tests/test_project.py b/ultratrace2/model/tests/test_project.py new file mode 100644 index 0000000..38c7f7e --- /dev/null +++ b/ultratrace2/model/tests/test_project.py @@ -0,0 +1,38 @@ +import pytest + +from ..project import Project +from ..trace import TraceList +from ..files.bundle import FileBundle, FileBundleList + + +@pytest.mark.parametrize( + "save_file,error", + [ + ("", FileNotFoundError), + ("/path/to/nowhere", FileNotFoundError), + ("/dev/null", EOFError), # pickle cannot open + ("/etc/sudoers", PermissionError), # not readable + ], +) +def test_load_project_invalid(save_file: str, error: Exception) -> None: + with pytest.raises(error): + Project.load(save_file) + + +# FIXME: not implemented, so can't test :/ +""" +def test_load_project_valid(save_file) -> None: + pass +""" + + +@pytest.mark.parametrize( + "traces,files", + [ + (TraceList(), FileBundleList({})), + (TraceList(), FileBundleList({"x": FileBundle("x")})), + ], +) +def test_init_project(traces: TraceList, files: FileBundleList) -> None: + p = Project(traces, files) + assert isinstance(p, Project) diff --git a/ultratrace2/tests/__init__.py b/ultratrace2/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ultratrace2/tests/test_app.py b/ultratrace2/tests/test_app.py new file mode 100644 index 0000000..6f4d02b --- /dev/null +++ b/ultratrace2/tests/test_app.py @@ -0,0 +1,37 @@ +from pathlib import Path +from typing import Any, Dict + +import pytest + +from .. import app as app_py +from ..app import App, initialize_app + + +@pytest.mark.parametrize( + "kwargs,error", + [ + (dict(headless=True), ValueError), + (dict(headless=True, path="/path/to/nowhere"), ValueError), + (dict(headless=True, path="/dev/null"), ValueError), # not a directory + (dict(headless=True, path="/"), PermissionError), # not writeable + ], +) +def test_initialize_app_invalid( + kwargs: Dict[str, Any], error: Exception, tmpdir +) -> None: + app_py.app = None # overwrite global object + with pytest.raises(error): + initialize_app(**kwargs) + + +@pytest.mark.parametrize("kwargs", [(dict(headless=True)),]) # noqa: E231 +def test_initialize_app_valid(kwargs: Dict[str, Any], tmp_path: Path) -> None: + # overwrite global object + app_py.app = None + # initialize an empty dir + path = tmp_path / "ultratrace-test-app" + path.mkdir() + # initialize + kwargs["path"] = str(path) + app = initialize_app(**kwargs) + assert isinstance(app, App)