Skip to content

Commit

Permalink
Merge pull request #294 from dongahn/sched_prop
Browse files Browse the repository at this point in the history
sched: Add queue-depth=1 FCFS perf/scalability optimization
  • Loading branch information
SteVwonder authored Mar 23, 2018
2 parents 822f739 + d691856 commit 09ad395
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 7 deletions.
6 changes: 6 additions & 0 deletions sched/plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ static struct behavior_plugin *behavior_plugin_create (flux_t *h, void *dso)
memset (plugin, 0, sizeof (*plugin));
dlerror (); // Clear old dlerrors

plugin->get_sched_properties = dlsym (dso, "get_sched_properties");
strerr = dlerror();
if (strerr || !plugin->get_sched_properties || !*plugin->get_sched_properties) {
flux_log (h, LOG_ERR, "can't load get_sched_properties: %s", strerr);
goto error;
}
plugin->sched_loop_setup = dlsym (dso, "sched_loop_setup");
strerr = dlerror();
if (strerr || !plugin->sched_loop_setup || !*plugin->sched_loop_setup) {
Expand Down
3 changes: 3 additions & 0 deletions sched/plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ struct behavior_plugin {
char *name; /* Name of plugin */
char *path; /* Path to plugin dso */

int (*get_sched_properties)(flux_t *h,
struct sched_prop *prop);

int (*sched_loop_setup)(flux_t *h);

int64_t (*find_resources)(flux_t *h,
Expand Down
1 change: 1 addition & 0 deletions sched/plugin_version.map
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{ global:
get_sched_properties;
allocate_resources;
find_resources;
reserve_resources;
Expand Down
21 changes: 16 additions & 5 deletions sched/sched.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ typedef struct {
zlist_t *r_queue; /* Running job queue */
zlist_t *c_queue; /* Complete/cancelled job queue */
machs_t *machs; /* Helps resolve resources to ranks */
bool ooo_capable; /* sched policy schedule jobs out of order */
ssrvarg_t arg; /* args passed to this module */
simctx_t sctx; /* simulator context */
resrc_api_ctx_t *rsapi; /* resrc_api handle */
Expand Down Expand Up @@ -371,6 +372,7 @@ static ssrvctx_t *getctx (flux_t *h)
oom ();
if (!(ctx->machs = rs2rank_tab_new ()))
oom ();
ctx->ooo_capable = true;
ssrvarg_init (&(ctx->arg));
ctx->rsapi = resrc_api_init ();
ctx->sctx.in_sim = false;
Expand Down Expand Up @@ -1622,13 +1624,13 @@ static int schedule_jobs (ssrvctx_t *ctx)

if (priority_plugin)
priority_plugin->prioritize_jobs (ctx->h, jobs);
if (!behavior_plugin)
return -1;

/* Sort by decreasing priority */
zlist_sort (jobs, compare_priority);
resrc_tree_release_all_reservations (resrc_tree_root (ctx->rsapi));

if (!behavior_plugin)
return -1;
if (ctx->ooo_capable)
resrc_tree_release_all_reservations (resrc_tree_root (ctx->rsapi));
rc = behavior_plugin->sched_loop_setup (ctx->h);
job = zlist_first (jobs);
while (!rc && job && (qdepth < ctx->arg.s_params.queue_depth)) {
Expand Down Expand Up @@ -1967,12 +1969,21 @@ int mod_main (flux_t *h, int argc, char **argv)
flux_log_error (h, "failed to load %s", ctx->arg.userplugin);
goto done;
}
flux_log (h, LOG_INFO, "%s plugin loaded", ctx->arg.userplugin);
if (plugin_process_args (ctx, ctx->arg.userplugin_opts) < 0) {
flux_log_error (h, "failed to process args for %s",
ctx->arg.userplugin);
goto done;
}
flux_log (h, LOG_INFO, "%s plugin loaded", ctx->arg.userplugin);
struct sched_prop prop;
struct behavior_plugin *behavior_plugin = behavior_plugin_get (ctx->loader);
if (behavior_plugin->get_sched_properties (h, &prop) < 0) {
flux_log_error (h, "failed to fetch sched plugin properties for %s",
ctx->arg.userplugin);
errno = EINVAL;
goto done;
}
ctx->ooo_capable = prop.out_of_order_capable;
}
if (ctx->arg.prio_plugin) {
if (sched_plugin_load (ctx->loader, ctx->arg.prio_plugin) < 0) {
Expand Down
9 changes: 9 additions & 0 deletions sched/sched_backfill.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ resrc_tree_t *select_resources (flux_t *h, resrc_api_ctx_t *rsapi,
resrc_reqst_t *resrc_reqst,
resrc_tree_t *selected_parent);

/*
 * Report the scheduling properties of this plugin.
 *
 * h    - flux handle (not used by this implementation)
 * prop - output; its out_of_order_capable field is set to true
 *
 * Returns 0 on success, or -1 when prop is NULL.
 */
int get_sched_properties (flux_t *h, struct sched_prop *prop)
{
    int rc = -1;

    if (prop) {
        /* This plugin always advertises itself as out-of-order capable. */
        prop->out_of_order_capable = true;
        rc = 0;
    }
    return rc;
}

int sched_loop_setup (flux_t *h)
{
curr_reservation_depth = 0;
Expand Down
16 changes: 14 additions & 2 deletions sched/sched_fcfs.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "resrc_reqst.h"
#include "scheduler.h"

static int queue_depth = SCHED_PARAM_Q_DEPTH_DEFAULT;

static bool select_children (flux_t *h, resrc_api_ctx_t *rsapi,
resrc_tree_list_t *children,
Expand All @@ -52,6 +53,15 @@ resrc_tree_t *select_resources (flux_t *h, resrc_api_ctx_t *rsapi,
resrc_reqst_t *resrc_reqst,
resrc_tree_t *selected_parent);

/*
 * Report the scheduling properties of this plugin.
 *
 * h    - flux handle (not used by this implementation)
 * prop - output; out_of_order_capable is set to true only when the
 *        file-level queue_depth is greater than 1
 *
 * Returns 0 on success, or -1 when prop is NULL.
 */
int get_sched_properties (flux_t *h, struct sched_prop *prop)
{
    int rc = -1;

    if (prop) {
        /* The comparison already yields the boolean we want to publish. */
        prop->out_of_order_capable = (queue_depth > 1);
        rc = 0;
    }
    return rc;
}

int sched_loop_setup (flux_t *h)
{
return 0;
Expand Down Expand Up @@ -252,14 +262,16 @@ int reserve_resources (flux_t *h, resrc_api_ctx_t *rsapi,
{
int rc = -1;

if (*selected_tree)
/* If queue_depth is 1, this scheduler isn't out-of-order capable */
if (queue_depth > 1 && *selected_tree)
rc = resrc_tree_reserve (*selected_tree, job_id, 0, 0);
return rc;
}


int process_args (flux_t *h, char *argz, size_t argz_len)
/*
 * Take the scheduling parameters handed to this plugin and remember the
 * queue depth in the file-level queue_depth variable.
 *
 * h        - flux handle (not used here)
 * argz     - plugin argument vector (not consulted here)
 * argz_len - length of argz (not consulted here)
 * sp       - scheduling parameters; only sp->queue_depth is read
 *
 * Returns 0 on success, or -1 when sp is NULL.
 */
int process_args (flux_t *h, char *argz, size_t argz_len, const sched_params_t *sp)
{
    /* Reject a missing parameter block instead of dereferencing NULL. */
    if (!sp)
        return -1;

    queue_depth = sp->queue_depth;
    return 0;
}

Expand Down
6 changes: 6 additions & 0 deletions sched/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ typedef struct {
double priority; /*!< scheduling priority */
} flux_lwj_t;

/**
 *  Defines the properties of the scheduler plugin.
 *  A plugin fills this in through its get_sched_properties entry point.
 */
struct sched_prop {
    bool out_of_order_capable; /*!< true if the plugin can schedule
                                    jobs out of order */
};

/**
* Defines parameters that control scheduling optimization
Expand Down
1 change: 1 addition & 0 deletions t/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ TESTS = \
t2000-fcfs.t \
t2001-fcfs-aware.t \
t2002-easy.t \
t2003-fcfs-inorder.t \
t3001-resource-basic.t \
t3002-resource-prefix.t \
t3003-resource-global.t \
Expand Down
50 changes: 50 additions & 0 deletions t/t2003-fcfs-inorder.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/bin/bash
#set -x

test_description='Test fcfs scheduler with queue-depth=1 in simulator
'

# Source sharness from the directory where this test
# file resides, then put the simulator modules on the
# flux module search path.
#
. $(dirname $0)/sharness.sh
FLUX_MODULE_PATH="${SHARNESS_BUILD_DIRECTORY}/simulator/.libs:${FLUX_MODULE_PATH}"

#
# Resolve absolute paths of the inputs used below: the file passed
# as rdl-conf to sched, the CSV passed as job-csv to submit, and the
# file the resulting job order is compared against.
#
rdlconf=$(readlink -e "${SHARNESS_TEST_SRCDIR}/../conf/hype-io.lua")
jobdata=$(readlink -e "${SHARNESS_TEST_SRCDIR}/data/job-traces/hype-test.csv")
expected_order=$(readlink -e "${SHARNESS_TEST_SRCDIR}/data/emulator-data/fcfs_expected")

#
# print only with --debug
#
test_debug '
echo rdlconf=${rdlconf} &&
echo jobdata=${jobdata} &&
echo expected_order=${expected_order}
'

#
# test_under_flux is under sharness.d/
#
test_under_flux 1

#
# Load the simulator modules, then sched with the fcfs plugin and
# sched-params=queue-depth=1.
#
test_expect_success 'sim: started successfully with queue-depth=1' '
adjust_session_info 12 &&
timed_wait_job 5 &&
flux module load sim exit-on-complete=false &&
flux module load submit job-csv=${jobdata} &&
flux module load sim_exec &&
flux module load sched rdl-conf=${rdlconf} in-sim=true plugin=sched.fcfs sched-params=queue-depth=1
'

test_expect_success 'sim: scheduled and ran all jobs with queue-depth=1' '
timed_sync_wait_job 60
'

#
# For job ids 1..12, print "<id> <starting_time>" with starting_time
# read from the KVS, sort numerically by starting time and then by
# job id, and write only the ids to ./actual.
# NOTE(review): this runs outside test_expect_success, so a failure
# here is only noticed through the diff in the next test.
#
for x in $(seq 1 12); do echo "$x $(flux kvs get $(job_kvs_path $x).starting_time)"; done | sort -k 2n -k 1n | cut -d ' ' -f 1 > actual

#
# The observed start order must match the expected order file.
#
test_expect_success 'jobs scheduled in correct order with queue-depth=1' '
diff -u ${expected_order} ./actual
'

test_done

0 comments on commit 09ad395

Please sign in to comment.