Skip to content

Commit

Permalink
complete the pvi parser
Browse files Browse the repository at this point in the history
Fill the device with signals, all propertly typed.
Still need to refactor and add validation, tests, and
a decent sim mode.
  • Loading branch information
evalott100 committed Mar 13, 2024
1 parent 38d45fd commit c92443b
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 185 deletions.
116 changes: 109 additions & 7 deletions src/ophyd_async/epics/pvi.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@
from dataclasses import dataclass
from typing import Callable, Dict, FrozenSet, List, Literal, Optional, TypeVar
from typing import (
get_origin,
get_args,
Callable,
Dict,
Any,
FrozenSet,
List,
Type,
Literal,
get_type_hints,
Optional,
TypeVar,
Tuple,
Union,
Generator,
)

from ophyd_async.core import Device, DeviceVector
from ophyd_async.core.signal import Signal
from ophyd_async.core.signal_backend import SignalBackend
from ophyd_async.core.utils import DEFAULT_TIMEOUT
Expand All @@ -11,13 +28,28 @@
epics_signal_w,
epics_signal_x,
)
import re

T = TypeVar("T")
Access = FrozenSet[
Literal["r"] | Literal["w"] | Literal["rw"] | Literal["x"] | Literal["d"]
]


def _strip_number_from_string(string: str):
match = re.match(r"(.*?)(\d*)$", string)
name = match.group(1)
number = match.group(2) or None
if number:
number = int(number)
return name, number


def _get_member_type(cls: Type, attribute_name: str) -> Type:
if attribute_name in cls.__annotations__:
return cls.__annotations__[attribute_name]


@dataclass
class PVIEntry:
access: Access
Expand All @@ -33,22 +65,22 @@ def is_pvi_table(self) -> bool:

_pvi_mapping: Dict[FrozenSet[str], Callable[..., Signal]] = {
frozenset({"r", "w"}): lambda read_pv, write_pv: epics_signal_rw(
None, read_pv, write_pv
None, "pva://" + read_pv, "pva://" + write_pv
),
frozenset({"rw"}): lambda read_write_pv: epics_signal_rw(
None, read_write_pv, write_pv=read_write_pv
None, "pva://" + read_write_pv, write_pv="pva://" + read_write_pv
),
frozenset({"r"}): lambda read_pv: epics_signal_r(None, read_pv),
frozenset({"w"}): lambda write_pv: epics_signal_w(None, write_pv),
frozenset({"x"}): lambda write_pv: epics_signal_x(write_pv),
frozenset({"r"}): lambda read_pv: epics_signal_r(None, "pva://" + read_pv),
frozenset({"w"}): lambda write_pv: epics_signal_w(None, "pva://" + write_pv),
frozenset({"x"}): lambda write_pv: epics_signal_x("pva://" + write_pv),
}


class PVIParser:
def __init__(self, root_pv: str, timeout=DEFAULT_TIMEOUT):
self.timeout = timeout

self.root_entry = PVIEntry(access=frozenset({"d"}), values=[root_pv])
self._device_vectors = {}

async def get_pvi_entries(self, top_level_entry: Optional[PVIEntry] = None):
"""Creates signals from a top level PVI table"""
Expand Down Expand Up @@ -81,3 +113,73 @@ async def get_pvi_entries(self, top_level_entry: Optional[PVIEntry] = None):
entry.signal_backend = _pvi_mapping[entry.access](*entry.values)

top_level_entry.sub_entries[name] = entry

def make_device(
self,
entry: PVIEntry,
name: str = None,
common_device: Type[Device] = None,
) -> Tuple[Union[Device, Signal], str, Optional[int]]:

if name:
split_name, split_number = _strip_number_from_string(name)

if not entry.is_pvi_table:
return (entry.signal_backend, split_name, split_number)

# Initialise from the passed in common_devices, or make a new device
if device_type := get_type_hints(common_device).get(split_name):
origin = get_origin(device_type)
if origin == DeviceVector:
# It's possible for a block in panda to not end in a split
# number - e.g `seq` instead of `seq1` in panda if there's only
# one seq block
if not split_number:
split_number = 1
sub_common_device = get_args(device_type)[0]
device = DeviceVector({split_number: sub_common_device()})
else:
sub_common_device = device_type
device = device_type()
else:
sub_common_device = None
device = Device()

else:
# For the top level device
split_name, split_number = None, None
sub_common_device = common_device
device = Device()

for sub_name, sub_entry in entry.sub_entries.items():
sub_device, sub_split_name, sub_split_number = self.make_device(
sub_entry,
name=sub_name,
common_device=sub_common_device,
)
if attr := getattr(device, sub_split_name, None):
assert sub_split_number
if isinstance(attr, DeviceVector) and isinstance(sub_device, DeviceVector):
attr.update(sub_device)
elif isinstance(attr, DeviceVector):
attr.update({sub_split_number: sub_device})
else:
setattr(device, sub_split_name, sub_device)

return device, split_name, split_number

def generate_all_devices(self, common_device=None):
return self.make_device(self.root_entry, common_device=common_device)[0]


async def fill_pvi_entries(
device: Device, root_pv: str, timeout=DEFAULT_TIMEOUT, sim=True
):
if not sim:
parser = PVIParser(root_pv, timeout=timeout)
await parser.get_pvi_entries()

for sub_name, sub_device in parser.generate_all_devices(
common_device=type(device)
).__dict__.items():
setattr(device, sub_name, sub_device)
3 changes: 1 addition & 2 deletions src/ophyd_async/panda/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .panda import PandA, PcapBlock, PulseBlock, PVIEntry, SeqBlock, SeqTable
from .panda import PandA, PcapBlock, PulseBlock, SeqBlock, SeqTable
from .table import (
SeqTable,
SeqTableRow,
Expand All @@ -12,7 +12,6 @@
"PandA",
"PcapBlock",
"PulseBlock",
"PVIEntry",
"seq_table_from_arrays",
"seq_table_from_rows",
"SeqBlock",
Expand Down
183 changes: 7 additions & 176 deletions src/ophyd_async/panda/panda.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import re
from typing import Dict, Optional, Tuple, get_args, get_origin, get_type_hints
from typing import Optional, Tuple, get_args, get_origin, get_type_hints

from ophyd_async.core import (
DEFAULT_TIMEOUT,
Expand All @@ -10,7 +10,7 @@
SignalR,
SignalRW,
)
from ophyd_async.epics.pvi import PVIEntry, PVIParser
from ophyd_async.epics.pvi import fill_pvi_entries
from ophyd_async.panda.table import SeqTable


Expand All @@ -28,39 +28,6 @@ class PcapBlock(Device):
active: SignalR[bool]


def _block_name_number(block_name: str) -> Tuple[str, Optional[int]]:
"""Maps a panda block name to a block and number.
There are exceptions to this rule; some blocks like pcap do not contain numbers.
Other blocks may contain numbers and letters, but no numbers at the end.
Such block names will only return the block name, and not a number.
If this function returns both a block name and number, it should be instantiated
into a device vector."""
m = re.match("^([0-9a-z_-]*)([0-9]+)$", block_name)
if m is not None:
name, num = m.groups()
return name, int(num or 1) # just to pass type checks.

return block_name, None


def _remove_inconsistent_blocks(device_level_pvi: PVIEntry) -> None:
"""Remove blocks from pvi information.
This is needed because some pandas have 'pcap' and 'pcap1' blocks, which are
inconsistent with the assumption that pandas should only have a 'pcap' block,
for example.
"""
pvi_keys = set(device_level_pvi.sub_entries.keys())
for k in pvi_keys:
kn = re.sub(r"\d*$", "", k)
if kn and k != kn and kn in pvi_keys:
del device_level_pvi.sub_entries[k]


class CommonPandABlocks(Device):
pulse: DeviceVector[PulseBlock]
seq: DeviceVector[SeqBlock]
Expand All @@ -69,95 +36,8 @@ class CommonPandABlocks(Device):

class PandA(CommonPandABlocks, Device):
def __init__(self, prefix: str, name: str = "") -> None:
Device.__init__(self, name)
self._prefix = prefix

def verify_block(self, name: str, num: Optional[int]):
"""Given a block name and number, return information about a block."""
anno = get_type_hints(self, globalns=globals()).get(name)

block: Device = Device()

if anno:
type_args = get_args(anno)
block = type_args[0]() if type_args else anno()

if not type_args:
assert num is None, f"Only expected one {name} block, got {num}"

return block

async def _make_block(
self,
block_name: str,
block_pvi: PVIEntry,
num: Optional[int],
timeout: float = DEFAULT_TIMEOUT,
):
"""Makes a block given a block name containing relevant signals.
Loops through the signals in the block (found using type hints), if not in
sim mode then does a pvi call, and identifies this signal from the pvi call.
"""
block = self.verify_block(block_name, num)

field_annos = get_type_hints(block, globalns=globals())

for sig_name, sig_type in field_annos.items():
# if not in sim mode,
if block_pvi:
print("DEBUG:", sig_name, block_pvi.sub_entries)
sub_entry: Optional[PVIEntry] = block_pvi.sub_entries.get(sig_name)
if sub_entry is None:
raise Exception(
f"{self.__class__.__name__} has a {sig_name} block containing a/"
+ f"an {sig_name} signal which has not been retrieved by PVI."
)

# Already created in pvi.py
signal = sub_entry.signal_backend

setattr(block, sig_name, signal)

# checks for any extra pvi information not contained in this class
if block_pvi:
for signal_name, sub_entry in block_pvi.sub_entries.items():
if not hasattr(block, signal_name):
# makes any extra signals
setattr(block, signal_name, sub_entry.signal_backend)

return block

async def _make_untyped_block(
self, block_entry: PVIEntry, timeout: float = DEFAULT_TIMEOUT
):
"""Populates a block using PVI information.
This block is not typed as part of the PandA interface but needs to be
included dynamically anyway.
"""
block = Device()

for sub_name, sub_entry in block_entry.sub_entries.items():
setattr(block, sub_name, sub_entry.signal_backend)

return block

# TODO redo to set_panda_block? confusing name
def set_attribute(self, name: str, num: Optional[int], block: Device):
"""Set a block on the panda.
Need to be able to set device vectors on the panda as well, e.g. if num is not
None, need to be able to make a new device vector and start populating it...
"""
anno = get_type_hints(self, globalns=globals()).get(name)

# if it's an annotated device vector, or it isn't but we've got a number then
# make a DeviceVector on the class
if get_origin(anno) == DeviceVector or (not anno and num is not None):
self.__dict__.setdefault(name, DeviceVector())[num] = block
else:
setattr(self, name, block)
super().__init__(name)

async def connect(
self, sim: bool = False, timeout: float = DEFAULT_TIMEOUT
Expand All @@ -170,56 +50,7 @@ async def connect(
If there's no pvi information, that's because we're in sim mode. In that case,
makes all required blocks.
"""
pvi_parser = PVIParser(self._prefix + "PVI", timeout=timeout)

if not sim:
await pvi_parser.get_pvi_entries()
_remove_inconsistent_blocks(pvi_parser.root_entry)

hints = {
attr_name: attr_type
for attr_name, attr_type in get_type_hints(self, globalns=globals()).items()
if not attr_name.startswith("_")
}

# create all the blocks pvi says it should have,
if pvi_parser.root_entry.sub_entries:
for (
block_name,
block_field,
) in pvi_parser.root_entry.sub_entries.items():
name, num = _block_name_number(block_name)

if name in hints:
block = await self._make_block(
name, block_field, num, timeout=timeout
)
else:
block = await self._make_untyped_block(block_field, timeout=timeout)

self.set_attribute(name, num, block)

# then check if the ones defined in this class are in the pvi info
# make them if there is no pvi info, i.e. sim mode.
for block_name in hints.keys():
if sim:
num = 1 if get_origin(hints[block_name]) == DeviceVector else None
block = await self._make_block(
block_name, num, "sim://", sim=sim, timeout=timeout
)
self.set_attribute(block_name, num, block)
else:
if get_origin(hints[block_name]) == DeviceVector:
block_name += "1"

entry: Optional[PVIEntry] = pvi_parser.root_entry.sub_entries.get(
block_name
)

assert entry, f"Expected PandA to contain {block_name} block."
assert list(entry.access) == [
"d"
], f"Expected PandA to only contain blocks, got {entry}"

self.set_name(self.name)
await Device.connect(self, sim)

await fill_pvi_entries(self, self._prefix + "PVI", timeout=timeout, sim=sim)

await super().connect(sim)

0 comments on commit c92443b

Please sign in to comment.