-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add MDARunner in
experimental
namespace (#197)
* feat: add runner from pymmcore-plus * add dep * fix hint * try fix 3.8 * style(pre-commit.ci): auto fixes [...] * more coverage * change namespace to experimental --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
- Loading branch information
1 parent
a5819ef
commit 70b32dd
Showing
6 changed files
with
667 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
"""MDARunner class for running an Iterable[MDAEvent].""" | ||
|
||
from useq.experimental._runner import MDARunner | ||
from useq.experimental.protocols import PMDAEngine | ||
|
||
__all__ = ["MDARunner", "PMDAEngine"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,348 @@ | ||
from __future__ import annotations | ||
|
||
import logging | ||
import time | ||
import warnings | ||
from contextlib import contextmanager | ||
from typing import TYPE_CHECKING | ||
from unittest.mock import MagicMock | ||
|
||
from useq._mda_sequence import MDASequence | ||
from useq.experimental.protocols import PMDAEngine, PMDASignaler | ||
|
||
if TYPE_CHECKING: | ||
from collections.abc import Iterable, Iterator | ||
|
||
from useq import MDAEvent | ||
|
||
|
||
MSG = ( | ||
"This sequence is a placeholder for a generator of events with unknown " | ||
"length & shape. Iterating over it has no effect." | ||
) | ||
|
||
|
||
@contextmanager | ||
def _exceptions_logged(logger: logging.Logger) -> Iterator[None]: | ||
"""Context manager to log exceptions.""" | ||
try: | ||
yield | ||
except Exception as e: | ||
logger.error(e) | ||
|
||
|
||
class GeneratorMDASequence(MDASequence): | ||
axis_order: tuple[str, ...] = () | ||
|
||
@property | ||
def sizes(self) -> dict[str, int]: # pragma: no cover | ||
warnings.warn(MSG, stacklevel=2) | ||
return {} | ||
|
||
def iter_axis(self, axis: str) -> Iterator: # pragma: no cover | ||
warnings.warn(MSG, stacklevel=2) | ||
yield from [] | ||
|
||
def __str__(self) -> str: # pragma: no cover | ||
return "GeneratorMDASequence()" | ||
|
||
|
||
class MDARunner: | ||
"""Object that executes a multi-dimensional experiment using an MDAEngine. | ||
This object is available at [`CMMCorePlus.mda`][pymmcore_plus.CMMCorePlus.mda]. | ||
This is the main object that runs a multi-dimensional experiment; it does so by | ||
driving an acquisition engine that implements the | ||
[`PMDAEngine`][pymmcore_plus.mda.PMDAEngine] protocol. It emits signals at specific | ||
times during the experiment (see | ||
[`PMDASignaler`][pymmcore_plus.mda.events.PMDASignaler] for details on the signals | ||
that are available to connect to and when they are emitted). | ||
""" | ||
|
||
def __init__( | ||
self, signal_emitter: PMDASignaler, logger: logging.Logger | None = None | ||
) -> None: | ||
self._engine: PMDAEngine | None = None | ||
self._signals = signal_emitter | ||
self._logger = logger or logging.getLogger(__name__) | ||
|
||
self._running = False | ||
self._paused = False | ||
self._paused_time: float = 0 | ||
self._pause_interval: float = 0.1 # sec to wait between checking pause state | ||
|
||
self._canceled = False | ||
self._sequence: MDASequence | None = None | ||
# timer for the full sequence, reset only once at the beginning of the sequence | ||
self._sequence_t0: float = 0.0 | ||
# event clock, reset whenever `event.reset_event_timer` is True | ||
self._t0: float = 0.0 | ||
|
||
def set_engine(self, engine: PMDAEngine) -> PMDAEngine | None: | ||
"""Set the [`PMDAEngine`][pymmcore_plus.mda.PMDAEngine] to use for the MDA run.""" # noqa: E501 | ||
# MagicMock on py312 no longer satisfies isinstance ... so we explicitly | ||
# allow it here just for the sake of testing. | ||
if not isinstance(engine, (PMDAEngine, MagicMock)): # pragma: no cover | ||
raise TypeError("Engine does not conform to the Engine protocol.") | ||
|
||
if self.is_running(): # pragma: no cover | ||
raise RuntimeError( | ||
"Cannot register a new engine when the current engine is running " | ||
"an acquisition. Please cancel the current engine's acquisition " | ||
"before registering" | ||
) | ||
|
||
old_engine, self._engine = self._engine, engine | ||
return old_engine | ||
|
||
@property | ||
def engine(self) -> PMDAEngine | None: | ||
"""The [`PMDAEngine`][pymmcore_plus.mda.PMDAEngine] that is currently being used.""" # noqa: E501 | ||
return self._engine | ||
|
||
@property | ||
def events(self) -> PMDASignaler: | ||
"""Signals that are emitted during the MDA run. | ||
See [`PMDASignaler`][pymmcore_plus.mda.PMDASignaler] for details on the | ||
signals that are available to connect to. | ||
""" | ||
return self._signals | ||
|
||
def is_running(self) -> bool: | ||
"""Return True if an acquisition is currently underway. | ||
This will return True at any point between the emission of the | ||
[`sequenceStarted`][pymmcore_plus.mda.PMDASignaler.sequenceStarted] and | ||
[`sequenceFinished`][pymmcore_plus.mda.PMDASignaler.sequenceFinished] signals, | ||
including when the acquisition is currently paused. | ||
Returns | ||
------- | ||
bool | ||
Whether an acquisition is underway. | ||
""" | ||
return self._running | ||
|
||
def is_paused(self) -> bool: | ||
"""Return True if the acquisition is currently paused. | ||
Use `toggle_pause` to change the paused state. | ||
Returns | ||
------- | ||
bool | ||
Whether the current acquisition is paused. | ||
""" | ||
return self._paused | ||
|
||
def cancel(self) -> None: | ||
"""Cancel the currently running acquisition. | ||
This is a no-op if no acquisition is currently running. | ||
If an acquisition is running then this will cancel the acquisition and | ||
a sequenceCanceled signal, followed by a sequenceFinished signal will | ||
be emitted. | ||
""" | ||
self._canceled = True | ||
self._paused_time = 0 | ||
|
||
def toggle_pause(self) -> None: | ||
"""Toggle the paused state of the current acquisition. | ||
To get whether the acquisition is currently paused use the | ||
[`is_paused`][pymmcore_plus.mda.MDARunner.is_paused] method. This method is a | ||
no-op if no acquisition is currently underway. | ||
""" | ||
if self.is_running(): | ||
self._paused = not self._paused | ||
self._signals.sequencePauseToggled.emit(self._paused) | ||
|
||
def run( | ||
self, | ||
events: Iterable[MDAEvent], | ||
) -> None: | ||
"""Run the multi-dimensional acquisition defined by `sequence`. | ||
Most users should not use this directly as it will block further | ||
execution. Instead, use the | ||
[`CMMCorePlus.run_mda`][pymmcore_plus.CMMCorePlus.run_mda] method which will | ||
run on a thread. | ||
Parameters | ||
---------- | ||
events : Iterable[MDAEvent] | ||
An iterable of `useq.MDAEvents` objects to execute. | ||
""" | ||
error = None | ||
sequence = events if isinstance(events, MDASequence) else GeneratorMDASequence() | ||
# NOTE: it's important that `_prepare_to_run` and `_finish_run` are | ||
# called inside the context manager, since the `mda_listeners_connected` | ||
# context manager expects to see both of those signals. | ||
try: | ||
engine = self._prepare_to_run(sequence) | ||
self._run(engine, events) | ||
except Exception as e: | ||
error = e | ||
with _exceptions_logged(self._logger): | ||
self._finish_run(sequence) | ||
if error is not None: | ||
raise error | ||
|
||
def seconds_elapsed(self) -> float: | ||
"""Return the number of seconds since the start of the acquisition.""" | ||
return time.perf_counter() - self._sequence_t0 | ||
|
||
def event_seconds_elapsed(self) -> float: | ||
"""Return the number of seconds on the "event clock". | ||
This is the time since either the start of the acquisition or the last | ||
event with `reset_event_timer` set to `True`. | ||
""" | ||
return time.perf_counter() - self._t0 | ||
|
||
def _run(self, engine: PMDAEngine, events: Iterable[MDAEvent]) -> None: | ||
"""Main execution of events, inside the try/except block of `run`.""" | ||
teardown_event = getattr(engine, "teardown_event", lambda e: None) | ||
event_iterator = getattr(engine, "event_iterator", iter) | ||
_events: Iterator[MDAEvent] = event_iterator(events) | ||
self._reset_event_timer() | ||
self._sequence_t0 = self._t0 | ||
|
||
for event in _events: | ||
if event.reset_event_timer: | ||
self._reset_event_timer() | ||
# If cancelled break out of the loop | ||
if self._wait_until_event(event) or not self._running: | ||
break | ||
|
||
self._signals.eventStarted.emit(event) | ||
self._logger.info("%s", event) | ||
engine.setup_event(event) | ||
|
||
try: | ||
runner_time_ms = self.seconds_elapsed() * 1000 | ||
# this is a bit of a hack to pass the time into the engine | ||
# it is used for intra-event time calculations inside the engine. | ||
# we pop it off after the event is executed. | ||
event.metadata["runner_t0"] = self._sequence_t0 | ||
output = engine.exec_event(event) or () # in case output is None | ||
for payload in output: | ||
img, event, meta = payload | ||
event.metadata.pop("runner_t0", None) | ||
# if the engine calculated its own time, don't overwrite it | ||
if "runner_time_ms" not in meta: | ||
meta["runner_time_ms"] = runner_time_ms | ||
with _exceptions_logged(self._logger): | ||
self._signals.frameReady.emit(img, event, meta) | ||
finally: | ||
teardown_event(event) | ||
|
||
def _prepare_to_run(self, sequence: MDASequence) -> PMDAEngine: | ||
"""Set up for the MDA run. | ||
Parameters | ||
---------- | ||
sequence : MDASequence | ||
The sequence of events to run. | ||
""" | ||
if not self._engine: # pragma: no cover | ||
raise RuntimeError("No MDAEngine set.") | ||
|
||
self._running = True | ||
self._paused = False | ||
self._paused_time = 0.0 | ||
self._sequence = sequence | ||
|
||
meta = self._engine.setup_sequence(sequence) | ||
self._signals.sequenceStarted.emit(sequence, meta or {}) | ||
self._logger.info("MDA Started: %s", sequence) | ||
return self._engine | ||
|
||
def _reset_event_timer(self) -> None: | ||
self._t0 = time.perf_counter() # reference time, in seconds | ||
|
||
def _check_canceled(self) -> bool: | ||
"""Return True if the cancel method has been called and emit relevant signals. | ||
If cancelled, this relies on the `self._sequence` being the current sequence | ||
in order to emit a `sequenceCanceled` signal. | ||
Returns | ||
------- | ||
bool | ||
Whether the MDA has been canceled. | ||
""" | ||
if self._canceled: | ||
self._logger.warning("MDA Canceled: %s", self._sequence) | ||
self._signals.sequenceCanceled.emit(self._sequence) | ||
self._canceled = False | ||
return True | ||
return False | ||
|
||
def _wait_until_event(self, event: MDAEvent) -> bool: | ||
"""Wait until the event's min start time, checking for pauses cancellations. | ||
Parameters | ||
---------- | ||
event : MDAEvent | ||
The event to wait for. | ||
Returns | ||
------- | ||
bool | ||
Whether the MDA was cancelled while waiting. | ||
""" | ||
if not self.is_running(): | ||
return False # pragma: no cover | ||
if self._check_canceled(): | ||
return True | ||
while self.is_paused() and not self._canceled: | ||
self._paused_time += self._pause_interval # fixme: be more precise | ||
time.sleep(self._pause_interval) | ||
|
||
if self._check_canceled(): | ||
return True | ||
|
||
# FIXME: this is actually the only place where the runner assumes our event is | ||
# an MDAevent. For everything else, the engine is technically the only thing | ||
# that cares about the event time. | ||
# So this whole method could potentially be moved to the engine. | ||
if event.min_start_time: | ||
go_at = event.min_start_time + self._paused_time | ||
# We need to enter a loop here checking paused and canceled. | ||
# otherwise you'll potentially wait a long time to cancel | ||
remaining_wait_time = go_at - self.event_seconds_elapsed() | ||
while remaining_wait_time > 0: | ||
self._signals.awaitingEvent.emit(event, remaining_wait_time) | ||
while self._paused and not self._canceled: | ||
self._paused_time += self._pause_interval # fixme: be more precise | ||
remaining_wait_time += self._pause_interval | ||
time.sleep(self._pause_interval) | ||
|
||
if self._canceled: | ||
break | ||
time.sleep(min(remaining_wait_time, 0.5)) | ||
remaining_wait_time = go_at - self.event_seconds_elapsed() | ||
|
||
# check canceled again in case it was canceled | ||
# during the waiting loop | ||
return self._check_canceled() | ||
|
||
def _finish_run(self, sequence: MDASequence) -> None: | ||
"""To be called at the end of an acquisition. | ||
Parameters | ||
---------- | ||
sequence : MDASequence | ||
The sequence that was finished. | ||
""" | ||
self._running = False | ||
self._canceled = False | ||
|
||
if hasattr(self._engine, "teardown_sequence"): | ||
self._engine.teardown_sequence(sequence) # type: ignore | ||
|
||
self._logger.info("MDA Finished: %s", sequence) | ||
self._signals.sequenceFinished.emit(sequence) |
Oops, something went wrong.