From 14c9b00ab1733bbf459914c7917b6f66c0e21035 Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Fri, 18 Nov 2022 15:32:45 +0100 Subject: [PATCH 1/6] Improve performance of TableLoader by not using join --- ctapipe/io/tableloader.py | 41 +++++++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/ctapipe/io/tableloader.py b/ctapipe/io/tableloader.py index 0eda1a1229e..1da67be7eb2 100644 --- a/ctapipe/io/tableloader.py +++ b/ctapipe/io/tableloader.py @@ -8,7 +8,7 @@ import numpy as np import tables -from astropy.table import Table, vstack +from astropy.table import Table, hstack, vstack from astropy.utils.decorators import lazyproperty from ctapipe.instrument.optics import FocalLengthKind @@ -113,6 +113,29 @@ def _join_telescope_events(table1, table2): return join_allow_empty(table1, table2, TELESCOPE_EVENT_KEYS, how) +def _merge_table_same_index(table1, table2, index_keys): + """Merge two tables assuming their primary keys are identical""" + if len(table1) != len(table2): + raise ValueError("Tables must have identical length") + + if len(table1) == 0: + return table1 + + if not np.all(table1[index_keys] == table2[index_keys]): + raise ValueError("Tables primary keys ({index_keys}) do not match") + + columns = [col for col in table2.columns if col not in index_keys] + return hstack((table1, table2[columns]), join_type="exact") + + +def _merge_subarray_tables(table1, table2): + return _merge_table_same_index(table1, table2, ["obs_id", "event_id"]) + + +def _merge_telescope_tables(table1, table2): + return _merge_table_same_index(table1, table2, ["obs_id", "event_id", "tel_id"]) + + class TableLoader(Component): """ Load telescope-event or subarray-event data from ctapipe HDF5 files @@ -310,7 +333,7 @@ def read_subarray_events(self, start=None, stop=None, keep_order=True): if self.load_simulated and SHOWER_TABLE in self.h5file: showers = read_table(self.h5file, SHOWER_TABLE, start=start, stop=stop) - table = _join_subarray_events(table, showers) + table = _merge_subarray_tables(table, showers) if self.load_dl2: if DL2_SUBARRAY_GROUP in self.h5file: @@ -325,7 +348,7 @@ def read_subarray_events(self, start=None, stop=None, keep_order=True): start=start, stop=stop, ) - table = _join_subarray_events(table, dl2) + table = _merge_subarray_tables(table, dl2) if self.load_observation_info: table = self._join_observation_info(table, start=start, stop=stop) @@ -371,19 +394,21 @@ def _read_telescope_events_for_id(self, tel_id, start=None, stop=None): if tel_id is None: raise ValueError("Please, specify a telescope ID.") - table = _empty_telescope_events_table() + table = read_table(self.h5file, "/dl1/event/telescope/trigger") + table = table[table["tel_id"] == tel_id] + table = table[slice(start, stop)] if self.load_dl1_parameters: parameters = self._read_telescope_table( PARAMETERS_GROUP, tel_id, start=start, stop=stop ) - table = _join_telescope_events(table, parameters) + table = _merge_telescope_tables(table, parameters) if self.load_dl1_images: images = self._read_telescope_table( IMAGES_GROUP, tel_id, start=start, stop=stop ) - table = _join_telescope_events(table, images) + table = _merge_telescope_tables(table, images) if self.load_dl2: if DL2_TELESCOPE_GROUP in self.h5file: @@ -397,13 +422,13 @@ def _read_telescope_events_for_id(self, tel_id, start=None, stop=None): dl2 = self._read_telescope_table( path, tel_id, start=start, stop=stop ) - table = _join_telescope_events(table, dl2) + table = _merge_telescope_tables(table, dl2) if self.load_true_images: true_images = self._read_telescope_table( TRUE_IMAGES_GROUP, tel_id, start=start, stop=stop ) - table = _join_telescope_events(table, true_images) + table = _merge_telescope_tables(table, true_images) if self.load_true_parameters: true_parameters = self._read_telescope_table( From b97877222a53f02643ea47c0aecb81b197ec3ced Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Fri, 18 Nov 2022 16:15:58 +0100 Subject: [PATCH 2/6] Try fast-merging dl2 --- ctapipe/io/tableloader.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/ctapipe/io/tableloader.py b/ctapipe/io/tableloader.py index 1da67be7eb2..cc45e1e9ad5 100644 --- a/ctapipe/io/tableloader.py +++ b/ctapipe/io/tableloader.py @@ -122,7 +122,7 @@ def _merge_table_same_index(table1, table2, index_keys): return table1 if not np.all(table1[index_keys] == table2[index_keys]): - raise ValueError("Tables primary keys ({index_keys}) do not match") + raise ValueError(f"Tables primary keys ({index_keys}) do not match") columns = [col for col in table2.columns if col not in index_keys] return hstack((table1, table2[columns]), join_type="exact") @@ -348,7 +348,10 @@ def read_subarray_events(self, start=None, stop=None, keep_order=True): start=start, stop=stop, ) - table = _merge_subarray_tables(table, dl2) + try: + table = _merge_subarray_tables(table, dl2) + except ValueError: + table = _join_subarray_events(table, dl2) if self.load_observation_info: table = self._join_observation_info(table, start=start, stop=stop) From cdadae1e39f139ad8bd4915deeffa2a8099ebaa6 Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Fri, 18 Nov 2022 16:27:47 +0100 Subject: [PATCH 3/6] Fall back to join if hstack is not possible --- ctapipe/io/tableloader.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/ctapipe/io/tableloader.py b/ctapipe/io/tableloader.py index cc45e1e9ad5..e3215b3dd54 100644 --- a/ctapipe/io/tableloader.py +++ b/ctapipe/io/tableloader.py @@ -2,6 +2,7 @@ Class and related functions to read DL1 (a,b) and/or DL2 (a) data from an HDF5 file produced with ctapipe-process. """ +import warnings from collections import defaultdict from pathlib import Path from typing import Dict @@ -37,6 +38,10 @@ TELESCOPE_EVENT_KEYS = ["obs_id", "event_id", "tel_id"] +class IndexNotMatching(UserWarning): + """Warning that is raised if the order of two tables is not matching as expected""" + + class ChunkIterator: """An iterator that calls a function on advancemnt @@ -113,7 +118,7 @@ def _join_telescope_events(table1, table2): return join_allow_empty(table1, table2, TELESCOPE_EVENT_KEYS, how) -def _merge_table_same_index(table1, table2, index_keys): +def _merge_table_same_index(table1, table2, index_keys, fallback_join_type="left"): """Merge two tables assuming their primary keys are identical""" if len(table1) != len(table2): raise ValueError("Tables must have identical length") @@ -122,7 +127,10 @@ def _merge_table_same_index(table1, table2, index_keys): return table1 if not np.all(table1[index_keys] == table2[index_keys]): - raise ValueError(f"Tables primary keys ({index_keys}) do not match") + warnings.warn( + "Table order does not match, falling back to join", IndexNotMatching + ) + return join_allow_empty(table1, table2, index_keys, fallback_join_type) columns = [col for col in table2.columns if col not in index_keys] return hstack((table1, table2[columns]), join_type="exact") @@ -348,10 +356,7 @@ def read_subarray_events(self, start=None, stop=None, keep_order=True): start=start, stop=stop, ) - try: - table = _merge_subarray_tables(table, dl2) - except ValueError: - table = _join_subarray_events(table, dl2) + table = _merge_subarray_tables(table, dl2) if self.load_observation_info: table = self._join_observation_info(table, start=start, stop=stop) From 994e070cba17af416b3f837a0b2211b6b6dcd5ff Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Fri, 18 Nov 2022 17:33:10 +0100 Subject: [PATCH 4/6] Test for order (but is meaningless on non-merged file) --- ctapipe/io/tableloader.py | 3 +++ ctapipe/tools/tests/test_apply_models.py | 14 ++++++++++++++ 2 files changed, 17 insertions(+) diff --git a/ctapipe/io/tableloader.py b/ctapipe/io/tableloader.py index e3215b3dd54..9d7e1c14d21 100644 --- a/ctapipe/io/tableloader.py +++ b/ctapipe/io/tableloader.py @@ -430,6 +430,9 @@ def _read_telescope_events_for_id(self, tel_id, start=None, stop=None): dl2 = self._read_telescope_table( path, tel_id, start=start, stop=stop ) + if len(dl2) == 0: + continue + table = _merge_telescope_tables(table, dl2) if self.load_true_images: diff --git a/ctapipe/tools/tests/test_apply_models.py b/ctapipe/tools/tests/test_apply_models.py index 7393a9bfcd3..6ee1ba810b4 100644 --- a/ctapipe/tools/tests/test_apply_models.py +++ b/ctapipe/tools/tests/test_apply_models.py @@ -60,6 +60,12 @@ def test_apply_energy_regressor( assert f"{prefix}_tel_energy" in events.colnames assert f"{prefix}_tel_is_valid" in events.colnames + from ctapipe.io.tests.test_table_loader import check_equal_array_event_order + + trigger = read_table(output_path, "/dl1/event/subarray/trigger") + energy = read_table(output_path, "/dl2/event/subarray/energy/ExtraTreesRegressor") + check_equal_array_event_order(trigger, energy) + def test_apply_particle_classifier( particle_classifier_path, @@ -141,3 +147,11 @@ def test_apply_both( events = loader.read_telescope_events() assert "ExtraTreesClassifier_prediction" in events.colnames assert "ExtraTreesRegressor_energy" in events.colnames + + from ctapipe.io.tests.test_table_loader import check_equal_array_event_order + + trigger = read_table(output_path, "/dl1/event/subarray/trigger") + particle_clf = read_table( + output_path, "/dl2/event/subarray/classification/ExtraTreesClassifier" + ) + check_equal_array_event_order(trigger, particle_clf) From 838d7c823e9801d0b53225ef19f2fc376288a9dd Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Sun, 20 Nov 2022 15:24:33 +0100 Subject: [PATCH 5/6] Use constants --- ctapipe/io/tableloader.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ctapipe/io/tableloader.py b/ctapipe/io/tableloader.py index 9d7e1c14d21..fc54daba6aa 100644 --- a/ctapipe/io/tableloader.py +++ b/ctapipe/io/tableloader.py @@ -137,11 +137,11 @@ def _merge_table_same_index(table1, table2, index_keys, fallback_join_type="left def _merge_subarray_tables(table1, table2): - return _merge_table_same_index(table1, table2, ["obs_id", "event_id"]) + return _merge_table_same_index(table1, table2, SUBARRAY_EVENT_KEYS) def _merge_telescope_tables(table1, table2): - return _merge_table_same_index(table1, table2, ["obs_id", "event_id", "tel_id"]) + return _merge_table_same_index(table1, table2, TELESCOPE_EVENT_KEYS) class TableLoader(Component): From 799925c2a2167b2bdaba9092b2603bc7295708d3 Mon Sep 17 00:00:00 2001 From: Maximilian Linhoff Date: Sun, 20 Nov 2022 19:04:12 +0100 Subject: [PATCH 6/6] Resort stereo predictions to order in input file --- ctapipe/tools/apply_models.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/ctapipe/tools/apply_models.py b/ctapipe/tools/apply_models.py index 49aa949b7fc..73be64a8c62 100644 --- a/ctapipe/tools/apply_models.py +++ b/ctapipe/tools/apply_models.py @@ -11,7 +11,9 @@ from ctapipe.core.tool import Tool from ctapipe.core.traits import Bool, Path, flag from ctapipe.io import TableLoader, write_table +from ctapipe.io.astropy_helpers import read_table from ctapipe.io.tableio import TelListToMaskTransform +from ctapipe.io.tableloader import _join_subarray_events from ctapipe.reco import EnergyRegressor, ParticleClassifier, StereoCombiner __all__ = [ @@ -203,6 +205,15 @@ def _combine(self, combiner, mono_predictions): stereo_predictions[c.name] = np.array([trafo(r) for r in c]) stereo_predictions[c.name].description = c.description + # to ensure events are stored in the correct order, + # we resort to trigger table order + trigger = read_table(self.h5file, "/dl1/event/subarray/trigger")[ + ["obs_id", "event_id"] + ] + trigger["__sort_index__"] = np.arange(len(trigger)) + stereo_predictions = _join_subarray_events(trigger, stereo_predictions) + stereo_predictions.sort("__sort_index__") + write_table( stereo_predictions, self.output_path,