Add support for arbitrary prerequisites to case.submit #1753

Merged: 6 commits, Oct 25, 2017
103 changes: 55 additions & 48 deletions config/acme/machines/config_batch.xml
@@ -35,27 +35,28 @@
</directives>
</batch_system>

<batch_system type="cobalt" >
<batch_query>qstat</batch_query>
<batch_submit>qsub</batch_submit>
<batch_cancel>qdel</batch_cancel>
<batch_env>-v</batch_env>
<batch_directive></batch_directive>
<jobid_pattern>(\d+)</jobid_pattern>
<depend_string> --dependencies</depend_string>
<walltime_format>%H:%M:%s</walltime_format>
<batch_mail_flag>-M</batch_mail_flag>
<batch_mail_type_flag></batch_mail_type_flag>
<batch_mail_type></batch_mail_type>
<submit_args>
<arg flag="--cwd" name="CASEROOT"/>
<arg flag="-A" name="PROJECT"/>
<arg flag="-t" name="JOB_WALLCLOCK_TIME"/>
<arg flag="-n" name=" ($TOTALPES + $MAX_MPITASKS_PER_NODE - 1)/$MAX_MPITASKS_PER_NODE"/>
<arg flag="-q" name="JOB_QUEUE"/>
<arg flag="--mode script"/>
</submit_args>
</batch_system>
<batch_system type="cobalt" >
<batch_query>qstat</batch_query>
<batch_submit>qsub</batch_submit>
<batch_cancel>qdel</batch_cancel>
<batch_env>-v</batch_env>
<batch_directive></batch_directive>
<jobid_pattern>(\d+)</jobid_pattern>
<depend_string> --dependencies jobid</depend_string>
<depend_separator>:</depend_separator>
<walltime_format>%H:%M:%s</walltime_format>
<batch_mail_flag>-M</batch_mail_flag>
<batch_mail_type_flag></batch_mail_type_flag>
<batch_mail_type></batch_mail_type>
<submit_args>
<arg flag="--cwd" name="CASEROOT"/>
<arg flag="-A" name="PROJECT"/>
<arg flag="-t" name="JOB_WALLCLOCK_TIME"/>
<arg flag="-n" name=" ($TOTALPES + $MAX_MPITASKS_PER_NODE - 1)/MAX_MPITASKS_PER_NODE"/>
<arg flag="-q" name="JOB_QUEUE"/>
<arg flag="--mode script"/>
</submit_args>
</batch_system>

<batch_system type="cobalt_theta" >
<batch_query>qstat</batch_query>
@@ -64,7 +65,8 @@
<batch_env>-v</batch_env>
<batch_directive>#COBALT</batch_directive>
<jobid_pattern>(\d+)</jobid_pattern>
<depend_string> --dependencies</depend_string>
<depend_string>--dependencies jobid</depend_string>
<depend_separator>:</depend_separator>
<batch_mail_flag>-M</batch_mail_flag>
<batch_mail_type_flag></batch_mail_type_flag>
<batch_mail_type></batch_mail_type>
@@ -84,7 +86,8 @@
<batch_redirect>&lt;</batch_redirect>
<batch_directive>#BSUB</batch_directive>
<jobid_pattern>&lt;(\d+)&gt;</jobid_pattern>
<depend_string> -w 'done(jobid)'</depend_string>
<depend_string>-w 'done(jobid)'</depend_string>
<depend_separator>&amp;&amp;</depend_separator>
<walltime_format>%H:%M</walltime_format>
<batch_mail_flag>-u</batch_mail_flag>
<batch_mail_type_flag></batch_mail_type_flag>
@@ -114,7 +117,8 @@
<batch_env>-v</batch_env>
<batch_directive>#PBS</batch_directive>
<jobid_pattern>^(\S+)$</jobid_pattern>
<depend_string> -W depend=afterok:jobid</depend_string>
<depend_string>-W depend=afterok:jobid</depend_string>
<depend_separator>:</depend_separator>
<walltime_format>%H:%M:%S</walltime_format>
<batch_mail_flag>-M</batch_mail_flag>
<batch_mail_type_flag>-m</batch_mail_type_flag>
@@ -140,7 +144,8 @@
<batch_cancel>canceljob</batch_cancel>
<batch_directive>#MSUB</batch_directive>
<jobid_pattern>(\d+)$</jobid_pattern>
<depend_string> -W depend=afterok:jobid</depend_string>
<depend_string>-W depend=afterok:jobid</depend_string>
<depend_separator>:</depend_separator>
<walltime_format>%H:%M:%S</walltime_format>
<batch_mail_flag>-M</batch_mail_flag>
<batch_mail_type_flag>-m</batch_mail_type_flag>
@@ -165,6 +170,7 @@
<batch_directive>#SBATCH</batch_directive>
<jobid_pattern>(\d+)$</jobid_pattern>
<depend_string> -l depend=jobid</depend_string>
<depend_separator>:</depend_separator>
<walltime_format>%H:%M:%S</walltime_format>
<batch_mail_flag>--mail-user</batch_mail_flag>
<batch_mail_type_flag>--mail-type</batch_mail_type_flag>
@@ -187,29 +193,30 @@
</batch_system>
<!-- for lawrence livermore computing -->

<batch_system type="slurm" >
<batch_query per_job_arg="-j">squeue</batch_query>
<batch_submit>sbatch</batch_submit>
<batch_cancel>scancel</batch_cancel>
<batch_directive>#SBATCH</batch_directive>
<jobid_pattern>(\d+)$</jobid_pattern>
<depend_string> --dependency=afterok:jobid</depend_string>
<walltime_format>%H:%M:%S</walltime_format>
<batch_mail_flag>--mail-user</batch_mail_flag>
<batch_mail_type_flag>--mail-type</batch_mail_type_flag>
<batch_mail_type>none, all, begin, end, fail</batch_mail_type>
<submit_args>
<arg flag="--time" name="$JOB_WALLCLOCK_TIME"/>
<arg flag="-p" name="$JOB_QUEUE"/>
<arg flag="--account" name="$PROJECT"/>
</submit_args>
<directives>
<directive> --job-name={{ job_id }}</directive>
<directive> --nodes={{ num_nodes }}</directive>
<directive> --output={{ output_error_path }}.%j </directive>
<directive> --exclusive </directive>
</directives>
</batch_system>
<batch_system type="slurm" >
<batch_query per_job_arg="-j">squeue</batch_query>
<batch_submit>sbatch</batch_submit>
<batch_cancel>scancel</batch_cancel>
<batch_directive>#SBATCH</batch_directive>
<jobid_pattern>(\d+)$</jobid_pattern>
<depend_string>--dependency=afterok:jobid</depend_string>
<depend_separator>:</depend_separator>
<walltime_format>%H:%M:%S</walltime_format>
<batch_mail_flag>--mail-user</batch_mail_flag>
<batch_mail_type_flag>--mail-type</batch_mail_type_flag>
<batch_mail_type>none, all, begin, end, fail</batch_mail_type>
<submit_args>
<arg flag="--time" name="$JOB_WALLCLOCK_TIME"/>
<arg flag="-p" name="$JOB_QUEUE"/>
<arg flag="--account" name="$PROJECT"/>
</submit_args>
<directives>
<directive> --job-name={{ job_id }}</directive>
<directive> --nodes={{ num_nodes }}</directive>
<directive> --output={{ output_error_path }}.%j </directive>
<directive> --exclusive </directive>
</directives>
</batch_system>

<!-- blues is PBS -->
<batch_system MACH="blues" type="pbs" >
4 changes: 4 additions & 0 deletions config/xml_schemas/config_batch.xsd
@@ -16,6 +16,7 @@
<xs:element name="batch_directive" type="xs:string"/>
<xs:element name="jobid_pattern" type="xs:string"/>
<xs:element name="depend_string" type="xs:string"/>
<xs:element name="depend_separator" type="xs:string"/>
<xs:element name="walltime_format" type="xs:string"/>
<xs:element name="batch_mail_flag" type="xs:string"/>
<xs:element name="batch_mail_type_flag" type="xs:string"/>
@@ -68,6 +69,9 @@
a previous job has completed successfully -->
<xs:element minOccurs="0" ref="depend_string"/>

<!-- depend_separator: How to separate multiple batch job dependencies -->
<xs:element minOccurs="0" ref="depend_separator"/>

<!-- walltime_format: time format expected by batch system for the wall clock time field -->
<xs:element minOccurs="0" ref="walltime_format"/>

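The new depend_separator element pairs with depend_string: the literal token "jobid" in depend_string is replaced by the prerequisite job ids, joined with the separator when there is more than one. A minimal sketch of that substitution (illustrative only, not part of the PR; the flag and separator values are taken from the slurm entry above):

# Sketch of how depend_string and depend_separator combine, mirroring the
# replacement done in _submit_single_job in env_batch.py.
def build_depend_flag(depend_string, depend_separator, dep_jobs):
    # Join the prerequisite job ids, then substitute them for the "jobid" token.
    dep_ids_str = depend_separator.join(str(j) for j in dep_jobs)
    return depend_string.replace("jobid", dep_ids_str)

print(build_depend_flag("--dependency=afterok:jobid", ":", [1001, 1002]))
# --dependency=afterok:1001:1002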
13 changes: 6 additions & 7 deletions scripts/Tools/case.submit
@@ -61,24 +61,23 @@ OR

args = CIME.utils.parse_args_and_handle_standard_logging_options(args, parser)

CIME.utils.expect(args.prereq is None, "--prereq not currently supported")

return args.test, args.caseroot, args.job, args.no_batch, args.resubmit, \
args.skip_preview_namelist, args.mail_user, args.mail_type, \
return args.test, args.caseroot, args.job, args.no_batch, args.prereq, \
args.resubmit, args.skip_preview_namelist, args.mail_user, args.mail_type, \
args.batch_args

###############################################################################
def _main_func(description):
###############################################################################
test, caseroot, job, no_batch, resubmit, skip_pnl, \
test, caseroot, job, no_batch, prereq, resubmit, skip_pnl, \
mail_user, mail_type, batch_args = parse_command_line(sys.argv, description)
if test:
test_results = doctest.testmod(verbose=True)
sys.exit(1 if test_results.failed > 0 else 0)

with Case(caseroot, read_only=False) as case:
submit(case, job=job, no_batch=no_batch, resubmit=resubmit, skip_pnl=skip_pnl,
mail_user=mail_user, mail_type=mail_type, batch_args=batch_args)
submit(case, job=job, no_batch=no_batch, prereq=prereq, resubmit=resubmit,
skip_pnl=skip_pnl, mail_user=mail_user, mail_type=mail_type,
batch_args=batch_args)

if __name__ == "__main__":
_main_func(__doc__)
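With --prereq now accepted by case.submit, the same plumbing can also be driven from Python, mirroring the call in _main_func above. A rough usage sketch (module paths follow the repo layout; the caseroot path and job id are placeholders):

# Rough usage sketch only; the caseroot path and job id are placeholders.
from CIME.case import Case
from CIME.case_submit import submit

with Case("/path/to/caseroot", read_only=False) as case:
    # Hold case.run in the queue until batch job 1001234 completes successfully.
    submit(case, job="case.run", prereq="1001234")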
44 changes: 20 additions & 24 deletions scripts/lib/CIME/XML/env_batch.py
@@ -21,7 +21,6 @@ def __init__(self, case_root=None, infile="env_batch.xml"):
"""
initialize an object interface to file env_batch.xml in the case directory
"""
self._prereq_jobid = None
self._batchtype = None
# This arbitrary setting should always be overwritten
self._default_walltime = "00:20:00"
@@ -316,9 +315,9 @@ def get_submit_args(self, case, job):

return submitargs

def submit_jobs(self, case, no_batch=False, job=None, skip_pnl=False,
mail_user=None, mail_type='never', batch_args=None,
dry_run=False):
def submit_jobs(self, case, no_batch=False, job=None, user_prereq=None,
skip_pnl=False, mail_user=None, mail_type='never',
batch_args=None, dry_run=False):
alljobs = self.get_jobs()
startindex = 0
jobs = []
@@ -355,25 +354,16 @@ def submit_jobs(self, case, no_batch=False, job=None, skip_pnl=False,
deps = dependency.split()
else:
deps = []
jobid = ""
if self._prereq_jobid is not None:
jobid = self._prereq_jobid
dep_jobs = []
if user_prereq is not None:
dep_jobs.append(user_prereq)
for dep in deps:
if dep in depid and depid[dep] is not None:
jobid += " " + str(depid[dep])
#TODO: doubt these will be used
# elif dep == "and":
# jobid += " && "
# elif dep == "or":
# jobid += " || "
if dep in depid.keys() and depid[dep] is not None:
dep_jobs.append(str(depid[dep]))


slen = len(jobid)
if slen == 0:
jobid = None

logger.warning("job is {}".format(job))
result = self._submit_single_job(case, job, jobid,
logger.warning("job {} depends on {}".format(job, dep_jobs))
result = self._submit_single_job(case, job,
dep_jobs=dep_jobs,
no_batch=no_batch,
skip_pnl=skip_pnl,
mail_user=mail_user,
@@ -391,7 +381,7 @@ def submit_jobs(self, case, no_batch=False, job=None, skip_pnl=False,
else:
return depid

def _submit_single_job(self, case, job, depid=None, no_batch=False,
def _submit_single_job(self, case, job, dep_jobs=None, no_batch=False,
skip_pnl=False, mail_user=None, mail_type='never',
batch_args=None, dry_run=False):
logger.warning("Submit job {}".format(job))
@@ -415,9 +405,15 @@ def _submit_single_job(self, case, job, depid=None, no_batch=False,
if args_override:
submitargs = args_override

if depid is not None:
if dep_jobs is not None and len(dep_jobs) > 0:
logger.info("dependencies: {}".format(dep_jobs))
dep_string = self.get_value("depend_string", subgroup=None)
dep_string = dep_string.replace("jobid",depid.strip()) # pylint: disable=maybe-no-member
separator_string = self.get_value("depend_separator", subgroup=None)
expect("jobid" in dep_string, "depend_string is missing jobid for prerequisite jobs")
dep_ids_str = str(dep_jobs[0])
for dep_id in dep_jobs[1:]:
dep_ids_str += separator_string + str(dep_id)
dep_string = dep_string.replace("jobid",dep_ids_str.strip()) # pylint: disable=maybe-no-member
submitargs += " " + dep_string

if batch_args is not None:
4 changes: 2 additions & 2 deletions scripts/lib/CIME/case.py
@@ -1165,11 +1165,11 @@ def _get_comp_user_mods(self, component):
else:
return comp_user_mods

def submit_jobs(self, no_batch=False, job=None, skip_pnl=False,
def submit_jobs(self, no_batch=False, job=None, prereq=None, skip_pnl=False,
mail_user=None, mail_type='never', batch_args=None,
dry_run=False):
env_batch = self.get_env('batch')
return env_batch.submit_jobs(self, no_batch=no_batch, job=job,
return env_batch.submit_jobs(self, no_batch=no_batch, job=job, user_prereq=prereq,
skip_pnl=skip_pnl, mail_user=mail_user,
mail_type=mail_type, batch_args=batch_args,
dry_run=dry_run)
18 changes: 10 additions & 8 deletions scripts/lib/CIME/case_submit.py
@@ -15,8 +15,8 @@

logger = logging.getLogger(__name__)

def _submit(case, job=None, resubmit=False, no_batch=False, skip_pnl=False,
mail_user=None, mail_type='never', batch_args=None):
def _submit(case, job=None, no_batch=False, prereq=None, resubmit=False,
skip_pnl=False, mail_user=None, mail_type='never', batch_args=None):
if job is None:
if case.get_value("TEST"):
job = "case.test"
@@ -64,8 +64,8 @@ def _submit(case, job=None, resubmit=False, no_batch=False, skip_pnl=False,

logger.warning("submit_jobs {}".format(job))
job_ids = case.submit_jobs(no_batch=no_batch, job=job, skip_pnl=skip_pnl,
mail_user=mail_user, mail_type=mail_type,
batch_args=batch_args)
prereq=prereq, mail_user=mail_user,
mail_type=mail_type, batch_args=batch_args)

xml_jobids = []
for jobname, jobid in job_ids.items():
@@ -77,8 +77,8 @@ def _submit(case, job=None, resubmit=False, no_batch=False, skip_pnl=False,
if xml_jobid_text:
case.set_value("JOB_IDS", xml_jobid_text)

def submit(case, job=None, resubmit=False, no_batch=False, skip_pnl=False,
mail_user=None, mail_type='never', batch_args=None):
def submit(case, job=None, no_batch=False, prereq=None, resubmit=False,
skip_pnl=False, mail_user=None, mail_type='never', batch_args=None):
if case.get_value("TEST"):
caseroot = case.get_value("CASEROOT")
casebaseid = case.get_value("CASEBASEID")
@@ -93,8 +93,10 @@ def submit(case, job=None, resubmit=False, no_batch=False, skip_pnl=False,
ts.set_status(SUBMIT_PHASE, TEST_PASS_STATUS)

try:
functor = lambda: _submit(case, job, resubmit, no_batch, skip_pnl,
mail_user, mail_type, batch_args)
functor = lambda: _submit(case, job=job, no_batch=no_batch, prereq=prereq,
resubmit=resubmit, skip_pnl=skip_pnl,
mail_user=mail_user, mail_type=mail_type,
batch_args=batch_args)
run_and_log_case_status(functor, "case.submit", caseroot=case.get_value("CASEROOT"))
except:
# If something failed in the batch system, make sure to mark
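One detail worth noting in the hunk above: the lambda now forwards everything by keyword. Because the parameter order of _submit changed (no_batch and prereq now come before resubmit), keeping the old positional call would silently misassign arguments. A small illustration with simplified stand-in signatures (not the real ones):

# Simplified stand-in signature, only to illustrate why keyword forwarding is used.
def _submit_new(case, job=None, no_batch=False, prereq=None, resubmit=False):
    return {"no_batch": no_batch, "prereq": prereq, "resubmit": resubmit}

# The old positional order was (case, job, resubmit, no_batch, ...); passing that
# way into the new signature hands the resubmit value to no_batch and the
# no_batch value to prereq.
print(_submit_new("case", "case.run", True, False))
# {'no_batch': True, 'prereq': False, 'resubmit': False}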
43 changes: 43 additions & 0 deletions scripts/tests/scripts_regression_tests.py
@@ -13,6 +13,8 @@
from six import assertRaisesRegex
import six

import collections

from CIME.utils import run_cmd, run_cmd_no_fail, get_lids, get_current_commit
import update_acme_tests
import CIME.test_scheduler, CIME.wait_for_tests
@@ -1394,6 +1396,47 @@ def test_cime_case(self):
# Test some test properties
self.assertEqual(case.get_value("TESTCASE"), "TESTRUNPASS")

###########################################################################
def test_cime_case_prereq(self):
Contributor:
It seems to me that you should be using the existing system test framework in CIME/SystemTests.
You could create a new test based on the SMS test, in which job one is an initial run that writes a restart at the final time, and job two is started with the prereq flag as a CONTINUE_RUN that reads the restart from job 1. This would test that job 2 does not start before job 1 is complete and that job 2 successfully completes.

Contributor Author:
I'm somewhat concerned that this would depend on the queue system not fortuitously scheduling the jobs regardless of whether the prerequisites have been met.
The other issue is that it's not clear to me how to get job 1's queue id: run_indv calls case_run, which assumes it's already on the queue.

Contributor:
You could include a negative test: make the first run fail and ensure that the second doesn't start. As for the jobid, env_batch.py has a subroutine get_job_id. Since you will need to submit two separate jobs for this test, you should look at the ERR test for an example.

Contributor Author:
Sorry for the silence on this; I've been focusing on other work recently.
I'm not certain what the best way to implement a negative test for this is: querying the batch system to verify the job will never run seems like just as much work as checking that the batch system added the dependency, and neither seems doable with CIME currently.
I'd rather not add a timeout which might not be met even with the prerequisite running successfully, so querying the batch system seems like something I'd have to do.

Contributor Author:
@jedwards4b @jgfouca Would there be any suggestions on a robust method of testing this without querying the queue? Waiting for some threshold and verifying the dependent one didn't run seems dangerous, especially with the recent changes on skybridge. AFAICT, there isn't a robust method, so just doing the simple positive test seems reasonable.

Contributor:
@mfdeakin-sandia, can you please explain what the options are in pseudo-code steps?

Contributor Author:
The test I'm envisioning is as follows:
1. Submit a job (done)
2. While that job is running, submit a dependent job with the prerequisite argument pointing at the currently running job (done)
3. Ensure the dependent job does not run until the first job has finished successfully (not done)

I can add a check to the dependent job to verify the first job has finished, but if the dependent job was not submitted to the queue correctly due to a bug in the prerequisite logic, this test may still pass if it makes it onto a node after the original finishes.
@jedwards4b suggested a negative test: force the original job to fail and then verify the dependent job never runs. I could add a timeout and verify the job hasn't run by then, but this depends on the queue not starting the job after the timeout.

Another option would be to write a fake batch system as @billsacks suggested; this would be a separate Python script which outputs the arguments to a file so we can verify they're correct. Given all the xml needed to support this fake batch system, I'm not certain this is the way we want to go, but if it's helpful for other parts of the code that could justify it.
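For reference, the "fake batch system" idea mentioned above would amount to a stand-in submit command that just records its arguments, something like the following hypothetical script (not part of this PR; file names are placeholders):

#!/usr/bin/env python
# Hypothetical stand-in for qsub/sbatch: record the submit arguments so a test
# can inspect them, then print a fake job id matching the configured jobid_pattern.
import sys

with open("fake_batch_submit.log", "a") as log:
    # Append the full submit command line for later inspection by the test.
    log.write(" ".join(sys.argv[1:]) + "\n")

print("12345")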

###########################################################################
testcase_name = 'prereq_test'
testdir = os.path.join(TEST_ROOT, testcase_name)
if os.path.exists(testdir):
shutil.rmtree(testdir)
run_cmd_assert_result(self, ("%s/create_newcase --case %s --script-root %s --compset X --res f19_g16 --output-root %s"
% (SCRIPT_DIR, testcase_name, testdir, testdir)),
from_dir=SCRIPT_DIR)

with Case(testdir, read_only=False) as case:
job_name = "case.run"
prereq_name = 'prereq_test'
batch_commands = case.submit_jobs(prereq=prereq_name, job=job_name, skip_pnl=True, dry_run=True)
self.assertTrue(isinstance(batch_commands, collections.Sequence), "case.submit_jobs did not return a sequence for a dry run")
self.assertTrue(len(batch_commands) > 0, "case.submit_jobs did not return any job submission string")
# The first element in the internal sequence should just be the job name
# The second one (batch_cmd_index) should be the actual batch submission command
batch_cmd_index = 1
# The prerequisite should be applied to all jobs, though we're only expecting one
for batch_cmd in batch_commands:
self.assertTrue(isinstance(batch_cmd, collections.Sequence), "case.submit_jobs did not return a sequence of sequences")
self.assertTrue(len(batch_cmd) > batch_cmd_index, "case.submit_jobs returned internal sequences with length <= {}".format(batch_cmd_index))
self.assertTrue(isinstance(batch_cmd[1], str), "case.submit_jobs returned internal sequences without the batch command string as the second parameter: {}".format(batch_cmd[1]))
batch_cmd_args = batch_cmd[1]

jobid_ident = 'jobid'
dep_str_fmt = case.get_env('batch').get_value('depend_string', subgroup=None)
self.assertTrue(jobid_ident in dep_str_fmt, "dependency string doesn't include the jobid identifier {}".format(jobid_ident))
dep_str = dep_str_fmt[:-len(jobid_ident)]

while dep_str in batch_cmd_args:
dep_id_pos = batch_cmd_args.find(dep_str) + len(dep_str)
batch_cmd_args = batch_cmd_args[dep_id_pos:]
prereq_substr = batch_cmd_args[:len(prereq_name)]
if prereq_substr == prereq_name:
break

self.assertTrue(prereq_name in prereq_substr, "Dependencies added, but not the user specified one")

###########################################################################
def test_cime_case_build_threaded_1(self):
###########################################################################