Skip to content

Commit

Permalink
libflux/msg_handler: clean up addvec/delvec
Browse files Browse the repository at this point in the history
Problem: use of struct flux_msg_handler_spec for both
addvec and delvec functions is confusing.

Drop flux_msg_handler_t * member from struct flux_msg_handler_spec.
In flux_msg_handler_addvec(), make 'tab' parameter const (purely
input), and add an output parameter which is a NULL terminated
list of flux_msg_handler_t's.  The list of message handlers becomes
the sole input to flux_msg_handler_delvec.

Update users and tests.

Fixes flux-framework#1135.
  • Loading branch information
garlick committed Nov 8, 2017
1 parent a2c87d1 commit c076ddc
Show file tree
Hide file tree
Showing 28 changed files with 283 additions and 241 deletions.
11 changes: 6 additions & 5 deletions doc/man3/treduce.c
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ void heartbeat_cb (flux_t *h, flux_msg_handler_t *w,
free (item);
}

struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_EVENT, "hb", heartbeat_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "treduce.forward", forward_cb, 0, NULL },
const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_EVENT, "hb", heartbeat_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "treduce.forward", forward_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

Expand All @@ -123,6 +123,7 @@ int mod_main (flux_t *h, int argc, char **argv)
uint32_t rank;
double timeout = 0.;
int flags;
flux_msg_handler_t **handlers = NULL;

if (argc == 1) {
timeout = strtod (argv[0], NULL);
Expand All @@ -139,11 +140,11 @@ int mod_main (flux_t *h, int argc, char **argv)
return -1;
if (flux_event_subscribe (h, "hb") < 0)
return -1;
if (flux_msg_handler_addvec (h, htab, &ctx) < 0)
if (flux_msg_handler_addvec (h, htab, &ctx, &handlers) < 0)
return -1;
if (flux_reactor_run (flux_get_reactor (h), 0) < 0)
return -1;
flux_msg_handler_delvec (htab);
flux_msg_handler_delvec (handlers);
return 0;
}

Expand Down
18 changes: 8 additions & 10 deletions src/broker/attr.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

struct attr_struct {
zhash_t *hash;
flux_msg_handler_t **handlers;
};

struct entry {
Expand Down Expand Up @@ -415,27 +416,24 @@ void lsattr_request_cb (flux_t *h, flux_msg_handler_t *w,
** Initialization
**/

static struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "attr.get", getattr_request_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "attr.list", lsattr_request_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "attr.set", setattr_request_cb, 0, NULL },
static const struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "attr.get", getattr_request_cb, FLUX_ROLE_ALL },
{ FLUX_MSGTYPE_REQUEST, "attr.list", lsattr_request_cb, FLUX_ROLE_ALL },
{ FLUX_MSGTYPE_REQUEST, "attr.set", setattr_request_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};


int attr_register_handlers (attr_t *attrs, flux_t *h)
{
if (flux_msg_handler_addvec (h, handlers, attrs) < 0)
if (flux_msg_handler_addvec (h, handlers, attrs, &attrs->handlers) < 0)
return -1;
/* allow any user to attr.get and attr.list */
flux_msg_handler_allow_rolemask (handlers[0].w, FLUX_ROLE_ALL);
flux_msg_handler_allow_rolemask (handlers[1].w, FLUX_ROLE_ALL);
return 0;
}

void attr_unregister_handlers (void)
void attr_unregister_handlers (attr_t *attrs)
{
flux_msg_handler_delvec (handlers);
flux_msg_handler_delvec (attrs->handlers);
}

attr_t *attr_create (void)
Expand Down
2 changes: 1 addition & 1 deletion src/broker/attr.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ void attr_destroy (attr_t *attrs);
/* Register/unregister message handlers
*/
int attr_register_handlers (attr_t *attrs, flux_t *h);
void attr_unregister_handlers (void);
void attr_unregister_handlers (attr_t *attrs);

/* Delete an attribute
*/
Expand Down
27 changes: 15 additions & 12 deletions src/broker/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ typedef struct {
zlist_t *subscriptions; /* subscripts for internal services */
content_cache_t *cache;
int tbon_k;
flux_msg_handler_t **handlers; /* internal broker message handlers */
/* Bootstrap
*/
hello_t *hello;
Expand Down Expand Up @@ -651,7 +652,7 @@ int main (int argc, char *argv[])

/* Unregister builtin services
*/
attr_unregister_handlers ();
attr_unregister_handlers (ctx.attrs);
content_cache_destroy (ctx.cache);

broker_unhandle_signals (sigwatchers);
Expand All @@ -668,6 +669,8 @@ int main (int argc, char *argv[])
service_switch_destroy (ctx.services);
hello_destroy (ctx.hello);
attr_destroy (ctx.attrs);
shutdown_destroy (ctx.shutdown);
flux_msg_handler_delvec (ctx.handlers);
flux_close (ctx.h);
flux_reactor_destroy (ctx.reactor);
if (ctx.subscriptions) {
Expand Down Expand Up @@ -1694,16 +1697,16 @@ static int route_to_handle (const flux_msg_t *msg, void *arg)
return 0;
}

static struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "cmb.rmmod", cmb_rmmod_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.insmod", cmb_insmod_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.lsmod", cmb_lsmod_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.lspeer", cmb_lspeer_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.panic", cmb_panic_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.event-mute", cmb_event_mute_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.disconnect", cmb_disconnect_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.sub", cmb_sub_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.unsub", cmb_unsub_cb, 0, NULL },
static const struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "cmb.rmmod", cmb_rmmod_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.insmod", cmb_insmod_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.lsmod", cmb_lsmod_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.lspeer", cmb_lspeer_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.panic", cmb_panic_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.event-mute", cmb_event_mute_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.disconnect", cmb_disconnect_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.sub", cmb_sub_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.unsub", cmb_unsub_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

Expand Down Expand Up @@ -1738,7 +1741,7 @@ static void broker_add_services (broker_ctx_t *ctx)
log_err_exit ("error registering service for %s", svc->name);
}

if (flux_msg_handler_addvec (ctx->h, handlers, ctx) < 0)
if (flux_msg_handler_addvec (ctx->h, handlers, ctx, &ctx->handlers) < 0)
log_err_exit ("error registering message handlers");
}

Expand Down
23 changes: 13 additions & 10 deletions src/broker/content-cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ struct cache_entry {
struct content_cache {
flux_t *h;
flux_t *enclosing_h;
flux_msg_handler_t **handlers;
uint32_t rank;
zhash_t *entries;
uint8_t backing:1; /* 'content.backing' service available */
Expand Down Expand Up @@ -855,22 +856,24 @@ static void heartbeat_event (flux_t *h, flux_msg_handler_t *w,
/* Initialization
*/

static struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "content.load", content_load_request, FLUX_ROLE_USER, NULL },
{ FLUX_MSGTYPE_REQUEST, "content.store", content_store_request, FLUX_ROLE_USER, NULL },
{ FLUX_MSGTYPE_REQUEST, "content.backing", content_backing_request, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "content.dropcache", content_dropcache_request, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "content.stats.get", content_stats_request, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "content.flush", content_flush_request, 0, NULL },
{ FLUX_MSGTYPE_EVENT, "hb", heartbeat_event, 0, NULL },
static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "content.load", content_load_request,
FLUX_ROLE_USER },
{ FLUX_MSGTYPE_REQUEST, "content.store", content_store_request,
FLUX_ROLE_USER },
{ FLUX_MSGTYPE_REQUEST, "content.backing", content_backing_request, 0 },
{ FLUX_MSGTYPE_REQUEST, "content.dropcache", content_dropcache_request, 0 },
{ FLUX_MSGTYPE_REQUEST, "content.stats.get", content_stats_request, 0 },
{ FLUX_MSGTYPE_REQUEST, "content.flush", content_flush_request, 0 },
{ FLUX_MSGTYPE_EVENT, "hb", heartbeat_event, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

int content_cache_set_flux (content_cache_t *cache, flux_t *h)
{
cache->h = h;

if (flux_msg_handler_addvec (h, handlers, cache) < 0)
if (flux_msg_handler_addvec (h, htab, cache, &cache->handlers) < 0)
return -1;
if (flux_get_rank (h, &cache->rank) < 0)
return -1;
Expand Down Expand Up @@ -976,7 +979,7 @@ void content_cache_destroy (content_cache_t *cache)
if (cache) {
if (cache->h) {
(void)flux_event_unsubscribe (cache->h, "hb");
flux_msg_handler_delvec (handlers);
flux_msg_handler_delvec (cache->handlers);
}
if (cache->backing_name)
free (cache->backing_name);
Expand Down
15 changes: 8 additions & 7 deletions src/broker/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

typedef struct {
flux_t *h;
flux_msg_handler_t **handlers;
struct subprocess_manager *sm;
uint32_t rank;
const char *local_uri;
Expand Down Expand Up @@ -507,19 +508,19 @@ static void ps_request_cb (flux_t *h, flux_msg_handler_t *w,
json_decref (procs);
}

static struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "cmb.exec", exec_request_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.exec.signal", signal_request_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.exec.write", write_request_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "cmb.processes", ps_request_cb, 0, NULL },
static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "cmb.exec", exec_request_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.exec.signal", signal_request_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.exec.write", write_request_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "cmb.processes", ps_request_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

static void exec_finalize (void *arg)
{
exec_t *x = arg;
flux_msg_handler_delvec (x->handlers);
free (x);
flux_msg_handler_delvec (handlers);
}

int exec_initialize (flux_t *h, struct subprocess_manager *sm,
Expand All @@ -537,7 +538,7 @@ int exec_initialize (flux_t *h, struct subprocess_manager *sm,
free (x);
return -1;
}
if (flux_msg_handler_addvec (h, handlers, x) < 0) {
if (flux_msg_handler_addvec (h, htab, x, &x->handlers) < 0) {
free (x);
return -1;
}
Expand Down
12 changes: 7 additions & 5 deletions src/broker/heaptrace.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
#include <flux/core.h>
#include "heaptrace.h"

static flux_msg_handler_t **handlers = NULL;

static void start_cb (flux_t *h, flux_msg_handler_t *w,
const flux_msg_t *msg, void *arg)
{
Expand Down Expand Up @@ -111,10 +113,10 @@ static void stop_cb (flux_t *h, flux_msg_handler_t *w,
FLUX_LOG_ERROR (h);
}

static struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "heaptrace.start", start_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "heaptrace.dump", dump_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "heaptrace.stop", stop_cb, 0, NULL },
static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "heaptrace.start", start_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "heaptrace.dump", dump_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "heaptrace.stop", stop_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

Expand All @@ -126,7 +128,7 @@ static void heaptrace_finalize (void *arg)
int heaptrace_initialize (flux_t *h)
{
char *dummy = "hello";
if (flux_msg_handler_addvec (h, handlers, NULL) < 0)
if (flux_msg_handler_addvec (h, htab, NULL, &handlers) < 0)
return -1;
flux_aux_set (h, "flux::heaptrace", dummy, heaptrace_finalize);
return 0;
Expand Down
9 changes: 6 additions & 3 deletions src/broker/hello.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ static double default_reduction_timeout = 10.;
struct hello_struct {
flux_t *h;
attr_t *attrs;
flux_msg_handler_t **handlers;
uint32_t rank;
uint32_t size;
uint32_t count;
Expand Down Expand Up @@ -82,6 +83,7 @@ void hello_destroy (hello_t *hello)
{
if (hello) {
flux_reduce_destroy (hello->reduce);
flux_msg_handler_delvec (hello->handlers);
free (hello);
}
}
Expand Down Expand Up @@ -123,8 +125,8 @@ int hello_register_attrs (hello_t *hello, attr_t *attrs)
return 0;
}

static struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "hello.join", join_request, 0, NULL },
static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "hello.join", join_request, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

Expand Down Expand Up @@ -169,7 +171,8 @@ int hello_start (hello_t *hello)
log_err ("hello: error getting rank/size");
goto done;
}
if (flux_msg_handler_addvec (hello->h, handlers, hello) < 0) {
if (flux_msg_handler_addvec (hello->h, htab, hello,
&hello->handlers) < 0) {
log_err ("hello: adding message handlers");
goto done;
}
Expand Down
15 changes: 8 additions & 7 deletions src/broker/log.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ static const int default_level = LOG_DEBUG;
typedef struct {
int magic;
flux_t *h;
flux_msg_handler_t **handlers;
uint32_t rank;
char *filename;
FILE *f;
Expand Down Expand Up @@ -642,19 +643,19 @@ static void disconnect_request_cb (flux_t *h, flux_msg_handler_t *w,
/* no response */
}

static struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "log.append", append_request_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "log.clear", clear_request_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "log.dmesg", dmesg_request_cb, 0, NULL },
{ FLUX_MSGTYPE_REQUEST, "log.disconnect", disconnect_request_cb, 0, NULL },
static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "log.append", append_request_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "log.clear", clear_request_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "log.dmesg", dmesg_request_cb, 0 },
{ FLUX_MSGTYPE_REQUEST, "log.disconnect", disconnect_request_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

static void logbuf_finalize (void *arg)
{
logbuf_t *logbuf = arg;
flux_msg_handler_delvec (logbuf->handlers);
logbuf_destroy (logbuf);
flux_msg_handler_delvec (handlers);
/* FIXME: need logbuf_unregister_attrs() */
}

Expand All @@ -674,7 +675,7 @@ int logbuf_initialize (flux_t *h, uint32_t rank, attr_t *attrs)
goto error;
if (fake_rank (h, rank) < 0)
goto error;
if (flux_msg_handler_addvec (h, handlers, logbuf) < 0)
if (flux_msg_handler_addvec (h, htab, logbuf, &logbuf->handlers) < 0)
goto error;
flux_log_set_appname (h, "broker");
flux_log_set_redirect (h, logbuf_append_redirect, logbuf);
Expand Down
9 changes: 5 additions & 4 deletions src/broker/sequence.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

typedef struct {
zhash_t *vhash;
flux_msg_handler_t **handlers;
} seqhash_t;

static seqhash_t * sequence_hash_create (void)
Expand Down Expand Up @@ -231,24 +232,24 @@ static void sequence_request_cb (flux_t *h, flux_msg_handler_t *w,
}
}

static struct flux_msg_handler_spec handlers[] = {
{ FLUX_MSGTYPE_REQUEST, "seq.*", sequence_request_cb, 0, NULL },
static const struct flux_msg_handler_spec htab[] = {
{ FLUX_MSGTYPE_REQUEST, "seq.*", sequence_request_cb, 0 },
FLUX_MSGHANDLER_TABLE_END,
};

static void sequence_hash_finalize (void *arg)
{
seqhash_t *seq = arg;
flux_msg_handler_delvec (seq->handlers);
sequence_hash_destroy (seq);
flux_msg_handler_delvec (handlers);
}

int sequence_hash_initialize (flux_t *h)
{
seqhash_t *seq = sequence_hash_create ();
if (!seq)
return -1;
if (flux_msg_handler_addvec (h, handlers, seq) < 0) {
if (flux_msg_handler_addvec (h, htab, seq, &seq->handlers) < 0) {
sequence_hash_destroy (seq);
return -1;
}
Expand Down
Loading

0 comments on commit c076ddc

Please sign in to comment.