Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libflux: clean up bulk message handler registration functions #1277

Merged
merged 7 commits into from
Nov 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions doc/man3/flux_msg_handler_addvec.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ flux_msg_handler_addvec(3)

NAME
----
flux_msg_handler_addvec, flux_msg_handler_delvec - bulk add/remove message handlers
flux_msg_handler_addvec,
flux_msg_handler_delvec - bulk add/remove message handlers


SYNOPSIS
Expand All @@ -14,28 +15,29 @@ SYNOPSIS

struct flux_msg_handler_spec {
int typemask;
char *topic_glob;
const char *topic_glob;
flux_msg_handler_f cb;
uint32_t rolemask;
flux_msg_handler_t *w;
};

int flux_msg_handler_addvec (flux_t *h,
struct flux_msg_handler_spec tab[],
void *arg);
const struct flux_msg_handler_spec tab[],
void *arg,
flux_msg_handler_t **handlers[]);

void flux_msg_handler_delvec (struct flux_msg_handler_spec tab[]);
void flux_msg_handler_delvec (flux_msg_handler_t *handlers[]);


DESCRIPTION
-----------

`flux_msg_handler_addvec()` creates and starts an array of message handlers,
terminated by FLUX_MSGHANDLER_TABLE_END. The new message handler objects
are stored in the array.
are assigned to an internally allocated array, returned in _handlers_.
The last entry in the array is set to NULL.

`flux_msg_handler_delvec()` stops and destroys an array of message handler
objects, terminated by FLUX_MSGHANDLER_TABLE_END.
`flux_msg_handler_delvec()` stops and destroys an array of message handlers
returned from `flux_msg_handler_addvec()`.

These functions are convenience functions which call
`flux_msg_handler_create(3)`, `flux_msg_handler_start(3)`; and
Expand Down
11 changes: 6 additions & 5 deletions doc/man3/treduce.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ void heartbeat_cb (flux_t *h, flux_msg_handler_t *mh,
free (item);
}

struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_EVENT, "hb", heartbeat_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "treduce.forward", forward_cb, 0, NULL },
const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_EVENT, "hb", heartbeat_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "treduce.forward", forward_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

Expand All @@ -123,6 +123,7 @@ int mod_main (flux_t *h, int argc, char **argv)
uint32_t rank;
double timeout = 0.;
int flags;
flux_msg_handler_t **handlers = NULL;

if (argc == 1) {
timeout = strtod (argv[0], NULL);
Expand All @@ -139,11 +140,11 @@ int mod_main (flux_t *h, int argc, char **argv)
return -1;
if (flux_event_subscribe (h, "hb") < 0)
return -1;
if (flux_msg_handler_addvec (h, htab, &ctx) < 0)
if (flux_msg_handler_addvec (h, htab, &ctx, &handlers) < 0)
return -1;
if (flux_reactor_run (flux_get_reactor (h), 0) < 0)
return -1;
flux_msg_handler_delvec (htab);
flux_msg_handler_delvec (handlers);
return 0;
}

Expand Down
18 changes: 8 additions & 10 deletions src/broker/attr.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

struct attr_struct {
zhash_t *hash;
flux_msg_handler_t **handlers;
};

struct entry {
Expand Down Expand Up @@ -415,27 +416,24 @@ void lsattr_request_cb (flux_t *h, flux_msg_handler_t *mh,
** Initialization
**/

static struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "attr.get", getattr_request_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "attr.list", lsattr_request_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "attr.set", setattr_request_cb, 0, NULL },
static const struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "attr.get", getattr_request_cb, FLUX_ROLE_ALL },
{ FLUX_MSGTYPE_REQUEST, "attr.list", lsattr_request_cb, FLUX_ROLE_ALL },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change from 0 (FLUX_ROLE_NONE) to FLUX_ROLE_ALL looks like something that should be broken out into a separate commit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a hard one to split out. The code was setting the rolemask to 0, then changing it with code like

flux_msg_handler_allow_rolemask (handlers[0].w, FLUX_ROLE_ALL);

Since w is gone, I thought it best to change the initial setting rather than walk the handlers list or whatever.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused about why that is hard. Doesn't

 flux_msg_handler_allow_rolemask (handlers[0].w, FLUX_ROLE_ALL);

just change into

flux_msg_handler_allow_rolemask (attrs->handlers[0], FLUX_ROLE_ALL);

No walking seems to be required.

But again, I perfectly fine with changing the place the rolemask is set. You could even make that rolemask location change in a commit before this one. It is just a change that isn't really pertinent to this commit.

This is minor enough though that I am willing to let it slide.

{ FLUX_MSGTYPE_REQUEST, "attr.set", setattr_request_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};


int attr_register_handlers (attr_t *attrs, flux_t *h)
{
if (flux_msg_handler_addvec (h, handlers, attrs) < 0)
if (flux_msg_handler_addvec (h, handlers, attrs, &attrs->handlers) < 0)
return -1;
/* allow any user to attr.get and attr.list */
flux_msg_handler_allow_rolemask (handlers[0].w, FLUX_ROLE_ALL);
flux_msg_handler_allow_rolemask (handlers[1].w, FLUX_ROLE_ALL);
return 0;
}

void attr_unregister_handlers (void)
void attr_unregister_handlers (attr_t *attrs)
{
flux_msg_handler_delvec (handlers);
flux_msg_handler_delvec (attrs->handlers);
}

attr_t *attr_create (void)
Expand Down
2 changes: 1 addition & 1 deletion src/broker/attr.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ void attr_destroy (attr_t *attrs);
/* Register/unregister message handlers
*/
int attr_register_handlers (attr_t *attrs, flux_t *h);
void attr_unregister_handlers (void);
void attr_unregister_handlers (attr_t *attrs);

/* Delete an attribute
*/
Expand Down
43 changes: 28 additions & 15 deletions src/broker/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ static void signal_cb (flux_reactor_t *r, flux_watcher_t *w,
static void broker_handle_signals (broker_ctx_t *ctx, zlist_t *sigwatchers);
static void broker_unhandle_signals (zlist_t *sigwatchers);

static void broker_add_services (broker_ctx_t *ctx);
static flux_msg_handler_t **broker_add_services (broker_ctx_t *ctx);
static void broker_remove_services (flux_msg_handler_t *handlers[]);

static int load_module_byname (broker_ctx_t *ctx, const char *name,
const char *argz, size_t argz_len,
Expand Down Expand Up @@ -310,6 +311,7 @@ int main (int argc, char *argv[])
sigset_t old_sigmask;
struct sigaction old_sigact_int;
struct sigaction old_sigact_term;
flux_msg_handler_t **handlers;

memset (&ctx, 0, sizeof (ctx));
log_init (argv[0]);
Expand Down Expand Up @@ -579,7 +581,7 @@ int main (int argc, char *argv[])
if (rusage_initialize (ctx.h, "cmb") < 0)
log_err_exit ("rusage_initialize");

broker_add_services (&ctx);
handlers = broker_add_services (&ctx);

/* Initialize comms module infrastructure.
*/
Expand Down Expand Up @@ -651,7 +653,7 @@ int main (int argc, char *argv[])

/* Unregister builtin services
*/
attr_unregister_handlers ();
attr_unregister_handlers (ctx.attrs);
content_cache_destroy (ctx.cache);

broker_unhandle_signals (sigwatchers);
Expand All @@ -668,6 +670,8 @@ int main (int argc, char *argv[])
service_switch_destroy (ctx.services);
hello_destroy (ctx.hello);
attr_destroy (ctx.attrs);
shutdown_destroy (ctx.shutdown);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a good improvement, but unrelated to this commit's logical topic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, will split out.

broker_remove_services (handlers);
flux_close (ctx.h);
flux_reactor_destroy (ctx.reactor);
if (ctx.subscriptions) {
Expand Down Expand Up @@ -1694,16 +1698,16 @@ static int route_to_handle (const flux_msg_t *msg, void *arg)
return 0;
}

static struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "cmb.rmmod", cmb_rmmod_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.insmod", cmb_insmod_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.lsmod", cmb_lsmod_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.lspeer", cmb_lspeer_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.panic", cmb_panic_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.event-mute", cmb_event_mute_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.disconnect", cmb_disconnect_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.sub", cmb_sub_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.unsub", cmb_unsub_cb, 0, NULL },
static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "cmb.rmmod", cmb_rmmod_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.insmod", cmb_insmod_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.lsmod", cmb_lsmod_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.lspeer", cmb_lspeer_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.panic", cmb_panic_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.event-mute", cmb_event_mute_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.disconnect", cmb_disconnect_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.sub", cmb_sub_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.unsub", cmb_unsub_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

Expand All @@ -1727,8 +1731,9 @@ static struct internal_service services[] = {
* Register message handlers for some cmb services. Others are registered
* in their own initialization functions.
*/
static void broker_add_services (broker_ctx_t *ctx)
static flux_msg_handler_t **broker_add_services (broker_ctx_t *ctx)
{
flux_msg_handler_t **handlers;
struct internal_service *svc;
for (svc = &services[0]; svc->name != NULL; svc++) {
if (!nodeset_member (svc->nodeset, overlay_get_rank(ctx->overlay)))
Expand All @@ -1738,8 +1743,16 @@ static void broker_add_services (broker_ctx_t *ctx)
log_err_exit ("error registering service for %s", svc->name);
}

if (flux_msg_handler_addvec (ctx->h, handlers, ctx) < 0)
if (flux_msg_handler_addvec (ctx->h, htab, ctx, &handlers) < 0)
log_err_exit ("error registering message handlers");
return handlers;
}

/* Unregister message handlers
*/
static void broker_remove_services (flux_msg_handler_t *handlers[])
{
flux_msg_handler_delvec (handlers);
}

/**
Expand Down
23 changes: 13 additions & 10 deletions src/broker/content-cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ struct cache_entry {
struct content_cache {
flux_t *h;
flux_t *enclosing_h;
flux_msg_handler_t **handlers;
uint32_t rank;
zhash_t *entries;
uint8_t backing:1; /* 'content.backing' service available */
Expand Down Expand Up @@ -854,22 +855,24 @@ static void heartbeat_event (flux_t *h, flux_msg_handler_t *mh,
/* Initialization
*/

static struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "content.load", content_load_request, FLUX_ROLE_USER, NULL },
{ FLUX_MSGTYPE_REQUEST, "content.store", content_store_request, FLUX_ROLE_USER, NULL },
{ FLUX_MSGTYPE_REQUEST, "content.backing", content_backing_request, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "content.dropcache", content_dropcache_request, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "content.stats.get", content_stats_request, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "content.flush", content_flush_request, 0, NULL },
{ FLUX_MSGTYPE_EVENT, "hb", heartbeat_event, 0, NULL },
static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "content.load", content_load_request,
FLUX_ROLE_USER },
{ FLUX_MSGTYPE_REQUEST, "content.store", content_store_request,
FLUX_ROLE_USER },
{ FLUX_MSGTYPE_REQUEST, "content.backing", content_backing_request, 0 },
{ FLUX_MSGTYPE_REQUEST, "content.dropcache", content_dropcache_request, 0 },
{ FLUX_MSGTYPE_REQUEST, "content.stats.get", content_stats_request, 0 },
{ FLUX_MSGTYPE_REQUEST, "content.flush", content_flush_request, 0 },
{ FLUX_MSGTYPE_EVENT, "hb", heartbeat_event, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

int content_cache_set_flux (content_cache_t *cache, flux_t *h)
{
cache->h = h;

if (flux_msg_handler_addvec (h, handlers, cache) < 0)
if (flux_msg_handler_addvec (h, htab, cache, &cache->handlers) < 0)
return -1;
if (flux_get_rank (h, &cache->rank) < 0)
return -1;
Expand Down Expand Up @@ -975,7 +978,7 @@ void content_cache_destroy (content_cache_t *cache)
if (cache) {
if (cache->h) {
(void)flux_event_unsubscribe (cache->h, "hb");
flux_msg_handler_delvec (handlers);
flux_msg_handler_delvec (cache->handlers);
}
if (cache->backing_name)
free (cache->backing_name);
Expand Down
15 changes: 8 additions & 7 deletions src/broker/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

typedef struct {
flux_t *h;
flux_msg_handler_t **handlers;
struct subprocess_manager *sm;
uint32_t rank;
const char *local_uri;
Expand Down Expand Up @@ -507,19 +508,19 @@ static void ps_request_cb (flux_t *h, flux_msg_handler_t *mh,
json_decref (procs);
}

static struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "cmb.exec", exec_request_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.exec.signal", signal_request_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.exec.write", write_request_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.processes", ps_request_cb, 0, NULL },
static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "cmb.exec", exec_request_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.exec.signal", signal_request_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.exec.write", write_request_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.processes", ps_request_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

static void exec_finalize (void *arg)
{
exec_t *x = arg;
flux_msg_handler_delvec (x->handlers);
free (x);
flux_msg_handler_delvec (handlers);
}

int exec_initialize (flux_t *h, struct subprocess_manager *sm,
Expand All @@ -537,7 +538,7 @@ int exec_initialize (flux_t *h, struct subprocess_manager *sm,
free (x);
return -1;
}
if (flux_msg_handler_addvec (h, handlers, x) < 0) {
if (flux_msg_handler_addvec (h, htab, x, &x->handlers) < 0) {
free (x);
return -1;
}
Expand Down
12 changes: 7 additions & 5 deletions src/broker/heaptrace.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
#include <flux/core.h>
#include "heaptrace.h"

static flux_msg_handler_t **handlers = NULL;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In C, file scope variables are already initialized to 0 so the "= NULL" is redundant. Not a big deal.


static void start_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
Expand Down Expand Up @@ -111,10 +113,10 @@ static void stop_cb (flux_t *h, flux_msg_handler_t *mh,
FLUX_LOG_ERROR (h);
}

static struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "heaptrace.start", start_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "heaptrace.dump", dump_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "heaptrace.stop", stop_cb, 0, NULL },
static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "heaptrace.start", start_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "heaptrace.dump", dump_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "heaptrace.stop", stop_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

Expand All @@ -126,7 +128,7 @@ static void heaptrace_finalize (void *arg)
int heaptrace_initialize (flux_t *h)
{
char *dummy = "hello";
if (flux_msg_handler_addvec (h, handlers, NULL) < 0)
if (flux_msg_handler_addvec (h, htab, NULL, &handlers) < 0)
return -1;
flux_aux_set (h, "flux::heaptrace", dummy, heaptrace_finalize);
return 0;
Expand Down
9 changes: 6 additions & 3 deletions src/broker/hello.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ static double default_reduction_timeout = 10.;
struct hello_struct {
flux_t *h;
attr_t *attrs;
flux_msg_handler_t **handlers;
uint32_t rank;
uint32_t size;
uint32_t count;
Expand Down Expand Up @@ -82,6 +83,7 @@ void hello_destroy (hello_t *hello)
{
if (hello) {
flux_reduce_destroy (hello->reduce);
flux_msg_handler_delvec (hello->handlers);
free (hello);
}
}
Expand Down Expand Up @@ -123,8 +125,8 @@ int hello_register_attrs (hello_t *hello, attr_t *attrs)
return 0;
}

static struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "hello.join", join_request, 0, NULL },
static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "hello.join", join_request, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

Expand Down Expand Up @@ -169,7 +171,8 @@ int hello_start (hello_t *hello)
log_err ("hello: error getting rank/size");
goto done;
}
if (flux_msg_handler_addvec (hello->h, handlers, hello) < 0) {
if (flux_msg_handler_addvec (hello->h, htab, hello,
&hello->handlers) < 0) {
log_err ("hello: adding message handlers");
goto done;
}
Expand Down
Loading