Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Impact intersection #778

Merged
merged 11 commits into from
Oct 10, 2018
219 changes: 65 additions & 154 deletions ctapipe/reco/HillasReconstructor.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,24 @@ def normalise(vec):
return vec


def line_line_intersection_3d(uvw_vectors, origins):
'''
Intersection of many lines in 3d.
See https://en.wikipedia.org/wiki/Line%E2%80%93line_intersection
'''
C = []
S = []
for n, pos in zip(uvw_vectors, origins):
n = n.reshape((3, 1))
norm_matrix = (n @ n.T) - np.eye(3)
C.append(norm_matrix @ pos)
S.append(norm_matrix)

S = np.array(S).sum(axis=0)
C = np.array(C).sum(axis=0)
return np.linalg.inv(S) @ C


class HillasReconstructor(Reconstructor):
"""
class that reconstructs the direction of an atmospheric shower
Expand Down Expand Up @@ -120,20 +138,21 @@ class are set to np.nan
direction, err_est_dir = self.estimate_direction()

# core position estimate using a geometric approach
pos, err_est_pos = self.estimate_core_position()
core_pos = self.estimate_core_position(hillas_dict)

# container class for reconstructed showers
result = ReconstructedShowerContainer()
_, lat, lon = cartesian_to_spherical(*direction)

# estimate max height of shower
h_max = self.estimate_h_max(hillas_dict, inst.subarray, pointing_alt, pointing_az)
h_max = self.estimate_h_max()


result.alt, result.az = lat, lon
result.core_x = pos[0]
result.core_y = pos[1]
result.core_uncert = err_est_pos
# astropy's coordinates system rotates counter-clockwise.
# Apparently we assume it to be clockwise.
result.alt, result.az = lat, -lon
result.core_x = core_pos[0]
result.core_y = core_pos[1]
result.core_uncert = np.nan

result.tel_ids = [h for h in hillas_dict.keys()]
result.average_size = np.mean([h.intensity for h in hillas_dict.values()])
Expand Down Expand Up @@ -248,163 +267,53 @@ def estimate_direction(self):



def estimate_core_position(self):
r"""calculates the core position as the least linear square solution
of an (over-constrained) equation system

Notes
-----
The basis is the "trace" of each telescope's `HillasPlane` which
can be determined by the telescope's position P=(Px, Py) and
the circle's normal vector, projected to the ground n=(nx,
ny), so that for every r=(x, y) on the trace

:math:`\vec n \cdot \vec r = \vec n \cdot \vec P` ,

:math:`n_x \cdot x + n_y \cdot y = d`

In a perfect world, the traces of all telescopes cross in the
shower's point of impact. This means that there is one common
point (x, y) for every telescope, so we can write in matrix
form:

.. math::
:label: fullmatrix

\begin{pmatrix}
nx_1 & ny_1 \\
\vdots & \vdots \\
nx_n & ny_n
\end{pmatrix}
\cdot (x, y) =
\begin{pmatrix}
d_1 \\
\vdots \\
d_n
\end{pmatrix}



or :math:`\boldsymbol{A} \cdot \vec r = \vec D` .

Since we do not live in a perfect world and there probably is
no point r that fulfils this equation system, it is solved by
the method of least linear square:

.. math::
:label: rchisqr
def estimate_core_position(self, hillas_dict):
'''
Estimate the core position by intersection the major ellipse lines of each telescope.

\vec{r}_{\chi^2} = (\boldsymbol{A}^\text{T} \cdot \boldsymbol{A})^{-1}
\boldsymbol{A}^\text{T} \cdot \vec D


:math:`\vec{r}_{\chi^2}` minimises the squared difference of


.. math::

\vec D - \boldsymbol{A} \cdot \vec r


Weights are applied to every line of equation :eq:`fullmatrix`
as stored in circle.weight (assuming they have been set in
`get_great_circles` or elsewhere).
Parameters
-----------
hillas_dict : dictionary
dictionary of hillas moments
subarray : ctapipe.instrument.SubarrayDescription
subarray information

Returns
-------
r_chisqr: numpy.ndarray(2)
the minimum :math:`\chi^2` solution for the shower impact position
pos_uncert: astropy length quantity
error estimate on the reconstructed core position

"""

A = np.zeros((len(self.hillas_planes), 2))
D = np.zeros(len(self.hillas_planes))
for i, circle in enumerate(self.hillas_planes.values()):
# apply weight from circle and from the tilt of the circle
# towards the horizontal plane: simply projecting
# circle.norm to the ground gives higher weight to planes
# perpendicular to the ground and less to those that have
# a steeper angle
A[i] = circle.weight * circle.norm[:2]
# since A[i] is used in the dot-product, no need to multiply the
# weight here
D[i] = np.dot(A[i], circle.pos[:2])

# the math from equation (2) would look like this:
# ATA = np.dot(A.T, A)
# ATAinv = np.linalg.inv(ATA)
# ATAinvAT = np.dot(ATAinv, A.T)
# return np.dot(ATAinvAT, D) * unit

# instead used directly the numpy implementation
# speed is the same, just handles already "SingularMatrixError"
if np.all(np.isfinite(A)) and np.all(np.isfinite(D)):
# note that NaN values create a value error with MKL
# installations but not otherwise.
pos = np.linalg.lstsq(A, D)[0] * u.m
else:
return [np.nan, np.nan], [np.nan, np.nan]

weighted_sum_dist = np.sum([np.dot(pos[:2] - c.pos[:2], c.norm[:2]) * c.weight
for c in self.hillas_planes.values()]) * pos.unit
norm_sum_dist = np.sum([c.weight * np.linalg.norm(c.norm[:2])
for c in self.hillas_planes.values()])
pos_uncert = abs(weighted_sum_dist / norm_sum_dist)

return pos, pos_uncert



def estimate_h_max(self, hillas_dict, subarray, pointing_alt, pointing_az):
weights = []
tels = []
dirs = []
-----------
astropy.unit.Quantity (wrapped numpy array) of shape 2

for tel_id, moments in hillas_dict.items():
'''

focal_length = subarray.tel[tel_id].optics.equivalent_focal_length
uvw_vectors = np.array([(np.cos(h.phi), np.sin(h.phi), 0) for h in hillas_dict.values()])
positions = [plane.pos for plane in self.hillas_planes.values()]

pointing = SkyCoord(
alt=pointing_alt[tel_id],
az=pointing_az[tel_id],
frame='altaz'
)

hf = HorizonFrame(
array_direction=pointing,
pointing_direction=pointing
)
cf = CameraFrame(
focal_length=focal_length,
array_direction=pointing,
pointing_direction=pointing
)
core_position = line_line_intersection_3d(uvw_vectors, positions)
# we are only intyerested in x and y
return core_position[:2] * u.m

cog_coord = SkyCoord(x=moments.x, y=moments.y, frame=cf)
cog_coord = cog_coord.transform_to(hf)

cog_direction = spherical_to_cartesian(1, cog_coord.alt, cog_coord.az)
cog_direction = np.array(cog_direction).ravel()

weights.append(self.hillas_planes[tel_id].weight)
tels.append(self.hillas_planes[tel_id].pos)
dirs.append(cog_direction)
def estimate_h_max(self):
'''
Estimate the max height by intersecting the lines of the cog directions of each telescope.

# minimising the test function
pos_max = minimize(dist_to_line3d, np.array([0, 0, 10000]),
args=(np.array(tels), np.array(dirs), np.array(weights)),
method='BFGS',
options={'disp': False}
).x
return pos_max[2] * u.m
Parameters
-----------
hillas_dict : dictionary
dictionary of hillas moments
subarray : ctapipe.instrument.SubarrayDescription
subarray information

Returns
-----------
astropy.unit.Quantity
the estimated max height
'''
uvw_vectors = np.array([plane.a for plane in self.hillas_planes.values()])
positions = [plane.pos for plane in self.hillas_planes.values()]

def dist_to_line3d(pos, tels, dirs, weights):
result = np.average(np.linalg.norm(np.cross((pos - tels), dirs), axis=1),
weights=weights)
return result
# not sure if its better to return the length of the vector of the z component
return np.linalg.norm(line_line_intersection_3d(uvw_vectors, positions))


class HillasPlane:
Expand Down Expand Up @@ -450,8 +359,10 @@ def __init__(self, p1, p2, telescope_position, weight=1):

self.pos = telescope_position

self.a = np.array(spherical_to_cartesian(1, p1.alt, p1.az)).ravel()
self.b = np.array(spherical_to_cartesian(1, p2.alt, p2.az)).ravel()
# astropy's coordinates system rotates counter clockwise. Apparently we assume it to
# be clockwise
self.a = np.array(spherical_to_cartesian(1, p1.alt, -p1.az)).ravel()
self.b = np.array(spherical_to_cartesian(1, p2.alt, -p2.az)).ravel()

# a and c form an orthogonal basis for the great circle
# not really necessary since the norm can be calculated
Expand Down
42 changes: 36 additions & 6 deletions ctapipe/reco/tests/test_HillasReconstructor.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,44 @@ def test_estimator_results():
print("direction fit test minimise:", dir_fit_minimise)
print()

# performing the direction fit with the geometric algorithm
fitted_core_position, _ = fit.estimate_core_position()
print("direction fit test core position:", fitted_core_position)
print()

def test_h_max_results():
"""
creating some planes pointing in different directions (two
north-south, two east-west) and that have a slight position errors (+-
0.1 m in one of the four cardinal directions """

p1 = SkyCoord(alt=0 * u.deg, az=45 * u.deg, frame='altaz')
p2 = SkyCoord(alt=0 * u.deg, az=45 * u.deg, frame='altaz')
circle1 = HillasPlane(p1=p1, p2=p2, telescope_position=[0, 1, 0] * u.m)

p1 = SkyCoord(alt=0 * u.deg, az=90 * u.deg, frame='altaz')
p2 = SkyCoord(alt=0 * u.deg, az=90 * u.deg, frame='altaz')
circle2 = HillasPlane(p1=p1, p2=p2, telescope_position=[1, 0, 0] * u.m)

p1 = SkyCoord(alt=0 * u.deg, az=45 * u.deg, frame='altaz')
p2 = SkyCoord(alt=0 * u.deg, az=45 * u.deg, frame='altaz')
circle3 = HillasPlane(p1=p1, p2=p2, telescope_position=[0, -1, 0] * u.m)

p1 = SkyCoord(alt=0 * u.deg, az=90 * u.deg, frame='altaz')
p2 = SkyCoord(alt=0 * u.deg, az=90 * u.deg, frame='altaz')
circle4 = HillasPlane(p1=p1, p2=p2, telescope_position=[-1, 0, 0] * u.m)

# creating the fit class and setting the the great circle member
fit = HillasReconstructor()
fit.hillas_planes = {1: circle1, 2: circle2, 3: circle3, 4: circle4}


# performing the direction fit with the minimisation algorithm
# and a seed that is perpendicular to the up direction
h_max_reco = fit.estimate_h_max()
print("h max fit test minimise:", h_max_reco)


# the results should be close to the direction straight up
np.testing.assert_allclose(dir_fit_minimise, [0, 0, 1], atol=1e-3)
np.testing.assert_allclose(fitted_core_position.value, [0, 0], atol=1e-3)
np.testing.assert_allclose(h_max_reco, 0, atol=1e-8)
# np.testing.assert_allclose(fitted_core_position.value, [0, 0], atol=1e-3)



def test_reconstruction():
Expand Down