Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2: Initial support for NTNDArray and correct timestamp -> timeStamp #1148

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 66 additions & 7 deletions ophyd/v2/_p4p.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@
from asyncio import CancelledError
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Optional, Sequence, Type, Union
from typing import Any, Dict, List, Optional, Sequence, Type, Union

from bluesky.protocols import Descriptor, Dtype, Reading
from p4p import Value
from p4p.client.asyncio import Context, Subscription

from .core import (
Expand Down Expand Up @@ -55,12 +56,54 @@ def descriptor(self, source: str, value) -> Descriptor:
dtype = specifier_to_dtype[value.type().aspy("value")]
return dict(source=source, dtype=dtype, shape=[])

def metadata_fields(self) -> List[str]:
"""
Fields to request from PVA for metadata.
"""
return ["alarm", "timeStamp"]

def value_fields(self) -> List[str]:
"""
Fields to request from PVA for the value.
"""
return ["value"]


class PvaArrayConverter(PvaConverter):
def descriptor(self, source: str, value) -> Descriptor:
return dict(source=source, dtype="array", shape=[len(value["value"])])


class PvaNDArrayConverter(PvaConverter):
def metadata_fields(self) -> List[str]:
return super().metadata_fields() + ["dimension"]

def _get_dimensions(self, value) -> List[int]:
dimensions: List[Value] = value["dimension"]
dims = [dim.size for dim in dimensions]
# Note: dimensions in NTNDArray are in fortran-like order
# with first index changing fastest.
#
# Therefore we need to reverse the order of the dimensions
# here to get back to a more usual C-like order with the
# last index changing fastest.
return dims[::-1]

def value(self, value):
dims = self._get_dimensions(value)
return value["value"].reshape(dims)

def descriptor(self, source: str, value) -> Descriptor:
dims = self._get_dimensions(value)
return dict(source=source, dtype="array", shape=dims)

def write_value(self, value):
# No clear use-case for writing directly to an NDArray, and some
# complexities around flattening to 1-D - e.g. dimension-order.
# Don't support this for now.
raise TypeError("Writing to NDArray not supported")


@dataclass
class PvaEnumConverter(PvaConverter):
enum_class: Type[Enum]
Expand Down Expand Up @@ -110,7 +153,7 @@ def make_converter(datatype: Optional[Type], values: Dict[str, Any]) -> PvaConve
if datatype and datatype != Sequence[str]:
raise TypeError(f"{pv} has type [str] not {datatype.__name__}")
return PvaArrayConverter()
elif "NTScalarArray" in typeid:
elif "NTScalarArray" in typeid or "NTNDArray" in typeid:
pv_dtype = get_unique(
{k: v["value"].dtype for k, v in values.items()}, "dtypes"
)
Expand All @@ -122,7 +165,10 @@ def make_converter(datatype: Optional[Type], values: Dict[str, Any]) -> PvaConve
raise TypeError(f"{pv} has type [{pv_dtype}] not {datatype.__name__}")
if dtype != pv_dtype:
raise TypeError(f"{pv} has type [{pv_dtype}] not [{dtype}]")
return PvaArrayConverter()
if "NTNDArray" in typeid:
return PvaNDArrayConverter()
else:
return PvaArrayConverter()
elif "NTEnum" in typeid and datatype is bool:
# Wanted a bool, but database represents as an enum
pv_choices_len = get_unique(
Expand Down Expand Up @@ -213,14 +259,23 @@ async def get_descriptor(self) -> Descriptor:
value = await self.ctxt.get(self.read_pv)
return self.converter.descriptor(self.source, value)

def _pva_request_string(self, fields: List[str]) -> str:
"""
Converts a list of requested fields into a PVA request string which can be
passed to p4p.
"""
return f"field({','.join(fields)})"

async def get_reading(self) -> Reading:
value = await self.ctxt.get(
self.read_pv, request="field(value,alarm,timestamp)"
request: str = self._pva_request_string(
self.converter.value_fields() + self.converter.metadata_fields()
)
value = await self.ctxt.get(self.read_pv, request=request)
return self.converter.reading(value)

async def get_value(self) -> T:
value = await self.ctxt.get(self.read_pv, "field(value)")
request: str = self._pva_request_string(self.converter.value_fields())
value = await self.ctxt.get(self.read_pv, request)
return self.converter.value(value)

def set_callback(self, callback: Optional[ReadingValueCallback[T]]) -> None:
Expand All @@ -232,8 +287,12 @@ def set_callback(self, callback: Optional[ReadingValueCallback[T]]) -> None:
async def async_callback(v):
callback(self.converter.reading(v), self.converter.value(v))

request: str = self._pva_request_string(
self.converter.value_fields() + self.converter.metadata_fields()
)

self.subscription = self.ctxt.monitor(
self.read_pv, async_callback, request="field(value,alarm,timestamp)"
self.read_pv, async_callback, request=request
)
else:
if self.subscription:
Expand Down
39 changes: 39 additions & 0 deletions ophyd/v2/tests/test_epics.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import subprocess
import sys
import time
from contextlib import closing
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
Expand Down Expand Up @@ -300,6 +301,44 @@ async def test_pva_table(ioc: IOC) -> None:
q.close()


async def test_pva_ntdarray(ioc: IOC):
if ioc.protocol == "ca":
# CA can't do ndarray
return

put = np.array([1, 2, 3, 4, 5, 6], dtype=np.int64).reshape((2, 3))
initial = np.zeros_like(put)

backend = await ioc.make_backend(npt.NDArray[np.int64], "ntndarray")

# Backdoor into the "raw" data underlying the NDArray in QSrv
raw_data_backend = await ioc.make_backend(npt.NDArray[np.int64], "ntndarray:data")

# Make a monitor queue that will monitor for updates
for i, p in [(initial, put), (put, initial)]:
with closing(MonitorQueue(backend)) as q:
assert {
"source": backend.source,
"dtype": "array",
"shape": [2, 3],
} == await backend.get_descriptor()
# Check initial value
await q.assert_updates(pytest.approx(i))
await raw_data_backend.put(p.flatten())
await q.assert_updates(pytest.approx(p))


async def test_writing_to_ndarray_raises_typeerror(ioc: IOC):
if ioc.protocol == "ca":
# CA can't do ndarray
return

backend = await ioc.make_backend(npt.NDArray[np.int64], "ntndarray")

with pytest.raises(TypeError):
await backend.put(np.zeros((6,), dtype=np.int64))


async def test_non_existant_errors(ioc: IOC):
backend = await ioc.make_backend(str, "non-existant", connect=False)
# Can't use asyncio.wait_for on python3.8 because of
Expand Down
41 changes: 40 additions & 1 deletion ophyd/v2/tests/test_records.db
Original file line number Diff line number Diff line change
Expand Up @@ -241,4 +241,43 @@ record(waveform, "$(P)table:enum")
"": {"+type": "meta", "+channel": "VAL"}
}
})
}
}

record(longout, "$(P)ntndarray:ArraySize0_RBV") {
field(VAL, "3")
field(PINI, "YES")
info(Q:group, {
"$(P)ntndarray":{
"dimension[0].size":{+channel:"VAL", +type:"plain", +putorder:0}
}
})
}

record(longout, "$(P)ntndarray:ArraySize1_RBV") {
field(VAL, "2")
field(PINI, "YES")
info(Q:group, {
"$(P)ntndarray":{
"dimension[1].size":{+channel:"VAL", +type:"plain", +putorder:0}
}
})
}

record(waveform, "$(P)ntndarray:data")
{
field(FTVL, "INT64")
field(NELM, "6")
field(INP, {const:[0, 0, 0, 0, 0, 0]})
field(PINI, "YES")
info(Q:group, {
"$(P)ntndarray":{
+id:"epics:nt/NTNDArray:1.0",
"value":{
+type:"any",
+channel:"VAL",
+trigger:"*",
},
"": {+type:"meta", +channel:"SEVR"}
}
})
}
Loading