From a7d288559cc4a5258cfe4379d4f2ce0ba7fc45cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20N=C3=B6the?= Date: Wed, 30 Mar 2022 13:29:55 +0200 Subject: [PATCH 1/4] Implement merging dl2 subarray data, fixes #1861 --- ctapipe/conftest.py | 26 +++++ ctapipe/tools/{dl1_merge.py => merge.py} | 138 +++++++++++++---------- ctapipe/tools/tests/test_merge.py | 49 +++++++- setup.py | 2 +- 4 files changed, 150 insertions(+), 65 deletions(-) rename ctapipe/tools/{dl1_merge.py => merge.py} (83%) diff --git a/ctapipe/conftest.py b/ctapipe/conftest.py index b3a7c0cb89d..cec3dcce2a2 100644 --- a/ctapipe/conftest.py +++ b/ctapipe/conftest.py @@ -173,6 +173,32 @@ def dl2_shower_geometry_file(dl2_tmp_path, prod5_gamma_simtel_path): return output +@pytest.fixture(scope="session") +def dl2_proton_geometry_file(dl2_tmp_path, prod5_proton_simtel_path): + """ + File containing both parameters and shower geometry from a gamma simulation set. + """ + from ctapipe.core import run_tool + from ctapipe.tools.process import ProcessorTool + + output = dl2_tmp_path / "proton.training.h5" + + # prevent running process multiple times in case of parallel tests + with FileLock(output.with_suffix(output.suffix + ".lock")): + if output.is_file(): + return output + + argv = [ + f"--input={prod5_proton_simtel_path}", + f"--output={output}", + "--write-images", + "--write-stereo-shower", + "--max-events=20", + ] + assert run_tool(ProcessorTool(), argv=argv, cwd=dl2_tmp_path) == 0 + return output + + @pytest.fixture(scope="session") def dl2_shower_geometry_file_type(dl2_tmp_path, prod5_gamma_simtel_path): """ diff --git a/ctapipe/tools/dl1_merge.py b/ctapipe/tools/merge.py similarity index 83% rename from ctapipe/tools/dl1_merge.py rename to ctapipe/tools/merge.py index 4334e870935..aa19ce20927 100644 --- a/ctapipe/tools/dl1_merge.py +++ b/ctapipe/tools/merge.py @@ -17,30 +17,18 @@ from ..core.traits import Bool, Set, Unicode, flag, CInt from ..instrument import SubarrayDescription -import warnings - -warnings.filterwarnings("ignore", category=tables.NaturalNameWarning) PROV = Provenance() VERSION_KEY = "CTA PRODUCT DATA MODEL VERSION" IMAGE_STATISTICS_PATH = "/dl1/service/image_statistics" -all_nodes = { - "/dl1/monitoring/subarray/pointing", - "/dl1/monitoring/telescope/pointing", - "/dl1/service/image_statistics", - "/dl1/service/image_statistics.__table_column_meta__", +required_nodes = { "/dl1/event/subarray/trigger", "/dl1/event/telescope/trigger", - "/dl1/event/telescope/parameters", - "/dl1/event/telescope/images", - "/configuration/simulation/run", - "/simulation/event/subarray/shower", - "/simulation/event/telescope/parameters", - "/simulation/event/telescope/images", - "/simulation/service/shower_distribution", + "/dl1/monitoring/subarray/pointing", } + optional_nodes = { "/simulation/service/shower_distribution", "/simulation/event/telescope/images", @@ -49,7 +37,9 @@ "/dl1/event/telescope/images", "/dl1/service/image_statistics", "/dl1/service/image_statistics.__table_column_meta__", + "/dl2/event/subarray/geometry", } + simu_nodes = { "/simulation/event/subarray/shower", "/simulation/event/telescope/parameters", @@ -73,8 +63,25 @@ "/simulation/event/telescope/parameters", "/dl1/event/telescope/parameters", } + simu_images = {"/simulation/event/telescope/images"} +dl2_subarray_nodes = {"/dl2/event/subarray/geometry"} + + +all_nodes = ( + required_nodes + | optional_nodes + | simu_nodes + | service_nodes + | nodes_with_tels + | image_nodes + | parameter_nodes + | simu_images + | simu_images + | dl2_subarray_nodes +) + class MergeTool(Tool): name = "ctapipe-merge" @@ -238,16 +245,16 @@ def setup(self): self.output_file = tables.open_file(self.output_path, mode="a") # setup required nodes - self.usable_nodes = all_nodes + self.usable_nodes = all_nodes.copy() if self.skip_simu_images is True: - self.usable_nodes = self.usable_nodes - simu_images + self.usable_nodes -= simu_images if self.skip_images is True: - self.usable_nodes = self.usable_nodes - image_nodes + self.usable_nodes -= image_nodes if self.skip_parameters is True: - self.usable_nodes = self.usable_nodes - parameter_nodes + self.usable_nodes -= parameter_nodes def check_file_broken(self, file): # Check that the file is not broken or any node is missing @@ -318,54 +325,59 @@ def add_image_statistics(self, file): if node in file: file.copy_node(node, newparent=target_group) - def merge_tables(self, file): - # Loop over all nodes listed in usable_nodes. Appends table to output_file - # if it already exists, otherwise creates group and copies node. - for node in self.usable_nodes: - if node in service_nodes: - continue + def _merge_tel_group(self, file, input_node): + """Add a group that has one child table per telescope (type) to outputfile""" + if not isinstance(input_node, tables.Group): + raise TypeError(f"node must be a `tables.Table`, got {input_node}") - if node in file: - if node in nodes_with_tels: - if node not in self.output_file: - self._create_group(node) + node_path = input_node._v_pathname - if self.allowed_tels: - for tel_name in self.allowed_tel_names: - if tel_name in file.root[node]: - self._copy_or_append_tel_table(file, node, tel_name) + if input_node not in self.output_file: + self._create_group(node_path) - continue + for tel_name, table in input_node._v_children.items(): + if not self.allowed_tels or tel_name in self.allowed_tels: + self._merge_table(file, table) - for tel in file.root[node]: - self._copy_or_append_tel_table(file, node, tel.name) + def _merge_table(self, file, input_node): + if not isinstance(input_node, tables.Table): + raise TypeError(f"node must be a `tables.Table`, got {input_node}") - continue + node_path = input_node._v_pathname + if node_path in self.output_file: + output_table = self.output_file.get_node(node_path) + output_table.append(input_node[:].astype(output_table.dtype)) + else: + group_path, _ = os.path.split(node_path) + if group_path not in self.output_file: + self._create_group(group_path) - if node in self.output_file: - data = file.root[node][:] - output_node = self.output_file.get_node(node) - output_node.append(data) - else: - group_path, _ = os.path.split(node) - if group_path not in self.output_file: - self._create_group(group_path) + target_group = self.output_file.root[group_path] + file.copy_node(node_path, newparent=target_group) - target_group = self.output_file.root[group_path] - file.copy_node(node, newparent=target_group) + def merge_tables(self, file): + """Go over all mergeable nodes and append to outputfile""" + for node_path in self.usable_nodes: - def _copy_or_append_tel_table(self, file, node, tel_name): - tel_node_path = node + "/" + tel_name - if tel_node_path in self.output_file: - output_node = self.output_file.get_node(tel_node_path) - input_node = file.root[tel_node_path] + # skip service nodes that should only be included from the first file + if node_path in service_nodes: + continue - # cast needed for some image parameters that are sometimes - # float32 and sometimes float64 - output_node.append(input_node[:].astype(output_node.dtype)) - else: - target_group = self.output_file.root[node] - file.copy_node(tel_node_path, newparent=target_group) + if node_path in file: + node = file.root[node_path] + + # nodes with child tables per telescope (type) + if node_path in nodes_with_tels: + self._merge_tel_group(file, node) + + # groups of tables (e.g. dl2) + elif isinstance(node, tables.Group): + for table in node._v_children.values(): + self._merge_table(file, table) + + # single table + else: + self._merge_table(file, node) def _create_group(self, node): head, tail = os.path.split(node) @@ -397,15 +409,21 @@ def start(self): self.data_model_version = file.root._v_attrs[VERSION_KEY] # Check if first file is simulation - if "/simulation" not in file.root: self.usable_nodes = self.usable_nodes - simu_nodes self.log.info("Merging real data") self.is_simulation = False else: - self.log.info("Merging simulation-files") + self.log.info("Merging simulation files") self.is_simulation = True + for node in optional_nodes: + if node not in file.root: + self.log.info( + f"Optional node {node} not in first file, ignoring" + ) + self.usable_nodes.remove(node) + if self.check_file_broken(file) is True: if self.skip_broken_files is True: continue diff --git a/ctapipe/tools/tests/test_merge.py b/ctapipe/tools/tests/test_merge.py index 134b0dee0f2..c5afe3d896c 100644 --- a/ctapipe/tools/tests/test_merge.py +++ b/ctapipe/tools/tests/test_merge.py @@ -4,6 +4,10 @@ from ctapipe.core import run_tool from pathlib import Path +from ctapipe.io.astropy_helpers import read_table +from astropy.table import vstack +from astropy.utils.diff import report_diff_values +from io import StringIO from ctapipe.tools.process import ProcessorTool @@ -38,7 +42,7 @@ def run_stage1(input_path, cwd, output_path=None): def test_simple(tmp_path, dl1_file, dl1_proton_file): - from ctapipe.tools.dl1_merge import MergeTool + from ctapipe.tools.merge import MergeTool output = tmp_path / "merged_simple.dl1.h5" ret = run_tool( @@ -51,7 +55,7 @@ def test_simple(tmp_path, dl1_file, dl1_proton_file): def test_pattern(tmp_path: Path, dl1_file, dl1_proton_file): - from ctapipe.tools.dl1_merge import MergeTool + from ctapipe.tools.merge import MergeTool # touch a random file to test that the pattern does not use it open(dl1_file.parent / "foo.h5", "w").close() @@ -78,7 +82,7 @@ def test_pattern(tmp_path: Path, dl1_file, dl1_proton_file): def test_skip_images(tmp_path, dl1_file, dl1_proton_file): - from ctapipe.tools.dl1_merge import MergeTool + from ctapipe.tools.merge import MergeTool # create a second file so we can test the patterns output = tmp_path / "merged_no_images.dl1.h5" @@ -102,7 +106,7 @@ def test_skip_images(tmp_path, dl1_file, dl1_proton_file): def test_allowed_tels(tmp_path, dl1_file, dl1_proton_file): - from ctapipe.tools.dl1_merge import MergeTool + from ctapipe.tools.merge import MergeTool from ctapipe.instrument import SubarrayDescription # create file to test 'allowed-tels' option @@ -123,3 +127,40 @@ def test_allowed_tels(tmp_path, dl1_file, dl1_proton_file): s = SubarrayDescription.from_hdf(output) assert s.tel.keys() == {1, 2} + + tel_keys = {"tel_001", "tel_002"} + with tables.open_file(output) as f: + assert set(f.root.dl1.event.telescope.parameters._v_children).issubset(tel_keys) + assert set(f.root.dl1.event.telescope.images._v_children).issubset(tel_keys) + assert set(f.root.monitoring.telescope.pointing._v_children).issubset(tel_keys) + + +def test_dl2(tmp_path, dl2_shower_geometry_file, dl2_proton_geometry_file): + from ctapipe.tools.merge import MergeTool + + output = tmp_path / "merged.dl2.h5" + ret = run_tool( + MergeTool(), + argv=[ + f"--output={output}", + str(dl2_shower_geometry_file), + str(dl2_proton_geometry_file), + ], + ) + assert ret == 0, f"Running merge for dl2 files failed with exit code {ret}" + + table1 = read_table( + dl2_shower_geometry_file, "/dl2/event/subarray/geometry/HillasReconstructor" + ) + table2 = read_table( + dl2_proton_geometry_file, "/dl2/event/subarray/geometry/HillasReconstructor" + ) + table_merged = read_table( + output, "/dl2/event/subarray/geometry/HillasReconstructor" + ) + + diff = StringIO() + identical = report_diff_values(vstack([table1, table2]), table_merged, fileobj=diff) + assert ( + identical + ), f"Merged table not equal to individual tables. Diff:\n {diff.getvalue()}" diff --git a/setup.py b/setup.py index 7bc179fe450..298ed9d56d6 100755 --- a/setup.py +++ b/setup.py @@ -16,7 +16,7 @@ "ctapipe-reconstruct-muons = ctapipe.tools.muon_reconstruction:main", "ctapipe-display-dl1 = ctapipe.tools.display_dl1:main", "ctapipe-process = ctapipe.tools.process:main", - "ctapipe-merge = ctapipe.tools.dl1_merge:main", + "ctapipe-merge = ctapipe.tools.merge:main", "ctapipe-fileinfo = ctapipe.tools.fileinfo:main", "ctapipe-quickstart = ctapipe.tools.quickstart:main", ] From 4c836f5c51184cebb3b2b83be0ffe63a21a3b9a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20N=C3=B6the?= Date: Wed, 30 Mar 2022 13:42:19 +0200 Subject: [PATCH 2/4] Move setup code into setup --- ctapipe/tools/merge.py | 89 ++++++++++++++++++++---------------------- 1 file changed, 42 insertions(+), 47 deletions(-) diff --git a/ctapipe/tools/merge.py b/ctapipe/tools/merge.py index aa19ce20927..3a3a59fc305 100644 --- a/ctapipe/tools/merge.py +++ b/ctapipe/tools/merge.py @@ -233,17 +233,6 @@ def setup(self): ) sys.exit(1) - # create output file with subarray from first file - self.first_subarray = SubarrayDescription.from_hdf(self.input_files[0]) - if self.allowed_tels: - self.first_subarray = self.first_subarray.select_subarray( - tel_ids=self.allowed_tels - ) - self.allowed_tel_names = {"tel_%03d" % i for i in self.allowed_tels} - - self.first_subarray.to_hdf(self.output_path) - self.output_file = tables.open_file(self.output_path, mode="a") - # setup required nodes self.usable_nodes = all_nodes.copy() @@ -256,6 +245,35 @@ def setup(self): if self.skip_parameters is True: self.usable_nodes -= parameter_nodes + # Use first file as reference to setup nodes to merge + with tables.open_file(self.input_files[0], "r") as h5file: + self.data_model_version = h5file.root._v_attrs[VERSION_KEY] + + # Check if first file is simulation + if "/simulation" not in h5file.root: + self.usable_nodes = self.usable_nodes - simu_nodes + self.log.info("Merging observed data") + self.is_simulation = False + else: + self.log.info("Merging simulated data") + self.is_simulation = True + + # do not try to merge optional nodes not present in first file + for node in filter(lambda n: n not in h5file, optional_nodes): + self.log.info(f"First file does not contain {node}, ignoring") + self.usable_nodes.remove(node) + + # create output file with subarray from first file + self.first_subarray = SubarrayDescription.from_hdf(self.input_files[0]) + if self.allowed_tels: + self.first_subarray = self.first_subarray.select_subarray( + tel_ids=self.allowed_tels + ) + self.allowed_tel_names = {"tel_%03d" % i for i in self.allowed_tels} + + self.first_subarray.to_hdf(self.output_path) + self.output_file = tables.open_file(self.output_path, mode="a") + def check_file_broken(self, file): # Check that the file is not broken or any node is missing file_path = file.root._v_file.filename @@ -386,56 +404,33 @@ def _create_group(self, node): def start(self): merged_files_counter = 0 - for i, current_file in enumerate( - tqdm( - self.input_files, - desc="Merging", - unit="Files", - disable=not self.progress_bar, - ) + for input_path in tqdm( + self.input_files, + desc="Merging", + unit="Files", + disable=not self.progress_bar, ): - - if not HDF5EventSource.is_compatible(current_file): - self.log.critical( - f"input file {current_file} is not a supported DL1 file" - ) + if not HDF5EventSource.is_compatible(input_path): + self.log.critical(f"input file {input_path} is not a supported file") if self.skip_broken_files: continue else: sys.exit(1) - with tables.open_file(current_file, mode="r") as file: - if i == 0: - self.data_model_version = file.root._v_attrs[VERSION_KEY] - - # Check if first file is simulation - if "/simulation" not in file.root: - self.usable_nodes = self.usable_nodes - simu_nodes - self.log.info("Merging real data") - self.is_simulation = False - else: - self.log.info("Merging simulation files") - self.is_simulation = True - - for node in optional_nodes: - if node not in file.root: - self.log.info( - f"Optional node {node} not in first file, ignoring" - ) - self.usable_nodes.remove(node) + with tables.open_file(input_path, mode="r") as h5file: - if self.check_file_broken(file) is True: + if self.check_file_broken(h5file) is True: if self.skip_broken_files is True: continue else: self.log.critical("Broken file detected.") sys.exit(1) - self.merge_tables(file) - if IMAGE_STATISTICS_PATH in file: - self.add_image_statistics(file) + self.merge_tables(h5file) + if IMAGE_STATISTICS_PATH in h5file: + self.add_image_statistics(h5file) - PROV.add_input_file(str(current_file)) + PROV.add_input_file(str(input_path)) merged_files_counter += 1 self.log.info( From 148d28704d16056f7b1f040b6d8d68f7c18b43f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20N=C3=B6the?= Date: Wed, 30 Mar 2022 15:21:07 +0200 Subject: [PATCH 3/4] Fix allowed_telescopes check --- ctapipe/tools/merge.py | 6 ++++-- ctapipe/tools/tests/test_merge.py | 28 +++++++++++++--------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/ctapipe/tools/merge.py b/ctapipe/tools/merge.py index 3a3a59fc305..2cf4ab090fc 100644 --- a/ctapipe/tools/merge.py +++ b/ctapipe/tools/merge.py @@ -269,7 +269,9 @@ def setup(self): self.first_subarray = self.first_subarray.select_subarray( tel_ids=self.allowed_tels ) - self.allowed_tel_names = {"tel_%03d" % i for i in self.allowed_tels} + self.allowed_tel_names = { + f"tel_{tel_id:03d}" for tel_id in self.allowed_tels + } self.first_subarray.to_hdf(self.output_path) self.output_file = tables.open_file(self.output_path, mode="a") @@ -354,7 +356,7 @@ def _merge_tel_group(self, file, input_node): self._create_group(node_path) for tel_name, table in input_node._v_children.items(): - if not self.allowed_tels or tel_name in self.allowed_tels: + if not self.allowed_tels or tel_name in self.allowed_tel_names: self._merge_table(file, table) def _merge_table(self, file, input_node): diff --git a/ctapipe/tools/tests/test_merge.py b/ctapipe/tools/tests/test_merge.py index c5afe3d896c..c1d2408006c 100644 --- a/ctapipe/tools/tests/test_merge.py +++ b/ctapipe/tools/tests/test_merge.py @@ -111,28 +111,26 @@ def test_allowed_tels(tmp_path, dl1_file, dl1_proton_file): # create file to test 'allowed-tels' option output = tmp_path / "merged_allowed_tels.dl1.h5" - ret = run_tool( - MergeTool(), - argv=[ - str(dl1_file), - str(dl1_proton_file), - f"--output={output}", - "--allowed-tels=1", - "--allowed-tels=2", - "--overwrite", - ], - cwd=tmp_path, - ) + + allowed_tels = {25, 125} + + argv = [str(dl1_file), str(dl1_proton_file), f"--output={output}", "--overwrite"] + for tel_id in allowed_tels: + argv.append(f"--allowed-tels={tel_id}") + + ret = run_tool(MergeTool(), argv=argv, cwd=tmp_path) assert ret == 0 s = SubarrayDescription.from_hdf(output) - assert s.tel.keys() == {1, 2} + assert s.tel.keys() == allowed_tels - tel_keys = {"tel_001", "tel_002"} + tel_keys = {f"tel_{tel_id:03d}" for tel_id in allowed_tels} with tables.open_file(output) as f: assert set(f.root.dl1.event.telescope.parameters._v_children).issubset(tel_keys) assert set(f.root.dl1.event.telescope.images._v_children).issubset(tel_keys) - assert set(f.root.monitoring.telescope.pointing._v_children).issubset(tel_keys) + assert set(f.root.dl1.monitoring.telescope.pointing._v_children).issubset( + tel_keys + ) def test_dl2(tmp_path, dl2_shower_geometry_file, dl2_proton_geometry_file): From 4d961979d82573fa477010627424f4d53333445e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20N=C3=B6the?= Date: Wed, 30 Mar 2022 20:00:31 +0200 Subject: [PATCH 4/4] Fix duplicated key and wrong error message --- ctapipe/tools/merge.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/ctapipe/tools/merge.py b/ctapipe/tools/merge.py index 2cf4ab090fc..e1e64cb8c69 100644 --- a/ctapipe/tools/merge.py +++ b/ctapipe/tools/merge.py @@ -40,7 +40,7 @@ "/dl2/event/subarray/geometry", } -simu_nodes = { +simulation_nodes = { "/simulation/event/subarray/shower", "/simulation/event/telescope/parameters", "/simulation/event/telescope/images", @@ -64,7 +64,7 @@ "/dl1/event/telescope/parameters", } -simu_images = {"/simulation/event/telescope/images"} +simulation_images = {"/simulation/event/telescope/images"} dl2_subarray_nodes = {"/dl2/event/subarray/geometry"} @@ -72,13 +72,12 @@ all_nodes = ( required_nodes | optional_nodes - | simu_nodes + | simulation_nodes | service_nodes | nodes_with_tels | image_nodes | parameter_nodes - | simu_images - | simu_images + | simulation_images | dl2_subarray_nodes ) @@ -237,7 +236,7 @@ def setup(self): self.usable_nodes = all_nodes.copy() if self.skip_simu_images is True: - self.usable_nodes -= simu_images + self.usable_nodes -= simulation_images if self.skip_images is True: self.usable_nodes -= image_nodes @@ -251,8 +250,8 @@ def setup(self): # Check if first file is simulation if "/simulation" not in h5file.root: - self.usable_nodes = self.usable_nodes - simu_nodes self.log.info("Merging observed data") + self.usable_nodes -= simulation_nodes self.is_simulation = False else: self.log.info("Merging simulated data") @@ -348,7 +347,7 @@ def add_image_statistics(self, file): def _merge_tel_group(self, file, input_node): """Add a group that has one child table per telescope (type) to outputfile""" if not isinstance(input_node, tables.Group): - raise TypeError(f"node must be a `tables.Table`, got {input_node}") + raise TypeError(f"node must be a `tables.Group`, got {input_node}") node_path = input_node._v_pathname