Skip to content

Commit

Permalink
Merge pull request #1864 from cta-observatory/merge_dl2
Browse files Browse the repository at this point in the history
Implement merging dl2 subarray data, fixes #1861
  • Loading branch information
maxnoe authored Mar 31, 2022
2 parents 38dcb2a + 4d96197 commit 896c03b
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 113 deletions.
26 changes: 26 additions & 0 deletions ctapipe/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,32 @@ def dl2_shower_geometry_file(dl2_tmp_path, prod5_gamma_simtel_path):
return output


@pytest.fixture(scope="session")
def dl2_proton_geometry_file(dl2_tmp_path, prod5_proton_simtel_path):
    """
    File containing both parameters and shower geometry from a proton simulation set.
    """
    # local imports keep fixture collection cheap when the fixture is unused
    from ctapipe.core import run_tool
    from ctapipe.tools.process import ProcessorTool

    output = dl2_tmp_path / "proton.training.h5"

    # prevent running process multiple times in case of parallel tests
    with FileLock(output.with_suffix(output.suffix + ".lock")):
        if output.is_file():
            return output

        argv = [
            f"--input={prod5_proton_simtel_path}",
            f"--output={output}",
            "--write-images",
            "--write-stereo-shower",
            "--max-events=20",
        ]
        # run the full processing tool once; the tool must exit cleanly (0)
        assert run_tool(ProcessorTool(), argv=argv, cwd=dl2_tmp_path) == 0
        return output


@pytest.fixture(scope="session")
def dl2_shower_geometry_file_type(dl2_tmp_path, prod5_gamma_simtel_path):
"""
Expand Down
212 changes: 113 additions & 99 deletions ctapipe/tools/dl1_merge.py → ctapipe/tools/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,18 @@
from ..core.traits import Bool, Set, Unicode, flag, CInt
from ..instrument import SubarrayDescription

import warnings

warnings.filterwarnings("ignore", category=tables.NaturalNameWarning)

PROV = Provenance()

VERSION_KEY = "CTA PRODUCT DATA MODEL VERSION"
IMAGE_STATISTICS_PATH = "/dl1/service/image_statistics"

all_nodes = {
"/dl1/monitoring/subarray/pointing",
"/dl1/monitoring/telescope/pointing",
"/dl1/service/image_statistics",
"/dl1/service/image_statistics.__table_column_meta__",
required_nodes = {
"/dl1/event/subarray/trigger",
"/dl1/event/telescope/trigger",
"/dl1/event/telescope/parameters",
"/dl1/event/telescope/images",
"/configuration/simulation/run",
"/simulation/event/subarray/shower",
"/simulation/event/telescope/parameters",
"/simulation/event/telescope/images",
"/simulation/service/shower_distribution",
"/dl1/monitoring/subarray/pointing",
}

optional_nodes = {
"/simulation/service/shower_distribution",
"/simulation/event/telescope/images",
Expand All @@ -49,8 +37,10 @@
"/dl1/event/telescope/images",
"/dl1/service/image_statistics",
"/dl1/service/image_statistics.__table_column_meta__",
"/dl2/event/subarray/geometry",
}
simu_nodes = {

simulation_nodes = {
"/simulation/event/subarray/shower",
"/simulation/event/telescope/parameters",
"/simulation/event/telescope/images",
Expand All @@ -73,7 +63,23 @@
"/simulation/event/telescope/parameters",
"/dl1/event/telescope/parameters",
}
simu_images = {"/simulation/event/telescope/images"}

simulation_images = {"/simulation/event/telescope/images"}

dl2_subarray_nodes = {"/dl2/event/subarray/geometry"}


all_nodes = (
required_nodes
| optional_nodes
| simulation_nodes
| service_nodes
| nodes_with_tels
| image_nodes
| parameter_nodes
| simulation_images
| dl2_subarray_nodes
)


class MergeTool(Tool):
Expand Down Expand Up @@ -226,29 +232,49 @@ def setup(self):
)
sys.exit(1)

# setup required nodes
self.usable_nodes = all_nodes.copy()

if self.skip_simu_images is True:
self.usable_nodes -= simulation_images

if self.skip_images is True:
self.usable_nodes -= image_nodes

if self.skip_parameters is True:
self.usable_nodes -= parameter_nodes

# Use first file as reference to setup nodes to merge
with tables.open_file(self.input_files[0], "r") as h5file:
self.data_model_version = h5file.root._v_attrs[VERSION_KEY]

# Check if first file is simulation
if "/simulation" not in h5file.root:
self.log.info("Merging observed data")
self.usable_nodes -= simulation_nodes
self.is_simulation = False
else:
self.log.info("Merging simulated data")
self.is_simulation = True

# do not try to merge optional nodes not present in first file
for node in filter(lambda n: n not in h5file, optional_nodes):
self.log.info(f"First file does not contain {node}, ignoring")
self.usable_nodes.remove(node)

# create output file with subarray from first file
self.first_subarray = SubarrayDescription.from_hdf(self.input_files[0])
if self.allowed_tels:
self.first_subarray = self.first_subarray.select_subarray(
tel_ids=self.allowed_tels
)
self.allowed_tel_names = {"tel_%03d" % i for i in self.allowed_tels}
self.allowed_tel_names = {
f"tel_{tel_id:03d}" for tel_id in self.allowed_tels
}

self.first_subarray.to_hdf(self.output_path)
self.output_file = tables.open_file(self.output_path, mode="a")

# setup required nodes
self.usable_nodes = all_nodes

if self.skip_simu_images is True:
self.usable_nodes = self.usable_nodes - simu_images

if self.skip_images is True:
self.usable_nodes = self.usable_nodes - image_nodes

if self.skip_parameters is True:
self.usable_nodes = self.usable_nodes - parameter_nodes

def check_file_broken(self, file):
# Check that the file is not broken or any node is missing
file_path = file.root._v_file.filename
Expand Down Expand Up @@ -318,54 +344,59 @@ def add_image_statistics(self, file):
if node in file:
file.copy_node(node, newparent=target_group)

def merge_tables(self, file):
# Loop over all nodes listed in usable_nodes. Appends table to output_file
# if it already exists, otherwise creates group and copies node.
for node in self.usable_nodes:
if node in service_nodes:
continue
def _merge_tel_group(self, file, input_node):
    """
    Append a group that has one child table per telescope (type) to the output file.

    Parameters
    ----------
    file : tables.File
        Open input file the group belongs to.
    input_node : tables.Group
        Group whose per-telescope child tables should be merged.

    Raises
    ------
    TypeError
        If ``input_node`` is not a ``tables.Group``.
    """
    if not isinstance(input_node, tables.Group):
        raise TypeError(f"node must be a `tables.Group`, got {input_node}")

    node_path = input_node._v_pathname

    # Membership in a tables.File is tested by *path*: ``input_node`` is a
    # node of the input file, so testing the node object itself against
    # ``self.output_file`` does not answer whether the group exists there.
    if node_path not in self.output_file:
        self._create_group(node_path)

    # merge each per-telescope child table, honoring --allowed-tels if set
    for tel_name, table in input_node._v_children.items():
        if not self.allowed_tels or tel_name in self.allowed_tel_names:
            self._merge_table(file, table)

for tel in file.root[node]:
self._copy_or_append_tel_table(file, node, tel.name)
def _merge_table(self, file, input_node):
    # Append a single table to the output file; on first encounter the
    # table (and any missing parent groups) is copied over instead.
    if not isinstance(input_node, tables.Table):
        raise TypeError(f"node must be a `tables.Table`, got {input_node}")

    node_path = input_node._v_pathname
    if node_path in self.output_file:
        output_table = self.output_file.get_node(node_path)
        # cast needed for some image parameters that are sometimes
        # float32 and sometimes float64
        output_table.append(input_node[:].astype(output_table.dtype))
    else:
        group_path, _ = os.path.split(node_path)
        if group_path not in self.output_file:
            self._create_group(group_path)

        # copying the node preserves the table description and metadata
        target_group = self.output_file.root[group_path]
        file.copy_node(node_path, newparent=target_group)

target_group = self.output_file.root[group_path]
file.copy_node(node, newparent=target_group)
def merge_tables(self, file):
    """Append every mergeable node of ``file`` to the output file."""
    for path in self.usable_nodes:
        # service nodes come only from the first file; absent nodes
        # cannot be merged either, so skip both early
        if path in service_nodes or path not in file:
            continue

        current = file.root[path]

        if path in nodes_with_tels:
            # group holding one child table per telescope (type)
            self._merge_tel_group(file, current)
        elif isinstance(current, tables.Group):
            # generic group of tables (e.g. dl2): merge each child
            for child in current._v_children.values():
                self._merge_table(file, child)
        else:
            # a plain single table
            self._merge_table(file, current)

def _create_group(self, node):
head, tail = os.path.split(node)
Expand All @@ -374,50 +405,33 @@ def _create_group(self, node):
def start(self):
merged_files_counter = 0

for i, current_file in enumerate(
tqdm(
self.input_files,
desc="Merging",
unit="Files",
disable=not self.progress_bar,
)
for input_path in tqdm(
self.input_files,
desc="Merging",
unit="Files",
disable=not self.progress_bar,
):

if not HDF5EventSource.is_compatible(current_file):
self.log.critical(
f"input file {current_file} is not a supported DL1 file"
)
if not HDF5EventSource.is_compatible(input_path):
self.log.critical(f"input file {input_path} is not a supported file")
if self.skip_broken_files:
continue
else:
sys.exit(1)

with tables.open_file(current_file, mode="r") as file:
if i == 0:
self.data_model_version = file.root._v_attrs[VERSION_KEY]

# Check if first file is simulation

if "/simulation" not in file.root:
self.usable_nodes = self.usable_nodes - simu_nodes
self.log.info("Merging real data")
self.is_simulation = False
else:
self.log.info("Merging simulation-files")
self.is_simulation = True
with tables.open_file(input_path, mode="r") as h5file:

if self.check_file_broken(file) is True:
if self.check_file_broken(h5file) is True:
if self.skip_broken_files is True:
continue
else:
self.log.critical("Broken file detected.")
sys.exit(1)

self.merge_tables(file)
if IMAGE_STATISTICS_PATH in file:
self.add_image_statistics(file)
self.merge_tables(h5file)
if IMAGE_STATISTICS_PATH in h5file:
self.add_image_statistics(h5file)

PROV.add_input_file(str(current_file))
PROV.add_input_file(str(input_path))
merged_files_counter += 1

self.log.info(
Expand Down
Loading

0 comments on commit 896c03b

Please sign in to comment.