-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for watchers into AsyncStatus.wrap #117
Comments
An idea for the signature: T = TypeVar('T')
P = ParamSpec('P')
Watchers = Sequence[Callable]
class AsyncStatus(Status):
...
@classmethod
def wrap(cls, f: Callable[Concatenate[T, Watchers, P], Coroutine]) -> Callable[Concatenate[T, P], "AsyncStatus"]:
@functools.wraps(f)
def wrap_f(self: T, *args, **kwargs) -> AsyncStatus:
watchers = []
return AsyncStatus(f(self, watchers, *args, **kwargs), watchers)
return wrap_f Then we can use it to wrap |
Thinking more about this, I wonder if we should support signatures with and without watchers, i.e. T = TypeVar('T')
P = ParamSpec('P')
Watchers = Sequence[Callable]
class AsyncStatus(Status):
...
@classmethod
def wrap(cls, f: Callable[P, Awaitable]) -> Callable[P, "AsyncStatus"]:
@functools.wraps(f)
def wrap_f(*args, **kwargs) -> AsyncStatus:
return AsyncStatus(f(*args, **kwargs))
return wrap_f
@classmethod
def wrap_passing_watchers(cls, f: Callable[Concatenate[T, Watchers, P], Awaitable]) -> Callable[Concatenate[T, P], "AsyncStatus"]:
@functools.wraps(f)
def wrap_f(self: T, *args, **kwargs) -> AsyncStatus:
watchers = []
return AsyncStatus(f(self, watchers, *args, **kwargs), watchers)
return wrap_f |
This would mean we could do: class MyDevice(Stageable, Movable):
@AsyncStatus.wrap
def stage(self):
...
@AsyncStatus.wrap_passing_watchers
def set(self, watchers: Watchers, value: float):
... Rather than passing a superfluous |
@DominicOram @dperl-dls bike shedding welcome... |
I agree with you, I'd prefer two separate wrappers for that. I must admit I don't understand how |
The flow looks like this:
Thinking about this now, this means that there is a race condition between the first update coming in and the interested code calling |
Here's another idea, we make the wrapped function either return a coroutine or an async iterator, where the yield value is a dataclass of watcher values. Something like: @AsyncStatus.wrap
async def stage(self) -> None:
await self.stop_.trigger()
@AsyncStatus.wrap
async def set(self, value) -> AsyncIterator[WatcherUpdate]:
await self.setpoint.set(value)
for readback in observe_value(self.readback):
yield WatcherUpdate(target=value, current=value, ...) |
oh, so that the wrapped coroutine has the same |
do we always want |
Yes, we do this for motors at the moment, and will do that with flyers too: ophyd-async/src/ophyd_async/epics/demo/__init__.py Lines 82 to 95 in fd6b8ab
But switching to yielding a dataclass would be a bit more type safe than calling a random callable in many places.
Thinking about this more, maybe we want to separate the class AsyncStatus(Status):
def __init__(self, awaitable: Awaitable):
# Spawn a task as before, but don't make a watcher list
@classmethod
def wrap(cls: Type[T], f: Callable[P, Awaitable]) -> Callable[P, T]:
@functools.wraps(f)
def wrap_f(*args, **kwargs) -> AsyncStatus:
return cls(f(*args, **kwargs))
return wrap_f
class WatchableAsyncStatus(Status):
def __init__(self, iterator: AsyncIterator[WatcherUpdate]):
self._watchers: List[Callable] = []
self._start = time.monotonic()
self._last_update: Optional[WatcherUpdate] = None
super().__init__(self._notify_watchers_from(iterator))
async def _notify_watchers_from(self, iterator: AsyncIterator[WatcherUpdate]):
async for self._last_update in iterator:
for watcher in self._watchers:
self._update_watcher(watcher, self_last_update)
def _update_watcher(self, watcher: Callable, update: WatcherUpdate):
watcher(
name=update.name,
current=update.current,
initial=update.initial,
target=update.target,
unit=update.units,
precision=update.precision,
time_elapsed=time.monotonic() - self._start,
)
def watch(self, watcher: Callable):
self._watchers.append(watcher)
if self._last_update:
self._update_watcher(watcher, self._last_update)
@classmethod
def wrap(cls: Type[T], f: Callable[P, AsyncIterator[WatcherUpdate]]) -> Callable[P, T]:
@functools.wraps(f)
def wrap_f(*args, **kwargs) -> AsyncStatus:
return cls(f(*args, **kwargs))
return wrap_f Then we get to write: @AsyncStatus.wrap
async def stage(self) -> None:
await self.stop_.trigger()
@WatchableAsyncStatus.wrap
async def set(self, value) -> AsyncIterator[WatcherUpdate]:
await self.setpoint.set(value)
for readback in observe_value(self.readback):
yield WatcherUpdate(target=value, current=value, ...) This also solves the first update race condition |
* Re: issues (#117) (#45) * Adds a WatchableAsyncStatus which wraps an AsyncIterator * Lets AsyncStatus.wrap and WatchableAsyncStatus.wrap decorate any function which returns the right type * Updates motors, flyers etc. to match * Tests the above --------- Co-authored-by: Tom C (DLS) <[email protected]>
Pass is as an extra argument into the wrapped function.
The text was updated successfully, but these errors were encountered: