Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use metadata from DB records to populate DataKeys #137

Closed
wants to merge 11 commits into from
96 changes: 78 additions & 18 deletions src/ophyd_async/epics/_backend/_aioca.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import logging
import sys
from dataclasses import dataclass
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional, Sequence, Type, Union
from math import isnan, nan
from typing import Any, Dict, List, Optional, Sequence, Type, Union

from aioca import (
FORMAT_CTRL,
Expand All @@ -15,7 +16,7 @@
caput,
)
from aioca.types import AugmentedValue, Dbr, Format
from bluesky.protocols import Descriptor, Dtype, Reading
from bluesky.protocols import DataKey, Dtype, Reading
DiamondJoseph marked this conversation as resolved.
Show resolved Hide resolved
from epicscorelibs.ca import dbr

from ophyd_async.core import (
Expand All @@ -37,7 +38,52 @@
dbr.DBR_CHAR: "string",
dbr.DBR_LONG: "integer",
dbr.DBR_DOUBLE: "number",
dbr.DBR_ENUM: "string",
}
_common_meta = {
"units",
"precision",
}


def _data_key_from_augmented_value(
value: AugmentedValue,
*,
choices: Optional[List[str]] = None,
dtype: Optional[str] = None,
) -> DataKey:
"""Use the return value of get with FORMAT_CTRL to construct a DataKey
describing the signal. See docstring of AugmentedValue for expected
value fields by DBR type.

Args:
value (AugmentedValue): Description of the the return type of a DB record
kwargs: Overrides for the returned values.
e.g. to force treating a value as an Enum by passing choices

Returns:
DataKey: A rich DataKey describing the DB record
"""
source = f"ca://{value.name}"
assert value.ok, f"Error reading {source}: {value}"

scalar = value.element_count == 1
dtype = dtype or dbr_to_dtype[value.datatype]

d = DataKey(
source=source,
DiamondJoseph marked this conversation as resolved.
Show resolved Hide resolved
dtype=dtype if scalar else "array",
# strictly value.element_count >= len(value)
shape=[] if scalar else [len(value)],
DiamondJoseph marked this conversation as resolved.
Show resolved Hide resolved
)
for key in _common_meta:
attr = getattr(value, key, nan)
if isinstance(attr, str) or not isnan(attr):
d[key] = attr

if choices is not None:
d["choices"] = choices
return d


@dataclass
Expand All @@ -58,8 +104,8 @@ def reading(self, value: AugmentedValue):
alarm_severity=-1 if value.severity > 2 else value.severity,
)

def descriptor(self, source: str, value: AugmentedValue) -> Descriptor:
return dict(source=source, dtype=dbr_to_dtype[value.datatype], shape=[])
def descriptor(self, value: AugmentedValue) -> DataKey:
return _data_key_from_augmented_value(value)


class CaLongStrConverter(CaConverter):
Expand All @@ -72,14 +118,18 @@ def write_value(self, value: str):
return value + "\0"


class CaArrayConverter(CaConverter):
def descriptor(self, source: str, value: AugmentedValue) -> Descriptor:
return dict(source=source, dtype="array", shape=[len(value)])


@dataclass
class CaEnumConverter(CaConverter):
"""To prevent issues when a signal is restarted and returns with different enum
values or orders, we put treat an Enum signal as a DBR_STRING, and cache the
choices on this class.
"""

enum_class: Type[Enum]
choices: List[str] = field(init=False)

def __post_init__(self):
self.choices = [e.value for e in self.enum_class]

def write_value(self, value: Union[Enum, str]):
if isinstance(value, Enum):
Expand All @@ -90,9 +140,19 @@ def write_value(self, value: Union[Enum, str]):
def value(self, value: AugmentedValue):
return self.enum_class(value)

def descriptor(self, source: str, value: AugmentedValue) -> Descriptor:
choices = [e.value for e in self.enum_class]
return dict(source=source, dtype="string", shape=[], choices=choices)
def descriptor(self, value: AugmentedValue) -> DataKey:
# Sometimes DBR_TYPE returns as String, must pass choices still
return _data_key_from_augmented_value(value, choices=self.choices)
DiamondJoseph marked this conversation as resolved.
Show resolved Hide resolved


@dataclass
class CaBoolConverter(CaConverter):

def value(self, value: AugmentedValue) -> bool:
return bool(value)

def descriptor(self, value: AugmentedValue) -> DataKey:
return _data_key_from_augmented_value(value, dtype="bool")


class DisconnectedCaConverter(CaConverter):
Expand All @@ -113,7 +173,7 @@ def make_converter(
# Waveform of strings, check we wanted this
if datatype and datatype != Sequence[str]:
raise TypeError(f"{pv} has type [str] not {datatype.__name__}")
return CaArrayConverter(pv_dbr, None)
return CaConverter(pv_dbr, None)
elif is_array:
pv_dtype = get_unique({k: v.dtype for k, v in values.items()}, "dtypes")
# This is an array
Expand All @@ -124,15 +184,15 @@ def make_converter(
raise TypeError(f"{pv} has type [{pv_dtype}] not {datatype.__name__}")
if dtype != pv_dtype:
raise TypeError(f"{pv} has type [{pv_dtype}] not [{dtype}]")
return CaArrayConverter(pv_dbr, None)
return CaConverter(pv_dbr, None)
elif pv_dbr == dbr.DBR_ENUM and datatype is bool:
# Database can't do bools, so are often representated as enums, CA can do int
pv_choices_len = get_unique(
{k: len(v.enums) for k, v in values.items()}, "number of choices"
)
if pv_choices_len != 2:
raise TypeError(f"{pv} has {pv_choices_len} choices, can't map to bool")
return CaConverter(dbr.DBR_SHORT, dbr.DBR_SHORT)
return CaBoolConverter(dbr.DBR_SHORT, dbr.DBR_SHORT)
elif pv_dbr == dbr.DBR_ENUM:
# This is an Enum
pv_choices = get_unique(
Expand Down Expand Up @@ -216,9 +276,9 @@ async def _caget(self, format: Format) -> AugmentedValue:
timeout=None,
)

async def get_descriptor(self) -> Descriptor:
async def get_descriptor(self) -> DataKey:
value = await self._caget(FORMAT_CTRL)
return self.converter.descriptor(self.source, value)
return self.converter.descriptor(value)

async def get_reading(self) -> Reading:
value = await self._caget(FORMAT_TIME)
Expand Down
22 changes: 15 additions & 7 deletions src/ophyd_async/epics/_backend/_p4p.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import atexit
import logging
import time
from dataclasses import dataclass
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Sequence, Type, Union

Expand Down Expand Up @@ -109,7 +109,16 @@ def write_value(self, value):

@dataclass
class PvaEnumConverter(PvaConverter):
"""To prevent issues when a signal is restarted and returns with different enum
values or orders, we put treat an Enum signal as a DBR_STRING, and cache the
choices on this class.
"""

enum_class: Type[Enum]
choices: List[str] = field(init=False)

def __post_init__(self):
self.choices = [e.value for e in self.enum_class]

def write_value(self, value: Union[Enum, str]):
if isinstance(value, Enum):
Expand All @@ -121,16 +130,15 @@ def value(self, value):
return list(self.enum_class)[value["value"]["index"]]

def descriptor(self, source: str, value) -> Descriptor:
choices = [e.value for e in self.enum_class]
return dict(source=source, dtype="string", shape=[], choices=choices)
return dict(source=source, dtype="string", shape=[], choices=self.choices)


class PvaEnumBoolConverter(PvaConverter):
class PvaBoolConverter(PvaConverter):
def value(self, value):
return value["value"]["index"]
return bool(value["value"]["index"])

def descriptor(self, source: str, value) -> Descriptor:
return dict(source=source, dtype="integer", shape=[])
return dict(source=source, dtype="bool", shape=[])


class PvaTableConverter(PvaConverter):
Expand Down Expand Up @@ -208,7 +216,7 @@ def make_converter(datatype: Optional[Type], values: Dict[str, Any]) -> PvaConve
)
if pv_choices_len != 2:
raise TypeError(f"{pv} has {pv_choices_len} choices, can't map to bool")
return PvaEnumBoolConverter()
return PvaBoolConverter()
elif "NTEnum" in typeid:
# This is an Enum
pv_choices = get_unique(
Expand Down
Loading
Loading