Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sat mapmaking refactor #973

Merged
merged 50 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
fdcbb3f
Refactor of the demod mapmaking
chervias Sep 27, 2024
c51b5a0
Docstring for make_atomic_map
chervias Sep 28, 2024
bd74d40
Newline at end of file
chervias Sep 28, 2024
372d450
Handling of failed preprocessings
chervias Sep 28, 2024
2cca6d8
Fixing a bug
chervias Sep 28, 2024
7e06d85
Fix bug
chervias Sep 30, 2024
4d996a0
updates for 20240819 satp3 catchup job
Sep 30, 2024
abb4c93
remove changes added after catchup job finished
Sep 30, 2024
b0100e0
add update to site_pipeline
Oct 1, 2024
5d120a3
Restoring the MPI communicator capabilities
chervias Oct 1, 2024
4f15b08
remove profiling calls
Oct 2, 2024
d2de940
adjust fill_glitches to address #740
Oct 2, 2024
c8bbd93
add error message for empty iir_params to partially address #969
Oct 2, 2024
6a45b75
adjustment to new t2p leakage model handling
Oct 2, 2024
413d293
remove repeated calculation get_gap_fill
Oct 2, 2024
feafb55
add lmfit to conf.py
Oct 3, 2024
af484c7
Merge branch 'master' into 20240819_satp3_catchup
mmccrackan Oct 3, 2024
a3e0f79
add lmfit to setup.py
Oct 3, 2024
89bd808
Merge remote-tracking branch 'origin/master' into sat-mapmaking-refactor
chervias Oct 3, 2024
eaf6076
breaking up into smaller functions, the writing of files added to mak…
chervias Oct 4, 2024
8e5efa0
updating the docstring
chervias Oct 4, 2024
851d959
support for multiprocessing and loading the context inside the function
chervias Oct 10, 2024
3a1fc19
Site pipeline script added
chervias Oct 10, 2024
34b66e3
Merge remote-tracking branch 'origin/master' into sat-mapmaking-refactor
chervias Oct 10, 2024
0ffb6f4
Restoring the check in the atomic maps database
chervias Oct 10, 2024
0354e10
Merge remote-tracking branch 'origin/20240819_satp3_catchup' into sat…
chervias Oct 11, 2024
cf1593e
Making logger to work under multiprocessing pools
chervias Oct 14, 2024
6535ecc
updates from satp1 site pipeline job
Oct 21, 2024
2b24dbf
remove lingering profile call
Oct 21, 2024
9ea2196
fix processes.py
Oct 21, 2024
a8e5f0f
Merge remote-tracking branch 'origin/20240819_satp3_catchup' into sat…
chervias Oct 25, 2024
4015b08
Merge remote-tracking branch 'origin/master' into sat-mapmaking-refactor
chervias Oct 27, 2024
a28fc5f
Fixing conflicts with healpix PR, we still need to add healpix suppor…
chervias Oct 28, 2024
fddb0be
Support for healpix added to make_demod_map and the script
chervias Oct 28, 2024
63e00d5
Implementing nside_tile=auto in make_demod_map
chervias Oct 29, 2024
bc97351
adjustments to t2p, flags, gapfill
Oct 30, 2024
78627d9
fix docstring
Oct 30, 2024
873021a
update glitch fill test
Oct 30, 2024
0ffbdaf
update pca for compatibility
Oct 31, 2024
804eddf
Merge branch 'master' into 20240819_satp3_catchup
mmccrackan Oct 31, 2024
25519a5
Merge remote-tracking branch 'origin/20240819_satp3_catchup' into sat…
chervias Nov 1, 2024
33ac57b
Merge remote-tracking branch 'origin/master' into sat-mapmaking-refactor
chervias Nov 1, 2024
15017a9
newline fix
chervias Nov 1, 2024
ee1f280
Merge remote-tracking branch 'origin/master' into sat-mapmaking-refactor
chervias Nov 7, 2024
0dea922
removing the script, which will be merged in a further commit
chervias Nov 7, 2024
59c6d58
Fixing bug where sys was not imported
chervias Nov 7, 2024
a6e488b
Adding exception handling in obs_grouping
chervias Nov 8, 2024
5c4686c
Various fixes replying to review
chervias Nov 8, 2024
dbeb889
removing script which was added by mistake
chervias Nov 8, 2024
6a4627e
Various fixes
chervias Nov 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 209 additions & 15 deletions sotodlib/mapmaking/demod_mapmaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
you accumulate observations into the div and rhs maps. For examples
how to use look at docstring of DemodMapmaker.
"""
__all__ = ['DemodMapmaker','DemodSignal','DemodSignalMap']
import numpy as np
from pixell import enmap, utils, tilemap, bunch
__all__ = ['DemodMapmaker','DemodSignal','DemodSignalMap','make_demod_map']
import numpy as np, os
from pixell import enmap, utils as putils, tilemap, bunch, mpi
import so3g.proj

from .. import core
from .. import coords
from .utilities import recentering_to_quat_lonlat, evaluate_recentering, MultiZipper, unarr, safe_invert_div
from .utilities import import_optional, get_flags
Expand Down Expand Up @@ -213,7 +214,7 @@ def for_rectpix(cls, shape, wcs, comm, comps="TQU", name="sky", ofmt="{name}", o
ext : str, optional
The extension used for the files.
dtype : numpy.dtype
The data type to use for the time-ordered data.
The data type to use for the maps.
sys : str or None, optional
The coordinate system to map. Defaults to equatorial
recenter : str or None
Expand Down Expand Up @@ -338,22 +339,14 @@ def prepare(self):
self.hits = tilemap.redistribute(self.hits,self.comm)
else:
if self.comm is not None:
self.rhs = utils.allreduce(self.rhs, self.comm)
self.div = utils.allreduce(self.div, self.comm)
self.hits = utils.allreduce(self.hits,self.comm)
self.rhs = putils.allreduce(self.rhs, self.comm)
self.div = putils.allreduce(self.div, self.comm)
self.hits = putils.allreduce(self.hits,self.comm)
self.ready = True

@property
def ncomp(self): return len(self.comps)

def to_work(self, map):
if self.tiled: return tilemap.redistribute(map, self.comm, self.geo_work.active)
else: return map.copy()

def from_work(self, map):
if self.tiled: return tilemap.redistribute(map, self.comm, self.rhs.geometry.active)
else: return utils.allreduce(map, self.comm)

def write(self, prefix, tag, m):
if not self.output: return
oname = self.ofmt.format(name=self.name)
Expand Down Expand Up @@ -385,6 +378,207 @@ def write(self, prefix, tag, m):

return oname

def setup_demod_map(noise_model, shape=None, wcs=None, nside=None,
comm=mpi.COMM_WORLD, comps='TQU', split_labels=None,
singlestream=False, dtype_tod=np.float32,
dtype_map=np.float32, recenter=None, verbose=0):
"""
Setup the classes for demod mapmaking and return
a DemodMapmmaker object
"""
if shape is not None and wcs is not None:
if split_labels==None:
# this is the case where we did not request any splits at all
signal_map = DemodSignalMap.for_rectpix(shape, wcs, comm, comps=comps,
dtype=dtype_map, tiled=False,
ofmt="", singlestream=singlestream,
recenter=recenter )
else:
# this is the case where we asked for at least 2 splits (1 split set).
# We count how many split we'll make, we need to define the Nsplits
# maps inside the DemodSignalMap
Nsplits = len(split_labels)
signal_map = DemodSignalMap.for_rectpix(shape, wcs, comm, comps=comps,
dtype=dtype_map, tiled=False,
ofmt="", Nsplits=Nsplits,
singlestream=singlestream,
recenter=recenter)
elif nside is not None:
if split_labels==None:
# this is the case where we did not request any splits at all
signal_map = DemodSignalMap.for_healpix(nside, nside_tile='auto',
comps=comps, dtype=dtype_map,
ofmt="", singlestream=singlestream,
ext="fits.gz")
else:
# this is the case where we asked for at least 2 splits (1 split set).
# We count how many split we'll make, we need to define the Nsplits
# maps inside the DemodSignalMap
Nsplits = len(split_labels)
signal_map = DemodSignalMap.for_healpix(nside, nside_tile='auto',
comps=comps, dtype=dtype_map,
ofmt="", Nsplits=Nsplits,
singlestream=singlestream,
ext="fits.gz")
signals = [signal_map]
mapmaker = DemodMapmaker(signals, noise_model=noise_model,
dtype=dtype_tod,
verbose=verbose>0,
singlestream=singlestream)
return mapmaker

def write_demod_maps(prefix, data, split_labels=None):
"""
Write maps from data into files
"""
if split_labels==None:
# we have no splits, so we save index 0 of the lists
data.signal.write(prefix, "full_wmap", data.wmap[0])
data.signal.write(prefix, "full_weights", data.weights[0])
data.signal.write(prefix, "full_hits", data.signal.hits)
else:
# we have splits
Nsplits = len(split_labels)
for n_split in range(Nsplits):
data.signal.write(prefix, "%s_wmap"%split_labels[n_split],
data.wmap[n_split])
data.signal.write(prefix, "%s_weights"%split_labels[n_split],
data.weights[n_split])
data.signal.write(prefix, "%s_hits"%split_labels[n_split],
data.signal.hits[n_split])

def write_demod_info(oname, info, split_labels=None):
putils.mkdir(os.path.dirname(oname))
if split_labels==None:
bunch.write(oname+'_full_info.hdf', info[0])
else:
# we have splits
Nsplits = len(split_labels)
for n_split in range(Nsplits):
bunch.write(oname+'_%s_info.hdf'%split_labels[n_split], info[n_split])

def make_demod_map(context, obslist, noise_model, info,
preprocess_config, prefix, shape=None, wcs=None,
nside=None, comm=mpi.COMM_WORLD, comps="TQU", t0=0,
dtype_tod=np.float32, dtype_map=np.float32,
tag="", verbose=0, split_labels=None, L=None,
site='so_sat3', recenter=None, singlestream=False):
"""
Make a demodulated map from the list of observations in obslist.

Arguments
---------
context : str
File path to context used to load obs from.
obslist : dict
The obslist which is the output of the
mapmaking.obs_grouping.build_obslists, contains the information of the
single or multiple obs to map.
noise_model : sotodlib.mapmaking.Nmat
Noise model to pass to DemodMapmaker.
info : list
Information for the database, will be written as a .hdf file.
preprocess_config : dict
Dictionary with the config yaml file for the preprocess database.
prefix : str
Prefix for the output files
shape : tuple, optional
Shape of the geometry to use for mapping.
wcs : dict, optional
WCS kernel of the geometry to use for mapping.
nside : int, optional
Nside for healpix pixelization
comps : str, optional
Which components to map, only TQU supported for now.
t0 : int, optional
Ctime to use as the label in the map files.
dtype_tod : numpy.dtype, optional
The data type to use for the time-ordered data. Only tested
with float32.
dtype_map : numpy.dtype, optional
The data type to use for the maps.
tag : str, optional
Prefix tag for the logger.
verbose : bool, optional
split_labels : list or None, optional
A list of strings with the splits requested. If None then no splits
were asked for, i.e. we will produce one map.
L : logger, optional
Logger for printing on the screen.
site : str, optional
Platform name for the pointing matrix.
recenter : str or None
String to make object-centered maps, such as Moon/Sun/Planet centered maps.
Look at sotodlib.mapmaking.parse_recentering for details.
singlestream : Bool
If True, do not perform demodulated filter+bin mapmaking but
rather regular filter+bin mapmaking, i.e. map from obs.signal
rather than from obs.dsT, obs.demodQ, obs.demodU.

Returns
-------
errors : list
List of errors from preprocess database. To be used in cleanup_mandb.
outputs : list
List of outputs from preprocess database. To be used in cleanup_mandb.
"""
from .. import site_pipeline
context = core.Context(context)
if L is None:
L = site_pipeline.util.init_logger("Demod filterbin mapmaking")
pre = "" if tag is None else tag + " "
if comm.rank == 0: L.info(pre + "Initializing equation system")
mapmaker = setup_demod_map(noise_model, shape=shape, wcs=wcs, nside=nside,
comm=comm, comps=comps, split_labels=split_labels,
singlestream=singlestream, dtype_tod=dtype_tod,
dtype_map=dtype_map, recenter=recenter, verbose=verbose)

if comm.rank == 0: L.info(pre + "Building RHS")
# And feed it with our observations
nobs_kept = 0
errors = [] ; outputs = []; # PENDING: do an allreduce of these.
# not needed for atomic maps, but needed for
# depth-1 maps
for oi in range(len(obslist)):
obs_id, detset, band = obslist[oi][:3]
name = "%s:%s:%s" % (obs_id, detset, band)
error, output, obs = site_pipeline.preprocess_tod.preproc_or_load_group(obs_id,
configs=preprocess_config,
dets={'wafer_slot':detset, 'wafer.bandpass':band},
logger=L, context=context, overwrite=False)
errors.append(error) ; outputs.append(output) ;
if error not in [None,'load_success']:
L.info('tod %s:%s:%s failed in the prepoc database'%(obs_id,detset,band))
continue
obs.wrap("weather", np.full(1, "toco"))
obs.wrap("site", np.full(1, site))
obs.flags.wrap('glitch_flags', obs.preprocess.turnaround_flags.turnarounds
+ obs.preprocess.jumps_2pi.jump_flag + obs.preprocess.glitches.glitch_flags, )
mapmaker.add_obs(name, obs, split_labels=split_labels)
L.info('Done with tod %s:%s:%s'%(obs_id,detset,band))
nobs_kept += 1
nobs_kept = comm.allreduce(nobs_kept)
# if we skip all the obs then we return error and output
if nobs_kept == 0:
return errors, outputs

for signal in mapmaker.signals:
signal.prepare()
if comm.rank == 0: L.info(pre + "Writing F+B outputs")
wmap = []
weights = []
# mapmaker.signals[0] is signal_map
for n_split in range(mapmaker.signals[0].Nsplits):
wmap.append( mapmaker.signals[0].rhs[n_split] )
div = np.diagonal(mapmaker.signals[0].div[n_split], axis1=0, axis2=1)
div = np.moveaxis(div, -1, 0) # this moves the last axis to the 0th position
weights.append(div)
mapdata = bunch.Bunch(wmap=wmap, weights=weights, signal=mapmaker.signals[0], t0=t0)
# output to files
write_demod_maps(prefix, mapdata, split_labels=split_labels, )
write_demod_info(prefix, info, split_labels=split_labels )
return errors, outputs

def project_rhs_demod(pmap, signalT, signalQ, signalU, det_weightsT, det_weightsQU, wrapper=lambda x:x):
"""
Project demodulated T, Q, U timestreams into weighted maps.
Expand Down
12 changes: 7 additions & 5 deletions sotodlib/mapmaking/obs_grouping.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
depth-1 maps, etc.
"""

__all__ = ['build_obslists']
__all__ = ['build_obslists','NoTODFound']
import numpy as np
from pixell import utils
from scipy import ndimage

from .utilities import get_ids

class NoTODFound(Exception):
def __init__(self, msg):
self.msg = msg

def build_obslists(context, query, mode=None, nset=None, wafer=None,
freq=None, ntod=None, tods=None, fixed_time=None, mindur=None, ):
"""
Expand Down Expand Up @@ -78,8 +82,7 @@ def build_obslists(context, query, mode=None, nset=None, wafer=None,
if tods: ids = np.array(eval("ids"+tods)).reshape(-1)
if ntod: ids = ids[:ntod]
if len(ids) == 0:
print("No tods found!")
sys.exit(1)
raise NoTODFound("No tods found!")
# Extract info about the selected ids
obs_infos = context.obsdb.query("obs_id in (%s)" % ",".join(["'%s'" % id for id in ids]))
obs_infos = obs_infos.asarray().view(np.recarray)
Expand All @@ -100,8 +103,7 @@ def build_obslists(context, query, mode=None, nset=None, wafer=None,
periods = split_periods(periods, 24*3600) # If fixed_time was not set,
#then we do 24 hrs by default and it will be the same as depth_1
else:
print("Invalid mode!")
sys.exit(1)
raise NoTODFound("Invalid mode!")

# We will make one map per period-detset-band
obslists = build_period_obslists(obs_infos, periods, context, nset=nset,
Expand Down