Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pynta restart for NERSC and Polaris #43

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
173 changes: 147 additions & 26 deletions pynta/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,18 @@
from fireworks.core.fworker import FWorker
import fireworks.fw_config
import logging
#restart RHE
import pickle
import time
from pynta.polaris import createFWorkers
from pynta.utils import copyDataAndSave
from pynta.multi_launcher import launch_multiprocess2
import json

class Pynta:
def __init__(self,path,rxns_file,surface_type,metal,label,launchpad_path=None,fworker_path=None,
vacuum=8.0,repeats=(3,3,4),slab_path=None,software="Espresso", pbc=(True,True,False),socket=False,queue=False,njobs_queue=0,a=None,
machine=None,
software_kwargs={'kpts': (3, 3, 1), 'tprnfor': True, 'occupations': 'smearing',
'smearing': 'marzari-vanderbilt',
'degauss': 0.01, 'ecutwfc': 40, 'nosym': True,
Expand All @@ -35,9 +43,11 @@ def __init__(self,path,rxns_file,surface_type,metal,label,launchpad_path=None,fw
TS_opt_software_kwargs=None,
lattice_opt_software_kwargs={'kpts': (25,25,25), 'ecutwfc': 70, 'degauss':0.02, 'mixing_mode': 'plain'},
reset_launchpad=False,queue_adapter_path=None,num_jobs=25,max_num_hfsp_opts=None,#max_num_hfsp_opts is mostly for fast testing
Eharmtol=3.0,Eharmfiltertol=30.0,Ntsmin=5,frozen_layers=2,fmaxopt=0.05,fmaxirc=0.1,fmaxopthard=0.05):
Eharmtol=3.0,Eharmfiltertol=30.0,Ntsmin=5,frozen_layers=2,fmaxopt=0.05,fmaxirc=0.1,fmaxopthard=0.05,pickled=0):

self.surface_type = surface_type
self.pickled = pickled

if launchpad_path:
launchpad = LaunchPad.from_file(launchpad_path)
else:
Expand All @@ -59,6 +69,7 @@ def __init__(self,path,rxns_file,surface_type,metal,label,launchpad_path=None,fw
self.metal = metal
self.adsorbate_fw_dict = dict()
self.software_kwargs = software_kwargs
self.machine = machine #need to specify 'alcf' or other machine of choice

if software.lower() == 'vasp':
self.pbc = (True,True,True)
Expand Down Expand Up @@ -135,7 +146,7 @@ def generate_slab(self,skip_launch=False):
write(os.path.join(self.path,"slab_init.xyz"),slab)
self.slab_path = os.path.join(self.path,"slab.xyz")
if self.software != "XTB":
fwslab = optimize_firework(os.path.join(self.path,"slab_init.xyz"),self.software,"slab",
fwslab = optimize_firework(os.path.join(self.path,"slab_init.xyz"),self.software,self.machine,"slab",
opt_method="BFGSLineSearch",socket=self.socket,software_kwargs=self.software_kwargs,
run_kwargs={"fmax" : self.fmaxopt},out_path=os.path.join(self.path,"slab.xyz"),constraints=["freeze up to {}".format(self.freeze_ind)],priority=1000)
wfslab = Workflow([fwslab], name=self.label+"_slab")
Expand All @@ -151,19 +162,43 @@ def generate_slab(self,skip_launch=False):
write(self.slab_path,slab)

def analyze_slab(self):
full_slab = self.slab
cas = SlabAdsorptionSites(full_slab, self.surface_type,allow_6fold=False,composition_effect=False,
#pickle info RHE
if self.pickled == 0:
full_slab = self.slab
cas = SlabAdsorptionSites(full_slab, self.surface_type,allow_6fold=False,composition_effect=False,
label_sites=True,
surrogate_metal=self.metal)

self.cas = cas
self.cas = cas

unique_site_lists,unique_site_pairs_lists,single_site_bond_params_lists,double_site_bond_params_lists = generate_unique_placements(full_slab,cas)

self.single_site_bond_params_lists = single_site_bond_params_lists
self.single_sites_lists = unique_site_lists
self.double_site_bond_params_lists = double_site_bond_params_lists
self.double_sites_lists = unique_site_pairs_lists

my_dictionary_to_pickled = {'cas' : cas,
'single_site_bond_params_list': single_site_bond_params_lists,
'single_sites_lists': unique_site_lists,
'double_site_bond_params_lists': double_site_bond_params_lists,
'double_sites_lists_full': unique_site_pairs_lists}

print("Save as a pickle")
with open('analize_slab.pickle', 'wb') as myfile:
pickle.dump(my_dictionary_to_pickled, myfile)

else :
with open('analize_slab.pickle', 'rb') as myfile:
my_dict = pickle.load(myfile)

unique_site_lists,unique_site_pairs_lists,single_site_bond_params_lists,double_site_bond_params_lists = generate_unique_placements(full_slab,cas)
print("Load from a pickle")

self.single_site_bond_params_lists = single_site_bond_params_lists
self.single_sites_lists = unique_site_lists
self.double_site_bond_params_lists = double_site_bond_params_lists
self.double_sites_lists = unique_site_pairs_lists
self.cas = my_dict['cas']
self.single_site_bond_params_lists = my_dict['single_site_bond_params_list']
self.single_sites_lists = my_dict['unique_site_lists']
self.double_site_bond_params_lists = my_dict['double_site_bond_params_lists']
self.double_sites_lists = my_dict['unique_site_pairs_lists']

def generate_mol_dict(self):
"""
Expand Down Expand Up @@ -352,12 +387,12 @@ def setup_adsorbates(self,initial_guess_finished=False):
xyz = os.path.join(self.path,"Adsorbates",adsname,str(prefix),str(prefix)+".xyz")
xyzs.append(xyz)
fwopt = optimize_firework(os.path.join(self.path,"Adsorbates",adsname,str(prefix),str(prefix)+"_init.xyz"),
self.software,"weakopt_"+str(prefix),
self.software,self.machine,"weakopt_"+str(prefix),
opt_method="MDMin",opt_kwargs={'dt': 0.05},socket=self.socket,software_kwargs=software_kwargs,
run_kwargs={"fmax" : 0.5, "steps" : 70},parents=[],constraints=constraints,
ignore_errors=True, metal=self.metal, facet=self.surface_type, target_site_num=target_site_num, priority=3)
fwopt2 = optimize_firework(os.path.join(self.path,"Adsorbates",adsname,str(prefix),"weakopt_"+str(prefix)+".xyz"),
self.software,str(prefix),
self.software,self.machine,str(prefix),
opt_method="QuasiNewton",socket=self.socket,software_kwargs=software_kwargs,
run_kwargs={"fmax" : self.fmaxopt, "steps" : 70},parents=[fwopt],constraints=constraints,
ignore_errors=True, metal=self.metal, facet=self.surface_type, target_site_num=target_site_num, priority=3, fmaxhard=self.fmaxopthard,
Expand All @@ -367,7 +402,7 @@ def setup_adsorbates(self,initial_guess_finished=False):
optfws2.append(fwopt2)

vib_obj_dict = {"software": self.software, "label": adsname, "software_kwargs": software_kwargs,
"constraints": vib_constraints}
"machine": self.machine, "constraints": ["freeze up to "+str(self.nslab)]}

cfw = collect_firework(xyzs,True,["vibrations_firework"],[vib_obj_dict],["vib.json"],[],parents=optfws2,label=adsname)
self.adsorbate_fw_dict[adsname] = optfws2
Expand Down Expand Up @@ -414,12 +449,12 @@ def setup_adsorbates(self,initial_guess_finished=False):
assert os.path.exists(init_path), init_path
xyzs.append(xyz)
fwopt = optimize_firework(init_path,
self.software,"weakopt_"+str(prefix),
self.software,self.machine,"weakopt_"+str(prefix),
opt_method="MDMin",opt_kwargs={'dt': 0.05},socket=self.socket,software_kwargs=software_kwargs,
run_kwargs={"fmax" : 0.5, "steps" : 70},parents=[],constraints=constraints,
ignore_errors=True, metal=self.metal, facet=self.surface_type, target_site_num=target_site_num, priority=3)
fwopt2 = optimize_firework(os.path.join(self.path,"Adsorbates",ad,str(prefix),"weakopt_"+str(prefix)+".xyz"),
self.software,str(prefix),
self.software,self.machine,str(prefix),
opt_method="QuasiNewton",socket=self.socket,software_kwargs=software_kwargs,
run_kwargs={"fmax" : self.fmaxopt, "steps" : 70},parents=[fwopt],constraints=constraints,
ignore_errors=True, metal=self.metal, facet=self.surface_type, target_site_num=target_site_num, priority=3)
Expand All @@ -428,7 +463,7 @@ def setup_adsorbates(self,initial_guess_finished=False):
optfws2.append(fwopt2)

vib_obj_dict = {"software": self.software, "label": ad, "software_kwargs": software_kwargs,
"constraints": vib_constraints}
"machine": self.machine, "constraints": ["freeze up to "+str(self.nslab)]}

cfw = collect_firework(xyzs,True,["vibrations_firework"],[vib_obj_dict],["vib.json"],[True,False],parents=optfws2,label=ad,allow_fizzled_parents=False)
self.adsorbate_fw_dict[ad] = optfws2
Expand All @@ -442,20 +477,20 @@ def setup_transition_states(self,adsorbates_finished=False):
Note the vibrational and IRC calculations are launched at the same time
"""
if self.software != "XTB":
opt_obj_dict = {"software":self.software,"label":"prefix","socket":self.socket,"software_kwargs":self.software_kwargs_TS,
opt_obj_dict = {"software":self.software,"label":"prefix","socket":self.socket,"software_kwargs":self.software_kwargs_TS,"machine": self.machine,
"run_kwargs": {"fmax" : self.fmaxopt, "steps" : 70},"constraints": ["freeze up to {}".format(self.freeze_ind)],"sella":True,"order":1,}
else:
opt_obj_dict = {"software":self.software,"label":"prefix","socket":self.socket,"software_kwargs":self.software_kwargs_TS,
opt_obj_dict = {"software":self.software,"label":"prefix","socket":self.socket,"software_kwargs":self.software_kwargs_TS,"machine": self.machine,
"run_kwargs": {"fmax" : 0.02, "steps" : 70},"constraints": ["freeze up to "+str(self.nslab)],"sella":True,"order":1,}
vib_obj_dict = {"software":self.software,"label":"prefix","socket":self.socket,"software_kwargs":self.software_kwargs,
"constraints": ["freeze up to "+str(self.nslab)]}
"machine": self.machine, "constraints": ["freeze up to "+str(self.nslab)]}
IRC_obj_dict = {"software":self.software,"label":"prefix","socket":self.socket,"software_kwargs":self.software_kwargs,
"run_kwargs": {"fmax" : self.fmaxirc, "steps" : 70},"constraints":["freeze up to "+str(self.nslab)]}
"machine": self.machine, "run_kwargs": {"fmax" : self.fmaxirc, "steps" : 70},"constraints":["freeze up to "+str(self.nslab)]}
for i,rxn in enumerate(self.rxns_dict):
ts_path = os.path.join(self.path,"TS"+str(i))
os.makedirs(ts_path)
ts_task = MolecularTSEstimate({"rxn": rxn,"ts_path": ts_path,"slab_path": self.slab_path,"adsorbates_path": os.path.join(self.path,"Adsorbates"),
"rxns_file": self.rxns_file,"path": self.path,"metal": self.metal,"facet": self.surface_type, "out_path": ts_path,
"rxns_file": self.rxns_file,"path": self.path,"metal": self.metal,"facet": self.surface_type, "out_path": ts_path, "machine": self.machine,
"spawn_jobs": True, "opt_obj_dict": opt_obj_dict, "vib_obj_dict": vib_obj_dict,
"IRC_obj_dict": IRC_obj_dict, "nprocs": 48, "name_to_adjlist_dict": self.name_to_adjlist_dict,
"gratom_to_molecule_atom_maps":{sm: {str(k):v for k,v in d.items()} for sm,d in self.gratom_to_molecule_atom_maps.items()},
Expand All @@ -475,12 +510,23 @@ def launch(self,single_job=False):
"""
Call appropriate rapidfire function
"""
if self.queue:
rapidfirequeue(self.launchpad,self.fworker,self.qadapter,njobs_queue=self.njobs_queue,nlaunches="infinite")
elif not self.queue and (self.num_jobs == 1 or single_job):
rapidfire(self.launchpad,self.fworker,nlaunches="infinite")
if self.machine == "alcf":
print("You are using alcf machine: if you want to restart, run pyn.reset(wfid='1')")
if self.queue:
rapidfirequeue(self.launchpad,self.fworker,self.qadapter,njobs_queue=self.njobs_queue,nlaunches="infinite")
elif not self.queue and (self.num_jobs == 1 or single_job):
rapidfire(self.launchpad,self.fworker,nlaunches="infinite")
else:
listfworkers = createFWorkers(self.num_jobs)
launch_multiprocess2(self.launchpad,listfworkers,"INFO",0,self.num_jobs,5)
else:
launch_multiprocess(self.launchpad,self.fworker,"INFO","infinite",self.num_jobs,5)
print("machine choice is not alcf: check your Fireworks Workflow id before restart Pynta")
if self.queue:
rapidfirequeue(self.launchpad,self.fworker,self.qadapter,njobs_queue=self.njobs_queue,nlaunches="infinite")
elif not self.queue and (self.num_jobs == 1 or single_job):
rapidfire(self.launchpad,self.fworker,nlaunches="infinite")
else:
launch_multiprocess(self.launchpad,self.fworker,"INFO","infinite",self.num_jobs,5)

def execute(self,generate_initial_ad_guesses=True,calculate_adsorbates=True,
calculate_transition_states=True,launch=True):
Expand Down Expand Up @@ -536,4 +582,79 @@ def execute_from_initial_ad_guesses(self):
self.launchpad.add_wf(wf)


self.launch()

#restart option: RHE + KSA
def reset(self,wfid):

id_number = int(wfid)
# Get the information of the workflow
wf1 = self.launchpad.get_wf_summary_dict(id_number, mode='more')

# Save the states of the workflow
wf_states = wf1['states']

# Save the launcher directories
wf_launchers = wf1['launch_dirs']

# In this bucle-for the tasks that are not completed, change the status
# We need the number of the task(id)

for task_name, task_state in wf_states.items():
if task_state != 'COMPLETED':
# Here we will change the map - node
task_id = int(task_name.split('--')[-1])
d = self.launchpad.get_fw_dict_by_id(task_id)
newd = deepcopy(d['spec'])

nameTask = newd['_tasks'][0]['_fw_name']

nameTasks = ['{{pynta.tasks.MolecularOptimizationTask}}',
'{{pynta.tasks.MolecularVibrationsTask}}',
'{{pynta.tasks.MolecularIRC}}']

#if nameTask in nameTasks:
# opt_method = newd['_tasks'][0]['opt_method'] if 'opt_method' in newd['_tasks'][0] else None

# if self.software == "Espresso" or self.software == "PWDFT":
# print(" Change: ", newd['_tasks'][0]['software_kwargs']['command'], end='')
# node = MapTaskToNodes()
# newcommand = node.getCommand()
# newd['_tasks'][0]['software_kwargs']['command'] = newcommand
# print(" by: ", newcommand)
# Here we work with reset the optimization task
# We load the trajectory file and save this structure in the
# tree of each uncompleted task

if 'opt' in task_name:
dirs = wf_launchers[task_name]
if dirs != []:
src = dirs[0]
print(' Name task {0:^20s} {1:^12s} {2}'.format(task_name, task_state, src))
file_traj = [name for name in os.listdir(src) if name.endswith(".traj")]
if len(file_traj) > 1:
file_traj = file_traj[0]
base, ext = os.path.splitext(file_traj)

with open (os.path.join(src, "FW.json")) as file:
filejson = json.load(file)

if opt_method == 'QuasiNewton':
namexyz = f'weakopt_{base}.xyz'
else:
namexyz = f'{base}_init.xyz'

atoms = read(os.path.join(src, file_traj), index=-1)
dst = os.path.dirname(filejson['spec']['_tasks'][0]['xyz'])

write(f'{src}/{namexyz}', atoms, format='xyz')

copyDataAndSave(src, dst, namexyz)
copyDataAndSave(src, dst, f'{base}.traj')

# Keep on with the task_state != 'COMPLETED'
self.launchpad.rerun_fw(task_id)
self.launchpad.update_spec([task_id], newd)


self.launch()
Loading
Loading