From 6743c194e5bc878bef75067593d83c71766fe011 Mon Sep 17 00:00:00 2001 From: Tom Willemsen Date: Thu, 21 Sep 2023 14:39:06 +0100 Subject: [PATCH] Support ntndarray --- src/ophyd_async/epics/_backend/_p4p.py | 73 +++++++++++++++++++++++--- tests/epics/test_records.db | 41 ++++++++++++++- tests/epics/test_signals.py | 40 ++++++++++++++ 3 files changed, 146 insertions(+), 8 deletions(-) diff --git a/src/ophyd_async/epics/_backend/_p4p.py b/src/ophyd_async/epics/_backend/_p4p.py index d92d36a604..a00d87e7c0 100644 --- a/src/ophyd_async/epics/_backend/_p4p.py +++ b/src/ophyd_async/epics/_backend/_p4p.py @@ -3,9 +3,10 @@ from asyncio import CancelledError from dataclasses import dataclass from enum import Enum -from typing import Any, Dict, Optional, Sequence, Type, Union +from typing import Any, Dict, List, Optional, Sequence, Type, Union from bluesky.protocols import Descriptor, Dtype, Reading +from p4p import Value from p4p.client.asyncio import Context, Subscription from ophyd_async.core import ( @@ -55,12 +56,54 @@ def descriptor(self, source: str, value) -> Descriptor: dtype = specifier_to_dtype[value.type().aspy("value")] return dict(source=source, dtype=dtype, shape=[]) + def metadata_fields(self) -> List[str]: + """ + Fields to request from PVA for metadata. + """ + return ["alarm", "timeStamp"] + + def value_fields(self) -> List[str]: + """ + Fields to request from PVA for the value. + """ + return ["value"] + class PvaArrayConverter(PvaConverter): def descriptor(self, source: str, value) -> Descriptor: return dict(source=source, dtype="array", shape=[len(value["value"])]) +class PvaNDArrayConverter(PvaConverter): + def metadata_fields(self) -> List[str]: + return super().metadata_fields() + ["dimension"] + + def _get_dimensions(self, value) -> List[int]: + dimensions: List[Value] = value["dimension"] + dims = [dim.size for dim in dimensions] + # Note: dimensions in NTNDArray are in fortran-like order + # with first index changing fastest. + # + # Therefore we need to reverse the order of the dimensions + # here to get back to a more usual C-like order with the + # last index changing fastest. + return dims[::-1] + + def value(self, value): + dims = self._get_dimensions(value) + return value["value"].reshape(dims) + + def descriptor(self, source: str, value) -> Descriptor: + dims = self._get_dimensions(value) + return dict(source=source, dtype="array", shape=dims) + + def write_value(self, value): + # No clear use-case for writing directly to an NDArray, and some + # complexities around flattening to 1-D - e.g. dimension-order. + # Don't support this for now. + raise TypeError("Writing to NDArray not supported") + + @dataclass class PvaEnumConverter(PvaConverter): enum_class: Type[Enum] @@ -112,7 +155,7 @@ def make_converter(datatype: Optional[Type], values: Dict[str, Any]) -> PvaConve if datatype and datatype != Sequence[str]: raise TypeError(f"{pv} has type [str] not {datatype.__name__}") return PvaArrayConverter() - elif "NTScalarArray" in typeid: + elif "NTScalarArray" in typeid or "NTNDArray" in typeid: pv_dtype = get_unique( {k: v["value"].dtype for k, v in values.items()}, "dtypes" ) @@ -124,7 +167,10 @@ def make_converter(datatype: Optional[Type], values: Dict[str, Any]) -> PvaConve raise TypeError(f"{pv} has type [{pv_dtype}] not {datatype.__name__}") if dtype != pv_dtype: raise TypeError(f"{pv} has type [{pv_dtype}] not [{dtype}]") - return PvaArrayConverter() + if "NTNDArray" in typeid: + return PvaNDArrayConverter() + else: + return PvaArrayConverter() elif "NTEnum" in typeid and datatype is bool: # Wanted a bool, but database represents as an enum pv_choices_len = get_unique( @@ -217,14 +263,23 @@ async def get_descriptor(self) -> Descriptor: value = await self.ctxt.get(self.read_pv) return self.converter.descriptor(self.source, value) + def _pva_request_string(self, fields: List[str]) -> str: + """ + Converts a list of requested fields into a PVA request string which can be + passed to p4p. + """ + return f"field({','.join(fields)})" + async def get_reading(self) -> Reading: - value = await self.ctxt.get( - self.read_pv, request="field(value,alarm,timestamp)" + request: str = self._pva_request_string( + self.converter.value_fields() + self.converter.metadata_fields() ) + value = await self.ctxt.get(self.read_pv, request=request) return self.converter.reading(value) async def get_value(self) -> T: - value = await self.ctxt.get(self.read_pv, "field(value)") + request: str = self._pva_request_string(self.converter.value_fields()) + value = await self.ctxt.get(self.read_pv, request=request) return self.converter.value(value) def set_callback(self, callback: Optional[ReadingValueCallback[T]]) -> None: @@ -236,8 +291,12 @@ def set_callback(self, callback: Optional[ReadingValueCallback[T]]) -> None: async def async_callback(v): callback(self.converter.reading(v), self.converter.value(v)) + request: str = self._pva_request_string( + self.converter.value_fields() + self.converter.metadata_fields() + ) + self.subscription = self.ctxt.monitor( - self.read_pv, async_callback, request="field(value,alarm,timestamp)" + self.read_pv, async_callback, request=request ) else: if self.subscription: diff --git a/tests/epics/test_records.db b/tests/epics/test_records.db index 307ad5cd3b..9fecbdb490 100644 --- a/tests/epics/test_records.db +++ b/tests/epics/test_records.db @@ -241,4 +241,43 @@ record(waveform, "$(P)table:enum") "": {"+type": "meta", "+channel": "VAL"} } }) -} \ No newline at end of file +} + +record(longout, "$(P)ntndarray:ArraySize0_RBV") { + field(VAL, "3") + field(PINI, "YES") + info(Q:group, { + "$(P)ntndarray":{ + "dimension[0].size":{+channel:"VAL", +type:"plain", +putorder:0} + } + }) +} + +record(longout, "$(P)ntndarray:ArraySize1_RBV") { + field(VAL, "2") + field(PINI, "YES") + info(Q:group, { + "$(P)ntndarray":{ + "dimension[1].size":{+channel:"VAL", +type:"plain", +putorder:0} + } + }) +} + +record(waveform, "$(P)ntndarray:data") +{ + field(FTVL, "INT64") + field(NELM, "6") + field(INP, {const:[0, 0, 0, 0, 0, 0]}) + field(PINI, "YES") + info(Q:group, { + "$(P)ntndarray":{ + +id:"epics:nt/NTNDArray:1.0", + "value":{ + +type:"any", + +channel:"VAL", + +trigger:"*", + }, + "": {+type:"meta", +channel:"SEVR"} + } + }) +} diff --git a/tests/epics/test_signals.py b/tests/epics/test_signals.py index fc61ac39f1..c5d03fee09 100644 --- a/tests/epics/test_signals.py +++ b/tests/epics/test_signals.py @@ -5,6 +5,7 @@ import subprocess import sys import time +from contextlib import closing from dataclasses import dataclass from enum import Enum from pathlib import Path @@ -302,6 +303,45 @@ async def test_pva_table(ioc: IOC) -> None: q.close() +async def test_pva_ntdarray(ioc: IOC): + if ioc.protocol == "ca": + # CA can't do ndarray + return + + put = np.array([1, 2, 3, 4, 5, 6], dtype=np.int64).reshape((2, 3)) + initial = np.zeros_like(put) + + backend = await ioc.make_backend(npt.NDArray[np.int64], "ntndarray") + + # Backdoor into the "raw" data underlying the NDArray in QSrv + # not supporting direct writes to NDArray at the moment. + raw_data_backend = await ioc.make_backend(npt.NDArray[np.int64], "ntndarray:data") + + # Make a monitor queue that will monitor for updates + for i, p in [(initial, put), (put, initial)]: + with closing(MonitorQueue(backend)) as q: + assert { + "source": backend.source, + "dtype": "array", + "shape": [2, 3], + } == await backend.get_descriptor() + # Check initial value + await q.assert_updates(pytest.approx(i)) + await raw_data_backend.put(p.flatten()) + await q.assert_updates(pytest.approx(p)) + + +async def test_writing_to_ndarray_raises_typeerror(ioc: IOC): + if ioc.protocol == "ca": + # CA can't do ndarray + return + + backend = await ioc.make_backend(npt.NDArray[np.int64], "ntndarray") + + with pytest.raises(TypeError): + await backend.put(np.zeros((6,), dtype=np.int64)) + + async def test_non_existant_errors(ioc: IOC): backend = await ioc.make_backend(str, "non-existant", connect=False) # Can't use asyncio.wait_for on python3.8 because of