
Add multilayer preproc #1026
Merged: 23 commits from 20241104_multilayer_preproc into master, Dec 14, 2024

Conversation

@msilvafe (Contributor) commented Nov 12, 2024

Addresses issue #1003.

@msilvafe requested a review from @Wuhyun, November 22, 2024 15:51
@msilvafe marked this pull request as ready for review, November 22, 2024 15:52
@Wuhyun commented Dec 3, 2024

I ran it successfully on two-step preprocessing, split at demodulation. Looks functional to me - thank you for implementing this!

Test performed (found under NERSC /global/cfs/cdirs/sobs/users/wuhyun/multilayer_preproc_tests for anyone interested):

# 1) Initial preproc
preprocess_tod(obs_id, config_1, group_list=[["ws0", "f150"]],...)
# 2) Dependent preproc
multilayer_preprocess_tod(obs_id, config_1, config_2, group_list=[["ws1", "f150"]],...)
# 3) Load from saved results
aman = multilayer_load_and_preprocess(obs_id, config_1, config_2, dets={"wafer_slot": "ws0", "wafer.bandpass": "f150"})

I have one general question. Currently, the initial preprocessing (preprocess_tod) needs to be run before the second (multilayer_preprocess_tod). Would it be worth combining the two processes (e.g. having multilayer run preprocess_tod inside it), so that we can save some time reloading the first preprocess?

For context, I found that on a NERSC login node it takes ~120s to run and ~60s to load a preprocess config up to demodulation (single wafer & bandpass). Maybe these run fast enough that we don't need to worry about it much?

@mmccrackan (Contributor):

> I have one general question. Currently, the initial preprocessing (preprocess_tod) needs to be run before the second (multilayer_preprocess_tod). Would it be worth combining the two processes (e.g. having multilayer run preprocess_tod inside it), so that we can save some time reloading the first preprocess?

Thanks for testing!

I had considered incorporating preprocess_tod into multilayer_preprocess, and it certainly is possible to do this, though there are a few considerations. There is some complexity due to the desire to run many obs in parallel and the need to write the outputs at the end. Since multilayer_preprocess_tod needs to check the pre-existing database from preprocess_tod, that database would probably need to be accessed in parallel and populated for all entries that multilayer_preprocess needs.

We have plans to integrate preprocess_tod, multilayer_preprocess_tod, and preprocess_obs into a single main function interface to make these functions simpler to use, so maybe we can revisit this after that is done.

@Wuhyun commented Dec 4, 2024

Thank you for your answer, that sounds good to me.
Please add that one import statement and I'll be happy to approve this if others are.

@msilvafe (Contributor, Author) commented Dec 4, 2024

> Thank you for your answer, that sounds good to me. Please add that one import statement and I'll be happy to approve this if others are.

Hey @Wuhyun thanks for reviewing and testing! What is the import statement that you're referring to?

Also, after chatting with @mmccrackan, we're going to go ahead and implement your suggestion to allow the multilayer function to run through both database builds, without requiring that the first one be prebuilt via preprocess_tod.

@Wuhyun left a comment:

Sorry, forgot to submit this earlier.

@mmccrackan (Contributor):

Okay, I have made a bunch of changes to the functions here. I've tested them a fair bit, but there are a lot of logic branches, so I probably missed a few cases. The main differences are:

  1. multilayer_preprocess_tod.py will now call preproc_or_load_group.py, meaning that it can run the pipeline on the first config if the db is not found or overwrite is true.
  2. preproc_or_load_group.py has been refactored to allow for one or two config files. If running one, it will operate like it did before. If two, it will either load both, or it will load or run the first config and then run the second config (see the sketch after this list).
  3. both preprocess_tod.py and multilayer_preprocess_tod.py now write out temp files for each group instead of just a single file for all groups. The former writes to "temp/" as before, whereas the latter writes to "temp_proc/".
  4. cleanup_mandb is now used for merging all the temp files in both cases.
  5. Some helper functions like find_db and save_group have been added and are used throughout.
  6. Since we're now matching the per-group files of preproc_or_load_group.py in preprocess_tod.py and multilayer_preprocess_tod.py, the outputs have changed. The destination files are no longer returned separately; they are included in the output lists. So the outputs for preprocess_tod.py are error and outputs, whereas for multilayer_preprocess_tod.py they are error, outputs_init, and outputs_proc. The outputs may be empty depending on the runtime configuration.
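
For anyone skimming, the one-vs-two config dispatch in point 2 looks roughly like this (an illustrative sketch only, not the actual implementation; find_db, load_group, and run_pipeline are hypothetical stand-ins for the real helpers):

# Hypothetical stand-ins so the sketch is self-contained; the real helpers
# in this PR have richer signatures.
def find_db(obs_id, configs): ...
def load_group(obs_id, *configs): ...
def run_pipeline(obs_id, configs, aman=None): ...

def preproc_or_load_group_sketch(obs_id, configs_init, configs_proc=None,
                                 overwrite=False):
    if configs_proc is None:
        # One config: operate like the original preproc_or_load_group.
        if find_db(obs_id, configs_init) and not overwrite:
            return load_group(obs_id, configs_init)
        return run_pipeline(obs_id, configs_init)
    # Two configs: load both if both dbs already exist; otherwise
    # load-or-run the first config, then run the second on top of it.
    if (find_db(obs_id, configs_init) and find_db(obs_id, configs_proc)
            and not overwrite):
        return load_group(obs_id, configs_init, configs_proc)
    if find_db(obs_id, configs_init) and not overwrite:
        aman = load_group(obs_id, configs_init)
    else:
        aman = run_pipeline(obs_id, configs_init)
    return run_pipeline(obs_id, configs_proc, aman=aman)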

outputs_proc = save_group(proc_aman, obs_id, configs_proc, dets, context_proc, subdir='temp_proc/', overwrite=overwrite)
outputs_proc = save_group(obs_id, configs_proc, dets, context_proc, subdir='temp_proc')
if overwrite or not os.path.exists(outputs_proc['temp_file']):
@msilvafe (Contributor, Author):

This logic should go before pipe.run if we want to catch the case where the code exited after writing some of the groups (but not all) to the temp directory.
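
Something like this ordering (illustrative only, reusing the names from the hunk above):

outputs_proc = save_group(obs_id, configs_proc, dets, context_proc, subdir='temp_proc')
if overwrite or not os.path.exists(outputs_proc['temp_file']):
    # Only run the pipeline for groups whose temp file is missing (or when
    # overwriting), so groups already written by a failed partial run are skipped.
    proc_aman, success = pipe.run(aman)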


    return aman
else:
    return None
@msilvafe (Contributor, Author):

In check_cfg_match, if the dependency check fails it just raises a logger.warning() and returns False. Let's raise an exception or print something to stdout here in the else statement stating that the dependency check failed.
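
For example, in place of the bare return None (a sketch; the check_cfg_match call here is a placeholder and its real signature may differ):

if check_cfg_match(configs_init, configs_ref):
    return aman
else:
    # fail loudly instead of silently returning None
    raise ValueError(f'Dependency check failed for {obs_id} {dets}: the init '
                     'config does not match the config the dependent db was '
                     'built against.')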

dbix = {'obs:obs_id':obs_id}
for gb, g in zip(group_by, cur_groups[0]):
    dbix[f'dets:{gb}'] = g
print(dbix)
@msilvafe (Contributor, Author):

This should probably go to a logger instead of stdout (if we actually need it at all; it may only be needed for debugging).
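
E.g. (sketch):

logger.debug(f'db index for group: {dbix}')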

Comment on lines 472 to 473
overwrite: bool
    Optional. Whether or not to overwrite existing entries in the preprocess manifest db.
@msilvafe (Contributor, Author):

Not an argument of this function.

@@ -335,35 +337,213 @@ def load_and_preprocess(obs_id, configs, context=None, dets=None, meta=None,
return aman


def preproc_or_load_group(obs_id, configs, dets, logger=None,
                          context=None, overwrite=False):
def multilayer_load_and_preprocess(obs_id, configs_init, configs_proc,
@msilvafe (Contributor, Author):

The context_<> arguments are not used.

an initial and a dependent pipeline stage. If the preprocess database entry for
this obsid-dets group already exists then this function will just load back the
processed tod calling either the ``load_and_preprocess`` or
''multilayer_load_and_preprocess`` functions. If the db entry does not exist of
@msilvafe (Contributor, Author):

Wrong type of quotes: it should be `` at the beginning of multilayer_load_and_preprocess. Also, "If the db entry does not exist of" should read "If the db entry does not exist or not of".

if type(configs_init) == str:
    configs_init = yaml.safe_load(open(configs_init, "r"))

if context_init is None:
@msilvafe (Contributor, Author):

Same comment as at line 484 applies here and at line 583.

if db_init_exist and (not overwrite):
    if db_proc_exist:
        logger.info(f"both db and dependent db exist for {obs_id} {dets} loading data and applying preprocessing.")
        aman = multilayer_load_and_preprocess(obs_id=obs_id, dets=dets, configs_init=configs_init,
@msilvafe (Contributor, Author):

Write this (and line 618) into a try-except similar to lines 624/648 to prevent crashing @chervias's mapmaker script and to write a more useful error message to his error log.
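
I.e., roughly this shape (a sketch mirroring the existing except blocks; the configs_proc kwarg and the logged message are just examples):

try:
    aman = multilayer_load_and_preprocess(obs_id=obs_id, dets=dets,
                                          configs_init=configs_init,
                                          configs_proc=configs_proc)
except Exception as e:
    error = f'Failed to load: {obs_id} {dets}'
    logger.info(f'{error}\n{e}')
    return error, [obs_id, dets], [obs_id, dets], None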

aman.wrap('tags', tags)
proc_aman, success = pipe.run(aman)
proc_aman, success = pipe_init.run(aman)
aman.wrap('preprocess', proc_aman)
except Exception as e:
    error = f'Failed to load: {obs_id} {dets}'
@msilvafe (Contributor, Author):

These error messages should now include a reference to whether we're erroring out in the init or proc section, to aid in debugging where in this mess of nested if-else statements we're exiting.
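
E.g. (sketch; exact wording is up to you):

# in the init branch:
error = f'Failed to load: {obs_id} {dets} (init)'
# in the proc branch:
error = f'Failed to load: {obs_id} {dets} (proc)'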

return success, [obs_id, dets], [obs_id, dets], None

outputs_init = save_group(obs_id, configs_init, dets, context_init, subdir='temp')
if overwrite or not os.path.exists(outputs_init['temp_file']):
@msilvafe (Contributor, Author):

This check should probably happen before getting to this step, as referenced above at line 642.

@mmccrackan (Contributor):

I've added some things to address the existing comments. There are likely still bugs or logic paths that might fail so I'll keep testing, but I wanted to get the updates out so everyone could take a look.

The biggest change is the addition of a new function, save_group_and_cleanup, which will look through all the input groups, find any for which a temporary file exists, call cleanup_mandb (creating a db if it doesn't exist), and delete the file afterward. If a file exists but cannot be opened, it will also be deleted (in case of file corruption). This is intended to let us use the temp files from a run that failed before creating and populating the db and destination file for all groups. I pass through an overwrite option as remove (I renamed it to remove, since overwrite would imply the opposite behavior within save_group_and_cleanup), which will simply delete the files. This was added so that overwrite=True works in preprocess_tod.py.

It is probably a good idea to do this before running preproc_or_load_group followed by cleanup_mandb. To do the same for one group, you can just do:

dets = {gb: gg for gb, gg in zip(group_by, group)}
outputs_grp = save_group(obs_id, configs, dets, context, subdir)

if os.path.exists(outputs_grp['temp_file']):
    try:
        if not remove:  # overwrite
            cleanup_mandb(None, outputs_grp, configs, logger)
        else:
            # if we're overwriting, remove the file so the group will re-run
            os.remove(outputs_grp['temp_file'])
    except OSError as e:
        # remove the file if it can't be opened
        os.remove(outputs_grp['temp_file'])

I've also wrapped most of the commands in preproc_or_load_group in try/except blocks, so hopefully nothing should result in an uncaught exception.

@msilvafe merged commit 8c74d3b into master, Dec 14, 2024
4 checks passed
@msilvafe deleted the 20241104_multilayer_preproc branch, December 14, 2024 19:07