Skip to content

Commit

Permalink
Merge pull request #564 from simonsobs/planet_azel_coords
Browse files Browse the repository at this point in the history
Add function to derive the az el of planets
  • Loading branch information
tterasaki authored Oct 16, 2023
2 parents 81f3eff + feca206 commit 9f9b8af
Showing 1 changed file with 103 additions and 43 deletions.
146 changes: 103 additions & 43 deletions sotodlib/coords/planets.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class SlowSource:
precision: not implemented, don't worry about it.
"""

def __init__(self, timestamp, ra, dec, v_ra=0., v_dec=0.,
precision=.0001 * coords.DEG):
self.timestamp = timestamp
Expand All @@ -50,7 +51,6 @@ def __init__(self, timestamp, ra, dec, v_ra=0., v_dec=0.,

@classmethod
def for_named_source(cls, name, timestamp):

"""Returns a SlowSource for planet ``name``, with position and
peculiar velocity measured at time timestamp (float, unix).
Expand All @@ -75,6 +75,7 @@ def pos(self, timestamps):
dt = timestamps - self.timestamp
return self.ra + self.v_ra * dt, self.dec + self.v_dec * dt


def get_scan_q(tod, planet, refq=None):
"""Identify the point (in time and azimuth space) at which the
specified planet crosses the boresight elevation for the
Expand All @@ -99,7 +100,8 @@ def get_scan_q(tod, planet, refq=None):
planet = SlowSource.for_named_source(planet, t)

def scan_q_model(t, az, el, planet):
csl = so3g.proj.CelestialSightLine.az_el(t, az, el, weather='typical', site='so')
csl = so3g.proj.CelestialSightLine.az_el(
t, az, el, weather='typical', site='so')
ra0, dec0 = planet.pos(t)
return csl.Q, ~so3g.proj.quat.rotation_lonlat(ra0, dec0) * csl.Q

Expand Down Expand Up @@ -128,6 +130,7 @@ def distance(p):
'psi': psi,
'planet': planet}


def get_scan_P(tod, planet, refq=None, res=None, size=None, **kw):
"""Get a standard Projection Matrix targeting a planet (or some
interesting fixed position), in source-scan coordinates.
Expand All @@ -151,6 +154,7 @@ def get_scan_P(tod, planet, refq=None, res=None, size=None, **kw):
P.geom = enmap.Geometry(shape=mz.shape, wcs=mz.wcs)
return P, X


def filter_for_sources(tod=None, signal=None, source_flags=None,
n_modes=10, low_pass=None,
wrap=None):
Expand Down Expand Up @@ -217,11 +221,12 @@ def filter_for_sources(tod=None, signal=None, source_flags=None,
# It's convenient to remove the same levels from signal now,
# too.
levels = signal_pca.mean(axis=1)
signal_pca -= levels[:,None]
signal -= levels[:,None]
signal_pca -= levels[:, None]
signal -= levels[:, None]

# Get PCA model and discard the source vectors.
pca = tod_ops.pca.get_pca_model(tod, signal=signal_pca, n_modes=n_modes)
pca = tod_ops.pca.get_pca_model(
tod, signal=signal_pca, n_modes=n_modes)
del signal_pca

# Remove the PCA model.
Expand All @@ -231,14 +236,14 @@ def filter_for_sources(tod=None, signal=None, source_flags=None,
tod.wrap(wrap, signal, [(0, 'dets'), (1, 'samps')])
return signal

def get_source_pos(source_name, timestamp, site='_default'):
"""Get the equatorial coordinates of a planet (or fixed-position
source, see note) at some time. Returns the apparent position,
accounting for geographical position on earth, but assuming no
atmospheric refraction.
Note that this will download a 16M ephemeris file on first use.
def _get_astrometric(source_name, timestamp, site='_default'):
"""
Derive skyfield's Astrometric object of a celestial source at a
specific timestamp and observing site, which is used to derive
radec/azel in get_source_pos/get_source_azel.
Note that it will download a 16M ephemeris file on first use.
Args:
source_name: Planet name; in capitalized format, e.g. "Jupiter",
or fixed source specification.
Expand All @@ -247,25 +252,8 @@ def get_source_pos(source_name, timestamp, site='_default'):
site will be looked up in so3g.proj.SITES dict.
Returns:
ra (float): in radians.
dec (float): in radians.
distance (float): in AU.
Note:
Before checking in the ephemeris, the source_name will be
matched against a regular expression and if it has the format
'Jxxx[+-]yyy', where xxx and yyy are decimal numbers, then a
fixed-position source at RA,Dec = xxx,yyy in degrees will be
processed. In that case, the distance is returned as Inf.
astrometric: skyfield's astrometric object
"""
# Check against fixed-position template...
m = re.match(r'J(?P<ra_deg>\d+(\.\d*)?)(?P<dec_deg>[+-]\d+(\.\d*)?)', source_name)
if m:
ra, dec = float(m['ra_deg']) * coords.DEG, float(m['dec_deg']) * coords.DEG
return ra, dec, float('inf')

# Get the ephemeris -- this will trigger a 16M download on first use.
de_url = RESOURCE_URLS['de421.bsp']
de_filename = au_data.download_file(de_url, cache=True)
Expand Down Expand Up @@ -293,10 +281,75 @@ def get_source_pos(source_name, timestamp, site='_default'):
timescale = skyfield_api.load.timescale()
sf_timestamp = timescale.from_datetime(
datetime.datetime.fromtimestamp(timestamp, tz=skyfield_api.utc))
amet0 = observatory.at(sf_timestamp).observe(target)
astrometric = observatory.at(sf_timestamp).observe(target)
return astrometric


def get_source_pos(source_name, timestamp, site='_default'):
"""Get the equatorial coordinates of a planet (or fixed-position
source, see note) at some time. Returns the apparent position,
accounting for geographical position on earth, but assuming no
atmospheric refraction.
Args:
source_name: Planet name; in capitalized format, e.g. "Jupiter",
or fixed source specification.
timestamp: unix timestamp.
site (str or so3g.proj.EarthlySite): if this is a string, the
site will be looked up in so3g.proj.SITES dict.
Returns:
ra (float): in radians.
dec (float): in radians.
distance (float): in AU.
Note:
Before checking in the ephemeris, the source_name will be
matched against a regular expression and if it has the format
'Jxxx[+-]yyy', where xxx and yyy are decimal numbers, then a
fixed-position source at RA,Dec = xxx,yyy in degrees will be
processed. In that case, the distance is returned as Inf.
"""
# Check against fixed-position template...
m = re.match(
r'J(?P<ra_deg>\d+(\.\d*)?)(?P<dec_deg>[+-]\d+(\.\d*)?)', source_name)
if m:
ra, dec = float(m['ra_deg']) * \
coords.DEG, float(m['dec_deg']) * coords.DEG
return ra, dec, float('inf')

# Derive from skyfield astrometric object
amet0 = _get_astrometric(source_name, timestamp, site)
ra, dec, distance = amet0.radec()
return ra.to(units.rad).value, dec.to(units.rad).value, distance.to(units.au).value


def get_source_azel(source_name, timestamp, site='_default'):
"""
Get the apparent azimuth and elevation of a celestial source at a
specific timestamp and observing site. Returns the apparent position,
accounting for geographical position on earth, but assuming no
atmospheric refraction.
Args:
source_name: Planet name; in capitalized format, e.g. "Jupiter"
timestamp (float): The Unix timestamp representing the time for
which to calculate azimuth and elevation.
site (str or so3g.proj.EarthlySite): if this is a string, the
site will be looked up in so3g.proj.SITES dict.
Returns:
az (float): in radians.
el (float): in radians.
distance (float): in AU.
"""
amet0 = _get_astrometric(source_name, timestamp, site)
el, az, distance = amet0.apparent().altaz()
return az.to(units.rad).value, el.to(units.rad).value, distance.to(units.au).value


def get_nearby_sources(tod=None, source_list=None, distance=1.):
"""Identify solar system objects (especially "planets") that might be
within a TOD's scan footprint.
Expand Down Expand Up @@ -329,11 +382,12 @@ def get_nearby_sources(tod=None, source_list=None, distance=1.):

# One central detector
xieta0, R, _ = coords.helpers.get_focal_plane_cover(tod, 0)
fp = so3g.proj.FocalPlane.from_xieta(['x'], [xieta0[0]],[xieta0[1]],[0])
fp = so3g.proj.FocalPlane.from_xieta(['x'], [xieta0[0]], [xieta0[1]], [0])

asm = so3g.proj.Assembly.attach(sight, fp)
p = so3g.proj.Projectionist.for_geom(shape, wcs)
w = p.to_map(np.zeros((1,len(tod.timestamps)), 'float32')+1, asm, comps='T')
w = p.to_map(np.zeros((1, len(tod.timestamps)),
'float32')+1, asm, comps='T')
w = enmap.enmap(w, wcs=wcs)

if source_list is None:
Expand All @@ -357,8 +411,8 @@ def get_nearby_sources(tod=None, source_list=None, distance=1.):
float(dec) * coords.DEG)
else:
sl = coords.planets.SlowSource.for_named_source(source_name, t)
x = w.distance_from([[sl.dec],[sl.ra]])
md = x[w[0]!=0].min()
x = w.distance_from([[sl.dec], [sl.ra]])
md = x[w[0] != 0].min()
logger.debug(('Source {:12} is at ({:8.4f},{:8.4f}); '
'that is {:5.2f} degrees off footprint.').format(
source_name, sl.ra / coords.DEG,
Expand All @@ -367,6 +421,7 @@ def get_nearby_sources(tod=None, source_list=None, distance=1.):
positions.append((source_name, sl))
return positions


def compute_source_flags(tod=None, P=None, mask=None, wrap=None,
center_on=None, res=None, max_pix=4e6):
"""Process masking instructions and create RangesMatrix that flags
Expand Down Expand Up @@ -422,13 +477,14 @@ def compute_source_flags(tod=None, P=None, mask=None, wrap=None,
_add_to_mask(mask, mask_map)
a = P.from_map(mask_map)
source_flags = so3g.proj.RangesMatrix(
[so3g.proj.Ranges.from_mask(r!=0) for r in a])
[so3g.proj.Ranges.from_mask(r != 0) for r in a])

if wrap:
asssert(tod is not None, "Pass in a tod to 'wrap' the output.")
tod.wrap(wrap, source_flags, [(0, 'dets'), (1, 'samps')])
return source_flags


def _add_to_mask(req, mask_map):
# Helper for compute_source_flags.
if req is None:
Expand All @@ -443,12 +499,13 @@ def _add_to_mask(req, mask_map):
x, y, r = req['xyr']
d = enmap.distance_from(mask_map.shape, mask_map.wcs,
[[y * coords.DEG], [x * coords.DEG]])
mask_map += 1.* (d < r * coords.DEG)
mask_map += 1. * (d < r * coords.DEG)
else:
raise ValueError(f'Unknown shape="{shape}" in mask request.')
else:
raise ValueError(f'Weird mask request: {req}')


def load_detector_splits(tod=None, filename=None, dataset=None,
source=None, wrap=None):
"""Convert a partition of detectors into a dict of disjoint
Expand Down Expand Up @@ -512,10 +569,11 @@ def load_detector_splits(tod=None, filename=None, dataset=None,
if group in flags:
continue
flags[group] = so3g.proj.RangesMatrix.ones((tod.signal.shape))
for i in (source['group']==group).nonzero()[0]:
for i in (source['group'] == group).nonzero()[0]:
flags[group].ranges[i] = yes
return flags


def get_det_weights(tod, signal=None, wrap=None,
outlier_clip=None):
"""Compute detector weights, based on variance of signal. If
Expand All @@ -537,9 +595,10 @@ def get_det_weights(tod, signal=None, wrap=None,
qs = np.quantile(det_weights[ok], [.16, .84]) # "1 sigma" lims
q0, dq = qs.mean(), np.diff(qs)[0]/2
ok *= ((q0 - outlier_clip * dq < det_weights) *
(det_weights < q0 + outlier_clip * dq))
(det_weights < q0 + outlier_clip * dq))
return det_weights * ok


def write_det_weights(tod, filename, dataset, det_weights=None):
"""Save detector weights to an HDF file dataset.
Expand Down Expand Up @@ -580,7 +639,7 @@ def make_map(tod, center_on=None, scan_coords=True, thread_algo=False,
case) must be passed through info.
"""
assert(center_on is not None) # Pass in the source name, e.g. 'uranus'
assert (center_on is not None) # Pass in the source name, e.g. 'uranus'

# Test the map format string...
if filename is not None:
Expand All @@ -606,7 +665,8 @@ class MmTimer(coords.Timer):
comps=comps, cuts=cuts,
threads=thread_algo)
else:
planet = coords.planets.SlowSource.for_named_source(center_on, tod.timestamps[0])
planet = coords.planets.SlowSource.for_named_source(
center_on, tod.timestamps[0])
ra0, dec0 = planet.pos(tod.timestamps.mean())
wcsk = coords.get_wcs_kernel('tan', ra0, dec0, res=res)
P = coords.P.for_tod(tod, comps=comps,
Expand Down Expand Up @@ -679,4 +739,4 @@ class MmTimer(coords.Timer):
'solved': map1b,
'P': P,
'det_weights': det_weights,
}
}

0 comments on commit 9f9b8af

Please sign in to comment.