Skip to content

Commit

Permalink
refactored the pvi parser
Browse files Browse the repository at this point in the history
  • Loading branch information
evalott100 committed Apr 5, 2024
1 parent 61a1115 commit 3eb486d
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 153 deletions.
4 changes: 2 additions & 2 deletions src/ophyd_async/epics/pvi/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .pvi import PVIEntry, PVIParser, fill_pvi_entries
from .pvi import PVIEntry, fill_pvi_entries

__all__ = ["PVIEntry", "PVIParser", "fill_pvi_entries"]
__all__ = ["PVIEntry", "fill_pvi_entries"]
262 changes: 111 additions & 151 deletions src/ophyd_async/epics/pvi/pvi.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
Callable,
Dict,
FrozenSet,
List,
Literal,
Optional,
Tuple,
Expand Down Expand Up @@ -66,16 +65,10 @@ class PVIEntry:
This could either be a signal or a sub-table.
"""

access: Access
values: List[str]
# `sub_entries` if the signal is a PVI table
# If a sub device is a device vector then it will be represented by a further dict
pvi_pv: Optional[str] = None
sub_entries: Optional[Dict[str, Union[Dict[int, "PVIEntry"], "PVIEntry"]]] = None
device: Optional[Device] = None

@property
def is_pvi_table(self) -> bool:
return len(self.values) == 1 and self.values[0].endswith(":PVI")
common_device_type: Optional[Type[Device]] = None


def _verify_common_blocks(entry: PVIEntry, common_device: Type[Device]):
Expand Down Expand Up @@ -114,145 +107,42 @@ def _verify_common_blocks(entry: PVIEntry, common_device: Type[Device]):
}


class PVIParser:
def __init__(
self,
root_pv: str,
timeout=DEFAULT_TIMEOUT,
):
self.root_entry = PVIEntry(
access=frozenset({"d"}), values=[root_pv], sub_entries={}
)
self.timeout = timeout

async def get_pvi_entries(self, entry: Optional[PVIEntry] = None):
"""Creates signals from a top level PVI table"""
if not entry:
entry = self.root_entry

if not entry.is_pvi_table:
raise RuntimeError(f"{entry.values[0]} is not a PVI table")

pvi_table_signal_backend: PvaSignalBackend = PvaSignalBackend(
None, entry.values[0], entry.values[0]
)
await pvi_table_signal_backend.connect(
timeout=self.timeout
) # create table signal backend

pva_table = await pvi_table_signal_backend.get_value()
entry.sub_entries = {}

for sub_name, pva_entries in pva_table["pvi"].items():
sub_entry = PVIEntry(
access=frozenset(pva_entries.keys()),
values=list(pva_entries.values()),
sub_entries={},
)

if sub_entry.is_pvi_table:
sub_split_name, sub_split_number = _strip_number_from_string(sub_name)
if not sub_split_number:
sub_split_number = 1

await self.get_pvi_entries(entry=sub_entry)
entry.sub_entries[sub_split_name] = entry.sub_entries.get(
sub_split_name, {}
)
entry.sub_entries[sub_split_name][
sub_split_number
] = sub_entry # type: ignore
else:
entry.sub_entries[sub_name] = sub_entry

def initialize_device(
self,
entry: PVIEntry,
common_device_type: Optional[Type[Device]] = None,
):
"""Recursively iterates through the tree of PVI entries and creates devices.
Parameters
----------
entry:
The current PVI entry
common_device_type:
The common device type for the current entry
if it exists, else None
Returns
-------
The initialised device containing it's signals, all typed.
"""
def _parse_type(
is_pvi_table: bool,
number_suffix: Optional[int],
common_device_type: Optional[Type[Device]],
):
if common_device_type:
# pre-defined type
device_type = _strip_union(common_device_type)
is_device_vector, device_type = _strip_device_vector(device_type)
import inspect

if ((origin := get_origin(device_type)) and issubclass(origin, Signal)) or (
inspect.isclass(device_type) and issubclass(device_type, Signal)
):
# if device_type is of the form `Signal` or `Signal[type]`
is_signal = True
signal_dtype = get_args(device_type)[0]
else:
is_signal = False
signal_dtype = None

elif is_pvi_table:
# is a block, we can make it a DeviceVector if it ends ina number
is_device_vector = number_suffix is not None
is_signal = False
signal_dtype = None
device_type = Device
else:
# is a signal, signals aren't stored in DeviceVectors unless
# they're defined as such in the common_device_type
is_device_vector = False
is_signal = True
signal_dtype = None
device_type = Signal

assert entry.sub_entries
common_device_type_hints = (
get_type_hints(common_device_type) if common_device_type else None
)
for sub_name, sub_entries in entry.sub_entries.items():
sub_common_device_type = None
if common_device_type_hints:
sub_common_device_type = common_device_type_hints.get(sub_name, None)
sub_common_device_type = _strip_union(sub_common_device_type)
pre_defined_device_vector, sub_common_device_type = (
_strip_device_vector(sub_common_device_type)
)

if isinstance(sub_entries, dict) and (
len(sub_entries) != 1 or pre_defined_device_vector
):
sub_device: Union[DeviceVector, Device] = DeviceVector()

for sub_split_number, sub_entry in sub_entries.items():
if sub_entry.is_pvi_table: # If the entry isn't a signal
if (
sub_common_device_type
and get_origin(sub_common_device_type) == DeviceVector
):
sub_common_device_type = get_args(sub_common_device_type)[0]
sub_entry.device = (
sub_common_device_type()
if sub_common_device_type
else Device()
)
self.initialize_device(
sub_entry, common_device_type=sub_common_device_type
)
else: # entry is a signal
signal_type = (
get_args(sub_common_device_type)[0]
if sub_common_device_type
else None
)
sub_entry.device = _pvi_mapping[sub_entry.access](
signal_type, *sub_entry.values
)
assert isinstance(sub_device, DeviceVector)
sub_device[sub_split_number] = sub_entry.device
else:
if isinstance(sub_entries, dict):
sub_device = (
sub_common_device_type() if sub_common_device_type else Device()
)
assert list(sub_entries) == [1]
sub_entries[1].device = sub_device
self.initialize_device(
sub_entries[1], common_device_type=sub_common_device_type
)
else: # entry is a signal
signal_type = (
get_args(sub_common_device_type)[0]
if sub_common_device_type
else None
)
sub_device = _pvi_mapping[sub_entries.access](
signal_type, *sub_entries.values
)

setattr(entry.device, sub_name, sub_device)

# Check that all predefined devices are present
if common_device_type:
_verify_common_blocks(entry, common_device_type)
return is_device_vector, is_signal, signal_dtype, device_type


def _sim_common_blocks(device: Device, stripped_type: Optional[Type] = None):
Expand Down Expand Up @@ -301,6 +191,75 @@ def _sim_common_blocks(device: Device, stripped_type: Optional[Type] = None):
setattr(device, sub_name, sub_device)


async def _get_pvi_entries(entry: PVIEntry, timeout=DEFAULT_TIMEOUT):
pvi_table_signal_backend: PvaSignalBackend = PvaSignalBackend(
None, entry.pvi_pv, entry.pvi_pv
)
await pvi_table_signal_backend.connect(
timeout=timeout
) # create table signal backend

pva_table = (await pvi_table_signal_backend.get_value())["pvi"]
sub_entries = {}
common_device_type_hints = (
get_type_hints(entry.common_device_type) if entry.common_device_type else {}
)
for sub_name, pva_entries in pva_table.items():
pvs = list(pva_entries.values())
is_pvi_table = len(pvs) == 1 and pvs[0].endswith(":PVI")
sub_name_split, sub_number_split = _strip_number_from_string(sub_name)
is_device_vector, is_signal, signal_dtype, device_type = _parse_type(
is_pvi_table,
sub_number_split,
common_device_type_hints.get(sub_name_split),
)
if is_signal:
device = _pvi_mapping[frozenset(pva_entries.keys())](signal_dtype, *pvs)
else:
device = device_type()

sub_entry = PVIEntry(
device=device,
common_device_type=device_type,
)

if is_device_vector:
# If device vector then we store sub_name -> {sub_number -> sub_entry}
# and aggregate into `DeviceVector` in `_set_device_attributes`
sub_number_split = 1 if sub_number_split is None else sub_number_split
if sub_name_split not in sub_entries:
sub_entries[sub_name_split] = {}
sub_entries[sub_name_split][sub_number_split] = sub_entry
else:
sub_entries[sub_name] = sub_entry

if is_pvi_table:
sub_entry.pvi_pv = pvs[0]
await _get_pvi_entries(sub_entry)

entry.sub_entries = sub_entries

if entry.common_device_type:
_verify_common_blocks(entry, entry.common_device_type)


def _set_device_attributes(entry: PVIEntry):
for sub_name, sub_entry in entry.sub_entries.items():
if isinstance(sub_entry, dict):
sub_device = DeviceVector()
for key, device_vector_sub_entry in sub_entry.items():
sub_device[key] = device_vector_sub_entry.device
if device_vector_sub_entry.pvi_pv:
_set_device_attributes(device_vector_sub_entry)

else:
sub_device = sub_entry.device
if sub_entry.pvi_pv:
_set_device_attributes(sub_entry)

setattr(entry.device, sub_name, sub_device)


async def fill_pvi_entries(
device: Device, root_pv: str, timeout=DEFAULT_TIMEOUT, sim=False
):
Expand All @@ -314,7 +273,8 @@ async def fill_pvi_entries(
_sim_common_blocks(device)
else:
# check the pvi table for devices and fill the device with them
parser = PVIParser(root_pv, timeout=timeout)
await parser.get_pvi_entries()
parser.root_entry.device = device
parser.initialize_device(parser.root_entry, common_device_type=type(device))
root_entry = PVIEntry(
pvi_pv=root_pv, device=device, common_device_type=type(device)
)
await _get_pvi_entries(root_entry, timeout=timeout)
_set_device_attributes(root_entry)

0 comments on commit 3eb486d

Please sign in to comment.