From 14ee869df5e5a2380c6af86c510019bbe251b6f6 Mon Sep 17 00:00:00 2001 From: sakim8048 Date: Wed, 3 Apr 2024 09:48:16 -0700 Subject: [PATCH 01/10] polaris mapping info added in polaris.py --- pynta/polaris.py | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 pynta/polaris.py diff --git a/pynta/polaris.py b/pynta/polaris.py new file mode 100644 index 00000000..96a9e02d --- /dev/null +++ b/pynta/polaris.py @@ -0,0 +1,44 @@ +# Map the FW-task on polaris single-queue allocation + +""" Polaris Mapping """ + +import os +from fireworks.core.fworker import FWorker + +def createCommand(node, software): + binary = os.environ.get("EXE") + if binary is None: + binary = '/lus/eagle/projects/catalysis_aesp/raymundohe/soft/qe-7.1/build_cuda_nogpuawarempi_openmp/bin/pw.x' + + if software == 'Espresso': + command = 'mpiexec --hosts {} -n 4 --ppn 4 --depth=8 --cpu-bind depth --env OMP_NUM_THREADS=8 --env CUDA_VISIBLE_DEVICES=0,1,2,3 {} -in PREFIX.pwi > PREFIX.pwo'.format(node, binary) + elif software == 'PWDFT': + command = 'mpiexec --hosts {} -n 4 --ppn 4 --cpu-bind depth --env OMP_NUM_THREADS=1 --env CUDA_VISIBLE_DEVICES=0,1,2,3 {} PREFIX.nwxi > PREFIX.nwxo'.format(node, binary) +#add NWChem? + return command + + +def createFWorkers(num_jobs): + + nodes_list = [] + nodefile = os.environ.get("PBS_NODEFILE") + + if nodefile is None: + for i in range(num_jobs): + nodes_list.append(f'localhost_{i}') + else: + with open(nodefile) as f: + for line in f: + nodes_list.append(line.strip('\n')) + + fworkers = [] + + for i in range(num_jobs): + host = nodes_list[i] + print("host", host) + env = {'host': host} + + fworkers.append(FWorker(env=env)) + + return fworkers + From 9633af3114e6cb71f107299fef6d1966c6646ed9 Mon Sep 17 00:00:00 2001 From: sakim8048 Date: Wed, 3 Apr 2024 10:10:29 -0700 Subject: [PATCH 02/10] Add machine keyword to pynta object --- pynta/main.py | 131 +++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 113 insertions(+), 18 deletions(-) diff --git a/pynta/main.py b/pynta/main.py index 861d6145..429f9992 100644 --- a/pynta/main.py +++ b/pynta/main.py @@ -21,10 +21,18 @@ from fireworks.core.fworker import FWorker import fireworks.fw_config import logging +#restart RHE +import pickle +import time +from pynta.polaris import createFWorkers +from pynta.utils import copyDataAndSave +from pynta.multi_launcher import launch_multiprocess2 +import json class Pynta: def __init__(self,path,rxns_file,surface_type,metal,label,launchpad_path=None,fworker_path=None, vacuum=8.0,repeats=(3,3,4),slab_path=None,software="Espresso", pbc=(True,True,False),socket=False,queue=False,njobs_queue=0,a=None, + machine=None, software_kwargs={'kpts': (3, 3, 1), 'tprnfor': True, 'occupations': 'smearing', 'smearing': 'marzari-vanderbilt', 'degauss': 0.01, 'ecutwfc': 40, 'nosym': True, @@ -59,6 +67,7 @@ def __init__(self,path,rxns_file,surface_type,metal,label,launchpad_path=None,fw self.metal = metal self.adsorbate_fw_dict = dict() self.software_kwargs = software_kwargs + self.machine = machine #need to specify 'alcf' or other machine of choice if software.lower() == 'vasp': self.pbc = (True,True,True) @@ -135,7 +144,7 @@ def generate_slab(self,skip_launch=False): write(os.path.join(self.path,"slab_init.xyz"),slab) self.slab_path = os.path.join(self.path,"slab.xyz") if self.software != "XTB": - fwslab = optimize_firework(os.path.join(self.path,"slab_init.xyz"),self.software,"slab", + fwslab = optimize_firework(os.path.join(self.path,"slab_init.xyz"),self.software,self.machine,"slab", opt_method="BFGSLineSearch",socket=self.socket,software_kwargs=self.software_kwargs, run_kwargs={"fmax" : self.fmaxopt},out_path=os.path.join(self.path,"slab.xyz"),constraints=["freeze up to {}".format(self.freeze_ind)],priority=1000) wfslab = Workflow([fwslab], name=self.label+"_slab") @@ -352,12 +361,12 @@ def setup_adsorbates(self,initial_guess_finished=False): xyz = os.path.join(self.path,"Adsorbates",adsname,str(prefix),str(prefix)+".xyz") xyzs.append(xyz) fwopt = optimize_firework(os.path.join(self.path,"Adsorbates",adsname,str(prefix),str(prefix)+"_init.xyz"), - self.software,"weakopt_"+str(prefix), + self.software,self.machine,"weakopt_"+str(prefix), opt_method="MDMin",opt_kwargs={'dt': 0.05},socket=self.socket,software_kwargs=software_kwargs, run_kwargs={"fmax" : 0.5, "steps" : 70},parents=[],constraints=constraints, ignore_errors=True, metal=self.metal, facet=self.surface_type, target_site_num=target_site_num, priority=3) fwopt2 = optimize_firework(os.path.join(self.path,"Adsorbates",adsname,str(prefix),"weakopt_"+str(prefix)+".xyz"), - self.software,str(prefix), + self.software,self.machine,str(prefix), opt_method="QuasiNewton",socket=self.socket,software_kwargs=software_kwargs, run_kwargs={"fmax" : self.fmaxopt, "steps" : 70},parents=[fwopt],constraints=constraints, ignore_errors=True, metal=self.metal, facet=self.surface_type, target_site_num=target_site_num, priority=3, fmaxhard=self.fmaxopthard, @@ -367,7 +376,7 @@ def setup_adsorbates(self,initial_guess_finished=False): optfws2.append(fwopt2) vib_obj_dict = {"software": self.software, "label": adsname, "software_kwargs": software_kwargs, - "constraints": vib_constraints} + "machine": self.machine, "constraints": ["freeze up to "+str(self.nslab)]} cfw = collect_firework(xyzs,True,["vibrations_firework"],[vib_obj_dict],["vib.json"],[],parents=optfws2,label=adsname) self.adsorbate_fw_dict[adsname] = optfws2 @@ -414,12 +423,12 @@ def setup_adsorbates(self,initial_guess_finished=False): assert os.path.exists(init_path), init_path xyzs.append(xyz) fwopt = optimize_firework(init_path, - self.software,"weakopt_"+str(prefix), + self.software,self.machine,"weakopt_"+str(prefix), opt_method="MDMin",opt_kwargs={'dt': 0.05},socket=self.socket,software_kwargs=software_kwargs, run_kwargs={"fmax" : 0.5, "steps" : 70},parents=[],constraints=constraints, ignore_errors=True, metal=self.metal, facet=self.surface_type, target_site_num=target_site_num, priority=3) fwopt2 = optimize_firework(os.path.join(self.path,"Adsorbates",ad,str(prefix),"weakopt_"+str(prefix)+".xyz"), - self.software,str(prefix), + self.software,self.machine,str(prefix), opt_method="QuasiNewton",socket=self.socket,software_kwargs=software_kwargs, run_kwargs={"fmax" : self.fmaxopt, "steps" : 70},parents=[fwopt],constraints=constraints, ignore_errors=True, metal=self.metal, facet=self.surface_type, target_site_num=target_site_num, priority=3) @@ -428,7 +437,7 @@ def setup_adsorbates(self,initial_guess_finished=False): optfws2.append(fwopt2) vib_obj_dict = {"software": self.software, "label": ad, "software_kwargs": software_kwargs, - "constraints": vib_constraints} + "machine": self.machine, "constraints": ["freeze up to "+str(self.nslab)]} cfw = collect_firework(xyzs,True,["vibrations_firework"],[vib_obj_dict],["vib.json"],[True,False],parents=optfws2,label=ad,allow_fizzled_parents=False) self.adsorbate_fw_dict[ad] = optfws2 @@ -442,26 +451,26 @@ def setup_transition_states(self,adsorbates_finished=False): Note the vibrational and IRC calculations are launched at the same time """ if self.software != "XTB": - opt_obj_dict = {"software":self.software,"label":"prefix","socket":self.socket,"software_kwargs":self.software_kwargs_TS, + opt_obj_dict = {"software":self.software,"label":"prefix","socket":self.socket,"software_kwargs":self.software_kwargs_TS,"machine": self.machine, "run_kwargs": {"fmax" : self.fmaxopt, "steps" : 70},"constraints": ["freeze up to {}".format(self.freeze_ind)],"sella":True,"order":1,} else: - opt_obj_dict = {"software":self.software,"label":"prefix","socket":self.socket,"software_kwargs":self.software_kwargs_TS, + opt_obj_dict = {"software":self.software,"label":"prefix","socket":self.socket,"software_kwargs":self.software_kwargs_TS,"machine": self.machine, "run_kwargs": {"fmax" : 0.02, "steps" : 70},"constraints": ["freeze up to "+str(self.nslab)],"sella":True,"order":1,} vib_obj_dict = {"software":self.software,"label":"prefix","socket":self.socket,"software_kwargs":self.software_kwargs, - "constraints": ["freeze up to "+str(self.nslab)]} + "machine": self.machine, "constraints": ["freeze up to "+str(self.nslab)]} IRC_obj_dict = {"software":self.software,"label":"prefix","socket":self.socket,"software_kwargs":self.software_kwargs, - "run_kwargs": {"fmax" : self.fmaxirc, "steps" : 70},"constraints":["freeze up to "+str(self.nslab)]} + "machine": self.machine, "run_kwargs": {"fmax" : self.fmaxirc, "steps" : 70},"constraints":["freeze up to "+str(self.nslab)]} for i,rxn in enumerate(self.rxns_dict): ts_path = os.path.join(self.path,"TS"+str(i)) os.makedirs(ts_path) ts_task = MolecularTSEstimate({"rxn": rxn,"ts_path": ts_path,"slab_path": self.slab_path,"adsorbates_path": os.path.join(self.path,"Adsorbates"), - "rxns_file": self.rxns_file,"path": self.path,"metal": self.metal,"facet": self.surface_type, "out_path": ts_path, + "rxns_file": self.rxns_file,"path": self.path,"metal": self.metal,"facet": self.surface_type, "out_path": ts_path, "machine": self.machine, "spawn_jobs": True, "opt_obj_dict": opt_obj_dict, "vib_obj_dict": vib_obj_dict, "IRC_obj_dict": IRC_obj_dict, "nprocs": 48, "name_to_adjlist_dict": self.name_to_adjlist_dict, "gratom_to_molecule_atom_maps":{sm: {str(k):v for k,v in d.items()} for sm,d in self.gratom_to_molecule_atom_maps.items()}, "gratom_to_molecule_surface_atom_maps":{sm: {str(k):v for k,v in d.items()} for sm,d in self.gratom_to_molecule_surface_atom_maps.items()}, "nslab":self.nslab,"Eharmtol":self.Eharmtol,"Eharmfiltertol":self.Eharmfiltertol,"Ntsmin":self.Ntsmin, - "max_num_hfsp_opts":self.max_num_hfsp_opts}) + "max_num_hfsp_opts":self.max_num_hfsp_opts, "acat_tol": self.acat_tol, "emt_metal": self.emt_metal}) reactants = rxn["reactant_names"] products = rxn["product_names"] parents = [] @@ -475,12 +484,23 @@ def launch(self,single_job=False): """ Call appropriate rapidfire function """ - if self.queue: - rapidfirequeue(self.launchpad,self.fworker,self.qadapter,njobs_queue=self.njobs_queue,nlaunches="infinite") - elif not self.queue and (self.num_jobs == 1 or single_job): - rapidfire(self.launchpad,self.fworker,nlaunches="infinite") + if self.machine == "alcf": + print("You are using alcf machine: if you want to restart, run pyn.reset(wfid='1')") + if self.queue: + rapidfirequeue(self.launchpad,self.fworker,self.qadapter,njobs_queue=self.njobs_queue,nlaunches="infinite") + elif not self.queue and (self.num_jobs == 1 or single_job): + rapidfire(self.launchpad,self.fworker,nlaunches="infinite") + else: + listfworkers = createFWorkers(self.num_jobs) + launch_multiprocess2(self.launchpad,listfworkers,"INFO",0,self.num_jobs,5) else: - launch_multiprocess(self.launchpad,self.fworker,"INFO","infinite",self.num_jobs,5) + print("machine choice is not alcf: check your Fireworks Workflow id before restart Pynta") + if self.queue: + rapidfirequeue(self.launchpad,self.fworker,self.qadapter,njobs_queue=self.njobs_queue,nlaunches="infinite") + elif not self.queue and (self.num_jobs == 1 or single_job): + rapidfire(self.launchpad,self.fworker,nlaunches="infinite") + else: + launch_multiprocess(self.launchpad,self.fworker,"INFO","infinite",self.num_jobs,5) def execute(self,generate_initial_ad_guesses=True,calculate_adsorbates=True, calculate_transition_states=True,launch=True): @@ -536,4 +556,79 @@ def execute_from_initial_ad_guesses(self): self.launchpad.add_wf(wf) + self.launch() + +#restart option: RHE + KSA + def reset(self,wfid): + + id_number = int(wfid) + # Get the information of the workflow + wf1 = self.launchpad.get_wf_summary_dict(id_number, mode='more') + + # Save the states of the workflow + wf_states = wf1['states'] + + # Save the launcher directories + wf_launchers = wf1['launch_dirs'] + + # In this bucle-for the tasks that are not completed, change the status + # We need the number of the task(id) + + for task_name, task_state in wf_states.items(): + if task_state != 'COMPLETED': + # Here we will change the map - node + task_id = int(task_name.split('--')[-1]) + d = self.launchpad.get_fw_dict_by_id(task_id) + newd = deepcopy(d['spec']) + + nameTask = newd['_tasks'][0]['_fw_name'] + + nameTasks = ['{{pynta.tasks.MolecularOptimizationTask}}', + '{{pynta.tasks.MolecularVibrationsTask}}', + '{{pynta.tasks.MolecularIRC}}'] + + #if nameTask in nameTasks: + # opt_method = newd['_tasks'][0]['opt_method'] if 'opt_method' in newd['_tasks'][0] else None + + # if self.software == "Espresso" or self.software == "PWDFT": + # print(" Change: ", newd['_tasks'][0]['software_kwargs']['command'], end='') + # node = MapTaskToNodes() + # newcommand = node.getCommand() + # newd['_tasks'][0]['software_kwargs']['command'] = newcommand + # print(" by: ", newcommand) + # Here we work with reset the optimization task + # We load the trajectory file and save this structure in the + # tree of each uncompleted task + + if 'opt' in task_name: + dirs = wf_launchers[task_name] + if dirs != []: + src = dirs[0] + print(' Name task {0:^20s} {1:^12s} {2}'.format(task_name, task_state, src)) + file_traj = [name for name in os.listdir(src) if name.endswith(".traj")] + if len(file_traj) > 1: + file_traj = file_traj[0] + base, ext = os.path.splitext(file_traj) + + with open (os.path.join(src, "FW.json")) as file: + filejson = json.load(file) + + if opt_method == 'QuasiNewton': + namexyz = f'weakopt_{base}.xyz' + else: + namexyz = f'{base}_init.xyz' + + atoms = read(os.path.join(src, file_traj), index=-1) + dst = os.path.dirname(filejson['spec']['_tasks'][0]['xyz']) + + write(f'{src}/{namexyz}', atoms, format='xyz') + + copyDataAndSave(src, dst, namexyz) + copyDataAndSave(src, dst, f'{base}.traj') + + # Keep on with the task_state != 'COMPLETED' + self.launchpad.rerun_fw(task_id) + self.launchpad.update_spec([task_id], newd) + + self.launch() From 9555e8ead8a8f92863c258b4149a287a82fde708 Mon Sep 17 00:00:00 2001 From: sakim8048 Date: Wed, 3 Apr 2024 10:28:26 -0700 Subject: [PATCH 03/10] Pickle slab info --- pynta/main.py | 40 ++++++++++++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/pynta/main.py b/pynta/main.py index 429f9992..55267b64 100644 --- a/pynta/main.py +++ b/pynta/main.py @@ -160,19 +160,43 @@ def generate_slab(self,skip_launch=False): write(self.slab_path,slab) def analyze_slab(self): - full_slab = self.slab - cas = SlabAdsorptionSites(full_slab, self.surface_type,allow_6fold=False,composition_effect=False, + #pickle info RHE + if self.pickled == 0: + full_slab = self.slab + cas = SlabAdsorptionSites(full_slab, self.surface_type,allow_6fold=False,composition_effect=False, label_sites=True, surrogate_metal=self.metal) - self.cas = cas + self.cas = cas - unique_site_lists,unique_site_pairs_lists,single_site_bond_params_lists,double_site_bond_params_lists = generate_unique_placements(full_slab,cas) + unique_site_lists,unique_site_pairs_lists,single_site_bond_params_lists,double_site_bond_params_lists = generate_unique_placements(full_slab,cas) - self.single_site_bond_params_lists = single_site_bond_params_lists - self.single_sites_lists = unique_site_lists - self.double_site_bond_params_lists = double_site_bond_params_lists - self.double_sites_lists = unique_site_pairs_lists + self.single_site_bond_params_lists = single_site_bond_params_lists + self.single_sites_lists = unique_site_lists + self.double_site_bond_params_lists = double_site_bond_params_lists + self.double_sites_lists = unique_site_pairs_lists + + my_dictionary_to_pickled = {'cas' : cas, + 'single_site_bond_params_list': single_site_bond_params_lists, + 'single_sites_lists': unique_site_lists, + 'double_site_bond_params_lists': double_site_bond_params_lists, + 'double_sites_lists_full': unique_site_pairs_lists} + + print("Save as a pickle") + with open('analize_slab.pickle', 'wb') as myfile: + pickle.dump(my_dictionary_to_pickled, myfile) + + else : + with open('analize_slab.pickle', 'rb') as myfile: + my_dict = pickle.load(myfile) + + print("Load from a pickle") + + self.cas = my_dict['cas'] + self.single_site_bond_params_lists = my_dict['single_site_bond_params_list'] + self.single_sites_lists = my_dict['unique_site_lists'] + self.double_site_bond_params_lists = my_dict['double_site_bond_params_lists'] + self.double_sites_lists = my_dict['unique_site_pairs_lists'] def generate_mol_dict(self): """ From a1b89672e5ccde3127d73101da712587f9c224a7 Mon Sep 17 00:00:00 2001 From: sakim8048 Date: Wed, 3 Apr 2024 10:41:42 -0700 Subject: [PATCH 04/10] add machine keyword and polaris mapping --- pynta/tasks.py | 56 ++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 41 insertions(+), 15 deletions(-) diff --git a/pynta/tasks.py b/pynta/tasks.py index 9e5f3172..28deb29d 100644 --- a/pynta/tasks.py +++ b/pynta/tasks.py @@ -34,6 +34,8 @@ from contextlib import contextmanager from copy import deepcopy from pathlib import Path +#polaris node mapping +from pynta.polaris import createCommand class OptimizationTask(FiretaskBase): def run_task(self, fw_spec): @@ -56,10 +58,10 @@ class DoNothingTask(FiretaskBase): def run_task(self, fw_spec): return FWAction() -def optimize_firework(xyz,software,label,opt_method=None,sella=None,socket=False,order=0,software_kwargs={},opt_kwargs={}, +def optimize_firework(xyz,software,machine,label,opt_method=None,sella=None,socket=False,order=0,software_kwargs={},opt_kwargs={}, run_kwargs={},constraints=[],parents=[],out_path=None,time_limit_hrs=np.inf,fmaxhard=0.0,ignore_errors=False, target_site_num=None,metal=None,facet=None,priority=1,allow_fizzled_parents=False): - d = {"xyz" : xyz, "software" : software,"label" : label} + d = {"xyz" : xyz, "software" : software,"label" : label, "machine":machine} if opt_method: d["opt_method"] = opt_method if software_kwargs: d["software_kwargs"] = software_kwargs if opt_kwargs: d["opt_kwargs"] = opt_kwargs @@ -76,6 +78,7 @@ def optimize_firework(xyz,software,label,opt_method=None,sella=None,socket=False d["target_site_num"] = target_site_num d["metal"] = metal d["facet"] = facet + d["machine"] = machine t1 = MolecularOptimizationTask(d) directory = os.path.dirname(xyz) if out_path is None: out_path = os.path.join(directory,label+".xyz") @@ -85,12 +88,16 @@ def optimize_firework(xyz,software,label,opt_method=None,sella=None,socket=False @explicit_serialize class MolecularOptimizationTask(OptimizationTask): - required_params = ["software","label"] + required_params = ["software","label","machine"] optional_params = ["software_kwargs","opt_method", "opt_kwargs","run_kwargs", "constraints","sella","order","socket","time_limit_hrs","fmaxhard","ignore_errors","target_site_num","metal","facet"] def run_task(self, fw_spec): errors = [] software_kwargs = deepcopy(self["software_kwargs"]) if "software_kwargs" in self.keys() else dict() + # Mapping nodes to fworkers + if self["machine"] == "alcf" and (self["software"] == "Espresso" or self["software"] == "PWDFT"): + software_kwargs["command"] = createCommand(fw_spec["_fw_env"]["host"], self["software"]) + socket = self["socket"] if "socket" in self.keys() else False if socket: unixsocket = "ase_"+self["software"].lower()+"_"+self["label"]+"_"+self["xyz"].replace("/","_").replace(".","_") @@ -297,12 +304,14 @@ def run_task(self, fw_spec): @explicit_serialize class MolecularOptimizationFailTask(OptimizationTask): - required_params = ["software","label"] + required_params = ["software","label","machine"] optional_params = ["software_kwargs","opt_method", "opt_kwargs","run_kwargs"] def run_task(self, fw_spec): - print(fw_spec) software_kwargs = deepcopy(self["software_kwargs"]) if "software_kwargs" in self.keys() else dict() + # Mapping nodes to fworkers + if self["machine"] == "alcf" and (self["software"] == "Espresso" or self["software"] == "PWDFT"): + software_kwargs["command"] = createCommand(fw_spec["_fw_env"]["host"], self["software"]) software = name_to_ase_software(self["software"])(**software_kwargs) opt_kwargs = deepcopy(self["opt_kwargs"]) if "opt_kwargs" in self.keys() else dict() @@ -315,7 +324,7 @@ def run_task(self, fw_spec): sp.calc = software opt = opt_method(sp,trajectory=label+".traj") - opt.run(fmax=0.02,steps=2) + opt.run(fmax=0.50,steps=2) if not opt.converged(): fw = restart_opt_firework(self,fw_spec["_tasks"]) @@ -325,10 +334,12 @@ def run_task(self, fw_spec): return FWAction() -def energy_firework(xyz,software,label,software_kwargs={},parents=[],out_path=None,ignore_errors=False): - d = {"xyz" : xyz, "software" : software, "label" : label} +def energy_firework(xyz,software,machine,label,software_kwargs={},parents=[],out_path=None,ignore_errors=False): + d = {"xyz" : xyz, "software" : software, "label" : label, "machine":machine} + if software_kwargs: d["software_kwargs"] = software_kwargs d["ignore_errors"] = ignore_errors + d["machine"] = machine t1 = MolecularEnergyTask(d) directory = os.path.dirname(xyz) if out_path is None: out_path = os.path.join(directory,label+"_energy.json") @@ -337,7 +348,7 @@ def energy_firework(xyz,software,label,software_kwargs={},parents=[],out_path=No @explicit_serialize class MolecularEnergyTask(EnergyTask): - required_params = ["software","label"] + required_params = ["software","label","machine"] optional_params = ["software_kwargs","energy_kwargs","ignore_errors"] def run_task(self, fw_spec): xyz = self['xyz'] @@ -346,6 +357,9 @@ def run_task(self, fw_spec): software_kwargs = deepcopy(self["software_kwargs"]) if "software_kwargs" in self.keys() else dict() energy_kwargs = deepcopy(self["energy_kwargs"]) if "energy_kwargs" in self.keys() else dict() ignore_errors = deepcopy(self["ignore_errors"]) if "ignore_errors" in self.keys() else False + # Mapping nodes to fworkers + if self["machine"] == "alcf" and (self["software"] == "Espresso" or self["software"] == "PWDFT"): + software_kwargs["command"] = createCommand(fw_spec["_fw_env"]["host"], self["software"]) try: sp = read(xyz) @@ -361,11 +375,14 @@ def run_task(self, fw_spec): return FWAction() -def vibrations_firework(xyz,software,label,software_kwargs={},parents=[],out_path=None,constraints=[],socket=False,ignore_errors=False): - d = {"xyz" : xyz, "software" : software, "label" : label, "socket": socket} +def vibrations_firework(xyz,software,machine,label,software_kwargs={},parents=[],out_path=None,constraints=[],socket=False,ignore_errors=False): + d = {"xyz" : xyz, "software" : software, "label" : label, "socket": socket, "machine":machine} + if software_kwargs: d["software_kwargs"] = software_kwargs if constraints: d["constraints"] = constraints d["ignore_errors"] = ignore_errors + d["machine"] = machine + t1 = MolecularVibrationsTask(d) directory = os.path.dirname(xyz) if out_path is None: @@ -381,13 +398,17 @@ def vibrations_firework(xyz,software,label,software_kwargs={},parents=[],out_pat @explicit_serialize class MolecularVibrationsTask(VibrationTask): - required_params = ["software","label"] + required_params = ["software","label","machine"] optional_params = ["software_kwargs","constraints","ignore_errors","socket"] def run_task(self, fw_spec): indices = None xyz = self['xyz'] label = self["label"] + machine = self["machine"] software_kwargs = deepcopy(self["software_kwargs"]) if "software_kwargs" in self.keys() else dict() + # Mapping nodes to fworkers + if self["machine"] == "alcf" and (self["software"] == "Espresso" or self["software"] == "PWDFT"): + software_kwargs["command"] = createCommand(fw_spec["_fw_env"]["host"], self["software"]) software = name_to_ase_software(self["software"])(**software_kwargs) ignore_errors = deepcopy(self["ignore_errors"]) if "ignore_errors" in self.keys() else False socket = self["socket"] if "socket" in self.keys() else False @@ -726,10 +747,12 @@ def run_task(self, fw_spec): else: return FWAction() -def IRC_firework(xyz,label,out_path=None,spawn_jobs=False,software=None, +def IRC_firework(xyz,label,machine,out_path=None,spawn_jobs=False,software=None, socket=False,software_kwargs={},opt_kwargs={},run_kwargs={},constraints=[],parents=[],ignore_errors=False,forward=True): + if out_path is None: out_path = os.path.join(directory,label+"_irc.traj") - t1 = MolecularIRC(xyz=xyz,label=label,software=software, + + t1 = MolecularIRC(xyz=xyz,label=label,software=software,machine=machine, socket=socket,software_kwargs=software_kwargs,opt_kwargs=opt_kwargs,run_kwargs=run_kwargs, constraints=constraints,ignore_errors=ignore_errors,forward=forward) t2 = FileTransferTask({'files': [{'src': label+'_irc.traj', 'dest': out_path}], 'mode': 'copy', 'ignore_errors' : ignore_errors}) @@ -738,12 +761,15 @@ def IRC_firework(xyz,label,out_path=None,spawn_jobs=False,software=None, @explicit_serialize class MolecularIRC(FiretaskBase): - required_params = ["xyz","label"] + required_params = ["xyz","label","machine"] optional_params = ["software","socket", "software_kwargs", "opt_kwargs", "run_kwargs", "constraints", "ignore_errors", "forward"] def run_task(self, fw_spec): errors = [] software_kwargs = deepcopy(self["software_kwargs"]) if "software_kwargs" in self.keys() else dict() + # Mapping nodes to fworkers + if self["machine"] == "alcf" and (self["software"] == "Espresso" or self["software"] == "PWDFT"): + software_kwargs["command"] = createCommand(fw_spec["_fw_env"]["host"], self["software"]) socket = self["socket"] if "socket" in self.keys() else False if socket: unixsocket = "ase_"+self["software"].lower()+"_"+self["label"]+"_"+self["xyz"].replace("/","_").replace(".","_") From 598dc5b8419a72163196cecb22bad089528678dd Mon Sep 17 00:00:00 2001 From: rayhe88 Date: Wed, 3 Apr 2024 11:25:31 -0700 Subject: [PATCH 05/10] Contibutor acknowledgement: Restart design contribution by RHE --- pynta/polaris.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pynta/polaris.py b/pynta/polaris.py index 96a9e02d..492ee27d 100644 --- a/pynta/polaris.py +++ b/pynta/polaris.py @@ -1,4 +1,5 @@ # Map the FW-task on polaris single-queue allocation +# Polaris Mapping designed by RHE """ Polaris Mapping """ From 2ceead73e02aa977aa574497d24525d4b027be08 Mon Sep 17 00:00:00 2001 From: rayhe88 Date: Wed, 3 Apr 2024 11:53:56 -0700 Subject: [PATCH 06/10] copyDataAndSave added in util. Function called in pyn.reset() --- pynta/utils.py | 51 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/pynta/utils.py b/pynta/utils.py index 56264bfd..053a9cda 100644 --- a/pynta/utils.py +++ b/pynta/utils.py @@ -256,3 +256,54 @@ def construct_constraint(d): constructor = getattr(ase.constraints,constraint_dict["type"]) del constraint_dict["type"] return constructor(**constraint_dict) + +def copyDataAndSave(origin, destination, file): + ''' + Function to copy a file from a origin subdirectory to a destination subdirectory. + If the file already exists, a "_BK" string is added to the name that it corresponds + to a backup copy. This function needs another function 'check_copy', which exchanges + the files, so that the most current file always does not have the _BK string, + in this way the files are not affected when rerun the workflow. + + Parameters + ___________ + origin: str + destination: str + file : str + + ''' + src = os.path.join(origin, file) + dst = os.path.join(destination, file) + + count = 1 + + while os.path.exists(dst): + base, ext = os.path.splitext(file) + new_name = f'{base}_BK{count}{ext}' + dst = os.path.join(destination, new_name) + count += 1 + + shutil.copy(src, dst) + + check_copy(file, destination) + +def check_copy(file2cpy, subdir): + files = os.listdir(subdir) + + file_maxval = None + number_maxval = -1 + + for file in files: + if file.startswith(file2cpy) and "_BK" in file: + num = int(file.split("_BK")[1].split(".")[0]) + if num > number_maxval: + number_maxval = num + file_maxval = file + + if file_maxval: + file_origin = os.path.join(subdir, file2cpy) + file_bk = os.path.join(subdir, file_maxval) + + os.rename(file_origin, file_origin + "_tmp") + os.rename(file_bk, file_origin) + os.rename(file_origin + "_tmp", file_bk) \ No newline at end of file From 26a2cb642d36f75d5ef86f73f94976dac159fa8d Mon Sep 17 00:00:00 2001 From: sakim8048 Date: Wed, 3 Apr 2024 12:00:17 -0700 Subject: [PATCH 07/10] import module --- pynta/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pynta/utils.py b/pynta/utils.py index 053a9cda..d49b30a0 100644 --- a/pynta/utils.py +++ b/pynta/utils.py @@ -1,6 +1,7 @@ import shutil import os import ase +import sys from ase.utils.structure_comparator import SymmetryEquivalenceCheck from ase.io import write, read import ase.constraints From 8ef0994640ebf510c8f8557f09314c7e71a57050 Mon Sep 17 00:00:00 2001 From: rayhe88 Date: Wed, 3 Apr 2024 12:04:53 -0700 Subject: [PATCH 08/10] multi_launcher.py for parallel environment. Required for reset() --- pynta/multi_launcher.py | 297 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 297 insertions(+) create mode 100644 pynta/multi_launcher.py diff --git a/pynta/multi_launcher.py b/pynta/multi_launcher.py new file mode 100644 index 00000000..3e7ceefe --- /dev/null +++ b/pynta/multi_launcher.py @@ -0,0 +1,297 @@ +""" +This module contains methods for launching several Rockets in a parallel environment +""" + +import os +import threading +import time +from multiprocessing import Manager, Process + +from fireworks.core.rocket_launcher import rapidfire +from fireworks.core.fworker import FWorker + +from fireworks.fw_config import ( + DS_PASSWORD, + PING_TIME_SECS, + RAPIDFIRE_SLEEP_SECS, + FWData, +) +from fireworks.utilities.fw_utilities import ( + DataServer, + get_fw_logger, + get_my_host, + log_multi, +) + +__author__ = "Xiaohui Qu, Anubhav Jain" +__copyright__ = "Copyright 2013, The Material Project & The Electrolyte Genome Project" +__maintainer__ = "Xiaohui Qu" +__email__ = "xqu@lbl.gov" +__date__ = "Aug 19, 2013" + + +def ping_multilaunch(port, stop_event): + """ + A single manager to ping all launches during multiprocess launches + + Args: + port (int): Listening port number of the DataServer + stop_event (Thread.Event): stop event + """ + ds = DataServer(address=("127.0.0.1", port), authkey=DS_PASSWORD) + ds.connect() + fd = FWData() + + lp = ds.LaunchPad() + while not stop_event.is_set(): + for pid, lid in fd.Running_IDs.items(): + if lid: + try: + os.kill(pid, 0) # throws OSError if the process is dead + lp.ping_launch(lid) + except OSError: # means this process is dead! + fd.Running_IDs[pid] = None + + stop_event.wait(PING_TIME_SECS) + + +def rapidfire_process( + fworker, nlaunches, sleep, loglvl, port, node_list, sub_nproc, timeout, running_ids_dict, local_redirect +): + """ + Initializes shared data with multiprocessing parameters and starts a rapidfire. + + Args: + fworker (FWorker): object + nlaunches (int): 0 means 'until completion', -1 or "infinite" means to loop forever + sleep (int): secs to sleep between rapidfire loop iterations + loglvl (str): level at which to output logs to stdout + port (int): Listening port number of the shared object manage + password (str): security password to access the server + node_list ([str]): computer node list + sub_nproc (int): number of processors of the sub job + timeout (int): # of seconds after which to stop the rapidfire process + local_redirect (bool): redirect standard input and output to local file + """ + ds = DataServer(address=("127.0.0.1", port), authkey=DS_PASSWORD) + ds.connect() + launchpad = ds.LaunchPad() + FWData().DATASERVER = ds + FWData().MULTIPROCESSING = True + FWData().NODE_LIST = node_list + FWData().SUB_NPROCS = sub_nproc + FWData().Running_IDs = running_ids_dict + sleep_time = sleep if sleep else RAPIDFIRE_SLEEP_SECS + l_dir = launchpad.get_logdir() if launchpad else None + l_logger = get_fw_logger( + "rocket.launcher", l_dir=l_dir, stream_level=loglvl) + # Record the start time for timeout update + process_start_time = time.time() + rapidfire( + launchpad, + fworker=fworker, + m_dir=None, + nlaunches=nlaunches, + max_loops=-1, + sleep_time=sleep, + strm_lvl=loglvl, + timeout=timeout, + local_redirect=local_redirect, + ) + while nlaunches == 0: + time.sleep(1.5) # wait for LaunchPad to be initialized + launch_ids = FWData().Running_IDs.values() + live_ids = list(set(launch_ids) - {None}) + if len(live_ids) > 0: + # Some other sub jobs are still running + + # Update the timeout according to the already elapsed time + time_elapsed = time.time() - process_start_time + timeout_left = timeout - time_elapsed + + # Stand down if there is less than 3% of the time left + if timeout_left < 0.03 * timeout: + log_multi( + l_logger, + ( + f"Remaining time {timeout_left}s is less than 3% of the original timeout " + f"{timeout}s - standing down" + ), + ) + break + + log_multi( + l_logger, f"Sleeping for {sleep_time} secs before resubmit sub job") + time.sleep(sleep_time) + log_multi(l_logger, "Resubmit sub job") + rapidfire( + launchpad, + fworker=fworker, + m_dir=None, + nlaunches=nlaunches, + max_loops=-1, + sleep_time=sleep, + strm_lvl=loglvl, + timeout=timeout, + local_redirect=local_redirect, + ) + else: + break + log_multi(l_logger, "Sub job finished") + + +def start_rockets( + lfworker, + nlaunches, + sleep, + loglvl, + port, + node_lists, + sub_nproc_list, + timeout=None, + running_ids_dict=None, + local_redirect=False, +): + """ + Create each sub job and start a rocket launch in each one + + Args: + fworker (FWorker): object + nlaunches (int): 0 means 'until completion', -1 or "infinite" means to loop forever + sleep (int): secs to sleep between rapidfire loop iterations + loglvl (str): level at which to output logs to stdout + port (int): Listening port number + node_lists ([str]): computer node list + sub_nproc_list ([int]): list of the number of the process of sub jobs + timeout (int): # of seconds after which to stop the rapidfire process + running_ids_dict (dict): Shared dict between process to record IDs + local_redirect (bool): redirect standard input and output to local file + Returns: + ([multiprocessing.Process]) all the created processes + """ + processes = [ + Process( + target=rapidfire_process, + args=(ifworker, nlaunches, sleep, loglvl, port, nl, + sub_nproc, timeout, running_ids_dict, local_redirect), + ) + for nl, sub_nproc, ifworker in zip(node_lists, sub_nproc_list, lfworker) + ] + + for p in processes: + p.start() + time.sleep(0.15) + + return processes + + +def split_node_lists(num_jobs, total_node_list=None, ppn=24): + """ + Parse node list and processor list from nodefile contents + + Args: + num_jobs (int): number of sub jobs + total_node_list (list of str): the node list of the whole large job + ppn (int): number of procesors per node + + Returns: + (([int],[int])) the node list and processor list for each job + """ + if total_node_list: + orig_node_list = sorted(list(set(total_node_list))) + nnodes = len(orig_node_list) + if nnodes % num_jobs != 0: + raise ValueError( + f"can't allocate nodes, {nnodes} can't be divided by {num_jobs}") + sub_nnodes = nnodes // num_jobs + sub_nproc_list = [sub_nnodes * ppn] * num_jobs + node_lists = [orig_node_list[i: i + sub_nnodes] + for i in range(0, nnodes, sub_nnodes)] + else: + sub_nproc_list = [ppn] * num_jobs + node_lists = [None] * num_jobs + return node_lists, sub_nproc_list + + +# TODO: why is loglvl a required parameter??? Also nlaunches and sleep_time could have a sensible default?? +def launch_multiprocess2( + launchpad, + lfworker, + loglvl, + nlaunches, + num_jobs, + sleep_time, + total_node_list=None, + ppn=1, + timeout=None, + exclude_current_node=False, + local_redirect=False, +): + """ + Launch the jobs in the job packing mode. + + Args: + launchpad (LaunchPad) + fworker (FWorker) + loglvl (str): level at which to output logs + nlaunches (int): 0 means 'until completion', -1 or "infinite" means to loop forever + num_jobs(int): number of sub jobs + sleep_time (int): secs to sleep between rapidfire loop iterations + total_node_list ([str]): contents of NODEFILE (doesn't affect execution) + ppn (int): processors per node (doesn't affect execution) + timeout (int): # of seconds after which to stop the rapidfire process + exclude_current_node: Don't use the script launching node as a compute node + local_redirect (bool): redirect standard input and output to local file + """ + print(" >>> Our multilauncher") + # parse node file contents + if exclude_current_node: + host = get_my_host() + l_dir = launchpad.get_logdir() if launchpad else None + l_logger = get_fw_logger( + "rocket.launcher", l_dir=l_dir, stream_level=loglvl) + if host in total_node_list: + log_multi( + l_logger, f'Remove the current node "{host}" from compute node') + total_node_list.remove(host) + else: + log_multi( + l_logger, "The current node is not in the node list, keep the node list as is") + node_lists, sub_nproc_list = split_node_lists( + num_jobs, total_node_list, ppn) + + # create shared dataserver + ds = DataServer.setup(launchpad) + port = ds.address[1] + + manager = Manager() + running_ids_dict = manager.dict() + # launch rapidfire processes + processes = start_rockets( + lfworker, + nlaunches, + sleep_time, + loglvl, + port, + node_lists, + sub_nproc_list, + timeout=timeout, + running_ids_dict=running_ids_dict, + local_redirect=local_redirect, + ) + FWData().Running_IDs = running_ids_dict + + # start pinging service + ping_stop = threading.Event() + ping_thread = threading.Thread( + target=ping_multilaunch, args=(port, ping_stop)) + ping_thread.start() + + # wait for completion + + for p in processes: + p.join() + + ping_stop.set() + ping_thread.join() + ds.shutdown() From f61a7a8399968353181bc9919530c82390ba2755 Mon Sep 17 00:00:00 2001 From: sakim8048 Date: Wed, 3 Apr 2024 12:17:05 -0700 Subject: [PATCH 09/10] add pickle to pynta object --- pynta/main.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pynta/main.py b/pynta/main.py index 55267b64..a233f69d 100644 --- a/pynta/main.py +++ b/pynta/main.py @@ -43,9 +43,11 @@ def __init__(self,path,rxns_file,surface_type,metal,label,launchpad_path=None,fw TS_opt_software_kwargs=None, lattice_opt_software_kwargs={'kpts': (25,25,25), 'ecutwfc': 70, 'degauss':0.02, 'mixing_mode': 'plain'}, reset_launchpad=False,queue_adapter_path=None,num_jobs=25,max_num_hfsp_opts=None,#max_num_hfsp_opts is mostly for fast testing - Eharmtol=3.0,Eharmfiltertol=30.0,Ntsmin=5,frozen_layers=2,fmaxopt=0.05,fmaxirc=0.1,fmaxopthard=0.05): + Eharmtol=3.0,Eharmfiltertol=30.0,Ntsmin=5,frozen_layers=2,fmaxopt=0.05,fmaxirc=0.1,fmaxopthard=0.05,pickled=0): self.surface_type = surface_type + self.pickled = pickled + if launchpad_path: launchpad = LaunchPad.from_file(launchpad_path) else: From b3016d2e36f629fcf89df00532f00391d9adae62 Mon Sep 17 00:00:00 2001 From: sakim8048 Date: Wed, 3 Apr 2024 12:23:14 -0700 Subject: [PATCH 10/10] delete unnecessary variables --- pynta/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pynta/main.py b/pynta/main.py index a233f69d..dbc54bbc 100644 --- a/pynta/main.py +++ b/pynta/main.py @@ -496,7 +496,7 @@ def setup_transition_states(self,adsorbates_finished=False): "gratom_to_molecule_atom_maps":{sm: {str(k):v for k,v in d.items()} for sm,d in self.gratom_to_molecule_atom_maps.items()}, "gratom_to_molecule_surface_atom_maps":{sm: {str(k):v for k,v in d.items()} for sm,d in self.gratom_to_molecule_surface_atom_maps.items()}, "nslab":self.nslab,"Eharmtol":self.Eharmtol,"Eharmfiltertol":self.Eharmfiltertol,"Ntsmin":self.Ntsmin, - "max_num_hfsp_opts":self.max_num_hfsp_opts, "acat_tol": self.acat_tol, "emt_metal": self.emt_metal}) + "max_num_hfsp_opts":self.max_num_hfsp_opts}) reactants = rxn["reactant_names"] products = rxn["product_names"] parents = []