diff --git a/src/broker/Makefile.am b/src/broker/Makefile.am index 2f5b75aed531..ee22ff9b3aab 100644 --- a/src/broker/Makefile.am +++ b/src/broker/Makefile.am @@ -61,7 +61,8 @@ flux_broker_LDFLAGS = ${broker_ldflags} TESTS = test_shutdown.t \ test_heartbeat.t \ test_hello.t \ - test_attr.t + test_attr.t \ + test_service.t test_ldadd = \ $(top_builddir)/src/common/libflux-core.la \ @@ -94,3 +95,7 @@ test_hello_t_LDADD = $(test_ldadd) test_attr_t_SOURCES = test/attr.c attr.c test_attr_t_CPPFLAGS = $(test_cppflags) test_attr_t_LDADD = $(test_ldadd) + +test_service_t_SOURCES = test/service.c service.c +test_service_t_CPPFLAGS = $(test_cppflags) +test_service_t_LDADD = $(test_ldadd) diff --git a/src/broker/broker.c b/src/broker/broker.c index ec97da617fff..220f5fe44fe8 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -130,7 +130,7 @@ typedef struct { int event_recv_seq; int event_send_seq; bool event_active; /* primary event source is active */ - svchash_t *services; + struct service_switch *services; heartbeat_t *heartbeat; shutdown_t *shutdown; double shutdown_grace; @@ -264,7 +264,8 @@ int main (int argc, char *argv[]) ctx.rank = FLUX_NODEID_ANY; ctx.modhash = modhash_create (); - ctx.services = svchash_create (); + if (!(ctx.services = service_switch_create ())) + log_err_exit ("service_switch_create"); ctx.overlay = overlay_create (); ctx.hello = hello_create (); ctx.tbon.k = 2; /* binary TBON is default */ @@ -702,7 +703,7 @@ int main (int argc, char *argv[]) flux_sec_destroy (ctx.sec); overlay_destroy (ctx.overlay); heartbeat_destroy (ctx.heartbeat); - svchash_destroy (ctx.services); + service_switch_destroy (ctx.services); hello_destroy (ctx.hello); attr_destroy (ctx.attrs); flux_close (ctx.h); @@ -1417,10 +1418,13 @@ static int load_module_bypath (broker_ctx_t *ctx, const char *path, } if (!(p = module_add (ctx->modhash, path))) goto error; - if (!svc_add (ctx->services, module_get_name (p), - module_get_service (p), mod_svc_cb, p)) { - errno = EEXIST; - goto error; + if (service_add (ctx->services, module_get_name (p), + module_get_uuid (p), mod_svc_cb, p) < 0) + goto module_remove; + if (module_get_service (p)) { + if (service_add (ctx->services, module_get_service (p), + module_get_uuid (p), mod_svc_cb, p) < 0) + goto service_remove; } arg = argz_next (argz, argz_len, NULL); while (arg) { @@ -1430,15 +1434,17 @@ static int load_module_bypath (broker_ctx_t *ctx, const char *path, module_set_poller_cb (p, module_cb, ctx); module_set_status_cb (p, module_status_cb, ctx); if (request && module_push_insmod (p, request) < 0) // response deferred - goto error; + goto service_remove; if (module_start (p) < 0) - goto error; + goto service_remove; flux_log (ctx->h, LOG_DEBUG, "insmod %s", name); free (name); return 0; +service_remove: + service_remove_byuuid (ctx->services, module_get_uuid (p)); +module_remove: + module_remove (ctx->modhash, p); error: - if (p) - module_remove (ctx->modhash, p); free (name); return -1; } @@ -1490,7 +1496,7 @@ static int unload_module_byname (broker_ctx_t *ctx, const char *name, return -1; } else { assert (request == NULL); - svc_remove (ctx->services, module_get_name (p)); + service_remove_byuuid (ctx->services, module_get_uuid (p)); module_remove (ctx->modhash, p); } flux_log (ctx->h, LOG_DEBUG, "rmmod %s", name); @@ -1756,17 +1762,12 @@ static struct flux_msg_handler_spec handlers[] = { }; struct internal_service { - const char *topic; + const char *name; const char *nodeset; }; static struct internal_service services[] = { - { "cmb.rusage", NULL }, - { "cmb.ping", NULL }, - { "cmb.exec", NULL }, - { "cmb.exec.signal", NULL }, - { "cmb.exec.write", NULL }, - { "cmb.processes", NULL }, + { "cmb", NULL }, // kind of a catch-all, slowly deprecating { "log", NULL }, { "seq", "[0]" }, { "content", NULL }, @@ -1777,25 +1778,20 @@ static struct internal_service services[] = { }; /* Register builtin services (sharing ctx->h and broker thread). - * First loop is for services that are registered in other files. - * Second loop is for services registered here. + * Register message handlers for some cmb services. Others are registered + * in their own initialization functions. */ static void broker_add_services (broker_ctx_t *ctx) { struct internal_service *svc; - for (svc = &services[0]; svc->topic != NULL; svc++) { + for (svc = &services[0]; svc->name != NULL; svc++) { if (!nodeset_member (svc->nodeset, ctx->rank)) continue; - if (!svc_add (ctx->services, svc->topic, NULL, route_to_handle, ctx)) - log_err_exit ("error registering service for %s", svc->topic); + if (service_add (ctx->services, svc->name, NULL, + route_to_handle, ctx) < 0) + log_err_exit ("error registering service for %s", svc->name); } - struct flux_msg_handler_spec *spec; - for (spec = &handlers[0]; spec->topic_glob != NULL; spec++) { - if (!svc_add (ctx->services, spec->topic_glob, NULL, - route_to_handle, ctx)) - log_err_exit ("error registering service for %s", spec->topic_glob); - } if (flux_msg_handler_addvec (ctx->h, handlers, ctx) < 0) log_err_exit ("error registering message handlers"); } @@ -2015,7 +2011,7 @@ static void module_status_cb (module_t *p, int prev_status, void *arg) */ if (status == FLUX_MODSTATE_EXITED) { flux_log (ctx->h, LOG_DEBUG, "module %s exited", name); - svc_remove (ctx->services, module_get_name (p)); + service_remove_byuuid (ctx->services, module_get_uuid (p)); while ((msg = module_pop_rmmod (p))) { if (flux_respond (ctx->h, msg, 0, NULL) < 0) flux_log_error (ctx->h, "flux_respond to rmmod %s", name); @@ -2117,7 +2113,7 @@ static int broker_request_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg, if (rc < 0) goto error; } else if ((flags & FLUX_MSGFLAG_UPSTREAM) && nodeid != ctx->rank) { - rc = svc_sendmsg (ctx->services, msg); + rc = service_send (ctx->services, msg); if (rc < 0 && errno == ENOSYS) { rc = overlay_sendmsg_parent (ctx->overlay, msg); if (rc < 0 && errno == EHOSTUNREACH) @@ -2126,7 +2122,7 @@ static int broker_request_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg, if (rc < 0) goto error; } else if (nodeid == FLUX_NODEID_ANY) { - rc = svc_sendmsg (ctx->services, msg); + rc = service_send (ctx->services, msg); if (rc < 0 && errno == ENOSYS) { rc = overlay_sendmsg_parent (ctx->overlay, msg); if (rc < 0 && errno == EHOSTUNREACH) @@ -2135,7 +2131,7 @@ static int broker_request_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg, if (rc < 0) goto error; } else if (nodeid == ctx->rank) { - rc = svc_sendmsg (ctx->services, msg); + rc = service_send (ctx->services, msg); if (rc < 0) goto error; } else if ((gw = kary_child_route (ctx->tbon.k, ctx->size, diff --git a/src/broker/service.c b/src/broker/service.c index 7ccf48742e3d..e0274eaf102d 100644 --- a/src/broker/service.c +++ b/src/broker/service.c @@ -28,119 +28,177 @@ #include #include -#include "src/common/libutil/xzmalloc.h" #include "src/common/libutil/log.h" #include "src/common/libutil/oom.h" #include "service.h" -struct svc_struct { - svc_cb_f cb; +struct service { + service_send_f cb; void *cb_arg; - char *alias; + char *uuid; }; -struct svchash_struct { +struct service_switch { zhash_t *services; - zhash_t *aliases; }; -svchash_t *svchash_create (void) +struct service_switch *service_switch_create (void) { - svchash_t *sh = xzmalloc (sizeof *sh); - sh->services = zhash_new (); - sh->aliases = zhash_new (); - if (!sh->services || !sh->aliases) - oom (); - return sh; + struct service_switch *sw = calloc (1, sizeof *sw); + if (!sw) + goto error; + if (!(sw->services = zhash_new ())) { + errno = ENOMEM; + goto error; + } + return sw; +error: + service_switch_destroy (sw); + return NULL; } -void svchash_destroy (svchash_t *sh) +void service_switch_destroy (struct service_switch *sw) { - if (sh) { - zhash_destroy (&sh->services); - zhash_destroy (&sh->aliases); - free (sh); + if (sw) { + zhash_destroy (&sw->services); + free (sw); } } -static void svc_destroy (svc_t *svc) +static void service_destroy (struct service *svc) { if (svc) { - if (svc->alias) - free (svc->alias); + free (svc->uuid); free (svc); } } -static svc_t *svc_create (void) +static struct service *service_create (const char *uuid) { - svc_t *svc = xzmalloc (sizeof (*svc)); + struct service *svc; + + if (!(svc = calloc (1, sizeof (*svc)))) + goto error; + if (uuid) { + if (!(svc->uuid = strdup (uuid))) + goto error; + } return svc; +error: + service_destroy (svc); + return NULL; } -void svc_remove (svchash_t *sh, const char *name) +void service_remove (struct service_switch *sw, const char *name) { - svc_t *svc = zhash_lookup (sh->services, name); - if (svc) { - if (svc->alias) - zhash_delete (sh->aliases, svc->alias); - zhash_delete (sh->services, name); + zhash_delete (sw->services, name); +} + +/* Delete all services registered by 'uuid'. + */ +void service_remove_byuuid (struct service_switch *sw, const char *uuid) +{ + struct service *svc; + zlist_t *trash = NULL; + const char *key; + + svc = zhash_first (sw->services); + while (svc != NULL) { + if (svc->uuid && !strcmp (svc->uuid, uuid)) { + if (!trash) + trash = zlist_new (); + if (!trash) + break; + if (zlist_push (trash, (char *)zhash_cursor (sw->services)) < 0) + break; + } + svc = zhash_next (sw->services); + } + if (trash) { + while ((key = zlist_pop (trash))) + zhash_delete (sw->services, key); + zlist_destroy (&trash); } } -svc_t *svc_add (svchash_t *sh, const char *name, const char *alias, - svc_cb_f cb, void *arg) +int service_add (struct service_switch *sh, const char *name, + const char *uuid, service_send_f cb, void *arg) { - svc_t *svc; - int rc; - if (zhash_lookup (sh->services, name) - || (alias && zhash_lookup (sh->aliases, alias))) { + struct service *svc = NULL; + + if (strchr (name, '.')) { + errno = EINVAL; + goto error; + } + if (zhash_lookup (sh->services, name)) { errno = EEXIST; - return NULL; + goto error; } - svc = svc_create (); + svc = service_create (uuid); svc->cb = cb; svc->cb_arg = arg; - rc = zhash_insert (sh->services, name, svc); - assert (rc == 0); - zhash_freefn (sh->services, name, (zhash_free_fn *)svc_destroy); - if (alias) { - svc->alias = xstrdup (alias); - rc = zhash_insert (sh->aliases, alias, svc); - assert (rc == 0); + if (zhash_insert (sh->services, name, svc) < 0) { + errno = ENOMEM; + goto error; } - return svc; + zhash_freefn (sh->services, name, (zhash_free_fn *)service_destroy); + return 0; +error: + service_destroy (svc); + return -1; } -int svc_sendmsg (svchash_t *sh, const flux_msg_t *msg) +/* Look up a service named 'topic', truncated to 'length' chars. + * Avoid an extra malloc here if the substring is short. + */ +static struct service *service_lookup_subtopic (struct service_switch *sw, + const char *topic, int length) { - const char *topic; - int type; - svc_t *svc; - int rc = -1; + char buf[16]; + char *cpy = NULL; + char *service; + struct service *svc = NULL; - if (flux_msg_get_type (msg, &type) < 0) - goto done; - if (flux_msg_get_topic (msg, &topic) < 0) - goto done; - if (!(svc = zhash_lookup (sh->services, topic))) - svc = zhash_lookup (sh->aliases, topic); - if (!svc && strchr (topic, '.')) { - char *p, *short_topic = xstrdup (topic); - if ((p = strchr (short_topic, '.'))) - *p = '\0'; - if (!(svc = zhash_lookup (sh->services, short_topic))) - svc = zhash_lookup (sh->aliases, short_topic); - free (short_topic); + if (length < sizeof (buf)) + service = buf; + else { + if (!(cpy = malloc (length + 1))) + goto done; + service = cpy; } - if (!svc) { + memcpy (service, topic, length); + service[length] = '\0'; + + if (!(svc = zhash_lookup (sw->services, service))) { errno = ENOSYS; goto done; } - rc = svc->cb (msg, svc->cb_arg); done: - return rc; + free (cpy); + return svc; +} + +/* Look up a service by first "word" of topic string. + * If found, call the service's callback and return its return value. + * If not found, return -1 with errno set (usually ENOSYS). + */ +int service_send (struct service_switch *sw, const flux_msg_t *msg) +{ + const char *topic, *p; + int length; + struct service *svc; + + if (flux_msg_get_topic (msg, &topic) < 0) + return -1; + if ((p = strchr (topic, '.'))) + length = p - topic; + else + length = strlen (topic); + if (!(svc = service_lookup_subtopic (sw, topic, length))) + return -1; + + return svc->cb (msg, svc->cb_arg); } /* diff --git a/src/broker/service.h b/src/broker/service.h index 7c9ff804b66c..2ceeb973b24d 100644 --- a/src/broker/service.h +++ b/src/broker/service.h @@ -1,18 +1,19 @@ #ifndef _BROKER_SERVICE_H #define _BROKER_SERVICE_H -typedef struct svc_struct svc_t; -typedef struct svchash_struct svchash_t; -typedef int (*svc_cb_f)(const flux_msg_t *msg, void *arg); +typedef int (*service_send_f)(const flux_msg_t *msg, void *arg); -svchash_t *svchash_create (void); -void svchash_destroy (svchash_t *sh); +struct service_switch *service_switch_create (void); +void service_switch_destroy (struct service_switch *sw); -svc_t *svc_add (svchash_t *sh, const char *name, const char *alias, - svc_cb_f cb, void *arg); -void svc_remove (svchash_t *sh, const char *name); +int service_add (struct service_switch *sw, const char *name, + const char *uuid, service_send_f cb, void *arg); -int svc_sendmsg (svchash_t *sh, const flux_msg_t *msg); +void service_remove (struct service_switch *sw, const char *name); + +void service_remove_byuuid (struct service_switch *sw, const char *uuid); + +int service_send (struct service_switch *sw, const flux_msg_t *msg); #endif /* !_BROKER_SERVICE_H */ diff --git a/src/broker/test/service.c b/src/broker/test/service.c new file mode 100644 index 000000000000..71761af051e5 --- /dev/null +++ b/src/broker/test/service.c @@ -0,0 +1,140 @@ +#include +#include +#include + +#include + +#include "service.h" + +#include "src/common/libtap/tap.h" + +const flux_msg_t *foo_cb_msg; +void *foo_cb_arg; +int foo_cb_called; +int foo_cb_rc; +int foo_cb_errno; + +static int foo_cb (const flux_msg_t *msg, void *arg) +{ + foo_cb_msg = msg; + foo_cb_arg = arg; + foo_cb_called++; + + if (foo_cb_rc != 0) + errno = foo_cb_errno; + + return foo_cb_rc; +} + + +int main (int argc, char **argv) +{ + struct service_switch *sw; + flux_msg_t *msg, *msg2, *msg3; + + plan (NO_PLAN); + + sw = service_switch_create (); + ok (sw != NULL, + "service_switch_create works"); + + msg = flux_request_encode ("foo", NULL); + if (!msg) + BAIL_OUT ("flux_request_encode: %s", flux_strerror (errno)); + errno = 0; + ok (service_send (sw, msg) < 0 && errno == ENOSYS, + "service_send to 'foo' fails with ENOSYS"); + + ok (service_add (sw, "foo", NULL, foo_cb, NULL) == 0, + "service_add foo works"); + + foo_cb_msg = NULL; + foo_cb_arg = (void *)(uintptr_t)1; + foo_cb_called = 0; + foo_cb_rc = 0; + ok (service_send (sw, msg) == 0, + "service_send to 'foo' works"); + ok (foo_cb_called == 1 && foo_cb_arg == NULL && foo_cb_msg == msg, + "and callback was called with expected arguments"); + + foo_cb_rc = 42; + foo_cb_errno = ENXIO; + errno = 0; + ok (service_send (sw, msg) == 42 && errno == ENXIO, + "service_send returns callback's return code and preserves errno"); + + service_remove (sw, "foo"); + errno = 0; + ok (service_send (sw, msg) < 0 && errno == ENOSYS, + "service_remove works"); + + flux_msg_destroy (msg); + + + msg = flux_request_encode ("bar.baz", NULL); + if (!msg) + BAIL_OUT ("flux_request_encode: %s", flux_strerror (errno)); + ok (service_add (sw, "bar", NULL, foo_cb, NULL) == 0, + "service_add bar works"); + foo_cb_rc = 0; + ok (service_send (sw, msg) == 0, + "service_send to 'bar.baz' works"); + flux_msg_destroy (msg); + + #define SVC_NAME "reallylongservicenamewowthisisimpressive" + #define SVC_ALT1 "alt1" + #define SVC_ALT2 "alt2" + ok (service_add (sw, SVC_NAME, "fakeuuid", foo_cb, NULL) == 0, + "service_add works for long service name"); + ok (service_add (sw, SVC_ALT1, "fakeuuid", foo_cb, NULL) == 0, + "service_add works for alternate service name 1"); + ok (service_add (sw, SVC_ALT2, "fakeuuid", foo_cb, NULL) == 0, + "service_add works for alternate service name 2"); + + msg = flux_request_encode (SVC_NAME ".baz", NULL); + if (!msg) + BAIL_OUT ("flux_request_encode: %s", flux_strerror (errno)); + msg2 = flux_request_encode (SVC_ALT1 ".oooh", NULL); + if (!msg2) + BAIL_OUT ("flux_request_encode: %s", flux_strerror (errno)); + msg3 = flux_request_encode (SVC_ALT2 ".vroom", NULL); + if (!msg3) + BAIL_OUT ("flux_request_encode: %s", flux_strerror (errno)); + + foo_cb_rc = 0; + foo_cb_called = 0; + ok (service_send (sw, msg) == 0 && foo_cb_called == 1, + "service_send matched long service name"); + ok (service_send (sw, msg2) == 0 && foo_cb_called == 2, + "service_send matched first alternate name"); + ok (service_send (sw, msg3) == 0 && foo_cb_called == 3, + "service_send matched second alternate name"); + + service_remove_byuuid (sw, "fakeuuid"); + + foo_cb_rc = 0; + foo_cb_called = 0; + errno = 0; + ok (service_send (sw, msg) < 0 && errno == ENOSYS && foo_cb_called == 0, + "service_send to long service name fails after remove_byuuid"); + errno = 0; + ok (service_send (sw, msg2) < 0 && errno == ENOSYS && foo_cb_called == 0, + "service_send to first alternate name fails after remove_byuuid"); + errno = 0; + ok (service_send (sw, msg3) < 0 && errno == ENOSYS && foo_cb_called == 0, + "service_send to second alternate name fails after remove_byuuid"); + + flux_msg_destroy (msg); + flux_msg_destroy (msg2); + flux_msg_destroy (msg3); + + service_switch_destroy (sw); + + done_testing (); + + return 0; +} + +/* + * vi:ts=4 sw=4 expandtab + */