diff --git a/doc/man3/flux_msg_handler_addvec.adoc b/doc/man3/flux_msg_handler_addvec.adoc index 810042295401..c1cd64dbfa55 100644 --- a/doc/man3/flux_msg_handler_addvec.adoc +++ b/doc/man3/flux_msg_handler_addvec.adoc @@ -5,7 +5,8 @@ flux_msg_handler_addvec(3) NAME ---- -flux_msg_handler_addvec, flux_msg_handler_delvec - bulk add/remove message handlers +flux_msg_handler_addvec, +flux_msg_handler_delvec - bulk add/remove message handlers SYNOPSIS @@ -14,17 +15,17 @@ SYNOPSIS struct flux_msg_handler_spec { int typemask; - char *topic_glob; + const char *topic_glob; flux_msg_handler_f cb; uint32_t rolemask; - flux_msg_handler_t *w; }; int flux_msg_handler_addvec (flux_t *h, - struct flux_msg_handler_spec tab[], - void *arg); + const struct flux_msg_handler_spec tab[], + void *arg, + flux_msg_handler_t **handlers[]); - void flux_msg_handler_delvec (struct flux_msg_handler_spec tab[]); + void flux_msg_handler_delvec (flux_msg_handler_t *handlers[]); DESCRIPTION @@ -32,10 +33,11 @@ DESCRIPTION `flux_msg_handler_addvec()` creates and starts an array of message handlers, terminated by FLUX_MSGHANDLER_TABLE_END. The new message handler objects -are stored in the array. +are assigned to an internally allocated array, returned in _handlers_. +The last entry in the array is set to NULL. -`flux_msg_handler_delvec()` stops and destroys an array of message handler -objects, terminated by FLUX_MSGHANDLER_TABLE_END. +`flux_msg_handler_delvec()` stops and destroys an array of message handlers +returned from `flux_msg_handler_addvec()`. These functions are convenience functions which call `flux_msg_handler_create(3)`, `flux_msg_handler_start(3)`; and diff --git a/doc/man3/treduce.c b/doc/man3/treduce.c index 02f7b058122a..2d8e93438cf5 100644 --- a/doc/man3/treduce.c +++ b/doc/man3/treduce.c @@ -103,9 +103,9 @@ void heartbeat_cb (flux_t *h, flux_msg_handler_t *mh, free (item); } -struct flux_msg_handler_spec htab[] = { - { FLUX_MSGTYPE_EVENT, "hb", heartbeat_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "treduce.forward", forward_cb, 0, NULL }, +const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_EVENT, "hb", heartbeat_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "treduce.forward", forward_cb, 0 }, FLUX_MSGHANDLER_TABLE_END, }; @@ -123,6 +123,7 @@ int mod_main (flux_t *h, int argc, char **argv) uint32_t rank; double timeout = 0.; int flags; + flux_msg_handler_t **handlers = NULL; if (argc == 1) { timeout = strtod (argv[0], NULL); @@ -139,11 +140,11 @@ int mod_main (flux_t *h, int argc, char **argv) return -1; if (flux_event_subscribe (h, "hb") < 0) return -1; - if (flux_msg_handler_addvec (h, htab, &ctx) < 0) + if (flux_msg_handler_addvec (h, htab, &ctx, &handlers) < 0) return -1; if (flux_reactor_run (flux_get_reactor (h), 0) < 0) return -1; - flux_msg_handler_delvec (htab); + flux_msg_handler_delvec (handlers); return 0; } diff --git a/src/broker/attr.c b/src/broker/attr.c index 480a143f7be9..4638d84d05ae 100644 --- a/src/broker/attr.c +++ b/src/broker/attr.c @@ -35,6 +35,7 @@ struct attr_struct { zhash_t *hash; + flux_msg_handler_t **handlers; }; struct entry { @@ -415,27 +416,24 @@ void lsattr_request_cb (flux_t *h, flux_msg_handler_t *mh, ** Initialization **/ -static struct flux_msg_handler_spec handlers[] = { - { FLUX_MSGTYPE_REQUEST, "attr.get", getattr_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "attr.list", lsattr_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "attr.set", setattr_request_cb, 0, NULL }, +static const struct flux_msg_handler_spec handlers[] = { + { FLUX_MSGTYPE_REQUEST, "attr.get", getattr_request_cb, FLUX_ROLE_ALL }, + { FLUX_MSGTYPE_REQUEST, "attr.list", lsattr_request_cb, FLUX_ROLE_ALL }, + { FLUX_MSGTYPE_REQUEST, "attr.set", setattr_request_cb, 0 }, FLUX_MSGHANDLER_TABLE_END, }; int attr_register_handlers (attr_t *attrs, flux_t *h) { - if (flux_msg_handler_addvec (h, handlers, attrs) < 0) + if (flux_msg_handler_addvec (h, handlers, attrs, &attrs->handlers) < 0) return -1; - /* allow any user to attr.get and attr.list */ - flux_msg_handler_allow_rolemask (handlers[0].w, FLUX_ROLE_ALL); - flux_msg_handler_allow_rolemask (handlers[1].w, FLUX_ROLE_ALL); return 0; } -void attr_unregister_handlers (void) +void attr_unregister_handlers (attr_t *attrs) { - flux_msg_handler_delvec (handlers); + flux_msg_handler_delvec (attrs->handlers); } attr_t *attr_create (void) diff --git a/src/broker/attr.h b/src/broker/attr.h index 805079575e69..c5aa5f983fd6 100644 --- a/src/broker/attr.h +++ b/src/broker/attr.h @@ -20,7 +20,7 @@ void attr_destroy (attr_t *attrs); /* Register/unregister message handlers */ int attr_register_handlers (attr_t *attrs, flux_t *h); -void attr_unregister_handlers (void); +void attr_unregister_handlers (attr_t *attrs); /* Delete an attribute */ diff --git a/src/broker/broker.c b/src/broker/broker.c index 96972c22876d..e088c4b592ae 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -157,7 +157,8 @@ static void signal_cb (flux_reactor_t *r, flux_watcher_t *w, static void broker_handle_signals (broker_ctx_t *ctx, zlist_t *sigwatchers); static void broker_unhandle_signals (zlist_t *sigwatchers); -static void broker_add_services (broker_ctx_t *ctx); +static flux_msg_handler_t **broker_add_services (broker_ctx_t *ctx); +static void broker_remove_services (flux_msg_handler_t *handlers[]); static int load_module_byname (broker_ctx_t *ctx, const char *name, const char *argz, size_t argz_len, @@ -310,6 +311,7 @@ int main (int argc, char *argv[]) sigset_t old_sigmask; struct sigaction old_sigact_int; struct sigaction old_sigact_term; + flux_msg_handler_t **handlers; memset (&ctx, 0, sizeof (ctx)); log_init (argv[0]); @@ -579,7 +581,7 @@ int main (int argc, char *argv[]) if (rusage_initialize (ctx.h, "cmb") < 0) log_err_exit ("rusage_initialize"); - broker_add_services (&ctx); + handlers = broker_add_services (&ctx); /* Initialize comms module infrastructure. */ @@ -651,7 +653,7 @@ int main (int argc, char *argv[]) /* Unregister builtin services */ - attr_unregister_handlers (); + attr_unregister_handlers (ctx.attrs); content_cache_destroy (ctx.cache); broker_unhandle_signals (sigwatchers); @@ -668,6 +670,8 @@ int main (int argc, char *argv[]) service_switch_destroy (ctx.services); hello_destroy (ctx.hello); attr_destroy (ctx.attrs); + shutdown_destroy (ctx.shutdown); + broker_remove_services (handlers); flux_close (ctx.h); flux_reactor_destroy (ctx.reactor); if (ctx.subscriptions) { @@ -1694,16 +1698,16 @@ static int route_to_handle (const flux_msg_t *msg, void *arg) return 0; } -static struct flux_msg_handler_spec handlers[] = { - { FLUX_MSGTYPE_REQUEST, "cmb.rmmod", cmb_rmmod_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "cmb.insmod", cmb_insmod_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "cmb.lsmod", cmb_lsmod_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "cmb.lspeer", cmb_lspeer_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "cmb.panic", cmb_panic_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "cmb.event-mute", cmb_event_mute_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "cmb.disconnect", cmb_disconnect_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "cmb.sub", cmb_sub_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "cmb.unsub", cmb_unsub_cb, 0, NULL }, +static const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_REQUEST, "cmb.rmmod", cmb_rmmod_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "cmb.insmod", cmb_insmod_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "cmb.lsmod", cmb_lsmod_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "cmb.lspeer", cmb_lspeer_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "cmb.panic", cmb_panic_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "cmb.event-mute", cmb_event_mute_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "cmb.disconnect", cmb_disconnect_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "cmb.sub", cmb_sub_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "cmb.unsub", cmb_unsub_cb, 0 }, FLUX_MSGHANDLER_TABLE_END, }; @@ -1727,8 +1731,9 @@ static struct internal_service services[] = { * Register message handlers for some cmb services. Others are registered * in their own initialization functions. */ -static void broker_add_services (broker_ctx_t *ctx) +static flux_msg_handler_t **broker_add_services (broker_ctx_t *ctx) { + flux_msg_handler_t **handlers; struct internal_service *svc; for (svc = &services[0]; svc->name != NULL; svc++) { if (!nodeset_member (svc->nodeset, overlay_get_rank(ctx->overlay))) @@ -1738,8 +1743,16 @@ static void broker_add_services (broker_ctx_t *ctx) log_err_exit ("error registering service for %s", svc->name); } - if (flux_msg_handler_addvec (ctx->h, handlers, ctx) < 0) + if (flux_msg_handler_addvec (ctx->h, htab, ctx, &handlers) < 0) log_err_exit ("error registering message handlers"); + return handlers; +} + +/* Unregister message handlers + */ +static void broker_remove_services (flux_msg_handler_t *handlers[]) +{ + flux_msg_handler_delvec (handlers); } /** diff --git a/src/broker/content-cache.c b/src/broker/content-cache.c index 809491b830bb..778dc386959f 100644 --- a/src/broker/content-cache.c +++ b/src/broker/content-cache.c @@ -71,6 +71,7 @@ struct cache_entry { struct content_cache { flux_t *h; flux_t *enclosing_h; + flux_msg_handler_t **handlers; uint32_t rank; zhash_t *entries; uint8_t backing:1; /* 'content.backing' service available */ @@ -854,14 +855,16 @@ static void heartbeat_event (flux_t *h, flux_msg_handler_t *mh, /* Initialization */ -static struct flux_msg_handler_spec handlers[] = { - { FLUX_MSGTYPE_REQUEST, "content.load", content_load_request, FLUX_ROLE_USER, NULL }, - { FLUX_MSGTYPE_REQUEST, "content.store", content_store_request, FLUX_ROLE_USER, NULL }, - { FLUX_MSGTYPE_REQUEST, "content.backing", content_backing_request, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "content.dropcache", content_dropcache_request, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "content.stats.get", content_stats_request, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "content.flush", content_flush_request, 0, NULL }, - { FLUX_MSGTYPE_EVENT, "hb", heartbeat_event, 0, NULL }, +static const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_REQUEST, "content.load", content_load_request, + FLUX_ROLE_USER }, + { FLUX_MSGTYPE_REQUEST, "content.store", content_store_request, + FLUX_ROLE_USER }, + { FLUX_MSGTYPE_REQUEST, "content.backing", content_backing_request, 0 }, + { FLUX_MSGTYPE_REQUEST, "content.dropcache", content_dropcache_request, 0 }, + { FLUX_MSGTYPE_REQUEST, "content.stats.get", content_stats_request, 0 }, + { FLUX_MSGTYPE_REQUEST, "content.flush", content_flush_request, 0 }, + { FLUX_MSGTYPE_EVENT, "hb", heartbeat_event, 0 }, FLUX_MSGHANDLER_TABLE_END, }; @@ -869,7 +872,7 @@ int content_cache_set_flux (content_cache_t *cache, flux_t *h) { cache->h = h; - if (flux_msg_handler_addvec (h, handlers, cache) < 0) + if (flux_msg_handler_addvec (h, htab, cache, &cache->handlers) < 0) return -1; if (flux_get_rank (h, &cache->rank) < 0) return -1; @@ -975,7 +978,7 @@ void content_cache_destroy (content_cache_t *cache) if (cache) { if (cache->h) { (void)flux_event_unsubscribe (cache->h, "hb"); - flux_msg_handler_delvec (handlers); + flux_msg_handler_delvec (cache->handlers); } if (cache->backing_name) free (cache->backing_name); diff --git a/src/broker/exec.c b/src/broker/exec.c index 0ce5c1dc3dc4..dc7bc0fc03c4 100644 --- a/src/broker/exec.c +++ b/src/broker/exec.c @@ -41,6 +41,7 @@ typedef struct { flux_t *h; + flux_msg_handler_t **handlers; struct subprocess_manager *sm; uint32_t rank; const char *local_uri; @@ -507,19 +508,19 @@ static void ps_request_cb (flux_t *h, flux_msg_handler_t *mh, json_decref (procs); } -static struct flux_msg_handler_spec handlers[] = { - { FLUX_MSGTYPE_REQUEST, "cmb.exec", exec_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "cmb.exec.signal", signal_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "cmb.exec.write", write_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "cmb.processes", ps_request_cb, 0, NULL }, +static const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_REQUEST, "cmb.exec", exec_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "cmb.exec.signal", signal_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "cmb.exec.write", write_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "cmb.processes", ps_request_cb, 0 }, FLUX_MSGHANDLER_TABLE_END, }; static void exec_finalize (void *arg) { exec_t *x = arg; + flux_msg_handler_delvec (x->handlers); free (x); - flux_msg_handler_delvec (handlers); } int exec_initialize (flux_t *h, struct subprocess_manager *sm, @@ -537,7 +538,7 @@ int exec_initialize (flux_t *h, struct subprocess_manager *sm, free (x); return -1; } - if (flux_msg_handler_addvec (h, handlers, x) < 0) { + if (flux_msg_handler_addvec (h, htab, x, &x->handlers) < 0) { free (x); return -1; } diff --git a/src/broker/heaptrace.c b/src/broker/heaptrace.c index 8c15764bb15d..510fe00f7963 100644 --- a/src/broker/heaptrace.c +++ b/src/broker/heaptrace.c @@ -38,6 +38,8 @@ #include #include "heaptrace.h" +static flux_msg_handler_t **handlers = NULL; + static void start_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { @@ -111,10 +113,10 @@ static void stop_cb (flux_t *h, flux_msg_handler_t *mh, FLUX_LOG_ERROR (h); } -static struct flux_msg_handler_spec handlers[] = { - { FLUX_MSGTYPE_REQUEST, "heaptrace.start", start_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "heaptrace.dump", dump_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "heaptrace.stop", stop_cb, 0, NULL }, +static const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_REQUEST, "heaptrace.start", start_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "heaptrace.dump", dump_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "heaptrace.stop", stop_cb, 0 }, FLUX_MSGHANDLER_TABLE_END, }; @@ -126,7 +128,7 @@ static void heaptrace_finalize (void *arg) int heaptrace_initialize (flux_t *h) { char *dummy = "hello"; - if (flux_msg_handler_addvec (h, handlers, NULL) < 0) + if (flux_msg_handler_addvec (h, htab, NULL, &handlers) < 0) return -1; flux_aux_set (h, "flux::heaptrace", dummy, heaptrace_finalize); return 0; diff --git a/src/broker/hello.c b/src/broker/hello.c index fab594c9567c..6c514138496a 100644 --- a/src/broker/hello.c +++ b/src/broker/hello.c @@ -42,6 +42,7 @@ static double default_reduction_timeout = 10.; struct hello_struct { flux_t *h; attr_t *attrs; + flux_msg_handler_t **handlers; uint32_t rank; uint32_t size; uint32_t count; @@ -82,6 +83,7 @@ void hello_destroy (hello_t *hello) { if (hello) { flux_reduce_destroy (hello->reduce); + flux_msg_handler_delvec (hello->handlers); free (hello); } } @@ -123,8 +125,8 @@ int hello_register_attrs (hello_t *hello, attr_t *attrs) return 0; } -static struct flux_msg_handler_spec handlers[] = { - { FLUX_MSGTYPE_REQUEST, "hello.join", join_request, 0, NULL }, +static const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_REQUEST, "hello.join", join_request, 0 }, FLUX_MSGHANDLER_TABLE_END, }; @@ -169,7 +171,8 @@ int hello_start (hello_t *hello) log_err ("hello: error getting rank/size"); goto done; } - if (flux_msg_handler_addvec (hello->h, handlers, hello) < 0) { + if (flux_msg_handler_addvec (hello->h, htab, hello, + &hello->handlers) < 0) { log_err ("hello: adding message handlers"); goto done; } diff --git a/src/broker/log.c b/src/broker/log.c index f4640a8636e3..48022ede0503 100644 --- a/src/broker/log.c +++ b/src/broker/log.c @@ -46,6 +46,7 @@ static const int default_level = LOG_DEBUG; typedef struct { int magic; flux_t *h; + flux_msg_handler_t **handlers; uint32_t rank; char *filename; FILE *f; @@ -642,19 +643,19 @@ static void disconnect_request_cb (flux_t *h, flux_msg_handler_t *mh, /* no response */ } -static struct flux_msg_handler_spec handlers[] = { - { FLUX_MSGTYPE_REQUEST, "log.append", append_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "log.clear", clear_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "log.dmesg", dmesg_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "log.disconnect", disconnect_request_cb, 0, NULL }, +static const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_REQUEST, "log.append", append_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "log.clear", clear_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "log.dmesg", dmesg_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "log.disconnect", disconnect_request_cb, 0 }, FLUX_MSGHANDLER_TABLE_END, }; static void logbuf_finalize (void *arg) { logbuf_t *logbuf = arg; + flux_msg_handler_delvec (logbuf->handlers); logbuf_destroy (logbuf); - flux_msg_handler_delvec (handlers); /* FIXME: need logbuf_unregister_attrs() */ } @@ -674,7 +675,7 @@ int logbuf_initialize (flux_t *h, uint32_t rank, attr_t *attrs) goto error; if (fake_rank (h, rank) < 0) goto error; - if (flux_msg_handler_addvec (h, handlers, logbuf) < 0) + if (flux_msg_handler_addvec (h, htab, logbuf, &logbuf->handlers) < 0) goto error; flux_log_set_appname (h, "broker"); flux_log_set_redirect (h, logbuf_append_redirect, logbuf); diff --git a/src/broker/sequence.c b/src/broker/sequence.c index c5262098131c..aaaa5c3dcbae 100644 --- a/src/broker/sequence.c +++ b/src/broker/sequence.c @@ -36,6 +36,7 @@ typedef struct { zhash_t *vhash; + flux_msg_handler_t **handlers; } seqhash_t; static seqhash_t * sequence_hash_create (void) @@ -231,16 +232,16 @@ static void sequence_request_cb (flux_t *h, flux_msg_handler_t *mh, } } -static struct flux_msg_handler_spec handlers[] = { - { FLUX_MSGTYPE_REQUEST, "seq.*", sequence_request_cb, 0, NULL }, +static const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_REQUEST, "seq.*", sequence_request_cb, 0 }, FLUX_MSGHANDLER_TABLE_END, }; static void sequence_hash_finalize (void *arg) { seqhash_t *seq = arg; + flux_msg_handler_delvec (seq->handlers); sequence_hash_destroy (seq); - flux_msg_handler_delvec (handlers); } int sequence_hash_initialize (flux_t *h) @@ -248,7 +249,7 @@ int sequence_hash_initialize (flux_t *h) seqhash_t *seq = sequence_hash_create (); if (!seq) return -1; - if (flux_msg_handler_addvec (h, handlers, seq) < 0) { + if (flux_msg_handler_addvec (h, htab, seq, &seq->handlers) < 0) { sequence_hash_destroy (seq); return -1; } diff --git a/src/broker/shutdown.c b/src/broker/shutdown.c index 78fc692c65f3..284377c3da36 100644 --- a/src/broker/shutdown.c +++ b/src/broker/shutdown.c @@ -63,6 +63,7 @@ void shutdown_destroy (shutdown_t *s) if (s) { if (s->shutdown) flux_msg_handler_destroy (s->shutdown); + shutdown_disarm (s); if (s->h) (void)flux_event_unsubscribe (s->h, "shutdown"); free (s); diff --git a/src/cmd/builtin/proxy.c b/src/cmd/builtin/proxy.c index c5d231e19808..77c23fcd177c 100644 --- a/src/cmd/builtin/proxy.c +++ b/src/cmd/builtin/proxy.c @@ -875,9 +875,9 @@ static int child_create (proxy_ctx_t *ctx, int ac, char **av, const char *workpa return -1; } -static struct flux_msg_handler_spec htab[] = { - { FLUX_MSGTYPE_EVENT, NULL, event_cb, 0, NULL }, - { FLUX_MSGTYPE_RESPONSE, NULL, response_cb, 0, NULL }, +static const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_EVENT, NULL, event_cb, 0 }, + { FLUX_MSGTYPE_RESPONSE, NULL, response_cb, 0 }, FLUX_MSGHANDLER_TABLE_END }; @@ -900,6 +900,7 @@ static int cmd_proxy (optparse_t *p, int ac, char *av[]) const char *job; const char *optarg; int optindex; + flux_msg_handler_t **handlers = NULL; log_init ("flux-proxy"); @@ -978,7 +979,7 @@ static int cmd_proxy (optparse_t *p, int ac, char *av[]) /* Create/start event/response message watchers */ - if (flux_msg_handler_addvec (h, htab, ctx) < 0) { + if (flux_msg_handler_addvec (h, htab, ctx, &handlers) < 0) { flux_log_error (h, "flux_msg_watcher_addvec"); goto done; } @@ -990,7 +991,7 @@ static int cmd_proxy (optparse_t *p, int ac, char *av[]) goto done; } done: - flux_msg_handler_delvec (htab); + flux_msg_handler_delvec (handlers); flux_watcher_destroy (ctx->listen_w); if (ctx->listen_fd >= 0) { if (close (ctx->listen_fd) < 0) diff --git a/src/common/libflux/msg_handler.c b/src/common/libflux/msg_handler.c index 0f363d7849f3..431d77e5c232 100644 --- a/src/common/libflux/msg_handler.c +++ b/src/common/libflux/msg_handler.c @@ -556,51 +556,63 @@ flux_msg_handler_t *flux_msg_handler_create (flux_t *h, return NULL; } -int flux_msg_handler_addvec (flux_t *h, struct flux_msg_handler_spec tab[], - void *arg) +static bool at_end (struct flux_msg_handler_spec spec) +{ + struct flux_msg_handler_spec end = FLUX_MSGHANDLER_TABLE_END; + + return (spec.typemask == end.typemask + && spec.topic_glob == end.topic_glob + && spec.cb == end.cb + && spec.rolemask == end.rolemask); +} + +int flux_msg_handler_addvec (flux_t *h, + const struct flux_msg_handler_spec tab[], + void *arg, + flux_msg_handler_t **hp[]) { int i; struct flux_match match = FLUX_MATCH_ANY; + flux_msg_handler_t **handlers = NULL; + int count = 0; + int saved_errno; - for (i = 0; ; i++) { - if (!tab[i].typemask && !tab[i].topic_glob && !tab[i].cb) - break; /* FLUX_MSGHANDLER_TABLE_END */ + if (!h || !tab || !hp) { + errno = EINVAL; + return -1; + } + while (!at_end (tab[count])) + count++; + if (!(handlers = calloc (count + 1, sizeof (flux_msg_handler_t *)))) + return -1; + for (i = 0; i < count; i++) { match.typemask = tab[i].typemask; /* flux_msg_handler_create() will make a copy of the topic_glob * so it is safe to temporarily remove "const" from * tab[i].topic_glob with a cast. */ match.topic_glob = (char *)tab[i].topic_glob; - tab[i].w = flux_msg_handler_create (h, match, tab[i].cb, arg); - if (!tab[i].w) + if (!(handlers[i] = flux_msg_handler_create (h, match, tab[i].cb, arg))) goto error; - flux_msg_handler_allow_rolemask (tab[i].w, tab[i].rolemask); - flux_msg_handler_start (tab[i].w); + flux_msg_handler_allow_rolemask (handlers[i], tab[i].rolemask); + flux_msg_handler_start (handlers[i]); } + *hp = handlers; return 0; error: - while (i >= 0) { - if (tab[i].w) { - flux_msg_handler_stop (tab[i].w); - flux_msg_handler_destroy (tab[i].w); - tab[i].w = NULL; - } - i--; - } + saved_errno = errno; + flux_msg_handler_delvec (handlers); + errno = saved_errno; return -1; } -void flux_msg_handler_delvec (struct flux_msg_handler_spec tab[]) +void flux_msg_handler_delvec (flux_msg_handler_t *handlers[]) { - int i; - - for (i = 0; ; i++) { - if (!tab[i].typemask && !tab[i].topic_glob && !tab[i].cb) - break; /* FLUX_MSGHANDLER_TABLE_END */ - if (tab[i].w) { - flux_msg_handler_stop (tab[i].w); - flux_msg_handler_destroy (tab[i].w); - tab[i].w = NULL; + if (handlers) { + for (int i = 0; handlers[i] != NULL; i++) { + flux_msg_handler_destroy (handlers[i]); + handlers[i] = NULL; } + free (handlers); } } diff --git a/src/common/libflux/msg_handler.h b/src/common/libflux/msg_handler.h index fc95e9e8fa39..b16d9bd5eabf 100644 --- a/src/common/libflux/msg_handler.h +++ b/src/common/libflux/msg_handler.h @@ -36,13 +36,14 @@ struct flux_msg_handler_spec { const char *topic_glob; flux_msg_handler_f cb; uint32_t rolemask; - flux_msg_handler_t *w; }; -#define FLUX_MSGHANDLER_TABLE_END { 0, NULL, NULL, 0, NULL } +#define FLUX_MSGHANDLER_TABLE_END { 0, NULL, NULL, 0 } -int flux_msg_handler_addvec (flux_t *h, struct flux_msg_handler_spec tab[], - void *arg); -void flux_msg_handler_delvec (struct flux_msg_handler_spec tab[]); +int flux_msg_handler_addvec (flux_t *h, + const struct flux_msg_handler_spec tab[], + void *arg, + flux_msg_handler_t **msg_handlers[]); +void flux_msg_handler_delvec (flux_msg_handler_t *msg_handlers[]); /* Requeue any unmatched messages, if handle was cloned. */ diff --git a/src/common/libjsc/jstatctl.c b/src/common/libjsc/jstatctl.c index ccc622b965ac..d3a4148a0964 100644 --- a/src/common/libjsc/jstatctl.c +++ b/src/common/libjsc/jstatctl.c @@ -61,6 +61,7 @@ typedef struct { typedef struct { zhash_t *active_jobs; lru_cache_t *kvs_paths; + flux_msg_handler_t **handlers; zlist_t *callbacks; int first_time; flux_t *h; @@ -121,6 +122,7 @@ static void freectx (void *arg) zhash_destroy (&(ctx->active_jobs)); lru_cache_destroy (ctx->kvs_paths); zlist_destroy (&(ctx->callbacks)); + flux_msg_handler_delvec (ctx->handlers); } static jscctx_t *getctx (flux_t *h) @@ -1120,9 +1122,9 @@ static void job_state_cb (flux_t *h, flux_msg_handler_t *mh, * Public Job Status and Control API * * * ******************************************************************************/ -static struct flux_msg_handler_spec htab[] = { - { FLUX_MSGTYPE_EVENT, "wreck.state.*", job_state_cb, 0, NULL }, - { FLUX_MSGTYPE_EVENT, "jsc.state.*", job_state_cb, 0, NULL }, +static const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_EVENT, "wreck.state.*", job_state_cb, 0 }, + { FLUX_MSGTYPE_EVENT, "jsc.state.*", job_state_cb, 0 }, FLUX_MSGHANDLER_TABLE_END }; @@ -1131,6 +1133,7 @@ static int notify_status_obj (flux_t *h, jsc_handler_obj_f func, void *d) int rc = -1; cb_pair_t *c = NULL; jscctx_t *ctx = NULL; + flux_msg_handler_t **handlers; if (!func) goto done; @@ -1144,19 +1147,19 @@ static int notify_status_obj (flux_t *h, jsc_handler_obj_f func, void *d) rc = -1; goto done; } - if (flux_msg_handler_addvec (h, htab, (void *)ctx) < 0) { + if (flux_msg_handler_addvec (h, htab, NULL, &handlers) < 0) { flux_log_error (h, "registering resource event handler"); rc = -1; goto done; } ctx = getctx (h); + ctx->handlers = handlers; c = (cb_pair_t *) xzmalloc (sizeof(*c)); c->cb = func; c->arg = d; if (zlist_append (ctx->callbacks, c) < 0) goto done; - zlist_freefn (ctx->callbacks, c, free, true); rc = 0; diff --git a/src/modules/aggregator/aggregator.c b/src/modules/aggregator/aggregator.c index e8f0ec4b8ce3..9004b9363462 100644 --- a/src/modules/aggregator/aggregator.c +++ b/src/modules/aggregator/aggregator.c @@ -530,31 +530,31 @@ static void push_cb (flux_t *h, flux_msg_handler_t *mh, } -static struct flux_msg_handler_spec htab[] = { - //{ FLUX_MSGTYPE_EVENT, "hb", hb_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "aggregator.push", push_cb, 0, NULL }, +static const struct flux_msg_handler_spec htab[] = { + //{ FLUX_MSGTYPE_EVENT, "hb", hb_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "aggregator.push", push_cb, 0 }, FLUX_MSGHANDLER_TABLE_END, }; int mod_main (flux_t *h, int argc, char **argv) { int rc = -1; + flux_msg_handler_t **handlers = NULL; struct aggregator *ctx = aggregator_create (h); if (!ctx) goto done; - if (flux_msg_handler_addvec (h, htab, ctx) < 0) { + if (flux_msg_handler_addvec (h, htab, ctx, &handlers) < 0) { flux_log_error (h, "flux_msg_handler_advec"); goto done; } if (flux_reactor_run (flux_get_reactor (h), 0) < 0) { flux_log_error (h, "flux_reactor_run"); - goto done_delvec; + goto done; } rc = 0; -done_delvec: - flux_msg_handler_delvec (htab); done: + flux_msg_handler_delvec (handlers); aggregator_destroy (ctx); return rc; } diff --git a/src/modules/barrier/barrier.c b/src/modules/barrier/barrier.c index 56e64feea471..e386ecc32691 100644 --- a/src/modules/barrier/barrier.c +++ b/src/modules/barrier/barrier.c @@ -327,9 +327,9 @@ static void timeout_cb (flux_reactor_t *r, flux_watcher_t *w, } static struct flux_msg_handler_spec htab[] = { - { FLUX_MSGTYPE_REQUEST, "barrier.enter", enter_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "barrier.disconnect", disconnect_request_cb, 0, NULL }, - { FLUX_MSGTYPE_EVENT, "barrier.exit", exit_event_cb, 0, NULL }, + { FLUX_MSGTYPE_REQUEST, "barrier.enter", enter_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "barrier.disconnect", disconnect_request_cb, 0 }, + { FLUX_MSGTYPE_EVENT, "barrier.exit", exit_event_cb, 0 }, FLUX_MSGHANDLER_TABLE_END, }; @@ -337,6 +337,7 @@ int mod_main (flux_t *h, int argc, char **argv) { int rc = -1; barrier_ctx_t *ctx = getctx (h); + flux_msg_handler_t **handlers = NULL; if (!ctx) goto done; @@ -344,18 +345,17 @@ int mod_main (flux_t *h, int argc, char **argv) flux_log_error (h, "flux_event_subscribe"); goto done; } - if (flux_msg_handler_addvec (h, htab, ctx) < 0) { + if (flux_msg_handler_addvec (h, htab, ctx, &handlers) < 0) { flux_log_error (h, "flux_msghandler_add"); goto done; } if (flux_reactor_run (flux_get_reactor (h), 0) < 0) { flux_log_error (h, "flux_reactor_run"); - goto done_unreg; + goto done; } rc = 0; -done_unreg: - flux_msg_handler_delvec (htab); done: + flux_msg_handler_delvec (handlers); return rc; } diff --git a/src/modules/connector-local/local.c b/src/modules/connector-local/local.c index 2e371e118dfe..6a6c753ac6e9 100644 --- a/src/modules/connector-local/local.c +++ b/src/modules/connector-local/local.c @@ -931,12 +931,11 @@ static int listener_init (mod_local_ctx_t *ctx, char *sockpath) return -1; } -static struct flux_msg_handler_spec htab[] = { - { FLUX_MSGTYPE_EVENT, NULL, event_cb, FLUX_ROLE_ALL, NULL }, - { FLUX_MSGTYPE_RESPONSE, NULL, response_cb, FLUX_ROLE_ALL, NULL }, +static const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_EVENT, NULL, event_cb, FLUX_ROLE_ALL }, + { FLUX_MSGTYPE_RESPONSE, NULL, response_cb, FLUX_ROLE_ALL }, FLUX_MSGHANDLER_TABLE_END }; -const int htablen = sizeof (htab) / sizeof (htab[0]); int mod_main (flux_t *h, int argc, char **argv) { @@ -944,6 +943,7 @@ int mod_main (flux_t *h, int argc, char **argv) char sockpath[PATH_MAX + 1]; const char *local_uri = NULL; char *tmpdir; + flux_msg_handler_t **handlers = NULL; int rc = -1; if (!ctx) @@ -973,7 +973,7 @@ int mod_main (flux_t *h, int argc, char **argv) /* Create/start event/response message watchers */ - if (flux_msg_handler_addvec (h, htab, ctx) < 0) { + if (flux_msg_handler_addvec (h, htab, ctx, &handlers) < 0) { flux_log_error (h, "flux_msg_handler_addvec"); goto done; } @@ -982,12 +982,11 @@ int mod_main (flux_t *h, int argc, char **argv) */ if (flux_reactor_run (ctx->reactor, 0) < 0) { flux_log_error (h, "flux_reactor_run"); - goto done_delvec; + goto done; } rc = 0; -done_delvec: - flux_msg_handler_delvec (htab); done: + flux_msg_handler_delvec (handlers); flux_watcher_destroy (ctx->listen_w); if (ctx->listen_fd >= 0) { if (close (ctx->listen_fd) < 0) diff --git a/src/modules/content-sqlite/content-sqlite.c b/src/modules/content-sqlite/content-sqlite.c index 649bb8e31853..fc50db9a739e 100644 --- a/src/modules/content-sqlite/content-sqlite.c +++ b/src/modules/content-sqlite/content-sqlite.c @@ -511,31 +511,32 @@ void shutdown_cb (flux_t *h, flux_msg_handler_t *mh, flux_reactor_stop (flux_get_reactor (h)); } -static struct flux_msg_handler_spec htab[] = { - { FLUX_MSGTYPE_REQUEST, "content-backing.load", load_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "content-backing.store", store_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "content-sqlite.shutdown", shutdown_cb, 0, NULL }, - { FLUX_MSGTYPE_EVENT, "shutdown", broker_shutdown_cb, 0, NULL }, +static const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_REQUEST, "content-backing.load", load_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "content-backing.store", store_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "content-sqlite.shutdown", shutdown_cb, 0, }, + { FLUX_MSGTYPE_EVENT, "shutdown", broker_shutdown_cb, 0 }, FLUX_MSGHANDLER_TABLE_END, }; int mod_main (flux_t *h, int argc, char **argv) { + flux_msg_handler_t **handlers = NULL; int lzo_rc = lzo_init (); sqlite_ctx_t *ctx = getctx (h); if (!ctx) - return -1; + goto done; if (lzo_rc != LZO_E_OK) { flux_log (h, LOG_ERR, "lzo_init failed (rc=%d)", lzo_rc); - return -1; + goto done; } if (flux_event_subscribe (h, "shutdown") < 0) { flux_log_error (h, "flux_event_subscribe"); - return -1; + goto done; } - if (flux_msg_handler_addvec (h, htab, ctx) < 0) { + if (flux_msg_handler_addvec (h, htab, ctx, &handlers) < 0) { flux_log_error (h, "flux_msg_handler_addvec"); - return -1; + goto done; } if (register_backing_store (h, true, "content-sqlite") < 0) { flux_log_error (h, "registering backing store"); @@ -546,7 +547,7 @@ int mod_main (flux_t *h, int argc, char **argv) goto done; } done: - flux_msg_handler_delvec (htab); + flux_msg_handler_delvec (handlers); return 0; } diff --git a/src/modules/cron/cron.c b/src/modules/cron/cron.c index 862c42d7272c..821e345580cc 100644 --- a/src/modules/cron/cron.c +++ b/src/modules/cron/cron.c @@ -907,13 +907,13 @@ static void cron_ls_handler (flux_t *h, flux_msg_handler_t *w, /**************************************************************************/ -static struct flux_msg_handler_spec htab[] = { - { FLUX_MSGTYPE_REQUEST, "cron.create", cron_create_handler, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "cron.delete", cron_delete_handler, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "cron.list", cron_ls_handler, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "cron.stop", cron_stop_handler, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "cron.start", cron_start_handler, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "cron.sync", cron_sync_handler, 0, NULL }, +static const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_REQUEST, "cron.create", cron_create_handler, 0 }, + { FLUX_MSGTYPE_REQUEST, "cron.delete", cron_delete_handler, 0 }, + { FLUX_MSGTYPE_REQUEST, "cron.list", cron_ls_handler, 0 }, + { FLUX_MSGTYPE_REQUEST, "cron.stop", cron_stop_handler, 0 }, + { FLUX_MSGTYPE_REQUEST, "cron.start", cron_start_handler, 0 }, + { FLUX_MSGTYPE_REQUEST, "cron.sync", cron_sync_handler, 0 }, FLUX_MSGHANDLER_TABLE_END, }; @@ -941,19 +941,20 @@ static void process_args (cron_ctx_t *ctx, int ac, char **av) int mod_main (flux_t *h, int ac, char **av) { int rc = -1; + flux_msg_handler_t **handlers = NULL; cron_ctx_t *ctx = cron_ctx_create (h); process_args (ctx, ac, av); - if (flux_msg_handler_addvec (h, htab, ctx) < 0) { + if (flux_msg_handler_addvec (h, htab, ctx, &handlers) < 0) { flux_log_error (h, "flux_msg_handler_addvec"); goto done; } if ((rc = flux_reactor_run (flux_get_reactor (h), 0)) < 0) flux_log_error (h, "flux_reactor_run"); - flux_msg_handler_delvec (htab); - cron_ctx_destroy (ctx); done: + flux_msg_handler_delvec (handlers); + cron_ctx_destroy (ctx); return rc; } diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index e3ed15c54de2..1e495db18331 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -1575,23 +1575,23 @@ static void stats_clear_request_cb (flux_t *h, flux_msg_handler_t *mh, flux_log_error (h, "%s: flux_respond", __FUNCTION__); } -static struct flux_msg_handler_spec handlers[] = { - { FLUX_MSGTYPE_REQUEST, "kvs.stats.get", stats_get_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "kvs.stats.clear",stats_clear_request_cb, 0, NULL }, - { FLUX_MSGTYPE_EVENT, "kvs.stats.clear",stats_clear_event_cb, 0, NULL }, - { FLUX_MSGTYPE_EVENT, "kvs.setroot", setroot_event_cb, 0, NULL }, - { FLUX_MSGTYPE_EVENT, "kvs.error", error_event_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "kvs.getroot", getroot_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "kvs.dropcache", dropcache_request_cb, 0, NULL }, - { FLUX_MSGTYPE_EVENT, "kvs.dropcache", dropcache_event_cb, 0, NULL }, - { FLUX_MSGTYPE_EVENT, "hb", heartbeat_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "kvs.disconnect", disconnect_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "kvs.unwatch", unwatch_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "kvs.sync", sync_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "kvs.get", get_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "kvs.watch", watch_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "kvs.fence", fence_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "kvs.relayfence", relayfence_request_cb, 0, NULL }, +static const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_REQUEST, "kvs.stats.get", stats_get_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "kvs.stats.clear",stats_clear_request_cb, 0 }, + { FLUX_MSGTYPE_EVENT, "kvs.stats.clear",stats_clear_event_cb, 0 }, + { FLUX_MSGTYPE_EVENT, "kvs.setroot", setroot_event_cb, 0 }, + { FLUX_MSGTYPE_EVENT, "kvs.error", error_event_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "kvs.getroot", getroot_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "kvs.dropcache", dropcache_request_cb, 0 }, + { FLUX_MSGTYPE_EVENT, "kvs.dropcache", dropcache_event_cb, 0 }, + { FLUX_MSGTYPE_EVENT, "hb", heartbeat_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "kvs.disconnect", disconnect_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "kvs.unwatch", unwatch_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "kvs.sync", sync_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "kvs.get", get_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "kvs.watch", watch_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "kvs.fence", fence_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "kvs.relayfence", relayfence_request_cb, 0 }, FLUX_MSGHANDLER_TABLE_END, }; @@ -1687,6 +1687,7 @@ static int store_initial_rootdir (kvs_ctx_t *ctx, const json_t *rootdir, int mod_main (flux_t *h, int argc, char **argv) { kvs_ctx_t *ctx = getctx (h); + flux_msg_handler_t **handlers = NULL; int rc = -1; if (!ctx) { @@ -1719,18 +1720,17 @@ int mod_main (flux_t *h, int argc, char **argv) } setroot (ctx, rootref, rootseq); } - if (flux_msg_handler_addvec (h, handlers, ctx) < 0) { + if (flux_msg_handler_addvec (h, htab, ctx, &handlers) < 0) { flux_log_error (h, "flux_msg_handler_addvec"); goto done; } if (flux_reactor_run (flux_get_reactor (h), 0) < 0) { flux_log_error (h, "flux_reactor_run"); - goto done_delvec; + goto done; } rc = 0; -done_delvec: - flux_msg_handler_delvec (handlers); done: + flux_msg_handler_delvec (handlers); return rc; } diff --git a/src/modules/resource-hwloc/resource.c b/src/modules/resource-hwloc/resource.c index 4b2cc5b8a033..ed6e5bafca14 100644 --- a/src/modules/resource-hwloc/resource.c +++ b/src/modules/resource-hwloc/resource.c @@ -588,12 +588,12 @@ static void process_args (flux_t *h, resource_ctx_t *ctx, int argc, char **argv) } } -static struct flux_msg_handler_spec htab[] = { +static const struct flux_msg_handler_spec htab[] = { { FLUX_MSGTYPE_REQUEST, "resource-hwloc.reload", reload_request_cb, - 0, NULL + 0 }, { FLUX_MSGTYPE_REQUEST, "resource-hwloc.topo", topo_request_cb, - FLUX_ROLE_USER, NULL + FLUX_ROLE_USER }, FLUX_MSGHANDLER_TABLE_END }; @@ -602,6 +602,7 @@ int mod_main (flux_t *h, int argc, char **argv) { int rc = -1; resource_ctx_t *ctx; + flux_msg_handler_t **handlers = NULL; if (!(ctx = resource_hwloc_ctx_create (h))) goto done; @@ -617,19 +618,18 @@ int mod_main (flux_t *h, int argc, char **argv) goto done; } - if (flux_msg_handler_addvec (h, htab, ctx) < 0) { + if (flux_msg_handler_addvec (h, htab, ctx, &handlers) < 0) { flux_log_error (h, "flux_msghandler_add"); goto done; } if (flux_reactor_run (flux_get_reactor (h), 0) < 0) { flux_log_error (h, "flux_reactor_run"); - goto done_delvec; + goto done; } rc = 0; -done_delvec: - flux_msg_handler_delvec (htab); done: + flux_msg_handler_delvec (handlers); resource_hwloc_ctx_destroy (ctx); return rc; } diff --git a/src/modules/userdb/userdb.c b/src/modules/userdb/userdb.c index 3acaa7fa8158..d200dcbce9f1 100644 --- a/src/modules/userdb/userdb.c +++ b/src/modules/userdb/userdb.c @@ -350,12 +350,12 @@ static void disconnect (flux_t *h, flux_msg_handler_t *mh, } } -static struct flux_msg_handler_spec htab[] = { - { FLUX_MSGTYPE_REQUEST, "userdb.lookup", lookup, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "userdb.addrole", addrole, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "userdb.delrole", delrole, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "userdb.getnext", getnext, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "userdb.disconnect", disconnect, 0, NULL }, +static const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_REQUEST, "userdb.lookup", lookup, 0 }, + { FLUX_MSGTYPE_REQUEST, "userdb.addrole", addrole, 0 }, + { FLUX_MSGTYPE_REQUEST, "userdb.delrole", delrole, 0 }, + { FLUX_MSGTYPE_REQUEST, "userdb.getnext", getnext, 0 }, + { FLUX_MSGTYPE_REQUEST, "userdb.disconnect", disconnect, 0 }, FLUX_MSGHANDLER_TABLE_END, }; @@ -363,6 +363,7 @@ int mod_main (flux_t *h, int argc, char **argv) { int rc = -1; userdb_ctx_t *ctx; + flux_msg_handler_t **handlers = NULL; struct user *up; if (!(ctx = getctx (h, argc, argv))) { @@ -372,18 +373,17 @@ int mod_main (flux_t *h, int argc, char **argv) flux_log_error (h, "failed to add owner to userdb"); goto done; } - if (flux_msg_handler_addvec (h, htab, ctx) < 0) { + if (flux_msg_handler_addvec (h, htab, ctx, &handlers) < 0) { flux_log_error (h, "flux_msghandler_add"); goto done; } if (flux_reactor_run (flux_get_reactor (h), 0) < 0) { flux_log_error (h, "flux_reactor_run"); - goto done_unreg; + goto done; } rc = 0; -done_unreg: - flux_msg_handler_delvec (htab); done: + flux_msg_handler_delvec (handlers); return rc; } diff --git a/src/modules/wreck/job.c b/src/modules/wreck/job.c index 8cf561f02ffb..0f175a65d54f 100644 --- a/src/modules/wreck/job.c +++ b/src/modules/wreck/job.c @@ -608,18 +608,21 @@ static void runevent_cb (flux_t *h, flux_msg_handler_t *w, Jput (in); } -struct flux_msg_handler_spec mtab[] = { - { FLUX_MSGTYPE_REQUEST, "job.create", job_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "job.submit", job_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "job.shutdown", job_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "job.kvspath", job_kvspath_cb, 0, NULL }, - { FLUX_MSGTYPE_EVENT, "wrexec.run.*", runevent_cb, 0, NULL }, +static const struct flux_msg_handler_spec mtab[] = { + { FLUX_MSGTYPE_REQUEST, "job.create", job_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "job.submit", job_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "job.shutdown", job_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "job.kvspath", job_kvspath_cb, 0 }, + { FLUX_MSGTYPE_EVENT, "wrexec.run.*", runevent_cb, 0 }, FLUX_MSGHANDLER_TABLE_END }; int mod_main (flux_t *h, int argc, char **argv) { - if (flux_msg_handler_addvec (h, mtab, NULL) < 0) { + flux_msg_handler_t **handlers = NULL; + int rc = -1; + + if (flux_msg_handler_addvec (h, mtab, NULL, &handlers) < 0) { flux_log_error (h, "flux_msg_handler_addvec"); return (-1); } @@ -633,38 +636,40 @@ int mod_main (flux_t *h, int argc, char **argv) || (flux_event_subscribe (h, "wreck.state.submitted") < 0) || (flux_event_subscribe (h, "wrexec.run.") < 0)) { flux_log_error (h, "flux_event_subscribe"); - return -1; + goto done; } if ((flux_attr_get_int (h, "wreck.lwj-dir-levels", &kvs_dir_levels) < 0) && (flux_attr_set_int (h, "wreck.lwj-dir-levels", kvs_dir_levels) < 0)) { flux_log_error (h, "failed to get or set lwj-dir-levels"); - return -1; + goto done; } if ((flux_attr_get_int (h, "wreck.lwj-bits-per-dir", &kvs_bits_per_dir) < 0) && (flux_attr_set_int (h, "wreck.lwj-bits-per-dir", kvs_bits_per_dir) < 0)) { flux_log_error (h, "failed to get or set lwj-bits-per-dir"); - return -1; + goto done; } if (flux_get_rank (h, &broker_rank) < 0) { flux_log_error (h, "flux_get_rank"); - return -1; + goto done; } if (!(local_uri = flux_attr_get (h, "local-uri", NULL))) { flux_log_error (h, "flux_attr_get (\"local-uri\")"); - return -1; + goto done; } if (flux_reactor_run (flux_get_reactor (h), 0) < 0) { flux_log_error (h, "flux_reactor_run"); - return -1; + goto done; } - flux_msg_handler_delvec (mtab); - return 0; + rc = 0; +done: + flux_msg_handler_delvec (handlers); + return rc; } MOD_NAME ("job"); diff --git a/t/module/parent.c b/t/module/parent.c index 2d153fb3dbee..11f55d4eec21 100644 --- a/t/module/parent.c +++ b/t/module/parent.c @@ -212,16 +212,17 @@ static void lsmod_request_cb (flux_t *h, flux_msg_handler_t *mh, flux_modlist_destroy (mods); } -struct flux_msg_handler_spec htab[] = { - { FLUX_MSGTYPE_REQUEST, "parent.insmod", insmod_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "parent.rmmod", rmmod_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "parent.lsmod", lsmod_request_cb, 0, NULL }, +const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_REQUEST, "parent.insmod", insmod_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "parent.rmmod", rmmod_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "parent.lsmod", lsmod_request_cb, 0 }, FLUX_MSGHANDLER_TABLE_END, }; int mod_main (flux_t *h, int argc, char **argv) { int saved_errno; + flux_msg_handler_t **handlers = NULL; if (argc == 1 && !strcmp (argv[0], "--init-failure")) { flux_log (h, LOG_INFO, "aborting during init per test request"); @@ -232,7 +233,7 @@ int mod_main (flux_t *h, int argc, char **argv) saved_errno = ENOMEM; goto error; } - if (flux_msg_handler_addvec (h, htab, NULL) < 0) { + if (flux_msg_handler_addvec (h, htab, NULL, &handlers) < 0) { saved_errno = errno; flux_log_error (h, "flux_msghandler_addvec"); goto error; @@ -245,6 +246,7 @@ int mod_main (flux_t *h, int argc, char **argv) zhash_destroy (&modules); return 0; error: + flux_msg_handler_delvec (handlers); zhash_destroy (&modules); errno = saved_errno; return -1; diff --git a/t/request/req.c b/t/request/req.c index 8736ddbe00c3..c65595204389 100644 --- a/t/request/req.c +++ b/t/request/req.c @@ -382,18 +382,18 @@ void null_request_cb (flux_t *h, flux_msg_handler_t *mh, flux_reactor_stop_error (flux_get_reactor (h)); } -struct flux_msg_handler_spec htab[] = { - { FLUX_MSGTYPE_REQUEST, "req.null", null_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "req.echo", echo_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "req.err", err_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "req.src", src_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "req.nsrc", nsrc_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "req.sink", sink_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "req.xping", xping_request_cb, 0, NULL }, - { FLUX_MSGTYPE_RESPONSE, "req.ping", ping_response_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "req.clog", clog_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "req.flush", flush_request_cb, 0, NULL }, - { FLUX_MSGTYPE_REQUEST, "req.count", count_request_cb, 0, NULL }, +const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_REQUEST, "req.null", null_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "req.echo", echo_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "req.err", err_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "req.src", src_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "req.nsrc", nsrc_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "req.sink", sink_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "req.xping", xping_request_cb, 0 }, + { FLUX_MSGTYPE_RESPONSE, "req.ping", ping_response_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "req.clog", clog_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "req.flush", flush_request_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "req.count", count_request_cb, 0 }, FLUX_MSGHANDLER_TABLE_END, }; @@ -401,13 +401,14 @@ int mod_main (flux_t *h, int argc, char **argv) { int saved_errno; t_req_ctx_t *ctx = getctx (h); + flux_msg_handler_t **handlers = NULL; if (!ctx) { saved_errno = errno; flux_log_error (h, "error allocating context"); goto error; } - if (flux_msg_handler_addvec (h, htab, ctx) < 0) { + if (flux_msg_handler_addvec (h, htab, ctx, &handlers) < 0) { saved_errno = errno; flux_log_error (h, "flux_msg_handler_addvec"); goto error; @@ -415,10 +416,10 @@ int mod_main (flux_t *h, int argc, char **argv) if (flux_reactor_run (flux_get_reactor (h), 0) < 0) { saved_errno = errno; flux_log_error (h, "flux_reactor_run"); - flux_msg_handler_delvec (htab); + flux_msg_handler_delvec (handlers); goto error; } - flux_msg_handler_delvec (htab); + flux_msg_handler_delvec (handlers); return 0; error: errno = saved_errno; diff --git a/t/rpc/mrpc.c b/t/rpc/mrpc.c index 4bf9f7bbdc5a..289c25d7ad4a 100644 --- a/t/rpc/mrpc.c +++ b/t/rpc/mrpc.c @@ -119,19 +119,19 @@ void rpcftest_hello_cb (flux_t *h, flux_msg_handler_t *mh, (void)flux_respond_pack (h, msg, "{}"); } -static struct flux_msg_handler_spec htab[] = { - { FLUX_MSGTYPE_REQUEST, "rpctest.hello", rpctest_hello_cb, 0, NULL}, - { FLUX_MSGTYPE_REQUEST, "rpcftest.hello", rpcftest_hello_cb, 0, NULL}, - { FLUX_MSGTYPE_REQUEST, "rpctest.echo", rpctest_echo_cb, 0, NULL}, - { FLUX_MSGTYPE_REQUEST, "rpctest.nodeid", rpctest_nodeid_cb, 0, NULL}, - { FLUX_MSGTYPE_REQUEST, "rpcftest.nodeid", rpcftest_nodeid_cb, 0, NULL}, +static const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_REQUEST, "rpctest.hello", rpctest_hello_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "rpcftest.hello", rpcftest_hello_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "rpctest.echo", rpctest_echo_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "rpctest.nodeid", rpctest_nodeid_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "rpcftest.nodeid", rpcftest_nodeid_cb, 0 }, FLUX_MSGHANDLER_TABLE_END, }; -const int htablen = sizeof (htab) / sizeof (htab[0]); int test_server (flux_t *h, void *arg) { - if (flux_msg_handler_addvec (h, htab, NULL) < 0) { + flux_msg_handler_t **handlers = NULL; + if (flux_msg_handler_addvec (h, htab, NULL, &handlers) < 0) { diag ("flux_msg_handler_addvec failed"); return -1; } @@ -139,7 +139,7 @@ int test_server (flux_t *h, void *arg) diag ("flux_reactor_run failed"); return -1; } - flux_msg_handler_delvec (htab); + flux_msg_handler_delvec (handlers); return 0; } diff --git a/t/rpc/rpc.c b/t/rpc/rpc.c index d74cc5f86626..3c1be96fc0ff 100644 --- a/t/rpc/rpc.c +++ b/t/rpc/rpc.c @@ -110,19 +110,20 @@ void rpcftest_hello_cb (flux_t *h, flux_msg_handler_t *mh, (void)flux_respond_pack (h, msg, "{}"); } -static struct flux_msg_handler_spec htab[] = { - { FLUX_MSGTYPE_REQUEST, "rpctest.incr", rpctest_incr_cb, 0, NULL}, - { FLUX_MSGTYPE_REQUEST, "rpctest.hello", rpctest_hello_cb, 0, NULL}, - { FLUX_MSGTYPE_REQUEST, "rpcftest.hello", rpcftest_hello_cb, 0, NULL}, - { FLUX_MSGTYPE_REQUEST, "rpctest.echo", rpctest_echo_cb, 0, NULL}, - { FLUX_MSGTYPE_REQUEST, "rpctest.rawecho", rpctest_rawecho_cb, 0, NULL}, - { FLUX_MSGTYPE_REQUEST, "rpctest.nodeid", rpctest_nodeid_cb, 0, NULL}, +static const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_REQUEST, "rpctest.incr", rpctest_incr_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "rpctest.hello", rpctest_hello_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "rpcftest.hello", rpcftest_hello_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "rpctest.echo", rpctest_echo_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "rpctest.rawecho", rpctest_rawecho_cb, 0 }, + { FLUX_MSGTYPE_REQUEST, "rpctest.nodeid", rpctest_nodeid_cb, 0 }, FLUX_MSGHANDLER_TABLE_END, }; int test_server (flux_t *h, void *arg) { - if (flux_msg_handler_addvec (h, htab, NULL) < 0) { + flux_msg_handler_t **handlers = NULL; + if (flux_msg_handler_addvec (h, htab, NULL, &handlers) < 0) { diag ("flux_msg_handler_addvec failed"); return -1; } @@ -130,7 +131,7 @@ int test_server (flux_t *h, void *arg) diag ("flux_reactor_run failed"); return -1; } - flux_msg_handler_delvec (htab); + flux_msg_handler_delvec (handlers); return 0; }