Skip to content

Commit

Permalink
Merge pull request #278 from garlick/msg_handlers
Browse files Browse the repository at this point in the history
track bulk message handler registration API change
  • Loading branch information
morrone authored Nov 15, 2017
2 parents a55889f + 9ee7e4d commit e068c92
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 42 deletions.
2 changes: 1 addition & 1 deletion rdl/test/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ AM_CPPFLAGS = -I$(top_srcdir) $(JANSSON_CFLAGS) $(LUA_INCLUDE)

TESTS_ENVIRONMENT = \
LUA_PATH="$(abs_top_srcdir)/rdl/?.lua;$(FLUX_PREFIX)/share/lua/5.1/?.lua;$(FLUX_PREFIX)/share/lua/5.1/fluxometer/?.lua;$(LUA_PATH);;" \
LUA_CPATH="$(abs_top_builddir)/rdl/?.so;$(abs_top_builddir)/rdl/test/.libs/?.so;$(FLUX_PREFIX)/lib64/lua/5.1/?.so;$(LUA_CPATH);;" \
LUA_CPATH="$(abs_top_builddir)/rdl/?.so;$(abs_top_builddir)/rdl/test/.libs/?.so;$(FLUX_PREFIX)/lib64/lua/5.1/?.so;$(FLUX_PREFIX)/lib/lua/5.1/?.so;$(LUA_CPATH);;" \
TESTRDL_INPUT_FILE="$(abs_top_srcdir)/conf/hype.lua"

TESTS = trdl test-jansson.lua
Expand Down
2 changes: 1 addition & 1 deletion resource/planner/test/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ AM_CPPFLAGS = -I$(top_srcdir) $(CZMQ_CFLAGS) $(FLUX_CORE_CFLAGS)
TESTS_ENVIRONMENT = \
TESTRESRC_INPUT_FILE="$(abs_top_srcdir)/conf/hype.lua" \
LUA_PATH="$(abs_top_srcdir)/rdl/?.lua;$(FLUX_PREFIX)/share/lua/5.1/?.lua;$(LUA_PATH);;" \
LUA_CPATH="$(abs_top_builddir)/rdl/?.so;$(FLUX_PREFIX)/lib64/lua/5.1/?.so;$(LUA_CPATH);;"
LUA_CPATH="$(abs_top_builddir)/rdl/?.so;$(abs_top_builddir)/rdl/test/.libs/?.so;$(FLUX_PREFIX)/lib64/lua/5.1/?.so;$(FLUX_PREFIX)/lib/lua/5.1/?.so;$(LUA_CPATH);;"

TESTS = planner_test01

Expand Down
2 changes: 1 addition & 1 deletion resrc/test/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ AM_CPPFLAGS = -I$(top_srcdir) $(JANSSON_CFLAGS) $(CZMQ_CFLAGS) $(FLUX_CORE_CFLAG
TESTS_ENVIRONMENT = \
TESTRESRC_INPUT_FILE="$(abs_top_srcdir)/conf/hype.lua" \
LUA_PATH="$(abs_top_srcdir)/rdl/?.lua;$(FLUX_PREFIX)/share/lua/5.1/?.lua;$(LUA_PATH);;" \
LUA_CPATH="$(abs_top_builddir)/rdl/?.so;$(FLUX_PREFIX)/lib64/lua/5.1/?.so;$(LUA_CPATH);;"
LUA_CPATH="$(abs_top_builddir)/rdl/?.so;$(abs_top_builddir)/rdl/test/.libs/?.so;$(FLUX_PREFIX)/lib64/lua/5.1/?.so;$(FLUX_PREFIX)/lib/lua/5.1/?.so;$(LUA_CPATH);;"

TESTS = tresrc

Expand Down
14 changes: 8 additions & 6 deletions sched/plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
struct sched_plugin_loader {
flux_t *h;
struct sched_plugin *plugin;
flux_msg_handler_t **handlers;
};

static void plugin_destroy (struct sched_plugin *plugin)
Expand Down Expand Up @@ -290,10 +291,10 @@ static void lsmod_cb (flux_t *h, flux_msg_handler_t *w,
}


static struct flux_msg_handler_spec plugin_htab[] = {
{ FLUX_MSGTYPE_REQUEST, "sched.insmod", insmod_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "sched.rmmod", rmmod_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "sched.lsmod", lsmod_cb, 0, NULL },
static const struct flux_msg_handler_spec plugin_htab[] = {
{ FLUX_MSGTYPE_REQUEST, "sched.insmod", insmod_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "sched.rmmod", rmmod_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "sched.lsmod", lsmod_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

Expand All @@ -306,7 +307,8 @@ struct sched_plugin_loader *sched_plugin_loader_create (flux_t *h)
}
memset (sploader, 0, sizeof (*sploader));
sploader->h = h;
if (flux_msg_handler_addvec (h, plugin_htab, sploader) < 0) {
if (flux_msg_handler_addvec (h, plugin_htab, sploader,
&sploader->handlers) < 0) {
flux_log_error (h, "flux_msghandler_addvec");
free (sploader);
return NULL;
Expand All @@ -318,7 +320,7 @@ void sched_plugin_loader_destroy (struct sched_plugin_loader *sploader)
{
if (sploader) {
sched_plugin_unload (sploader);
flux_msg_handler_delvec (plugin_htab);
flux_msg_handler_delvec (sploader->handlers);
free (sploader);
}
}
Expand Down
23 changes: 15 additions & 8 deletions sched/sched.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ typedef struct {
flux_watcher_t *before;
flux_watcher_t *after;
flux_watcher_t *idle;
flux_msg_handler_t **handlers;
flux_msg_handler_t **sim_handlers;
} ssrvctx_t;


Expand Down Expand Up @@ -329,6 +331,8 @@ static void freectx (void *arg)
flux_watcher_destroy (ctx->after);
if (ctx->idle)
flux_watcher_destroy (ctx->idle);
flux_msg_handler_delvec (ctx->handlers);
flux_msg_handler_delvec (ctx->sim_handlers);
free (ctx);
}

Expand Down Expand Up @@ -360,6 +364,8 @@ static ssrvctx_t *getctx (flux_t *h)
ctx->before = NULL;
ctx->after = NULL;
ctx->idle = NULL;
ctx->handlers = NULL;
ctx->sim_handlers = NULL;
flux_aux_set (h, "sched", ctx, freectx);
}
return ctx;
Expand Down Expand Up @@ -946,10 +952,10 @@ static void trigger_cb (flux_t *h,
/*
* Simulator Initialization Functions
*/
static struct flux_msg_handler_spec sim_htab[] = {
{FLUX_MSGTYPE_EVENT, "sim.start", start_cb, 0, NULL},
{FLUX_MSGTYPE_REQUEST, "sched.trigger", trigger_cb, 0, NULL},
{FLUX_MSGTYPE_EVENT, "sched.res.*", sim_res_event_cb, 0, NULL},
static const struct flux_msg_handler_spec sim_htab[] = {
{FLUX_MSGTYPE_EVENT, "sim.start", start_cb, 0},
{FLUX_MSGTYPE_REQUEST, "sched.trigger", trigger_cb, 0},
{FLUX_MSGTYPE_EVENT, "sched.res.*", sim_res_event_cb, 0},
FLUX_MSGHANDLER_TABLE_END,
};

Expand All @@ -966,7 +972,8 @@ static int reg_sim_events (ssrvctx_t *ctx)
flux_log (ctx->h, LOG_ERR, "subscribing to event: %s", strerror (errno));
goto done;
}
if (flux_msg_handler_addvec (ctx->h, sim_htab, (void *)h) < 0) {
if (flux_msg_handler_addvec (ctx->h, sim_htab, (void *)h,
&ctx->sim_handlers) < 0) {
flux_log (ctx->h, LOG_ERR, "flux_msg_handler_addvec: %s", strerror (errno));
goto done;
}
Expand Down Expand Up @@ -1007,8 +1014,8 @@ static int setup_sim (ssrvctx_t *ctx, bool sim)
* *
******************************************************************************/

static struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_EVENT, "sched.res.*", res_event_cb, 0, NULL},
static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_EVENT, "sched.res.*", res_event_cb, 0},
FLUX_MSGHANDLER_TABLE_END
};

Expand All @@ -1030,7 +1037,7 @@ static int inline reg_events (ssrvctx_t *ctx)
rc = -1;
goto done;
}
if (flux_msg_handler_addvec (h, htab, (void *)h) < 0) {
if (flux_msg_handler_addvec (h, htab, (void *)h, &ctx->handlers) < 0) {
flux_log (h, LOG_ERR,
"error registering resource event handler: %s",
strerror (errno));
Expand Down
20 changes: 12 additions & 8 deletions simulator/sim_execsrv.c
Original file line number Diff line number Diff line change
Expand Up @@ -642,17 +642,19 @@ static void run_cb (flux_t *h,
flux_log (h, LOG_DEBUG, "queued the running of jobid %d", *jobid);
}

static struct flux_msg_handler_spec htab[] = {
{FLUX_MSGTYPE_EVENT, "sim.start", start_cb, 0, NULL},
{FLUX_MSGTYPE_REQUEST, "sim_exec.trigger", trigger_cb, 0, NULL},
{FLUX_MSGTYPE_REQUEST, "sim_exec.run.*", run_cb, 0, NULL},
static const struct flux_msg_handler_spec htab[] = {
{FLUX_MSGTYPE_EVENT, "sim.start", start_cb, 0},
{FLUX_MSGTYPE_REQUEST, "sim_exec.trigger", trigger_cb, 0},
{FLUX_MSGTYPE_REQUEST, "sim_exec.run.*", run_cb, 0},
FLUX_MSGHANDLER_TABLE_END,
};

int mod_main (flux_t *h, int argc, char **argv)
{
ctx_t *ctx = getctx (h);
uint32_t rank;
flux_msg_handler_t **handlers = NULL;
int rc = -1;

if (flux_get_rank (h, &rank) < 0)
return -1;
Expand All @@ -666,7 +668,7 @@ int mod_main (flux_t *h, int argc, char **argv)
flux_log (h, LOG_ERR, "subscribing to event: %s", strerror (errno));
return -1;
}
if (flux_msg_handler_addvec (h, htab, ctx) < 0) {
if (flux_msg_handler_addvec (h, htab, ctx, &handlers) < 0) {
flux_log (h, LOG_ERR, "flux_msg_handler_add: %s", strerror (errno));
return -1;
}
Expand All @@ -675,10 +677,12 @@ int mod_main (flux_t *h, int argc, char **argv)

if (flux_reactor_run (flux_get_reactor (h), 0) < 0) {
flux_log (h, LOG_ERR, "flux_reactor_run: %s", strerror (errno));
return -1;
goto done_delvec;
}

return 0;
rc = 0;
done_delvec:
flux_msg_handler_delvec (handlers);
return rc;
}

MOD_NAME ("sim_exec");
24 changes: 14 additions & 10 deletions simulator/simsrv.c
Original file line number Diff line number Diff line change
Expand Up @@ -427,14 +427,13 @@ static void alive_cb (flux_t *h,
flux_log (h, LOG_DEBUG, "sending start event again");
}

static struct flux_msg_handler_spec htab[] = {
{FLUX_MSGTYPE_REQUEST, "sim.join", join_cb, 0, NULL},
{FLUX_MSGTYPE_REQUEST, "sim.reply", reply_cb, 0, NULL},
{FLUX_MSGTYPE_REQUEST, "sim.alive", alive_cb, 0, NULL},
{FLUX_MSGTYPE_EVENT, "rdl.update", rdl_update_cb, 0, NULL},
static const struct flux_msg_handler_spec htab[] = {
{FLUX_MSGTYPE_REQUEST, "sim.join", join_cb, 0},
{FLUX_MSGTYPE_REQUEST, "sim.reply", reply_cb, 0},
{FLUX_MSGTYPE_REQUEST, "sim.alive", alive_cb, 0},
{FLUX_MSGTYPE_EVENT, "rdl.update", rdl_update_cb, 0},
FLUX_MSGHANDLER_TABLE_END,
};
const int htablen = sizeof (htab) / sizeof (htab[0]);

int mod_main (flux_t *h, int argc, char **argv)
{
Expand All @@ -443,6 +442,8 @@ int mod_main (flux_t *h, int argc, char **argv)
char *eoc_str;
bool exit_on_complete;
uint32_t rank;
flux_msg_handler_t **handlers = NULL;
int rc = -1;

if (flux_get_rank (h, &rank) < 0)
return -1;
Expand Down Expand Up @@ -470,22 +471,25 @@ int mod_main (flux_t *h, int argc, char **argv)
return -1;
}

if (flux_msg_handler_addvec (h, htab, ctx) < 0) {
if (flux_msg_handler_addvec (h, htab, ctx, &handlers) < 0) {
flux_log (h, LOG_ERR, "flux_msg_handler_add: %s", strerror (errno));
return -1;
}

if (send_start_event (h) < 0) {
flux_log (h, LOG_ERR, "sim failed to send start event");
return -1;
goto done_delvec;
}
flux_log (h, LOG_DEBUG, "sim sent start event");

if (flux_reactor_run (flux_get_reactor (h), 0) < 0) {
flux_log (h, LOG_ERR, "flux_reactor_run: %s", strerror (errno));
return -1;
goto done_delvec;
}
return 0;
rc = 0;
done_delvec:
flux_msg_handler_delvec (handlers);
return rc;
}

MOD_NAME ("sim");
17 changes: 11 additions & 6 deletions simulator/submitsrv.c
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,9 @@ static void trigger_cb (flux_t *h,
Jput (o);
}

static struct flux_msg_handler_spec htab[] = {
{FLUX_MSGTYPE_EVENT, "sim.start", start_cb, 0, NULL},
{FLUX_MSGTYPE_REQUEST, "submit.trigger", trigger_cb, 0, NULL},
static const struct flux_msg_handler_spec htab[] = {
{FLUX_MSGTYPE_EVENT, "sim.start", start_cb, 0},
{FLUX_MSGTYPE_REQUEST, "submit.trigger", trigger_cb, 0},
FLUX_MSGHANDLER_TABLE_END,
};

Expand All @@ -362,6 +362,8 @@ int mod_main (flux_t *h, int argc, char **argv)
oom ();
char *csv_filename;
uint32_t rank;
flux_msg_handler_t **handlers = NULL;
int rc = -1;

if (flux_get_rank (h, &rank) < 0)
return -1;
Expand All @@ -384,7 +386,7 @@ int mod_main (flux_t *h, int argc, char **argv)
flux_log (h, LOG_ERR, "subscribing to event: %s", strerror (errno));
return -1;
}
if (flux_msg_handler_addvec (h, htab, NULL) < 0) {
if (flux_msg_handler_addvec (h, htab, NULL, &handlers) < 0) {
flux_log (h, LOG_ERR, "flux_msg_handler_addvec: %s", strerror (errno));
return -1;
}
Expand All @@ -393,10 +395,13 @@ int mod_main (flux_t *h, int argc, char **argv)

if (flux_reactor_run (flux_get_reactor (h), 0) < 0) {
flux_log (h, LOG_ERR, "flux_reactor_run: %s", strerror (errno));
return -1;
goto done_delvec;
}
rc = 0;
done_delvec:
flux_msg_handler_delvec (handlers);
zhash_destroy (&args);
return 0;
return rc;
}

MOD_NAME ("submit");
2 changes: 1 addition & 1 deletion t/t1004-module-load.t
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ test_expect_success 'module-load: sched loads the backfill plugin with arguments
test_expect_success 'module-load: no jobs are lost' '
for i in `seq $sched_start_jobid $sched_end_jobid`
do
state=$(flux kvs get $(job_kvs_path $i).state)
state=$(flux kvs get -j $(job_kvs_path $i).state)
if test $state != "submitted"; then
return 48
fi
Expand Down

0 comments on commit e068c92

Please sign in to comment.