From 71fbd43b0760d653bb6ee5cc12c1e0dea9b832b6 Mon Sep 17 00:00:00 2001 From: ejhigson <18406298+ejhigson@users.noreply.github.com> Date: Tue, 6 Aug 2019 17:55:32 +0100 Subject: [PATCH] adding resume_dyn_run option to resume the dyPolyChord process partway through the dynamic run --- dyPolyChord/run_dynamic_ns.py | 168 +++++++++++++++++++++++++--------- 1 file changed, 124 insertions(+), 44 deletions(-) diff --git a/dyPolyChord/run_dynamic_ns.py b/dyPolyChord/run_dynamic_ns.py index fed52bd..f1003e0 100644 --- a/dyPolyChord/run_dynamic_ns.py +++ b/dyPolyChord/run_dynamic_ns.py @@ -109,6 +109,12 @@ def run_dypolychord(run_polychord, dynamic_goal, settings_dict_in, **kwargs): output files for the combined run in PolyChord format. When debugging this can be set to False to allow inspection of intermediate output. + resume_dyn_run: bool, optional + Resume a partially completed dyPolyChord run using its cached output + files. Resuming is only possible if the initial exploratory run + finished and the process reached the dynamic run stage. If the run + is resumed with different settings to what were used the first time + then this may give unexpected results. """ try: nlive_const = kwargs.pop('nlive_const', settings_dict_in['nlive']) @@ -123,10 +129,11 @@ def run_dypolychord(run_polychord, dynamic_goal, settings_dict_in, **kwargs): comm = kwargs.pop('comm', None) stats_means_errs = kwargs.pop('stats_means_errs', True) clean = kwargs.pop('clean', True) + resume_dyn_run = kwargs.pop('resume_dyn_run', True) if kwargs: raise TypeError('Unexpected **kwargs: {0}'.format(kwargs)) - # Step 1: do initial run - # ---------------------- + # Set Up + # ------ # set up rank if running with MPI if comm is not None: rank = comm.Get_rank() @@ -143,45 +150,86 @@ def run_dypolychord(run_polychord, dynamic_goal, settings_dict_in, **kwargs): 'dyPolyChord with multiple MPI processes. You have seed={} ' 'and {} MPI processes.').format( settings_dict_in['seed'], comm.Get_size()), UserWarning) - # Make a copy of settings dic so we dont edit settings - settings_dict = copy.deepcopy(settings_dict_in) - settings_dict['file_root'] = settings_dict['file_root'] + '_init' - settings_dict['nlive'] = ninit - if dynamic_goal == 0: - # We definitely won't need to resume midway through in this case, so - # just run PolyChod normally - run_polychord(settings_dict, comm=comm) - if rank == 0: - final_seed = settings_dict['seed'] - if settings_dict['seed'] >= 0: - final_seed += seed_increment - step_ndead = None - resume_outputs = None + root_name = os.path.join(settings_dict_in['base_dir'], + settings_dict_in['file_root']) + if resume_dyn_run: + # Check if the files we need to resume the dynamic run all exist. + files_needed = ['_dyn_info.pkl', '_init_dead.txt', '_dyn_dead.txt', + '_dyn.resume'] + files_needed = [root_name + ending for ending in files_needed] + files_exist = [os.path.isfile(name) for name in files_needed] + if all(files_exist): + skip_initial_run = True + print('resume_dyn_run=True so I am skipping the inital ' + 'exploratory run and resuming the dynamic run') + else: + skip_initial_run = False + # Only print a message if some of the files are present + if any(files_exist): + msg = ( + 'resume_dyn_run=True but I could not resume as not ' + 'all of the files I need are present. Perhaps the initial ' + 'exploratory run did not finish? The dyPolyChord process ' + 'can only be resumed from after the dynamic run ' + 'starts.\nFiles I am missing are:') + for i, name in enumerate(files_needed): + if not files_exist[i]: + msg += '\n' + name + print(msg) else: - step_ndead, resume_outputs, final_seed = run_and_save_resumes( - run_polychord, settings_dict, init_step, seed_increment, comm=comm) - # Step 2: calculate an allocation of live points - # ---------------------------------------------- - if rank == 0: - try: - # Get settings for dynamic run based on initial run - settings_dict = process_initial_run( - settings_dict_in, nlive_const=nlive_const, - smoothing_filter=smoothing_filter, - step_ndead=step_ndead, resume_outputs=resume_outputs, - ninit=ninit, dynamic_goal=dynamic_goal, - final_seed=final_seed) - except: # pragma: no cover - if comm is None or comm.Get_size() == 1: - raise - else: - # print error info - traceback.print_exc(file=sys.stdout) - print('Error in process with rank == 0: forcing MPI abort.') - sys.stdout.flush() # Make sure message prints before abort - comm.Abort(1) + skip_initial_run = False + if skip_initial_run: + dyn_info = nestcheck.io_utils.pickle_load(root_name + '_dyn_info') + else: + # Step 1: do initial run + # ---------------------- + if rank == 0: + # Make a copy of settings dic so we dont edit settings + settings_dict = copy.deepcopy(settings_dict_in) + settings_dict['file_root'] = settings_dict['file_root'] + '_init' + settings_dict['nlive'] = ninit + if dynamic_goal == 0: + # We definitely won't need to resume midway through in this case, so + # just run PolyChod normally + run_polychord(settings_dict, comm=comm) + if rank == 0: + final_seed = settings_dict['seed'] + if settings_dict['seed'] >= 0: + final_seed += seed_increment + step_ndead = None + resume_outputs = None + else: + step_ndead, resume_outputs, final_seed = run_and_save_resumes( + run_polychord, settings_dict, init_step, seed_increment, comm=comm) + # Step 2: calculate an allocation of live points + # ---------------------------------------------- + if rank == 0: + try: + # Get settings for dynamic run based on initial run + dyn_info = process_initial_run( + settings_dict_in, nlive_const=nlive_const, + smoothing_filter=smoothing_filter, + step_ndead=step_ndead, resume_outputs=resume_outputs, + ninit=ninit, dynamic_goal=dynamic_goal, + final_seed=final_seed) + except: # pragma: no cover + if comm is None or comm.Get_size() == 1: + raise + else: + # print error info + traceback.print_exc(file=sys.stdout) + print('Error in process with rank == 0: forcing MPI abort.') + sys.stdout.flush() # Make sure message prints before abort + comm.Abort(1) # Step 3: do dynamic run # ---------------------- + # Get settings for dynamic run + if rank == 0: + settings_dict = get_dynamic_settings(settings_dict_in, dyn_info) + if resume_dyn_run: + settings_dict['write_resume'] = True + settings_dict['read_resume'] = True + # Do the run run_polychord(settings_dict, comm=comm) # Step 4: process output and tidy # ------------------------------- @@ -211,8 +259,9 @@ def run_dypolychord(run_polychord, dynamic_goal, settings_dict_in, **kwargs): def process_initial_run(settings_dict_in, **kwargs): - """Loads the initial exploratory run and analyses it to create the settings - for the second, dynamic run. + """Loads the initial exploratory run and analyses it to create information + about the second, dynamic run. This information is returned as a dictionary + and also cached. Parameters ---------- @@ -239,6 +288,12 @@ def process_initial_run(settings_dict_in, **kwargs): resume. Keys are elements of step_ndead. final_seed: int Random seed at the end of the initial run. + + Returns + ------- + dyn_info: dict + Information about the second dynamic run which is calculated from + analysing the initial exploratory run. """ dynamic_goal = kwargs.pop('dynamic_goal') nlive_const = kwargs.pop('nlive_const') @@ -265,6 +320,8 @@ def process_initial_run(settings_dict_in, **kwargs): dyn_info = dyPolyChord.nlive_allocation.allocate( init_run, samp_tot, dynamic_goal, smoothing_filter=smoothing_filter) + dyn_info['final_seed'] = final_seed + dyn_info['ninit'] = ninit root_name = os.path.join(settings_dict_in['base_dir'], settings_dict_in['file_root']) if dyn_info['peak_start_ind'] != 0: @@ -289,20 +346,43 @@ def process_initial_run(settings_dict_in, **kwargs): resume_outputs[resume_ndead]['nlike']) except KeyError: pass # protect from error reading nlike from .stats file - nestcheck.io_utils.pickle_save( - dyn_info, root_name + '_dyn_info', overwrite_existing=True) if dynamic_goal != 0: # Remove all the temporary resume files. Use set to avoid # duplicates as these cause OSErrors. for snd in set(step_ndead): os.remove(root_name + '_init_' + str(snd) + '.resume') + nestcheck.io_utils.pickle_save( + dyn_info, root_name + '_dyn_info', overwrite_existing=True) + return dyn_info + + +def get_dynamic_settings(settings_dict_in, dyn_info): + """Loads the initial exploratory run and analyses it to create information + about the second, dynamic run. This information is returned as a dictionary + and also cached. + + Parameters + ---------- + settings_dict_in: dict + Initial PolyChord settings (see check_settings for information on + allowed and default settings). + dynamic_goal: float or int + Number in (0, 1) which determines how to allocate computational effort + between parameter estimation and evidence calculation. See the dynamic + nested sampling paper for more details. + + Returns + ------- + settings_dict: dict + PolyChord settings for dynamic run. + """ settings_dict = copy.deepcopy(settings_dict_in) - settings_dict['seed'] = final_seed + settings_dict['seed'] = dyn_info['final_seed'] if settings_dict['seed'] >= 0: assert settings_dict_in['seed'] >= 0, ( 'if input seed was <0 it should not have been edited') if dyn_info['peak_start_ind'] != 0: - settings_dict['nlive'] = ninit + settings_dict['nlive'] = dyn_info['ninit'] else: settings_dict['nlive'] = dyn_info['nlives_dict'][ min(dyn_info['nlives_dict'].keys())]