Skip to content

Commit

Permalink
Implement merging dl2 subarray data, fixes #1861
Browse files Browse the repository at this point in the history
  • Loading branch information
maxnoe committed Mar 30, 2022
1 parent 2923760 commit 4a59f52
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 66 deletions.
26 changes: 26 additions & 0 deletions ctapipe/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,32 @@ def dl2_shower_geometry_file(dl2_tmp_path, prod5_gamma_simtel_path):
return output


@pytest.fixture(scope="session")
def dl2_proton_geometry_file(dl2_tmp_path, prod5_proton_simtel_path):
"""
File containing both parameters and shower geometry from a gamma simulation set.
"""
from ctapipe.core import run_tool
from ctapipe.tools.process import ProcessorTool

output = dl2_tmp_path / "proton.training.h5"

# prevent running process multiple times in case of parallel tests
with FileLock(output.with_suffix(output.suffix + ".lock")):
if output.is_file():
return output

argv = [
f"--input={prod5_proton_simtel_path}",
f"--output={output}",
"--write-images",
"--write-stereo-shower",
"--max-events=20",
]
assert run_tool(ProcessorTool(), argv=argv, cwd=dl2_tmp_path) == 0
return output


@pytest.fixture(scope="session")
def dl2_shower_geometry_file_type(dl2_tmp_path, prod5_gamma_simtel_path):
"""
Expand Down
141 changes: 80 additions & 61 deletions ctapipe/tools/dl1_merge.py → ctapipe/tools/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,18 @@
from ..core.traits import Bool, Set, Unicode, flag, CInt
from ..instrument import SubarrayDescription

import warnings

warnings.filterwarnings("ignore", category=tables.NaturalNameWarning)

PROV = Provenance()

VERSION_KEY = "CTA PRODUCT DATA MODEL VERSION"
IMAGE_STATISTICS_PATH = "/dl1/service/image_statistics"

all_nodes = {
"/dl1/monitoring/subarray/pointing",
"/dl1/monitoring/telescope/pointing",
"/dl1/service/image_statistics",
"/dl1/service/image_statistics.__table_column_meta__",
required_nodes = {
"/dl1/event/subarray/trigger",
"/dl1/event/telescope/trigger",
"/dl1/event/telescope/parameters",
"/dl1/event/telescope/images",
"/configuration/simulation/run",
"/simulation/event/subarray/shower",
"/simulation/event/telescope/parameters",
"/simulation/event/telescope/images",
"/simulation/service/shower_distribution",
"/dl1/monitoring/subarray/pointing",
}

optional_nodes = {
"/simulation/service/shower_distribution",
"/simulation/event/telescope/images",
Expand All @@ -49,7 +37,9 @@
"/dl1/event/telescope/images",
"/dl1/service/image_statistics",
"/dl1/service/image_statistics.__table_column_meta__",
"/dl2/event/subarray/geometry",
}

simu_nodes = {
"/simulation/event/subarray/shower",
"/simulation/event/telescope/parameters",
Expand All @@ -73,8 +63,25 @@
"/simulation/event/telescope/parameters",
"/dl1/event/telescope/parameters",
}

simu_images = {"/simulation/event/telescope/images"}

dl2_subarray_nodes = {"/dl2/event/subarray/geometry"}


all_nodes = (
required_nodes
| optional_nodes
| simu_nodes
| service_nodes
| nodes_with_tels
| image_nodes
| parameter_nodes
| simu_images
| simu_images
| dl2_subarray_nodes
)


class MergeTool(Tool):
name = "ctapipe-merge"
Expand Down Expand Up @@ -238,16 +245,16 @@ def setup(self):
self.output_file = tables.open_file(self.output_path, mode="a")

# setup required nodes
self.usable_nodes = all_nodes
self.usable_nodes = all_nodes.copy()

if self.skip_simu_images is True:
self.usable_nodes = self.usable_nodes - simu_images
self.usable_nodes -= simu_images

if self.skip_images is True:
self.usable_nodes = self.usable_nodes - image_nodes
self.usable_nodes -= image_nodes

if self.skip_parameters is True:
self.usable_nodes = self.usable_nodes - parameter_nodes
self.usable_nodes -= parameter_nodes

def check_file_broken(self, file):
# Check that the file is not broken or any node is missing
Expand Down Expand Up @@ -315,56 +322,62 @@ def add_image_statistics(self, file):
)

for node in service_nodes:
file.copy_node(node, newparent=target_group)
if node in file:
file.copy_node(node, newparent=target_group)

def merge_tables(self, file):
# Loop over all nodes listed in usable_nodes. Appends table to output_file
# if it already exists, otherwise creates group and copies node.
for node in self.usable_nodes:
if node in service_nodes:
continue
def _merge_tel_group(self, file, input_node):
"""Add a group that has one child table per telescope (type) to outputfile"""
if not isinstance(input_node, tables.Group):
raise TypeError(f"node must be a `tables.Table`, got {input_node}")

if node in file:
if node in nodes_with_tels:
if node not in self.output_file:
self._create_group(node)
node_path = input_node._v_pathname

if self.allowed_tels:
for tel_name in self.allowed_tel_names:
if tel_name in file.root[node]:
self._copy_or_append_tel_table(file, node, tel_name)
if input_node not in self.output_file:
self._create_group(node_path)

continue
for tel_name, table in input_node._v_children.items():
if not self.allowed_tels or tel_name in self.allowed_tels:
self._merge_table(file, table)

for tel in file.root[node]:
self._copy_or_append_tel_table(file, node, tel.name)
def _merge_table(self, file, input_node):
if not isinstance(input_node, tables.Table):
raise TypeError(f"node must be a `tables.Table`, got {input_node}")

continue
node_path = input_node._v_pathname
if node_path in self.output_file:
output_table = self.output_file.get_node(node_path)
output_table.append(input_node[:].astype(output_table.dtype))
else:
group_path, _ = os.path.split(node_path)
if group_path not in self.output_file:
self._create_group(group_path)

if node in self.output_file:
data = file.root[node][:]
output_node = self.output_file.get_node(node)
output_node.append(data)
else:
group_path, _ = os.path.split(node)
if group_path not in self.output_file:
self._create_group(group_path)
target_group = self.output_file.root[group_path]
file.copy_node(node_path, newparent=target_group)

target_group = self.output_file.root[group_path]
file.copy_node(node, newparent=target_group)
def merge_tables(self, file):
"""Go over all mergeable nodes and append to outputfile"""
for node_path in self.usable_nodes:

def _copy_or_append_tel_table(self, file, node, tel_name):
tel_node_path = node + "/" + tel_name
if tel_node_path in self.output_file:
output_node = self.output_file.get_node(tel_node_path)
input_node = file.root[tel_node_path]
# skip service nodes that should only be included from the first file
if node_path in service_nodes:
continue

# cast needed for some image parameters that are sometimes
# float32 and sometimes float64
output_node.append(input_node[:].astype(output_node.dtype))
else:
target_group = self.output_file.root[node]
file.copy_node(tel_node_path, newparent=target_group)
if node_path in file:
node = file.root[node_path]

# nodes with child tables per telescope (type)
if node_path in nodes_with_tels:
self._merge_tel_group(file, node)

# groups of tables (e.g. dl2)
elif isinstance(node, tables.Group):
for table in node._v_children.values():
self._merge_table(file, table)

# single table
else:
self._merge_table(file, node)

def _create_group(self, node):
head, tail = os.path.split(node)
Expand Down Expand Up @@ -396,15 +409,21 @@ def start(self):
self.data_model_version = file.root._v_attrs[VERSION_KEY]

# Check if first file is simulation

if "/simulation" not in file.root:
self.usable_nodes = self.usable_nodes - simu_nodes
self.log.info("Merging real data")
self.is_simulation = False
else:
self.log.info("Merging simulation-files")
self.log.info("Merging simulation files")
self.is_simulation = True

for node in optional_nodes:
if node not in file.root:
self.log.info(
f"Optional node {node} not in first file, ignoring"
)
self.usable_nodes.remove(node)

if self.check_file_broken(file) is True:
if self.skip_broken_files is True:
continue
Expand Down
49 changes: 45 additions & 4 deletions ctapipe/tools/tests/test_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

from ctapipe.core import run_tool
from pathlib import Path
from ctapipe.io.astropy_helpers import read_table
from astropy.table import vstack
from astropy.utils.diff import report_diff_values
from io import StringIO

from ctapipe.tools.process import ProcessorTool

Expand Down Expand Up @@ -38,7 +42,7 @@ def run_stage1(input_path, cwd, output_path=None):


def test_simple(tmp_path, dl1_file, dl1_proton_file):
from ctapipe.tools.dl1_merge import MergeTool
from ctapipe.tools.merge import MergeTool

output = tmp_path / "merged_simple.dl1.h5"
ret = run_tool(
Expand All @@ -51,7 +55,7 @@ def test_simple(tmp_path, dl1_file, dl1_proton_file):


def test_pattern(tmp_path: Path, dl1_file, dl1_proton_file):
from ctapipe.tools.dl1_merge import MergeTool
from ctapipe.tools.merge import MergeTool

# touch a random file to test that the pattern does not use it
open(dl1_file.parent / "foo.h5", "w").close()
Expand All @@ -78,7 +82,7 @@ def test_pattern(tmp_path: Path, dl1_file, dl1_proton_file):


def test_skip_images(tmp_path, dl1_file, dl1_proton_file):
from ctapipe.tools.dl1_merge import MergeTool
from ctapipe.tools.merge import MergeTool

# create a second file so we can test the patterns
output = tmp_path / "merged_no_images.dl1.h5"
Expand All @@ -102,7 +106,7 @@ def test_skip_images(tmp_path, dl1_file, dl1_proton_file):


def test_allowed_tels(tmp_path, dl1_file, dl1_proton_file):
from ctapipe.tools.dl1_merge import MergeTool
from ctapipe.tools.merge import MergeTool
from ctapipe.instrument import SubarrayDescription

# create file to test 'allowed-tels' option
Expand All @@ -123,3 +127,40 @@ def test_allowed_tels(tmp_path, dl1_file, dl1_proton_file):

s = SubarrayDescription.from_hdf(output)
assert s.tel.keys() == {1, 2}

tel_keys = {"tel_001", "tel_002"}
with tables.open_file(output) as f:
assert set(f.root.dl1.event.telescope.parameters._v_children).issubset(tel_keys)
assert set(f.root.dl1.event.telescope.images._v_children).issubset(tel_keys)
assert set(f.root.monitoring.telescope.pointing._v_children).issubset(tel_keys)


def test_dl2(tmp_path, dl2_shower_geometry_file, dl2_proton_geometry_file):
from ctapipe.tools.merge import MergeTool

output = tmp_path / "merged.dl2.h5"
ret = run_tool(
MergeTool(),
argv=[
f"--output={output}",
str(dl2_shower_geometry_file),
str(dl2_proton_geometry_file),
],
)
assert ret == 0, f"Running merge for dl2 files failed with exit code {ret}"

table1 = read_table(
dl2_shower_geometry_file, "/dl2/event/subarray/geometry/HillasReconstructor"
)
table2 = read_table(
dl2_proton_geometry_file, "/dl2/event/subarray/geometry/HillasReconstructor"
)
table_merged = read_table(
output, "/dl2/event/subarray/geometry/HillasReconstructor"
)

diff = StringIO()
identical = report_diff_values(vstack([table1, table2]), table_merged, fileobj=diff)
assert (
identical
), f"Merged table not equal to individual tables. Diff:\n {diff.getvalue()}"
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"ctapipe-reconstruct-muons = ctapipe.tools.muon_reconstruction:main",
"ctapipe-display-dl1 = ctapipe.tools.display_dl1:main",
"ctapipe-process = ctapipe.tools.process:main",
"ctapipe-merge = ctapipe.tools.dl1_merge:main",
"ctapipe-merge = ctapipe.tools.merge:main",
"ctapipe-fileinfo = ctapipe.tools.fileinfo:main",
"ctapipe-quickstart = ctapipe.tools.quickstart:main",
]
Expand Down

0 comments on commit 4a59f52

Please sign in to comment.