-
Notifications
You must be signed in to change notification settings - Fork 19
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
sotodlib.coords: pointing and coordinate support
Integrates so3g accelerated pointing code with tod-in-AxisManager.
- Loading branch information
Showing
3 changed files
with
253 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
from .pmat import P | ||
from .helpers import get_radec, get_horiz, get_footprint, get_wcs_kernel, DEG |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
import so3g.proj | ||
import numpy as np | ||
from pixell import enmap | ||
|
||
|
||
DEG = np.pi/180 | ||
|
||
def _find_field(axisman, default, provided): | ||
"""This is a utility function for the pattern where a default should | ||
be extracted from an AxisManager, unless an alternative key name | ||
has been passed in, or simply an alternative array of values. | ||
""" | ||
if provided is None: | ||
provided = default | ||
if isinstance(provided, str): | ||
return axisman[provided] | ||
return provided | ||
|
||
def get_radec(tod, wrap=False, dets=None, timestamps=None, focal_plane=None, | ||
boresight=None): | ||
dets = _find_field(tod, tod.dets.vals, dets) | ||
timestamps = _find_field(tod, 'timestamps', timestamps) | ||
boresight = _find_field(tod, 'boresight', boresight) | ||
fp = _find_field(tod, 'focal_plane', focal_plane) | ||
sight = so3g.proj.CelestialSightLine.az_el( | ||
timestamps, boresight.az, boresight.el, roll=boresight.roll, | ||
site='so', weather='typical') | ||
fp = so3g.proj.FocalPlane.from_xieta(dets, fp.xi, fp.eta, fp.gamma) | ||
asm = so3g.proj.Assembly.attach(sight, fp) | ||
output = np.zeros((len(dets), len(timestamps), 4)) | ||
proj = so3g.proj.Projectionist() | ||
proj.get_coords(asm, output=output) | ||
if wrap is True: | ||
wrap = 'radec' | ||
if wrap: | ||
tod.wrap(wrap, output, [(0, 'dets'), (1, 'samps')]) | ||
return output | ||
|
||
def get_horiz(tod, wrap=False, dets=None, timestamps=None, focal_plane=None, | ||
boresight=None): | ||
dets = _find_field(tod, tod.dets.vals, dets) | ||
timestamps = _find_field(tod, 'timestamps', timestamps) | ||
boresight = _find_field(tod, 'boresight', boresight) | ||
fp = _find_field(tod, 'focal_plane', focal_plane) | ||
|
||
sight = so3g.proj.CelestialSightLine.for_horizon( | ||
timestamps, boresight.az, boresight.el, roll=boresight.roll) | ||
|
||
fp = so3g.proj.FocalPlane.from_xieta(dets, fp.xi, fp.eta, fp.gamma) | ||
asm = so3g.proj.Assembly.attach(sight, fp) | ||
output = np.zeros((len(dets), len(timestamps), 4)) | ||
proj = so3g.proj.Projectionist() | ||
proj.get_coords(asm, output=output) | ||
# The lonlat pair is (-az, el), so restore the az sign. | ||
output[:,:,0] *= -1 | ||
if wrap is True: | ||
wrap = 'horiz' | ||
if wrap: | ||
tod.wrap(wrap, output, [(0, 'dets'), (1, 'samps')]) | ||
return output | ||
|
||
def get_wcs_kernel(proj, ra, dec, res): | ||
"""Get a WCS "kernel" -- this is a WCS holding a single pixel | ||
centered on CRVAL. | ||
This interface _will_ change. | ||
""" | ||
assert np.isscalar(res) # This ain't enlib. | ||
_, wcs = enmap.geometry(np.array((dec, ra)), shape=(1,1), proj=proj, | ||
res=(res, -res)) | ||
return wcs | ||
|
||
def get_footprint(tod, wcs_kernel, dets=None, timestamps=None, boresight=None, | ||
focal_plane=None): | ||
"""Find a geometry (in the sense of enmap) based on wcs_kernel that is | ||
big enough to contain all data from tod. Returns (shape, wcs). | ||
""" | ||
dets = _find_field(tod, tod.dets.vals, dets) | ||
timestamps = _find_field(tod, 'timestamps', timestamps) | ||
boresight = _find_field(tod, 'boresight', boresight) | ||
fp0 = _find_field(tod, 'focal_plane', focal_plane) | ||
|
||
# Do a simplest convex hull... | ||
q = so3g.proj.quat.rotation_xieta(fp0.xi, fp0.eta) | ||
xi, eta, _ = so3g.proj.quat.decompose_xieta(q) | ||
xi0, eta0 = xi.mean(), eta.mean() | ||
R = ((xi - xi0)**2 + (eta - eta0)**2).max()**.5 | ||
|
||
n_circ = 16 | ||
dphi = 2*np.pi/n_circ | ||
phi = np.arange(n_circ) * dphi | ||
L = 1.01 * R / np.cos(dphi/2) | ||
xi, eta = L * np.cos(phi) + xi0, L * np.sin(phi) + eta0 | ||
fake_dets = ['hull%i' % i for i in range(n_circ)] | ||
fp1 = so3g.proj.FocalPlane.from_xieta(fake_dets, xi, eta, 0*xi) | ||
|
||
sight = so3g.proj.CelestialSightLine.az_el( | ||
timestamps, boresight.az, boresight.el, roll=boresight.roll, | ||
site='so', weather='typical') | ||
asm = so3g.proj.Assembly.attach(sight, fp1) | ||
output = np.zeros((len(fake_dets), len(timestamps), 4)) | ||
proj = so3g.proj.Projectionist.for_geom((1,1), wcs_kernel) | ||
proj.get_planar(asm, output=output) | ||
|
||
output2 = output*0 | ||
proj.get_coords(asm, output=output2) | ||
|
||
delts = wcs_kernel.wcs.cdelt * DEG | ||
planar = [output[:,:,0], output[:,:,1]] | ||
# Get the extrema.. | ||
ranges = [(p.min()/d, p.max()/d) for p, d in zip(planar, delts)] | ||
del output | ||
|
||
# Start a new WCS and set the lower left corner. | ||
w = wcs_kernel.copy() | ||
corner = [int(np.floor(min(r)+.5)) for r in ranges] | ||
w.wcs.crpix = [1 - corner[0], 1 - corner[1]] | ||
|
||
# The other corner helps us with the shape... | ||
far_corner = [int(np.floor(max(r)+.5)) for r in ranges] | ||
shape = tuple([fc - c + 1 for fc, c in zip(far_corner, corner)][::-1]) | ||
return (shape, w) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
import so3g.proj | ||
import numpy as np | ||
import scipy | ||
|
||
from .helpers import _find_field | ||
|
||
|
||
class P: | ||
"""Pointing Matrix. This class caches certain pre-computed | ||
information (such as rotation taking boresight coordinates to | ||
celestial coordinates) and certain context-dependent settings | ||
(such as a map shape, WCS, and spin-component configuration), and | ||
provides easy access to projection, deprojection, and coordinate | ||
routines. | ||
""" | ||
@classmethod | ||
def for_geom(cls, tod, geom, comps='TQU', | ||
dets=None, timestamps=None, focal_plane=None, boresight=None): | ||
dets = _find_field(tod, tod.dets.vals, dets) | ||
timestamps = _find_field(tod, 'timestamps', timestamps) | ||
boresight = _find_field(tod, 'boresight', boresight) | ||
fp = _find_field(tod, 'focal_plane', focal_plane) | ||
sight = so3g.proj.CelestialSightLine.az_el( | ||
timestamps, boresight.az, boresight.el, roll=boresight.roll, | ||
site='so', weather='typical') | ||
fp = so3g.proj.FocalPlane.from_xieta(dets, fp.xi, fp.eta, fp.gamma) | ||
asm = so3g.proj.Assembly.attach(sight, fp) | ||
|
||
self = cls() | ||
self.sight = sight | ||
self.asm = asm | ||
self.default_comps = comps | ||
self.geom = geom | ||
return self | ||
|
||
def comp_count(self, comps=None): | ||
if comps is None: | ||
comps = self.default_comps | ||
return len(comps) | ||
|
||
def zeros(self, super_shape=None, comps=None): | ||
if super_shape is None: | ||
super_shape = (self.comp_count(comps), ) | ||
proj = so3g.proj.Projectionist.for_geom(*self.geom) | ||
return proj.zeros(super_shape) | ||
|
||
def to_map(self, tod=None, dest=None, comps=None, signal=None, det_weights=None): | ||
signal = _find_field(tod, 'signal', signal) | ||
det_weights = _find_field(tod, None, det_weights) # note defaults to None | ||
|
||
if comps is None: | ||
comps = self.default_comps | ||
if dest is None: | ||
dest = self.zeros(comps=comps) | ||
|
||
proj = so3g.proj.Projectionist.for_geom(self.geom[0], self.geom[1]) | ||
|
||
proj.to_map(signal, self.asm, output=dest, det_weights=det_weights, comps=comps) | ||
return dest | ||
|
||
def to_weights(self, tod=None, dest=None, comps=None, signal=None, det_weights=None): | ||
det_weights = _find_field(tod, None, det_weights) # note defaults to None | ||
|
||
if comps is None: | ||
comps = self.default_comps | ||
if dest is None: | ||
_n = self.comp_count(comps) | ||
dest = self.zeros((_n, _n)) | ||
|
||
proj = so3g.proj.Projectionist.for_geom(*self.geom) | ||
|
||
proj.to_weights(self.asm, output=dest, det_weights=det_weights, comps=comps) | ||
return dest | ||
|
||
def to_inverse_weights(self, weights_map=None, | ||
tod=None, dest=None, comps=None, signal=None, det_weights=None): | ||
if weights_map is None: | ||
weights_map = self.to_weights(tod=tod, comps=comps, signal=signal, | ||
det_weights=det_weights) | ||
# Invert in each pixel. | ||
if dest is None: | ||
dest = weights_map * 0 | ||
if weights_map.shape[0] == 1: | ||
s = weights_map[0,0] != 0 | ||
dest[0,0][s] = 1./weights_map[0,0][s] | ||
else: | ||
temp_shape = weights_map.shape[:2] + (-1,) | ||
w_reshaped = weights_map.reshape(temp_shape) | ||
dest.shape = temp_shape | ||
# This is a strong candidate for C-ification. | ||
for i in range(dest.shape[2]): | ||
dest[:,:,i] = scipy.linalg.pinvh(w_reshaped[:,:,i], lower=False) | ||
dest.shape = weights_map.shape | ||
return dest | ||
|
||
def remove_weights(self, signal_map=None, weights_map=None, | ||
inverse_weights_map=None, dest=None, | ||
**kwargs): | ||
if inverse_weights_map is None: | ||
inverse_weights_map = self.to_inverse_weights( | ||
weights_map=weights_map, **kwargs) | ||
if signal_map is None: | ||
signal_map = self.to_map(**kwargs) | ||
if dest is None: | ||
dest = np.empty(signal_map.shape, signal_map.dtype) | ||
# Is there really no way to avoid looping over pixels? | ||
iw = inverse_weights_map.reshape(inverse_weights_map.shape[:2] + (-1,)) | ||
sm = signal_map.reshape(signal_map.shape[:1] + (-1,)) | ||
dest.shape = sm.shape | ||
for i in range(sm.shape[1]): | ||
dest[:,i] = np.dot(iw[:,:,i], sm[:,i]) | ||
dest.shape = signal_map.shape | ||
return dest | ||
|
||
def from_map(self, signal_map, tod=None, dest=None, comps=None, | ||
wrap=None): | ||
proj = so3g.proj.Projectionist.for_geom(*self.geom) | ||
if comps is None: | ||
comps = self.default_comps | ||
tod_shape = (len(self.asm.dets), len(self.asm.Q)) | ||
if dest is None: | ||
dest = np.zeros(tod_shape, 'float32') | ||
assert(dest.shape == tod_shape) # P.asm and dest argument disagree | ||
proj.from_map(signal_map, self.asm, signal=dest, comps=comps) | ||
return dest |