Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add MDARunner in experimental namespace #197

Merged
merged 10 commits into from
Nov 21, 2024
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ dependencies = ["pydantic >=2.6", "numpy", "typing-extensions"]
# https://peps.python.org/pep-0621/#dependencies-optional-dependencies
[project.optional-dependencies]
yaml = ["PyYAML"]
test = ["pytest>=6.0", "pytest-cov", "PyYAML"]
test = ["pytest>=6.0", "pytest-cov", "PyYAML", "psygnal"]
dev = [
"ipython",
"mypy",
Expand Down Expand Up @@ -103,7 +103,7 @@ ignore = [
keep-runtime-typing = true

[tool.ruff.lint.per-file-ignores]
"tests/*.py" = ["D", "S101", "E501"]
"tests/*.py" = ["D", "S101", "E501", "SLF"]

[tool.ruff.lint.flake8-tidy-imports]
# Disallow all relative imports.
Expand Down
6 changes: 6 additions & 0 deletions src/useq/experimental/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""MDARunner class for running an Iterable[MDAEvent]."""

from useq.experimental._runner import MDARunner
from useq.experimental.protocols import PMDAEngine

__all__ = ["MDARunner", "PMDAEngine"]
348 changes: 348 additions & 0 deletions src/useq/experimental/_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,348 @@
from __future__ import annotations

import logging
import time
import warnings
from contextlib import contextmanager
from typing import TYPE_CHECKING
from unittest.mock import MagicMock

from useq._mda_sequence import MDASequence
from useq.experimental.protocols import PMDAEngine, PMDASignaler

if TYPE_CHECKING:
from collections.abc import Iterable, Iterator

from useq import MDAEvent


MSG = (
"This sequence is a placeholder for a generator of events with unknown "
"length & shape. Iterating over it has no effect."
)


@contextmanager
def _exceptions_logged(logger: logging.Logger) -> Iterator[None]:
"""Context manager to log exceptions."""
try:
yield
except Exception as e:
logger.error(e)


class GeneratorMDASequence(MDASequence):
axis_order: tuple[str, ...] = ()

@property
def sizes(self) -> dict[str, int]: # pragma: no cover
warnings.warn(MSG, stacklevel=2)
return {}

def iter_axis(self, axis: str) -> Iterator: # pragma: no cover
warnings.warn(MSG, stacklevel=2)
yield from []

def __str__(self) -> str: # pragma: no cover
return "GeneratorMDASequence()"


class MDARunner:
"""Object that executes a multi-dimensional experiment using an MDAEngine.

This object is available at [`CMMCorePlus.mda`][pymmcore_plus.CMMCorePlus.mda].

This is the main object that runs a multi-dimensional experiment; it does so by
driving an acquisition engine that implements the
[`PMDAEngine`][pymmcore_plus.mda.PMDAEngine] protocol. It emits signals at specific
times during the experiment (see
[`PMDASignaler`][pymmcore_plus.mda.events.PMDASignaler] for details on the signals
that are available to connect to and when they are emitted).
"""

def __init__(
self, signal_emitter: PMDASignaler, logger: logging.Logger | None = None
) -> None:
self._engine: PMDAEngine | None = None
self._signals = signal_emitter
self._logger = logger or logging.getLogger(__name__)

self._running = False
self._paused = False
self._paused_time: float = 0
self._pause_interval: float = 0.1 # sec to wait between checking pause state

self._canceled = False
self._sequence: MDASequence | None = None
# timer for the full sequence, reset only once at the beginning of the sequence
self._sequence_t0: float = 0.0
# event clock, reset whenever `event.reset_event_timer` is True
self._t0: float = 0.0

def set_engine(self, engine: PMDAEngine) -> PMDAEngine | None:
"""Set the [`PMDAEngine`][pymmcore_plus.mda.PMDAEngine] to use for the MDA run.""" # noqa: E501
# MagicMock on py312 no longer satisfies isinstance ... so we explicitly
# allow it here just for the sake of testing.
if not isinstance(engine, (PMDAEngine, MagicMock)): # pragma: no cover
raise TypeError("Engine does not conform to the Engine protocol.")

if self.is_running(): # pragma: no cover
raise RuntimeError(
"Cannot register a new engine when the current engine is running "
"an acquisition. Please cancel the current engine's acquisition "
"before registering"
)

old_engine, self._engine = self._engine, engine
return old_engine

@property
def engine(self) -> PMDAEngine | None:
"""The [`PMDAEngine`][pymmcore_plus.mda.PMDAEngine] that is currently being used.""" # noqa: E501
return self._engine

@property
def events(self) -> PMDASignaler:
"""Signals that are emitted during the MDA run.

See [`PMDASignaler`][pymmcore_plus.mda.PMDASignaler] for details on the
signals that are available to connect to.
"""
return self._signals

def is_running(self) -> bool:
"""Return True if an acquisition is currently underway.

This will return True at any point between the emission of the
[`sequenceStarted`][pymmcore_plus.mda.PMDASignaler.sequenceStarted] and
[`sequenceFinished`][pymmcore_plus.mda.PMDASignaler.sequenceFinished] signals,
including when the acquisition is currently paused.

Returns
-------
bool
Whether an acquisition is underway.
"""
return self._running

def is_paused(self) -> bool:
"""Return True if the acquisition is currently paused.

Use `toggle_pause` to change the paused state.

Returns
-------
bool
Whether the current acquisition is paused.
"""
return self._paused

def cancel(self) -> None:
"""Cancel the currently running acquisition.

This is a no-op if no acquisition is currently running.
If an acquisition is running then this will cancel the acquisition and
a sequenceCanceled signal, followed by a sequenceFinished signal will
be emitted.
"""
self._canceled = True
self._paused_time = 0

def toggle_pause(self) -> None:
"""Toggle the paused state of the current acquisition.

To get whether the acquisition is currently paused use the
[`is_paused`][pymmcore_plus.mda.MDARunner.is_paused] method. This method is a
no-op if no acquisition is currently underway.
"""
if self.is_running():
self._paused = not self._paused
self._signals.sequencePauseToggled.emit(self._paused)

def run(
self,
events: Iterable[MDAEvent],
) -> None:
"""Run the multi-dimensional acquisition defined by `sequence`.

Most users should not use this directly as it will block further
execution. Instead, use the
[`CMMCorePlus.run_mda`][pymmcore_plus.CMMCorePlus.run_mda] method which will
run on a thread.

Parameters
----------
events : Iterable[MDAEvent]
An iterable of `useq.MDAEvents` objects to execute.
"""
error = None
sequence = events if isinstance(events, MDASequence) else GeneratorMDASequence()
# NOTE: it's important that `_prepare_to_run` and `_finish_run` are
# called inside the context manager, since the `mda_listeners_connected`
# context manager expects to see both of those signals.
try:
engine = self._prepare_to_run(sequence)
self._run(engine, events)
except Exception as e:
error = e
with _exceptions_logged(self._logger):
self._finish_run(sequence)
if error is not None:
raise error

def seconds_elapsed(self) -> float:
"""Return the number of seconds since the start of the acquisition."""
return time.perf_counter() - self._sequence_t0

def event_seconds_elapsed(self) -> float:
"""Return the number of seconds on the "event clock".

This is the time since either the start of the acquisition or the last
event with `reset_event_timer` set to `True`.
"""
return time.perf_counter() - self._t0

def _run(self, engine: PMDAEngine, events: Iterable[MDAEvent]) -> None:
"""Main execution of events, inside the try/except block of `run`."""
teardown_event = getattr(engine, "teardown_event", lambda e: None)
event_iterator = getattr(engine, "event_iterator", iter)
_events: Iterator[MDAEvent] = event_iterator(events)
self._reset_event_timer()
self._sequence_t0 = self._t0

for event in _events:
if event.reset_event_timer:
self._reset_event_timer()
# If cancelled break out of the loop
if self._wait_until_event(event) or not self._running:
break

self._signals.eventStarted.emit(event)
self._logger.info("%s", event)
engine.setup_event(event)

try:
runner_time_ms = self.seconds_elapsed() * 1000
# this is a bit of a hack to pass the time into the engine
# it is used for intra-event time calculations inside the engine.
# we pop it off after the event is executed.
event.metadata["runner_t0"] = self._sequence_t0
output = engine.exec_event(event) or () # in case output is None
for payload in output:
img, event, meta = payload
event.metadata.pop("runner_t0", None)
# if the engine calculated its own time, don't overwrite it
if "runner_time_ms" not in meta:
meta["runner_time_ms"] = runner_time_ms
with _exceptions_logged(self._logger):
self._signals.frameReady.emit(img, event, meta)
finally:
teardown_event(event)

def _prepare_to_run(self, sequence: MDASequence) -> PMDAEngine:
"""Set up for the MDA run.

Parameters
----------
sequence : MDASequence
The sequence of events to run.
"""
if not self._engine: # pragma: no cover
raise RuntimeError("No MDAEngine set.")

self._running = True
self._paused = False
self._paused_time = 0.0
self._sequence = sequence

meta = self._engine.setup_sequence(sequence)
self._signals.sequenceStarted.emit(sequence, meta or {})
self._logger.info("MDA Started: %s", sequence)
return self._engine

def _reset_event_timer(self) -> None:
self._t0 = time.perf_counter() # reference time, in seconds

def _check_canceled(self) -> bool:
"""Return True if the cancel method has been called and emit relevant signals.

If cancelled, this relies on the `self._sequence` being the current sequence
in order to emit a `sequenceCanceled` signal.

Returns
-------
bool
Whether the MDA has been canceled.
"""
if self._canceled:
self._logger.warning("MDA Canceled: %s", self._sequence)
self._signals.sequenceCanceled.emit(self._sequence)
self._canceled = False
return True
return False

def _wait_until_event(self, event: MDAEvent) -> bool:
"""Wait until the event's min start time, checking for pauses cancellations.

Parameters
----------
event : MDAEvent
The event to wait for.

Returns
-------
bool
Whether the MDA was cancelled while waiting.
"""
if not self.is_running():
return False # pragma: no cover
if self._check_canceled():
return True
while self.is_paused() and not self._canceled:
self._paused_time += self._pause_interval # fixme: be more precise
time.sleep(self._pause_interval)

if self._check_canceled():
return True

# FIXME: this is actually the only place where the runner assumes our event is
# an MDAevent. For everything else, the engine is technically the only thing
# that cares about the event time.
# So this whole method could potentially be moved to the engine.
if event.min_start_time:
go_at = event.min_start_time + self._paused_time
# We need to enter a loop here checking paused and canceled.
# otherwise you'll potentially wait a long time to cancel
remaining_wait_time = go_at - self.event_seconds_elapsed()
while remaining_wait_time > 0:
self._signals.awaitingEvent.emit(event, remaining_wait_time)
while self._paused and not self._canceled:
self._paused_time += self._pause_interval # fixme: be more precise
remaining_wait_time += self._pause_interval
time.sleep(self._pause_interval)

if self._canceled:
break
time.sleep(min(remaining_wait_time, 0.5))
remaining_wait_time = go_at - self.event_seconds_elapsed()

# check canceled again in case it was canceled
# during the waiting loop
return self._check_canceled()

def _finish_run(self, sequence: MDASequence) -> None:
"""To be called at the end of an acquisition.

Parameters
----------
sequence : MDASequence
The sequence that was finished.
"""
self._running = False
self._canceled = False

if hasattr(self._engine, "teardown_sequence"):
self._engine.teardown_sequence(sequence) # type: ignore

self._logger.info("MDA Finished: %s", sequence)
self._signals.sequenceFinished.emit(sequence)
Loading