From febf9ceab0744a4699ad5d208f7b737cb3b7376d Mon Sep 17 00:00:00 2001 From: Callum Forrester Date: Mon, 17 Jun 2024 11:24:18 +0100 Subject: [PATCH] Derive PandA describe from datasets PV (#369) * Derive PandA describe from datasets PV * Fix resource kwargs inconsistencies with nexus writer --- src/ophyd_async/panda/_common_blocks.py | 3 +- src/ophyd_async/panda/_hdf_panda.py | 2 - src/ophyd_async/panda/_table.py | 10 + src/ophyd_async/panda/writers/_hdf_writer.py | 127 +++--------- .../panda/writers/_panda_hdf_file.py | 12 +- tests/panda/test_hdf_panda.py | 46 +++-- tests/panda/test_writer.py | 184 +++++++++--------- 7 files changed, 147 insertions(+), 237 deletions(-) diff --git a/src/ophyd_async/panda/_common_blocks.py b/src/ophyd_async/panda/_common_blocks.py index 9c8501f52f..b320402288 100644 --- a/src/ophyd_async/panda/_common_blocks.py +++ b/src/ophyd_async/panda/_common_blocks.py @@ -3,7 +3,7 @@ from enum import Enum from ophyd_async.core import Device, DeviceVector, SignalR, SignalRW -from ophyd_async.panda._table import SeqTable +from ophyd_async.panda._table import DatasetTable, SeqTable class DataBlock(Device): @@ -14,6 +14,7 @@ class DataBlock(Device): num_captured: SignalR[int] capture: SignalRW[bool] flush_period: SignalRW[float] + datasets: SignalR[DatasetTable] class PulseBlock(Device): diff --git a/src/ophyd_async/panda/_hdf_panda.py b/src/ophyd_async/panda/_hdf_panda.py index 56e1728512..5d1c357b88 100644 --- a/src/ophyd_async/panda/_hdf_panda.py +++ b/src/ophyd_async/panda/_hdf_panda.py @@ -28,9 +28,7 @@ def __init__( create_children_from_annotations(self) controller = PandaPcapController(pcap=self.pcap) writer = PandaHDFWriter( - prefix=prefix, directory_provider=directory_provider, - name_provider=lambda: name, panda_device=self, ) super().__init__( diff --git a/src/ophyd_async/panda/_table.py b/src/ophyd_async/panda/_table.py index 339301bb20..1f9efdc349 100644 --- a/src/ophyd_async/panda/_table.py +++ b/src/ophyd_async/panda/_table.py @@ 
-6,6 +6,16 @@ import numpy.typing as npt +class PandaHdf5DatasetType(str, Enum): + FLOAT_64 = "float64" + UINT_32 = "uint32" + + +class DatasetTable(TypedDict): + name: npt.NDArray[np.str_] + hdf5_type: Sequence[PandaHdf5DatasetType] + + class SeqTrigger(str, Enum): IMMEDIATE = "Immediate" BITA_0 = "BITA=0" diff --git a/src/ophyd_async/panda/writers/_hdf_writer.py b/src/ophyd_async/panda/writers/_hdf_writer.py index 60f4a67bf8..64575f6638 100644 --- a/src/ophyd_async/panda/writers/_hdf_writer.py +++ b/src/ophyd_async/panda/writers/_hdf_writer.py @@ -1,8 +1,6 @@ import asyncio -from dataclasses import dataclass -from enum import Enum from pathlib import Path -from typing import Any, AsyncGenerator, AsyncIterator, Dict, List, Optional +from typing import AsyncGenerator, AsyncIterator, Dict, List, Optional from bluesky.protocols import DataKey, StreamAsset from p4p.client.thread import Context @@ -10,98 +8,25 @@ from ophyd_async.core import ( DEFAULT_TIMEOUT, DetectorWriter, - Device, DirectoryProvider, - NameProvider, - SignalR, wait_for_value, ) from ophyd_async.core.signal import observe_value -from ophyd_async.panda import CommonPandaBlocks +from .._common_blocks import CommonPandaBlocks from ._panda_hdf_file import _HDFDataset, _HDFFile -class Capture(str, Enum): - # Capture signals for the HDF Panda - No = "No" - Value = "Value" - Diff = "Diff" - Sum = "Sum" - Mean = "Mean" - Min = "Min" - Max = "Max" - MinMax = "Min Max" - MinMaxMean = "Min Max Mean" - - -def get_capture_signals( - block: Device, path_prefix: Optional[str] = "" -) -> Dict[str, SignalR]: - """Get dict mapping a capture signal's name to the signal itself""" - if not path_prefix: - path_prefix = "" - signals: Dict[str, SignalR[Any]] = {} - for attr_name, attr in block.children(): - # Capture signals end in _capture, but num_capture is a red herring - if attr_name == "num_capture": - continue - dot_path = f"{path_prefix}{attr_name}" - if isinstance(attr, SignalR) and attr_name.endswith("_capture"): 
- signals[dot_path] = attr - attr_signals = get_capture_signals(attr, path_prefix=dot_path + ".") - signals.update(attr_signals) - return signals - - -@dataclass -class CaptureSignalWrapper: - signal: SignalR - capture_type: Capture - - -# This should return a dictionary which contains a dict, containing the Capture -# signal object, and the value of that signal -async def get_signals_marked_for_capture( - capture_signals: Dict[str, SignalR], -) -> Dict[str, CaptureSignalWrapper]: - # Read signals to see if they should be captured - do_read = [signal.get_value() for signal in capture_signals.values()] - - signal_values = await asyncio.gather(*do_read) - - assert len(signal_values) == len( - capture_signals - ), "Length of read signals are different to length of signals" - - signals_to_capture: Dict[str, CaptureSignalWrapper] = {} - for signal_path, signal_object, signal_value in zip( - capture_signals.keys(), capture_signals.values(), signal_values - ): - signal_path = signal_path.replace("_capture", "") - if (signal_value in iter(Capture)) and (signal_value != Capture.No): - signals_to_capture[signal_path] = CaptureSignalWrapper( - signal_object, - signal_value, - ) - - return signals_to_capture - - class PandaHDFWriter(DetectorWriter): _ctxt: Optional[Context] = None def __init__( self, - prefix: str, directory_provider: DirectoryProvider, - name_provider: NameProvider, panda_device: CommonPandaBlocks, ) -> None: self.panda_device = panda_device - self._prefix = prefix self._directory_provider = directory_provider - self._name_provider = name_provider self._datasets: List[_HDFDataset] = [] self._file: Optional[_HDFFile] = None self._multiplier = 1 @@ -110,14 +35,9 @@ def __init__( async def open(self, multiplier: int = 1) -> Dict[str, DataKey]: """Retrieve and get descriptor of all PandA signals marked for capture""" - # Get capture PVs by looking at panda. 
Gives mapping of dotted attribute path - # to Signal object - self.capture_signals = get_capture_signals(self.panda_device) - # Ensure flushes are immediate await self.panda_device.data.flush_period.set(0) - to_capture = await get_signals_marked_for_capture(self.capture_signals) self._file = None info = self._directory_provider() # Set the initial values @@ -133,36 +53,21 @@ async def open(self, multiplier: int = 1) -> Dict[str, DataKey]: # Wait for it to start, stashing the status that tells us when it finishes await self.panda_device.data.capture.set(True) - name = self._name_provider() if multiplier > 1: raise ValueError( "All PandA datasets should be scalar, multiplier should be 1" ) - self._datasets = [] - for attribute_path, capture_signal in to_capture.items(): - split_path = attribute_path.split(".") - signal_name = split_path[-1] - # Get block names from numbered blocks, eg INENC[1] - block_name = ( - f"{split_path[-3]}{split_path[-2]}" - if split_path[-2].isnumeric() - else split_path[-2] - ) - for suffix in capture_signal.capture_type.split(" "): - self._datasets.append( - _HDFDataset( - name, - block_name, - f"{name}-{block_name}-{signal_name}-{suffix}", - f"{block_name}-{signal_name}".upper() + f"-{suffix}", - [1], - multiplier=1, - ) - ) + return await self._describe() + async def _describe(self) -> Dict[str, DataKey]: + """ + Return a describe based on the datasets PV + """ + + await self._update_datasets() describe = { - ds.name: DataKey( + ds.data_key: DataKey( source=self.panda_device.data.hdf_directory.source, shape=ds.shape, dtype="array" if ds.shape != [1] else "number", @@ -172,6 +77,18 @@ async def open(self, multiplier: int = 1) -> Dict[str, DataKey]: } return describe + async def _update_datasets(self) -> None: + """ + Load data from the datasets PV on the panda, update internal + representation of datasets that the panda will write. 
+ """ + + capture_table = await self.panda_device.data.datasets.get_value() + self._datasets = [ + _HDFDataset(dataset_name, "/" + dataset_name, [1], multiplier=1) + for dataset_name in capture_table["name"] + ] + # Next few functions are exactly the same as AD writer. Could move as default # StandardDetector behavior async def wait_for_index( diff --git a/src/ophyd_async/panda/writers/_panda_hdf_file.py b/src/ophyd_async/panda/writers/_panda_hdf_file.py index 3b5b77449d..44ec51d347 100644 --- a/src/ophyd_async/panda/writers/_panda_hdf_file.py +++ b/src/ophyd_async/panda/writers/_panda_hdf_file.py @@ -9,10 +9,8 @@ @dataclass class _HDFDataset: - device_name: str - block: str - name: str - path: str + data_key: str + internal_path: str shape: List[int] multiplier: int @@ -29,12 +27,10 @@ def __init__( compose_stream_resource( spec="AD_HDF5_SWMR_SLICE", root=str(directory_info.root), - data_key=ds.name, + data_key=ds.data_key, resource_path=(f"{str(directory_info.root)}/{full_file_name}"), resource_kwargs={ - "name": ds.name, - "block": ds.block, - "path": ds.path, + "path": ds.internal_path, "multiplier": ds.multiplier, "timestamps": "/entry/instrument/NDAttributes/NDArrayTimeStamp", }, diff --git a/tests/panda/test_hdf_panda.py b/tests/panda/test_hdf_panda.py index 0801a19586..5ef81b7f40 100644 --- a/tests/panda/test_hdf_panda.py +++ b/tests/panda/test_hdf_panda.py @@ -1,5 +1,6 @@ from typing import Dict +import numpy as np import pytest from bluesky import plan_stubs as bps from bluesky.run_engine import RunEngine @@ -9,9 +10,8 @@ from ophyd_async.core.flyer import HardwareTriggeredFlyable from ophyd_async.core.mock_signal_utils import callback_on_mock_put from ophyd_async.core.signal import SignalR, assert_emitted -from ophyd_async.epics.signal.signal import epics_signal_r from ophyd_async.panda import HDFPanda, StaticSeqTableTriggerLogic -from ophyd_async.panda.writers._hdf_writer import Capture +from ophyd_async.panda._table import DatasetTable, 
PandaHdf5DatasetType from ophyd_async.plan_stubs import ( prepare_static_seq_table_flyer_and_detectors_with_same_trigger, ) @@ -26,25 +26,28 @@ class CaptureBlock(Device): mock_hdf_panda = HDFPanda( "HDFPANDA:", directory_provider=directory_provider, name="panda" ) - block_a = CaptureBlock(name="block_a") - block_b = CaptureBlock(name="block_b") - block_a.test_capture = epics_signal_r( - Capture, "pva://test_capture_a", name="test_capture_a" - ) - block_b.test_capture = epics_signal_r( - Capture, "pva://test_capture_b", name="test_capture_b" - ) - - setattr(mock_hdf_panda, "block_a", block_a) - setattr(mock_hdf_panda, "block_b", block_b) await mock_hdf_panda.connect(mock=True) def link_function(value, **kwargs): set_mock_value(mock_hdf_panda.pcap.active, value) callback_on_mock_put(mock_hdf_panda.pcap.arm, link_function) - set_mock_value(block_a.test_capture, Capture.Min) - set_mock_value(block_b.test_capture, Capture.Diff) + + set_mock_value( + mock_hdf_panda.data.datasets, + DatasetTable( + name=np.array( + [ + "x", + "y", + ] + ), + hdf5_type=[ + PandaHdf5DatasetType.UINT_32, + PandaHdf5DatasetType.FLOAT_64, + ], + ), + ) yield mock_hdf_panda @@ -127,10 +130,7 @@ def flying_plan(): # test descriptor data_key_names: Dict[str, str] = docs["descriptor"][0]["object_keys"]["panda"] - assert data_key_names == [ - "panda-block_a-test-Min", - "panda-block_b-test-Diff", - ] + assert data_key_names == ["x", "y"] for data_key_name in data_key_names: assert ( docs["descriptor"][0]["data_keys"][data_key_name]["source"] @@ -138,17 +138,15 @@ def flying_plan(): ) # test stream resources - for block_letter, stream_resource, data_key_name in zip( - ("a", "b"), docs["stream_resource"], data_key_names + for dataset_name, stream_resource, data_key_name in zip( + ("x", "y"), docs["stream_resource"], data_key_names ): assert stream_resource["data_key"] == data_key_name assert stream_resource["spec"] == "AD_HDF5_SWMR_SLICE" assert stream_resource["run_start"] == 
docs["start"][0]["uid"] assert stream_resource["resource_kwargs"] == { - "block": f"block_{block_letter}", + "path": "/" + dataset_name, "multiplier": 1, - "name": data_key_name, - "path": f"BLOCK_{block_letter.upper()}-TEST-{data_key_name.split('-')[-1]}", "timestamps": "/entry/instrument/NDAttributes/NDArrayTimeStamp", } diff --git a/tests/panda/test_writer.py b/tests/panda/test_writer.py index e056648ef4..dfe0ef5c86 100644 --- a/tests/panda/test_writer.py +++ b/tests/panda/test_writer.py @@ -1,6 +1,7 @@ -import asyncio -from unittest.mock import patch +from pathlib import Path +from unittest.mock import ANY +import numpy as np import pytest from ophyd_async.core import ( @@ -12,17 +13,44 @@ set_mock_value, ) from ophyd_async.epics.pvi import create_children_from_annotations, fill_pvi_entries -from ophyd_async.epics.signal.signal import epics_signal_r from ophyd_async.panda import CommonPandaBlocks -from ophyd_async.panda.writers._hdf_writer import ( - Capture, - CaptureSignalWrapper, - PandaHDFWriter, - get_capture_signals, - get_signals_marked_for_capture, -) +from ophyd_async.panda._table import DatasetTable, PandaHdf5DatasetType +from ophyd_async.panda.writers._hdf_writer import PandaHDFWriter from ophyd_async.panda.writers._panda_hdf_file import _HDFFile +TABLES = [ + DatasetTable( + name=np.array([]), + hdf5_type=[], + ), + DatasetTable( + name=np.array( + [ + "x", + ] + ), + hdf5_type=[ + PandaHdf5DatasetType.UINT_32, + ], + ), + DatasetTable( + name=np.array( + [ + "x", + "y", + "y_min", + "y_max", + ] + ), + hdf5_type=[ + PandaHdf5DatasetType.UINT_32, + PandaHdf5DatasetType.FLOAT_64, + PandaHdf5DatasetType.FLOAT_64, + PandaHdf5DatasetType.FLOAT_64, + ], + ), +] + @pytest.fixture async def panda_t(): @@ -53,13 +81,11 @@ async def mock_panda(panda_t): mock_panda = panda_t("mock_PANDA", name="mock_panda") set_mock_value( - mock_panda.block_a.test_capture, - Capture.MinMaxMean, # type: ignore[attr-defined] - ) - - set_mock_value( - 
mock_panda.block_b.test_capture, - Capture.No, # type: ignore[attr-defined] + mock_panda.data.datasets, + DatasetTable( + name=np.array([]), + hdf5_type=[], + ), ) return mock_panda @@ -72,72 +98,34 @@ async def mock_writer(tmp_path, mock_panda) -> PandaHDFWriter: ) async with DeviceCollector(mock=True): writer = PandaHDFWriter( - prefix="TEST-PANDA", directory_provider=dir_prov, - name_provider=lambda: "test-panda", panda_device=mock_panda, ) return writer -async def test_get_capture_signals_gets_all_signals(mock_panda): - async with DeviceCollector(mock=True): - mock_panda.test_seq = Device("seq") - mock_panda.test_seq.seq1_capture = epics_signal_r( - str, "pva://read_pv_1", name="seq1_capture" - ) - mock_panda.test_seq.seq2_capture = epics_signal_r( - str, "pva://read_pv_2", name="seq2_capture" - ) - await asyncio.gather( - mock_panda.test_seq.connect(mock=True), - mock_panda.test_seq.seq1_capture.connect(mock=True), - mock_panda.test_seq.seq2_capture.connect(mock=True), - ) - capture_signals = get_capture_signals(mock_panda) - expected_signals = [ - "block_a.test_capture", - "block_b.test_capture", - "test_seq.seq1_capture", - "test_seq.seq2_capture", - ] - for signal in expected_signals: - assert signal in capture_signals.keys() - - -async def test_get_signals_marked_for_capture(mock_panda): - capture_signals = { - "block_a.test_capture": mock_panda.block_a.test_capture, - "block_b.test_capture": mock_panda.block_b.test_capture, - } - signals_marked_for_capture = await get_signals_marked_for_capture(capture_signals) - assert len(signals_marked_for_capture) == 1 - assert signals_marked_for_capture["block_a.test"].capture_type == Capture.MinMaxMean - - -async def test_open_returns_correct_descriptors(mock_writer: PandaHDFWriter): +@pytest.mark.parametrize("table", TABLES) +async def test_open_returns_correct_descriptors( + mock_writer: PandaHDFWriter, table: DatasetTable +): assert hasattr(mock_writer.panda_device, "data") - cap1 = 
mock_writer.panda_device.block_a.test_capture # type: ignore[attr-defined] - cap2 = mock_writer.panda_device.block_b.test_capture # type: ignore[attr-defined] - set_mock_value(cap1, Capture.MinMaxMean) - set_mock_value(cap2, Capture.Value) + set_mock_value( + mock_writer.panda_device.data.datasets, + table, + ) description = await mock_writer.open() # to make capturing status not time out - assert len(description) == 4 - for key, entry in description.items(): - assert entry.get("shape") == [1] - assert entry.get("dtype") == "number" - assert isinstance(key, str) - assert "source" in entry - assert entry.get("external") == "STREAM:" - expected_datakeys = [ - "test-panda-block_a-test-Min", - "test-panda-block_a-test-Max", - "test-panda-block_a-test-Mean", - "test-panda-block_b-test-Value", - ] - for key in expected_datakeys: - assert key in description + + for key, entry, expected_key in zip( + description.keys(), description.values(), table["name"] + ): + assert key == expected_key + assert entry == { + "source": mock_writer.panda_device.data.hdf_directory.source, + "shape": [1], + "dtype": "number", + "external": "STREAM:", + } async def test_open_close_sets_capture(mock_writer: PandaHDFWriter): @@ -176,33 +164,35 @@ async def test_wait_for_index(mock_writer: PandaHDFWriter): await mock_writer.wait_for_index(3, timeout=0.1) -async def test_collect_stream_docs(mock_writer: PandaHDFWriter): +@pytest.mark.parametrize("table", TABLES) +async def test_collect_stream_docs( + mock_writer: PandaHDFWriter, + tmp_path: Path, + table: DatasetTable, +): # Give the mock writer datasets - cap1 = mock_writer.panda_device.block_a.test_capture # type: ignore[attr-defined] - cap2 = mock_writer.panda_device.block_b.test_capture # type: ignore[attr-defined] - set_mock_value(cap1, Capture.MinMaxMean) - set_mock_value(cap2, Capture.Value) + set_mock_value(mock_writer.panda_device.data.datasets, table) + await mock_writer.open() [item async for item in mock_writer.collect_stream_docs(1)] 
assert type(mock_writer._file) is _HDFFile assert mock_writer._file._last_emitted == 1 - resource_doc = mock_writer._file._bundles[0].stream_resource_doc - assert resource_doc["data_key"] == "test-panda-block_a-test-Min" - assert "mock_panda/data.h5" in resource_doc["resource_path"] - - -async def test_numeric_blocks_correctly_formated(mock_writer: PandaHDFWriter): - async def get_numeric_signal(_): - return { - "device.block.1": CaptureSignalWrapper( - epics_signal_r(str, "pva://read_pv", name="Capture.Value"), - Capture.Value, - ) + for i in range(len(table["name"])): + resource_doc = mock_writer._file._bundles[i].stream_resource_doc + name = table["name"][i] + assert resource_doc == { + "spec": "AD_HDF5_SWMR_SLICE", + "path_semantics": "posix", + "root": str(tmp_path), + "data_key": name, + "resource_path": str(tmp_path / "mock_panda" / "data.h5"), + "resource_kwargs": { + "path": "/" + name, + "multiplier": 1, + "timestamps": "/entry/instrument/NDAttributes/NDArrayTimeStamp", + }, + "uid": ANY, } - - with patch( - "ophyd_async.panda.writers._hdf_writer.get_signals_marked_for_capture", - get_numeric_signal, - ): - assert "test-panda-block-1-Value" in await mock_writer.open() + assert resource_doc["data_key"] == name + assert "mock_panda/data.h5" in resource_doc["resource_path"]