Skip to content

Commit

Permalink
Merge pull request #187 from TheoChem-VU/implement-xtbjob-class
Browse files Browse the repository at this point in the history
Implement xtbjob class
  • Loading branch information
YHordijk authored May 7, 2024
2 parents 67fbf95 + c954a03 commit 3e4649d
Show file tree
Hide file tree
Showing 9 changed files with 679 additions and 20 deletions.
1 change: 1 addition & 0 deletions src/tcutility/job/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
from .adf import ADFJob, ADFFragmentJob # noqa: F401
from .dftb import DFTBJob # noqa: F401
from .crest import CRESTJob, QCGJob # noqa: F401
from .xtb import XTBJob # noqa: F401
from .orca import ORCAJob # noqa: F401
from .nmr import NMRJob # noqa: F401
1 change: 0 additions & 1 deletion src/tcutility/job/crest.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ def _setup_job(self):
'coords.xyz',
f'-xnam "{self.xtb_path}"',
'--noreftopo',
'-rthr 1'
f'-c {self._charge}',
f'-u {self._spinpol}',
f'-tnmd {self._temp}',
Expand Down
25 changes: 23 additions & 2 deletions src/tcutility/job/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,22 @@
j = os.path.join


def _python_path():
'''
Sometimes it is necessary to have the Python path as some environments don't have its path.
This function attempts to find the Python path and returns it.
'''
python = sp.run('which python', shell=True, capture_output=True).stdout.decode().strip()

if python == '' or not os.path.exists(python):
python = sp.run('which python3', shell=True, capture_output=True).stdout.decode().strip()

# we default to the python executable
if python == '' or not os.path.exists(python):
python = 'python'

return python

class Job:
'''This is the base Job class used to build more advanced classes such as :class:`AMSJob <tcutility.job.ams.AMSJob>` and :class:`ORCAJob <tcutility.job.orca.ORCAJob>`.
The base class contains an empty :class:`Result <tcutility.results.result.Result>` object that holds the settings.
Expand Down Expand Up @@ -130,7 +146,8 @@ def run(self):
self.add_postamble(f'rm -r {self.workdir}')

for postscript in self._postscripts:
self._postambles.append(f'python {postscript[0]} {" ".join(postscript[1])}')
self._postambles.append(f'{_python_path()} {postscript[0]} {" ".join(postscript[1])}')

# setup the job and check if it was successfull
setup_success = self._setup_job()

Expand Down Expand Up @@ -199,8 +216,12 @@ def add_postscript(self, script, *args):

def dependency(self, otherjob: 'Job'):
'''
Set a dependency between this job and another job. This means that this job will run after the other job is finished running succesfully.
Set a dependency between this job and otherjob.
This means that this job will run after the other job is finished running succesfully.
'''
if otherjob.can_skip:
return

if hasattr(otherjob, 'slurm_job_id'):
self.sbatch(dependency=f'afterok:{otherjob.slurm_job_id}')
self.sbatch(kill_on_invalid_dep='Yes')
Expand Down
6 changes: 6 additions & 0 deletions src/tcutility/job/postscripts/write_converged_geoms.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,9 @@
amv.write('\n')

molecule.save(mol, f'converged_mols/{i}.xyz', comment=f'E = {energy * constants.HA2KCALMOL:.6f} kcal/mol')

if energy == max(energies):
molecule.save(mol, 'converged_mols/highest_energy.xyz', comment=f'E = {energy * constants.HA2KCALMOL:.6f} kcal/mol')

if energy == min(energies):
molecule.save(mol, 'converged_mols/lowest_energy.xyz', comment=f'E = {energy * constants.HA2KCALMOL:.6f} kcal/mol')
158 changes: 158 additions & 0 deletions src/tcutility/job/xtb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
from tcutility.job.generic import Job
from tcutility import spell_check
import os
from typing import Union


j = os.path.join


class XTBJob(Job):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.xtb_path = 'xtb'
self._options = ['coords.xyz']
self._scan = {}

def _setup_job(self):
os.makedirs(self.workdir, exist_ok=True)

self._molecule.write(j(self.workdir, 'coords.xyz'))

if self._scan:
with open(j(self.workdir, 'scan.inp'), 'w+') as scaninp:
for key, value in self._scan.items():
scaninp.write(f'${key}\n')
for line in value:
scaninp.write(f' {line}\n')
scaninp.write('$end\n')

options = ' '.join(self._options)
with open(self.runfile_path, 'w+') as runf:
runf.write('#!/bin/sh\n\n') # the shebang is not written by default by ADF
runf.write('\n'.join(self._preambles) + '\n\n')
runf.write(f'{self.xtb_path} {options}\n')
runf.write('\n'.join(self._postambles))

return True

def model(self, method: Union[str, int]):
'''
Set the method used by XTB. This includes GFN0-xTB, GFN1-xTB, GFN2-xTB and GFNFF.
Args:
method: the method to use. Can be specified by its full name, e.g. 'GFN2-xTB', is the same as 'GFN2' or simply 2.
'''
if isinstance(method, int):
if not 0 >= method >= 2:
raise ValueError(f'GFN{method}-xTB does not exist. Please choose one of GFN[0, 1, 2]-xTB.')
self._options.append(f'--gfn {method}')
return

if method.lower() in ['gfn0', 'gfn0-xtb']:
self._options.append('--gfn 0')
return

if method.lower() in ['gfn1', 'gfn1-xtb']:
self._options.append('--gfn 1')
return

if method.lower() in ['gfn2', 'gfn2-xtb']:
self._options.append('--gfn 2')
return

if method.lower() in ['gfnff', 'ff']:
self._options.append('--gfnff')
return

raise ValueError(f'Did not recognize the method {method} for XTB.')

def solvent(self, name: str = None, model: str = 'alpb'):
'''
Model solvation using the ALPB or GBSA model.
Args:
name: the name of the solvent you want to use. Must be ``None``, ``Acetone``, ``Acetonitrile``, ``CHCl3``, ``CS2``, ``DMSO``, ``Ether``, ``H2O``, ``Methanol``, ``THF`` or ``Toluene``.
grid_size: the size of the grid used to construct the solvent accessible surface. Must be ``230``, ``974``, ``2030`` or ``5810``.
'''
spell_check.check(model, ['alpb', 'gbsa'], ignore_case=True)
self._options.append(f'--{model} {name}')

def spin_polarization(self, val: int):
'''
Set the spin-polarization of the system.
'''
self._options.append(f'-u {val}')

def multiplicity(self, val: int):
'''
Set the multiplicity of the system. If the value is not one the calculation will also be unrestricted.
We use the following values:
1) singlet
2) doublet
3) triplet
4) ...
The multiplicity is equal to 2*S+1 for spin-polarization of S.
'''
self._options.append(f'-u {(val - 1)//2}')

def charge(self, val: int):
'''
Set the charge of the system.
'''
self._options.append(f'-c {val}')

def vibrations(self, enable: bool = True):
self._options.append('--hess')

def optimization(self, quality: str = 'Normal', calculate_hess: bool = True):
'''
Do a geometry optimization and calculate the normal modes of the input structure.
Args:
quality: the convergence criteria of the optimization.
See https://xtb-docs.readthedocs.io/en/latest/optimization.html#optimization-levels.
calculate_hess: whether to calculate the Hessian and do a normal mode analysis after optimization.
'''
spell_check.check(quality, ['crude', 'sloppy', 'loose', 'lax', 'normal', 'tight', 'vtight', 'extreme'], ignore_case=True)

if calculate_hess:
self._options.append(f'--ohess {quality}')
else:
self._options.append(f'--opt {quality}')

def PESScan(self, distances: list = [], angles: list = [], dihedrals: list = [], npoints: int = 20, quality: str = 'Normal', mode: str = 'concerted'):
'''
Set the task of the job to potential energy surface scan (PESScan).
Args:
distances: sequence of tuples or lists containing ``[atom_index1, atom_index2, start, end]``.
Atom indices start at 1. Distances are given in |angstrom|.
angles: sequence of tuples or lists containing ``[atom_index1, atom_index2, atom_index3, start, end]``.
Atom indices start at 1. Angles are given in degrees
dihedrals: sequence of tuples or lists containing ``[atom_index1, atom_index2, atom_index3, atom_index4, start, end]``.
Atom indices start at 1. Angles are given in degrees
npoints: the number of PES points to optimize.
.. note::
Currently we only support generating settings for 1-dimensional PESScans.
We will add support for N-dimensional PESScans later.
'''
self.optimization(quality=quality, calculate_hess=False)
self._options.append('--input scan.inp')

self._scan.setdefault('scan', [f'mode={mode}'])
# self._scan.setdefault('opt', [f'maxcycles=50'])
for i, d in enumerate(distances):
self._scan['scan'].append(f'distance: {d[0]},{d[1]},{d[2]}; {d[2]}, {d[3]}, {npoints}')
for i, a in enumerate(angles, start=i):
self._scan['scan'].append(f'angle: {a[0]},{a[1]},{a[2]},{a[3]}; {a[3]}, {a[4]}, {npoints}')
for i, a in enumerate(dihedrals, start=i):
self._scan['scan'].append(f'dihedral: {a[0]},{a[1]},{a[2]},{a[3]},{a[4]}; {a[4]}, {a[5]}, {npoints}')

@property
def output_mol_path(self):
return f'{self.workdir}/xtbopt.xyz'

12 changes: 10 additions & 2 deletions src/tcutility/results/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import pathlib as pl # noqa: E402

from .. import slurm # noqa: E402
from . import adf, ams, cache, dftb, orca # noqa: E402
from . import adf, ams, cache, dftb, orca, xtb # noqa: E402


def get_info(calc_dir: str):
Expand All @@ -29,6 +29,11 @@ def get_info(calc_dir: str):
except: # noqa
pass

try:
return xtb.get_info(calc_dir)
except: # noqa
pass

res = Result()

# if we cannot correctly read the info, we return some generic result object
Expand Down Expand Up @@ -93,14 +98,17 @@ def read(calc_dir: Union[str, pl.Path]) -> Result:
elif ret.engine == "dftb":
ret.dftb = dftb.get_calc_settings(ret)
ret.properties = dftb.get_properties(ret)
elif ret.engine == "xtb":
# ret.xtb = xtb.get_calc_settings(ret)
ret.properties = xtb.get_properties(ret)

elif ret.engine == "orca":
try:
ret.orca = orca.get_calc_settings(ret)
except:
ret.orca = None
print('Error reading:', calc_dir)
raise

try:
ret.properties = orca.get_properties(ret)
except:
Expand Down
8 changes: 7 additions & 1 deletion src/tcutility/results/ams.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def get_calc_files(calc_dir: str) -> dict:
"""
# collect all files in the current directory and subdirectories
files = []
for root, _, files_ in os.walk(calc_dir):
for root, _, files_ in os.walk(os.path.abspath(calc_dir)):
if os.path.split(root)[1].startswith('.'):
continue

Expand Down Expand Up @@ -468,6 +468,12 @@ def get_history(calc_dir: str) -> Result:
val = reader_ams.read("History", f"{item}({i+1})")
ret[item.lower()].append(val)

if 'converged' not in ret.keys() and ('PESScan', 'HistoryIndices') in reader_ams:
ret['converged'] = [False] * ret.number_of_entries
for idx in ensure_list(reader_ams.read('PESScan', 'HistoryIndices')):
ret['converged'][idx - 1] = True


return ret


Expand Down
48 changes: 34 additions & 14 deletions src/tcutility/results/dftb.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from tcutility.results import cache, Result
from tcutility import ensure_list
from tcutility import ensure_list, constants


def get_calc_settings(info: Result) -> Result:
Expand All @@ -8,6 +8,10 @@ def get_calc_settings(info: Result) -> Result:
"""

assert info.engine == "dftb", f"This function reads DFTB data, not {info.engine} data"

if info.input.task == 'PESScan':
return Result()

assert "dftb.rkf" in info.files, f'Missing dftb.rkf file, [{", ".join([": ".join(item) for item in info.files.items()])}]'

ret = Result()
Expand All @@ -19,18 +23,22 @@ def get_calc_settings(info: Result) -> Result:


def get_properties(info: Result) -> Result:
"""Function to get properties from an ADF calculation.
"""Function to get properties from an DFTB calculation.
Args:
info: Result object containing ADF properties.
info: Result object containing DFTB properties.
Returns:
:Result object containing properties from the ADF calculation:
:Result object containing properties from the DFTB calculation:
- **energy.bond (float)** – bonding energy (|kcal/mol|).
"""

assert info.engine == "dftb", f"This function reads DFTB data, not {info.engine} data"

if info.input.task == 'PESScan':
return Result()

assert "dftb.rkf" in info.files, f'Missing dftb.rkf file, [{", ".join([": ".join(item) for item in info.files.items()])}]'

reader_dftb = cache.get(info.files["dftb.rkf"])
Expand All @@ -48,21 +56,33 @@ def get_properties(info: Result) -> Result:
for i in range(1, number_of_properties + 1):
subtypes.append(reader_dftb.read("Properties", f"Subtype({i})").strip())
types.append(reader_dftb.read("Properties", f"Type({i})").strip())
values.append(reader_dftb.read("Properties", f"Value({i})"))
if types[-1] == 'Energy':
values.append(reader_dftb.read("Properties", f"Value({i})") * constants.HA2KCALMOL)
else:
values.append(reader_dftb.read("Properties", f"Value({i})"))

# then simply add the properties to ret
for typ, subtyp, value in zip(types, subtypes, values):
ret[typ.replace(" ", "_")][subtyp] = value

if ret.energy['DFTB Final Energy']:
ret.energy.bond = ret.energy['DFTB Final Energy']

# we also read vibrations
ret.vibrations.number_of_modes = reader_dftb.read("Vibrations", "nNormalModes")
ret.vibrations.frequencies = ensure_list(reader_dftb.read("Vibrations", "Frequencies[cm-1]"))
if ("Vibrations", "Intensities[km/mol]") in reader_dftb:
ret.vibrations.intensities = ensure_list(reader_dftb.read("Vibrations", "Intensities[km/mol]"))
ret.vibrations.number_of_imag_modes = len([freq for freq in ret.vibrations.frequencies if freq < 0])
ret.vibrations.character = "minimum" if ret.vibrations.number_of_imag_modes == 0 else "transitionstate"
ret.vibrations.modes = []
for i in range(ret.vibrations.number_of_modes):
ret.vibrations.modes.append(reader_dftb.read("Vibrations", f"NoWeightNormalMode({i+1})"))
if ('Vibrations', 'nNormalModes') in reader_dftb:
ret.vibrations.number_of_modes = reader_dftb.read("Vibrations", "nNormalModes")
ret.vibrations.frequencies = ensure_list(reader_dftb.read("Vibrations", "Frequencies[cm-1]"))
if ("Vibrations", "Intensities[km/mol]") in reader_dftb:
ret.vibrations.intensities = ensure_list(reader_dftb.read("Vibrations", "Intensities[km/mol]"))
ret.vibrations.number_of_imag_modes = len([freq for freq in ret.vibrations.frequencies if freq < 0])
ret.vibrations.character = "minimum" if ret.vibrations.number_of_imag_modes == 0 else "transitionstate"
ret.vibrations.modes = []
for i in range(ret.vibrations.number_of_modes):
ret.vibrations.modes.append(reader_dftb.read("Vibrations", f"NoWeightNormalMode({i+1})"))

if ("Thermodynamics", "Gibbs free Energy") in reader_dftb:
ret.energy.gibbs = reader_dftb.read("Thermodynamics", "Gibbs free Energy") * constants.HA2KCALMOL
ret.energy.enthalpy = reader_dftb.read("Thermodynamics", "Enthalpy") * constants.HA2KCALMOL
ret.energy.nuclear_internal = reader_dftb.read("Thermodynamics", "Internal Energy total") * constants.HA2KCALMOL

return ret
Loading

0 comments on commit 3e4649d

Please sign in to comment.