error check for get_groups
Michael McCrackan committed Dec 20, 2024
1 parent 6f04c1c commit 9cfce36
Showing 3 changed files with 52 additions and 29 deletions.
48 changes: 29 additions & 19 deletions sotodlib/preprocess/preprocess_util.py
@@ -176,19 +176,25 @@ def get_groups(obs_id, configs, context):
     groups : list of list of int
         The list of groups of detectors.
     """
-    group_by = np.atleast_1d(configs['subobs'].get('use', 'detset'))
-    for i, gb in enumerate(group_by):
-        if gb.startswith('dets:'):
-            group_by[i] = gb.split(':',1)[1]
-
-        if (gb == 'detset') and (len(group_by) == 1):
-            groups = context.obsfiledb.get_detsets(obs_id)
-            return group_by, [[g] for g in groups]
-
-    det_info = context.get_det_info(obs_id)
-    rs = det_info.subset(keys=group_by).distinct()
-    groups = [[b for a,b in r.items()] for r in rs]
-    return group_by, groups
+    try:
+        group_by = np.atleast_1d(configs['subobs'].get('use', 'detset'))
+        for i, gb in enumerate(group_by):
+            if gb.startswith('dets:'):
+                group_by[i] = gb.split(':',1)[1]
+
+            if (gb == 'detset') and (len(group_by) == 1):
+                groups = context.obsfiledb.get_detsets(obs_id)
+                return group_by, [[g] for g in groups], None
+
+        det_info = context.get_det_info(obs_id)
+        rs = det_info.subset(keys=group_by).distinct()
+        groups = [[b for a,b in r.items()] for r in rs]
+        return group_by, groups, None
+    except Exception as e:
+        error = f'Failed to get groups for: {obs_id}'
+        errmsg = f'{type(e)}: {e}'
+        tb = ''.join(traceback.format_tb(e.__traceback__))
+        return [], [], [error, errmsg, tb]


def get_preprocess_db(configs, group_by, logger=None):
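Note on the new contract: get_groups now returns a three-tuple (group_by, groups, error), where error is None on success and [summary, errmsg, traceback] on failure. A minimal sketch of a caller under the new signature; the obs_id, config values, and the pp_util import alias below are illustrative assumptions, not part of this commit:

# Sketch only: exercises the new (group_by, groups, error) return.
# The obs_id and config values are hypothetical placeholders.
from sotodlib import core
from sotodlib.preprocess import preprocess_util as pp_util

configs = {'context_file': 'context.yaml', 'subobs': {'use': 'detset'}}
context = core.Context(configs['context_file'])

group_by, groups, error = pp_util.get_groups('obs_example', configs, context)
if error is not None:
    summary, errmsg, tb = error  # summary string, exception text, traceback
    print(f'{summary}\n{errmsg}\n{tb}')
else:
    for group in groups:
        print(group_by, group)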
@@ -388,8 +394,8 @@ def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc,
     configs_proc, context_proc = get_preprocess_context(configs_proc, context_proc)
     meta_proc = context_proc.get_meta(obs_id, dets=dets, meta=meta)
 
-    group_by_init, groups_init = get_groups(obs_id, configs_init, context_init)
-    group_by_proc, groups_proc = get_groups(obs_id, configs_proc, context_proc)
+    group_by_init, groups_init, error = get_groups(obs_id, configs_init, context_init)
+    group_by_proc, groups_proc, error = get_groups(obs_id, configs_proc, context_proc)
 
     if (group_by_init != group_by_proc).any():
         raise ValueError('init and proc groups do not match')
@@ -451,7 +457,7 @@ def find_db(obs_id, configs, dets, context=None, logger=None):
         configs = yaml.safe_load(open(configs, "r"))
     if context is None:
         context = core.Context(configs["context_file"])
-    group_by, _ = get_groups(obs_id, configs, context)
+    group_by, _, _ = get_groups(obs_id, configs, context)
     cur_groups = [list(np.fromiter(dets.values(), dtype='<U32'))]
     dbexist = True
     if os.path.exists(configs['archive']['index']):
@@ -560,7 +566,7 @@ def save_group_and_cleanup(obs_id, configs, context=None, subdir='temp',
     if context is None:
         context = core.Context(configs["context_file"])
 
-    group_by, groups = get_groups(obs_id, configs, context)
+    group_by, groups, error = get_groups(obs_id, configs, context)
 
     all_groups = groups.copy()
     for g in all_groups:
@@ -583,6 +589,7 @@ def save_group_and_cleanup(obs_id, configs, context=None, subdir='temp',
             except OSError as e:
                 # remove if it can't be opened
                 os.remove(outputs_grp['temp_file'])
+    return error
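Since save_group_and_cleanup now passes the error triple through (returning None when get_groups succeeds), a caller can log failures without wrapping the call in try/except. A minimal sketch, mirroring the logging added in the site_pipeline scripts below; the errlog path and obs_id are hypothetical placeholders:

# Sketch only: forward save_group_and_cleanup's error triple to a log.
import time

errlog = 'errlog.txt'  # hypothetical log path
error = pp_util.save_group_and_cleanup('obs_example', configs, context,
                                       subdir='temp', remove=False)
if error is not None:
    with open(errlog, 'a') as f:
        f.write(f'\n{time.time()}, cleanup error\n{error[0]}\n{error[2]}\n')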


@@ -657,9 +664,12 @@ def preproc_or_load_group(obs_id, configs_init, dets, configs_proc=None, logger=None,
         if context_proc is None:
             context_proc = core.Context(configs_proc["context_file"])
 
-        group_by, groups = get_groups(obs_id, configs_proc, context_proc)
+        group_by, groups, error = get_groups(obs_id, configs_proc, context_proc)
     else:
-        group_by, groups = get_groups(obs_id, configs_init, context_init)
+        group_by, groups, error = get_groups(obs_id, configs_init, context_init)
+
+    if error is not None:
+        return error[0], [error[1], error[2]], [error[1], error[2]], None
 
     all_groups = groups.copy()
     cur_groups = [list(np.fromiter(dets.values(), dtype='<U32'))]
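When get_groups fails here, preproc_or_load_group reports it through its normal return shape instead of raising: the summary string fills the error slot and the [errmsg, traceback] pair fills both output slots. A sketch of branching on that return; the four slot names and argument values are assumptions based on the return statement above, not names taken from this commit:

# Sketch only: detect a get_groups failure from the four-slot return.
err, outputs_init, outputs_proc, aman = pp_util.preproc_or_load_group(
    'obs_example', 'configs_init.yaml', {'dets:detset': 'wafer_example'},
    configs_proc='configs_proc.yaml')
if aman is None and err is not None:
    errmsg, tb = outputs_init  # both output slots hold the same pair here
    print(f'{err}\n{errmsg}\n{tb}')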
17 changes: 13 additions & 4 deletions sotodlib/site_pipeline/multilayer_preprocess_tod.py
@@ -283,10 +283,19 @@ def main(configs_init: str,
     # clean up lingering files from previous incomplete runs
     for obs in obs_list:
         obs_id = obs['obs_id']
-        pp_util.save_group_and_cleanup(obs_id, configs_init, context_init,
-                                       subdir='temp', remove=overwrite)
-        pp_util.save_group_and_cleanup(obs_id, configs_proc, context_proc,
-                                       subdir='temp_proc', remove=overwrite)
+        error = pp_util.save_group_and_cleanup(obs_id, configs_init, context_init,
+                                               subdir='temp', remove=overwrite)
+        if error is not None:
+            f = open(errlog, 'a')
+            f.write(f'\n{time.time()}, init cleanup error\n{error[0]}\n{error[2]}\n')
+            f.close()
+
+        error = pp_util.save_group_and_cleanup(obs_id, configs_proc, context_proc,
+                                               subdir='temp_proc', remove=overwrite)
+        if error is not None:
+            f = open(errlog, 'a')
+            f.write(f'\n{time.time()}, dependent cleanup error\n{error[0]}\n{error[2]}\n')
+            f.close()
 
     run_list = []
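Each write above appends a blank-line-separated block: a timestamp-plus-label header, then the summary line, then the traceback. A small sketch for reading those blocks back when triaging a run; the errlog path is a hypothetical placeholder:

# Sketch only: split the errlog back into (header, detail) blocks.
errlog = 'errlog.txt'  # hypothetical log path
with open(errlog) as f:
    blocks = [b for b in f.read().split('\n\n') if b.strip()]
for block in blocks:
    header, *detail = block.splitlines()
    print('entry:', header)  # e.g. '<unix time>, init cleanup error'
    print('detail:', *detail, sep='\n')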

16 changes: 10 additions & 6 deletions sotodlib/site_pipeline/preprocess_tod.py
@@ -29,7 +29,7 @@ def dummy_preproc(obs_id, group_list, logger,
     error = None
     outputs = []
     context = core.Context(configs["context_file"])
-    group_by, groups = pp_util.get_groups(obs_id, configs, context)
+    group_by, groups, error = pp_util.get_groups(obs_id, configs, context)
     pipe = Pipeline(configs["process_pipe"], plot_dir=configs["plot_dir"], logger=logger)
     for group in groups:
         logger.info(f"Beginning run for {obs_id}:{group}")
@@ -83,7 +83,7 @@ def preprocess_tod(obs_id,
         configs = yaml.safe_load(open(configs, "r"))
 
     context = core.Context(configs["context_file"])
-    group_by, groups = pp_util.get_groups(obs_id, configs, context)
+    group_by, groups, error = pp_util.get_groups(obs_id, configs, context)
     all_groups = groups.copy()
     for g in all_groups:
         if group_list is not None:
@@ -327,21 +327,25 @@ def main(
     # clean up lingering files from previous incomplete runs
     for obs in obs_list:
         obs_id = obs['obs_id']
-        pp_util.save_group_and_cleanup(obs_id, configs, context,
-                                       subdir='temp', remove=overwrite)
+        error = pp_util.save_group_and_cleanup(obs_id, configs, context,
+                                               subdir='temp', remove=overwrite)
+        if error is not None:
+            f = open(errlog, 'a')
+            f.write(f'\n{time.time()}, cleanup error\n{error[0]}\n{error[2]}\n')
+            f.close()
 
     run_list = []
 
     if overwrite or not os.path.exists(configs['archive']['index']):
         # run on all if the database doesn't exist
         for obs in obs_list:
-            group_by, groups = pp_util.get_groups(obs["obs_id"], configs, context)
+            group_by, groups, error = pp_util.get_groups(obs["obs_id"], configs, context)
             run_list.append((obs, groups))  # = [(o, groups) for o in obs_list]
     else:
         db = core.metadata.ManifestDb(configs['archive']['index'])
         for obs in obs_list:
             x = db.inspect({'obs:obs_id': obs["obs_id"]})
-            group_by, groups = pp_util.get_groups(obs["obs_id"], configs, context)
+            group_by, groups, error = pp_util.get_groups(obs["obs_id"], configs, context)
             if x is None or len(x) == 0:
                 run_list.append((obs, None))
             elif len(x) != len(groups):