Skip to content

Commit

Permalink
Merge pull request #1277 from garlick/msg_handlers
Browse files Browse the repository at this point in the history
libflux: clean up bulk message handler registration functions
  • Loading branch information
morrone authored Nov 13, 2017
2 parents 819d8c5 + 43fa0c8 commit 3c3890c
Show file tree
Hide file tree
Showing 29 changed files with 305 additions and 252 deletions.
20 changes: 11 additions & 9 deletions doc/man3/flux_msg_handler_addvec.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ flux_msg_handler_addvec(3)

NAME
----
flux_msg_handler_addvec, flux_msg_handler_delvec - bulk add/remove message handlers
flux_msg_handler_addvec,
flux_msg_handler_delvec - bulk add/remove message handlers


SYNOPSIS
Expand All @@ -14,28 +15,29 @@ SYNOPSIS

struct flux_msg_handler_spec {
int typemask;
char *topic_glob;
const char *topic_glob;
flux_msg_handler_f cb;
uint32_t rolemask;
flux_msg_handler_t *w;
};

int flux_msg_handler_addvec (flux_t *h,
struct flux_msg_handler_spec tab[],
void *arg);
const struct flux_msg_handler_spec tab[],
void *arg,
flux_msg_handler_t **handlers[]);

void flux_msg_handler_delvec (struct flux_msg_handler_spec tab[]);
void flux_msg_handler_delvec (flux_msg_handler_t *handlers[]);


DESCRIPTION
-----------
`flux_msg_handler_addvec()` creates and starts an array of message handlers,
terminated by FLUX_MSGHANDLER_TABLE_END. The new message handler objects
are stored in the array.
are assigned to an internally allocated array, returned in _handlers_.
The last entry in the array is set to NULL.
`flux_msg_handler_delvec()` stops and destroys an array of message handler
objects, terminated by FLUX_MSGHANDLER_TABLE_END.
`flux_msg_handler_delvec()` stops and destroys an array of message handlers
returned from `flux_msg_handler_addvec()`.
These functions are convenience functions which call
`flux_msg_handler_create(3)`, `flux_msg_handler_start(3)`; and
Expand Down
11 changes: 6 additions & 5 deletions doc/man3/treduce.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ void heartbeat_cb (flux_t *h, flux_msg_handler_t *mh,
free (item);
}

struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_EVENT, "hb", heartbeat_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "treduce.forward", forward_cb, 0, NULL },
const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_EVENT, "hb", heartbeat_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "treduce.forward", forward_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

Expand All @@ -123,6 +123,7 @@ int mod_main (flux_t *h, int argc, char **argv)
uint32_t rank;
double timeout = 0.;
int flags;
flux_msg_handler_t **handlers = NULL;

if (argc == 1) {
timeout = strtod (argv[0], NULL);
Expand All @@ -139,11 +140,11 @@ int mod_main (flux_t *h, int argc, char **argv)
return -1;
if (flux_event_subscribe (h, "hb") < 0)
return -1;
if (flux_msg_handler_addvec (h, htab, &ctx) < 0)
if (flux_msg_handler_addvec (h, htab, &ctx, &handlers) < 0)
return -1;
if (flux_reactor_run (flux_get_reactor (h), 0) < 0)
return -1;
flux_msg_handler_delvec (htab);
flux_msg_handler_delvec (handlers);
return 0;
}

Expand Down
18 changes: 8 additions & 10 deletions src/broker/attr.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

struct attr_struct {
zhash_t *hash;
flux_msg_handler_t **handlers;
};

struct entry {
Expand Down Expand Up @@ -415,27 +416,24 @@ void lsattr_request_cb (flux_t *h, flux_msg_handler_t *mh,
** Initialization
**/

static struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "attr.get", getattr_request_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "attr.list", lsattr_request_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "attr.set", setattr_request_cb, 0, NULL },
static const struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "attr.get", getattr_request_cb, FLUX_ROLE_ALL },
{ FLUX_MSGTYPE_REQUEST, "attr.list", lsattr_request_cb, FLUX_ROLE_ALL },
{ FLUX_MSGTYPE_REQUEST, "attr.set", setattr_request_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};


int attr_register_handlers (attr_t *attrs, flux_t *h)
{
if (flux_msg_handler_addvec (h, handlers, attrs) < 0)
if (flux_msg_handler_addvec (h, handlers, attrs, &attrs->handlers) < 0)
return -1;
/* allow any user to attr.get and attr.list */
flux_msg_handler_allow_rolemask (handlers[0].w, FLUX_ROLE_ALL);
flux_msg_handler_allow_rolemask (handlers[1].w, FLUX_ROLE_ALL);
return 0;
}

void attr_unregister_handlers (void)
void attr_unregister_handlers (attr_t *attrs)
{
flux_msg_handler_delvec (handlers);
flux_msg_handler_delvec (attrs->handlers);
}

attr_t *attr_create (void)
Expand Down
2 changes: 1 addition & 1 deletion src/broker/attr.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ void attr_destroy (attr_t *attrs);
/* Register/unregister message handlers
*/
int attr_register_handlers (attr_t *attrs, flux_t *h);
void attr_unregister_handlers (void);
void attr_unregister_handlers (attr_t *attrs);

/* Delete an attribute
*/
Expand Down
43 changes: 28 additions & 15 deletions src/broker/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ static void signal_cb (flux_reactor_t *r, flux_watcher_t *w,
static void broker_handle_signals (broker_ctx_t *ctx, zlist_t *sigwatchers);
static void broker_unhandle_signals (zlist_t *sigwatchers);

static void broker_add_services (broker_ctx_t *ctx);
static flux_msg_handler_t **broker_add_services (broker_ctx_t *ctx);
static void broker_remove_services (flux_msg_handler_t *handlers[]);

static int load_module_byname (broker_ctx_t *ctx, const char *name,
const char *argz, size_t argz_len,
Expand Down Expand Up @@ -310,6 +311,7 @@ int main (int argc, char *argv[])
sigset_t old_sigmask;
struct sigaction old_sigact_int;
struct sigaction old_sigact_term;
flux_msg_handler_t **handlers;

memset (&ctx, 0, sizeof (ctx));
log_init (argv[0]);
Expand Down Expand Up @@ -579,7 +581,7 @@ int main (int argc, char *argv[])
if (rusage_initialize (ctx.h, "cmb") < 0)
log_err_exit ("rusage_initialize");

broker_add_services (&ctx);
handlers = broker_add_services (&ctx);

/* Initialize comms module infrastructure.
*/
Expand Down Expand Up @@ -651,7 +653,7 @@ int main (int argc, char *argv[])

/* Unregister builtin services
*/
attr_unregister_handlers ();
attr_unregister_handlers (ctx.attrs);
content_cache_destroy (ctx.cache);

broker_unhandle_signals (sigwatchers);
Expand All @@ -668,6 +670,8 @@ int main (int argc, char *argv[])
service_switch_destroy (ctx.services);
hello_destroy (ctx.hello);
attr_destroy (ctx.attrs);
shutdown_destroy (ctx.shutdown);
broker_remove_services (handlers);
flux_close (ctx.h);
flux_reactor_destroy (ctx.reactor);
if (ctx.subscriptions) {
Expand Down Expand Up @@ -1694,16 +1698,16 @@ static int route_to_handle (const flux_msg_t *msg, void *arg)
return 0;
}

static struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "cmb.rmmod", cmb_rmmod_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.insmod", cmb_insmod_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.lsmod", cmb_lsmod_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.lspeer", cmb_lspeer_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.panic", cmb_panic_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.event-mute", cmb_event_mute_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.disconnect", cmb_disconnect_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.sub", cmb_sub_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.unsub", cmb_unsub_cb, 0, NULL },
static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "cmb.rmmod", cmb_rmmod_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.insmod", cmb_insmod_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.lsmod", cmb_lsmod_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.lspeer", cmb_lspeer_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.panic", cmb_panic_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.event-mute", cmb_event_mute_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.disconnect", cmb_disconnect_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.sub", cmb_sub_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.unsub", cmb_unsub_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

Expand All @@ -1727,8 +1731,9 @@ static struct internal_service services[] = {
* Register message handlers for some cmb services. Others are registered
* in their own initialization functions.
*/
static void broker_add_services (broker_ctx_t *ctx)
static flux_msg_handler_t **broker_add_services (broker_ctx_t *ctx)
{
flux_msg_handler_t **handlers;
struct internal_service *svc;
for (svc = &services[0]; svc->name != NULL; svc++) {
if (!nodeset_member (svc->nodeset, overlay_get_rank(ctx->overlay)))
Expand All @@ -1738,8 +1743,16 @@ static void broker_add_services (broker_ctx_t *ctx)
log_err_exit ("error registering service for %s", svc->name);
}

if (flux_msg_handler_addvec (ctx->h, handlers, ctx) < 0)
if (flux_msg_handler_addvec (ctx->h, htab, ctx, &handlers) < 0)
log_err_exit ("error registering message handlers");
return handlers;
}

/* Unregister message handlers
*/
static void broker_remove_services (flux_msg_handler_t *handlers[])
{
flux_msg_handler_delvec (handlers);
}

/**
Expand Down
23 changes: 13 additions & 10 deletions src/broker/content-cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ struct cache_entry {
struct content_cache {
flux_t *h;
flux_t *enclosing_h;
flux_msg_handler_t **handlers;
uint32_t rank;
zhash_t *entries;
uint8_t backing:1; /* 'content.backing' service available */
Expand Down Expand Up @@ -854,22 +855,24 @@ static void heartbeat_event (flux_t *h, flux_msg_handler_t *mh,
/* Initialization
*/

static struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "content.load", content_load_request, FLUX_ROLE_USER, NULL },
{ FLUX_MSGTYPE_REQUEST, "content.store", content_store_request, FLUX_ROLE_USER, NULL },
{ FLUX_MSGTYPE_REQUEST, "content.backing", content_backing_request, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "content.dropcache", content_dropcache_request, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "content.stats.get", content_stats_request, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "content.flush", content_flush_request, 0, NULL },
{ FLUX_MSGTYPE_EVENT, "hb", heartbeat_event, 0, NULL },
static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "content.load", content_load_request,
FLUX_ROLE_USER },
{ FLUX_MSGTYPE_REQUEST, "content.store", content_store_request,
FLUX_ROLE_USER },
{ FLUX_MSGTYPE_REQUEST, "content.backing", content_backing_request, 0 },
{ FLUX_MSGTYPE_REQUEST, "content.dropcache", content_dropcache_request, 0 },
{ FLUX_MSGTYPE_REQUEST, "content.stats.get", content_stats_request, 0 },
{ FLUX_MSGTYPE_REQUEST, "content.flush", content_flush_request, 0 },
{ FLUX_MSGTYPE_EVENT, "hb", heartbeat_event, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

int content_cache_set_flux (content_cache_t *cache, flux_t *h)
{
cache->h = h;

if (flux_msg_handler_addvec (h, handlers, cache) < 0)
if (flux_msg_handler_addvec (h, htab, cache, &cache->handlers) < 0)
return -1;
if (flux_get_rank (h, &cache->rank) < 0)
return -1;
Expand Down Expand Up @@ -975,7 +978,7 @@ void content_cache_destroy (content_cache_t *cache)
if (cache) {
if (cache->h) {
(void)flux_event_unsubscribe (cache->h, "hb");
flux_msg_handler_delvec (handlers);
flux_msg_handler_delvec (cache->handlers);
}
if (cache->backing_name)
free (cache->backing_name);
Expand Down
15 changes: 8 additions & 7 deletions src/broker/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

typedef struct {
flux_t *h;
flux_msg_handler_t **handlers;
struct subprocess_manager *sm;
uint32_t rank;
const char *local_uri;
Expand Down Expand Up @@ -507,19 +508,19 @@ static void ps_request_cb (flux_t *h, flux_msg_handler_t *mh,
json_decref (procs);
}

static struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "cmb.exec", exec_request_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.exec.signal", signal_request_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.exec.write", write_request_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.processes", ps_request_cb, 0, NULL },
static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "cmb.exec", exec_request_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.exec.signal", signal_request_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.exec.write", write_request_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.processes", ps_request_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

static void exec_finalize (void *arg)
{
exec_t *x = arg;
flux_msg_handler_delvec (x->handlers);
free (x);
flux_msg_handler_delvec (handlers);
}

int exec_initialize (flux_t *h, struct subprocess_manager *sm,
Expand All @@ -537,7 +538,7 @@ int exec_initialize (flux_t *h, struct subprocess_manager *sm,
free (x);
return -1;
}
if (flux_msg_handler_addvec (h, handlers, x) < 0) {
if (flux_msg_handler_addvec (h, htab, x, &x->handlers) < 0) {
free (x);
return -1;
}
Expand Down
12 changes: 7 additions & 5 deletions src/broker/heaptrace.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
#include <flux/core.h>
#include "heaptrace.h"

static flux_msg_handler_t **handlers = NULL;

static void start_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
Expand Down Expand Up @@ -111,10 +113,10 @@ static void stop_cb (flux_t *h, flux_msg_handler_t *mh,
FLUX_LOG_ERROR (h);
}

static struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "heaptrace.start", start_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "heaptrace.dump", dump_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "heaptrace.stop", stop_cb, 0, NULL },
static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "heaptrace.start", start_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "heaptrace.dump", dump_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "heaptrace.stop", stop_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

Expand All @@ -126,7 +128,7 @@ static void heaptrace_finalize (void *arg)
int heaptrace_initialize (flux_t *h)
{
char *dummy = "hello";
if (flux_msg_handler_addvec (h, handlers, NULL) < 0)
if (flux_msg_handler_addvec (h, htab, NULL, &handlers) < 0)
return -1;
flux_aux_set (h, "flux::heaptrace", dummy, heaptrace_finalize);
return 0;
Expand Down
9 changes: 6 additions & 3 deletions src/broker/hello.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ static double default_reduction_timeout = 10.;
struct hello_struct {
flux_t *h;
attr_t *attrs;
flux_msg_handler_t **handlers;
uint32_t rank;
uint32_t size;
uint32_t count;
Expand Down Expand Up @@ -82,6 +83,7 @@ void hello_destroy (hello_t *hello)
{
if (hello) {
flux_reduce_destroy (hello->reduce);
flux_msg_handler_delvec (hello->handlers);
free (hello);
}
}
Expand Down Expand Up @@ -123,8 +125,8 @@ int hello_register_attrs (hello_t *hello, attr_t *attrs)
return 0;
}

static struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "hello.join", join_request, 0, NULL },
static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "hello.join", join_request, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

Expand Down Expand Up @@ -169,7 +171,8 @@ int hello_start (hello_t *hello)
log_err ("hello: error getting rank/size");
goto done;
}
if (flux_msg_handler_addvec (hello->h, handlers, hello) < 0) {
if (flux_msg_handler_addvec (hello->h, htab, hello,
&hello->handlers) < 0) {
log_err ("hello: adding message handlers");
goto done;
}
Expand Down
Loading

0 comments on commit 3c3890c

Please sign in to comment.