Skip to content

Commit

Permalink
Merge pull request #304 from SteVwonder/fix-emulator-new-submit
Browse files Browse the repository at this point in the history
sim: fix emulator after introduction of new submit RPC
  • Loading branch information
garlick authored Apr 1, 2018
2 parents 02b17e9 + fa44e05 commit cc03941
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 56 deletions.
94 changes: 58 additions & 36 deletions simulator/submitsrv.c
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,10 @@ int parse_job_csv (flux_t *h, char *filename, zlist_t *jobs)
// Finally, update the submit event timer with the next submit time
int schedule_next_job (flux_t *h, sim_state_t *sim_state)
{
flux_future_t *f = NULL;
flux_msg_t *msg = NULL;
flux_future_t *create_f = NULL, *submit_f = NULL;
flux_kvsdir_t *dir = NULL;
job_t *job = NULL;
char *kvs_path = NULL;
const char *kvs_path = NULL;
int64_t new_jobid = -1;
double *new_sched_mod_time = NULL, *new_submit_mod_time = NULL;

Expand All @@ -240,23 +239,26 @@ int schedule_next_job (flux_t *h, sim_state_t *sim_state)
return -1;
}

f = flux_rpc_pack (h, "job.create", FLUX_NODEID_ANY, 0,
"{ s:i s:i s:I }",
"nnodes", job->nnodes,
"ntasks", job->ncpus,
"walltime", (int64_t)job->time_limit);
if (f == NULL) {
create_f = flux_rpc_pack (h, "job.create", FLUX_NODEID_ANY, 0,
"{ s:i s:i s:i s:i }",
"ntasks", job->ncpus,
"nnodes", job->nnodes,
"ncores", job->ncpus,
"walltime", (int)job->time_limit);
if (create_f == NULL) {
flux_log (h, LOG_ERR, "%s: %s", __FUNCTION__, strerror (errno));
return -1;
}
if (flux_rpc_get_unpack (f, "{ s:I s:s }",
if (flux_rpc_get_unpack (create_f, "{ s:I s:s }",
"jobid", &new_jobid,
"kvs_path", &kvs_path) < 0) {
"kvs_path", &kvs_path) < 0) {
flux_log (h, LOG_ERR, "%s: %s", __FUNCTION__, strerror (errno));
flux_future_destroy (f);
flux_future_destroy (create_f);
return -1;
}
flux_future_destroy (f);

flux_log (h, LOG_DEBUG, "%s: created job %"PRId64" at %s",
__FUNCTION__, new_jobid, kvs_path);

// Update lwj.%jobid%'s state in the kvs to "submitted"
if (!(dir = job_kvsdir (h, new_jobid)))
Expand All @@ -266,36 +268,56 @@ int schedule_next_job (flux_t *h, sim_state_t *sim_state)
log_err_exit ("put_job_in_kvs");

// Send "submitted" event
if (!(msg = flux_event_pack ("wreck.state.submitted",
"{ s:I s:s s:i s:i s:i}",
"lwj", new_jobid,
"kvs_path", kvs_path,
"nnodes", (int)job->nnodes,
"ntasks", (int)job->ncpus,
"walltime", (int)job->time_limit))
|| flux_send (h, msg, 0) < 0) {
submit_f = flux_rpc_pack (h, "job.submit-nocreate", FLUX_NODEID_ANY, 0,
"{ s:I s:s s:i s:i s:i s:i }",
"jobid", new_jobid,
"kvs_path", kvs_path,
"nnodes", job->nnodes,
"ntasks", job->ncpus,
"ncores", job->ncpus,
"walltime", (int)job->time_limit);
if (submit_f == NULL) {
flux_log (h, LOG_ERR, "%s: failed to pack job.submit-nocreate: %s",
__FUNCTION__, strerror (errno));
flux_future_destroy (submit_f);
return -1;
}
if (flux_rpc_get (submit_f, NULL) < 0) {
flux_log (h, LOG_ERR, "%s: failed to get response for job.submit-nocreate",
__FUNCTION__);
flux_future_destroy (submit_f);
return -1;
}
flux_msg_destroy (msg);
flux_future_destroy (submit_f);

// Must delay the destruction until we are done using the unpacked values
flux_future_destroy (create_f);

flux_log (h, LOG_INFO, "submitted job %"PRId64" (%d in csv)", new_jobid, job->id);

// Update event timers in reply (submit and sched)
new_sched_mod_time = (double *)zhash_lookup (timers, "sched");
if (new_sched_mod_time != NULL)
if (new_sched_mod_time == NULL) {
flux_log (h, LOG_ERR, "%s: 'sched' not in timers dictionary",
__FUNCTION__);
} else {
*new_sched_mod_time = sim_state->sim_time + .00001;
flux_log (h,
LOG_DEBUG,
"added a sched timer that will occur at %f",
*new_sched_mod_time);
flux_log (h, LOG_DEBUG, "added a sched timer that will occur at %f",
*new_sched_mod_time);
}

new_submit_mod_time = (double *)zhash_lookup (timers, module_name);
if (get_next_submit_time () > *new_sched_mod_time)
*new_submit_mod_time = get_next_submit_time ();
else
*new_submit_mod_time = *new_sched_mod_time + .0001;
flux_log (h, LOG_INFO, "submitted job %"PRId64" (%d in csv)", new_jobid, job->id);
flux_log (h,
LOG_DEBUG,
"next submit event will occur at %f",
*new_submit_mod_time);
if (new_sched_mod_time == NULL) {
flux_log (h, LOG_ERR, "%s: '%s' not in timers dictionary",
__FUNCTION__, module_name);
} else {
if (get_next_submit_time () > *new_sched_mod_time)
*new_submit_mod_time = get_next_submit_time ();
else
*new_submit_mod_time = *new_sched_mod_time + .0001;
flux_log (h, LOG_DEBUG, "next submit event will occur at %f",
*new_submit_mod_time);
}

// Cleanup
free_job (job);
Expand Down
5 changes: 0 additions & 5 deletions t/t2000-fcfs.t
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ test_description='Test fcfs scheduler in simulator
#
. $(dirname $0)/sharness.sh

if test -z "$FLUX_SCHED_ENABLE_SIM_TESTS"; then
skip_all='skipping simulator driven tests temporarily'
test_done
fi

FLUX_MODULE_PATH="${SHARNESS_BUILD_DIRECTORY}/simulator/.libs:${FLUX_MODULE_PATH}"

rdlconf=$(readlink -e "${SHARNESS_TEST_SRCDIR}/../conf/hype-io.lua")
Expand Down
5 changes: 0 additions & 5 deletions t/t2001-fcfs-aware.t
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ test_description='Test fcfs io-aware scheduler in simulator
#
. $(dirname $0)/sharness.sh

if test -z "$FLUX_SCHED_ENABLE_SIM_TESTS"; then
skip_all='skipping simulator driven tests temporarily'
test_done
fi

FLUX_MODULE_PATH_PREPEND="$FLUX_MODULE_PATH_PREPEND:$(sched_build_path simulator/.libs)"

rdlconf=$(readlink -e "${SHARNESS_TEST_SRCDIR}/../conf/hype-io.lua")
Expand Down
5 changes: 0 additions & 5 deletions t/t2002-easy.t
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@ test_description='Test easy scheduler in simulator
#
. $(dirname $0)/sharness.sh

if test -z "$FLUX_SCHED_ENABLE_SIM_TESTS"; then
skip_all='skipping simulator driven tests temporarily'
test_done
fi

rdlconf=$(sched_src_path "conf/hype-io.lua")
jobdata=$(readlink -e "${SHARNESS_TEST_SRCDIR}/data/job-traces/hype-test.csv")
expected_order=$(readlink -e "${SHARNESS_TEST_SRCDIR}/data/emulator-data/easy_expected")
Expand Down
5 changes: 0 additions & 5 deletions t/t2003-fcfs-inorder.t
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ test_description='Test fcfs scheduler with queue-depth=1 in simulator
#
. $(dirname $0)/sharness.sh

if test -z "$FLUX_SCHED_ENABLE_SIM_TESTS"; then
skip_all='skipping simulator driven tests temporarily'
test_done
fi

FLUX_MODULE_PATH="${SHARNESS_BUILD_DIRECTORY}/simulator/.libs:${FLUX_MODULE_PATH}"

rdlconf=$(readlink -e "${SHARNESS_TEST_SRCDIR}/../conf/hype-io.lua")
Expand Down

0 comments on commit cc03941

Please sign in to comment.