Skip to content

Commit

Permalink
flux-module, broker, libflux: rework module API
Browse files Browse the repository at this point in the history
Services implementing 'module extensions' will implement
insmod, rmmod, lsmod operations defined in RFC 5.  Provide an API
for calling these operations and utility functions to help services
implementing them.

flux-module is now a general purpose tool for managing modules
within any service.  modctl functionality is temporarily out;
it will be restored with the commit reimplementing modctl on the
new interfaces.

The broker and flux-module are updated to use the new API.

Testing: unit tests for RFC 5 protocol encode/decode are added.
One place in test where we loaded modules on two ranks using modctl
(t/t0002-request.t) was modified to use the simpler flux-module
interface directed at successive ranks.
  • Loading branch information
garlick committed Dec 17, 2014
1 parent 4fff346 commit b16122e
Show file tree
Hide file tree
Showing 7 changed files with 576 additions and 312 deletions.
145 changes: 76 additions & 69 deletions src/broker/cmbd.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#include "src/common/libutil/jsonutil.h"
#include "src/common/libutil/ipaddr.h"
#include "src/common/libutil/shortjson.h"
#include "src/common/libutil/argv.h"

#include "module.h"
#include "boot_pmi.h"
Expand Down Expand Up @@ -144,7 +145,6 @@ typedef struct {
zlist_t *rmmod_reqs;
ctx_t *ctx;
char *path;
int flags;
nodeset_t ns;
} module_t;

Expand All @@ -168,7 +168,7 @@ static void cmbd_init_socks (ctx_t *ctx);
static int cmbd_init_child (ctx_t *ctx, endpt_t *ep);
static int cmbd_init_gevent_pub (ctx_t *ctx, endpt_t *ep);

static module_t *module_create (ctx_t *ctx, const char *path, int flags);
static module_t *module_create (ctx_t *ctx, const char *path);
static void module_destroy (module_t *mod);
static void module_unload (module_t *mod, zmsg_t **zmsg);
static bool module_select (module_t *mod, const char *nstr);
Expand Down Expand Up @@ -752,7 +752,7 @@ static void boot_local (ctx_t *ctx)
tmpdir, ctx->sid, rrank);
}

static module_t *module_create (ctx_t *ctx, const char *path, int flags)
static module_t *module_create (ctx_t *ctx, const char *path)
{
module_t *mod = xzmalloc (sizeof (*mod));

Expand All @@ -762,7 +762,6 @@ static module_t *module_create (ctx_t *ctx, const char *path, int flags)
if (!(mod->rmmod_reqs = zlist_new ()))
oom ();
mod->ctx = ctx;
mod->flags = flags;
return mod;
}

Expand Down Expand Up @@ -848,7 +847,6 @@ static bool module_select (module_t *mod, const char *nstr)
static module_t *module_prepare_one (ctx_t *ctx, const char *arg)
{
char *path, *name;
int flags = 0;
module_t *mod;

if (strchr (arg, '/')) { /* path name given */
Expand All @@ -862,7 +860,7 @@ static module_t *module_prepare_one (ctx_t *ctx, const char *arg)
}

if (!(mod = zhash_lookup (ctx->modules, name))) {
if (!(mod = module_create (ctx, path, flags)))
if (!(mod = module_create (ctx, path)))
err_exit ("module %s", name);
zhash_update (ctx->modules, name, mod);
zhash_freefn (ctx->modules, name, (zhash_free_fn *)module_destroy);
Expand Down Expand Up @@ -1186,28 +1184,23 @@ static char *cmb_getattr (ctx_t *ctx, const char *name)
return val;
}

/* Modctl sets 'managed' flag for insert/remove. flux-mod ins,rm does not.
*/
static int cmb_rmmod (ctx_t *ctx, const char *name, int flags, zmsg_t **zmsg)
static int cmb_rmmod (ctx_t *ctx, const char *name, zmsg_t **zmsg)
{
module_t *mod;
if (!(mod = zhash_lookup (ctx->modules, name))) {
errno = ENOENT;
return -1;
}
if ((mod->flags & FLUX_MOD_FLAGS_MANAGED)
!= (flags & FLUX_MOD_FLAGS_MANAGED)) {
errno = EINVAL;
return -1;
}
module_unload (mod, zmsg);
flux_log (ctx->h, LOG_INFO, "rmmod %s", name);
return 0;
}

static json_object *cmb_lsmod (ctx_t *ctx)
/* Build lsmod response payload per RFC 5.
*/
static JSON cmb_lsmod (ctx_t *ctx)
{
json_object *mo, *response = util_json_object_new_object ();
JSON o = flux_lsmod_json_create ();
zlist_t *keys;
char *name;
module_t *mod;
Expand All @@ -1218,27 +1211,25 @@ static json_object *cmb_lsmod (ctx_t *ctx)
while (name) {
mod = zhash_lookup (ctx->modules, name);
assert (mod != NULL);
mo = util_json_object_new_object ();
util_json_object_add_string (mo, "name", plugin_name (mod->p));
util_json_object_add_int (mo, "size", plugin_size (mod->p));
util_json_object_add_string (mo, "digest", plugin_digest (mod->p));
util_json_object_add_int (mo, "flags", mod->flags);
util_json_object_add_int (mo, "idle",
peer_idle (ctx, plugin_uuid (mod->p)));
util_json_object_add_string (mo, "nodelist", ctx->rankstr);
json_object_object_add (response, name, mo);
if (mod && flux_lsmod_json_append (o, plugin_name (mod->p),
plugin_size (mod->p),
plugin_digest (mod->p),
peer_idle (ctx, plugin_uuid (mod->p))) < 0) {
Jput (o);
o = NULL;
goto done;
}
name = zlist_next (keys);
}
done:
zlist_destroy (&keys);
return response;
return o;
}

static int cmb_insmod (ctx_t *ctx, const char *path, int flags,
json_object *args)
static int cmb_insmod (ctx_t *ctx, char *path, int argc, char **argv)
{
int rc = -1;
int i, rc = -1;
module_t *mod;
json_object_iter iter;
char *name = NULL;

if (!(name = flux_modname (path))) {
Expand All @@ -1249,20 +1240,28 @@ static int cmb_insmod (ctx_t *ctx, const char *path, int flags,
errno = EEXIST;
goto done;
}
if (!(mod = module_create (ctx, path, flags)))
if (!(mod = module_create (ctx, path)))
goto done;
json_object_object_foreachC (args, iter) {
const char *val = json_object_get_string (iter.val);
zhash_update (mod->args, iter.key, xstrdup (val));
zhash_freefn (mod->args, iter.key, (zhash_free_fn *)free);
/* Args are transitioning from zhash to argc, argv.
* To avoid too much code perturbation for now we translate here.
* Assume args are in key=val format as before.
*/
for (i = 0; i < argc; i++ ) {
char *key = xstrdup (argv[i]);
char *val = strchr (key, '=');
if (val)
*val++ = '\0';
zhash_update (mod->args, key, xstrdup (val ? val : "1"));
zhash_freefn (mod->args, key, (zhash_free_fn *)free);
free (key);
}
if (module_load (ctx, mod) < 0) {
module_destroy (mod);
goto done;
}
zhash_update (ctx->modules, name, mod);
zhash_freefn (ctx->modules, name, (zhash_free_fn *)module_destroy);
flux_log (ctx->h, LOG_INFO, "insmod %s %s", name, path);
flux_log (ctx->h, LOG_INFO, "insmod %s", name);
rc = 0;
done:
if (name)
Expand Down Expand Up @@ -1707,43 +1706,51 @@ static int cmb_internal_request (ctx_t *ctx, zmsg_t **zmsg)
json_object_put (response);
}
} else if (flux_msg_match (*zmsg, "cmb.rmmod")) {
json_object *request = NULL;
const char *name;
int flags;
if (flux_msg_decode (*zmsg, NULL, &request) < 0 || request == NULL
|| util_json_object_get_string (request, "name", &name) < 0
|| util_json_object_get_int (request, "flags", &flags) < 0) {
flux_respond_errnum (ctx->h, zmsg, EPROTO);
} else if (cmb_rmmod (ctx, name, flags, zmsg) < 0) {
flux_respond_errnum (ctx->h, zmsg, errno);
} /* else response is deferred until module returns EOF */
if (request)
json_object_put (request);
char *name = NULL;
int errnum = 0;
if (flux_rmmod_request_decode (*zmsg, &name) < 0)
errnum = errno;
else if (cmb_rmmod (ctx, name, zmsg) < 0) /* responds on success */
errnum = errno;
if (errnum && flux_err_respond (ctx->h, errnum, zmsg) < 0)
flux_log (ctx->h, LOG_ERR, "%s: flux_err_respond: %s",
__FUNCTION__, strerror (errno));
if (name)
free (name);
} else if (flux_msg_match (*zmsg, "cmb.insmod")) {
json_object *args, *request = NULL;
const char *path;
int flags;
if (flux_msg_decode (*zmsg, NULL, &request) < 0 || request == NULL
|| util_json_object_get_string (request, "path", &path) < 0
|| util_json_object_get_int (request, "flags", &flags) < 0
|| !(args = json_object_object_get (request, "args"))) {
flux_respond_errnum (ctx->h, zmsg, EPROTO);
} else if (cmb_insmod (ctx, path, flags, args) < 0) {
flux_respond_errnum (ctx->h, zmsg, errno);
} else {
flux_respond_errnum (ctx->h, zmsg, 0);
}
if (request)
json_object_put (request);
char *path = NULL;
int argc;
char **argv = NULL;
int errnum = 0;
if (flux_insmod_request_decode (*zmsg, &path, &argc, &argv) < 0)
errnum = errno;
else if (cmb_insmod (ctx, path, argc, argv) < 0)
errnum = errno;
if (flux_err_respond (ctx->h, errnum, zmsg) < 0)
flux_log (ctx->h, LOG_ERR, "%s: flux_err_respond: %s",
__FUNCTION__, strerror (errno));
if (path)
free (path);
if (argv)
argv_destroy (argc, argv);
} else if (flux_msg_match (*zmsg, "cmb.lsmod")) {
json_object *response = NULL;
if (!(response = cmb_lsmod (ctx))) {
flux_respond_errnum (ctx->h, zmsg, errno);
if (flux_lsmod_request_decode (*zmsg) < 0) {
if (flux_err_respond (ctx->h, errno, zmsg) < 0)
flux_log (ctx->h, LOG_ERR, "%s: flux_err_respond: %s",
__FUNCTION__, strerror (errno));
} else {
flux_respond (ctx->h, zmsg, response);
JSON out = cmb_lsmod (ctx);
if (!out) {
if (flux_err_respond (ctx->h, errno, zmsg) < 0)
flux_log (ctx->h, LOG_ERR, "%s: flux_err_respond: %s",
__FUNCTION__, strerror (errno));
} else {
if (flux_json_respond (ctx->h, out, zmsg) < 0)
flux_log (ctx->h, LOG_ERR, "%s: flux_json_respond: %s",
__FUNCTION__, strerror (errno));
}
Jput (out);
}
if (response)
json_object_put (response);
} else if (flux_msg_match (*zmsg, "cmb.lspeer")) {
json_object *response = NULL;
if (!(response = peer_ls (ctx))) {
Expand Down
9 changes: 7 additions & 2 deletions src/broker/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@

#include "module.h"

/* While transitioning to argc, argv - style args per RFC 5,
* we have our own mod_main prototype.
*/
typedef int (mod_main_comms_f)(flux_t h, zhash_t *args);


typedef struct {
int request_tx;
Expand All @@ -77,7 +82,7 @@ struct plugin_ctx_struct {
char *svc_uri;
zuuid_t *uuid;
pthread_t t;
mod_main_f *main;
mod_main_comms_f *main;
plugin_stats_t stats;
zloop_t *zloop;
dq_t *dq;
Expand Down Expand Up @@ -679,7 +684,7 @@ plugin_ctx_t plugin_create (flux_t h, const char *path, zhash_t *args)
plugin_ctx_t p;
void *dso;
const char **mod_namep;
mod_main_f *mod_main;
mod_main_comms_f *mod_main;
zfile_t *zf;

dlerror ();
Expand Down
Loading

0 comments on commit b16122e

Please sign in to comment.