From b16122e7b11c213215cf8fdf18aa2e5d60f1f548 Mon Sep 17 00:00:00 2001 From: Jim Garlick Date: Wed, 17 Dec 2014 10:00:03 -0800 Subject: [PATCH] flux-module, broker, libflux: rework module API Services implementing 'module extensions' will implement insmod, rmmod, lsmod operations defined in RFC 5. Provide an API for calling these operations and utility functions to help services implementing them. flux-module is now a general purpose tool for managing modules within any service. modctl functionality is temporarily out; it will be restored with the commit reimplementing modctl on the new interfaces. The broker and flux-module are updated to use the new API. Testing: unit tests for RFC 5 protocol encode/decode are added. One place in test where we loaded modules on two ranks using modctl (t/t0002-request.t) was modified to use the simpler flux-module interface directed at successive ranks. --- src/broker/cmbd.c | 145 ++++++++------- src/broker/module.c | 9 +- src/cmd/flux-module.c | 254 ++++++++++--------------- src/common/libflux/module.c | 356 ++++++++++++++++++++++++++++++------ src/common/libflux/module.h | 101 +++++++--- src/modules/modctl/modctl.c | 6 + t/t0002-request.t | 17 +- 7 files changed, 576 insertions(+), 312 deletions(-) diff --git a/src/broker/cmbd.c b/src/broker/cmbd.c index 2b319b4ec0e0..c7791b707f7d 100644 --- a/src/broker/cmbd.c +++ b/src/broker/cmbd.c @@ -51,6 +51,7 @@ #include "src/common/libutil/jsonutil.h" #include "src/common/libutil/ipaddr.h" #include "src/common/libutil/shortjson.h" +#include "src/common/libutil/argv.h" #include "module.h" #include "boot_pmi.h" @@ -144,7 +145,6 @@ typedef struct { zlist_t *rmmod_reqs; ctx_t *ctx; char *path; - int flags; nodeset_t ns; } module_t; @@ -168,7 +168,7 @@ static void cmbd_init_socks (ctx_t *ctx); static int cmbd_init_child (ctx_t *ctx, endpt_t *ep); static int cmbd_init_gevent_pub (ctx_t *ctx, endpt_t *ep); -static module_t *module_create (ctx_t *ctx, const char *path, int flags); +static module_t *module_create (ctx_t *ctx, const char *path); static void module_destroy (module_t *mod); static void module_unload (module_t *mod, zmsg_t **zmsg); static bool module_select (module_t *mod, const char *nstr); @@ -752,7 +752,7 @@ static void boot_local (ctx_t *ctx) tmpdir, ctx->sid, rrank); } -static module_t *module_create (ctx_t *ctx, const char *path, int flags) +static module_t *module_create (ctx_t *ctx, const char *path) { module_t *mod = xzmalloc (sizeof (*mod)); @@ -762,7 +762,6 @@ static module_t *module_create (ctx_t *ctx, const char *path, int flags) if (!(mod->rmmod_reqs = zlist_new ())) oom (); mod->ctx = ctx; - mod->flags = flags; return mod; } @@ -848,7 +847,6 @@ static bool module_select (module_t *mod, const char *nstr) static module_t *module_prepare_one (ctx_t *ctx, const char *arg) { char *path, *name; - int flags = 0; module_t *mod; if (strchr (arg, '/')) { /* path name given */ @@ -862,7 +860,7 @@ static module_t *module_prepare_one (ctx_t *ctx, const char *arg) } if (!(mod = zhash_lookup (ctx->modules, name))) { - if (!(mod = module_create (ctx, path, flags))) + if (!(mod = module_create (ctx, path))) err_exit ("module %s", name); zhash_update (ctx->modules, name, mod); zhash_freefn (ctx->modules, name, (zhash_free_fn *)module_destroy); @@ -1186,28 +1184,23 @@ static char *cmb_getattr (ctx_t *ctx, const char *name) return val; } -/* Modctl sets 'managed' flag for insert/remove. flux-mod ins,rm does not. - */ -static int cmb_rmmod (ctx_t *ctx, const char *name, int flags, zmsg_t **zmsg) +static int cmb_rmmod (ctx_t *ctx, const char *name, zmsg_t **zmsg) { module_t *mod; if (!(mod = zhash_lookup (ctx->modules, name))) { errno = ENOENT; return -1; } - if ((mod->flags & FLUX_MOD_FLAGS_MANAGED) - != (flags & FLUX_MOD_FLAGS_MANAGED)) { - errno = EINVAL; - return -1; - } module_unload (mod, zmsg); flux_log (ctx->h, LOG_INFO, "rmmod %s", name); return 0; } -static json_object *cmb_lsmod (ctx_t *ctx) +/* Build lsmod response payload per RFC 5. + */ +static JSON cmb_lsmod (ctx_t *ctx) { - json_object *mo, *response = util_json_object_new_object (); + JSON o = flux_lsmod_json_create (); zlist_t *keys; char *name; module_t *mod; @@ -1218,27 +1211,25 @@ static json_object *cmb_lsmod (ctx_t *ctx) while (name) { mod = zhash_lookup (ctx->modules, name); assert (mod != NULL); - mo = util_json_object_new_object (); - util_json_object_add_string (mo, "name", plugin_name (mod->p)); - util_json_object_add_int (mo, "size", plugin_size (mod->p)); - util_json_object_add_string (mo, "digest", plugin_digest (mod->p)); - util_json_object_add_int (mo, "flags", mod->flags); - util_json_object_add_int (mo, "idle", - peer_idle (ctx, plugin_uuid (mod->p))); - util_json_object_add_string (mo, "nodelist", ctx->rankstr); - json_object_object_add (response, name, mo); + if (mod && flux_lsmod_json_append (o, plugin_name (mod->p), + plugin_size (mod->p), + plugin_digest (mod->p), + peer_idle (ctx, plugin_uuid (mod->p))) < 0) { + Jput (o); + o = NULL; + goto done; + } name = zlist_next (keys); } +done: zlist_destroy (&keys); - return response; + return o; } -static int cmb_insmod (ctx_t *ctx, const char *path, int flags, - json_object *args) +static int cmb_insmod (ctx_t *ctx, char *path, int argc, char **argv) { - int rc = -1; + int i, rc = -1; module_t *mod; - json_object_iter iter; char *name = NULL; if (!(name = flux_modname (path))) { @@ -1249,12 +1240,20 @@ static int cmb_insmod (ctx_t *ctx, const char *path, int flags, errno = EEXIST; goto done; } - if (!(mod = module_create (ctx, path, flags))) + if (!(mod = module_create (ctx, path))) goto done; - json_object_object_foreachC (args, iter) { - const char *val = json_object_get_string (iter.val); - zhash_update (mod->args, iter.key, xstrdup (val)); - zhash_freefn (mod->args, iter.key, (zhash_free_fn *)free); + /* Args are transitioning from zhash to argc, argv. + * To avoid too much code perturbation for now we translate here. + * Assume args are in key=val format as before. + */ + for (i = 0; i < argc; i++ ) { + char *key = xstrdup (argv[i]); + char *val = strchr (key, '='); + if (val) + *val++ = '\0'; + zhash_update (mod->args, key, xstrdup (val ? val : "1")); + zhash_freefn (mod->args, key, (zhash_free_fn *)free); + free (key); } if (module_load (ctx, mod) < 0) { module_destroy (mod); @@ -1262,7 +1261,7 @@ static int cmb_insmod (ctx_t *ctx, const char *path, int flags, } zhash_update (ctx->modules, name, mod); zhash_freefn (ctx->modules, name, (zhash_free_fn *)module_destroy); - flux_log (ctx->h, LOG_INFO, "insmod %s %s", name, path); + flux_log (ctx->h, LOG_INFO, "insmod %s", name); rc = 0; done: if (name) @@ -1707,43 +1706,51 @@ static int cmb_internal_request (ctx_t *ctx, zmsg_t **zmsg) json_object_put (response); } } else if (flux_msg_match (*zmsg, "cmb.rmmod")) { - json_object *request = NULL; - const char *name; - int flags; - if (flux_msg_decode (*zmsg, NULL, &request) < 0 || request == NULL - || util_json_object_get_string (request, "name", &name) < 0 - || util_json_object_get_int (request, "flags", &flags) < 0) { - flux_respond_errnum (ctx->h, zmsg, EPROTO); - } else if (cmb_rmmod (ctx, name, flags, zmsg) < 0) { - flux_respond_errnum (ctx->h, zmsg, errno); - } /* else response is deferred until module returns EOF */ - if (request) - json_object_put (request); + char *name = NULL; + int errnum = 0; + if (flux_rmmod_request_decode (*zmsg, &name) < 0) + errnum = errno; + else if (cmb_rmmod (ctx, name, zmsg) < 0) /* responds on success */ + errnum = errno; + if (errnum && flux_err_respond (ctx->h, errnum, zmsg) < 0) + flux_log (ctx->h, LOG_ERR, "%s: flux_err_respond: %s", + __FUNCTION__, strerror (errno)); + if (name) + free (name); } else if (flux_msg_match (*zmsg, "cmb.insmod")) { - json_object *args, *request = NULL; - const char *path; - int flags; - if (flux_msg_decode (*zmsg, NULL, &request) < 0 || request == NULL - || util_json_object_get_string (request, "path", &path) < 0 - || util_json_object_get_int (request, "flags", &flags) < 0 - || !(args = json_object_object_get (request, "args"))) { - flux_respond_errnum (ctx->h, zmsg, EPROTO); - } else if (cmb_insmod (ctx, path, flags, args) < 0) { - flux_respond_errnum (ctx->h, zmsg, errno); - } else { - flux_respond_errnum (ctx->h, zmsg, 0); - } - if (request) - json_object_put (request); + char *path = NULL; + int argc; + char **argv = NULL; + int errnum = 0; + if (flux_insmod_request_decode (*zmsg, &path, &argc, &argv) < 0) + errnum = errno; + else if (cmb_insmod (ctx, path, argc, argv) < 0) + errnum = errno; + if (flux_err_respond (ctx->h, errnum, zmsg) < 0) + flux_log (ctx->h, LOG_ERR, "%s: flux_err_respond: %s", + __FUNCTION__, strerror (errno)); + if (path) + free (path); + if (argv) + argv_destroy (argc, argv); } else if (flux_msg_match (*zmsg, "cmb.lsmod")) { - json_object *response = NULL; - if (!(response = cmb_lsmod (ctx))) { - flux_respond_errnum (ctx->h, zmsg, errno); + if (flux_lsmod_request_decode (*zmsg) < 0) { + if (flux_err_respond (ctx->h, errno, zmsg) < 0) + flux_log (ctx->h, LOG_ERR, "%s: flux_err_respond: %s", + __FUNCTION__, strerror (errno)); } else { - flux_respond (ctx->h, zmsg, response); + JSON out = cmb_lsmod (ctx); + if (!out) { + if (flux_err_respond (ctx->h, errno, zmsg) < 0) + flux_log (ctx->h, LOG_ERR, "%s: flux_err_respond: %s", + __FUNCTION__, strerror (errno)); + } else { + if (flux_json_respond (ctx->h, out, zmsg) < 0) + flux_log (ctx->h, LOG_ERR, "%s: flux_json_respond: %s", + __FUNCTION__, strerror (errno)); + } + Jput (out); } - if (response) - json_object_put (response); } else if (flux_msg_match (*zmsg, "cmb.lspeer")) { json_object *response = NULL; if (!(response = peer_ls (ctx))) { diff --git a/src/broker/module.c b/src/broker/module.c index 54e4b9a15617..f0b9183b09a2 100644 --- a/src/broker/module.c +++ b/src/broker/module.c @@ -53,6 +53,11 @@ #include "module.h" +/* While transitioning to argc, argv - style args per RFC 5, + * we have our own mod_main prototype. + */ +typedef int (mod_main_comms_f)(flux_t h, zhash_t *args); + typedef struct { int request_tx; @@ -77,7 +82,7 @@ struct plugin_ctx_struct { char *svc_uri; zuuid_t *uuid; pthread_t t; - mod_main_f *main; + mod_main_comms_f *main; plugin_stats_t stats; zloop_t *zloop; dq_t *dq; @@ -679,7 +684,7 @@ plugin_ctx_t plugin_create (flux_t h, const char *path, zhash_t *args) plugin_ctx_t p; void *dso; const char **mod_namep; - mod_main_f *mod_main; + mod_main_comms_f *mod_main; zfile_t *zf; dlerror (); diff --git a/src/cmd/flux-module.c b/src/cmd/flux-module.c index f62d08c6264f..237a8f4c3983 100644 --- a/src/cmd/flux-module.c +++ b/src/cmd/flux-module.c @@ -38,22 +38,45 @@ #include "src/common/libutil/readall.h" -#define OPTIONS "+h" +#define OPTIONS "+hr:" static const struct option longopts[] = { {"help", no_argument, 0, 'h'}, + {"rank", required_argument, 0, 'r'}, { 0, 0, 0, 0 }, }; -static void module_list (flux_t h, int argc, char **argv); -static void module_remove (flux_t h, int argc, char **argv); -static void module_load (flux_t h, int argc, char **argv); +void mod_lsmod (flux_t h, uint32_t nodeid, int ac, char **av); +void mod_rmmod (flux_t h, uint32_t nodeid, int ac, char **av); +void mod_insmod (flux_t h, uint32_t nodeid, int ac, char **av); + +typedef struct { + const char *name; + void (*fun)(flux_t h, uint32_t nodeid, int ac, char **av); +} func_t; + +static func_t funcs[] = { + { "list", &mod_lsmod}, + { "remove", &mod_rmmod}, + { "load", &mod_insmod}, +}; + +func_t *func_lookup (const char *name) +{ + int i; + for (i = 0; i < sizeof (funcs) / sizeof (funcs[0]); i++) + if (!strcmp (funcs[i].name, name)) + return &funcs[i]; + return NULL; +} void usage (void) { fprintf (stderr, -"Usage: flux-module list\n" -" flux-module remove module\n" -" flux-module load module [arg=val...]\n" +"Usage: flux-module [OPTIONS] list [service]\n" +" flux-module [OPTIONS] remove module\n" +" flux-module [OPTIONS] load module [arg ...]\n" +"where OPTIONS are:\n" +" -r,--rank N specify nodeid to send request\n" ); exit (1); } @@ -63,6 +86,8 @@ int main (int argc, char *argv[]) flux_t h; int ch; char *cmd; + func_t *f; + uint32_t nodeid = FLUX_NODEID_ANY; log_init ("flux-module"); @@ -71,6 +96,9 @@ int main (int argc, char *argv[]) case 'h': /* --help */ usage (); break; + case 'r': /* --rank N */ + nodeid = strtoul (optarg, NULL, 10); + break; default: usage (); break; @@ -79,185 +107,93 @@ int main (int argc, char *argv[]) if (optind == argc) usage (); cmd = argv[optind++]; + if (!(f = func_lookup (cmd))) + msg_exit ("unknown function '%s'", cmd); if (!(h = flux_api_open ())) err_exit ("flux_api_open"); - - if (!strcmp (cmd, "list") || !strcmp (cmd, "ls")) - module_list (h, argc - optind, argv + optind); - else if (!strcmp (cmd, "remove") || !strcmp (cmd, "rm")) - module_remove (h, argc - optind, argv + optind); - else if (!strcmp (cmd, "load")) - module_load (h, argc - optind, argv + optind); - else - usage (); - + f->fun (h, nodeid, argc - optind, argv + optind); flux_api_close (h); + log_fini (); return 0; } -static char *flagstr (int flags) -{ - char *s = xzmalloc (16); - if ((flags & FLUX_MOD_FLAGS_MANAGED)) - strcat (s, "m"); - return s; -} - -static char *idlestr (int idle) +void mod_insmod (flux_t h, uint32_t nodeid, int ac, char **av) { - char *s; - if (idle > 99) - s = xstrdup ("idle"); - else if (asprintf (&s, "%d", idle) < 0) - oom (); - return s; -} + char *modpath = NULL; + char *modname = NULL; -static void module_list_one (const char *key, JSON mo) -{ - const char *name, *nodelist = NULL; - int flags, idle, size; - char *fs, *is; - - if (!Jget_str (mo, "name", &name) || !Jget_int (mo, "flags", &flags) - || !Jget_int (mo, "size", &size) || !Jget_str (mo, "nodelist", &nodelist) - || !Jget_int (mo, "idle", &idle)) - msg_exit ("error parsing lsmod response"); - fs = flagstr (flags); - is = idlestr (idle); - printf ("%-20.20s %6d %-6s %4s %s\n", key, size, fs, is, nodelist); - free (fs); - free (is); -} - -static void module_list (flux_t h, int argc, char **argv) -{ - JSON lsmod, mods; - json_object_iter iter; - - if (argc > 0) + if (ac < 1) usage (); - if (flux_modctl_update (h) < 0) - err_exit ("flux_modctl_update"); - /* FIXME: flux_modctl_update doesn't wait for KVS to be updated, - * so there is a race here. The following usleep should be removed - * once this is addressed. - */ - usleep (1000*100); - printf ("%-20s %6s %-6s %4s %s\n", - "Module", "Size", "Flags", "Idle", "Nodelist"); - if (kvs_get (h, "conf.modctl.lsmod", &lsmod) == 0) { - if (!Jget_obj (lsmod, "mods", &mods)) - msg_exit ("error parsing lsmod KVS object"); - json_object_object_foreachC (mods, iter) { - module_list_one (iter.key, iter.val); - } - Jput (lsmod); + if (strchr (av[0], '/')) { /* path name given */ + modpath = xstrdup (av[0]); + if (!(modname = flux_modname (modpath))) + msg_exit ("%s", dlerror ()); + } else { + char *searchpath = getenv ("FLUX_MODULE_PATH"); + if (!searchpath) + searchpath = MODULE_PATH; + modname = xstrdup (av[0]); + if (!(modpath = flux_modfind (searchpath, modname))) + msg_exit ("%s: not found in module search path", modname); } + if (flux_insmod (h, nodeid, modpath, ac - 1, av + 1) < 0) + err_exit ("%s", av[0]); + if (modpath) + free (modpath); + if (modname) + free (modname); } -static void module_remove (flux_t h, int argc, char **argv) +void mod_rmmod (flux_t h, uint32_t nodeid, int ac, char **av) { - char *key, *mod; - - if (argc != 1) - usage (); - mod = argv[0]; - if (asprintf (&key, "conf.modctl.modules.%s", mod) < 0) - oom (); - if (kvs_unlink (h, key) < 0) - err_exit ("%s", key); - if (kvs_commit (h) < 0) - err_exit ("kvs_commit"); - if (flux_modctl_rm (h, mod) < 0) - err_exit ("%s", mod); - msg ("%s: unloaded", mod); - free (key); + if (ac != 1) + usage (); + if (flux_rmmod (h, nodeid, av[0]) < 0) + err_exit ("%s", av[0]); } -static JSON parse_modargs (int argc, char **argv) +const char *snip (const char *s, int n) { - JSON args = Jnew (); - int i; - - for (i = 0; i < argc; i++) { - char *val = NULL, *cpy = xstrdup (argv[i]); - if ((val = strchr (cpy, '='))) - *val++ = '\0'; - if (!val) - msg_exit ("malformed argument: %s", cpy); - Jadd_str (args, cpy, val); - free (cpy); - } - - return args; + if (strlen (s) < n) + return s; + else + return s + strlen (s) - n; } -/* Copy mod to KVS (without commit). - */ -static void copymod (flux_t h, const char *name, const char *path, JSON args) +static int lsmod_cb (const char *name, int size, const char *digest, int idle, + const char *nodeset, void *arg) { - JSON mod = Jnew (); - char *key; - int fd, len; - uint8_t *buf; - - if (asprintf (&key, "conf.modctl.modules.%s", name) < 0) - oom (); - if (kvs_get (h, key, &mod) == 0) - errn_exit (EEXIST, "%s", key); - Jadd_obj (mod, "args", args); - if ((fd = open (path, O_RDONLY)) < 0) - err_exit ("%s", path); - if ((len = read_all (fd, &buf)) < 0) - err_exit ("%s", path); - (void)close (fd); - util_json_object_add_data (mod, "data", buf, len); - if (kvs_put (h, key, mod) < 0) - err_exit ("kvs_put %s", key); - free (key); - free (buf); - Jput (mod); + char idle_str[16]; + if (idle < 100) + snprintf (idle_str, sizeof (idle_str), "%d", idle); + else + strncpy (idle_str, "idle", sizeof (idle_str)); + printf ("%-20.20s %6d %7s %4s %s\n", + name, + size, + snip (digest, 7), + idle_str, + nodeset ? nodeset : ""); + return 0; } -static void module_load (flux_t h, int argc, char **argv) +void mod_lsmod (flux_t h, uint32_t nodeid, int ac, char **av) { - JSON args; - char *path, *name; - char *searchpath = getenv ("FLUX_MODULE_PATH"); - - if (!searchpath) - searchpath = MODULE_PATH; - - if (argc == 0) - usage (); - if (strchr (argv[0], '/')) { /* path name given */ - path = xstrdup (argv[0]); - if (!(name = flux_modname (path))) - msg_exit ("%s", dlerror ()); - } else { /* module name given */ - name = xstrdup (argv[0]); - if (!(path = flux_modfind (searchpath, name))) - msg_exit ("%s: not found in module search path", name); - } - argc--; - argv++; - - args = parse_modargs (argc, argv); - copymod (h, name, path, args); - if (kvs_commit (h) < 0) - err_exit ("kvs_commit"); - if (flux_modctl_ins (h, name) < 0) - err_exit ("flux_modctl_ins %s", name); - msg ("module loaded"); + char *svc = "cmb"; - Jput (args); - free (name); - free (path); + if (ac > 1) + usage (); + if (ac == 1) + svc = av[0]; + printf ("%-20s %6s %7s %4s %s\n", + "Module", "Size", "Digest", "Idle", "Nodeset"); + if (flux_lsmod (h, nodeid, svc, lsmod_cb, NULL) < 0) + err_exit ("%s", svc); } + /* * vi:tabstop=4 shiftwidth=4 expandtab */ diff --git a/src/common/libflux/module.c b/src/common/libflux/module.c index 206c0a7c1e22..0405354e7546 100644 --- a/src/common/libflux/module.c +++ b/src/common/libflux/module.c @@ -29,91 +29,136 @@ #include "module.h" #include "request.h" +#include "message.h" #include "src/common/libutil/shortjson.h" #include "src/common/libutil/xzmalloc.h" +#include "src/common/libutil/argv.h" -/* Who will load modname? +/* Get service name from module name string. */ -static char *mod_target (const char *modname) +static char *mod_service (const char *modname) { - char *target = NULL; + char *service = NULL; if (strchr (modname, '.')) { - target = xstrdup (modname); - char *p = strrchr (target, '.'); + service = xstrdup (modname); + char *p = strrchr (service, '.'); *p = '\0'; } else - target = xstrdup ("cmb"); - return target; + service = xstrdup ("cmb"); + return service; } -#ifndef TEST_MAIN /* Not testing this section */ +/** + ** JSON encode/decode functions + **/ -int flux_rmmod (flux_t h, int rank, const char *name, int flags) +int flux_insmod_json_decode (JSON o, char **path, int *argc, char ***argv) { - JSON request = Jnew (); - JSON response = NULL; - char *target = mod_target (name); + JSON args = NULL; + const char *s; + int i, ac; int rc = -1; - Jadd_str (request, "name", name); - Jadd_int (request, "flags", flags); - if ((response = flux_rank_rpc (h, rank, request, "%s.rmmod", target))) { + if (!Jget_str (o, "path", &s) || !Jget_obj (o, "args", &args) + || !Jget_ar_len (args, &ac)) { errno = EPROTO; goto done; } - if (errno != 0) - goto done; + *path = xstrdup (s); + argv_create (argc, argv); + for (i = 0; i < ac; i++) { + (void)Jget_ar_str (args, i, &s); /* can't fail? */ + argv_push (argc, argv, "%s", s); + } rc = 0; done: - free (target); - Jput (request); - Jput (response); + Jput (args); return rc; } -JSON flux_lsmod (flux_t h, int rank, const char *target) +JSON flux_insmod_json_encode (const char *path, int argc, char **argv) +{ + JSON o = Jnew (); + JSON args = Jnew_ar (); + int i; + + Jadd_str (o, "path", path); + for (i = 0; i < argc; i++) + Jadd_ar_str (args, argv[i]); + Jadd_obj (o, "args", args); + return o; +} + +int flux_rmmod_json_decode (JSON o, char **name) { - JSON response = NULL; + const char *s; + int rc = -1; + if (!Jget_str (o, "name", &s)) { + errno = EPROTO; + goto done; + } + *name = xstrdup (s); + rc = 0; +done: + return rc; +} - if (target == NULL) - target = "cmb"; - response = flux_rank_rpc (h, rank, NULL, "%s.lsmod", target); - return response; +JSON flux_rmmod_json_encode (const char *name) +{ + JSON o = Jnew (); + Jadd_str (o, "name", name); + return o; } -int flux_insmod (flux_t h, int rank, const char *path, int flags, JSON args) +int flux_lsmod_json_decode_nth (JSON a, int n, const char **name, int *size, + const char **digest, int *idle) { - JSON request = Jnew (); - JSON response = NULL; - char *name = NULL; - char *target = NULL; + JSON o; int rc = -1; - if (!(name = flux_modname (path))) { - errno = EINVAL; + if (!Jget_ar_obj (a, n, &o) || !Jget_str (o, "name", name) + || !Jget_int (o, "size", size) + || !Jget_str (o, "digest", digest) + || !Jget_int (o, "idle", idle)) { + errno = EPROTO; goto done; } - target = mod_target (name); - Jadd_str (request, "path", path); - Jadd_int (request, "flags", flags); - Jadd_obj (request, "args", args); - if ((response = flux_rank_rpc (h, rank, request, "%s.insmod", target))) { + rc = 0; +done: + return rc; +} + +int flux_lsmod_json_decode (JSON a, int *len) +{ + int rc = -1; + + if (!Jget_ar_len (a, len)) { errno = EPROTO; goto done; } - if (errno != 0) - goto done; rc = 0; done: - if (target) - free (target); - Jput (request); - Jput (response); return rc; } -#endif /* !TEST_MAIN */ +int flux_lsmod_json_append (JSON a, const char *name, int size, + const char *digest, int idle) +{ + JSON o = Jnew (); + Jadd_str (o, "name", name); + Jadd_int (o, "size", size); + Jadd_str (o, "digest", digest); + Jadd_int (o, "idle", idle); + Jadd_ar_obj (a, o); /* takes a ref on o */ + Jput (o); + return 0; +} + +JSON flux_lsmod_json_create (void) +{ + return Jnew_ar (); +} char *flux_modname(const char *path) { @@ -130,6 +175,7 @@ char *flux_modname(const char *path) return name; } +/* helper for flux_modfind() */ static int flux_modname_cmp(const char *path, const char *name) { void *dso; @@ -145,15 +191,18 @@ static int flux_modname_cmp(const char *path, const char *name) return rc; } -#undef MAX +#ifndef MAX #define MAX(a,b) ((a)>(b)?(a):(b)) +#endif +/* helper for flux_modfind() */ static int strcmpend (const char *s1, const char *s2) { int skip = MAX (strlen (s1) - strlen (s2), 0); return strcmp (s1 + skip, s2); } +/* helper for flux_modfind() */ static char *modfind (const char *dirpath, const char *modname) { DIR *dir; @@ -205,36 +254,160 @@ char *flux_modfind (const char *searchpath, const char *modname) return modpath; } +/* It is not convenient to test these directly here. + */ +#ifndef TEST_MAIN +int flux_insmod_request_decode (zmsg_t *zmsg, char **path, + int *argc, char ***argv) +{ + JSON in = NULL; + int rc = -1; + + if (flux_json_request_decode (zmsg, &in) < 0) + goto done; + if (flux_insmod_json_decode (in, path, argc, argv) < 0) + goto done; + rc = 0; +done: + Jput (in); + return rc; +} + +int flux_rmmod_request_decode (zmsg_t *zmsg, char **name) +{ + JSON in = NULL; + int rc = -1; + + if (flux_json_request_decode (zmsg, &in) < 0) + goto done; + if (flux_rmmod_json_decode (in, name) < 0) + goto done; + rc = 0; +done: + Jput (in); + return rc; +} + +int flux_lsmod_request_decode (zmsg_t *zmsg) +{ + int type; + int rc = -1; + + if (flux_msg_get_type (zmsg, &type) < 0) + goto done; + if (type != FLUX_MSGTYPE_REQUEST || flux_msg_has_payload (zmsg)) { + errno = EPROTO; + goto done; + } + rc = 0; +done: + return rc; +} + +int flux_rmmod (flux_t h, uint32_t nodeid, const char *name) +{ + JSON in = NULL; + char *service = mod_service (name); + char *topic = xasprintf ("%s.rmmod", service); + int rc = -1; + + in = flux_rmmod_json_encode (name); + Jadd_str (in, "name", name); + if (flux_json_rpc (h, nodeid, topic, in, NULL) < 0) + goto done; + rc = 0; +done: + free (service); + free (topic); + Jput (in); + return rc; +} + +int flux_lsmod (flux_t h, uint32_t nodeid, const char *service, + flux_lsmod_f cb, void *arg) +{ + JSON out = NULL; + char *topic = xasprintf ("%s.lsmod", service ? service : "cmb"); + int rc = -1; + int i, len; + + if (flux_json_rpc (h, nodeid, topic, NULL, &out) < 0) + goto done; + if (flux_lsmod_json_decode (out, &len) < 0) + goto done; + for (i = 0; i < len; i++) { + const char *name, *digest; + int size, idle; + if (flux_lsmod_json_decode_nth (out, i, &name, &size, &digest, &idle) < 0) + goto done; + if (cb (name, size, digest, idle, NULL, arg) < 0) + goto done; + } + rc = 0; +done: + free (topic); + return rc; +} + +int flux_insmod (flux_t h, uint32_t nodeid, const char *path, + int argc, char **argv) +{ + JSON in = Jnew (); + char *name = NULL; + char *service = NULL; + char *topic = NULL; + int rc = -1; + + if (!(name = flux_modname (path))) { + errno = EINVAL; + goto done; + } + service = mod_service (name); + topic = xasprintf ("%s.insmod", service); + + in = flux_insmod_json_encode (path, argc, argv); + if (flux_json_rpc (h, nodeid, topic, in, NULL) < 0) + goto done; + rc = 0; +done: + if (service) + free (service); + if (topic) + free (topic); + Jput (in); + return rc; +} +#endif /* !TEST_MAIN */ + + #ifdef TEST_MAIN #include "src/common/libtap/tap.h" -int main (int argc, char *argv[]) +void test_helpers (void) { char *name, *path; - plan (16); - ok ((strcmpend ("foo.so", ".so") == 0), "strcmpend matches .so"); ok ((strcmpend ("", ".so") != 0), "strcmpend doesn't match empty string"); - name = mod_target ("kvs"); + name = mod_service ("kvs"); like (name, "^cmb$", - "mod_target of kvs is cmb"); + "mod_service of kvs is cmb"); if (name) free (name); - name = mod_target ("sched.backfill"); + name = mod_service ("sched.backfill"); like (name, "^sched$", - "mod_target of sched.backfill is sched"); + "mod_service of sched.backfill is sched"); if (name) free (name); - name = mod_target ("sched.backfill.priority"); + name = mod_service ("sched.backfill.priority"); like (name, "^sched.backfill$", - "mod_target of sched.backfill.priority is sched.backfill"); + "mod_service of sched.backfill.priority is sched.backfill"); if (name) free (name); @@ -289,10 +462,83 @@ int main (int argc, char *argv[]) if (name) free (name); free (path); +} + +void test_lsmod_codec (void) +{ + JSON o; + int len; + int idle, size; + const char *name, *digest; + + o = flux_lsmod_json_create (); + ok (o != NULL, + "flux_lsmod_json_create works"); + ok (flux_lsmod_json_append (o, "foo", 42, "aa", 3) == 0, + "first flux_lsmod_json_append works"); + ok (flux_lsmod_json_append (o, "bar", 43, "bb", 2) == 0, + "second flux_lsmod_json_append works"); + ok (flux_lsmod_json_decode (o, &len) == 0 && len == 2, + "flux_lsmod_json_decode works"); + ok (flux_lsmod_json_decode_nth (o, 0, &name, &size, &digest, &idle) == 0 + && name && size == 42 && digest && idle == 3 + && !strcmp (name, "foo") && !strcmp (digest, "aa"), + "flux_lsmod_json_decode_nth(0) works"); + ok (flux_lsmod_json_decode_nth (o, 1, &name, &size, &digest, &idle) == 0 + && name && size == 43 && digest && idle == 2 + && !strcmp (name, "bar") && !strcmp (digest, "bb"), + "flux_lsmod_json_decode_nth(1) works"); + + Jput (o); +} + +void test_rmmod_codec (void) +{ + JSON o; + char *s = NULL; + + o = flux_rmmod_json_encode ("xyz"); + ok (o != NULL, + "flux_rmmod_json_encode works"); + ok (flux_rmmod_json_decode (o, &s) == 0 && s != NULL && !strcmp (s, "xyz"), + "flux_rmmod_json_decode works"); + free (s); + Jput (o); +} + +void test_insmod_codec (void) +{ + int ac, argc = 2; + char *argv[] = { "foo", "bar" }; + char **av; + JSON o; + char *s; + + o = flux_insmod_json_encode ("/foo/bar", argc, argv); + ok (o != NULL, + "flux_insmod_json_encode works"); + ok (flux_insmod_json_decode (o, &s, &ac, &av) == 0 + && s != NULL && !strcmp (s, "/foo/bar") + && ac == 2 && !strcmp (av[0], "foo") && !strcmp (av[1], "bar"), + "flux_insmod_json_decode works"); + argv_destroy (ac, av); + free (s); + Jput (o); +} + +int main (int argc, char *argv[]) +{ + plan (26); + + test_helpers (); // 16 + test_lsmod_codec (); // 6 + test_rmmod_codec (); // 2 + test_insmod_codec (); // 2 done_testing (); } + #endif /* diff --git a/src/common/libflux/module.h b/src/common/libflux/module.h index 1c006345f047..d29d3824a408 100644 --- a/src/common/libflux/module.h +++ b/src/common/libflux/module.h @@ -1,44 +1,64 @@ #ifndef _FLUX_CORE_MODULE_H #define _FLUX_CORE_MODULE_H +/* Module management messages are constructed according to Flux RFC 5. + * https://github.com/flux-framework/rfc/blob/master/spec_5.adoc + */ + +#include #include #include #include "handle.h" -/* Flags for module load/unload +/** + ** High level module management functions + **/ + +/* lsmod callback - return 0 on success, -1 to stop iteration and + * have flux_lsmod() return error. + * Note: 'nodeset' will be NULL when called from flux_lsmod(). */ -enum { - FLUX_MOD_FLAGS_MANAGED = 1, /* XXX used by modctl, may go away soon */ -}; +typedef int (flux_lsmod_f)(const char *name, int size, const char *digest, + int idle, const char *nodeset, void *arg); -/* Send a request to load/unload a module (use rank = -1 for local). - * These go to the broker unless the module name is hierarchical, e.g. - * flux_insmod() of kvs would generate a "cmb.insmod" message, while - * flux_rmmod() of sched.backfill would generate a "sched.rmmod" message. +/* Send a request to 'service' to list loaded modules (null for comms mods). + * On success, the 'cb' function is called for each module with 'arg'. + * Returns 0 on success, -1 with errno set on failure. */ -int flux_rmmod (flux_t h, int rank, const char *name, int flags); -int flux_insmod (flux_t h, int rank, const char *path, int flags, - json_object *args); +int flux_lsmod (flux_t h, uint32_t nodeid, const char *service, + flux_lsmod_f cb, void *arg); -/* While the target of an insmod/rmmod message can be determined by - * parsing the module name, lsmod requires it to be explicity specified - * in 'target'. +/* Send a request to remove a module 'name'. + * The request is sent to a service determined by parsing 'name'. + * Returns 0 on success, -1 with errno set on failure. */ -json_object *flux_lsmod (flux_t h, int rank, const char *target); +int flux_rmmod (flux_t h, uint32_t nodeid, const char *name); -/* All flux modules must define the "mod_name" symbol with this macro. +/* Send a request to insert a module 'path'. + * The request is sent to a service determined by parsing the module's name, + * as defined by its symbol 'mod_name' (found by opening 'path'). Pass args + * described by 'argc' and 'argv' to the module's 'mod_main' function. + * Returns 0 on success, -1 with errno set on failure. */ +int flux_insmod (flux_t h, uint32_t nodeid, const char *path, + int argc, char **argv); + + +/** + ** Mandatory symbols for modules + **/ + #define MOD_NAME(x) const char *mod_name = x +typedef int (mod_main_f)(flux_t h, int argc, char *argv[]); -/* Comms modules (loaded by the broker) must define mod_main(). - * (Other types of modules will have their own requirements). - */ -typedef int (mod_main_f)(flux_t h, zhash_t *args); -extern mod_main_f mod_main; -/* Read 'mod_name' from the specified module filename. - * Caller must free the returned name. +/** + ** Convenience functions for services implementing module extensions + **/ + +/* Read the value of 'mod_name' from the specified module filename. + * Caller must free the returned name. Returns NULL on failure. */ char *flux_modname (const char *filename); @@ -48,6 +68,41 @@ char *flux_modname (const char *filename); */ char *flux_modfind (const char *searchpath, const char *modname); +/* Codecs for module control payloads + */ +json_object *flux_lsmod_json_create (void); +int flux_lsmod_json_append (json_object *a, const char *name, int size, + const char *digest, int idle); +int flux_lsmod_json_decode (json_object *a, int *len); +int flux_lsmod_json_decode_nth (json_object *a, int n, const char **name, + int *size, const char **digest, int *idle); + +json_object *flux_rmmod_json_encode (const char *name); +int flux_rmmod_json_decode (json_object *o, char **name); + +json_object *flux_insmod_json_encode (const char *path, + int argc, char **argv); +int flux_insmod_json_decode (json_object *o, char **path, + int *argc, char ***argv); + +/* Decode an insmod request message. + * The 'path' returned on success must be freed by the caller. + * The 'argv' returned on success must be freed, including 'argc' elements + * Returns 0 on success, -1 with errno set on failure. + */ +int flux_insmod_request_decode (zmsg_t *zmsg, char **path, + int *argc, char ***argv); + +/* Decode an rmmod request message. + * The 'name' returned on success must be freed by the caller. + * Returns 0 on success, -1 with errno set on failure. + */ +int flux_rmmod_request_decode (zmsg_t *zmsg, char **name); + +/* Decode an lsmod request message. + * Returns 0 on success, -1 with errno set on failure. + */ +int flux_lsmod_request_decode (zmsg_t *zmsg); #endif /* !FLUX_CORE_MODULE_H */ diff --git a/src/modules/modctl/modctl.c b/src/modules/modctl/modctl.c index 9d5604573fdd..6a5a79f132b1 100644 --- a/src/modules/modctl/modctl.c +++ b/src/modules/modctl/modctl.c @@ -37,6 +37,8 @@ #include #include +#if 0 + #include "src/common/libutil/xzmalloc.h" #include "src/common/libutil/log.h" #include "src/common/libutil/shortjson.h" @@ -431,8 +433,11 @@ static msghandler_t htab[] = { }; const int htablen = sizeof (htab) / sizeof (htab[0]); +#endif + int mod_main (flux_t h, zhash_t *args) { +#if 0 ctx_t *ctx = getctx (h); if (kvs_watch_int (h, "conf.modctl.seq", conf_cb, ctx) < 0) { @@ -443,6 +448,7 @@ int mod_main (flux_t h, zhash_t *args) flux_log (h, LOG_ERR, "flux_msghandler_add: %s", strerror (errno)); return -1; } +#endif if (flux_reactor_start (h) < 0) { flux_log (h, LOG_ERR, "flux_reactor_start: %s", strerror (errno)); return -1; diff --git a/t/t0002-request.t b/t/t0002-request.t index 42c3d18038aa..cc4db92d9cf5 100755 --- a/t/t0002-request.t +++ b/t/t0002-request.t @@ -3,16 +3,23 @@ test_description='Test basic request/response handling + Verify basic request/response/rpc handling. ' . `dirname $0`/sharness.sh test_under_flux 2 -test_expect_success 'request: load req module' ' +test_expect_success 'request: load req module on rank 0' ' flux module load ${FLUX_BUILD_DIR}/src/test/request/.libs/req.so ' +# FIXME: uses rank-addressed requests which we test below +test_expect_success 'request: load req module on rank 1' ' + flux module --rank 1 \ + load ${FLUX_BUILD_DIR}/src/test/request/.libs/req.so +' + test_expect_success 'request: simple rpc with no payload' ' ${FLUX_BUILD_DIR}/src/test/request/treq null ' @@ -21,9 +28,7 @@ test_expect_success 'request: simple rpc to rank 0' ' ${FLUX_BUILD_DIR}/src/test/request/treq --rank 0 null ' -# sleep temporarily required - see issue #118 test_expect_success 'request: simple rpc to rank 1' ' - sleep 1 && ${FLUX_BUILD_DIR}/src/test/request/treq --rank 1 null ' @@ -72,7 +77,11 @@ test_expect_success 'request: proxy ping any from 1 is one hop' ' # ${FLUX_BUILD_DIR}/src/test/request/treq --rank 0 pingany #' -test_expect_success 'request: unloaded req module' ' +test_expect_success 'request: unloaded req module on rank 1' ' + flux module --rank 1 remove req +' + +test_expect_success 'request: unloaded req module on rank 0' ' flux module remove req '