Skip to content

Commit

Permalink
Sat mapmaking refactor (#973)
Browse files Browse the repository at this point in the history
* Refactor of the demod mapmaking
* removing the script, which will be merged in a further commit
* Adding exception handling in obs_grouping
* Various fixes
  • Loading branch information
chervias authored Nov 8, 2024
1 parent b604949 commit a03ad3b
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 20 deletions.
224 changes: 209 additions & 15 deletions sotodlib/mapmaking/demod_mapmaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
you accumulate observations into the div and rhs maps. For examples
how to use look at docstring of DemodMapmaker.
"""
__all__ = ['DemodMapmaker','DemodSignal','DemodSignalMap']
import numpy as np
from pixell import enmap, utils, tilemap, bunch
__all__ = ['DemodMapmaker','DemodSignal','DemodSignalMap','make_demod_map']
import numpy as np, os
from pixell import enmap, utils as putils, tilemap, bunch, mpi
import so3g.proj

from .. import core
from .. import coords
from .utilities import recentering_to_quat_lonlat, evaluate_recentering, MultiZipper, unarr, safe_invert_div
from .utilities import import_optional, get_flags
Expand Down Expand Up @@ -213,7 +214,7 @@ def for_rectpix(cls, shape, wcs, comm, comps="TQU", name="sky", ofmt="{name}", o
ext : str, optional
The extension used for the files.
dtype : numpy.dtype
The data type to use for the time-ordered data.
The data type to use for the maps.
sys : str or None, optional
The coordinate system to map. Defaults to equatorial
recenter : str or None
Expand Down Expand Up @@ -338,22 +339,14 @@ def prepare(self):
self.hits = tilemap.redistribute(self.hits,self.comm)
else:
if self.comm is not None:
self.rhs = utils.allreduce(self.rhs, self.comm)
self.div = utils.allreduce(self.div, self.comm)
self.hits = utils.allreduce(self.hits,self.comm)
self.rhs = putils.allreduce(self.rhs, self.comm)
self.div = putils.allreduce(self.div, self.comm)
self.hits = putils.allreduce(self.hits,self.comm)
self.ready = True

@property
def ncomp(self): return len(self.comps)

def to_work(self, map):
if self.tiled: return tilemap.redistribute(map, self.comm, self.geo_work.active)
else: return map.copy()

def from_work(self, map):
if self.tiled: return tilemap.redistribute(map, self.comm, self.rhs.geometry.active)
else: return utils.allreduce(map, self.comm)

def write(self, prefix, tag, m):
if not self.output: return
oname = self.ofmt.format(name=self.name)
Expand Down Expand Up @@ -385,6 +378,207 @@ def write(self, prefix, tag, m):

return oname

def setup_demod_map(noise_model, shape=None, wcs=None, nside=None,
comm=mpi.COMM_WORLD, comps='TQU', split_labels=None,
singlestream=False, dtype_tod=np.float32,
dtype_map=np.float32, recenter=None, verbose=0):
"""
Setup the classes for demod mapmaking and return
a DemodMapmmaker object
"""
if shape is not None and wcs is not None:
if split_labels==None:
# this is the case where we did not request any splits at all
signal_map = DemodSignalMap.for_rectpix(shape, wcs, comm, comps=comps,
dtype=dtype_map, tiled=False,
ofmt="", singlestream=singlestream,
recenter=recenter )
else:
# this is the case where we asked for at least 2 splits (1 split set).
# We count how many split we'll make, we need to define the Nsplits
# maps inside the DemodSignalMap
Nsplits = len(split_labels)
signal_map = DemodSignalMap.for_rectpix(shape, wcs, comm, comps=comps,
dtype=dtype_map, tiled=False,
ofmt="", Nsplits=Nsplits,
singlestream=singlestream,
recenter=recenter)
elif nside is not None:
if split_labels==None:
# this is the case where we did not request any splits at all
signal_map = DemodSignalMap.for_healpix(nside, nside_tile='auto',
comps=comps, dtype=dtype_map,
ofmt="", singlestream=singlestream,
ext="fits.gz")
else:
# this is the case where we asked for at least 2 splits (1 split set).
# We count how many split we'll make, we need to define the Nsplits
# maps inside the DemodSignalMap
Nsplits = len(split_labels)
signal_map = DemodSignalMap.for_healpix(nside, nside_tile='auto',
comps=comps, dtype=dtype_map,
ofmt="", Nsplits=Nsplits,
singlestream=singlestream,
ext="fits.gz")
signals = [signal_map]
mapmaker = DemodMapmaker(signals, noise_model=noise_model,
dtype=dtype_tod,
verbose=verbose>0,
singlestream=singlestream)
return mapmaker

def write_demod_maps(prefix, data, split_labels=None):
"""
Write maps from data into files
"""
if split_labels==None:
# we have no splits, so we save index 0 of the lists
data.signal.write(prefix, "full_wmap", data.wmap[0])
data.signal.write(prefix, "full_weights", data.weights[0])
data.signal.write(prefix, "full_hits", data.signal.hits)
else:
# we have splits
Nsplits = len(split_labels)
for n_split in range(Nsplits):
data.signal.write(prefix, "%s_wmap"%split_labels[n_split],
data.wmap[n_split])
data.signal.write(prefix, "%s_weights"%split_labels[n_split],
data.weights[n_split])
data.signal.write(prefix, "%s_hits"%split_labels[n_split],
data.signal.hits[n_split])

def write_demod_info(oname, info, split_labels=None):
putils.mkdir(os.path.dirname(oname))
if split_labels==None:
bunch.write(oname+'_full_info.hdf', info[0])
else:
# we have splits
Nsplits = len(split_labels)
for n_split in range(Nsplits):
bunch.write(oname+'_%s_info.hdf'%split_labels[n_split], info[n_split])

def make_demod_map(context, obslist, noise_model, info,
preprocess_config, prefix, shape=None, wcs=None,
nside=None, comm=mpi.COMM_WORLD, comps="TQU", t0=0,
dtype_tod=np.float32, dtype_map=np.float32,
tag="", verbose=0, split_labels=None, L=None,
site='so_sat3', recenter=None, singlestream=False):
"""
Make a demodulated map from the list of observations in obslist.
Arguments
---------
context : str
File path to context used to load obs from.
obslist : dict
The obslist which is the output of the
mapmaking.obs_grouping.build_obslists, contains the information of the
single or multiple obs to map.
noise_model : sotodlib.mapmaking.Nmat
Noise model to pass to DemodMapmaker.
info : list
Information for the database, will be written as a .hdf file.
preprocess_config : dict
Dictionary with the config yaml file for the preprocess database.
prefix : str
Prefix for the output files
shape : tuple, optional
Shape of the geometry to use for mapping.
wcs : dict, optional
WCS kernel of the geometry to use for mapping.
nside : int, optional
Nside for healpix pixelization
comps : str, optional
Which components to map, only TQU supported for now.
t0 : int, optional
Ctime to use as the label in the map files.
dtype_tod : numpy.dtype, optional
The data type to use for the time-ordered data. Only tested
with float32.
dtype_map : numpy.dtype, optional
The data type to use for the maps.
tag : str, optional
Prefix tag for the logger.
verbose : bool, optional
split_labels : list or None, optional
A list of strings with the splits requested. If None then no splits
were asked for, i.e. we will produce one map.
L : logger, optional
Logger for printing on the screen.
site : str, optional
Platform name for the pointing matrix.
recenter : str or None
String to make object-centered maps, such as Moon/Sun/Planet centered maps.
Look at sotodlib.mapmaking.parse_recentering for details.
singlestream : Bool
If True, do not perform demodulated filter+bin mapmaking but
rather regular filter+bin mapmaking, i.e. map from obs.signal
rather than from obs.dsT, obs.demodQ, obs.demodU.
Returns
-------
errors : list
List of errors from preprocess database. To be used in cleanup_mandb.
outputs : list
List of outputs from preprocess database. To be used in cleanup_mandb.
"""
from .. import site_pipeline
context = core.Context(context)
if L is None:
L = site_pipeline.util.init_logger("Demod filterbin mapmaking")
pre = "" if tag is None else tag + " "
if comm.rank == 0: L.info(pre + "Initializing equation system")
mapmaker = setup_demod_map(noise_model, shape=shape, wcs=wcs, nside=nside,
comm=comm, comps=comps, split_labels=split_labels,
singlestream=singlestream, dtype_tod=dtype_tod,
dtype_map=dtype_map, recenter=recenter, verbose=verbose)

if comm.rank == 0: L.info(pre + "Building RHS")
# And feed it with our observations
nobs_kept = 0
errors = [] ; outputs = []; # PENDING: do an allreduce of these.
# not needed for atomic maps, but needed for
# depth-1 maps
for oi in range(len(obslist)):
obs_id, detset, band = obslist[oi][:3]
name = "%s:%s:%s" % (obs_id, detset, band)
error, output, obs = site_pipeline.preprocess_tod.preproc_or_load_group(obs_id,
configs=preprocess_config,
dets={'wafer_slot':detset, 'wafer.bandpass':band},
logger=L, context=context, overwrite=False)
errors.append(error) ; outputs.append(output) ;
if error not in [None,'load_success']:
L.info('tod %s:%s:%s failed in the prepoc database'%(obs_id,detset,band))
continue
obs.wrap("weather", np.full(1, "toco"))
obs.wrap("site", np.full(1, site))
obs.flags.wrap('glitch_flags', obs.preprocess.turnaround_flags.turnarounds
+ obs.preprocess.jumps_2pi.jump_flag + obs.preprocess.glitches.glitch_flags, )
mapmaker.add_obs(name, obs, split_labels=split_labels)
L.info('Done with tod %s:%s:%s'%(obs_id,detset,band))
nobs_kept += 1
nobs_kept = comm.allreduce(nobs_kept)
# if we skip all the obs then we return error and output
if nobs_kept == 0:
return errors, outputs

for signal in mapmaker.signals:
signal.prepare()
if comm.rank == 0: L.info(pre + "Writing F+B outputs")
wmap = []
weights = []
# mapmaker.signals[0] is signal_map
for n_split in range(mapmaker.signals[0].Nsplits):
wmap.append( mapmaker.signals[0].rhs[n_split] )
div = np.diagonal(mapmaker.signals[0].div[n_split], axis1=0, axis2=1)
div = np.moveaxis(div, -1, 0) # this moves the last axis to the 0th position
weights.append(div)
mapdata = bunch.Bunch(wmap=wmap, weights=weights, signal=mapmaker.signals[0], t0=t0)
# output to files
write_demod_maps(prefix, mapdata, split_labels=split_labels, )
write_demod_info(prefix, info, split_labels=split_labels )
return errors, outputs

def project_rhs_demod(pmap, signalT, signalQ, signalU, det_weightsT, det_weightsQU, wrapper=lambda x:x):
"""
Project demodulated T, Q, U timestreams into weighted maps.
Expand Down
12 changes: 7 additions & 5 deletions sotodlib/mapmaking/obs_grouping.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
depth-1 maps, etc.
"""

__all__ = ['build_obslists']
__all__ = ['build_obslists','NoTODFound']
import numpy as np
from pixell import utils
from scipy import ndimage

from .utilities import get_ids

class NoTODFound(Exception):
def __init__(self, msg):
self.msg = msg

def build_obslists(context, query, mode=None, nset=None, wafer=None,
freq=None, ntod=None, tods=None, fixed_time=None, mindur=None, ):
"""
Expand Down Expand Up @@ -78,8 +82,7 @@ def build_obslists(context, query, mode=None, nset=None, wafer=None,
if tods: ids = np.array(eval("ids"+tods)).reshape(-1)
if ntod: ids = ids[:ntod]
if len(ids) == 0:
print("No tods found!")
sys.exit(1)
raise NoTODFound("No tods found!")
# Extract info about the selected ids
obs_infos = context.obsdb.query("obs_id in (%s)" % ",".join(["'%s'" % id for id in ids]))
obs_infos = obs_infos.asarray().view(np.recarray)
Expand All @@ -100,8 +103,7 @@ def build_obslists(context, query, mode=None, nset=None, wafer=None,
periods = split_periods(periods, 24*3600) # If fixed_time was not set,
#then we do 24 hrs by default and it will be the same as depth_1
else:
print("Invalid mode!")
sys.exit(1)
raise NoTODFound("Invalid mode!")

# We will make one map per period-detset-band
obslists = build_period_obslists(obs_infos, periods, context, nset=nset,
Expand Down

0 comments on commit a03ad3b

Please sign in to comment.