Skip to content

Commit

Permalink
Merge pull request flux-framework#1189 from garlick/dyn_service
Browse files Browse the repository at this point in the history
broker/service: cleanup and prepare for dynamic service registration
  • Loading branch information
grondo authored Sep 14, 2017
2 parents 8443eff + c004490 commit 4157888
Show file tree
Hide file tree
Showing 5 changed files with 310 additions and 110 deletions.
7 changes: 6 additions & 1 deletion src/broker/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ flux_broker_LDFLAGS = ${broker_ldflags}
TESTS = test_shutdown.t \
test_heartbeat.t \
test_hello.t \
test_attr.t
test_attr.t \
test_service.t

test_ldadd = \
$(top_builddir)/src/common/libflux-core.la \
Expand Down Expand Up @@ -94,3 +95,7 @@ test_hello_t_LDADD = $(test_ldadd)
test_attr_t_SOURCES = test/attr.c attr.c
test_attr_t_CPPFLAGS = $(test_cppflags)
test_attr_t_LDADD = $(test_ldadd)

test_service_t_SOURCES = test/service.c service.c
test_service_t_CPPFLAGS = $(test_cppflags)
test_service_t_LDADD = $(test_ldadd)
64 changes: 30 additions & 34 deletions src/broker/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ typedef struct {
int event_recv_seq;
int event_send_seq;
bool event_active; /* primary event source is active */
svchash_t *services;
struct service_switch *services;
heartbeat_t *heartbeat;
shutdown_t *shutdown;
double shutdown_grace;
Expand Down Expand Up @@ -264,7 +264,8 @@ int main (int argc, char *argv[])

ctx.rank = FLUX_NODEID_ANY;
ctx.modhash = modhash_create ();
ctx.services = svchash_create ();
if (!(ctx.services = service_switch_create ()))
log_err_exit ("service_switch_create");
ctx.overlay = overlay_create ();
ctx.hello = hello_create ();
ctx.tbon.k = 2; /* binary TBON is default */
Expand Down Expand Up @@ -702,7 +703,7 @@ int main (int argc, char *argv[])
flux_sec_destroy (ctx.sec);
overlay_destroy (ctx.overlay);
heartbeat_destroy (ctx.heartbeat);
svchash_destroy (ctx.services);
service_switch_destroy (ctx.services);
hello_destroy (ctx.hello);
attr_destroy (ctx.attrs);
flux_close (ctx.h);
Expand Down Expand Up @@ -1417,10 +1418,13 @@ static int load_module_bypath (broker_ctx_t *ctx, const char *path,
}
if (!(p = module_add (ctx->modhash, path)))
goto error;
if (!svc_add (ctx->services, module_get_name (p),
module_get_service (p), mod_svc_cb, p)) {
errno = EEXIST;
goto error;
if (service_add (ctx->services, module_get_name (p),
module_get_uuid (p), mod_svc_cb, p) < 0)
goto module_remove;
if (module_get_service (p)) {
if (service_add (ctx->services, module_get_service (p),
module_get_uuid (p), mod_svc_cb, p) < 0)
goto service_remove;
}
arg = argz_next (argz, argz_len, NULL);
while (arg) {
Expand All @@ -1430,15 +1434,17 @@ static int load_module_bypath (broker_ctx_t *ctx, const char *path,
module_set_poller_cb (p, module_cb, ctx);
module_set_status_cb (p, module_status_cb, ctx);
if (request && module_push_insmod (p, request) < 0) // response deferred
goto error;
goto service_remove;
if (module_start (p) < 0)
goto error;
goto service_remove;
flux_log (ctx->h, LOG_DEBUG, "insmod %s", name);
free (name);
return 0;
service_remove:
service_remove_byuuid (ctx->services, module_get_uuid (p));
module_remove:
module_remove (ctx->modhash, p);
error:
if (p)
module_remove (ctx->modhash, p);
free (name);
return -1;
}
Expand Down Expand Up @@ -1490,7 +1496,7 @@ static int unload_module_byname (broker_ctx_t *ctx, const char *name,
return -1;
} else {
assert (request == NULL);
svc_remove (ctx->services, module_get_name (p));
service_remove_byuuid (ctx->services, module_get_uuid (p));
module_remove (ctx->modhash, p);
}
flux_log (ctx->h, LOG_DEBUG, "rmmod %s", name);
Expand Down Expand Up @@ -1756,17 +1762,12 @@ static struct flux_msg_handler_spec handlers[] = {
};

struct internal_service {
const char *topic;
const char *name;
const char *nodeset;
};

static struct internal_service services[] = {
{ "cmb.rusage", NULL },
{ "cmb.ping", NULL },
{ "cmb.exec", NULL },
{ "cmb.exec.signal", NULL },
{ "cmb.exec.write", NULL },
{ "cmb.processes", NULL },
{ "cmb", NULL }, // kind of a catch-all, slowly deprecating
{ "log", NULL },
{ "seq", "[0]" },
{ "content", NULL },
Expand All @@ -1777,25 +1778,20 @@ static struct internal_service services[] = {
};

/* Register builtin services (sharing ctx->h and broker thread).
* First loop is for services that are registered in other files.
* Second loop is for services registered here.
* Register message handlers for some cmb services. Others are registered
* in their own initialization functions.
*/
static void broker_add_services (broker_ctx_t *ctx)
{
struct internal_service *svc;
for (svc = &services[0]; svc->topic != NULL; svc++) {
for (svc = &services[0]; svc->name != NULL; svc++) {
if (!nodeset_member (svc->nodeset, ctx->rank))
continue;
if (!svc_add (ctx->services, svc->topic, NULL, route_to_handle, ctx))
log_err_exit ("error registering service for %s", svc->topic);
if (service_add (ctx->services, svc->name, NULL,
route_to_handle, ctx) < 0)
log_err_exit ("error registering service for %s", svc->name);
}

struct flux_msg_handler_spec *spec;
for (spec = &handlers[0]; spec->topic_glob != NULL; spec++) {
if (!svc_add (ctx->services, spec->topic_glob, NULL,
route_to_handle, ctx))
log_err_exit ("error registering service for %s", spec->topic_glob);
}
if (flux_msg_handler_addvec (ctx->h, handlers, ctx) < 0)
log_err_exit ("error registering message handlers");
}
Expand Down Expand Up @@ -2015,7 +2011,7 @@ static void module_status_cb (module_t *p, int prev_status, void *arg)
*/
if (status == FLUX_MODSTATE_EXITED) {
flux_log (ctx->h, LOG_DEBUG, "module %s exited", name);
svc_remove (ctx->services, module_get_name (p));
service_remove_byuuid (ctx->services, module_get_uuid (p));
while ((msg = module_pop_rmmod (p))) {
if (flux_respond (ctx->h, msg, 0, NULL) < 0)
flux_log_error (ctx->h, "flux_respond to rmmod %s", name);
Expand Down Expand Up @@ -2117,7 +2113,7 @@ static int broker_request_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg,
if (rc < 0)
goto error;
} else if ((flags & FLUX_MSGFLAG_UPSTREAM) && nodeid != ctx->rank) {
rc = svc_sendmsg (ctx->services, msg);
rc = service_send (ctx->services, msg);
if (rc < 0 && errno == ENOSYS) {
rc = overlay_sendmsg_parent (ctx->overlay, msg);
if (rc < 0 && errno == EHOSTUNREACH)
Expand All @@ -2126,7 +2122,7 @@ static int broker_request_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg,
if (rc < 0)
goto error;
} else if (nodeid == FLUX_NODEID_ANY) {
rc = svc_sendmsg (ctx->services, msg);
rc = service_send (ctx->services, msg);
if (rc < 0 && errno == ENOSYS) {
rc = overlay_sendmsg_parent (ctx->overlay, msg);
if (rc < 0 && errno == EHOSTUNREACH)
Expand All @@ -2135,7 +2131,7 @@ static int broker_request_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg,
if (rc < 0)
goto error;
} else if (nodeid == ctx->rank) {
rc = svc_sendmsg (ctx->services, msg);
rc = service_send (ctx->services, msg);
if (rc < 0)
goto error;
} else if ((gw = kary_child_route (ctx->tbon.k, ctx->size,
Expand Down
Loading

0 comments on commit 4157888

Please sign in to comment.