From b5f7a1081230dabd8e2fcf8aef7a1e69014a64bb Mon Sep 17 00:00:00 2001 From: "Ware, Joseph (DLSLtd,RAL,LSCI)" Date: Fri, 16 Feb 2024 13:34:25 +0000 Subject: [PATCH] Move generic pvi behaviour into own module --- src/ophyd_async/epics/pvi.py | 69 +++++++++++++++++++++ src/ophyd_async/panda/panda.py | 104 ++++---------------------------- tests/panda/test_panda.py | 6 +- tests/panda/test_panda_utils.py | 2 +- 4 files changed, 85 insertions(+), 96 deletions(-) create mode 100644 src/ophyd_async/epics/pvi.py diff --git a/src/ophyd_async/epics/pvi.py b/src/ophyd_async/epics/pvi.py new file mode 100644 index 0000000000..349f992260 --- /dev/null +++ b/src/ophyd_async/epics/pvi.py @@ -0,0 +1,69 @@ +from typing import Callable, Dict, FrozenSet, Optional, Type, TypedDict, TypeVar + +from ophyd_async.core.signal import Signal +from ophyd_async.core.signal_backend import SignalBackend +from ophyd_async.epics._backend._p4p import PvaSignalBackend +from ophyd_async.epics.signal.signal import ( + epics_signal_r, + epics_signal_rw, + epics_signal_w, + epics_signal_x, +) + +T = TypeVar("T") + + +_pvi_mapping: Dict[FrozenSet[str], Callable[..., Signal]] = { + frozenset({"r", "w"}): lambda dtype, read_pv, write_pv: epics_signal_rw( + dtype, read_pv, write_pv + ), + frozenset({"rw"}): lambda dtype, read_pv, write_pv: epics_signal_rw( + dtype, read_pv, write_pv + ), + frozenset({"r"}): lambda dtype, read_pv, _: epics_signal_r(dtype, read_pv), + frozenset({"w"}): lambda dtype, _, write_pv: epics_signal_w(dtype, write_pv), + frozenset({"x"}): lambda _, __, write_pv: epics_signal_x(write_pv), +} + + +class PVIEntry(TypedDict, total=False): + d: str + r: str + rw: str + w: str + x: str + + +async def pvi_get(prefix: str) -> Dict[str, PVIEntry]: + """Makes a PvaSignalBackend purely to connect to PVI information. + + This backend is simply thrown away at the end of this method. This is useful + because the backend handles a CancelledError exception that gets thrown on + timeout, and therefore can be used for error reporting.""" + + read_pv = prefix + "PVI" + backend: SignalBackend = PvaSignalBackend(None, read_pv, read_pv) + await backend.connect() + pv_info: Dict[str, Dict[str, str]] = await backend.get_value() + + result = {} + + for attr_name, attr_info in pv_info.items(): + result[attr_name] = PVIEntry(**attr_info) # type: ignore + + return result + + +def make_signal(signal_pvi: PVIEntry, dtype: Optional[Type[T]] = None) -> Signal[T]: + """Make a signal. + + This assumes datatype is None so it can be used to create dynamic signals. + """ + operations = frozenset(signal_pvi.keys()) + pvs = [signal_pvi[i] for i in operations] # type: ignore + signal_factory = _pvi_mapping[operations] + + write_pv = "pva://" + pvs[0] + read_pv = "pva://" + pvs[1] if len(pvs) > 1 else None + + return signal_factory(dtype, read_pv, write_pv) diff --git a/src/ophyd_async/panda/panda.py b/src/ophyd_async/panda/panda.py index 2aa9e8470d..a8adda81e9 100644 --- a/src/ophyd_async/panda/panda.py +++ b/src/ophyd_async/panda/panda.py @@ -1,20 +1,7 @@ from __future__ import annotations import re -from typing import ( - Callable, - Dict, - FrozenSet, - Optional, - Tuple, - Type, - TypedDict, - TypeVar, - cast, - get_args, - get_origin, - get_type_hints, -) +from typing import Dict, Optional, Tuple, cast, get_args, get_origin, get_type_hints from ophyd_async.core import ( Device, @@ -26,13 +13,7 @@ SignalX, SimSignalBackend, ) -from ophyd_async.epics._backend._p4p import PvaSignalBackend -from ophyd_async.epics.signal import ( - epics_signal_r, - epics_signal_rw, - epics_signal_w, - epics_signal_x, -) +from ophyd_async.epics.pvi import PVIEntry, make_signal, pvi_get from ophyd_async.panda.table import SeqTable @@ -50,17 +31,6 @@ class PcapBlock(Device): active: SignalR[bool] -class PVIEntry(TypedDict, total=False): - d: str - r: str - rw: str - w: str - x: str - - -T = TypeVar("T") - - def _block_name_number(block_name: str) -> Tuple[str, Optional[int]]: """Maps a panda block name to a block and number. @@ -79,7 +49,7 @@ def _block_name_number(block_name: str) -> Tuple[str, Optional[int]]: return block_name, None -def _remove_inconsistent_blocks(pvi_info: Dict[str, PVIEntry]) -> None: +def _remove_inconsistent_blocks(pvi_info: Optional[Dict[str, PVIEntry]]) -> None: """Remove blocks from pvi information. This is needed because some pandas have 'pcap' and 'pcap1' blocks, which are @@ -87,6 +57,8 @@ def _remove_inconsistent_blocks(pvi_info: Dict[str, PVIEntry]) -> None: for example. """ + if pvi_info is None: + return pvi_keys = set(pvi_info.keys()) for k in pvi_keys: kn = re.sub(r"\d*$", "", k) @@ -99,18 +71,6 @@ class PandA(Device): seq: DeviceVector[SeqBlock] pcap: PcapBlock - _pvi_mapping: Dict[FrozenSet[str], Callable[..., Signal]] = { - frozenset({"r", "w"}): lambda dtype, read_pv, write_pv: epics_signal_rw( - dtype, read_pv, write_pv - ), - frozenset({"rw"}): lambda dtype, read_pv, write_pv: epics_signal_rw( - dtype, read_pv, write_pv - ), - frozenset({"r"}): lambda dtype, read_pv, _: epics_signal_r(dtype, read_pv), - frozenset({"w"}): lambda dtype, _, write_pv: epics_signal_w(dtype, write_pv), - frozenset({"x"}): lambda _, __, write_pv: epics_signal_x(write_pv), - } - def __init__(self, prefix: str, name: str) -> None: super().__init__(name) self._prefix = prefix @@ -141,7 +101,7 @@ async def _make_block( block = self.verify_block(name, num) field_annos = get_type_hints(block, globalns=globals()) - block_pvi = await self.pvi_get(block_pv) if not sim else None + block_pvi = await pvi_get(block_pv) if not sim else None # finds which fields this class actually has, e.g. delay, width... for sig_name, sig_type in field_annos.items(): @@ -150,7 +110,6 @@ async def _make_block( # if not in sim mode, if block_pvi: - block_pvi = cast(Dict[str, PVIEntry], block_pvi) # try to get this block in the pvi. entry: Optional[PVIEntry] = block_pvi.get(sig_name) if entry is None: @@ -159,9 +118,7 @@ async def _make_block( + f"an {sig_name} signal which has not been retrieved by PVI." ) - signal: Signal = self._make_signal( - entry, args[0] if len(args) > 0 else None - ) + signal: Signal = make_signal(entry, args[0] if len(args) > 0 else None) else: backend: SignalBackend = SimSignalBackend( @@ -176,8 +133,7 @@ async def _make_block( for attr, attr_pvi in block_pvi.items(): if not hasattr(block, attr): # makes any extra signals - signal = self._make_signal(attr_pvi) - setattr(block, attr, signal) + setattr(block, attr, make_signal(attr_pvi)) return block @@ -188,30 +144,13 @@ async def _make_untyped_block(self, block_pv: str): included dynamically anyway. """ block = Device() - block_pvi: Dict[str, PVIEntry] = await self.pvi_get(block_pv) + block_pvi: Dict[str, PVIEntry] = await pvi_get(block_pv) for signal_name, signal_pvi in block_pvi.items(): - signal: Signal = self._make_signal(signal_pvi) - setattr(block, signal_name, signal) + setattr(block, signal_name, make_signal(signal_pvi)) return block - def _make_signal( - self, signal_pvi: PVIEntry, dtype: Optional[Type[T]] = None - ) -> Signal[T]: - """Make a signal. - - This assumes datatype is None so it can be used to create dynamic signals. - """ - operations = frozenset(signal_pvi.keys()) - pvs = [signal_pvi[i] for i in operations] # type: ignore - signal_factory = self._pvi_mapping[operations] - - write_pv = "pva://" + pvs[0] - read_pv = "pva://" + pvs[1] if len(pvs) > 1 else None - - return signal_factory(dtype, read_pv, write_pv) - # TODO redo to set_panda_block? confusing name def set_attribute(self, name: str, num: Optional[int], block: Device): """Set a block on the panda. @@ -228,26 +167,6 @@ def set_attribute(self, name: str, num: Optional[int], block: Device): else: setattr(self, name, block) - async def pvi_get(self, prefix: str) -> Dict[str, PVIEntry]: - """Makes a PvaSignalBackend purely to connect to PVI information. - - This backend is simply thrown away at the end of this method. This is useful - because the backend handles a CancelledError exception that gets thrown on - timeout, and therefore can be used for error reporting.""" - - read_pv = prefix + "PVI:" - backend: SignalBackend = PvaSignalBackend(None, read_pv, read_pv) - await backend.connect() - pv_info: Dict[str, Dict[str, str]] = await backend.get_value() - - result = {} - - for attr_name, attr_info in pv_info.items(): - result[attr_name] = PVIEntry(**attr_info) # type: ignore - - _remove_inconsistent_blocks(result) - return result - async def connect(self, sim=False) -> None: """Initialises all blocks and connects them. @@ -257,7 +176,8 @@ async def connect(self, sim=False) -> None: If there's no pvi information, that's because we're in sim mode. In that case, makes all required blocks. """ - pvi_info = await self.pvi_get(self._prefix) if not sim else None + pvi_info = await pvi_get(self._prefix) if not sim else None + _remove_inconsistent_blocks(pvi_info) hints = { attr_name: attr_type diff --git a/tests/panda/test_panda.py b/tests/panda/test_panda.py index 5e82ef2fa9..55f73717a1 100644 --- a/tests/panda/test_panda.py +++ b/tests/panda/test_panda.py @@ -38,14 +38,14 @@ def get(self, pv: str, timeout: float = 0.0): @pytest.fixture async def sim_panda(): async with DeviceCollector(sim=True): - sim_panda = PandA("PANDAQSRV:", "panda") + sim_panda = PandA("PANDAQSRV:", "sim_panda") yield sim_panda def test_panda_names_correct(sim_panda: PandA): - assert sim_panda.seq[1].name == "panda-seq-1" - assert sim_panda.pulse[1].name == "panda-pulse-1" + assert sim_panda.seq[1].name == "sim_panda-seq-1" + assert sim_panda.pulse[1].name == "sim_panda-pulse-1" def test_panda_name_set(): diff --git a/tests/panda/test_panda_utils.py b/tests/panda/test_panda_utils.py index 329cfae3c5..2c8ffd2951 100644 --- a/tests/panda/test_panda_utils.py +++ b/tests/panda/test_panda_utils.py @@ -13,7 +13,7 @@ @pytest.fixture async def sim_panda(): async with DeviceCollector(sim=True): - sim_panda = PandA("PANDA", "panda") + sim_panda = PandA("PANDA", "sim_panda") sim_panda.phase_1_signal_units = epics_signal_rw(int, "") yield sim_panda