PwBaseWorkChain: Remove automatic_parallelization input (#904)
The automatic parallelization feature has been broken for a while and is based on dated benchmarks that are most likely no longer relevant on newer machines. Moreover, since QE v7.1 an automatic parallelization feature is also available in `pw.x` itself.

Here we remove all the code related to the `automatic_parallelization` input and deprecate the related exit codes.
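With the input gone, users who previously relied on `automatic_parallelization` now specify resources and parallelization themselves. Below is a minimal, illustrative sketch for a `PwBaseWorkChain` builder; the code label `pw@localhost` and all numbers are assumptions, and the `parallelization` input is the one recent versions of the plugin expose on `PwCalculation`, not part of this change.

# Illustrative sketch only: explicitly set scheduler resources and QE-level
# parallelization flags on a PwBaseWorkChain builder.
from aiida import load_profile, orm
from aiida.plugins import WorkflowFactory

load_profile()

builder = WorkflowFactory('quantumespresso.pw.base').get_builder()
builder.pw.code = orm.load_code('pw@localhost')  # assumed code label

# Scheduler resources now always go through the metadata options ...
builder.pw.metadata.options = {
    'resources': {'num_machines': 2, 'num_mpiprocs_per_machine': 16},
    'max_wallclock_seconds': 1800,
    'withmpi': True,
}

# ... and QE flags (here: 4 k-point pools) can be passed explicitly.
builder.pw.parallelization = orm.Dict({'npool': 4})

# Remaining inputs (structure, parameters, k-points, pseudos) are omitted here.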
mbercx authored Apr 7, 2023
1 parent 9f4142d commit 5cae75f
Showing 5 changed files with 18 additions and 290 deletions.
20 changes: 6 additions & 14 deletions src/aiida_quantumespresso/cli/workflows/pw/bands.py
@@ -21,7 +21,6 @@
@options.HUBBARD_FILE()
@options.STARTING_MAGNETIZATION()
@options.SMEARING()
@options.AUTOMATIC_PARALLELIZATION()
@options.CLEAN_WORKDIR()
@options.MAX_NUM_MACHINES()
@options.MAX_WALLCLOCK_SECONDS()
@@ -30,15 +29,14 @@
@decorators.with_dbenv()
def launch_workflow(
code, structure, pseudo_family, kpoints_distance, ecutwfc, ecutrho, hubbard_u, hubbard_v, hubbard_file_pk,
starting_magnetization, smearing, automatic_parallelization, clean_workdir, max_num_machines, max_wallclock_seconds,
with_mpi, daemon
starting_magnetization, smearing, clean_workdir, max_num_machines, max_wallclock_seconds, with_mpi, daemon
):
"""Run a `PwBandsWorkChain`."""
# pylint: disable=too-many-statements
from aiida.orm import Bool, Dict, Float
from aiida.plugins import WorkflowFactory

from aiida_quantumespresso.utils.resources import get_automatic_parallelization_options, get_default_options
from aiida_quantumespresso.utils.resources import get_default_options

builder = WorkflowFactory('quantumespresso.pw.bands').get_builder()

@@ -93,16 +91,10 @@ def launch_workflow(
builder.scf.base.pw.hubbard_file = hubbard_file
builder.bands.base.pw.hubbard_file = hubbard_file

if automatic_parallelization:
auto_para = Dict(get_automatic_parallelization_options(max_num_machines, max_wallclock_seconds))
builder.relax.base.automatic_parallelization = auto_para
builder.scf.automatic_parallelization = auto_para
builder.bands.automatic_parallelization = auto_para
else:
metadata_options = get_default_options(max_num_machines, max_wallclock_seconds, with_mpi)
builder.relax.base.pw.metadata.options = metadata_options
builder.scf.pw.metadata.options = metadata_options
builder.bands.pw.metadata.options = metadata_options
metadata_options = get_default_options(max_num_machines, max_wallclock_seconds, with_mpi)
builder.relax.base.pw.metadata.options = metadata_options
builder.scf.pw.metadata.options = metadata_options
builder.bands.pw.metadata.options = metadata_options

if clean_workdir:
builder.clean_workdir = Bool(True)
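In the `PwBandsWorkChain` command above, the same default options are now attached unconditionally to every `pw` sub-namespace. A hedged sketch of the equivalent pattern in a user script follows; the namespaces mirror those used in the diff, and the numbers are illustrative.

# Sketch: apply one options dictionary to every pw namespace of a PwBandsWorkChain
# builder, mirroring the CLI change above; values are illustrative.
from aiida import load_profile
from aiida.plugins import WorkflowFactory

from aiida_quantumespresso.utils.resources import get_default_options

load_profile()

builder = WorkflowFactory('quantumespresso.pw.bands').get_builder()
metadata_options = get_default_options(max_num_machines=2, max_wallclock_seconds=3600, with_mpi=True)

# The relax, scf and bands steps each carry their own pw namespace.
for namespace in (builder.relax.base.pw, builder.scf.pw, builder.bands.pw):
    namespace.metadata.options = metadata_options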
12 changes: 3 additions & 9 deletions src/aiida_quantumespresso/cli/workflows/pw/base.py
@@ -21,7 +21,6 @@
@options.HUBBARD_FILE()
@options.STARTING_MAGNETIZATION()
@options.SMEARING()
@options.AUTOMATIC_PARALLELIZATION()
@options.CLEAN_WORKDIR()
@options.MAX_NUM_MACHINES()
@options.MAX_WALLCLOCK_SECONDS()
@@ -30,14 +29,13 @@
@decorators.with_dbenv()
def launch_workflow(
code, structure, pseudo_family, kpoints_distance, ecutwfc, ecutrho, hubbard_u, hubbard_v, hubbard_file_pk,
starting_magnetization, smearing, automatic_parallelization, clean_workdir, max_num_machines, max_wallclock_seconds,
with_mpi, daemon
starting_magnetization, smearing, clean_workdir, max_num_machines, max_wallclock_seconds, with_mpi, daemon
):
"""Run a `PwBaseWorkChain`."""
from aiida.orm import Bool, Dict, Float
from aiida.plugins import WorkflowFactory

from aiida_quantumespresso.utils.resources import get_automatic_parallelization_options, get_default_options
from aiida_quantumespresso.utils.resources import get_default_options

builder = WorkflowFactory('quantumespresso.pw.base').get_builder()

@@ -76,11 +74,7 @@ def launch_workflow(
if hubbard_file:
builder.hubbard_file = hubbard_file

if automatic_parallelization:
automatic_parallelization = get_automatic_parallelization_options(max_num_machines, max_wallclock_seconds)
builder.automatic_parallelization = Dict(automatic_parallelization)
else:
builder.pw.metadata.options = get_default_options(max_num_machines, max_wallclock_seconds, with_mpi)
builder.pw.metadata.options = get_default_options(max_num_machines, max_wallclock_seconds, with_mpi)

if clean_workdir:
builder.clean_workdir = Bool(True)
13 changes: 4 additions & 9 deletions src/aiida_quantumespresso/cli/workflows/pw/relax.py
@@ -21,7 +21,6 @@
@options.HUBBARD_FILE()
@options.STARTING_MAGNETIZATION()
@options.SMEARING()
@options.AUTOMATIC_PARALLELIZATION()
@options.CLEAN_WORKDIR()
@options.MAX_NUM_MACHINES()
@options.MAX_WALLCLOCK_SECONDS()
@@ -38,14 +37,14 @@
@decorators.with_dbenv()
def launch_workflow(
code, structure, pseudo_family, kpoints_distance, ecutwfc, ecutrho, hubbard_u, hubbard_v, hubbard_file_pk,
starting_magnetization, smearing, automatic_parallelization, clean_workdir, max_num_machines, max_wallclock_seconds,
with_mpi, daemon, final_scf
starting_magnetization, smearing, clean_workdir, max_num_machines, max_wallclock_seconds, with_mpi, daemon,
final_scf
):
"""Run a `PwRelaxWorkChain`."""
from aiida.orm import Bool, Dict, Float, Str
from aiida.plugins import WorkflowFactory

from aiida_quantumespresso.utils.resources import get_automatic_parallelization_options, get_default_options
from aiida_quantumespresso.utils.resources import get_default_options

builder = WorkflowFactory('quantumespresso.pw.relax').get_builder()

@@ -87,11 +86,7 @@ def launch_workflow(
if hubbard_file:
builder.base.pw.hubbard_file = hubbard_file

if automatic_parallelization:
automatic_parallelization = get_automatic_parallelization_options(max_num_machines, max_wallclock_seconds)
builder.base.automatic_parallelization = Dict(automatic_parallelization)
else:
builder.base.pw.metadata.options = get_default_options(max_num_machines, max_wallclock_seconds, with_mpi)
builder.base.pw.metadata.options = get_default_options(max_num_machines, max_wallclock_seconds, with_mpi)

if clean_workdir:
builder.clean_workdir = Bool(True)
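All three CLI commands now call `get_default_options` unconditionally. For reference, here is a small sketch of the options dictionary this helper produces; its shape is inferred from the visible tail of the function in resources.py below, so treat the exact contents of the `resources` entry as an assumption.

# Illustrative: the metadata options dictionary built by get_default_options;
# the exact contents of 'resources' are inferred, not shown in full below.
from aiida_quantumespresso.utils.resources import get_default_options

options = get_default_options(max_num_machines=2, max_wallclock_seconds=3600, with_mpi=True)
print(options)
# Expected to resemble:
# {'resources': {'num_machines': 2},
#  'max_wallclock_seconds': 3600,
#  'withmpi': True}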
137 changes: 0 additions & 137 deletions src/aiida_quantumespresso/utils/resources.py
@@ -1,10 +1,5 @@
# -*- coding: utf-8 -*-
"""Utilities for calculation job resources."""
from math import ceil, exp

import numpy as np

from aiida_quantumespresso.utils.defaults.calculation import pw as pw_defaults


def create_scheduler_resources(scheduler, base, goal):
@@ -70,135 +65,3 @@ def get_default_options(max_num_machines=1, max_wallclock_seconds=1800, with_mpi
'max_wallclock_seconds': int(max_wallclock_seconds),
'withmpi': with_mpi,
}


def get_automatic_parallelization_options(max_num_machines=1, max_wallclock_seconds=1800): # pylint: disable=invalid-name
"""Return an instance of the automatic parallelization options dictionary.
:param max_num_machines: set the number of nodes, default=1
:param max_wallclock_seconds: set the maximum number of wallclock seconds, default=1800
"""
return {
'max_num_machines': max_num_machines,
'target_time_seconds': 0.5 * max_wallclock_seconds,
'max_wallclock_seconds': max_wallclock_seconds
}


def get_pw_parallelization_parameters(
calculation,
max_num_machines,
target_time_seconds,
max_wallclock_seconds,
calculation_mode='scf',
round_interval=1800,
scaling_law=(exp(-16.1951988), 1.22535849)
):
"""Guess optimal choice of parallelzation parameters for a PwCalculation based on a completed initialization run.
:param calculation: an initial pw calculation (only initialization is sufficient),
to get number of k-points, of electrons, of spins, fft grids, etc.
:param max_num_machines: the maximum allowed number of nodes to be used
:param target_time_seconds: time the calculation should take finally for the user
:param max_wallclock_seconds: maximum allowed walltime the calculation should take
:param calculation_mode: kind of calculation to be performed
('scf', 'nscf', 'bands', 'relax', 'md', 'vc-relax', 'vc-md')
:param round_interval: the interval in seconds to which the estimated time in the results
will be rounded up, to determine the max_wallclock_seconds that should be set
:param scaling_law: list or tuple with 2 numbers giving the
fit parameters for a power law expressing the single-CPU time to do
1 scf step, for 1 k-point, 1 spin and 1 small box of the fft grid,
as a function of number of electrons, in the form: normalized_single_CPU_time = A*n_elec^B
where A is the first number and B the second.
Default values were obtained on piz-dora (CSCS) in 2015, on a set of
4370 calculations (with a very rough fit).
:return: a dictionary with suggested parallelization parameters with the following keys
* npools: the number of pools to use in the cmdline setting
* num_machines: the recommended number of nodes
* num_mpiprocs_per_machine: the recommended number of processes per nodes
* estimated_time: the estimated time the calculation should take in seconds
* max_wallclock_seconds: the recommended max_wall_clock_seconds setting based on the estimated_time value and
the round_interval argument
.. note:: If there was an out-of-memory problem during the initial
calculation, the number of machines is increased.
"""
# pylint: disable=invalid-name
from math import gcd

default_num_mpiprocs_per_machine = calculation.computer.get_default_mpiprocs_per_machine()

input_parameters = calculation.inputs.parameters.get_dict()
output_parameters = calculation.outputs.output_parameters.get_dict()
electron_settings = input_parameters.get('ELECTRONS', {})

nspin = output_parameters['number_of_spin_components']
nbands = output_parameters['number_of_bands']
nkpoints = output_parameters['number_of_k_points']
nsteps = electron_settings.get('electron_maxstep', pw_defaults.electron_maxstep)
fft_grid = output_parameters['fft_grid']

# Determine expected number of scf iterations. In the case of scf-like modes with relax or
# dynamics steps, we assume an average of 6 steps. All others are single step calculations
if calculation_mode in ['scf']:
niterations = nsteps
elif calculation_mode in ['relax', 'md', 'vc-relax', 'vc-md']:
niterations = nsteps * 6
else:
niterations = 1

# Compute an estimate single-CPU time
time_single_cpu = np.prod(fft_grid) * nspin * nkpoints * niterations * scaling_law[0] * nbands**scaling_law[1]

# The number of nodes is the maximum number we can use that is dividing nkpoints
num_machines = max(m for m in range(1, max_num_machines + 1) if nkpoints % m == 0)

# If possible try to make number of kpoints even by changing the number of machines
if (
num_machines == 1 and nkpoints > 6 and max_num_machines > 1 and
time_single_cpu / default_num_mpiprocs_per_machine > target_time_seconds
):
num_machines = max(m for m in range(1, max_num_machines + 1) if (nkpoints + 1) % m == 0)

# Now we will try to decrease the number of processes per machine (by not more than one fourth)
# until we manage to get an efficient plane wave parallelization
# (i.e. number of procs per pool dividing the third dimension of the fft grid)
num_mpiprocs_per_machine = default_num_mpiprocs_per_machine
successful = False
while num_mpiprocs_per_machine >= 0.75 * default_num_mpiprocs_per_machine:
if nkpoints % num_machines != 0:
npools = num_machines
else:
npools = num_machines * gcd(num_mpiprocs_per_machine, nkpoints / num_machines)
if fft_grid[2] % num_mpiprocs_per_machine / (npools / num_machines) == 0:
successful = True
break
num_mpiprocs_per_machine -= 1

if not successful:
num_mpiprocs_per_machine = default_num_mpiprocs_per_machine
if nkpoints % num_machines != 0:
npools = num_machines
else:
npools = num_machines * gcd(num_mpiprocs_per_machine, nkpoints / num_machines)

# Increase the number of machines in case of memory problem during initialization
if calculation.get_scheduler_stderr() and 'OOM' in calculation.get_scheduler_stderr():
num_machines = max(i for i in range(num_machines, max_num_machines + 1) if i % num_machines == 0)

estimated_time = time_single_cpu / (num_mpiprocs_per_machine * num_machines)
max_wallclock_seconds = min(ceil(estimated_time / round_interval) * round_interval, max_wallclock_seconds)

result = {
'resources': {
'num_machines': num_machines,
'num_mpiprocs_per_machine': num_mpiprocs_per_machine,
'tot_num_mpiprocs': num_machines * num_mpiprocs_per_machine,
},
'max_wallclock_seconds': max_wallclock_seconds,
'estimated_time': estimated_time,
'npools': npools,
}

return result
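To make the removed heuristic concrete, the following sketch evaluates the single-CPU time estimate from the formula and default scaling law shown above; every input number is hypothetical.

# Hypothetical numbers only: reproduce the single-CPU time estimate used by the
# removed get_pw_parallelization_parameters heuristic (see formula above).
from math import ceil, exp

import numpy as np

A, B = exp(-16.1951988), 1.22535849    # default scaling_law fit (piz-dora, 2015)
fft_grid = [64, 64, 64]                # assumed FFT grid dimensions
nspin, nkpoints, nbands = 1, 12, 40    # assumed spin components, k-points, bands
niterations = 50                       # e.g. electron_maxstep for an 'scf' run

time_single_cpu = np.prod(fft_grid) * nspin * nkpoints * niterations * A * nbands**B

# The heuristic divided this estimate over the selected MPI processes, then rounded
# the suggested max_wallclock_seconds up to the next round_interval (capped by the user's limit).
num_machines, num_mpiprocs_per_machine, round_interval = 2, 16, 1800
estimated_time = time_single_cpu / (num_machines * num_mpiprocs_per_machine)
print(f'estimated_time: {estimated_time:.0f} s, '
      f'suggested max_wallclock_seconds: {ceil(estimated_time / round_interval) * round_interval}')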