Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stats calc tool #2628

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/changes/2628.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a generic stats-calculation tool utilizing the PixelStatisticsCalculator.
1 change: 1 addition & 0 deletions docs/user-guide/tools.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Data Processing Tools
* ``ctapipe-quickstart``: create some default analysis configurations and a working directory
* ``ctapipe-process``: Process event data in any supported format from R0/R1/DL0 to DL1 or DL2 HDF5 files.
* ``ctapipe-apply-models``: Tool to apply machine learning models in bulk (as opposed to event by event).
* ``ctapipe-calculate-pixel-statistics``: Tool to aggregate statistics and detect outliers from pixel-wise image data.
* ``ctapipe-train-disp-reconstructor`` : Train the ML models for the `ctapipe.reco.DispReconstructor` (monoscopic reconstruction)
* ``ctapipe-train-energy-regressor``: Train the ML models for the `ctapipe.reco.EnergyRegressor` (energy estimation)
* ``ctapipe-train-particle-classifier``: Train the ML models for the `ctapipe.reco.ParticleClassifier` (gamma-hadron separation)
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ ctapipe-process = "ctapipe.tools.process:main"
ctapipe-merge = "ctapipe.tools.merge:main"
ctapipe-fileinfo = "ctapipe.tools.fileinfo:main"
ctapipe-quickstart = "ctapipe.tools.quickstart:main"
ctapipe-calculate-pixel-statistics = "ctapipe.tools.calculate_pixel_stats:main"
ctapipe-train-energy-regressor = "ctapipe.tools.train_energy_regressor:main"
ctapipe-train-particle-classifier = "ctapipe.tools.train_particle_classifier:main"
ctapipe-train-disp-reconstructor = "ctapipe.tools.train_disp_reconstructor:main"
Expand Down
32 changes: 32 additions & 0 deletions src/ctapipe/resources/calculate_pixel_stats.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
PixelStatisticsCalculatorTool:
allowed_tels: [1,2,3,4]
input_column_name: image
output_table_name: statistics

PixelStatisticsCalculator:
stats_aggregator_type:
- ["type", "LST*", "SigmaClippingAggregator"]
- ["type", "MST*", "PlainAggregator"]

chunk_shift: 1000
faulty_pixels_fraction: 0.1
outlier_detector_list:
- name: MedianOutlierDetector
apply_to: median
config:
median_range_factors: [-15, 15]
- name: RangeOutlierDetector
apply_to: median
config:
validity_range: [-20, 120]
- name: StdOutlierDetector
apply_to: std
config:
std_range_factors: [-15, 15]

SigmaClippingAggregator:
chunk_size: 2500
max_sigma: 4
iterations: 5
PlainAggregator:
chunk_size: 2500
194 changes: 194 additions & 0 deletions src/ctapipe/tools/calculate_pixel_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
"""
Perform statistics calculation from pixel-wise image data
"""

import pathlib

import numpy as np
from astropy.table import vstack

from ctapipe.core import Tool
from ctapipe.core.tool import ToolConfigurationError
from ctapipe.core.traits import (
Bool,
CInt,
Path,
Set,
Unicode,
classes_with_traits,
)
from ctapipe.instrument import SubarrayDescription
from ctapipe.io import write_table
from ctapipe.io.tableloader import TableLoader
from ctapipe.monitoring.calculator import PixelStatisticsCalculator


class PixelStatisticsCalculatorTool(Tool):
"""
Perform statistics calculation for pixel-wise image data
"""

name = "ctapipe-calculate-pixel-statistics"
description = "Perform statistics calculation for pixel-wise image data"

examples = """
To calculate statistics of pixel-wise image data files:

> ctapipe-calculate-pixel-statistics --TableLoader.input_url input.dl1.h5 --output_path /path/monitoring.h5 --overwrite

"""

allowed_tels = Set(
trait=CInt(),
default_value=None,
allow_none=True,
help=(
"List of allowed tel_ids, others will be ignored. "
"If None, all telescopes in the input stream will be included."
),
).tag(config=True)

input_column_name = Unicode(
default_value="image",
allow_none=False,
help="Column name of the pixel-wise image data to calculate statistics",
).tag(config=True)

output_table_name = Unicode(
default_value="statistics",
allow_none=False,
help="Table name of the output statistics",
).tag(config=True)

output_path = Path(
help="Output filename", default_value=pathlib.Path("monitoring.h5")
).tag(config=True)

overwrite = Bool(help="Overwrite output file if it exists").tag(config=True)

aliases = {
("i", "input_url"): "TableLoader.input_url",
("o", "output_path"): "PixelStatisticsCalculatorTool.output_path",
}

flags = {
"overwrite": (
{"PixelStatisticsCalculatorTool": {"overwrite": True}},
"Overwrite existing files",
),
}

classes = [
TableLoader,
] + classes_with_traits(PixelStatisticsCalculator)

def setup(self):
# Read the input data with the 'TableLoader'
self.input_data = TableLoader(
parent=self,
)
# Check that the input and output files are not the same
if self.input_data.input_url == self.output_path:
raise ToolConfigurationError(
"Input and output files are same. Fix your configuration / cli arguments."
)
# Load the subarray description from the input file
subarray = SubarrayDescription.from_hdf(self.input_data.input_url)
# Get the telescope ids from the input data or use the allowed_tels configuration
self.tel_ids = (
subarray.tel_ids if self.allowed_tels is None else self.allowed_tels
)
# Initialization of the statistics calculator
self.stats_calculator = PixelStatisticsCalculator(
parent=self, subarray=subarray
)

def start(self):
# Iterate over the telescope ids and calculate the statistics
for tel_id in self.tel_ids:
# Read the whole dl1 images for one particular telescope
dl1_table = self.input_data.read_telescope_events_by_id(
kosack marked this conversation as resolved.
Show resolved Hide resolved
telescopes=[
tel_id,
],
dl1_images=True,
dl1_parameters=False,
dl1_muons=False,
dl2=False,
simulated=False,
true_images=False,
true_parameters=False,
instrument=False,
pointing=False,
)[tel_id]
# Check if the chunk size does not exceed the table length of the input data
if self.stats_calculator.stats_aggregators[
self.stats_calculator.stats_aggregator_type.tel[tel_id]
].chunk_size > len(dl1_table):
raise ToolConfigurationError(
f"Change --StatisticsAggregator.chunk_size to decrease the chunk size "
f"of the aggregation to a maximum of '{len(dl1_table)}' (table length of the "
f"input data for telescope 'tel_id={tel_id}')."
)
# Check if the input column name is in the table
if self.input_column_name not in dl1_table.colnames:
raise ToolConfigurationError(
f"Column '{self.input_column_name}' not found "
f"in the input data for telescope 'tel_id={tel_id}'."
)
# Perform the first pass of the statistics calculation
aggregated_stats = self.stats_calculator.first_pass(
table=dl1_table,
tel_id=tel_id,
col_name=self.input_column_name,
)
# Check if 'chunk_shift' is selected
if self.stats_calculator.chunk_shift is not None:
# Check if there are any faulty chunks to perform a second pass over the data
if np.any(~aggregated_stats["is_valid"].data):
# Perform the second pass of the statistics calculation
aggregated_stats_secondpass = self.stats_calculator.second_pass(
table=dl1_table,
valid_chunks=aggregated_stats["is_valid"].data,
tel_id=tel_id,
col_name=self.input_column_name,
)
# Stack the statistic values from the first and second pass
aggregated_stats = vstack(
[aggregated_stats, aggregated_stats_secondpass]
)
# Sort the stacked aggregated statistic values by starting time
aggregated_stats.sort(["time_start"])
else:
self.log.info(
"No faulty chunks found for telescope 'tel_id=%d'. Skipping second pass.",
tel_id,
)
# Add metadata to the aggregated statistics
aggregated_stats.meta["event_type"] = dl1_table["event_type"][0]
aggregated_stats.meta["input_column_name"] = self.input_column_name
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd not add this as input_column_name but as prefix for the columns. Using prefixes you can also aggregate multiple columns easily.

See e.g. how we write tables in ctapipe-apply-models.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@maxnoe , we discussed this with @TjarkMiener offline and we don't see much use cases for this. Also, having the metadata as a prefix for the column name can make queries awkward and schemas inflexible (i.e. set of aggregators will be fixed for each input source). Also, in this case, something like pandas.MultiIndex will be more appropriate in my opinion, but again, I don't see much use. I'd leave this as is for the moment, and modify if a corresponding use case appears.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The usecase is consistency with what we doe elsewhere and loading such data with e.g.TableLoader.

I.e. my use case would be to load these data and look add aggregated statistics for multiple columns.

This has to match waht e.g. is planned for quality pipe, right? Aggregating high-level information for many of the data columns and other things and loading them at the same time, producing control plots etc.

But fine to do in a follow up PR.

I really don't like the limitation to a single column though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me what you want sounds like a pandas.MultiIndex, which is not supported in the current way of astropy, and I'm not sure whether it will ever be supported like that... Also, aggregating (flattening) and joining can be done elsewhere (e.g. when QualPipe reads it).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant that independent of the columns names / data organization I don't like that this tool can only aggregate a single column, and that using it for multiple requires separate runs of the tool with separate output files, performing the expensive data loading again and again.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tool is not only aggregating stats, but also detecting outliers and treating regions of trouble properly. Each column requires a distinct set of configuration parameters. Therefore, I think it is inevitable to perform separate runs of the tool, which are independent on each other.

# Write the aggregated statistics and their outlier mask to the output file
write_table(
aggregated_stats,
self.output_path,
f"/dl1/monitoring/telescope/{self.output_table_name}/tel_{tel_id:03d}",
overwrite=self.overwrite,
)

def finish(self):
self.log.info(
"DL1 monitoring data was stored in '%s' under '%s'",
self.output_path,
f"/dl1/monitoring/telescope/{self.output_table_name}",
)
self.log.info("Tool is shutting down")


def main():
# Run the tool
tool = PixelStatisticsCalculatorTool()
tool.run()


if __name__ == "main":
main()
1 change: 1 addition & 0 deletions src/ctapipe/tools/quickstart.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

CONFIGS_TO_WRITE = [
"base_config.yaml",
"calculate_pixel_stats.yaml",
"stage1_config.yaml",
"stage2_config.yaml",
"ml_preprocessing_config.yaml",
Expand Down
124 changes: 124 additions & 0 deletions src/ctapipe/tools/tests/test_calculate_pixel_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#!/usr/bin/env python3
"""
Test ctapipe-calculate-pixel-statistics tool
"""

import pytest
from traitlets.config.loader import Config

from ctapipe.core import run_tool
from ctapipe.core.tool import ToolConfigurationError
from ctapipe.io import read_table
from ctapipe.tools.calculate_pixel_stats import PixelStatisticsCalculatorTool


def test_calculate_pixel_stats_tool(tmp_path, dl1_image_file):
"""check statistics calculation from pixel-wise image data files"""

# Create a configuration suitable for the test
tel_id = 3
config = Config(
{
"PixelStatisticsCalculatorTool": {
"allowed_tels": [tel_id],
"input_column_name": "image",
"output_table_name": "statistics",
},
"PixelStatisticsCalculator": {
"stats_aggregator_type": [
("id", tel_id, "PlainAggregator"),
],
},
"PlainAggregator": {
"chunk_size": 1,
},
}
)
# Set the output file path
monitoring_file = tmp_path / "monitoring.dl1.h5"
# Run the tool with the configuration and the input file
run_tool(
PixelStatisticsCalculatorTool(config=config),
argv=[
f"--input_url={dl1_image_file}",
f"--output_path={monitoring_file}",
"--overwrite",
],
cwd=tmp_path,
raises=True,
)
# Check that the output file has been created
assert monitoring_file.exists()
# Check that the output file is not empty
assert (
read_table(
monitoring_file,
path=f"/dl1/monitoring/telescope/statistics/tel_{tel_id:03d}",
)["mean"]
is not None
)


def test_tool_config_error(tmp_path, dl1_image_file):
"""check tool configuration error"""

# Run the tool with the configuration and the input file
config = Config(
{
"PixelStatisticsCalculatorTool": {
"allowed_tels": [3],
"input_column_name": "image_charges",
"output_table_name": "statistics",
}
}
)
# Set the output file path
monitoring_file = tmp_path / "monitoring.dl1.h5"
# Check if ToolConfigurationError is raised
# when the column name of the pixel-wise image data is not correct
with pytest.raises(
ToolConfigurationError, match="Column 'image_charges' not found"
):
run_tool(
PixelStatisticsCalculatorTool(config=config),
argv=[
f"--input_url={dl1_image_file}",
f"--output_path={monitoring_file}",
"--StatisticsAggregator.chunk_size=1",
"--overwrite",
],
cwd=tmp_path,
raises=True,
)
# Check if ToolConfigurationError is raised
# when the input and output files are the same
with pytest.raises(
ToolConfigurationError, match="Input and output files are same."
):
run_tool(
PixelStatisticsCalculatorTool(),
argv=[
f"--input_url={dl1_image_file}",
f"--output_path={dl1_image_file}",
"--overwrite",
],
cwd=tmp_path,
raises=True,
)
# Check if ToolConfigurationError is raised
# when the chunk size is larger than the number of events in the input file
with pytest.raises(
ToolConfigurationError, match="Change --StatisticsAggregator.chunk_size"
):
run_tool(
PixelStatisticsCalculatorTool(),
argv=[
f"--input_url={dl1_image_file}",
f"--output_path={monitoring_file}",
"--PixelStatisticsCalculatorTool.allowed_tels=3",
"--StatisticsAggregator.chunk_size=2500",
"--overwrite",
],
cwd=tmp_path,
raises=True,
)
Loading