moved block making utilities into pvi parser
Created a sim mode for pvi devices where the typed attributes are given
sim signals. Adjusted tests.
evalott100 committed Mar 27, 2024
1 parent 6e98217 commit 833a6ec
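A minimal usage sketch of the new sim mode (illustrative only; `MyPulseBlock`, `MyPanda` and the `PANDA:PVI` prefix are invented and not part of this commit):

import asyncio

from ophyd_async.core import Device, DeviceVector, SignalRW
from ophyd_async.epics.pvi import fill_pvi_entries


class MyPulseBlock(Device):
    delay: SignalRW[float]


class MyPanda(Device):
    pulse: DeviceVector[MyPulseBlock]


async def main():
    panda = MyPanda(name="panda")
    # sim=True: every typed attribute is given a SimSignalBackend, no IOC needed
    await fill_pvi_entries(panda, "PANDA:PVI", sim=True)
    # sim=False would instead read the PVI table at PANDA:PVI and create pva:// signals
    # await fill_pvi_entries(panda, "PANDA:PVI", sim=False)


asyncio.run(main())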
Showing 6 changed files with 271 additions and 264 deletions.
288 changes: 249 additions & 39 deletions src/ophyd_async/epics/pvi.py
@@ -1,7 +1,23 @@
-from typing import Callable, Dict, FrozenSet, Optional, Type, TypedDict, TypeVar
import re
from dataclasses import dataclass
from typing import (
Callable,
Dict,
FrozenSet,
List,
Literal,
Optional,
Tuple,
Type,
TypeVar,
Union,
get_args,
get_origin,
get_type_hints,
)

from ophyd_async.core import Device, DeviceVector, SimSignalBackend
from ophyd_async.core.signal import Signal
from ophyd_async.core.signal_backend import SignalBackend
from ophyd_async.core.utils import DEFAULT_TIMEOUT
from ophyd_async.epics._backend._p4p import PvaSignalBackend
from ophyd_async.epics.signal.signal import (
@@ -12,59 +28,253 @@
)

T = TypeVar("T")
Access = FrozenSet[
Literal["r"] | Literal["w"] | Literal["rw"] | Literal["x"] | Literal["d"]
]


def _strip_number_from_string(string: str) -> Tuple[str, Optional[int]]:
match = re.match(r"(.*?)(\d*)$", string)
assert match

name = match.group(1)
number = match.group(2) or None
if number:
number = int(number)
return name, number
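# Illustrative examples (names invented): _strip_number_from_string("OUTENC2") returns
# ("OUTENC", 2), while _strip_number_from_string("PCAP") returns ("PCAP", None).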


@dataclass
class PVIEntry:
"""
A dataclass to represent a single entry in the PVI table.
This could either be a signal or a sub-table.
"""

name: Optional[str]
access: Access
values: List[str]
# `sub_entries` is set if the entry is a PVI table
# If a sub device is a device vector then it will be represented by a further dict
sub_entries: Optional[Dict[str, Union[Dict[int, "PVIEntry"], "PVIEntry"]]] = None
device: Optional[Device] = None

@property
def is_pvi_table(self) -> bool:
return len(self.values) == 1 and self.values[0].endswith(":PVI")
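# Illustrative only (PV names invented): an entry with values == ["PANDA:PCAP:PVI"] is a
# PVI sub-table, while values == ["PANDA:PCAP:ACTIVE"] with access {"r"} maps to a signal.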


_pvi_mapping: Dict[FrozenSet[str], Callable[..., Signal]] = {
frozenset({"r", "w"}): lambda dtype, read_pv, write_pv: epics_signal_rw(
dtype, read_pv, write_pv
frozenset({"r", "w"}): lambda read_pv, write_pv: epics_signal_rw(
None, "pva://" + read_pv, "pva://" + write_pv
),
frozenset({"rw"}): lambda dtype, read_pv, write_pv: epics_signal_rw(
dtype, read_pv, write_pv
frozenset({"rw"}): lambda read_write_pv: epics_signal_rw(
None, "pva://" + read_write_pv, write_pv="pva://" + read_write_pv
),
frozenset({"r"}): lambda dtype, read_pv, _: epics_signal_r(dtype, read_pv),
frozenset({"w"}): lambda dtype, _, write_pv: epics_signal_w(dtype, write_pv),
frozenset({"x"}): lambda _, __, write_pv: epics_signal_x(write_pv),
frozenset({"r"}): lambda read_pv: epics_signal_r(None, "pva://" + read_pv),
frozenset({"w"}): lambda write_pv: epics_signal_w(None, "pva://" + write_pv),
frozenset({"x"}): lambda write_pv: epics_signal_x("pva://" + write_pv),
}
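# Illustrative only (PV names invented): _pvi_mapping[frozenset({"r", "w"})]("X:RBV", "X")
# builds an rw signal that reads from "pva://X:RBV" and writes to "pva://X".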


-class PVIEntry(TypedDict, total=False):
-d: str
-r: str
-rw: str
-w: str
-x: str
class PVIParser:
def __init__(
self,
root_pv: str,
timeout=DEFAULT_TIMEOUT,
):
self.root_entry = PVIEntry(
name=None, access=frozenset({"d"}), values=[root_pv], sub_entries={}
)
self.timeout = timeout

async def get_pvi_entries(self, entry: Optional[PVIEntry] = None):
"""Creates signals from a top level PVI table"""
if not entry:
entry = self.root_entry

assert entry.is_pvi_table

pvi_table_signal_backend: PvaSignalBackend = PvaSignalBackend(
None, entry.values[0], entry.values[0]
)
await pvi_table_signal_backend.connect(
timeout=self.timeout
) # create table signal backend

pva_table = await pvi_table_signal_backend.get_value()
assert "pvi" in pva_table

entry.sub_entries = {}

for sub_name, pva_entries in pva_table["pvi"].items():
sub_entry = PVIEntry(
name=sub_name,
access=frozenset(pva_entries.keys()),
values=list(pva_entries.values()),
sub_entries={},
)

if sub_entry.is_pvi_table:
sub_split_name, sub_split_number = _strip_number_from_string(sub_name)
if not sub_split_number:
sub_split_number = 1

await self.get_pvi_entries(entry=sub_entry)
entry.sub_entries[sub_split_name] = entry.sub_entries.get(
sub_split_name, {}
)
entry.sub_entries[sub_split_name][
sub_split_number
] = sub_entry # type: ignore
else:
sub_entry.device = _pvi_mapping[sub_entry.access](*sub_entry.values)
entry.sub_entries[sub_name] = sub_entry
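# Illustrative only (PV names invented): pva_table["pvi"] might look like
# {"pcap": {"d": "PANDA:PCAP:PVI"}, "arm": {"r": "PANDA:ARM_RBV", "w": "PANDA:ARM"}},
# i.e. one sub-table entry ("pcap", recursed into) and one read/write signal entry ("arm").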

-async def pvi_get(
-read_pv: str, timeout: float = DEFAULT_TIMEOUT
-) -> Dict[str, PVIEntry]:
-"""Makes a PvaSignalBackend purely to connect to PVI information.
-
-This backend is simply thrown away at the end of this method. This is useful
-because the backend handles a CancelledError exception that gets thrown on
-timeout, and therefore can be used for error reporting."""
-backend: SignalBackend = PvaSignalBackend(None, read_pv, read_pv)
-await backend.connect(timeout=timeout)
-d: Dict[str, Dict[str, Dict[str, str]]] = await backend.get_value()
-pv_info = d.get("pvi") or {}
-result = {}
-
-for attr_name, attr_info in pv_info.items():
-result[attr_name] = PVIEntry(**attr_info)  # type: ignore
-
-return result
-
-
-def make_signal(signal_pvi: PVIEntry, dtype: Optional[Type[T]] = None) -> Signal[T]:
-"""Make a signal.
-
-This assumes datatype is None so it can be used to create dynamic signals.
-"""
-operations = frozenset(signal_pvi.keys())
-pvs = [signal_pvi[i] for i in operations]  # type: ignore
-signal_factory = _pvi_mapping[operations]
-
-write_pv = "pva://" + pvs[0]
-read_pv = write_pv if len(pvs) < 2 else "pva://" + pvs[1]
-
-return signal_factory(dtype, read_pv, write_pv)

def _get_common_device_types(
self, name: str, common_device: Type[Device]
) -> Optional[Type[Device]]:
return get_type_hints(common_device).get(name, {})

def initialize_device(
self,
entry: PVIEntry,
common_device: Optional[Type[Device]] = None,
):
"""Recursively iterates through the tree of PVI entries and creates devices.

Args:
entry: The current PVI entry
common_device: The common device type for the current entry
if it exists, else None
Returns:
The initialised device containing its signals, all typed.
"""

assert entry.sub_entries
for sub_name, sub_entries in entry.sub_entries.items():
sub_common_device = (
self._get_common_device_types(sub_name, common_device)
if common_device
else None
)

if isinstance(sub_entries, dict) and (
len(sub_entries) != 1 or (get_origin(sub_common_device) == DeviceVector)
):

sub_device: Union[DeviceVector, Device] = DeviceVector()
for sub_split_number, sub_entry in sub_entries.items():
if not sub_entry.device:  # If the entry isn't a signal
if (
sub_common_device
and get_origin(sub_common_device) == DeviceVector
):
sub_common_device = get_args(sub_common_device)[0]
sub_entry.device = (
sub_common_device() if sub_common_device else Device()
)
self.initialize_device(
sub_entry, common_device=sub_common_device
)
assert isinstance(sub_device, DeviceVector)
sub_device[sub_split_number] = sub_entry.device
else:
if isinstance(sub_entries, dict):
sub_device = sub_common_device() if sub_common_device else Device()
assert list(sub_entries) == [1]
sub_entries[1].device = sub_device
self.initialize_device(
sub_entries[1], common_device=sub_common_device
)
else:
assert sub_entries.device
sub_device = sub_entries.device

setattr(entry.device, sub_name, sub_device)

if common_device:
common_sub_devices = get_type_hints(common_device)
for sub_name, sub_device in common_sub_devices.items():
if sub_name in ("_name", "parent"):
continue
if sub_name not in entry.sub_entries:
raise RuntimeError(
f"sub device `{sub_name}:{type(sub_device)}` was not provided"
" by pvi"
)


def _strip_union(field: Union[Union[T], T]) -> T:
if get_origin(field) is Union:
args = get_args(field)
for arg in args:
if arg is not type(None):
return arg

return field


def _strip_device_vector(
field: Union[DeviceVector[Device], Device]
) -> Tuple[bool, Device]:
if get_origin(field) is DeviceVector:
return True, get_args(field)[0]
return False, field


def _sim_common_blocks(device: Device, stripped_type: Optional[Type] = None):

device_t = stripped_type or type(device)
for sub_name, sub_device_t in get_type_hints(device_t).items():
if sub_name in ("_name", "parent"):
continue

# we'll take the first type in the union which isn't NoneType
sub_device_t = _strip_union(sub_device_t)
is_device_vector, sub_device_t = _strip_device_vector(sub_device_t)
is_signal = (origin := get_origin(sub_device_t)) and issubclass(origin, Signal)

if is_signal:
signal_type = get_args(sub_device_t)[0]
print("DEBUG: SIGNAL TYPE", signal_type)
print("DEBUG: SIGNAL ARGS", get_args(sub_device_t))
sub_device = sub_device_t(SimSignalBackend(signal_type, sub_name))
elif is_device_vector:
sub_device = DeviceVector(
{
1: sub_device_t(name=f"{device.name}-{sub_name}-1"),
2: sub_device_t(name=f"{device.name}-{sub_name}-2"),
}
)
else:
sub_device = sub_device_t(name=f"{device.name}-{sub_name}")

if not is_signal:
if is_device_vector:
for sub_device_in_vector in sub_device.values():
_sim_common_blocks(sub_device_in_vector, stripped_type=sub_device_t)
else:
_sim_common_blocks(sub_device, stripped_type=sub_device_t)

setattr(device, sub_name, sub_device)


async def fill_pvi_entries(
device: Device, root_pv: str, timeout=DEFAULT_TIMEOUT, sim=True
):
"""
operations = frozenset(signal_pvi.keys())
pvs = [signal_pvi[i] for i in operations] # type: ignore
signal_factory = _pvi_mapping[operations]
Fills a `device` with signals from a the `root_pvi:PVI` table.
write_pv = "pva://" + pvs[0]
read_pv = write_pv if len(pvs) < 2 else "pva://" + pvs[1]
If the device names match with parent devices of `device` then types are used.
"""
if not sim:
# check the pvi table for devices and fill the device with them
parser = PVIParser(root_pv, timeout=timeout)
await parser.get_pvi_entries()
parser.root_entry.device = device
parser.initialize_device(parser.root_entry, common_device=type(device))

if sim:
# set up sim signals for the common annotations
_sim_common_blocks(device)
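A rough sketch of what the sim branch produces, inferred from `_sim_common_blocks` above (`MyBlock`, `MyDevice` and the PV name are invented):

import asyncio

from ophyd_async.core import Device, DeviceVector, SignalR
from ophyd_async.epics.pvi import fill_pvi_entries


class MyBlock(Device):
    active: SignalR[bool]


class MyDevice(Device):
    block: MyBlock
    blocks: DeviceVector[MyBlock]


async def main():
    d = MyDevice(name="d")
    await fill_pvi_entries(d, "UNUSED:PVI", sim=True)  # root_pv is not read when sim=True
    assert isinstance(d.block.active, SignalR)  # signal annotations get a SimSignalBackend
    assert list(d.blocks) == [1, 2]  # DeviceVector annotations get two sim entries


asyncio.run(main())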
3 changes: 1 addition & 2 deletions src/ophyd_async/panda/__init__.py
@@ -1,4 +1,4 @@
-from .panda import PandA, PcapBlock, PulseBlock, PVIEntry, SeqBlock, SeqTable
from .panda import PandA, PcapBlock, PulseBlock, SeqBlock
from .panda_controller import PandaPcapController
from .table import (
SeqTable,
@@ -13,7 +13,6 @@
"PandA",
"PcapBlock",
"PulseBlock",
"PVIEntry",
"seq_table_from_arrays",
"seq_table_from_rows",
"SeqBlock",