From e525a60013b2c0b4f767a14b58afa0e7115e2017 Mon Sep 17 00:00:00 2001 From: watsonjj Date: Fri, 21 Jun 2019 00:36:15 +0200 Subject: [PATCH 1/2] Refactoring of GainSelector --- ctapipe/calib/camera/gainselection.py | 214 ++++++------------ .../calib/camera/tests/test_gainselection.py | 140 +++--------- 2 files changed, 109 insertions(+), 245 deletions(-) diff --git a/ctapipe/calib/camera/gainselection.py b/ctapipe/calib/camera/gainselection.py index 53559f6ffcc..52ad01d0d08 100644 --- a/ctapipe/calib/camera/gainselection.py +++ b/ctapipe/calib/camera/gainselection.py @@ -1,173 +1,107 @@ """ Algorithms to select correct gain channel """ -from abc import ABCMeta, abstractclassmethod - +from abc import abstractmethod +from enum import IntEnum import numpy as np +from ctapipe.core import Component, traits -from ...core import Component, traits -from ...utils import get_table_dataset - -__all__ = ['GainSelector', - 'ThresholdGainSelector', - 'SimpleGainSelector', - 'pick_gain_channel'] - - -def pick_gain_channel(waveforms, threshold, select_by_sample=False): - """ - the PMTs on some cameras have 2 gain channels. select one - according to a threshold. - - Parameters: - ----------- - waveforms: np.ndarray - Array of shape (N_gain, N_pix, N_samp) - threshold: float - threshold (in PE/sample) of when to switch to low-gain channel - select_by_sample: bool - if true, select only low-gain *samples* when high-gain is over - threshold - - Returns - ------- - tuple: - gain-selected intensity, boolean array of which channel was chosen - """ - - # if we have 2 channels: - if waveforms.shape[0] == 2: - waveforms = np.squeeze(waveforms) - new_waveforms = waveforms[0].copy() - - if select_by_sample: - # replace any samples that are above threshold with low-gain ones: - gain_mask = waveforms[0] > threshold - new_waveforms[gain_mask] = waveforms[1][gain_mask] - else: - # use entire low-gain waveform if any sample of high-gain - # waveform is above threshold - gain_mask = (waveforms[0] > threshold).any(axis=1) - new_waveforms[gain_mask] = waveforms[1][gain_mask] +__all__ = [ + 'GainChannel', + 'GainSelector', + 'ManualGainSelector', + 'ThresholdGainSelector', +] - elif waveforms.shape[0] == 1: - new_waveforms = np.squeeze(waveforms) - gain_mask = np.zeros_like(new_waveforms).astype(bool) - else: - raise ValueError("input waveforms has shape %s. not sure what to do " - "with that.", waveforms.shape) +class GainChannel(IntEnum): + HIGH = 0 + LOW = 1 - return new_waveforms, gain_mask - -class GainSelector(Component, metaclass=ABCMeta): +class GainSelector(Component): """ Base class for algorithms that reduce a 2-gain-channel waveform to a single waveform. """ - @abstractclassmethod - def select_gains(self, cam_id, multi_gain_waveform): + + def __call__(self, waveforms): """ - Takes an input waveform and cam_id and performs gain selection + Reduce the waveform to a single gain channel + + Parameters + ---------- + waveforms : ndarray + Waveforms stored in a numpy array of shape + (n_chan, n_pix, n_samples). Returns ------- - tuple(ndarray, ndarray): - (waveform, gain_mask), where the gain_mask is a boolean array of - which gain channel was used. + reduced_waveforms : ndarray + Waveform with a single channel + Shape: (n_pix, n_samples) """ - pass + if waveforms.ndim == 2: # Return if already gain selected + return waveforms + elif waveforms.ndim == 3: + n_channels, n_pixels, n_samples = waveforms.shape + if n_channels == 1: # Reduce if already single channel + return waveforms[0] + else: + pixel_channel = self.select_channel(waveforms) + return waveforms[pixel_channel, np.arange(n_pixels)] + else: + raise ValueError( + f"Cannot handle waveform array of shape: {waveforms.ndim}" + ) + @abstractmethod + def select_channel(self, waveforms): + """ + Abstract method to be defined by a GainSelector subclass. -class NullGainSelector(GainSelector): - """ - do no gain selection, leaving possibly 2 gain channels at the DL1 level. - this may break further steps in the chain if they do not expect 2 gains. - """ + Call the relevant functions to decide on the gain channel used for + each pixel. - def select_gains(self, cam_id, multi_gain_waveform): - return multi_gain_waveform, np.ones(multi_gain_waveform.shape[1]) + Parameters + ---------- + waveforms : ndarray + Waveforms stored in a numpy array of shape + (n_chan, n_pix, n_samples). + + Returns + ------- + pixel_channel : ndarray + Gain channel to use for each pixel + Shape: n_pix + Dtype: int + """ -class SimpleGainSelector(GainSelector): +class ManualGainSelector(GainSelector): """ - Simply choose a single gain channel always. + Manually choose a gain channel. """ + channel = traits.CaselessStrEnum( + ["HIGH", "LOW"], + default_value="HIGH", + help="Which gain channel to retain" + ).tag(config=True) - channel = traits.Int(default_value=0, help="which gain channel to " - "retain").tag(config=True) - - def select_gains(self, cam_id, multi_gain_waveform): - return ( - multi_gain_waveform[self.channel], - (np.ones(multi_gain_waveform.shape[1]) * self.channel).astype( - np.bool) - ) + def select_channel(self, waveforms): + return GainChannel[self.channel] class ThresholdGainSelector(GainSelector): """ - Select gain channel using fixed-threshold for any sample in the waveform. - The thresholds are loaded from an `astropy.table.Table` that must contain - two columns: `cam_id` (the name of the camera) and `gain_threshold_pe`, - the threshold in photo-electrons per sample at which the switch should - occur. - - Parameters - ---------- - threshold_table_name: str - Name of gain channel table to load - select_by_sample: bool - If True, replaces only the waveform samples that are above - the threshold with low-gain versions, otherwise the full - low-gain waveform is used. - - Attributes - ---------- - thresholds: dict - mapping of cam_id to threshold value + Select gain channel according to a maximum threshold value. """ - - threshold_table_name = traits.Unicode( - default_value='gain_channel_thresholds', - help='Name of gain channel table to load' - ).tag(config=True) - - select_by_sample = traits.Bool( - default_value=False, - help='If True, replaces only the waveform samples that are above ' - 'the threshold with low-gain versions, otherwise the full ' - 'low-gain waveform is used.' + threshold = traits.Float( + default_value=1000, + help="Threshold value in waveform sample units. If a waveform " + "contains a sample above this threshold, use the low gain " + "channel for that pixel." ).tag(config=True) - def __init__(self, config=None, parent=None, **kwargs): - super().__init__(config=config, parent=parent, **kwargs) - - tab = get_table_dataset( - self.threshold_table_name, - role='dl0.tel.svc.gain_thresholds' - ) - self.thresholds = dict(zip(tab['cam_id'], tab['gain_threshold_pe'])) - self.log.debug("Loaded threshold table: \n %s", tab) - - def __str__(self): - return f"{self.__class__.__name__}({self.thresholds})" - - def select_gains(self, cam_id, multi_gain_waveform): - - try: - threshold = self.thresholds[cam_id] - except KeyError: - raise KeyError( - "Camera ID '{}' not found in the gain-threshold " - "table '{}'".format(cam_id, self.threshold_table_name) - ) - - waveform, gain_mask = pick_gain_channel( - waveforms=multi_gain_waveform, - threshold=threshold, - select_by_sample=self.select_by_sample - ) - - return waveform, gain_mask + def select_channel(self, waveforms): + return (waveforms[0] > self.threshold).any(axis=1).astype(int) diff --git a/ctapipe/calib/camera/tests/test_gainselection.py b/ctapipe/calib/camera/tests/test_gainselection.py index 8c90d344d6e..f694a63bbaf 100644 --- a/ctapipe/calib/camera/tests/test_gainselection.py +++ b/ctapipe/calib/camera/tests/test_gainselection.py @@ -1,118 +1,48 @@ import numpy as np -import pytest +from ctapipe.calib.camera.gainselection import ManualGainSelector, \ + ThresholdGainSelector, GainChannel, GainSelector -from ctapipe.calib.camera.gainselection import ThresholdGainSelector -from ctapipe.calib.camera.gainselection import SimpleGainSelector -from ctapipe.calib.camera.gainselection import pick_gain_channel +class TestGainSelector(GainSelector): + def select_channel(self, waveforms): + return GainChannel.HIGH -def test_pick_gain_channel(): - threshold = 100 - good_hg_value = 35 - good_lg_value = 50 - dummy_waveforms = np.ones((2, 1000, 30)) * good_lg_value - dummy_waveforms[1:] = good_hg_value # high gains +def test_gain_selector(): + shape = (2, 2048, 128) + waveforms = np.indices(shape)[1] + waveforms[1] *= 2 - # set pixels above 500's sample 13 to a high value (to trigger switch) - dummy_waveforms[0, 500:, 13:15] = threshold + 10 + gs = TestGainSelector() + waveforms_gs = gs(waveforms) + np.testing.assert_equal(waveforms[GainChannel.HIGH], waveforms_gs) + waveforms_gs = gs(waveforms[0]) + np.testing.assert_equal(waveforms_gs, waveforms[0]) + waveforms_gs = gs(waveforms[[0]]) + np.testing.assert_equal(waveforms_gs, waveforms[0]) - # First test the default mode of replacing the full waveform: - # at the end, the final waveforms should be of shape (1000,30) and - # pixels from 500 and above should have the low-gain value of good_hg_value - new_waveforms, gain_mask = pick_gain_channel( - waveforms=dummy_waveforms, - threshold=threshold, - select_by_sample=False - ) +def test_manual_gain_selector(): + shape = (2, 2048, 128) + waveforms = np.indices(shape)[1] + waveforms[1] *= 2 - assert gain_mask.shape == (1000,) - assert new_waveforms.shape == (1000, 30) - assert (new_waveforms[500:] == good_hg_value).all() - assert (new_waveforms[:500] == good_lg_value).all() + gs_high = ManualGainSelector(channel="HIGH") + waveforms_gs_high = gs_high(waveforms) + np.testing.assert_equal(waveforms[GainChannel.HIGH], waveforms_gs_high) - # Next test the optional mode of replacing only the samples above threshold: - # at the end, the final waveforms should be of shape (1000,30) and - # pixels from 500 and above and with sample number 15:13 should have the - # low-gain value of good_hg_value: - - new_waveforms, gain_mask = pick_gain_channel( - waveforms=dummy_waveforms, - threshold=threshold, - select_by_sample=True - ) - - assert gain_mask.shape == new_waveforms.shape - assert new_waveforms.shape == (1000, 30) - assert (new_waveforms[500:, 13:15] == good_hg_value).all() - assert (new_waveforms[500:, :13] == good_lg_value).all() - assert (new_waveforms[500:, 15:] == good_lg_value).all() - - -def test_pick_gain_channel_bad_input(): - input_waveforms = np.arange(10).reshape(1, 10) - waveforms, gain_mask = pick_gain_channel(input_waveforms, threshold=4) - assert gain_mask is not None - assert (waveforms == input_waveforms).all() + gs_high = ManualGainSelector(channel="LOW") + waveforms_gs_low = gs_high(waveforms) + np.testing.assert_equal(waveforms[GainChannel.LOW], waveforms_gs_low) def test_threshold_gain_selector(): - selector = ThresholdGainSelector() - print(selector) - - assert 'NectarCam' in selector.thresholds - - threshold = selector.thresholds['NectarCam'] - good_hg_value = 35 - good_lg_value = 50 - dummy_waveforms = np.ones((2, 1000, 30)) * good_lg_value - dummy_waveforms[1:] = good_hg_value # - dummy_waveforms[0, 500:, 13:15] = threshold + 10 - - new_waveforms, gain_mask = selector.select_gains("NectarCam", - dummy_waveforms) - assert gain_mask.shape == (1000,) - assert new_waveforms.shape == (1000, 30) - assert (new_waveforms[500:] == good_hg_value).all() - assert (new_waveforms[:500] == good_lg_value).all() - - selector.select_by_sample = True - - new_waveforms, gain_mask = selector.select_gains("NectarCam", - dummy_waveforms) - - assert new_waveforms.shape == (1000, 30) - assert (new_waveforms[500:, 13:15] == good_hg_value).all() - assert (new_waveforms[500:, :13] == good_lg_value).all() - assert (new_waveforms[500:, 15:] == good_lg_value).all() - assert gain_mask.shape == new_waveforms.shape - - # test some failures: - # Camera that doesn't have a threshold: - with pytest.raises(KeyError): - selector.select_gains("NonExistantCamera", dummy_waveforms) - - # 3-gain channel input: - with pytest.raises(ValueError): - selector.select_gains("NectarCam", np.ones((3, 1000, 30))) - - # 1-gain channel input: - wf0 = np.ones((1, 1000, 1)) - wf1, gm = selector.select_gains("ASTRICam", wf0) - assert wf1.shape == (1000,) - assert gm.shape == (1000,) - - -def test_simple_gain_selector(): - gs = SimpleGainSelector() - - for chan in [0, 1]: - gs.channel = chan - - waveforms_2g = np.random.normal(size=(2, 1000, 30)) - waveforms_1g, mask = gs.select_gains("NectarCam", waveforms_2g) - - assert waveforms_1g.shape == (1000, 30) - assert (waveforms_1g == waveforms_2g[chan]).all() - assert mask.shape == (1000,) + shape = (2, 2048, 128) + waveforms = np.zeros(shape) + waveforms[1] = 1 + waveforms[0, 0] = 100 + + gs = ThresholdGainSelector(threshold=50) + waveforms_gs = gs(waveforms) + assert (waveforms_gs[0] == 1).all() + assert (waveforms_gs[np.arange(1, 2048)] == 0).all() From b49f4a98c36b3ad19081deabb846975b26b6e789 Mon Sep 17 00:00:00 2001 From: watsonjj Date: Fri, 21 Jun 2019 07:38:03 +0200 Subject: [PATCH 2/2] Codacy fixes --- ctapipe/calib/camera/gainselection.py | 5 ++++- ctapipe/calib/camera/tests/test_gainselection.py | 12 ++++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/ctapipe/calib/camera/gainselection.py b/ctapipe/calib/camera/gainselection.py index 52ad01d0d08..1b140f77434 100644 --- a/ctapipe/calib/camera/gainselection.py +++ b/ctapipe/calib/camera/gainselection.py @@ -15,6 +15,9 @@ class GainChannel(IntEnum): + """ + Possible gain channels + """ HIGH = 0 LOW = 1 @@ -44,7 +47,7 @@ def __call__(self, waveforms): if waveforms.ndim == 2: # Return if already gain selected return waveforms elif waveforms.ndim == 3: - n_channels, n_pixels, n_samples = waveforms.shape + n_channels, n_pixels, _ = waveforms.shape if n_channels == 1: # Reduce if already single channel return waveforms[0] else: diff --git a/ctapipe/calib/camera/tests/test_gainselection.py b/ctapipe/calib/camera/tests/test_gainselection.py index f694a63bbaf..2ee34a2042d 100644 --- a/ctapipe/calib/camera/tests/test_gainselection.py +++ b/ctapipe/calib/camera/tests/test_gainselection.py @@ -13,12 +13,12 @@ def test_gain_selector(): waveforms = np.indices(shape)[1] waveforms[1] *= 2 - gs = TestGainSelector() - waveforms_gs = gs(waveforms) + gain_selector = TestGainSelector() + waveforms_gs = gain_selector(waveforms) np.testing.assert_equal(waveforms[GainChannel.HIGH], waveforms_gs) - waveforms_gs = gs(waveforms[0]) + waveforms_gs = gain_selector(waveforms[0]) np.testing.assert_equal(waveforms_gs, waveforms[0]) - waveforms_gs = gs(waveforms[[0]]) + waveforms_gs = gain_selector(waveforms[[0]]) np.testing.assert_equal(waveforms_gs, waveforms[0]) @@ -42,7 +42,7 @@ def test_threshold_gain_selector(): waveforms[1] = 1 waveforms[0, 0] = 100 - gs = ThresholdGainSelector(threshold=50) - waveforms_gs = gs(waveforms) + gain_selector = ThresholdGainSelector(threshold=50) + waveforms_gs = gain_selector(waveforms) assert (waveforms_gs[0] == 1).all() assert (waveforms_gs[np.arange(1, 2048)] == 0).all()