Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sim: fix emulator after introduction of new submit RPC #304

Merged
merged 4 commits into from
Apr 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 58 additions & 36 deletions simulator/submitsrv.c
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,10 @@ int parse_job_csv (flux_t *h, char *filename, zlist_t *jobs)
// Finally, update the submit event timer with the next submit time
int schedule_next_job (flux_t *h, sim_state_t *sim_state)
{
flux_future_t *f = NULL;
flux_msg_t *msg = NULL;
flux_future_t *create_f = NULL, *submit_f = NULL;
flux_kvsdir_t *dir = NULL;
job_t *job = NULL;
char *kvs_path = NULL;
const char *kvs_path = NULL;
int64_t new_jobid = -1;
double *new_sched_mod_time = NULL, *new_submit_mod_time = NULL;

Expand All @@ -240,23 +239,26 @@ int schedule_next_job (flux_t *h, sim_state_t *sim_state)
return -1;
}

f = flux_rpc_pack (h, "job.create", FLUX_NODEID_ANY, 0,
"{ s:i s:i s:I }",
"nnodes", job->nnodes,
"ntasks", job->ncpus,
"walltime", (int64_t)job->time_limit);
if (f == NULL) {
create_f = flux_rpc_pack (h, "job.create", FLUX_NODEID_ANY, 0,
"{ s:i s:i s:i s:i }",
"ntasks", job->ncpus,
"nnodes", job->nnodes,
"ncores", job->ncpus,
"walltime", (int)job->time_limit);
if (create_f == NULL) {
flux_log (h, LOG_ERR, "%s: %s", __FUNCTION__, strerror (errno));
return -1;
}
if (flux_rpc_get_unpack (f, "{ s:I s:s }",
if (flux_rpc_get_unpack (create_f, "{ s:I s:s }",
"jobid", &new_jobid,
"kvs_path", &kvs_path) < 0) {
"kvs_path", &kvs_path) < 0) {
flux_log (h, LOG_ERR, "%s: %s", __FUNCTION__, strerror (errno));
flux_future_destroy (f);
flux_future_destroy (create_f);
return -1;
}
flux_future_destroy (f);

flux_log (h, LOG_DEBUG, "%s: created job %"PRId64" at %s",
__FUNCTION__, new_jobid, kvs_path);

// Update lwj.%jobid%'s state in the kvs to "submitted"
if (!(dir = job_kvsdir (h, new_jobid)))
Expand All @@ -266,36 +268,56 @@ int schedule_next_job (flux_t *h, sim_state_t *sim_state)
log_err_exit ("put_job_in_kvs");

// Send "submitted" event
if (!(msg = flux_event_pack ("wreck.state.submitted",
"{ s:I s:s s:i s:i s:i}",
"lwj", new_jobid,
"kvs_path", kvs_path,
"nnodes", (int)job->nnodes,
"ntasks", (int)job->ncpus,
"walltime", (int)job->time_limit))
|| flux_send (h, msg, 0) < 0) {
submit_f = flux_rpc_pack (h, "job.submit-nocreate", FLUX_NODEID_ANY, 0,
"{ s:I s:s s:i s:i s:i s:i }",
"jobid", new_jobid,
"kvs_path", kvs_path,
"nnodes", job->nnodes,
"ntasks", job->ncpus,
"ncores", job->ncpus,
"walltime", (int)job->time_limit);
if (submit_f == NULL) {
flux_log (h, LOG_ERR, "%s: failed to pack job.submit-nocreate: %s",
__FUNCTION__, strerror (errno));
flux_future_destroy (submit_f);
return -1;
}
if (flux_rpc_get (submit_f, NULL) < 0) {
flux_log (h, LOG_ERR, "%s: failed to get response for job.submit-nocreate",
__FUNCTION__);
flux_future_destroy (submit_f);
return -1;
}
flux_msg_destroy (msg);
flux_future_destroy (submit_f);

// Must delay the destruction until we are done using the unpacked values
flux_future_destroy (create_f);

flux_log (h, LOG_INFO, "submitted job %"PRId64" (%d in csv)", new_jobid, job->id);

// Update event timers in reply (submit and sched)
new_sched_mod_time = (double *)zhash_lookup (timers, "sched");
if (new_sched_mod_time != NULL)
if (new_sched_mod_time == NULL) {
flux_log (h, LOG_ERR, "%s: 'sched' not in timers dictionary",
__FUNCTION__);
} else {
*new_sched_mod_time = sim_state->sim_time + .00001;
flux_log (h,
LOG_DEBUG,
"added a sched timer that will occur at %f",
*new_sched_mod_time);
flux_log (h, LOG_DEBUG, "added a sched timer that will occur at %f",
*new_sched_mod_time);
}

new_submit_mod_time = (double *)zhash_lookup (timers, module_name);
if (get_next_submit_time () > *new_sched_mod_time)
*new_submit_mod_time = get_next_submit_time ();
else
*new_submit_mod_time = *new_sched_mod_time + .0001;
flux_log (h, LOG_INFO, "submitted job %"PRId64" (%d in csv)", new_jobid, job->id);
flux_log (h,
LOG_DEBUG,
"next submit event will occur at %f",
*new_submit_mod_time);
if (new_sched_mod_time == NULL) {
flux_log (h, LOG_ERR, "%s: '%s' not in timers dictionary",
__FUNCTION__, module_name);
} else {
if (get_next_submit_time () > *new_sched_mod_time)
*new_submit_mod_time = get_next_submit_time ();
else
*new_submit_mod_time = *new_sched_mod_time + .0001;
flux_log (h, LOG_DEBUG, "next submit event will occur at %f",
*new_submit_mod_time);
}

// Cleanup
free_job (job);
Expand Down
5 changes: 0 additions & 5 deletions t/t2000-fcfs.t
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ test_description='Test fcfs scheduler in simulator
#
. $(dirname $0)/sharness.sh

if test -z "$FLUX_SCHED_ENABLE_SIM_TESTS"; then
skip_all='skipping simulator driven tests temporarily'
test_done
fi

FLUX_MODULE_PATH="${SHARNESS_BUILD_DIRECTORY}/simulator/.libs:${FLUX_MODULE_PATH}"

rdlconf=$(readlink -e "${SHARNESS_TEST_SRCDIR}/../conf/hype-io.lua")
Expand Down
5 changes: 0 additions & 5 deletions t/t2001-fcfs-aware.t
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ test_description='Test fcfs io-aware scheduler in simulator
#
. $(dirname $0)/sharness.sh

if test -z "$FLUX_SCHED_ENABLE_SIM_TESTS"; then
skip_all='skipping simulator driven tests temporarily'
test_done
fi

FLUX_MODULE_PATH_PREPEND="$FLUX_MODULE_PATH_PREPEND:$(sched_build_path simulator/.libs)"

rdlconf=$(readlink -e "${SHARNESS_TEST_SRCDIR}/../conf/hype-io.lua")
Expand Down
5 changes: 0 additions & 5 deletions t/t2002-easy.t
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@ test_description='Test easy scheduler in simulator
#
. $(dirname $0)/sharness.sh

if test -z "$FLUX_SCHED_ENABLE_SIM_TESTS"; then
skip_all='skipping simulator driven tests temporarily'
test_done
fi

rdlconf=$(sched_src_path "conf/hype-io.lua")
jobdata=$(readlink -e "${SHARNESS_TEST_SRCDIR}/data/job-traces/hype-test.csv")
expected_order=$(readlink -e "${SHARNESS_TEST_SRCDIR}/data/emulator-data/easy_expected")
Expand Down
5 changes: 0 additions & 5 deletions t/t2003-fcfs-inorder.t
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ test_description='Test fcfs scheduler with queue-depth=1 in simulator
#
. $(dirname $0)/sharness.sh

if test -z "$FLUX_SCHED_ENABLE_SIM_TESTS"; then
skip_all='skipping simulator driven tests temporarily'
test_done
fi

FLUX_MODULE_PATH="${SHARNESS_BUILD_DIRECTORY}/simulator/.libs:${FLUX_MODULE_PATH}"

rdlconf=$(readlink -e "${SHARNESS_TEST_SRCDIR}/../conf/hype-io.lua")
Expand Down