-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Move generic pvi behaviour into own module
- Loading branch information
1 parent
59bf40a
commit b5f7a10
Showing
4 changed files
with
85 additions
and
96 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
from typing import Callable, Dict, FrozenSet, Optional, Type, TypedDict, TypeVar | ||
|
||
from ophyd_async.core.signal import Signal | ||
from ophyd_async.core.signal_backend import SignalBackend | ||
from ophyd_async.epics._backend._p4p import PvaSignalBackend | ||
from ophyd_async.epics.signal.signal import ( | ||
epics_signal_r, | ||
epics_signal_rw, | ||
epics_signal_w, | ||
epics_signal_x, | ||
) | ||
|
||
T = TypeVar("T") | ||
|
||
|
||
_pvi_mapping: Dict[FrozenSet[str], Callable[..., Signal]] = { | ||
frozenset({"r", "w"}): lambda dtype, read_pv, write_pv: epics_signal_rw( | ||
dtype, read_pv, write_pv | ||
), | ||
frozenset({"rw"}): lambda dtype, read_pv, write_pv: epics_signal_rw( | ||
dtype, read_pv, write_pv | ||
), | ||
frozenset({"r"}): lambda dtype, read_pv, _: epics_signal_r(dtype, read_pv), | ||
frozenset({"w"}): lambda dtype, _, write_pv: epics_signal_w(dtype, write_pv), | ||
frozenset({"x"}): lambda _, __, write_pv: epics_signal_x(write_pv), | ||
} | ||
|
||
|
||
class PVIEntry(TypedDict, total=False): | ||
d: str | ||
r: str | ||
rw: str | ||
w: str | ||
x: str | ||
|
||
|
||
async def pvi_get(prefix: str) -> Dict[str, PVIEntry]: | ||
"""Makes a PvaSignalBackend purely to connect to PVI information. | ||
This backend is simply thrown away at the end of this method. This is useful | ||
because the backend handles a CancelledError exception that gets thrown on | ||
timeout, and therefore can be used for error reporting.""" | ||
|
||
read_pv = prefix + "PVI" | ||
backend: SignalBackend = PvaSignalBackend(None, read_pv, read_pv) | ||
await backend.connect() | ||
pv_info: Dict[str, Dict[str, str]] = await backend.get_value() | ||
|
||
result = {} | ||
|
||
for attr_name, attr_info in pv_info.items(): | ||
result[attr_name] = PVIEntry(**attr_info) # type: ignore | ||
|
||
return result | ||
|
||
|
||
def make_signal(signal_pvi: PVIEntry, dtype: Optional[Type[T]] = None) -> Signal[T]: | ||
"""Make a signal. | ||
This assumes datatype is None so it can be used to create dynamic signals. | ||
""" | ||
operations = frozenset(signal_pvi.keys()) | ||
pvs = [signal_pvi[i] for i in operations] # type: ignore | ||
signal_factory = _pvi_mapping[operations] | ||
|
||
write_pv = "pva://" + pvs[0] | ||
read_pv = "pva://" + pvs[1] if len(pvs) > 1 else None | ||
|
||
return signal_factory(dtype, read_pv, write_pv) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters