diff --git a/doc/man3/treduce.c b/doc/man3/treduce.c index 21f162cc4619..02f7b058122a 100644 --- a/doc/man3/treduce.c +++ b/doc/man3/treduce.c @@ -73,7 +73,7 @@ void reduce (flux_reduce_t *r, int batchnum, void *arg) } } -void forward_cb (flux_t *h, flux_msg_handler_t *w, +void forward_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { struct context *ctx = arg; @@ -94,7 +94,7 @@ void forward_cb (flux_t *h, flux_msg_handler_t *w, Jput (in); } -void heartbeat_cb (flux_t *h, flux_msg_handler_t *w, +void heartbeat_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { struct context *ctx = arg; diff --git a/src/broker/attr.c b/src/broker/attr.c index d6f1f4b650ee..480a143f7be9 100644 --- a/src/broker/attr.c +++ b/src/broker/attr.c @@ -316,7 +316,7 @@ const char *attr_next (attr_t *attrs) ** Service **/ -void getattr_request_cb (flux_t *h, flux_msg_handler_t *w, +void getattr_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { attr_t *attrs = arg; @@ -342,7 +342,7 @@ void getattr_request_cb (flux_t *h, flux_msg_handler_t *w, FLUX_LOG_ERROR (h); } -void setattr_request_cb (flux_t *h, flux_msg_handler_t *w, +void setattr_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { attr_t *attrs = arg; @@ -375,7 +375,7 @@ void setattr_request_cb (flux_t *h, flux_msg_handler_t *w, FLUX_LOG_ERROR (h); } -void lsattr_request_cb (flux_t *h, flux_msg_handler_t *w, +void lsattr_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { attr_t *attrs = arg; diff --git a/src/broker/broker.c b/src/broker/broker.c index 935184beb240..96972c22876d 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -1496,7 +1496,7 @@ static void broker_unhandle_signals (zlist_t *sigwatchers) ** Built-in services **/ -static void cmb_rmmod_cb (flux_t *h, flux_msg_handler_t *w, +static void cmb_rmmod_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { broker_ctx_t *ctx = arg; @@ -1521,7 +1521,7 @@ static void cmb_rmmod_cb (flux_t *h, flux_msg_handler_t *w, free (name); } -static void cmb_insmod_cb (flux_t *h, flux_msg_handler_t *w, +static void cmb_insmod_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { broker_ctx_t *ctx = arg; @@ -1550,7 +1550,7 @@ static void cmb_insmod_cb (flux_t *h, flux_msg_handler_t *w, free (argz); } -static void cmb_lsmod_cb (flux_t *h, flux_msg_handler_t *w, +static void cmb_lsmod_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { broker_ctx_t *ctx = arg; @@ -1575,7 +1575,7 @@ static void cmb_lsmod_cb (flux_t *h, flux_msg_handler_t *w, flux_modlist_destroy (mods); } -static void cmb_lspeer_cb (flux_t *h, flux_msg_handler_t *w, +static void cmb_lspeer_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { broker_ctx_t *ctx = arg; @@ -1591,7 +1591,7 @@ static void cmb_lspeer_cb (flux_t *h, flux_msg_handler_t *w, free (out); } -static void cmb_panic_cb (flux_t *h, flux_msg_handler_t *w, +static void cmb_panic_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { const char *s = NULL; @@ -1608,7 +1608,7 @@ static void cmb_panic_cb (flux_t *h, flux_msg_handler_t *w, flux_log_error (h, "%s: flux_respond", __FUNCTION__); } -static void cmb_event_mute_cb (flux_t *h, flux_msg_handler_t *w, +static void cmb_event_mute_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { broker_ctx_t *ctx = arg; @@ -1620,7 +1620,7 @@ static void cmb_event_mute_cb (flux_t *h, flux_msg_handler_t *w, /* no response */ } -static void cmb_disconnect_cb (flux_t *h, flux_msg_handler_t *w, +static void cmb_disconnect_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { char *sender = NULL;; @@ -1632,7 +1632,7 @@ static void cmb_disconnect_cb (flux_t *h, flux_msg_handler_t *w, /* no response */ } -static void cmb_sub_cb (flux_t *h, flux_msg_handler_t *w, +static void cmb_sub_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { broker_ctx_t *ctx = arg; @@ -1659,7 +1659,7 @@ static void cmb_sub_cb (flux_t *h, flux_msg_handler_t *w, free (uuid); } -static void cmb_unsub_cb (flux_t *h, flux_msg_handler_t *w, +static void cmb_unsub_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { broker_ctx_t *ctx = arg; diff --git a/src/broker/content-cache.c b/src/broker/content-cache.c index b9d0235cdf65..809491b830bb 100644 --- a/src/broker/content-cache.c +++ b/src/broker/content-cache.c @@ -347,7 +347,7 @@ static int cache_load (content_cache_t *cache, struct cache_entry *e) return rc; } -void content_load_request (flux_t *h, flux_msg_handler_t *w, +void content_load_request (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { content_cache_t *cache = arg; @@ -514,7 +514,7 @@ static int cache_store (content_cache_t *cache, struct cache_entry *e) return rc; } -static void content_store_request (flux_t *h, flux_msg_handler_t *w, +static void content_store_request (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { content_cache_t *cache = arg; @@ -632,7 +632,7 @@ static int cache_flush (content_cache_t *cache) return rc; } -static void content_backing_request (flux_t *h, flux_msg_handler_t *w, +static void content_backing_request (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { content_cache_t *cache = arg; @@ -672,7 +672,7 @@ static void content_backing_request (flux_t *h, flux_msg_handler_t *w, * N.B. this walks the entire cache in one go. */ -static void content_dropcache_request (flux_t *h, flux_msg_handler_t *w, +static void content_dropcache_request (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { content_cache_t *cache = arg; @@ -715,7 +715,7 @@ static void content_dropcache_request (flux_t *h, flux_msg_handler_t *w, /* Return stats about the cache. */ -static void content_stats_request (flux_t *h, flux_msg_handler_t *w, +static void content_stats_request (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { content_cache_t *cache = arg; @@ -758,7 +758,7 @@ static void flush_respond (content_cache_t *cache) __FUNCTION__); } -static void content_flush_request (flux_t *h, flux_msg_handler_t *w, +static void content_flush_request (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { content_cache_t *cache = arg; @@ -841,7 +841,7 @@ static int cache_purge (content_cache_t *cache) return rc; } -static void heartbeat_event (flux_t *h, flux_msg_handler_t *w, +static void heartbeat_event (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { content_cache_t *cache = arg; diff --git a/src/broker/exec.c b/src/broker/exec.c index 0c89f771298a..0ce5c1dc3dc4 100644 --- a/src/broker/exec.c +++ b/src/broker/exec.c @@ -202,7 +202,7 @@ static int write_to_child (struct subprocess *p, const char *s) return rc; } -static void write_request_cb (flux_t *h, flux_msg_handler_t *w, +static void write_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { exec_t *x = arg; @@ -235,7 +235,7 @@ static void write_request_cb (flux_t *h, flux_msg_handler_t *w, flux_log_error (h, "write_request_cb: flux_respond_pack"); } -static void signal_request_cb (flux_t *h, flux_msg_handler_t *w, +static void signal_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { exec_t *x = arg; @@ -346,7 +346,7 @@ static int prepare_subprocess (exec_t *x, return -1; } -static void exec_request_cb (flux_t *h, flux_msg_handler_t *w, +static void exec_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { exec_t *x = arg; @@ -475,7 +475,7 @@ static json_t *subprocess_json_info (struct subprocess *p) return NULL; } -static void ps_request_cb (flux_t *h, flux_msg_handler_t *w, +static void ps_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { struct subprocess *p; diff --git a/src/broker/heaptrace.c b/src/broker/heaptrace.c index 923e3cde21db..8c15764bb15d 100644 --- a/src/broker/heaptrace.c +++ b/src/broker/heaptrace.c @@ -38,7 +38,7 @@ #include #include "heaptrace.h" -static void start_cb (flux_t *h, flux_msg_handler_t *w, +static void start_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { const char *filename; @@ -63,7 +63,7 @@ static void start_cb (flux_t *h, flux_msg_handler_t *w, FLUX_LOG_ERROR (h); } -static void dump_cb (flux_t *h, flux_msg_handler_t *w, +static void dump_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { const char *reason; @@ -88,7 +88,7 @@ static void dump_cb (flux_t *h, flux_msg_handler_t *w, FLUX_LOG_ERROR (h); } -static void stop_cb (flux_t *h, flux_msg_handler_t *w, +static void stop_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { if (flux_request_decode (msg, NULL, NULL) < 0) diff --git a/src/broker/heartbeat.c b/src/broker/heartbeat.c index d0974c3648e9..a6b754572040 100644 --- a/src/broker/heartbeat.c +++ b/src/broker/heartbeat.c @@ -38,7 +38,7 @@ struct heartbeat_struct { flux_t *h; double rate; flux_watcher_t *timer; - flux_msg_handler_t *handler; + flux_msg_handler_t *mh; int send_epoch; int epoch; }; @@ -52,7 +52,7 @@ void heartbeat_destroy (heartbeat_t *hb) { if (hb) { flux_watcher_destroy (hb->timer); - flux_msg_handler_destroy (hb->handler); + flux_msg_handler_destroy (hb->mh); free (hb); } } @@ -116,7 +116,7 @@ int heartbeat_get_epoch (heartbeat_t *hb) return hb->epoch; } -static void event_cb (flux_t *h, flux_msg_handler_t *w, +static void event_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { heartbeat_t *hb = arg; @@ -167,9 +167,9 @@ int heartbeat_start (heartbeat_t *hb) flux_watcher_start (hb->timer); } match.topic_glob = "hb"; - if (!(hb->handler = flux_msg_handler_create (hb->h, match, event_cb, hb))) + if (!(hb->mh = flux_msg_handler_create (hb->h, match, event_cb, hb))) return -1; - flux_msg_handler_start (hb->handler); + flux_msg_handler_start (hb->mh); return 0; } @@ -177,8 +177,8 @@ void heartbeat_stop (heartbeat_t *hb) { if (hb->timer) flux_watcher_stop (hb->timer); - if (hb->handler) - flux_msg_handler_stop (hb->handler); + if (hb->mh) + flux_msg_handler_stop (hb->mh); } /* diff --git a/src/broker/hello.c b/src/broker/hello.c index 5b281ee9d197..fab594c9567c 100644 --- a/src/broker/hello.c +++ b/src/broker/hello.c @@ -54,7 +54,7 @@ struct hello_struct { flux_reduce_t *reduce; }; -static void join_request (flux_t *h, flux_msg_handler_t *w, +static void join_request (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg); static void r_reduce (flux_reduce_t *r, int batch, void *arg); @@ -212,7 +212,7 @@ int hello_start (hello_t *hello) /* handle a message sent from downstream via downstream's r_forward op. */ -static void join_request (flux_t *h, flux_msg_handler_t *w, +static void join_request (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { hello_t *hello = arg; diff --git a/src/broker/log.c b/src/broker/log.c index ea6e8a1125bd..f4640a8636e3 100644 --- a/src/broker/log.c +++ b/src/broker/log.c @@ -69,7 +69,7 @@ struct logbuf_entry { struct sleeper { int magic; flux_t *h; - flux_msg_handler_t *w; + flux_msg_handler_t *mh; flux_msg_handler_f fun; flux_msg_t *msg; void *arg; @@ -86,7 +86,7 @@ static void sleeper_destroy (struct sleeper *s) } static struct sleeper *sleeper_create (flux_msg_handler_f fun, - flux_t *h, flux_msg_handler_t *w, + flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { struct sleeper *s = calloc (1, sizeof (*s)); @@ -94,7 +94,7 @@ static struct sleeper *sleeper_create (flux_msg_handler_f fun, return NULL; s->magic = SLEEPER_MAGIC; s->h = h; - s->w = w; + s->mh = mh; s->fun = fun; s->arg = arg; if (!(s->msg = flux_msg_copy (msg, true))) { @@ -167,11 +167,11 @@ static int logbuf_get (logbuf_t *logbuf, int seq_index, int *seq, } static int logbuf_sleepon (logbuf_t *logbuf, flux_msg_handler_f fun, flux_t *h, - flux_msg_handler_t *w, const flux_msg_t *msg, + flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { assert (logbuf->magic == LOGBUF_MAGIC); - struct sleeper *s = sleeper_create (fun, h, w, msg, arg); + struct sleeper *s = sleeper_create (fun, h, mh, msg, arg); if (!s) return -1; if (zlist_append (logbuf->sleepers, s) < 0) { @@ -198,7 +198,7 @@ static int append_new_entry (logbuf_t *logbuf, const char *buf, int len) return -1; } while ((s = zlist_pop (logbuf->sleepers))) { - s->fun (s->h, s->w, s->msg, s->arg); + s->fun (s->h, s->mh, s->msg, s->arg); sleeper_destroy (s); } } @@ -520,7 +520,7 @@ static void logbuf_append_redirect (const char *buf, int len, void *arg) /* N.B. log requests have no response. */ -static void append_request_cb (flux_t *h, flux_msg_handler_t *w, +static void append_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { logbuf_t *logbuf = arg; @@ -548,7 +548,7 @@ static void append_request_cb (flux_t *h, flux_msg_handler_t *w, } } -static void clear_request_cb (flux_t *h, flux_msg_handler_t *w, +static void clear_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { logbuf_t *logbuf = arg; @@ -563,7 +563,7 @@ static void clear_request_cb (flux_t *h, flux_msg_handler_t *w, flux_respond (h, msg, rc < 0 ? errno : 0, NULL); } -static void dmesg_request_cb (flux_t *h, flux_msg_handler_t *w, +static void dmesg_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { logbuf_t *logbuf = arg; @@ -577,7 +577,7 @@ static void dmesg_request_cb (flux_t *h, flux_msg_handler_t *w, goto error; if (logbuf_get (logbuf, seq, &seq, &buf, &len) < 0) { if (follow && errno == ENOENT) { - if (logbuf_sleepon (logbuf, dmesg_request_cb, h, w, msg, arg) < 0) + if (logbuf_sleepon (logbuf, dmesg_request_cb, h, mh, msg, arg) < 0) goto error; return; /* no reply */ } @@ -608,7 +608,7 @@ static int cmp_sender (flux_msg_t *msg, const char *uuid) return rc; } -static void disconnect_request_cb (flux_t *h, flux_msg_handler_t *w, +static void disconnect_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { logbuf_t *logbuf = arg; diff --git a/src/broker/modservice.c b/src/broker/modservice.c index 549d554db96d..0ad27098ef3b 100644 --- a/src/broker/modservice.c +++ b/src/broker/modservice.c @@ -62,9 +62,9 @@ typedef struct { static void freectx (void *arg) { modservice_ctx_t *ctx = arg; - flux_msg_handler_t *w; - while ((w = zlist_pop (ctx->handlers))) - flux_msg_handler_destroy (w); + flux_msg_handler_t *mh; + while ((mh = zlist_pop (ctx->handlers))) + flux_msg_handler_destroy (mh); zlist_destroy (&ctx->handlers); flux_watcher_destroy (ctx->w_prepare); flux_watcher_destroy (ctx->w_check); @@ -86,7 +86,7 @@ static modservice_ctx_t *getctx (flux_t *h, module_t *p) return ctx; } -static void stats_get_cb (flux_t *h, flux_msg_handler_t *w, +static void stats_get_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { flux_msgcounters_t mcs; @@ -105,13 +105,13 @@ static void stats_get_cb (flux_t *h, flux_msg_handler_t *w, FLUX_LOG_ERROR (h); } -static void stats_clear_event_cb (flux_t *h, flux_msg_handler_t *w, +static void stats_clear_event_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { flux_clr_msgcounters (h); } -static void stats_clear_request_cb (flux_t *h, flux_msg_handler_t *w, +static void stats_clear_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { flux_clr_msgcounters (h); @@ -119,13 +119,13 @@ static void stats_clear_request_cb (flux_t *h, flux_msg_handler_t *w, FLUX_LOG_ERROR (h); } -static void shutdown_cb (flux_t *h, flux_msg_handler_t *w, +static void shutdown_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { flux_reactor_stop (flux_get_reactor (h)); } -static void debug_cb (flux_t *h, flux_msg_handler_t *w, +static void debug_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { int flags; @@ -190,13 +190,13 @@ static void register_event (modservice_ctx_t *ctx, const char *name, flux_msg_handler_f cb) { struct flux_match match = FLUX_MATCH_EVENT; - flux_msg_handler_t *w; + flux_msg_handler_t *mh; match.topic_glob = xasprintf ("%s.%s", module_get_name (ctx->p), name); - if (!(w = flux_msg_handler_create (ctx->h, match, cb, ctx->p))) + if (!(mh = flux_msg_handler_create (ctx->h, match, cb, ctx->p))) log_err_exit ("flux_msg_handler_create"); - flux_msg_handler_start (w); - if (zlist_append (ctx->handlers, w) < 0) + flux_msg_handler_start (mh); + if (zlist_append (ctx->handlers, mh) < 0) oom (); if (flux_event_subscribe (ctx->h, match.topic_glob) < 0) log_err_exit ("%s: flux_event_subscribe %s", @@ -208,14 +208,14 @@ static void register_request (modservice_ctx_t *ctx, const char *name, flux_msg_handler_f cb, uint32_t rolemask) { struct flux_match match = FLUX_MATCH_REQUEST; - flux_msg_handler_t *w; + flux_msg_handler_t *mh; match.topic_glob = xasprintf ("%s.%s", module_get_name (ctx->p), name); - if (!(w = flux_msg_handler_create (ctx->h, match, cb, ctx->p))) + if (!(mh = flux_msg_handler_create (ctx->h, match, cb, ctx->p))) log_err_exit ("flux_msg_handler_create"); - flux_msg_handler_allow_rolemask (w, rolemask); - flux_msg_handler_start (w); - if (zlist_append (ctx->handlers, w) < 0) + flux_msg_handler_allow_rolemask (mh, rolemask); + flux_msg_handler_start (mh); + if (zlist_append (ctx->handlers, mh) < 0) oom (); free (match.topic_glob); } diff --git a/src/broker/overlay.c b/src/broker/overlay.c index f51f39c5533a..fc8bce5dd914 100644 --- a/src/broker/overlay.c +++ b/src/broker/overlay.c @@ -85,7 +85,7 @@ typedef struct { bool mute; } child_t; -static void heartbeat_handler (flux_t *h, flux_msg_handler_t *w, +static void heartbeat_handler (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg); static void endpoint_destroy (struct endpoint *ep) @@ -290,7 +290,7 @@ static int overlay_keepalive_parent (overlay_t *ov) return rc; } -static void heartbeat_handler (flux_t *h, flux_msg_handler_t *w, +static void heartbeat_handler (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { overlay_t *ov = arg; diff --git a/src/broker/ping.c b/src/broker/ping.c index 83bd4ecb8227..3f86f592eebd 100644 --- a/src/broker/ping.c +++ b/src/broker/ping.c @@ -30,7 +30,7 @@ #include "ping.h" struct ping_context { - flux_msg_handler_t *w; + flux_msg_handler_t *mh; }; static char *make_json_response_payload (const char *request_payload, @@ -65,7 +65,7 @@ static char *make_json_response_payload (const char *request_payload, return result; } -static void ping_request_cb (flux_t *h, flux_msg_handler_t *w, +static void ping_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { const char *json_str; @@ -109,8 +109,8 @@ static void ping_request_cb (flux_t *h, flux_msg_handler_t *w, static void ping_finalize (void *arg) { struct ping_context *p = arg; - flux_msg_handler_stop (p->w); - flux_msg_handler_destroy (p->w); + flux_msg_handler_stop (p->mh); + flux_msg_handler_destroy (p->mh); free (p); } @@ -127,20 +127,17 @@ int ping_initialize (flux_t *h, const char *service) errno = ENOMEM; goto error; } - if (!(p->w = flux_msg_handler_create (h, match, ping_request_cb, p))) + if (!(p->mh = flux_msg_handler_create (h, match, ping_request_cb, p))) goto error; - flux_msg_handler_allow_rolemask (p->w, FLUX_ROLE_ALL); - flux_msg_handler_start (p->w); + flux_msg_handler_allow_rolemask (p->mh, FLUX_ROLE_ALL); + flux_msg_handler_start (p->mh); flux_aux_set (h, "flux::ping", p, ping_finalize); free (match.topic_glob); return 0; error: - if (p) { - free (match.topic_glob); - flux_msg_handler_stop (p->w); - flux_msg_handler_destroy (p->w); - free (p); - } + free (match.topic_glob); + if (p) + ping_finalize (p); return -1; } diff --git a/src/broker/rusage.c b/src/broker/rusage.c index a7a3f571cad6..8808588dbac4 100644 --- a/src/broker/rusage.c +++ b/src/broker/rusage.c @@ -31,10 +31,7 @@ #include "rusage.h" struct rusage_context { - flux_msg_handler_t *w; -}; - -static void rusage_request_cb (flux_t *h, flux_msg_handler_t *w, + flux_msg_handler_t *mh; }; static void rusage_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { struct rusage ru; @@ -73,8 +70,8 @@ static void rusage_request_cb (flux_t *h, flux_msg_handler_t *w, static void rusage_finalize (void *arg) { struct rusage_context *r = arg; - flux_msg_handler_stop (r->w); - flux_msg_handler_destroy (r->w); + flux_msg_handler_stop (r->mh); + flux_msg_handler_destroy (r->mh); free (r); } @@ -91,19 +88,16 @@ int rusage_initialize (flux_t *h, const char *service) errno = ENOMEM; goto error; } - if (!(r->w = flux_msg_handler_create (h, match, rusage_request_cb, r))) + if (!(r->mh = flux_msg_handler_create (h, match, rusage_request_cb, r))) goto error; - flux_msg_handler_start (r->w); + flux_msg_handler_start (r->mh); flux_aux_set (h, "flux::rusage", r, rusage_finalize); free (match.topic_glob); return 0; error: - if (r) { - free (match.topic_glob); - flux_msg_handler_stop (r->w); - flux_msg_handler_destroy (r->w); - free (r); - } + if (r) + rusage_finalize (r); + free (match.topic_glob); return -1; } diff --git a/src/broker/sequence.c b/src/broker/sequence.c index 099a476db77a..c5262098131c 100644 --- a/src/broker/sequence.c +++ b/src/broker/sequence.c @@ -206,7 +206,7 @@ static int handle_seq_fetch (flux_t *h, seqhash_t *s, const flux_msg_t *msg) "value", v); } -static void sequence_request_cb (flux_t *h, flux_msg_handler_t *w, +static void sequence_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { seqhash_t *seq = arg; diff --git a/src/broker/shutdown.c b/src/broker/shutdown.c index 5cb431cd5082..78fc692c65f3 100644 --- a/src/broker/shutdown.c +++ b/src/broker/shutdown.c @@ -86,7 +86,7 @@ static void timer_handler (flux_reactor_t *r, flux_watcher_t *w, /* On receipt of the shutdown event message, begin the grace timer, * and log the "shutdown in..." message on rank 0. */ -void shutdown_handler (flux_t *h, flux_msg_handler_t *w, +void shutdown_handler (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { shutdown_t *s = arg; diff --git a/src/cmd/builtin/proxy.c b/src/cmd/builtin/proxy.c index d604f04cb206..c5d231e19808 100644 --- a/src/cmd/builtin/proxy.c +++ b/src/cmd/builtin/proxy.c @@ -587,7 +587,7 @@ static void client_read_cb (flux_reactor_t *r, flux_watcher_t *w, * Look up the sender uuid in clients hash and deliver. * Responses for disconnected clients are silently discarded. */ -static void response_cb (flux_t *h, flux_msg_handler_t *w, +static void response_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { proxy_ctx_t *ctx = arg; @@ -632,7 +632,7 @@ static void response_cb (flux_t *h, flux_msg_handler_t *w, /* Received an event message from broker. * Find all subscribers and deliver. */ -static void event_cb (flux_t *h, flux_msg_handler_t *w, +static void event_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { proxy_ctx_t *ctx = arg; diff --git a/src/common/libcompat/reactor.c b/src/common/libcompat/reactor.c index 09502efc796c..8a40f76ab9c3 100644 --- a/src/common/libcompat/reactor.c +++ b/src/common/libcompat/reactor.c @@ -51,7 +51,7 @@ struct ctx { }; struct msg_compat { - flux_msg_handler_t *w; + flux_msg_handler_t *mh; FluxMsgHandler fn; void *arg; }; @@ -121,7 +121,7 @@ static int libzmq_to_events (int events) /* message */ -void msg_compat_cb (flux_t *h, flux_msg_handler_t *w, +void msg_compat_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { struct msg_compat *compat = arg; @@ -141,7 +141,7 @@ void msg_compat_cb (flux_t *h, flux_msg_handler_t *w, static void msg_compat_free (struct msg_compat *c) { if (c) { - flux_msg_handler_destroy (c->w); + flux_msg_handler_destroy (c->mh); free (c); } } @@ -160,11 +160,11 @@ static int msghandler_add (flux_t *h, int typemask, const char *pattern, c->fn = cb; c->arg = arg; - if (!(c->w = flux_msg_handler_create (h, match, msg_compat_cb, c))) { + if (!(c->mh = flux_msg_handler_create (h, match, msg_compat_cb, c))) { free (c); return -1; } - flux_msg_handler_start (c->w); + flux_msg_handler_start (c->mh); snprintf (hashkey, sizeof (hashkey), "msg:%d:%s", typemask, pattern); zhash_update (ctx->watchers, hashkey, c); zhash_freefn (ctx->watchers, hashkey, (zhash_free_fn *)msg_compat_free); @@ -185,7 +185,7 @@ void flux_msghandler_remove (flux_t *h, int typemask, const char *pattern) snprintf (hashkey, sizeof (hashkey), "msg:%d:%s", typemask, pattern); if ((c = zhash_lookup (ctx->watchers, hashkey))) { - flux_msg_handler_stop (c->w); + flux_msg_handler_stop (c->mh); zhash_delete (ctx->watchers, hashkey); } } diff --git a/src/common/libflux/mrpc.c b/src/common/libflux/mrpc.c index c45272f5f4f3..dc5f934427ff 100644 --- a/src/common/libflux/mrpc.c +++ b/src/common/libflux/mrpc.c @@ -54,7 +54,7 @@ struct flux_mrpc_struct { flux_t *h; flux_mrpc_continuation_f then_cb; void *then_arg; - flux_msg_handler_t *w; + flux_msg_handler_t *mh; uint32_t nodeid; flux_msg_t *rx_msg; int rx_errnum; @@ -76,9 +76,9 @@ static void flux_mrpc_usecount_decr (flux_mrpc_t *mrpc) return; assert (mrpc->magic == MRPC_MAGIC); if (--mrpc->usecount == 0) { - if (mrpc->w) { - flux_msg_handler_stop (mrpc->w); - flux_msg_handler_destroy (mrpc->w); + if (mrpc->mh) { + flux_msg_handler_stop (mrpc->mh); + flux_msg_handler_destroy (mrpc->mh); } if (mrpc->m.matchtag != FLUX_MATCHTAG_NONE) { /* FIXME: we cannot safely return matchtags to the pool here @@ -312,7 +312,7 @@ int flux_mrpc_get_nodeid (flux_mrpc_t *mrpc, uint32_t *nodeid) * For the multi-response case, overwrite previous message if * flux_mrpc_next () was not called. */ -static void mrpc_cb (flux_t *h, flux_msg_handler_t *w, +static void mrpc_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { flux_mrpc_t *mrpc = arg; @@ -327,7 +327,7 @@ static void mrpc_cb (flux_t *h, flux_msg_handler_t *w, mrpc->then_cb (mrpc, mrpc->then_arg); done: if (mrpc->rx_count >= mrpc->rx_expected || flux_fatality (mrpc->h)) - flux_msg_handler_stop (mrpc->w); + flux_msg_handler_stop (mrpc->mh); flux_mrpc_usecount_decr (mrpc); } @@ -341,12 +341,12 @@ int flux_mrpc_then (flux_mrpc_t *mrpc, flux_mrpc_continuation_f cb, void *arg) goto done; } if (cb && !mrpc->then_cb) { - if (!mrpc->w) { - if (!(mrpc->w = flux_msg_handler_create (mrpc->h, mrpc->m, - mrpc_cb, mrpc))) + if (!mrpc->mh) { + if (!(mrpc->mh = flux_msg_handler_create (mrpc->h, mrpc->m, + mrpc_cb, mrpc))) goto done; } - flux_msg_handler_start (mrpc->w); + flux_msg_handler_start (mrpc->mh); if (mrpc->rx_msg || mrpc->rx_errnum) { if (mrpc->rx_msg) if (flux_requeue (mrpc->h, mrpc->rx_msg, FLUX_RQ_HEAD) < 0) @@ -354,7 +354,7 @@ int flux_mrpc_then (flux_mrpc_t *mrpc, flux_mrpc_continuation_f cb, void *arg) (void)flux_mrpc_next (mrpc); } } else if (!cb && mrpc->then_cb) { - flux_msg_handler_stop (mrpc->w); + flux_msg_handler_stop (mrpc->mh); } mrpc->then_cb = cb; mrpc->then_arg = arg; diff --git a/src/common/libflux/msg_handler.c b/src/common/libflux/msg_handler.c index 1bff466aba18..0f363d7849f3 100644 --- a/src/common/libflux/msg_handler.c +++ b/src/common/libflux/msg_handler.c @@ -85,7 +85,7 @@ struct flux_msg_handler { static void handle_cb (flux_reactor_t *r, flux_watcher_t *w, int revents, void *arg); -static void free_msg_handler (flux_msg_handler_t *w); +static void free_msg_handler (flux_msg_handler_t *mh); static void fastpath_init (struct fastpath *fp); static void fastpath_free (struct fastpath *fp); @@ -207,16 +207,16 @@ static int fastpath_grow (struct fastpath *fp) } static int fastpath_get (struct fastpath *fp, uint32_t tag, - struct flux_msg_handler **hpp) + struct flux_msg_handler **mhp) { if (tag >= fp->len || fp->map[tag] == NULL) return -1; - *hpp = fp->map[tag]; + *mhp = fp->map[tag]; return 0; } static int fastpath_set (struct fastpath *fp, uint32_t tag, - struct flux_msg_handler *hp) + struct flux_msg_handler *mh) { while (tag >= fp->len) { if (fastpath_grow (fp) < 0) @@ -226,7 +226,7 @@ static int fastpath_set (struct fastpath *fp, uint32_t tag, errno = EINVAL; return -1; } - fp->map[tag] = hp; + fp->map[tag] = mh; return 0; } @@ -237,7 +237,7 @@ static void fastpath_clr (struct fastpath *fp, uint32_t tag) } static int fastpath_response_lookup (struct dispatch *d, const flux_msg_t *msg, - struct flux_msg_handler **hpp) + struct flux_msg_handler **mhp) { uint32_t tag, group; @@ -245,20 +245,20 @@ static int fastpath_response_lookup (struct dispatch *d, const flux_msg_t *msg, return -1; group = tag>>FLUX_MATCHTAG_GROUP_SHIFT; if (group > 0) - return fastpath_get (&d->group, group, hpp); + return fastpath_get (&d->group, group, mhp); else - return fastpath_get (&d->norm, tag, hpp); + return fastpath_get (&d->norm, tag, mhp); } static int fastpath_response_register (struct dispatch *d, - struct flux_msg_handler *hp) + struct flux_msg_handler *mh) { - uint32_t tag = hp->match.matchtag; + uint32_t tag = mh->match.matchtag; uint32_t group = tag>>FLUX_MATCHTAG_GROUP_SHIFT; if (group > 0) - return fastpath_set (&d->group, group, hp); + return fastpath_set (&d->group, group, mh); else - return fastpath_set (&d->norm, tag, hp); + return fastpath_set (&d->norm, tag, mh); } static void fastpath_response_unregister (struct dispatch *d, uint32_t tag) @@ -285,44 +285,45 @@ static int copy_match (struct flux_match *dst, return 0; } -static void call_handler (flux_msg_handler_t *w, const flux_msg_t *msg) +static void call_handler (flux_msg_handler_t *mh, const flux_msg_t *msg) { uint32_t rolemask, matchtag; if (flux_msg_get_rolemask (msg, &rolemask) < 0) return; - if (!(rolemask & w->rolemask)) { + if (!(rolemask & mh->rolemask)) { if (flux_msg_cmp (msg, FLUX_MATCH_REQUEST) && flux_msg_get_matchtag (msg, &matchtag) == 0 && matchtag != FLUX_MATCHTAG_NONE) { - (void)flux_respond (w->d->h, msg, EPERM, NULL); + (void)flux_respond (mh->d->h, msg, EPERM, NULL); } return; } - w->fn (w->d->h, w, msg, w->arg); + mh->fn (mh->d->h, mh, msg, mh->arg); } static bool dispatch_message (struct dispatch *d, const flux_msg_t *msg, int type) { - flux_msg_handler_t *w; + flux_msg_handler_t *mh; bool match = false; /* fastpath */ if (type == FLUX_MSGTYPE_RESPONSE) { - if (fastpath_response_lookup (d, msg, &w) == 0 && w->running - && flux_msg_cmp (msg, w->match)) { - call_handler (w, msg); + if (fastpath_response_lookup (d, msg, &mh) == 0 + && mh->running + && flux_msg_cmp (msg, mh->match)) { + call_handler (mh, msg); match = true; } } /* slowpath */ if (!match) { - FOREACH_ZLIST (d->handlers, w) { - if (!w->running) + FOREACH_ZLIST (d->handlers, mh) { + if (!mh->running) continue; - if (flux_msg_cmp (msg, w->match)) { - call_handler (w, msg); + if (flux_msg_cmp (msg, mh->match)) { + call_handler (mh, msg); if (type != FLUX_MSGTYPE_EVENT) { match = true; break; @@ -445,72 +446,72 @@ static void handle_cb (flux_reactor_t *r, flux_msg_destroy (msg); } -void flux_msg_handler_start (flux_msg_handler_t *w) +void flux_msg_handler_start (flux_msg_handler_t *mh) { - struct dispatch *d = w->d; + struct dispatch *d = mh->d; - assert (w->magic == HANDLER_MAGIC); - if (w->running == 0) { - w->running = 1; + assert (mh->magic == HANDLER_MAGIC); + if (mh->running == 0) { + mh->running = 1; d->running_count++; flux_watcher_start (d->w); } } -void flux_msg_handler_stop (flux_msg_handler_t *w) +void flux_msg_handler_stop (flux_msg_handler_t *mh) { - if (!w) + if (!mh) return; - assert (w->magic == HANDLER_MAGIC); - if (w->running == 1) { - struct dispatch *d = w->d; - w->running = 0; + assert (mh->magic == HANDLER_MAGIC); + if (mh->running == 1) { + struct dispatch *d = mh->d; + mh->running = 0; d->running_count--; if (d->running_count == 0) flux_watcher_stop (d->w); } } -void flux_msg_handler_allow_rolemask (flux_msg_handler_t *w, uint32_t rolemask) +void flux_msg_handler_allow_rolemask (flux_msg_handler_t *mh, uint32_t rolemask) { - if (w) { - w->rolemask |= rolemask; + if (mh) { + mh->rolemask |= rolemask; } } -void flux_msg_handler_deny_rolemask (flux_msg_handler_t *w, uint32_t rolemask) +void flux_msg_handler_deny_rolemask (flux_msg_handler_t *mh, uint32_t rolemask) { - if (w) { - w->rolemask &= ~rolemask; - w->rolemask |= FLUX_ROLE_OWNER; + if (mh) { + mh->rolemask &= ~rolemask; + mh->rolemask |= FLUX_ROLE_OWNER; } } -static void free_msg_handler (flux_msg_handler_t *w) +static void free_msg_handler (flux_msg_handler_t *mh) { - if (w) { - assert (w->magic == HANDLER_MAGIC); - if (w->match.topic_glob) - free (w->match.topic_glob); - w->magic = ~HANDLER_MAGIC; - free (w); + if (mh) { + assert (mh->magic == HANDLER_MAGIC); + if (mh->match.topic_glob) + free (mh->match.topic_glob); + mh->magic = ~HANDLER_MAGIC; + free (mh); } } -void flux_msg_handler_destroy (flux_msg_handler_t *w) +void flux_msg_handler_destroy (flux_msg_handler_t *mh) { - if (w) { - assert (w->magic == HANDLER_MAGIC); - if (w->match.typemask == FLUX_MSGTYPE_RESPONSE - && w->match.matchtag != FLUX_MATCHTAG_NONE) { - fastpath_response_unregister (w->d, w->match.matchtag); + if (mh) { + assert (mh->magic == HANDLER_MAGIC); + if (mh->match.typemask == FLUX_MSGTYPE_RESPONSE + && mh->match.matchtag != FLUX_MATCHTAG_NONE) { + fastpath_response_unregister (mh->d, mh->match.matchtag); } else { - zlist_remove (w->d->handlers_new, w); - zlist_remove (w->d->handlers, w); + zlist_remove (mh->d->handlers_new, mh); + zlist_remove (mh->d->handlers, mh); } - flux_msg_handler_stop (w); - dispatch_usecount_decr (w->d); - free_msg_handler (w); + flux_msg_handler_stop (mh); + dispatch_usecount_decr (mh->d); + free_msg_handler (mh); } } @@ -519,38 +520,38 @@ flux_msg_handler_t *flux_msg_handler_create (flux_t *h, flux_msg_handler_f cb, void *arg) { struct dispatch *d = dispatch_get (h); - flux_msg_handler_t *w = NULL; + flux_msg_handler_t *mh = NULL; int saved_errno; if (!d) { saved_errno = errno; goto error; } - if (!(w = calloc (1, sizeof (*w)))) + if (!(mh = calloc (1, sizeof (*mh)))) goto nomem; - w->magic = HANDLER_MAGIC; - if (copy_match (&w->match, match) < 0) + mh->magic = HANDLER_MAGIC; + if (copy_match (&mh->match, match) < 0) goto nomem; - w->rolemask = FLUX_ROLE_OWNER; - w->fn = cb; - w->arg = arg; - w->d = d; - if (w->match.typemask == FLUX_MSGTYPE_RESPONSE - && w->match.matchtag != FLUX_MATCHTAG_NONE) { - if (fastpath_response_register (d, w) < 0) { + mh->rolemask = FLUX_ROLE_OWNER; + mh->fn = cb; + mh->arg = arg; + mh->d = d; + if (mh->match.typemask == FLUX_MSGTYPE_RESPONSE + && mh->match.matchtag != FLUX_MATCHTAG_NONE) { + if (fastpath_response_register (d, mh) < 0) { saved_errno = errno; goto error; } } else { - if (zlist_append (d->handlers_new, w) < 0) + if (zlist_append (d->handlers_new, mh) < 0) goto nomem; } dispatch_usecount_incr (d); - return w; + return mh; nomem: saved_errno = ENOMEM; error: - free_msg_handler (w); + free_msg_handler (mh); errno = saved_errno; return NULL; } diff --git a/src/common/libflux/msg_handler.h b/src/common/libflux/msg_handler.h index 82f68058c63c..fc95e9e8fa39 100644 --- a/src/common/libflux/msg_handler.h +++ b/src/common/libflux/msg_handler.h @@ -10,24 +10,26 @@ extern "C" { typedef struct flux_msg_handler flux_msg_handler_t; -typedef void (*flux_msg_handler_f)(flux_t *h, flux_msg_handler_t *w, +typedef void (*flux_msg_handler_f)(flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg); flux_msg_handler_t *flux_msg_handler_create (flux_t *h, const struct flux_match match, flux_msg_handler_f cb, void *arg); -void flux_msg_handler_destroy (flux_msg_handler_t *w); +void flux_msg_handler_destroy (flux_msg_handler_t *mh); -void flux_msg_handler_start (flux_msg_handler_t *w); -void flux_msg_handler_stop (flux_msg_handler_t *w); +void flux_msg_handler_start (flux_msg_handler_t *mh); +void flux_msg_handler_stop (flux_msg_handler_t *mh); /* By default, only messages from FLUX_ROLE_OWNER are delivered to handler. * Use _allow_rolemask() add roles, _deny_rolemask() to remove them. * (N.B. FLUX_ROLE_OWNER cannot be denied) */ -void flux_msg_handler_allow_rolemask (flux_msg_handler_t *w, uint32_t rolemask); -void flux_msg_handler_deny_rolemask (flux_msg_handler_t *w, uint32_t rolemask); +void flux_msg_handler_allow_rolemask (flux_msg_handler_t *mh, + uint32_t rolemask); +void flux_msg_handler_deny_rolemask (flux_msg_handler_t *mh, + uint32_t rolemask); struct flux_msg_handler_spec { int typemask; diff --git a/src/common/libflux/rpc.c b/src/common/libflux/rpc.c index f2c323cd5738..dc50b2fd1c8e 100644 --- a/src/common/libflux/rpc.c +++ b/src/common/libflux/rpc.c @@ -145,7 +145,7 @@ int flux_rpc_get_unpack (flux_future_t *f, const char *fmt, ...) * instead of flux_rpc_get() to test result of RPC with no response payload. * Fulfill future. */ -static void response_cb (flux_t *h, flux_msg_handler_t *w, +static void response_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { flux_future_t *f = arg; @@ -153,7 +153,7 @@ static void response_cb (flux_t *h, flux_msg_handler_t *w, flux_msg_t *cpy; int saved_errno; - flux_msg_handler_stop (w); + flux_msg_handler_stop (mh); rpc->inflight = false; #if HAVE_CALIPER cali_begin_string_byname ("flux.message.rpc", "single"); @@ -178,20 +178,20 @@ static void response_cb (flux_t *h, flux_msg_handler_t *w, static void initialize_cb (flux_future_t *f, void *arg) { struct flux_rpc *rpc = flux_future_aux_get (f, "flux::rpc"); - flux_msg_handler_t *w; + flux_msg_handler_t *mh; flux_t *h = flux_future_get_flux (f); struct flux_match m = FLUX_MATCH_RESPONSE; m.matchtag = rpc->matchtag; - if (!(w = flux_msg_handler_create (h, m, response_cb, f))) + if (!(mh = flux_msg_handler_create (h, m, response_cb, f))) goto error; - flux_msg_handler_allow_rolemask (w, FLUX_ROLE_ALL); - if (flux_future_aux_set (f, NULL, w, + flux_msg_handler_allow_rolemask (mh, FLUX_ROLE_ALL); + if (flux_future_aux_set (f, NULL, mh, (flux_free_f)flux_msg_handler_destroy) < 0) { - flux_msg_handler_destroy (w); + flux_msg_handler_destroy (mh); goto error; } - flux_msg_handler_start (w); + flux_msg_handler_start (mh); return; error: flux_future_fulfill_error (f, errno); diff --git a/src/common/libjsc/jstatctl.c b/src/common/libjsc/jstatctl.c index 2bcaaf5cc761..ccc622b965ac 100644 --- a/src/common/libjsc/jstatctl.c +++ b/src/common/libjsc/jstatctl.c @@ -1077,7 +1077,7 @@ static bool job_is_finished (const char *state) return false; } -static void job_state_cb (flux_t *h, flux_msg_handler_t *w, +static void job_state_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { int64_t jobid = -1; diff --git a/src/common/libkvs/kvs_watch.c b/src/common/libkvs/kvs_watch.c index 5ad32f6de98a..23320b9c9da9 100644 --- a/src/common/libkvs/kvs_watch.c +++ b/src/common/libkvs/kvs_watch.c @@ -48,10 +48,10 @@ typedef struct { typedef struct { zhash_t *watchers; /* kvs_watch_t hashed by stringified matchtag */ - flux_msg_handler_t *w; + flux_msg_handler_t *mh; } kvs_watch_ctx_t; -static void watch_response_cb (flux_t *h, flux_msg_handler_t *w, +static void watch_response_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg); static int decode_val_object (json_t *val, char **json_str); @@ -59,7 +59,7 @@ static void freectx (kvs_watch_ctx_t *ctx) { if (ctx) { zhash_destroy (&ctx->watchers); - flux_msg_handler_destroy (ctx->w); + flux_msg_handler_destroy (ctx->mh); free (ctx); } } @@ -76,8 +76,8 @@ static kvs_watch_ctx_t *getctx (flux_t *h, bool create) if (!(ctx->watchers = zhash_new ())) goto nomem; match.topic_glob = "kvs.watch"; - if (!(ctx->w = flux_msg_handler_create (h, match, watch_response_cb, - ctx))) + if (!(ctx->mh = flux_msg_handler_create (h, match, + watch_response_cb, ctx))) goto nomem; flux_aux_set (h, auxkey, ctx, (flux_free_f)freectx); } @@ -130,7 +130,7 @@ static kvs_watcher_t *add_watcher (flux_t *h, const char *key, zhash_freefn (ctx->watchers, k, (zhash_free_fn *)destroy_watcher); if (lastcount == 0) - flux_msg_handler_start (ctx->w); + flux_msg_handler_start (ctx->mh); return wp; } @@ -168,7 +168,7 @@ int flux_kvs_unwatch (flux_t *h, const char *key) } zlist_destroy (&hashkeys); if (zhash_size (ctx->watchers) == 0) - flux_msg_handler_stop (ctx->w); + flux_msg_handler_stop (ctx->mh); } rc = 0; done: @@ -204,7 +204,7 @@ static int dispatch_watch (flux_t *h, kvs_watcher_t *wp, const char *json_str) return rc; } -static void watch_response_cb (flux_t *h, flux_msg_handler_t *w, +static void watch_response_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { char *json_str = NULL; diff --git a/src/modules/aggregator/aggregator.c b/src/modules/aggregator/aggregator.c index be1bd689eaa6..e8f0ec4b8ce3 100644 --- a/src/modules/aggregator/aggregator.c +++ b/src/modules/aggregator/aggregator.c @@ -466,7 +466,7 @@ aggregator_new_aggregate (struct aggregator *ctx, const char *key, /* * Callback for "aggregator.push" */ -static void push_cb (flux_t *h, flux_msg_handler_t *w, +static void push_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { int rc = -1; diff --git a/src/modules/barrier/barrier.c b/src/modules/barrier/barrier.c index bc89a7520e8c..56e64feea471 100644 --- a/src/modules/barrier/barrier.c +++ b/src/modules/barrier/barrier.c @@ -188,7 +188,7 @@ static void send_enter_request (barrier_ctx_t *ctx, barrier_t *b) * notification upon barrier termination. */ -static void enter_request_cb (flux_t *h, flux_msg_handler_t *w, +static void enter_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { barrier_ctx_t *ctx = arg; @@ -246,7 +246,7 @@ static void enter_request_cb (flux_t *h, flux_msg_handler_t *w, * participating in. */ -static void disconnect_request_cb (flux_t *h, flux_msg_handler_t *w, +static void disconnect_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { barrier_ctx_t *ctx = arg; @@ -282,7 +282,7 @@ static int exit_event_send (flux_t *h, const char *name, int errnum) return rc; } -static void exit_event_cb (flux_t *h, flux_msg_handler_t *w, +static void exit_event_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { barrier_ctx_t *ctx = arg; diff --git a/src/modules/connector-local/local.c b/src/modules/connector-local/local.c index ad2de8fa1bcf..2e371e118dfe 100644 --- a/src/modules/connector-local/local.c +++ b/src/modules/connector-local/local.c @@ -779,7 +779,7 @@ static bool allowed_message (client_t *c, const flux_msg_t *msg) * Look up the sender uuid in clients hash and deliver. * Responses for disconnected clients are silently discarded. */ -static void response_cb (flux_t *h, flux_msg_handler_t *w, +static void response_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { mod_local_ctx_t *ctx = arg; @@ -827,7 +827,7 @@ static void response_cb (flux_t *h, flux_msg_handler_t *w, /* Received an event message from broker. * Find all subscribers and deliver. */ -static void event_cb (flux_t *h, flux_msg_handler_t *w, +static void event_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { mod_local_ctx_t *ctx = arg; @@ -861,10 +861,10 @@ static void event_cb (flux_t *h, flux_msg_handler_t *w, /* Accept a connection from new client. */ -static void listener_cb (flux_reactor_t *r, flux_watcher_t *w, +static void listener_cb (flux_reactor_t *r, flux_watcher_t *mh, int revents, void *arg) { - int fd = flux_fd_watcher_get_fd (w); + int fd = flux_fd_watcher_get_fd (mh); mod_local_ctx_t *ctx = arg; flux_t *h = ctx->h; diff --git a/src/modules/content-sqlite/content-sqlite.c b/src/modules/content-sqlite/content-sqlite.c index 0bb8e241f98f..649bb8e31853 100644 --- a/src/modules/content-sqlite/content-sqlite.c +++ b/src/modules/content-sqlite/content-sqlite.c @@ -242,7 +242,7 @@ int grow_lzo_buf (sqlite_ctx_t *ctx, size_t size) return 0; } -void load_cb (flux_t *h, flux_msg_handler_t *w, +void load_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { sqlite_ctx_t *ctx = arg; @@ -323,7 +323,7 @@ void load_cb (flux_t *h, flux_msg_handler_t *w, pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state); } -void store_cb (flux_t *h, flux_msg_handler_t *w, +void store_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { sqlite_ctx_t *ctx = arg; @@ -419,7 +419,7 @@ int register_backing_store (flux_t *h, bool value, const char *name) /* Intercept broker shutdown event. If broker is shutting down, * avoid transferring data back to the content cache at unload time. */ -void broker_shutdown_cb (flux_t *h, flux_msg_handler_t *w, +void broker_shutdown_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { sqlite_ctx_t *ctx = arg; @@ -431,7 +431,7 @@ void broker_shutdown_cb (flux_t *h, flux_msg_handler_t *w, * Tell content cache to disable backing store, * then write everything back to it before exiting. */ -void shutdown_cb (flux_t *h, flux_msg_handler_t *w, +void shutdown_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { sqlite_ctx_t *ctx = arg; diff --git a/src/modules/kvs/kvs.c b/src/modules/kvs/kvs.c index 0bbd3a7ee551..e3ed15c54de2 100644 --- a/src/modules/kvs/kvs.c +++ b/src/modules/kvs/kvs.c @@ -589,7 +589,7 @@ static void commit_check_cb (flux_reactor_t *r, flux_watcher_t *w, } } -static void dropcache_request_cb (flux_t *h, flux_msg_handler_t *w, +static void dropcache_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; @@ -612,7 +612,7 @@ static void dropcache_request_cb (flux_t *h, flux_msg_handler_t *w, flux_log_error (h, "%s: flux_respond", __FUNCTION__); } -static void dropcache_event_cb (flux_t *h, flux_msg_handler_t *w, +static void dropcache_event_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; @@ -630,7 +630,7 @@ static void dropcache_event_cb (flux_t *h, flux_msg_handler_t *w, expcount, size); } -static void heartbeat_cb (flux_t *h, flux_msg_handler_t *w, +static void heartbeat_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; @@ -670,7 +670,7 @@ static int lookup_load_cb (lookup_t *lh, const char *ref, void *data) return 0; } -static void get_request_cb (flux_t *h, flux_msg_handler_t *w, +static void get_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = NULL; @@ -745,7 +745,7 @@ static void get_request_cb (flux_t *h, flux_msg_handler_t *w, if (!lookup (lh)) { struct kvs_cb_data cbd; - if (!(wait = wait_create_msg_handler (h, w, msg, get_request_cb, lh))) + if (!(wait = wait_create_msg_handler (h, mh, msg, get_request_cb, lh))) goto done; cbd.ctx = ctx; @@ -805,7 +805,7 @@ static void get_request_cb (flux_t *h, flux_msg_handler_t *w, json_decref (val); } -static void watch_request_cb (flux_t *h, flux_msg_handler_t *w, +static void watch_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = NULL; @@ -869,7 +869,8 @@ static void watch_request_cb (flux_t *h, flux_msg_handler_t *w, if (!lookup (lh)) { struct kvs_cb_data cbd; - if (!(wait = wait_create_msg_handler (h, w, msg, watch_request_cb, lh))) + if (!(wait = wait_create_msg_handler (h, mh, msg, + watch_request_cb, lh))) goto done; cbd.ctx = ctx; @@ -937,7 +938,7 @@ static void watch_request_cb (flux_t *h, flux_msg_handler_t *w, flux_log_error (h, "%s: flux_msg_pack", __FUNCTION__); goto done; } - if (!(watcher = wait_create_msg_handler (h, w, cpy, + if (!(watcher = wait_create_msg_handler (h, mh, cpy, watch_request_cb, ctx))) goto done; if (wait_addqueue (ctx->watchlist, watcher) < 0) { @@ -1004,7 +1005,7 @@ static bool unwatch_cmp (const flux_msg_t *msg, void *arg) return match; } -static void unwatch_request_cb (flux_t *h, flux_msg_handler_t *w, +static void unwatch_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; @@ -1089,7 +1090,7 @@ static void finalize_fences_bynames (kvs_ctx_t *ctx, json_t *names, int errnum) /* kvs.relayfence (rank 0 only, no response). */ -static void relayfence_request_cb (flux_t *h, flux_msg_handler_t *w, +static void relayfence_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; @@ -1139,7 +1140,7 @@ static void relayfence_request_cb (flux_t *h, flux_msg_handler_t *w, /* kvs.fence * Sent from users to local kvs module. */ -static void fence_request_cb (flux_t *h, flux_msg_handler_t *w, +static void fence_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; @@ -1210,7 +1211,7 @@ static void fence_request_cb (flux_t *h, flux_msg_handler_t *w, /* For wait_version(). */ -static void sync_request_cb (flux_t *h, flux_msg_handler_t *w, +static void sync_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; @@ -1223,7 +1224,8 @@ static void sync_request_cb (flux_t *h, flux_msg_handler_t *w, goto error; } if (ctx->root.seq < rootseq) { - if (!(wait = wait_create_msg_handler (h, w, msg, sync_request_cb, arg))) + if (!(wait = wait_create_msg_handler (h, mh, msg, + sync_request_cb, arg))) goto error; if (wait_addqueue (ctx->watchlist, wait) < 0) { saved_errno = errno; @@ -1247,7 +1249,7 @@ static void sync_request_cb (flux_t *h, flux_msg_handler_t *w, flux_log_error (h, "%s: flux_respond", __FUNCTION__); } -static void getroot_request_cb (flux_t *h, flux_msg_handler_t *w, +static void getroot_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; @@ -1297,7 +1299,7 @@ static int getroot_rpc (kvs_ctx_t *ctx, int *rootseq, blobref_t rootref) return rc; } -static void error_event_cb (flux_t *h, flux_msg_handler_t *w, +static void error_event_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; @@ -1383,7 +1385,7 @@ static void prime_cache_with_rootdir (kvs_ctx_t *ctx, json_t *rootdir) /* Alter the (rootref, rootseq) in response to a setroot event. */ -static void setroot_event_cb (flux_t *h, flux_msg_handler_t *w, +static void setroot_event_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; @@ -1475,7 +1477,7 @@ static bool disconnect_cmp (const flux_msg_t *msg, void *arg) return match; } -static void disconnect_request_cb (flux_t *h, flux_msg_handler_t *w, +static void disconnect_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; @@ -1499,7 +1501,7 @@ static void disconnect_request_cb (flux_t *h, flux_msg_handler_t *w, free (sender); } -static void stats_get_cb (flux_t *h, flux_msg_handler_t *w, +static void stats_get_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; @@ -1555,14 +1557,14 @@ static void stats_clear (kvs_ctx_t *ctx) commit_mgr_clear_noop_stores (ctx->cm); } -static void stats_clear_event_cb (flux_t *h, flux_msg_handler_t *w, +static void stats_clear_event_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; stats_clear (ctx); } -static void stats_clear_request_cb (flux_t *h, flux_msg_handler_t *w, +static void stats_clear_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { kvs_ctx_t *ctx = arg; diff --git a/src/modules/kvs/test/waitqueue.c b/src/modules/kvs/test/waitqueue.c index 7a12098684b8..0e404f94e413 100644 --- a/src/modules/kvs/test/waitqueue.c +++ b/src/modules/kvs/test/waitqueue.c @@ -8,7 +8,8 @@ void wait_cb (void *arg) (*count)++; } -void msghand (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) +void msghand (flux_t *h, flux_msg_handler_t *mh, + const flux_msg_t *msg, void *arg) { int *count = arg; (*count)++; diff --git a/src/modules/kvs/waitqueue.c b/src/modules/kvs/waitqueue.c index 8cba48a61910..686aebddec14 100644 --- a/src/modules/kvs/waitqueue.c +++ b/src/modules/kvs/waitqueue.c @@ -33,7 +33,7 @@ struct handler { flux_msg_handler_f cb; flux_t *h; - flux_msg_handler_t *w; + flux_msg_handler_t *mh; flux_msg_t *msg; void *arg; }; @@ -71,7 +71,7 @@ wait_t *wait_create (wait_cb_f cb, void *arg) return w; } -wait_t *wait_create_msg_handler (flux_t *h, flux_msg_handler_t *wh, +wait_t *wait_create_msg_handler (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, flux_msg_handler_f cb, void *arg) { @@ -80,7 +80,7 @@ wait_t *wait_create_msg_handler (flux_t *h, flux_msg_handler_t *wh, w->hand.cb = cb; w->hand.arg = arg; w->hand.h = h; - w->hand.w = wh; + w->hand.mh = mh; if (msg && !(w->hand.msg = flux_msg_copy (msg, true))) { wait_destroy (w); errno = ENOMEM; @@ -156,7 +156,7 @@ static void wait_runone (wait_t *w) if (w->cb) w->cb (w->cb_arg); else if (w->hand.cb) - w->hand.cb (w->hand.h, w->hand.w, w->hand.msg, w->hand.arg); + w->hand.cb (w->hand.h, w->hand.mh, w->hand.msg, w->hand.arg); wait_destroy (w); } } diff --git a/src/modules/resource-hwloc/resource.c b/src/modules/resource-hwloc/resource.c index 714a8f52fbdc..4b2cc5b8a033 100644 --- a/src/modules/resource-hwloc/resource.c +++ b/src/modules/resource-hwloc/resource.c @@ -434,7 +434,7 @@ static int decode_reload_request (flux_t *h, resource_ctx_t *ctx, } static void reload_request_cb (flux_t *h, - flux_msg_handler_t *watcher, + flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { @@ -450,7 +450,7 @@ static void reload_request_cb (flux_t *h, } static void topo_request_cb (flux_t *h, - flux_msg_handler_t *watcher, + flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { diff --git a/src/modules/userdb/userdb.c b/src/modules/userdb/userdb.c index 3552b4e7b67f..3acaa7fa8158 100644 --- a/src/modules/userdb/userdb.c +++ b/src/modules/userdb/userdb.c @@ -208,7 +208,7 @@ static void user_delete (userdb_ctx_t *ctx, uint32_t userid) zhash_delete (ctx->db, key); } -static void lookup (flux_t *h, flux_msg_handler_t *w, +static void lookup (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { userdb_ctx_t *ctx = arg; @@ -233,7 +233,7 @@ static void lookup (flux_t *h, flux_msg_handler_t *w, flux_log_error (h, "%s", __FUNCTION__); } -static void addrole (flux_t *h, flux_msg_handler_t *w, +static void addrole (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { userdb_ctx_t *ctx = arg; @@ -264,7 +264,7 @@ static void addrole (flux_t *h, flux_msg_handler_t *w, flux_log_error (h, "%s", __FUNCTION__); } -static void delrole (flux_t *h, flux_msg_handler_t *w, +static void delrole (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { userdb_ctx_t *ctx = arg; @@ -300,7 +300,7 @@ static int compare_keys (const char *s1, const char *s2) return 0; } -static void getnext (flux_t *h, flux_msg_handler_t *w, +static void getnext (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { userdb_ctx_t *ctx = arg; @@ -339,7 +339,7 @@ static void getnext (flux_t *h, flux_msg_handler_t *w, free (uuid); } -static void disconnect (flux_t *h, flux_msg_handler_t *w, +static void disconnect (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { userdb_ctx_t *ctx = arg; diff --git a/t/loop/dispatch.c b/t/loop/dispatch.c index 070362ef5ce3..307dfdc3bb4c 100644 --- a/t/loop/dispatch.c +++ b/t/loop/dispatch.c @@ -7,14 +7,14 @@ int cb_called; flux_t *cb_h; -flux_msg_handler_t *cb_w; +flux_msg_handler_t *cb_mh; const flux_msg_t *cb_msg; void *cb_arg; -void cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) +void cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { cb_called++; cb_h = h; - cb_w = w; + cb_mh = mh; cb_msg = msg; cb_arg = arg; } @@ -28,11 +28,11 @@ void cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) */ void test_simple_msg_handler (flux_t *h) { - flux_msg_handler_t *w; + flux_msg_handler_t *mh; flux_msg_t *msg; int rc; - ok ((w = flux_msg_handler_create (h, FLUX_MATCH_EVENT, cb, &w)) != NULL, + ok ((mh = flux_msg_handler_create (h, FLUX_MATCH_EVENT, cb, &mh)) != NULL, "handle created dispatcher on demand"); ok ((msg = flux_event_encode ("test", NULL)) != NULL, "encoded event message"); @@ -46,19 +46,19 @@ void test_simple_msg_handler (flux_t *h) "message handler that was not started did not run"); cb_called = 0; cb_h = NULL; - cb_w = NULL; + cb_mh = NULL; cb_arg = NULL; - flux_msg_handler_start (w); + flux_msg_handler_start (mh); diag ("started message handler"); rc = flux_reactor_run (flux_get_reactor (h), FLUX_REACTOR_NOWAIT); ok (rc >= 0, "flux_reactor_run ran"); ok (cb_called == 1, "message handler was called after being started"); - ok (cb_h == h && cb_w == w && cb_arg == &w && cb_msg != NULL, + ok (cb_h == h && cb_mh == mh && cb_arg == &mh && cb_msg != NULL, "message handler was called with appropriate args"); flux_msg_destroy (msg); - flux_msg_handler_destroy (w); + flux_msg_handler_destroy (mh); diag ("destroyed message and message handler"); } @@ -67,13 +67,13 @@ void test_simple_msg_handler (flux_t *h) void test_fastpath (flux_t *h) { struct flux_match m = FLUX_MATCH_RESPONSE; - flux_msg_handler_t *w; + flux_msg_handler_t *mh; flux_msg_t *msg; int rc; ok ((m.matchtag = flux_matchtag_alloc (h, 0)) != FLUX_MATCHTAG_NONE, "allocated matchtag"); - ok ((w = flux_msg_handler_create (h, m, cb, NULL)) != NULL, + ok ((mh = flux_msg_handler_create (h, m, cb, NULL)) != NULL, "created handler for response"); ok ((msg = flux_response_encode ("foo", 0, NULL)) != NULL, "encoded response message"); @@ -87,7 +87,7 @@ void test_fastpath (flux_t *h) "flux_reactor_run ran"); ok (cb_called == 0, "message handler that was not started did not run"); - flux_msg_handler_start (w); + flux_msg_handler_start (mh); diag ("started message handler"); rc = flux_reactor_run (flux_get_reactor (h), FLUX_REACTOR_NOWAIT); ok (rc >= 0, @@ -111,7 +111,7 @@ void test_fastpath (flux_t *h) flux_matchtag_free (h, m.matchtag); flux_msg_destroy (msg); - flux_msg_handler_destroy (w); + flux_msg_handler_destroy (mh); diag ("freed matchtag, destroyed message and message handler"); } @@ -120,7 +120,7 @@ void test_cloned_dispatch (flux_t *orig) flux_t *h; flux_reactor_t *r; flux_msg_t *msg; - flux_msg_handler_t *w, *w2; + flux_msg_handler_t *mh, *mh2; struct flux_match m = FLUX_MATCH_RESPONSE; struct flux_match m2 = FLUX_MATCH_RESPONSE; int rc; @@ -140,9 +140,9 @@ void test_cloned_dispatch (flux_t *orig) "set reactor in cloned handle"); /* event */ - ok ((w = flux_msg_handler_create (h, FLUX_MATCH_EVENT, cb, NULL)) != NULL, + ok ((mh = flux_msg_handler_create (h, FLUX_MATCH_EVENT, cb, NULL)) != NULL, "handle created dispatcher on demand"); - flux_msg_handler_start (w); + flux_msg_handler_start (mh); ok ((msg = flux_event_encode ("test", NULL)) != NULL, "encoded event message"); ok (flux_send (h, msg, 0) == 0, @@ -154,9 +154,9 @@ void test_cloned_dispatch (flux_t *orig) m.matchtag = flux_matchtag_alloc (h, 0); ok (m.matchtag != FLUX_MATCHTAG_NONE, "allocated matchtag (%d)", m.matchtag); // 1 - ok ((w2 = flux_msg_handler_create (h, m, cb, NULL)) != NULL, + ok ((mh2 = flux_msg_handler_create (h, m, cb, NULL)) != NULL, "created handler for response"); - flux_msg_handler_start (w2); + flux_msg_handler_start (mh2); ok ((msg = flux_response_encode ("foo", 0, NULL)) != NULL, "encoded response message"); ok (flux_msg_set_matchtag (msg, m.matchtag) == 0, @@ -234,8 +234,8 @@ void test_cloned_dispatch (flux_t *orig) "there are no more messages"); /* close the clone */ - flux_msg_handler_destroy (w); - flux_msg_handler_destroy (w2); + flux_msg_handler_destroy (mh); + flux_msg_handler_destroy (mh2); flux_matchtag_free (h, m.matchtag); flux_matchtag_free (h, m2.matchtag); flux_close (h); diff --git a/t/loop/reactor.c b/t/loop/reactor.c index 29d33392b448..180373004c80 100644 --- a/t/loop/reactor.c +++ b/t/loop/reactor.c @@ -24,29 +24,29 @@ static int send_request (flux_t *h, const char *topic) } static int multmatch_count = 0; -static void multmatch1 (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, - void *arg) +static void multmatch1 (flux_t *h, flux_msg_handler_t *mh, + const flux_msg_t *msg, void *arg) { const char *topic; if (flux_msg_get_topic (msg, &topic) < 0 || strcmp (topic, "foo.baz")) flux_reactor_stop_error (flux_get_reactor (h)); - flux_msg_handler_stop (w); + flux_msg_handler_stop (mh); multmatch_count++; } -static void multmatch2 (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, - void *arg) +static void multmatch2 (flux_t *h, flux_msg_handler_t *mh, + const flux_msg_t *msg, void *arg) { const char *topic; if (flux_msg_get_topic (msg, &topic) < 0 || strcmp (topic, "foo.bar")) flux_reactor_stop_error (flux_get_reactor (h)); - flux_msg_handler_stop (w); + flux_msg_handler_stop (mh); multmatch_count++; } static void test_multmatch (flux_t *h) { - flux_msg_handler_t *w1, *w2; + flux_msg_handler_t *mh1, *mh2; struct flux_match m1 = FLUX_MATCH_ANY; struct flux_match m2 = FLUX_MATCH_ANY; @@ -56,41 +56,41 @@ static void test_multmatch (flux_t *h) /* test #1: verify multiple match behaves as documented, that is, * a message is matched (only) by the most recently added watcher */ - ok ((w1 = flux_msg_handler_create (h, m1, multmatch1, NULL)) != NULL, + ok ((mh1 = flux_msg_handler_create (h, m1, multmatch1, NULL)) != NULL, "multmatch: first added handler for foo.*"); - ok ((w2 = flux_msg_handler_create (h, m2, multmatch2, NULL)) != NULL, + ok ((mh2 = flux_msg_handler_create (h, m2, multmatch2, NULL)) != NULL, "multmatch: next added handler for foo.bar"); - flux_msg_handler_start (w1); - flux_msg_handler_start (w2); + flux_msg_handler_start (mh1); + flux_msg_handler_start (mh2); ok (send_request (h, "foo.bar") == 0, "multmatch: send foo.bar msg"); ok (send_request (h, "foo.baz") == 0, "multmatch: send foo.baz msg"); ok (flux_reactor_run (flux_get_reactor (h), 0) == 0 && multmatch_count == 2, - "multmatch: last added watcher handled foo.bar"); - flux_msg_handler_destroy (w1); - flux_msg_handler_destroy (w2); + "multmatch: last added handler handled foo.bar"); + flux_msg_handler_destroy (mh1); + flux_msg_handler_destroy (mh2); } static int msgwatcher_count = 100; -static void msgreader (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, - void *arg) +static void msgreader (flux_t *h, flux_msg_handler_t *mh, + const flux_msg_t *msg, void *arg) { static int count = 0; count++; if (count == msgwatcher_count) - flux_msg_handler_stop (w); + flux_msg_handler_stop (mh); } static void test_msg (flux_t *h) { - flux_msg_handler_t *w; + flux_msg_handler_t *mh; int i; - ok ((w = flux_msg_handler_create (h, FLUX_MATCH_ANY, msgreader, NULL)) - != NULL, + mh = flux_msg_handler_create (h, FLUX_MATCH_ANY, msgreader, NULL); + ok (mh != NULL, "msg: created handler for any message"); - flux_msg_handler_start (w); + flux_msg_handler_start (mh); for (i = 0; i < msgwatcher_count; i++) { if (send_request (h, "foo") < 0) break; @@ -99,11 +99,11 @@ static void test_msg (flux_t *h) "msg: sent %d requests", i); ok (flux_reactor_run (flux_get_reactor (h), 0) == 0, "msg: reactor ran to completion after %d requests", msgwatcher_count); - flux_msg_handler_stop (w); - flux_msg_handler_destroy (w); + flux_msg_handler_stop (mh); + flux_msg_handler_destroy (mh); } -static void dummy (flux_t *h, flux_msg_handler_t *w, +static void dummy (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { } @@ -111,13 +111,13 @@ static void dummy (flux_t *h, flux_msg_handler_t *w, static void leak_msg_handler (void) { flux_t *h; - flux_msg_handler_t *w; + flux_msg_handler_t *mh; if (!(h = flux_open ("loop://", 0))) exit (1); - if (!(w = flux_msg_handler_create (h, FLUX_MATCH_ANY, dummy, NULL))) + if (!(mh = flux_msg_handler_create (h, FLUX_MATCH_ANY, dummy, NULL))) exit (1); - flux_msg_handler_start (w); + flux_msg_handler_start (mh); flux_close (h); } diff --git a/t/module/parent.c b/t/module/parent.c index d658c8485a6b..2d153fb3dbee 100644 --- a/t/module/parent.c +++ b/t/module/parent.c @@ -112,7 +112,7 @@ static flux_modlist_t *module_list (void) return mods; } -static void insmod_request_cb (flux_t *h, flux_msg_handler_t *w, +static void insmod_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { const char *json_str; @@ -149,7 +149,7 @@ static void insmod_request_cb (flux_t *h, flux_msg_handler_t *w, free (argz); } -static void rmmod_request_cb (flux_t *h, flux_msg_handler_t *w, +static void rmmod_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { const char *json_str; @@ -182,7 +182,7 @@ static void rmmod_request_cb (flux_t *h, flux_msg_handler_t *w, free (name); } -static void lsmod_request_cb (flux_t *h, flux_msg_handler_t *w, +static void lsmod_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { flux_modlist_t *mods = NULL; diff --git a/t/request/req.c b/t/request/req.c index 65371b546ebf..8736ddbe00c3 100644 --- a/t/request/req.c +++ b/t/request/req.c @@ -61,7 +61,7 @@ static t_req_ctx_t *getctx (flux_t *h) /* Return number of queued clog requests */ -void count_request_cb (flux_t *h, flux_msg_handler_t *w, +void count_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { t_req_ctx_t *ctx = getctx (h); @@ -75,7 +75,7 @@ void count_request_cb (flux_t *h, flux_msg_handler_t *w, /* Don't reply to request - just queue it for later. */ -void clog_request_cb (flux_t *h, flux_msg_handler_t *w, +void clog_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { t_req_ctx_t *ctx = getctx (h); @@ -87,7 +87,7 @@ void clog_request_cb (flux_t *h, flux_msg_handler_t *w, /* Reply to all queued requests. */ -void flush_request_cb (flux_t *h, flux_msg_handler_t *w, +void flush_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { t_req_ctx_t *ctx = getctx (h); @@ -106,7 +106,7 @@ void flush_request_cb (flux_t *h, flux_msg_handler_t *w, /* Accept a json payload, verify it and return error if it doesn't * match expected. */ -void sink_request_cb (flux_t *h, flux_msg_handler_t *w, +void sink_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { const char *json_str; @@ -135,7 +135,7 @@ void sink_request_cb (flux_t *h, flux_msg_handler_t *w, /* Return a fixed json payload */ -void src_request_cb (flux_t *h, flux_msg_handler_t *w, +void src_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { json_object *o = Jnew (); @@ -148,7 +148,7 @@ void src_request_cb (flux_t *h, flux_msg_handler_t *w, /* Return 'n' sequenced responses. */ -void nsrc_request_cb (flux_t *h, flux_msg_handler_t *w, +void nsrc_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { const char *json_str; @@ -186,7 +186,7 @@ void nsrc_request_cb (flux_t *h, flux_msg_handler_t *w, /* Always return an error 42 */ -void err_request_cb (flux_t *h, flux_msg_handler_t *w, +void err_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { if (flux_respond (h, msg, 42, NULL) < 0) @@ -195,7 +195,7 @@ void err_request_cb (flux_t *h, flux_msg_handler_t *w, /* Echo a json payload back to requestor. */ -void echo_request_cb (flux_t *h, flux_msg_handler_t *w, +void echo_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { const char *json_str; @@ -219,7 +219,7 @@ void echo_request_cb (flux_t *h, flux_msg_handler_t *w, /* Proxy ping. */ -void xping_request_cb (flux_t *h, flux_msg_handler_t *w, +void xping_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { t_req_ctx_t *ctx = arg; @@ -277,7 +277,7 @@ void xping_request_cb (flux_t *h, flux_msg_handler_t *w, /* Handle ping response for proxy ping. * Match it with a request and respond to that request. */ -void ping_response_cb (flux_t *h, flux_msg_handler_t *w, +void ping_response_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { t_req_ctx_t *ctx = arg; @@ -322,7 +322,7 @@ void ping_response_cb (flux_t *h, flux_msg_handler_t *w, /* Handle the simplest possible request. * Verify that everything is as expected; log it and stop the reactor if not. */ -void null_request_cb (flux_t *h, flux_msg_handler_t *w, +void null_request_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { t_req_ctx_t *ctx = arg; diff --git a/t/rolemask/loop.c b/t/rolemask/loop.c index b7f9316c7bec..d4a237117fc3 100644 --- a/t/rolemask/loop.c +++ b/t/rolemask/loop.c @@ -96,7 +96,7 @@ static void check_rpc_oneway_faked (flux_t *h) } static bool testrpc1_called; -static void testrpc1 (flux_t *h, flux_msg_handler_t *w, +static void testrpc1 (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { diag ("testrpc1 handler invoked"); @@ -120,19 +120,19 @@ flux_msg_handler_t *testrpc1_handler_create (flux_t *h) static void check_rpc_default_policy (flux_t *h) { flux_future_t *f; - flux_msg_handler_t *w; + flux_msg_handler_t *mh; struct creds saved, new, cr; int rc; - ok ((w = testrpc1_handler_create (h)) != NULL, + ok ((mh = testrpc1_handler_create (h)) != NULL, "created message handler with default policy"); - if (w == NULL) + if (mh == NULL) BAIL_OUT ("flux_msg_handler_create: %s", flux_strerror (errno)); /* This should be a no-op since "deny all" can't deny FLUX_ROLE_OWNER, * and the default policy is to require FLUX_ROLE_OWNER. */ - flux_msg_handler_deny_rolemask (w, FLUX_ROLE_ALL); + flux_msg_handler_deny_rolemask (mh, FLUX_ROLE_ALL); /* Attempt with default creds. @@ -176,21 +176,21 @@ static void check_rpc_default_policy (flux_t *h) ok (cred_set (h, &saved) == 0, "restored connector creds"); - flux_msg_handler_destroy (w); + flux_msg_handler_destroy (mh); } static void check_rpc_open_policy (flux_t *h) { flux_future_t *f; - flux_msg_handler_t *w; + flux_msg_handler_t *mh; struct creds saved, new, cr; int rc; - ok ((w = testrpc1_handler_create (h)) != NULL, + ok ((mh = testrpc1_handler_create (h)) != NULL, "created message handler with open policy"); - if (w == NULL) + if (mh == NULL) BAIL_OUT ("flux_msg_handler_create: %s", flux_strerror (errno)); - flux_msg_handler_allow_rolemask (w, FLUX_ROLE_ALL); + flux_msg_handler_allow_rolemask (mh, FLUX_ROLE_ALL); /* Attempt with default creds. */ @@ -230,23 +230,23 @@ static void check_rpc_open_policy (flux_t *h) ok (cred_set (h, &saved) == 0, "restored connector creds"); - flux_msg_handler_destroy (w); + flux_msg_handler_destroy (mh); } static void check_rpc_targetted_policy (flux_t *h) { flux_future_t *f; - flux_msg_handler_t *w; + flux_msg_handler_t *mh; struct creds saved, new, cr; uint32_t allow = 0x1000; int rc; - ok ((w = testrpc1_handler_create (h)) != NULL, + ok ((mh = testrpc1_handler_create (h)) != NULL, "created message handler with targetted policy"); - if (w == NULL) + if (mh == NULL) BAIL_OUT ("flux_msg_handler_create: %s", flux_strerror (errno)); - flux_msg_handler_deny_rolemask (w, FLUX_ROLE_ALL); - flux_msg_handler_allow_rolemask (w, allow); + flux_msg_handler_deny_rolemask (mh, FLUX_ROLE_ALL); + flux_msg_handler_allow_rolemask (mh, allow); ok (cred_get (h, &saved) == 0 && saved.userid == geteuid() && saved.rolemask == FLUX_ROLE_OWNER, @@ -308,7 +308,7 @@ static void check_rpc_targetted_policy (flux_t *h) ok (cred_set (h, &saved) == 0, "restored connector creds"); - flux_msg_handler_destroy (w); + flux_msg_handler_destroy (mh); } static void fatal_err (const char *message, void *arg) diff --git a/t/rpc/mrpc.c b/t/rpc/mrpc.c index a04b42127274..4bf9f7bbdc5a 100644 --- a/t/rpc/mrpc.c +++ b/t/rpc/mrpc.c @@ -13,7 +13,7 @@ static uint32_t fake_rank = 0; /* request nodeid and flags returned in response */ static int nodeid_fake_error = -1; -void rpctest_nodeid_cb (flux_t *h, flux_msg_handler_t *w, +void rpctest_nodeid_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { int errnum = 0; @@ -39,7 +39,7 @@ void rpctest_nodeid_cb (flux_t *h, flux_msg_handler_t *w, Jput (o); } -void rpcftest_nodeid_cb (flux_t *h, flux_msg_handler_t *w, +void rpcftest_nodeid_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { int errnum = 0; @@ -63,7 +63,7 @@ void rpcftest_nodeid_cb (flux_t *h, flux_msg_handler_t *w, } /* request payload echoed in response */ -void rpctest_echo_cb (flux_t *h, flux_msg_handler_t *w, +void rpctest_echo_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { int errnum = 0; @@ -83,7 +83,7 @@ void rpctest_echo_cb (flux_t *h, flux_msg_handler_t *w, /* no-payload response */ static int hello_count = 0; -void rpctest_hello_cb (flux_t *h, flux_msg_handler_t *w, +void rpctest_hello_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { int errnum = 0; @@ -102,7 +102,7 @@ void rpctest_hello_cb (flux_t *h, flux_msg_handler_t *w, (void)flux_respond (h, msg, errnum, NULL); } -void rpcftest_hello_cb (flux_t *h, flux_msg_handler_t *w, +void rpcftest_hello_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { int errnum = 0; diff --git a/t/rpc/rpc.c b/t/rpc/rpc.c index 286f8af13bc2..d74cc5f86626 100644 --- a/t/rpc/rpc.c +++ b/t/rpc/rpc.c @@ -6,7 +6,7 @@ #include "util.h" /* increment integer and send it back */ -void rpctest_incr_cb (flux_t *h, flux_msg_handler_t *w, +void rpctest_incr_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { int i; @@ -18,7 +18,7 @@ void rpctest_incr_cb (flux_t *h, flux_msg_handler_t *w, } /* request nodeid and flags returned in response */ -void rpctest_nodeid_cb (flux_t *h, flux_msg_handler_t *w, +void rpctest_nodeid_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { int errnum = 0; @@ -40,7 +40,7 @@ void rpctest_nodeid_cb (flux_t *h, flux_msg_handler_t *w, } /* request payload echoed in response */ -void rpctest_echo_cb (flux_t *h, flux_msg_handler_t *w, +void rpctest_echo_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { int errnum = 0; @@ -59,7 +59,7 @@ void rpctest_echo_cb (flux_t *h, flux_msg_handler_t *w, } /* raw request payload echoed in response */ -void rpctest_rawecho_cb (flux_t *h, flux_msg_handler_t *w, +void rpctest_rawecho_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { int errnum = 0; @@ -75,7 +75,7 @@ void rpctest_rawecho_cb (flux_t *h, flux_msg_handler_t *w, } /* no-payload response */ -void rpctest_hello_cb (flux_t *h, flux_msg_handler_t *w, +void rpctest_hello_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { int errnum = 0; @@ -94,7 +94,7 @@ void rpctest_hello_cb (flux_t *h, flux_msg_handler_t *w, diag ("%s: flux_respond: %s", __FUNCTION__, flux_strerror (errno)); } -void rpcftest_hello_cb (flux_t *h, flux_msg_handler_t *w, +void rpcftest_hello_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { int errnum = 0; diff --git a/t/rpc/util.c b/t/rpc/util.c index bebdf9fd9ec0..ecc042ba54c5 100644 --- a/t/rpc/util.c +++ b/t/rpc/util.c @@ -13,7 +13,7 @@ struct test_server { flux_t *c; flux_t *s; - flux_msg_handler_t *w; + flux_msg_handler_t *mh; test_server_f cb; void *arg; pthread_t thread; @@ -21,7 +21,7 @@ struct test_server { zuuid_t *uuid; }; -void shutdown_cb (flux_t *h, flux_msg_handler_t *w, +void shutdown_cb (flux_t *h, flux_msg_handler_t *mh, const flux_msg_t *msg, void *arg) { flux_reactor_stop (flux_get_reactor (h)); @@ -71,7 +71,7 @@ int test_server_stop (flux_t *c) static void test_server_destroy (struct test_server *a) { if (a) { - flux_msg_handler_destroy (a->w); + flux_msg_handler_destroy (a->mh); flux_close (a->s); zuuid_destroy (&a->uuid); free (a); @@ -123,12 +123,12 @@ flux_t *test_server_create (test_server_f cb, void *arg) */ struct flux_match match = FLUX_MATCH_REQUEST; match.topic_glob = "shutdown"; - if (!(a->w = flux_msg_handler_create (a->s, match, shutdown_cb, a))) { + if (!(a->mh = flux_msg_handler_create (a->s, match, shutdown_cb, a))) { diag ("%s: flux_msg_handler_create: %s\n", __FUNCTION__, flux_strerror (errno)); goto error; } - flux_msg_handler_start (a->w); + flux_msg_handler_start (a->mh); /* Start server thread */