diff --git a/simulator/submitsrv.c b/simulator/submitsrv.c index 3376d2685..25f7307a6 100644 --- a/simulator/submitsrv.c +++ b/simulator/submitsrv.c @@ -220,11 +220,10 @@ int parse_job_csv (flux_t *h, char *filename, zlist_t *jobs) // Finally, updated the submit event timer with the next submit time int schedule_next_job (flux_t *h, sim_state_t *sim_state) { - flux_future_t *f = NULL; - flux_msg_t *msg = NULL; + flux_future_t *create_f = NULL, *submit_f = NULL; flux_kvsdir_t *dir = NULL; job_t *job = NULL; - char *kvs_path = NULL; + const char *kvs_path = NULL; int64_t new_jobid = -1; double *new_sched_mod_time = NULL, *new_submit_mod_time = NULL; @@ -240,23 +239,26 @@ int schedule_next_job (flux_t *h, sim_state_t *sim_state) return -1; } - f = flux_rpc_pack (h, "job.create", FLUX_NODEID_ANY, 0, - "{ s:i s:i s:I }", - "nnodes", job->nnodes, - "ntasks", job->ncpus, - "walltime", (int64_t)job->time_limit); - if (f == NULL) { + create_f = flux_rpc_pack (h, "job.create", FLUX_NODEID_ANY, 0, + "{ s:i s:i s:i s:i }", + "ntasks", job->ncpus, + "nnodes", job->nnodes, + "ncores", job->ncpus, + "walltime", (int)job->time_limit); + if (create_f == NULL) { flux_log (h, LOG_ERR, "%s: %s", __FUNCTION__, strerror (errno)); return -1; } - if (flux_rpc_get_unpack (f, "{ s:I s:s }", + if (flux_rpc_get_unpack (create_f, "{ s:I s:s }", "jobid", &new_jobid, - "kvs_path", &kvs_path) < 0) { + "kvs_path", &kvs_path) < 0) { flux_log (h, LOG_ERR, "%s: %s", __FUNCTION__, strerror (errno)); - flux_future_destroy (f); + flux_future_destroy (create_f); return -1; } - flux_future_destroy (f); + + flux_log (h, LOG_DEBUG, "%s: created job %"PRId64" at %s", + __FUNCTION__, new_jobid, kvs_path); // Update lwj.%jobid%'s state in the kvs to "submitted" if (!(dir = job_kvsdir (h, new_jobid))) @@ -266,36 +268,56 @@ int schedule_next_job (flux_t *h, sim_state_t *sim_state) log_err_exit ("put_job_in_kvs"); // Send "submitted" event - if (!(msg = flux_event_pack ("wreck.state.submitted", - "{ s:I s:s s:i s:i s:i}", - "lwj", new_jobid, - "kvs_path", kvs_path, - "nnodes", (int)job->nnodes, - "ntasks", (int)job->ncpus, - "walltime", (int)job->time_limit)) - || flux_send (h, msg, 0) < 0) { + submit_f = flux_rpc_pack (h, "job.submit-nocreate", FLUX_NODEID_ANY, 0, + "{ s:I s:s s:i s:i s:i s:i }", + "jobid", new_jobid, + "kvs_path", kvs_path, + "nnodes", job->nnodes, + "ntasks", job->ncpus, + "ncores", job->ncpus, + "walltime", (int)job->time_limit); + if (submit_f == NULL) { + flux_log (h, LOG_ERR, "%s: failed to pack job.submit-nocreate: %s", + __FUNCTION__, strerror (errno)); + flux_future_destroy (submit_f); + return -1; + } + if (flux_rpc_get (submit_f, NULL) < 0) { + flux_log (h, LOG_ERR, "%s: failed to get response for job.submit-nocreate", + __FUNCTION__); + flux_future_destroy (submit_f); return -1; } - flux_msg_destroy (msg); + flux_future_destroy (submit_f); + + // Must delay the destruction until we are done using the unpacked values + flux_future_destroy (create_f); + + flux_log (h, LOG_INFO, "submitted job %"PRId64" (%d in csv)", new_jobid, job->id); // Update event timers in reply (submit and sched) new_sched_mod_time = (double *)zhash_lookup (timers, "sched"); - if (new_sched_mod_time != NULL) + if (new_sched_mod_time == NULL) { + flux_log (h, LOG_ERR, "%s: 'sched' not in timers dictionary", + __FUNCTION__); + } else { *new_sched_mod_time = sim_state->sim_time + .00001; - flux_log (h, - LOG_DEBUG, - "added a sched timer that will occur at %f", - *new_sched_mod_time); + flux_log (h, LOG_DEBUG, "added a sched timer that will occur at %f", + *new_sched_mod_time); + } + new_submit_mod_time = (double *)zhash_lookup (timers, module_name); - if (get_next_submit_time () > *new_sched_mod_time) - *new_submit_mod_time = get_next_submit_time (); - else - *new_submit_mod_time = *new_sched_mod_time + .0001; - flux_log (h, LOG_INFO, "submitted job %"PRId64" (%d in csv)", new_jobid, job->id); - flux_log (h, - LOG_DEBUG, - "next submit event will occur at %f", - *new_submit_mod_time); + if (new_sched_mod_time == NULL) { + flux_log (h, LOG_ERR, "%s: '%s' not in timers dictionary", + __FUNCTION__, module_name); + } else { + if (get_next_submit_time () > *new_sched_mod_time) + *new_submit_mod_time = get_next_submit_time (); + else + *new_submit_mod_time = *new_sched_mod_time + .0001; + flux_log (h, LOG_DEBUG, "next submit event will occur at %f", + *new_submit_mod_time); + } // Cleanup free_job (job); diff --git a/t/t2000-fcfs.t b/t/t2000-fcfs.t index ce0ffbe26..d351a8e2a 100755 --- a/t/t2000-fcfs.t +++ b/t/t2000-fcfs.t @@ -9,11 +9,6 @@ test_description='Test fcfs scheduler in simulator # . $(dirname $0)/sharness.sh -if test -z "$FLUX_SCHED_ENABLE_SIM_TESTS"; then - skip_all='skipping simulator driven tests temporarily' - test_done -fi - FLUX_MODULE_PATH="${SHARNESS_BUILD_DIRECTORY}/simulator/.libs:${FLUX_MODULE_PATH}" rdlconf=$(readlink -e "${SHARNESS_TEST_SRCDIR}/../conf/hype-io.lua") diff --git a/t/t2001-fcfs-aware.t b/t/t2001-fcfs-aware.t index 3d927089e..5c2a5e070 100755 --- a/t/t2001-fcfs-aware.t +++ b/t/t2001-fcfs-aware.t @@ -9,11 +9,6 @@ test_description='Test fcfs io-aware scheduler in simulator # . $(dirname $0)/sharness.sh -if test -z "$FLUX_SCHED_ENABLE_SIM_TESTS"; then - skip_all='skipping simulator driven tests temporarily' - test_done -fi - FLUX_MODULE_PATH_PREPEND="$FLUX_MODULE_PATH_PREPEND:$(sched_build_path simulator/.libs)" rdlconf=$(readlink -e "${SHARNESS_TEST_SRCDIR}/../conf/hype-io.lua") diff --git a/t/t2002-easy.t b/t/t2002-easy.t index acaf11b9b..0e8133860 100755 --- a/t/t2002-easy.t +++ b/t/t2002-easy.t @@ -8,11 +8,6 @@ test_description='Test easy scheduler in simulator # . $(dirname $0)/sharness.sh -if test -z "$FLUX_SCHED_ENABLE_SIM_TESTS"; then - skip_all='skipping simulator driven tests temporarily' - test_done -fi - rdlconf=$(sched_src_path "conf/hype-io.lua") jobdata=$(readlink -e "${SHARNESS_TEST_SRCDIR}/data/job-traces/hype-test.csv") expected_order=$(readlink -e "${SHARNESS_TEST_SRCDIR}/data/emulator-data/easy_expected") diff --git a/t/t2003-fcfs-inorder.t b/t/t2003-fcfs-inorder.t index 1be27aee6..7d51f2663 100755 --- a/t/t2003-fcfs-inorder.t +++ b/t/t2003-fcfs-inorder.t @@ -9,11 +9,6 @@ test_description='Test fcfs scheduler with queue-depth=1 in simulator # . $(dirname $0)/sharness.sh -if test -z "$FLUX_SCHED_ENABLE_SIM_TESTS"; then - skip_all='skipping simulator driven tests temporarily' - test_done -fi - FLUX_MODULE_PATH="${SHARNESS_BUILD_DIRECTORY}/simulator/.libs:${FLUX_MODULE_PATH}" rdlconf=$(readlink -e "${SHARNESS_TEST_SRCDIR}/../conf/hype-io.lua")