-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
173 stream resource seperate classes (#429)
Co-authored-by: Seher Karakuzu <[email protected]> Co-authored-by: Dan Allan <[email protected]> Co-authored-by: skarakuzu <[email protected]> Co-authored-by: Abigail Emery <[email protected]>
- Loading branch information
1 parent
b153c50
commit dbbcf28
Showing
18 changed files
with
368 additions
and
332 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
121 changes: 121 additions & 0 deletions
121
src/ophyd_async/epics/areadetector/writers/general_hdffile.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
from dataclasses import dataclass, field | ||
from pathlib import Path | ||
from typing import Iterator, List, Sequence | ||
from urllib.parse import urlunparse | ||
|
||
import event_model | ||
from event_model import ( | ||
ComposeStreamResource, | ||
ComposeStreamResourceBundle, | ||
StreamDatum, | ||
StreamResource, | ||
) | ||
|
||
from ophyd_async.core import DirectoryInfo | ||
|
||
|
||
@dataclass | ||
class _HDFDataset: | ||
data_key: str | ||
dataset: str | ||
shape: Sequence[int] = field(default_factory=tuple) | ||
multiplier: int = 1 | ||
swmr: bool = False | ||
|
||
|
||
SLICE_NAME = "AD_HDF5_SWMR_SLICE" | ||
|
||
|
||
def versiontuple(v): | ||
return tuple(map(int, (v.split(".")))) | ||
|
||
|
||
class _HDFFile: | ||
""" | ||
:param directory_info: Contains information about how to construct a StreamResource | ||
:param full_file_name: Absolute path to the file to be written | ||
:param datasets: Datasets to write into the file | ||
""" | ||
|
||
def __init__( | ||
self, | ||
directory_info: DirectoryInfo, | ||
full_file_name: Path, | ||
datasets: List[_HDFDataset], | ||
hostname: str = "localhost", | ||
) -> None: | ||
self._last_emitted = 0 | ||
self._hostname = hostname | ||
|
||
if len(datasets) == 0: | ||
self._bundles = [] | ||
return None | ||
|
||
if versiontuple(event_model.__version__) < versiontuple("1.21.0"): | ||
path = f"{full_file_name}" | ||
root = str(directory_info.root) | ||
bundler_composer = ComposeStreamResource() | ||
|
||
self._bundles: List[ComposeStreamResourceBundle] = [ | ||
bundler_composer( | ||
spec=SLICE_NAME, | ||
root=root, | ||
resource_path=path, | ||
data_key=ds.data_key, | ||
resource_kwargs={ | ||
"path": ds.dataset, | ||
"multiplier": ds.multiplier, | ||
"swmr": ds.swmr, | ||
}, | ||
) | ||
for ds in datasets | ||
] | ||
else: | ||
bundler_composer = ComposeStreamResource() | ||
|
||
uri = urlunparse( | ||
( | ||
"file", | ||
self._hostname, | ||
str((directory_info.root / full_file_name).absolute()), | ||
"", | ||
"", | ||
None, | ||
) | ||
) | ||
|
||
self._bundles: List[ComposeStreamResourceBundle] = [ | ||
bundler_composer( | ||
mimetype="application/x-hdf5", | ||
uri=uri, | ||
data_key=ds.data_key, | ||
parameters={ | ||
"dataset": ds.dataset, | ||
"swmr": ds.swmr, | ||
"multiplier": ds.multiplier, | ||
}, | ||
uid=None, | ||
validate=True, | ||
) | ||
for ds in datasets | ||
] | ||
|
||
def stream_resources(self) -> Iterator[StreamResource]: | ||
for bundle in self._bundles: | ||
yield bundle.stream_resource_doc | ||
|
||
def stream_data(self, indices_written: int) -> Iterator[StreamDatum]: | ||
# Indices are relative to resource | ||
if indices_written > self._last_emitted: | ||
indices = { | ||
"start": self._last_emitted, | ||
"stop": indices_written, | ||
} | ||
self._last_emitted = indices_written | ||
for bundle in self._bundles: | ||
yield bundle.compose_stream_datum(indices) | ||
return None | ||
|
||
def close(self) -> None: | ||
for bundle in self._bundles: | ||
bundle.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.