diff --git a/requirements.txt b/requirements.txt index cb6df00f..5237daf8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,6 @@ -dask +dask[array,diagnostics] != 2021.03.0 h5py +hdf5plugin numpy +pint zarr diff --git a/requirements_dev.txt b/requirements_dev.txt index 4cf02cf9..a3d002e8 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -1,6 +1,8 @@ -dask +dask[array,diagnostics] != 2021.03.0 h5py +hdf5plugin numpy +pint pytest pytest-cov zarr diff --git a/setup.cfg b/setup.cfg index 14cfa88f..0370b48a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -30,9 +30,11 @@ project-urls = [options] include_package_data = True install_requires = - dask + dask[array,diagnostics] != 2021.03.0 h5py + hdf5plugin numpy + pint zarr packages = find: package_dir = @@ -42,3 +44,9 @@ zip_safe = False [options.packages.find] where = src + +[options.entry_points] +console_scripts = + cues = tristan.command_line.cues:main + vds = tristan.command_line.vds:main + images = tristan.command_line.images:main diff --git a/src/tristan/__init__.py b/src/tristan/__init__.py index 901ef663..be60b553 100644 --- a/src/tristan/__init__.py +++ b/src/tristan/__init__.py @@ -13,53 +13,65 @@ __version__ = "0.0.0" __version_tuple__ = tuple(int(x) for x in __version__.split(".")) -import os -from typing import MutableSequence, Optional, Tuple +from typing import Dict, Optional, Tuple -import h5py -import numpy as np +import pint +from dask import array as da + +try: + from numpy.typing import ArrayLike +except ImportError: + # NumPy versions compatible with Python 3.6 do not have the numpy.typing module. + import numpy as np + + ArrayLike = np.ndarray clock_frequency = 6.4e8 -# Translations of the cue_id messages. +# Translations of the basic cue_id messages. 
padding = 0 sync = 0x800 -sync_module_1 = 0x801 -sync_module_2 = 0x802 shutter_open = 0x840 -shutter_open_module_1 = 0x841 -shutter_open_module_2 = 0x842 shutter_close = 0x880 -shutter_close_module_1 = 0x881 -shutter_close_module_2 = 0x882 fem_falling = 0x8C1 fem_rising = 0x8E1 ttl_falling = 0x8C9 ttl_rising = 0x8E9 lvds_falling = 0x8CA lvds_rising = 0x8EA +tzero_falling = 0x8CB +tzero_rising = 0x8EB +sync_falling = 0x8CC +sync_rising = 0x8EC reserved = 0xF00 cues = { padding: "Padding", - sync: "Extended time stamp, global synchronisation signal", - sync_module_1: "Extended time stamp, sensor module 1", - sync_module_2: "Extended time stamp, sensor module 2", + sync: "Extended time stamp, global synchronisation", shutter_open: "Shutter open time stamp, global", - shutter_open_module_1: "Shutter open time stamp, sensor module 1", - shutter_open_module_2: "Shutter open time stamp, sensor module 2", shutter_close: "Shutter close time stamp, global", - shutter_close_module_1: "Shutter close time stamp, sensor module 1", - shutter_close_module_2: "Shutter close time stamp, sensor module 2", - fem_falling: "FEM trigger input, falling edge", - fem_rising: "FEM trigger input, rising edge", + fem_falling: "FEM trigger, falling edge", + fem_rising: "FEM trigger, rising edge", ttl_falling: "Clock trigger TTL input, falling edge", ttl_rising: "Clock trigger TTL input, rising edge", lvds_falling: "Clock trigger LVDS input, falling edge", lvds_rising: "Clock trigger LVDS input, rising edge", + tzero_falling: "Clock trigger TZERO input, falling edge", + tzero_rising: "Clock trigger TZERO input, rising edge", + sync_falling: "Clock trigger SYNC input, falling edge", + sync_rising: "Clock trigger SYNC input, rising edge", 0xBC6: "Error: messages out of sync", 0xBCA: "Error: messages out of sync", reserved: "Reserved", + **{ + basic + n: f"{name} time stamp, sensor module {n}" + for basic, name in ( + (sync, "Extended"), + (shutter_open, "Shutter open"), + (shutter_close, "Shutter 
close"), + ) + for n in range(1, 64) + }, } # Keys of event data in the HDF5 file structure. @@ -68,61 +80,53 @@ event_location_key = "event_id" event_time_key = "event_time_offset" event_energy_key = "event_energy" -size_key = "entry/instrument/detector/module/data_size" +cue_keys = cue_id_key, cue_time_key +event_keys = event_location_key, event_time_key, event_energy_key -def fullpath(path: str) -> str: - """Get an absolute path with tilde home directory shorthand expanded.""" - return os.path.abspath(os.path.expanduser(path)) +nx_size_key = "entry/instrument/detector/module/data_size" -def first_cue_time( - data: h5py.File, message: int, events_group: Optional[str] = "/" -) -> Optional[int]: +def first_cue_time(data: Dict[str, da.Array], message: int) -> Optional[int]: """ - Find the timestamp of the first instance of a cue message in a data file. + Find the timestamp of the first instance of a cue message in a Tristan data set. Args: - data: A NeXus-like LATRD data file. + data: A NeXus-like LATRD data dictionary (a dictionary with data set + names as keys and Dask arrays as values). Must contain one entry + for cue id messages and one for cue timestamps. The two arrays are + assumed to have the same length. message: The message code, as defined in the Tristan standard. - events_group: HDF5 group containing the events data. Returns: The timestamp, measured in clock cycles from the global synchronisation signal. If the message doesn't exist in the data set, this returns None. """ - events_group = events_group or "/" - - index = np.argmax(data[events_group + cue_id_key][...] == message) - - # Catch the case in which the message is not present in the data set. 
- if index == 0 and data[events_group + cue_id_key][0] != message: - return None + index = da.argmax(data[cue_id_key] == message) + if index or data[cue_id_key][0] == message: + return data[cue_time_key][index] - return data[events_group + cue_time_key][index].astype(int) - -def cue_times( - data: h5py.File, message: int, events_group: Optional[str] = "/" -) -> MutableSequence[int]: +def cue_times(data: Dict[str, da.Array], message: int) -> da.Array: """ - Find the timestamps of all instances of a cue message in a data file. + Find the timestamps of all instances of a cue message in a Tristan data set. Args: - data: A NeXus-like LATRD data file. + data: A NeXus-like LATRD data dictionary (a dictionary with data set + names as keys and Dask arrays as values). Must contain one entry + for cue id messages and one for cue timestamps. The two arrays are + assumed to have the same length. message: The message code, as defined in the Tristan standard. - events_group: HDF5 group containing the events data. Returns: - The timestamps, measured in clock cycles from the global synchronisation signal. + The timestamps, measured in clock cycles from the global synchronisation + signal, de-duplicated. """ - events_group = events_group or "/" - - index = np.nonzero(data[events_group + cue_id_key][...] == message) - return np.unique(data[events_group + cue_time_key][index].astype(int)) + index = da.flatnonzero(data[cue_id_key] == message) + return da.unique(data[cue_time_key][index]) -def seconds(timestamp: int, reference: int = 0) -> float: +def seconds(timestamp: ArrayLike, reference: ArrayLike = 0) -> ArrayLike: """ Convert a Tristan timestamp to seconds, measured from a given time. @@ -138,28 +142,32 @@ def seconds(timestamp: int, reference: int = 0) -> float: Returns: The difference between the two timestamps in seconds. 
""" - return (timestamp - reference) / clock_frequency + return pint.Quantity((timestamp - reference) / clock_frequency, "s").to_compact() -def coordinates(event_location: int) -> Tuple[int, int]: +def pixel_index(location: ArrayLike, image_size: Tuple[int, int]) -> ArrayLike: """ - Extract pixel coordinate information from an event location message. + Extract pixel coordinate information from an event location (event_id) message. + + Translate a Tristan event location message to the index of the corresponding + pixel in the flattened image array (i.e. numbered from zero, in row-major order). + + The pixel coordinates of an event on a Tristan detector are encoded in a 32-bit + integer location message (the event_id) with 26 bits of useful information. + Extract the y coordinate (the 13 least significant bits) and the x coordinate + (the 13 next least significant bits). Find the corresponding pixel index in the + flattened image array by multiplying the y value by the size of the array in x, + and adding the x value. - The pixel coordinates of an event on a Tristan detector are encoded in an - integer location message with 26 bits of useful information. Extract the y - coordinate (the 13 least significant bits) and the x coordinate (the 13 next - least significant bits). + This function calls the Python built-in divmod and so can be broadcast over NumPy + and Dask arrays. Args: - event_location: Either a single event location message (an integer) or a NumPy - array of multiple integers representing the coordinates for - several events. + location: Event location message (an integer). + image_size: Shape of the image array in (y, x), i.e. (slow, fast). Returns: - A tuple (x, y) of decoded coordinates. + Index in the flattened image array of the pixel where the event occurred. 
""" - x, y = divmod(event_location, 0x2000) - if isinstance(event_location, np.ndarray): - return x.astype(np.int16), y.astype(np.int16) - else: - return x, y + x, y = divmod(location, 0x2000) + return x + y * image_size[1] diff --git a/src/tristan/binning.py b/src/tristan/binning.py new file mode 100644 index 00000000..bf38762d --- /dev/null +++ b/src/tristan/binning.py @@ -0,0 +1,70 @@ +"""Tools for binning events to images.""" + + +from operator import mul +from typing import Dict, Tuple + +import numpy as np +from dask import array as da + +try: + from numpy.typing import ArrayLike +except ImportError: + # NumPy versions compatible with Python 3.6 do not have the numpy.typing module. + ArrayLike = np.ndarray + +from . import ( + event_location_key, + event_time_key, + first_cue_time, + pixel_index, + shutter_close, + shutter_open, +) + + +def find_start_end(data): + start_time = first_cue_time(data, shutter_open) + end_time = first_cue_time(data, shutter_close) + return da.compute(start_time, end_time) + + +def make_single_image( + data: Dict[str, da.Array], image_size: Tuple[int, int], start: int, end: int +) -> da.Array: + event_times = data[event_time_key] + event_locations = data[event_location_key] + + valid_events = (start <= event_times) & (event_times < end) + event_locations = event_locations[valid_events] + event_locations = pixel_index(event_locations, image_size) + + image = da.bincount(event_locations, minlength=mul(*image_size)) + return image.astype(np.uint32).reshape(1, *image_size) + + +def make_multiple_images( + data: Dict[str, da.Array], image_size: Tuple[int, int], bins: ArrayLike +) -> da.Array: + event_times = data[event_time_key] + event_locations = data[event_location_key] + + valid_events = (bins[0] <= event_times) & (event_times < bins[-1]) + event_times = event_times[valid_events] + event_locations = event_locations[valid_events] + + image_indices = da.digitize(event_times, bins) - 1 + event_locations = pixel_index(event_locations, 
image_size) + num_images = bins.size - 1 + + image_indices = [ + image_indices == image_number for image_number in range(num_images) + ] + images = da.stack( + [ + da.bincount(event_locations[indices], minlength=mul(*image_size)) + for indices in image_indices + ] + ) + + return images.astype(np.uint32).reshape(num_images, *image_size) diff --git a/src/tristan/command_line/__init__.py b/src/tristan/command_line/__init__.py new file mode 100644 index 00000000..6d24827e --- /dev/null +++ b/src/tristan/command_line/__init__.py @@ -0,0 +1,65 @@ +"""General utilities for the command-line tools.""" + +import argparse + +import pint + +from .. import __version__ + +__all__ = ("version_parser", "input_parser", "image_output_parser", "exposure_parser") + + +version_parser = argparse.ArgumentParser(add_help=False) +version_parser.add_argument( + "-V", + "--version", + action="version", + version="%(prog)s: Tristan tools {version}".format(version=__version__), +) + + +input_parser = argparse.ArgumentParser(add_help=False) +input_parser.add_argument( + "input_file", + help="Tristan metadata ('_meta.h5') or raw data ('_000001.h5', etc.) file. " + "This file must be in the same directory as the HDF5 files containing all the " + "corresponding raw events data.", + metavar="input-file", +) + + +image_output_parser = argparse.ArgumentParser(add_help=False) +image_output_parser.add_argument( + "-o", + "--output-file", + help="File name or location for output image file, defaults to the working " + "directory. If only a directory location is given, the pattern of the raw data " + "files will be used, with '_meta.h5' replaced with '_single_image.h5'.", +) +image_output_parser.add_argument( + "-f", + "--force", + help="Force the output image file to over-write any existing file with the same " + "name.", + action="store_true", +) + +image_output_parser.add_argument( + "-s", + "--image-size", + help="Dimensions of the detector in pixels, separated by a comma, as 'x,y', i.e. 
" + "'fast,slow'.", +) + + +exposure_parser = argparse.ArgumentParser(add_help=False) +group = exposure_parser.add_mutually_exclusive_group(required=True) +group.add_argument( + "-e", + "--exposure-time", + help="Duration of each image. This will be used to calculate the number of " + "images. Specify a value with units like '--exposure-time .5ms', '-e 500µs' or " + "'-e 500us'.", + type=pint.Quantity, +) +group.add_argument("-n", "--num-images", help="Number of images.", type=int) diff --git a/src/tristan/command_line/cues.py b/src/tristan/command_line/cues.py new file mode 100644 index 00000000..d11e64be --- /dev/null +++ b/src/tristan/command_line/cues.py @@ -0,0 +1,66 @@ +"""Summarise the cue messages in Tristan data.""" + +import argparse +from contextlib import ExitStack + +import h5py +import numpy as np +from dask import array as da + +from .. import cue_id_key, cue_keys, cue_time_key, cues, reserved, seconds +from ..data import data_files, find_input_file_name +from . import input_parser, version_parser + +parser = argparse.ArgumentParser( + description=__doc__, parents=[version_parser, input_parser] +) + + +def main(args=None): + args = parser.parse_args(args) + + data_dir, root = find_input_file_name(args.input_file) + raw_files, _ = data_files(data_dir, root) + + with ExitStack() as stack: + files = [stack.enter_context(h5py.File(f, "r")) for f in raw_files] + data = { + key: da.concatenate([f[key] for f in files]).rechunk() for key in cue_keys + } + + relevant = (data[cue_id_key] > 0) & (data[cue_id_key] != reserved) + cue_ids = data[cue_id_key][relevant].compute() + cue_times = data[cue_time_key][relevant].compute() + + unique_cues = np.sort(np.unique(cue_ids)) + + print("\nSummary of cue messages:") + + for cue in unique_cues: + cues_sel = cue_ids == cue + cue_times_sel = cue_times[cues_sel] + deduplicated = np.sort(np.unique(cue_times_sel)) + + if deduplicated.size > 1: + time_diffs = np.diff(deduplicated) + min_diff = time_diffs.min() + max_diff = 
time_diffs.max() + avg_diff = time_diffs.mean() + + print( + f""" +{cues.get(cue, f'Unknown (0x{cue:04x})')}: +Found {cue_times_sel.size} instances. +Found {deduplicated.size} de-duplicated instances with +\tsmallest time difference: {min_diff} cycles ({seconds(min_diff):~.3g}), +\tlargest time difference: {max_diff} cycles ({seconds(max_diff):~.3g}), +\tmean time difference: {avg_diff:.2f} cycles ({seconds(avg_diff):~.3g}).""" + ) + elif cue_times_sel.size > 1: + n = cue_times_sel.size + print( + f"\n{cues.get(cue, f'Unknown (0x{cue:04x})')}: Found {n} instances,\n" + f"\tall with the same timestamp." + ) + else: + print(f"\n{cues.get(cue, f'Unknown (0x{cue:04x})')}: Found 1 instance.") diff --git a/src/tristan/command_line/images.py b/src/tristan/command_line/images.py new file mode 100644 index 00000000..b866f358 --- /dev/null +++ b/src/tristan/command_line/images.py @@ -0,0 +1,300 @@ +"""Aggregate the events from a LATRD Tristan data collection into one or more images.""" + +import argparse +import sys +from contextlib import ExitStack +from pathlib import Path +from typing import Tuple + +import h5py +import numpy as np +import pint +from dask import array as da +from dask.diagnostics import ProgressBar +from hdf5plugin import Bitshuffle + +from .. import ( + clock_frequency, + cue_keys, + cue_times, + cues, + event_location_key, + event_time_key, + fem_falling, + fem_rising, + lvds_falling, + lvds_rising, + ttl_falling, + ttl_rising, +) +from ..binning import ( + find_start_end, + first_cue_time, + make_multiple_images, + make_single_image, +) +from ..data import data_files, find_file_names +from . 
import exposure_parser, image_output_parser, input_parser, version_parser + +triggers = { + "TTL-rising": ttl_rising, + "TTL-falling": ttl_falling, + "LVDS-rising": lvds_rising, + "LVDS-falling": lvds_falling, + "FEM-rising": fem_rising, + "FEM-falling": fem_falling, +} + + +def determine_image_size(data_dir: Path, root: str) -> Tuple[int, int]: + """Find the image size from metadata.""" + nexus_file = data_dir / f"{root}.nxs" + try: + with h5py.File(nexus_file, "r") as f: + return f["entry/instrument/detector/module/data_size"][()] + except (FileNotFoundError, OSError): + sys.exit( + f"Cannot find NeXus file:\n\t{nexus_file}\nPlease specify the " + f"detector dimensions in (x, y) with '--image-size'." + ) + + +def exposure( + start: int, end: int, exposure_time: pint.Quantity = None, num_images: int = None +): + freq = pint.Quantity(clock_frequency, "Hz") + if exposure_time: + exposure_time = exposure_time.to_base_units().to_compact() + exposure_cycles = (exposure_time * freq).to_base_units().magnitude + num_images = int((end - start) // exposure_cycles) + else: + # Because they are expected to be mutually exclusive, if there is no + # exposure_time, there must be a num_images. 
+ num_images = num_images + exposure_cycles = (end - start) / num_images + exposure_time = exposure_cycles / freq + exposure_time = exposure_time.to_base_units().to_compact() + + return exposure_time, exposure_cycles, num_images + + +def single_image_cli(args): + """Utility for making a single image from event-mode data.""" + data_dir, root, output_file = find_file_names( + args.input_file, args.output_file, "single_image", args.force + ) + + if args.image_size: + image_size = tuple(map(int, args.image_size.split(",")))[::-1] + else: + image_size = determine_image_size(data_dir, root) + + raw_files, _ = data_files(data_dir, root) + + with ExitStack() as stack: + files = [stack.enter_context(h5py.File(f, "r")) for f in raw_files] + data = { + key: da.concatenate([f[key] for f in files]).rechunk() + for key in (event_location_key, event_time_key) + cue_keys + } + + print("Binning events into a single image.") + image = make_single_image(data, image_size, *find_start_end(data)) + + with ProgressBar(), h5py.File(output_file, "w" if args.force else "x") as f: + data_set = f.require_dataset( + "data", + shape=image.shape, + dtype=image.dtype, + chunks=image.chunksize, + **Bitshuffle(), + ) + image.store(data_set) + + print(f"Image file written to\n\t{output_file}") + + +def multiple_images_cli(args): + """Utility for making multiple images from event-mode data.""" + data_dir, root, output_file = find_file_names( + args.input_file, args.output_file, "images", args.force + ) + + if args.image_size: + image_size = tuple(map(int, args.image_size.split(",")))[::-1] + else: + image_size = determine_image_size(data_dir, root) + + raw_files, _ = data_files(data_dir, root) + + with ExitStack() as stack: + files = [stack.enter_context(h5py.File(f, "r")) for f in raw_files] + data = { + key: da.concatenate([f[key] for f in files]).rechunk() + for key in (event_location_key, event_time_key) + cue_keys + } + + start, end = find_start_end(data) + exposure_time, exposure_cycles, 
num_images = exposure( + start, end, args.exposure_time, args.num_images + ) + + print( + f"Binning events into {num_images} images with an exposure time of " + f"{exposure_time:~.3g}." + ) + + if args.align_trigger: + trigger_type = triggers.get(args.align_trigger) + print( + f"Image start and end times will be chosen such that the first " + f"'{cues[trigger_type]}' is aligned with an image boundary." + ) + # Note we are assuming that the first trigger time is after shutter open. + trigger_time = first_cue_time(data, trigger_type) + if trigger_time is None: + sys.exit(f"Could not find a '{cues[trigger_type]}' signal.") + trigger_time = trigger_time.compute().astype(int) + bins_pre = np.arange( + trigger_time - exposure_cycles, start, -exposure_cycles, dtype=np.uint64 + )[::-1] + bins_post = np.arange(trigger_time, end, exposure_cycles, dtype=np.uint64) + bins = np.concatenate((bins_pre, bins_post)) + else: + bins = np.linspace(start, end, num_images + 1, dtype=np.uint64) + + images = make_multiple_images(data, image_size, bins) + + with ProgressBar(), h5py.File(output_file, "w" if args.force else "x") as f: + data_set = f.require_dataset( + "data", + shape=images.shape, + dtype=images.dtype, + chunks=images.chunksize, + **Bitshuffle(), + ) + images.store(data_set) + + print(f"Images file written to\n\t{output_file}") + + +def pump_probe_cli(args): + """Utility for making multiple images from pump-probe event-mode data.""" + data_dir, root, output_file = find_file_names( + args.input_file, args.output_file, "images", args.force + ) + + if args.image_size: + image_size = tuple(map(int, args.image_size.split(",")))[::-1] + else: + image_size = determine_image_size(data_dir, root) + + raw_files, _ = data_files(data_dir, root) + + with ExitStack() as stack: + files = [stack.enter_context(h5py.File(f, "r")) for f in raw_files] + data = { + key: da.concatenate([f[key] for f in files]).rechunk() + for key in (event_location_key, event_time_key) + cue_keys + } + + trigger_type = 
triggers.get(args.trigger_type) + + trigger_times = cue_times(data, trigger_type).compute().astype(int) + trigger_times = np.sort(np.unique(trigger_times)) + end = np.diff(trigger_times).min() + + if not np.any(trigger_times): + sys.exit(f"Could not find a '{cues[trigger_type]}' signal.") + + exposure_time, exposure_cycles, num_images = exposure( + 0, end, args.exposure_time, args.num_images + ) + + print( + f"Binning events into {num_images} images with an exposure time of " + f"{exposure_time:~.3g} according to the time elapsed since the most recent " + f"'{cues[trigger_type]}' signal." + ) + + # Measure the event time as time elapsed since the most recent trigger signal. + data[event_time_key] = data[event_time_key].astype(np.int64) + data[event_time_key] -= trigger_times[ + da.digitize(data[event_time_key], trigger_times) - 1 + ] + + bins = np.linspace(0, end, num_images + 1, dtype=np.uint64) + + images = make_multiple_images(data, image_size, bins) + + with ProgressBar(), h5py.File(output_file, "w" if args.force else "x") as f: + data_set = f.require_dataset( + "data", + shape=images.shape, + dtype=images.dtype, + chunks=images.chunksize, + **Bitshuffle(), + ) + images.store(data_set) + + print(f"Images file written to\n\t{output_file}") + + +parser = argparse.ArgumentParser(description=__doc__) +subparsers = parser.add_subparsers( + help="Choose the manner in which to create images.", + required=True, + dest="sub-command", +) + +parser_single = subparsers.add_parser( + "single", + aliases=["1"], + description=( + "Aggregate all the events from a LATRD Tristan data collection " + "into a single image." + ), + parents=[version_parser, input_parser, image_output_parser], +) +parser_single.set_defaults(func=single_image_cli) + +parser_multiple = subparsers.add_parser( + "multiple", + aliases=["multi"], + description=( + "Bin the events from a LATRD Tristan data collection into multiple images." 
+ ), + parents=[version_parser, input_parser, image_output_parser, exposure_parser], +) +parser_multiple.add_argument( + "-a", + "--align-trigger", + help="Align the start and end time of images such that the first trigger signal of " + "the chosen type is matched up with an image start time. Useful for examining " + "effects in the data before and after a single trigger pulse.", + choices=triggers.keys(), +) +parser_multiple.set_defaults(func=multiple_images_cli) + +parser_pump_probe = subparsers.add_parser( + "pump-probe", + aliases=["pp"], + description="With LATRD data from a pump-probe experiment, where the pump " + "signal has a regular repeat rate, bin events into images " + "according to the time elapsed since the most recent pump trigger " + "signal.", + parents=[version_parser, input_parser, image_output_parser, exposure_parser], +) +parser_pump_probe.add_argument( + "-t", + "--trigger-type", + help="The type of trigger signal used as the pump pulse marker.", + choices=triggers.keys(), +) +parser_pump_probe.set_defaults(func=pump_probe_cli) + + +def main(args=None): + """Perform the image binning with a user-specified sub-command.""" + args = parser.parse_args(args) + args.func(args) diff --git a/src/tristan/command_line/vds.py b/src/tristan/command_line/vds.py new file mode 100644 index 00000000..aa245a96 --- /dev/null +++ b/src/tristan/command_line/vds.py @@ -0,0 +1,52 @@ +""" +Create an HDF5 virtual data set (VDS) file to aggregate raw Tristan events data. + +By default, this file will be saved in the same directory as the raw data and +detector metadata files, retaining the same naming convention. So if the metadata +file is named 'my_data_1_meta.h5', then the new VDS file will be named +'my_data_1_vds.h5'. +""" + +import argparse + +import h5py + +from ..data import data_files, find_file_names +from ..vds import time_slice_info, virtual_data_set +from . 
import input_parser, version_parser + +parser = argparse.ArgumentParser( + description=__doc__, parents=[version_parser, input_parser] +) +parser.add_argument( + "-o", + "--output-file", + help="File name for output VDS file. " + "By default, the pattern of the input file will be used, with '_meta.h5' " + "replaced with '_vds.h5'.", +) +parser.add_argument( + "-f", + "--force", + help="Force the output file to over-write any existing file with the same name.", + action="store_true", +) + + +def main(args=None): + """Utility for making an HDF5 VDS from raw Tristan data.""" + args = parser.parse_args(args) + data_dir, root, output_file = find_file_names( + args.input_file, args.output_file, "vds", args.force + ) + + raw_files, meta_file = data_files(data_dir, root) + with h5py.File(meta_file, "r") as f: + ts_info = time_slice_info(f) + layouts = virtual_data_set(raw_files, f, *ts_info) + + with h5py.File(output_file, "w" if args.force else "x") as f: + for layout in layouts.items(): + f.create_virtual_dataset(*layout) + + print(f"Virtual data set file written to\n\t{output_file}") diff --git a/src/tristan/data.py b/src/tristan/data.py new file mode 100644 index 00000000..0fb8ad26 --- /dev/null +++ b/src/tristan/data.py @@ -0,0 +1,103 @@ +"""Tools for extracting data on cues and events from Tristan data files.""" + +import glob +import re +import sys +from itertools import filterfalse +from pathlib import Path +from typing import List, Optional + +import h5py +import numpy as np + +# Regex for the names of data sets, in the time slice metadata file, representing the +# distribution of time slices across raw data files for each module. +ts_key_regex = re.compile(r"ts_qty_module\d{2}") + + +def find_input_file_name(in_file): + """Resolve the input file name.""" + in_file = Path(in_file).expanduser().resolve() + + data_dir = in_file.parent + + # Get the segment 'name_root' from 'name_root_meta.h5' or 'name_root_000001.h5'. 
+ file_name_root = re.fullmatch(r"(.*)_(?:meta|\d+)", in_file.stem) + if file_name_root: + file_name_root = file_name_root[1] + else: + sys.exit( + "Input file name did not have the expected format '_meta.h5':\n" + f"\t{in_file}" + ) + + return data_dir, file_name_root + + +def find_file_names( + in_file: str, out_file: Optional[str], default_out_label: str, force: bool +) -> (Path, str, Path): + """Resolve the input and output file names.""" + in_file = Path(in_file).expanduser().resolve() + + data_dir = in_file.parent + + # Get the segment 'name_root' from 'name_root_meta.h5' or 'name_root_000001.h5'. + file_name_root = re.fullmatch(r"(.*)_(?:meta|\d+)", in_file.stem) + if file_name_root: + file_name_root = file_name_root[1] + else: + sys.exit( + "Input file name did not have the expected format '_meta.h5':\n" + f"\t{in_file}" + ) + + if out_file: + out_file = Path(out_file).expanduser().resolve() + else: + out_file = Path(f"{file_name_root}_{default_out_label}.h5") + + if not force and out_file.exists(): + sys.exit( + f"This output file already exists:\n\t{out_file}\n" + f"Use '-f' to override or specify a different output file path with '-o'." + ) + + return data_dir, file_name_root, out_file + + +def data_files(data_dir: Path, root: str, n_dig: int = 6) -> (List[Path], Path): + """ + Extract information about the files containing raw cues and events data. + + Args: + data_dir: Directory containing the raw data and time slice metadata HDF5 files. + root: Input file name, stripped of '_meta.h5', '_000001.h5', etc.. + n_dig: Number of digits in the raw file number, e.g. six in '_000001.h5'. + + Returns: + - Lexicographically sorted list of raw file paths. + - File path of the time slice metadata file. 
+ """ + meta_file = data_dir / f"{root}_meta.h5" + if not meta_file.exists(): + sys.exit(f"Could not find the expected detector metadata file:\n\t{meta_file}") + + with h5py.File(meta_file, "r") as f: + n_files = np.sum(f.get("fp_per_module", default=())) + + if n_files: + raw_files = [data_dir / f"{root}_{n + 1:0{n_dig}d}.h5" for n in range(n_files)] + missing = list(filterfalse(Path.exists, raw_files)) + if missing: + missing = "\n\t".join(map(str, missing)) + sys.exit(f"The following expected data files are missing:\n\t{missing}") + else: + print( + "The detector metadata hold no information about the number of " + "expected raw data files. Falling back on finding the data dynamically." + ) + search_path = str(data_dir / f"{root}_{n_dig * '[0-9]'}.h5") + raw_files = [Path(path_str) for path_str in sorted(glob.glob(search_path))] + + return raw_files, meta_file diff --git a/src/tristan/vds.py b/src/tristan/vds.py new file mode 100644 index 00000000..70883caa --- /dev/null +++ b/src/tristan/vds.py @@ -0,0 +1,194 @@ +"""Tools for creating and manipulating an HDF5 VDS for Tristan data.""" + +from contextlib import ExitStack +from itertools import chain, compress, cycle, zip_longest +from pathlib import Path +from typing import Dict, Iterable, List + +import h5py +import numpy as np + +from . import cue_keys, event_keys +from .data import ts_key_regex + +Sources = Dict[str, Iterable[h5py.VirtualSource]] +TimeSliceInfo = List[slice], np.ndarray, List[slice], List[slice] +VirtualSourceInfo = Sources, Sources, List[int], Dict[str, type] +Layouts = Dict[str, h5py.VirtualLayout] + + +def time_slice_info(meta_file: h5py.File) -> TimeSliceInfo: + """ + Assemble information about the event data time slices from the metadata file. + + Args: + meta_file: Metadata ('_meta.h5') file. Assumes metadata version 1. + + Returns: + - List of slice objects used to select each time slice from the virtual source + objects in order to populate the virtual layouts. 
Length and order + correspond to 'events_per_ts'. + - List of the number of events in each time slice, in the order that the time + slices will appear in the VDS. Length is the number of time slices recorded. + """ + fp_per_module = meta_file["fp_per_module"][()] + + ts_keys = sorted(filter(ts_key_regex.match, meta_file.keys())) + ts_data = [meta_file[ts_key] for ts_key in ts_keys] + + time_slices = [] + num_events_per_ts = [] + # Loop through the modules, acting on the time slice metadata for each in turn. + for num_files, ts_counts in zip(fp_per_module, ts_data): + ts_counts = ts_counts[()] + # Reshape the time slice metadata for a single module into a rectangular array + # with shape (number of time slices per file, number of files), so as to be + # able to generate file-specific slices. + num_ts_per_fp = -(-ts_counts.size // num_files) + ts_counts.resize(num_ts_per_fp * num_files) + # Keep a separate record of each module's array of event counts per time slice. + num_events_per_ts.append(ts_counts) + ts_counts = ts_counts.reshape(num_ts_per_fp, num_files) + # Generate the cumulative count of events per time slice for each file. + ts_per_module = np.pad(np.cumsum(ts_counts, axis=0), ((1, 0), (0, 0))) + # Turn these counts into slices to select from a virtual source for each file. + time_slices.append( + map(slice, ts_per_module[:-1].flatten(), ts_per_module[1:].flatten()) + ) + + # Assemble all the source slices into a single list, ordered first + # chronologically, then by module number. Where modules have recorded different + # numbers of time slices, zip_longest will pad with None. + time_slices = list(chain.from_iterable(zip_longest(*time_slices))) + + # Resize each module's array of event counts per time slice so that their sizes + # match. This is achieved by zero-padding to match the None-padding of the list + # of time slices by zip_longest. 
+ max_size = max(data.size for data in num_events_per_ts) + num_events_per_ts = np.column_stack( + [np.pad(data, (0, max_size - data.size)) for data in num_events_per_ts] + ).flatten() + + return time_slices, num_events_per_ts + + +def virtual_sources(files: List[Path], meta_file: h5py.File) -> VirtualSourceInfo: + """ + Create HDF5 virtual sources and collate ancillary information from raw data files. + + Args: + files: Lexicographically sorted list of raw file paths. + meta_file: Tristan detector metadata file object. + + Returns: + - Dictionary of event data set names and iterators of corresponding HDF5 virtual + sources. The iterator of sources for each data set is based on + itertools.cycle and so repeats indefinitely in the order in which successive + event slices should be selected to build the virtual data set. + - Dictionary of cue data set names and lists of corresponding HDF5 virtual + sources. The lists of sources have length and order as per the list of input + files. + - List of the number of cues in each data file after zero-padding has been + stripped. Length and order as per the list of input files. + - Dictionary of data set names and corresponding data types. + """ + event_sources = {key: [] for key in event_keys} + cue_sources = {key: [] for key in cue_keys} + num_cues_per_file = [] + + with ExitStack() as stack: + raw_files = [stack.enter_context(h5py.File(path, "r")) for path in files] + + dtypes = {key: raw_files[0][key].dtype for key in event_keys + cue_keys} + + for raw_file in raw_files: + # The cues are padded with zeroes. Find the first so we can slice them off. 
+ num_cues_per_file.append(np.argmax(raw_file["cue_id"][()] == 0)) + for key in event_keys: + event_sources[key].append(h5py.VirtualSource(raw_file[key])) + for key in cue_keys: + cue_sources[key].append(h5py.VirtualSource(raw_file[key])) + + # Make a list of slices with which to divide the lexicographically sorted list of + # file paths into sub-lists, each slice corresponding to a different detector + # module. Ordered by module number. Length is equal to the number of modules in + # the detector. + file_slices = np.pad(np.cumsum(meta_file["fp_per_module"]), (1, 0)) + file_slices = list(map(slice, file_slices[:-1], file_slices[1:])) + + # Construct a carousel to select time slices in the order in which they should + # appear in the virtual layout. + for key, sources in event_sources.items(): + carousel = zip(*(cycle(sources[file_slice]) for file_slice in file_slices)) + event_sources[key] = chain.from_iterable(carousel) + + return event_sources, cue_sources, num_cues_per_file, dtypes + + +def virtual_layouts(num_events: int, num_cues: int, dtypes: Dict[str, type]) -> Layouts: + """Create a dictionary of data set names and corresponding HDF5 virtual layouts.""" + layouts = {} + for key in event_keys: + layouts[key] = h5py.VirtualLayout(shape=(num_events,), dtype=dtypes[key]) + for key in cue_keys: + layouts[key] = h5py.VirtualLayout(shape=(num_cues,), dtype=dtypes[key]) + return layouts + + +def virtual_data_set( + raw_files: List[Path], + meta_file: h5py.File, + time_slices: List[slice], + events_per_ts: Iterable[int], +) -> Layouts: + """ + Define a virtual data set in the form of virtual layouts linked to virtual sources. + + The time slices of events will appear in the VDS ordered chronologically, then by + module number to resolve ties. Cue data will simply be aggregated in + lexicographical order of the raw files from which they are sourced. + + Args: + raw_files: Lexicographically sorted list of raw file paths. 
+ meta_file: Tristan detector metadata file object. + time_slices: List of slice objects used to select each time slice from the + virtual source objects in order to populate the virtual + layouts. Length and order correspond to 'events_per_ts'. + events_per_ts: List of the number of events in each time slice, in the order + that the time slices will appear in the VDS. Length is the + number of time slices recorded. + + Returns: + A dictionary of raw data set names and corresponding HDF5 virtual layouts. + """ + event_sources, cue_sources, cues_per_file, dtypes = virtual_sources( + raw_files, meta_file + ) + + # Generate the slices used to assign the cue data to their virtual layouts. + cue_slices = np.pad(np.cumsum(cues_per_file), (1, 0)) + num_cues = cue_slices[-1] + cue_slices = list(map(slice, cue_slices[:-1], cue_slices[1:])) + + # Generate the slices used to assign the event data to their virtual layouts. + event_slices = np.pad(np.cumsum(events_per_ts), (1, 0)) + num_events = event_slices[-1] + event_slices = list(map(slice, event_slices[:-1], event_slices[1:])) + + layouts = virtual_layouts(num_events, num_cues, dtypes) + + # Map virtual source slices to virtual layout slices. + for key, sources in event_sources.items(): + layout = layouts[key] + # HDF5 VDS doesn't like empty slices, so only assign a source time slice to a + # layout slice if the corresponding number of events is non-zero. Trying to + # assign empty slices to empty slices somehow results in a corrupted VDS. + mapping = compress(zip(event_slices, sources, time_slices), events_per_ts) + for event_slice, source, time_slice in mapping: + layout[event_slice] = source[time_slice] + for key, sources in cue_sources.items(): + layout = layouts[key] + for cue_slice, source, count in zip(cue_slices, sources, cues_per_file): + layout[cue_slice] = source[:count] + + return layouts