Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

signal extractor SlidingWindowMaxSum #1568

Merged
merged 15 commits into from
Feb 11, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 28 additions & 13 deletions ctapipe/image/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -619,13 +619,8 @@ def _calculate_correction(self, telid):
This method is decorated with @lru_cache to ensure it is only
calculated once per telescope.

WARNING: TO BE DONE properly, the current code reuses the function of
LocalPeakWindowSum assuming that the pulse is symmetric (which normally is not
true). The proper approach would be to apply a similar sliding window over the
reference pulse shape and compute the maximum fraction of the pulse contained
in window of a given size. Thus the correction for the leakage of signal outside
of the integration window is slightly overestimated.

The same procedure as for the actual SlidingWindowMaxSum extractor is used, but
on the reference pulse_shape (that is also more finely binned)

Parameters
----------
Expand All @@ -640,14 +635,34 @@ def _calculate_correction(self, telid):
"""

readout = self.subarray.tel[telid].camera.readout
return integration_correction(
readout.reference_pulse_shape,
readout.reference_pulse_sample_width.to_value("ns"),
(1 / readout.sampling_rate).to_value("ns"),
self.window_width.tel[telid],
self.window_width.tel[telid] // 2, # assuming that shift = 0.5 * window

# compute the number of slices to integrate in the pulse template
width_shape = int(
round(
(
self.window_width.tel[telid]
/ readout.sampling_rate
/ readout.reference_pulse_sample_width
)
.to("")
.value
)
)

n_channels = len(readout.reference_pulse_shape)
correction = np.ones(n_channels, dtype=np.float)
for ichannel, pulse_shape in enumerate(readout.reference_pulse_shape):

# apply the same method as sliding window to find the highest sum
cwf = np.cumsum(pulse_shape)
# add zero at the begining so it is easier to substract the two arrays later
cwf = np.concatenate((np.zeros(1), cwf))
sums = cwf[width_shape:] - cwf[:-width_shape]
maxsum = np.max(sums)
correction[ichannel] = np.sum(pulse_shape) / maxsum

return correction

def __call__(self, waveforms, telid, selected_gain_channel):
charge, peak_time = extract_sliding_window(
waveforms, self.window_width.tel[telid], self.sampling_rate[telid]
Expand Down
52 changes: 52 additions & 0 deletions ctapipe/image/tests/test_sliding_window_correction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import numpy as np
import astropy.units as u
from numpy.testing import assert_allclose

from ctapipe.image.extractor import SlidingWindowMaxSum
from ctapipe.image.toymodel import WaveformModel
from ctapipe.instrument import SubarrayDescription, TelescopeDescription

from ctapipe.utils import datasets

datasets.DEFAULT_URL = "http://cccta-dataserver.in2p3.fr/data/ctapipe-extra/v0.3.2/"
Copy link
Contributor

@kosack kosack Feb 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't really set this globally, since it affects all other tests (and definitely should not be committed when we merge the PR). Perhaps for now, you might want to use the monkeypatch test fixture instead, something like:

def test_xxx(monkeypatch):
    with monkeypatch.context() as m:
         m.setattr(datasets, "DEFAULT_URL", "http://cccta-dataserver.in2p3.fr/data/ctapipe-extra/v0.3.2/")
         # the rest of the test

see https://docs.pytest.org/en/stable/monkeypatch.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, I put this test explicitely into a separate file to avoid affecting other files with a global variable, because I thought that each file is run independently.
either way, now modified as suggested by you



def test_sw_pulse_lst():

# prepare array with 1 LST
subarray = SubarrayDescription(
"LST1",
tel_positions={1: np.zeros(3) * u.m},
tel_descriptions={
1: TelescopeDescription.from_name(optics_name="LST", camera_name="LSTCam")
},
)

telid = list(subarray.tel.keys())[0]

n_pixels = subarray.tel[telid].camera.geometry.n_pixels
n_samples = 40
readout = subarray.tel[telid].camera.readout

random = np.random.RandomState(1)
minCharge = 100
maxCharge = 1000
charge_true = random.uniform(minCharge, maxCharge, n_pixels)
time_true = random.uniform(
n_samples // 2 - 1, n_samples // 2 + 1, n_pixels
) / readout.sampling_rate.to_value(u.GHz)

waveform_model = WaveformModel.from_camera_readout(readout)
waveform = waveform_model.get_waveform(charge_true, time_true, n_samples)
selected_gain_channel = np.zeros(charge_true.size, dtype=np.int)

# define extractor
extr_width = 8
extractor = SlidingWindowMaxSum(subarray=subarray)
extractor.window_width = extr_width
charge, peak_time = extractor(waveform, telid, selected_gain_channel)
print(charge / charge_true)
assert_allclose(charge, charge_true, rtol=0.02)


test_sw_pulse_lst()