diff --git a/config/systemd.m4 b/config/systemd.m4 new file mode 100644 index 000000000000..98e1cc776ada --- /dev/null +++ b/config/systemd.m4 @@ -0,0 +1,47 @@ +dnl Probe for systemd libraries and installation paths. +dnl +dnl Provides the RRA_WITH_SYSTEMD_UNITDIR macro, which adds the +dnl --with-systemdsystemunitdir configure flag, sets the systemdsystemunitdir +dnl substitution variable, and provides the HAVE_SYSTEMD Automake conditional +dnl to use to control whether to install unit files. +dnl +dnl Provides the RRA_LIB_SYSTEMD_DAEMON_OPTIONAL macro, which sets +dnl SYSTEMD_CFLAGS and SYSTEMD_LIBS substitution variables if +dnl libsystemd-daemon is available and defines HAVE_SD_NOTIFY. pkg-config +dnl support for libsystemd-daemon is required for it to be detected. +dnl +dnl Depends on the Autoconf macros that come with pkg-config. +dnl +dnl The canonical version of this file is maintained in the rra-c-util +dnl package, available at . +dnl +dnl Written by Russ Allbery +dnl Copyright 2013, 2014 +dnl The Board of Trustees of the Leland Stanford Junior University +dnl +dnl This file is free software; the authors give unlimited permission to copy +dnl and/or distribute it, with or without modifications, as long as this +dnl notice is preserved. + +dnl Determine the systemd system unit directory, along with a configure flag +dnl to override, and sets @systemdsystemunitdir@. Provides the Automake +dnl HAVE_SYSTEMD Automake conditional. +AC_DEFUN([RRA_WITH_SYSTEMD_UNITDIR], +[AC_REQUIRE([PKG_PROG_PKG_CONFIG]) + AS_IF([test x"$PKG_CONFIG" = x], [PKG_CONFIG=false]) + AC_ARG_WITH([systemdsystemunitdir], + [AS_HELP_STRING([--with-systemdsystemunitdir=DIR], + [Directory for systemd service files])], + [], + [with_systemdsystemunitdir=$($PKG_CONFIG --variable=systemdsystemunitdir systemd)]) + AS_IF([test x"$with_systemdsystemunitdir" != xno], + [AC_SUBST([systemdsystemunitdir], [$with_systemdsystemunitdir])]) + AM_CONDITIONAL([HAVE_SYSTEMD], + [test -n "$with_systemdsystemunitdir" -a x"$with_systemdsystemunitdir" != xno])]) + +dnl Check for libsystemd-daemon and define SYSTEMD_DAEMON_{CFLAGS,LIBS} if it +dnl is available. +AC_DEFUN([RRA_LIB_SYSTEMD_DAEMON_OPTIONAL], +[PKG_CHECK_EXISTS([libsystemd-daemon], + [PKG_CHECK_MODULES([SYSTEMD_DAEMON], [libsystemd-daemon]) + AC_DEFINE([HAVE_SD_NOTIFY], 1, [Define if sd_notify is available.])])]) diff --git a/configure.ac b/configure.ac index 06819dd4a22e..930330ff5ec0 100644 --- a/configure.ac +++ b/configure.ac @@ -180,6 +180,11 @@ if test "$enable_caliper" = "yes"; then AC_DEFINE([HAVE_CALIPER], [1], [Define if you have libcaliper]) fi +## +# Check for systemd +## +RRA_WITH_SYSTEMD_UNITDIR + ## # Embedded libev @@ -280,6 +285,7 @@ AC_CONFIG_FILES( \ etc/Makefile \ etc/flux-core.pc \ etc/flux-pmi.pc \ + etc/flux.service \ doc/Makefile \ doc/man1/Makefile \ doc/man3/Makefile \ diff --git a/doc/man1/flux-module.adoc b/doc/man1/flux-module.adoc index 900939ff739e..dac5f592a8c8 100644 --- a/doc/man1/flux-module.adoc +++ b/doc/man1/flux-module.adoc @@ -43,18 +43,69 @@ inferred from the name specified on the command line. *list* ['OPTIONS'] ['service']:: List modules loaded by 'service', or by flux-broker(1) if 'service' is unspecified. +*stats* ['OPTIONS'] ['name']:: +Request statistics from module 'name'. A JSON object containing a set of +counters for each type of Flux message is returned by default, however +the object may be customized on a module basis. + +*debug* ['OPTIONS'] ['name']:: +Manipulate debug flags in module 'name'. The interpretation of debug +flag bits is private to the module and its test drivers. OPTIONS ------- *-r, --rank*'=NODESET':: Specify which ranks to apply the command to. See NODESET FORMAT below -for more information. +for more information. The 'stats' and 'debug' commands accept only +a single rank here. *-x, --exclude*'=NODESET':: Specify ranks to exclude the command from. See NODESET FORMAT below -for more information. +for more information. This option is not accepted by the 'stats' +and 'debug' commands. + +STATS OPTIONS +------------- +*-p, --parse*'=OBJNAME':: +OBJNAME is a period delimited list of field names that should be walked +to obtain a specific value or object in the returned JSON. + +*-t, --type*'=int|double':: +Force the returned value to be converted to int or double. + +*-s, --scale*'=N':: +Multiply the returned (int or double) value by the specified +floating point value. + +*-R, --rusage*:: +Return a JSON object representing an 'rusage' structure +returned by getrusage(2). + +*-c, --clear*:: +Send a request message to clear statistics in the target module. + +*-C, --clear-all*:: +Broadcast an event message to clear statistics in the target module +on all ranks. + +DEBUG OPTIONS +------------- + +*-c, --clear*:: +Set debug flags to zero. + +*-S, --set*'=MASK':: +Set debug flags to MASK. +The value may be prefixed with 0x to indicate hexadecimal or 0 +to indicate octal, otherwise the value is interpreted as decimal. +*-c, --clearbit*'=MASK':: +Clear the debug bits specified in MASK without disturbing other bits. +The value is interpreted as above. +*-s, --setbit*'=MASK':: +Set the debug bits specified in MASK without disturbing other bits. +The value is interpreted as above. LIST OUTPUT ----------- diff --git a/etc/Makefile.am b/etc/Makefile.am index 2bd47af3ffee..cbd3cb95ffde 100644 --- a/etc/Makefile.am +++ b/etc/Makefile.am @@ -1,3 +1,7 @@ +#if HAVE_SYSTEMD +systemdsystemunit_SCRIPTS = flux.service +#endif + noinst_DATA = \ flux/curve @@ -30,4 +34,5 @@ pkgconfig_DATA = flux-core.pc flux-pmi.pc endif EXTRA_DIST = \ - gen-cmdhelp.pl + gen-cmdhelp.pl \ + flux.service diff --git a/etc/flux.service.in b/etc/flux.service.in new file mode 100644 index 000000000000..a449ffa66c9a --- /dev/null +++ b/etc/flux.service.in @@ -0,0 +1,12 @@ +[Unit] +Description=Flux message broker + +[Service] +ExecStart=@X_BINDIR@/flux start -o,-Sbroker.rundir=%t/flux,-Ssession-id=%H sleep inf +User=flux +Group=flux +RuntimeDirectory=flux +RuntimeDirectoryMode=0755 + +[Install] +WantedBy=multi-user.target diff --git a/src/broker/broker.c b/src/broker/broker.c index a6a5e1958f58..cda12b2dab24 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -180,7 +180,6 @@ static void broker_add_services (broker_ctx_t *ctx); static void load_modules (broker_ctx_t *ctx, const char *default_modules); static void update_proctitle (broker_ctx_t *ctx); -static void update_pidfile (broker_ctx_t *ctx); static void runlevel_cb (runlevel_t *r, int level, int rc, double elapsed, const char *state, void *arg); static void runlevel_io_cb (runlevel_t *r, const char *name, @@ -556,7 +555,6 @@ int main (int argc, char *argv[]) } update_proctitle (&ctx); - update_pidfile (&ctx); if (ctx.rank == 0) { const char *rc1, *rc3, *pmi, *uri; @@ -806,25 +804,6 @@ static void update_proctitle (broker_ctx_t *ctx) ctx->proctitle = s; } -static void update_pidfile (broker_ctx_t *ctx) -{ - const char *rundir; - char *pidfile; - FILE *f; - - if (attr_get (ctx->attrs, "broker.rundir", &rundir, NULL) < 0) - log_msg_exit ("broker.rundir attribute is not set"); - pidfile = xasprintf ("%s/broker.pid", rundir); - if (!(f = fopen (pidfile, "w+"))) - log_err_exit ("%s", pidfile); - if (fprintf (f, "%u", ctx->pid) < 0) - log_err_exit ("%s", pidfile); - if (fclose (f) < 0) - log_err_exit ("%s", pidfile); - cleanup_push_string (cleanup_file, pidfile); - free (pidfile); -} - /* Handle line by line output on stdout, stderr of runlevel subprocess. */ static void runlevel_io_cb (runlevel_t *r, const char *name, diff --git a/src/broker/modservice.c b/src/broker/modservice.c index cf556589db4f..a5c277f82eca 100644 --- a/src/broker/modservice.c +++ b/src/broker/modservice.c @@ -177,6 +177,43 @@ static void shutdown_cb (flux_t *h, flux_msg_handler_t *w, flux_reactor_stop (flux_get_reactor (h)); } +static void debug_cb (flux_t *h, flux_msg_handler_t *w, + const flux_msg_t *msg, void *arg) +{ + int flags; + int *debug_flags; + const char *op; + + if (flux_request_decodef (msg, NULL, "{s:s s:i}", "op", &op, + "flags", &flags) < 0) + goto error; + if (!(debug_flags = flux_aux_get (h, "flux::debug_flags"))) { + if (!(debug_flags = calloc (1, sizeof (*debug_flags)))) { + errno = ENOMEM; + goto error; + } + flux_aux_set (h, "flux::debug_flags", debug_flags, free); + } + if (!strcmp (op, "setbit")) + *debug_flags |= flags; + else if (!strcmp (op, "clrbit")) + *debug_flags &= ~flags; + else if (!strcmp (op, "set")) + *debug_flags = flags; + else if (!strcmp (op, "clr")) + *debug_flags = 0; + else { + errno = EPROTO; + goto error; + } + if (flux_respondf (h, msg, "{s:i}", "flags", *debug_flags) < 0) + flux_log_error (h, "%s: flux_respond", __FUNCTION__); + return; +error: + if (flux_respond (h, msg, errno, NULL) < 0) + flux_log_error (h, "%s: flux_respond", __FUNCTION__); +} + /* Reactor loop is about to block. */ static void prepare_cb (flux_reactor_t *r, flux_watcher_t *w, @@ -244,6 +281,7 @@ void modservice_register (flux_t *h, module_t *p) register_request (ctx, "stats.get", stats_get_cb); register_request (ctx, "stats.clear", stats_clear_request_cb); register_request (ctx, "rusage", rusage_cb); + register_request (ctx, "debug", debug_cb); register_event (ctx, "stats.clear", stats_clear_event_cb); diff --git a/src/cmd/Makefile.am b/src/cmd/Makefile.am index 17088ff64997..e5b40daec496 100644 --- a/src/cmd/Makefile.am +++ b/src/cmd/Makefile.am @@ -71,7 +71,6 @@ fluxcmd_PROGRAMS = \ flux-event \ flux-module \ flux-comms \ - flux-comms-stats \ flux-kvs \ flux-start \ flux-jstat diff --git a/src/cmd/builtin/proxy.c b/src/cmd/builtin/proxy.c index 21cf3ee2a1f0..4e3a172ea6bc 100644 --- a/src/cmd/builtin/proxy.c +++ b/src/cmd/builtin/proxy.c @@ -683,6 +683,7 @@ static int check_cred (proxy_ctx_t *ctx, int fd) if (ucred.uid != ctx->session_owner) { flux_log (ctx->h, LOG_ERR, "connect by uid=%d pid=%d denied", ucred.uid, (int)ucred.pid); + errno = EPERM; goto done; } rc = 0; @@ -690,6 +691,11 @@ static int check_cred (proxy_ctx_t *ctx, int fd) return rc; } +static int send_auth_response (int fd, unsigned char e) +{ + return write (fd, &e, 1); +} + /* Accept a connection from new client. */ static void listener_cb (flux_reactor_t *r, flux_watcher_t *w, @@ -708,9 +714,11 @@ static void listener_cb (flux_reactor_t *r, flux_watcher_t *w, goto done; } if (check_cred (ctx, cfd) < 0) { + send_auth_response (cfd, errno); close (cfd); goto done; } + send_auth_response (cfd, 0); if (!(c = client_create (ctx, cfd, cfd))) { close (cfd); goto done; @@ -894,7 +902,6 @@ static int cmd_proxy (optparse_t *p, int ac, char *av[]) const char *tmpdir = getenv ("TMPDIR"); char workpath[PATH_MAX + 1]; char sockpath[PATH_MAX + 1]; - char pidfile[PATH_MAX + 1]; const char *job; const char *optarg; int optindex; @@ -954,16 +961,6 @@ static int cmd_proxy (optparse_t *p, int ac, char *av[]) log_err_exit ("error creating proxy socket directory"); cleanup_push_string(cleanup_directory, workpath); - /* Write proxy pid to broker.pid file. - * Local connector expects this. - */ - n = snprintf (pidfile, sizeof (pidfile), "%s/broker.pid", workpath); - assert (n < sizeof (pidfile)); - FILE *f = fopen (pidfile, "w"); - if (!f || fprintf (f, "%d", getpid ()) < 0 || fclose (f) == EOF) - log_err_exit ("%s", pidfile); - cleanup_push_string(cleanup_file, pidfile); - /* Listen on socket */ n = snprintf (sockpath, sizeof (sockpath), "%s/local", workpath); diff --git a/src/cmd/flux-comms-stats.c b/src/cmd/flux-comms-stats.c deleted file mode 100644 index 2e168ea458f2..000000000000 --- a/src/cmd/flux-comms-stats.c +++ /dev/null @@ -1,229 +0,0 @@ -/*****************************************************************************\ - * Copyright (c) 2014 Lawrence Livermore National Security, LLC. Produced at - * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS). - * LLNL-CODE-658032 All rights reserved. - * - * This file is part of the Flux resource manager framework. - * For details, see https://github.com/flux-framework. - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the Free - * Software Foundation; either version 2 of the license, or (at your option) - * any later version. - * - * Flux is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. - * See also: http://www.gnu.org/licenses/ -\*****************************************************************************/ - -#if HAVE_CONFIG_H -#include "config.h" -#endif -#include -#include -#include -#include -#include -#include - -#include "src/common/libutil/xzmalloc.h" -#include "src/common/libutil/log.h" -#include "src/common/libjson-c/json.h" - - -#define OPTIONS "hcCp:s:t:r:R" -static const struct option longopts[] = { - {"help", no_argument, 0, 'h'}, - {"clear", no_argument, 0, 'c'}, - {"clear-all", no_argument, 0, 'C'}, - {"rusage", no_argument, 0, 'R'}, - {"parse", required_argument, 0, 'p'}, - {"scale", required_argument, 0, 's'}, - {"type", required_argument, 0, 't'}, - {"rank", required_argument, 0, 'r'}, - { 0, 0, 0, 0 }, -}; - -static void parse_json (const char *n, const char *json_str, double scale, - json_type type); - -void usage (void) -{ - fprintf (stderr, -"Usage: flux-comms-stats [--scale N] [--type int|double] --parse a[.b]... name\n" -" flux-comms-stats --clear-all name\n" -" flux-comms-stats --clear name\n" -" flux-comms-stats --rusage name\n" -); - exit (1); -} - -int main (int argc, char *argv[]) -{ - flux_t *h; - int ch; - uint32_t nodeid = FLUX_NODEID_ANY; - char *target; - char *objname = NULL; - bool copt = false; - bool Copt = false; - bool Ropt = false; - double scale = 1.0; - json_type type = json_type_object; - char *endptr; - - log_init ("flux-stats"); - - while ((ch = getopt_long (argc, argv, OPTIONS, longopts, NULL)) != -1) { - switch (ch) { - case 'h': /* --help */ - usage (); - break; - case 'r': /* --rank */ - errno = 0; - nodeid = strtoul (optarg, &endptr, 10); - if (errno || *endptr != '\0') - usage(); - break; - case 'c': /* --clear */ - copt = true; - break; - case 'C': /* --clear-all */ - Copt = true; - break; - case 'R': /* --rusage */ - Ropt = true; - break; - case 'p': /* --parse objname */ - objname = optarg; - break; - case 's': /* --scale N */ - errno = 0; - scale = strtod (optarg, &endptr); - if (errno || *endptr != '\0') - usage(); - break; - case 't': /* --type TYPE */ - if (!strcasecmp (optarg, "int")) - type = json_type_int; - else if (!strcasecmp (optarg, "double")) - type = json_type_double; - else - usage (); - break; - case 'd': /* --parse-double objname */ - objname = optarg; - break; - default: - usage (); - break; - } - } - if (optind != argc - 1) - usage (); - if (scale != 1.0 && type != json_type_int && type != json_type_double) - log_msg_exit ("Use --scale only with --type int or --type double"); - target = argv[optind++]; - - if (Copt && nodeid != FLUX_NODEID_ANY) - log_msg_exit ("Use --clear not --clear-all to clear a single node."); - - if (!(h = flux_open (NULL, 0))) - log_err_exit ("flux_open"); - - if (copt) { - flux_rpc_t *rpc; - char *topic = xasprintf ("%s.stats.clear", target); - if (!(rpc = flux_rpc (h, topic, NULL, nodeid, 0)) - || flux_rpc_get (rpc, NULL) < 0) - log_err_exit ("%s", topic); - free (topic); - flux_rpc_destroy (rpc); - } else if (Copt) { - char *topic = xasprintf ("%s.stats.clear", target); - flux_msg_t *msg = flux_event_encode (target, NULL); - if (!msg || flux_send (h, msg, 0) < 0) - log_err_exit ("sending event"); - flux_msg_destroy (msg); - free (topic); - } else if (Ropt) { - flux_rpc_t *rpc; - const char *json_str; - char *topic = xasprintf ("%s.rusage", target); - if (!(rpc = flux_rpc (h, topic, NULL, nodeid, 0)) - || flux_rpc_get (rpc, &json_str) < 0) - log_err_exit ("%s", topic); - parse_json (objname, json_str, scale, type); - free (topic); - flux_rpc_destroy (rpc); - } else { - flux_rpc_t *rpc; - const char *json_str; - char *topic = xasprintf ("%s.stats.get", target); - if (!(rpc = flux_rpc (h, topic, NULL, nodeid, 0)) - || flux_rpc_get (rpc, &json_str) < 0) - log_err_exit ("%s", topic); - parse_json (objname, json_str, scale, type); - free (topic); - flux_rpc_destroy (rpc); - } - flux_close (h); - log_fini (); - return 0; -} - -static void parse_json (const char *n, const char *json_str, double scale, - json_type type) -{ - json_object *o = json_tokener_parse (json_str); - if (!o) - log_err_exit ("error parsing JSON response"); - if (n) { - char *cpy = xstrdup (n); - char *name, *saveptr = NULL, *a1 = cpy; - - while ((name = strtok_r (a1, ".", &saveptr))) { - if (!json_object_object_get_ex (o, name, &o) || o == NULL) - log_err_exit ("`%s' not found in response", n); - a1 = NULL; - } - free (cpy); - } - switch (type) { - case json_type_double: { - double d; - errno = 0; - d = json_object_get_double (o); - if (errno != 0) - log_err_exit ("couldn't convert value to double"); - printf ("%lf\n", d * scale); - break; - } - case json_type_int: { - double d; - errno = 0; - d = json_object_get_double (o); - if (errno != 0) - log_err_exit ("couldn't convert value to double (en route to int)"); - printf ("%d\n", (int)(d * scale)); - break; - } - default: { - const char *s; - s = json_object_to_json_string_ext (o, JSON_C_TO_STRING_PRETTY); - printf ("%s\n", s); - break; - } - } - json_object_put (o); -} - -/* - * vi:tabstop=4 shiftwidth=4 expandtab - */ diff --git a/src/cmd/flux-module.c b/src/cmd/flux-module.c index d8ebac832dcc..4d3807ef7258 100644 --- a/src/cmd/flux-module.c +++ b/src/cmd/flux-module.c @@ -29,6 +29,8 @@ #include #include #include +#include +#include #include #include @@ -41,117 +43,165 @@ const int max_idle = 99; -typedef struct { - const char *nodeset; - int argc; - char **argv; -} opt_t; - -#define OPTIONS "+hr:x:" -static const struct option longopts[] = { - {"help", no_argument, 0, 'h'}, - {"rank", required_argument, 0, 'r'}, - {"exclude", required_argument, 0, 'x'}, - { 0, 0, 0, 0 }, -}; +int cmd_list (optparse_t *p, int argc, char **argv); +int cmd_remove (optparse_t *p, int argc, char **argv); +int cmd_load (optparse_t *p, int argc, char **argv); +int cmd_info (optparse_t *p, int argc, char **argv); +int cmd_stats (optparse_t *p, int argc, char **argv); +int cmd_debug (optparse_t *p, int argc, char **argv); -void mod_lsmod (flux_t *h, opt_t opt); -void mod_rmmod (flux_t *h, opt_t opt); -void mod_insmod (flux_t *h, opt_t opt); -void mod_info (flux_t *h, opt_t opt); +#define RANK_OPTION { \ + .name = "rank", .key = 'r', .has_arg = 1, .arginfo = "NODESET", \ + .usage = "Include NODESET in operation target", \ +} +#define EXCLUDE_OPTION { \ + .name = "exclude", .key = 'x', .has_arg = 1, .arginfo = "NODESET", \ + .usage = "Exclude NODESET from operation target", \ +} -typedef struct { - const char *name; - void (*fun)(flux_t *h, opt_t opt); -} func_t; - -static func_t funcs[] = { - { "list", &mod_lsmod}, - { "remove", &mod_rmmod}, - { "load", &mod_insmod}, - { "info", &mod_info}, +static struct optparse_option list_opts[] = { + RANK_OPTION, + EXCLUDE_OPTION, + OPTPARSE_TABLE_END +}; +static struct optparse_option remove_opts[] = { + RANK_OPTION, + EXCLUDE_OPTION, + OPTPARSE_TABLE_END +}; +static struct optparse_option load_opts[] = { + RANK_OPTION, + EXCLUDE_OPTION, + OPTPARSE_TABLE_END +}; +static struct optparse_option stats_opts[] = { + { .name = "parse", .key = 'p', .has_arg = 1, .arginfo = "OBJNAME", + .usage = "Parse object period-delimited object name", + }, + { .name = "scale", .key = 's', .has_arg = 1, .arginfo = "N", + .usage = "Scale numeric JSON value by N", + }, + { .name = "type", .key = 't', .has_arg = 1, .arginfo = "int|double", + .usage = "Convert JSON value to specified type", + }, + { .name = "rank", .key = 'r', .has_arg = 1, .arginfo = "RANK", + .usage = "Target specified rank", + }, + { .name = "rusage", .key = 'R', .has_arg = 0, + .usage = "Request rusage data instead of stats", + }, + { .name = "clear", .key = 'c', .has_arg = 0, + .usage = "Clear stats on target rank", + }, + { .name = "clear-all", .key = 'C', .has_arg = 0, + .usage = "Clear stats on all ranks", + }, + OPTPARSE_TABLE_END +}; +static struct optparse_option debug_opts[] = { + { .name = "clear", .key = 'C', .has_arg = 0, + .usage = "Set debug flags to 0", }, + { .name = "set", .key = 'S', .has_arg = 1, + .usage = "Set debug flags to MASK", }, + { .name = "setbit", .key = 's', .has_arg = 1, + .usage = "Set one debug flag to 1", }, + { .name = "clearbit", .key = 'c', .has_arg = 1, + .usage = "Set one debug flag to 0", }, + OPTPARSE_TABLE_END, }; -func_t *func_lookup (const char *name) -{ - int i; - for (i = 0; i < sizeof (funcs) / sizeof (funcs[0]); i++) - if (!strcmp (funcs[i].name, name)) - return &funcs[i]; - return NULL; -} +static struct optparse_subcommand subcommands[] = { + { "list", + "[OPTIONS] [module]", + "List loaded modules", + cmd_list, + 0, + list_opts, + }, + { "remove", + "[OPTIONS] module", + "Unload module", + cmd_remove, + 0, + remove_opts, + }, + { "load", + "[OPTIONS] module", + "Load module", + cmd_load, + 0, + load_opts, + }, + { "info", + "[OPTIONS] module", + "Display module info", + cmd_info, + 0, + NULL + }, + { "stats", + "[OPTIONS] module", + "Display stats on module", + cmd_stats, + 0, + stats_opts, + }, + { "debug", + "[OPTIONS] module", + "Get/set module debug flags", + cmd_debug, + 0, + debug_opts, + }, + OPTPARSE_SUBCMD_END +}; -void usage (void) +int usage (optparse_t *p, struct optparse_option *o, const char *optarg) { - fprintf (stderr, -"Usage: flux-module list [OPTIONS]\n" -" flux-module info [OPTIONS] module\n" -" flux-module load [OPTIONS] module [arg ...]\n" -" flux-module remove [OPTIONS] module\n" -"where OPTIONS are:\n" -" -r,--rank=NODESET add ranks (default \"self\") \n" -" -x,--exclude=NODESET exclude ranks\n" -); + struct optparse_subcommand *s; + optparse_print_usage (p); + fprintf (stderr, "\n"); + fprintf (stderr, "flux module subcommands:\n"); + s = subcommands; + while (s->name) { + fprintf (stderr, " %-15s %s\n", s->name, s->doc); + s++; + } exit (1); } int main (int argc, char *argv[]) { - flux_t *h = NULL; - int ch; - char *cmd; - func_t *f; - opt_t opt; - const char *rankopt = "self"; - const char *excludeopt = NULL; + optparse_t *p; + char *cmdusage = "COMMAND [OPTIONS]"; + int optindex; + int exitval; log_init ("flux-module"); - memset (&opt, 0, sizeof (opt)); - if (argc < 2) - usage (); - cmd = argv[1]; - argc--; - argv++; - - while ((ch = getopt_long (argc, argv, OPTIONS, longopts, NULL)) != -1) { - switch (ch) { - case 'h': /* --help */ - usage (); - break; - case 'r': /* --rank=NODESET */ - rankopt = optarg; - break; - case 'x': /* --exclude=NODESET */ - excludeopt = optarg; - break; - default: - usage (); - break; - } - } - opt.argc = argc - optind; - opt.argv = argv + optind; - - if (!(f = func_lookup (cmd))) - log_msg_exit ("unknown function '%s'", cmd); - - if (strcmp (cmd, "info") != 0) { - if (!(h = flux_open (NULL, 0))) - log_err_exit ("flux_open"); - if (!(opt.nodeset = flux_get_nodeset (h, rankopt, excludeopt))) - log_err_exit ("--exclude/--rank"); - if (strlen (opt.nodeset) == 0) - exit (0); - } + p = optparse_create ("flux-module"); + + if (optparse_set (p, OPTPARSE_USAGE, cmdusage) != OPTPARSE_SUCCESS) + log_msg_exit ("optparse_set (USAGE)"); - f->fun (h, opt); + if (optparse_reg_subcommands (p, subcommands) != OPTPARSE_SUCCESS) + log_msg_exit ("optparse_reg_subcommands"); - if (h) - flux_close (h); + if (optparse_set (p, OPTPARSE_OPTION_CB, "help", usage) != OPTPARSE_SUCCESS) + log_msg_exit ("optparse_set() failed"); + + if (optparse_set (p, OPTPARSE_PRINT_SUBCMDS, 0) != OPTPARSE_SUCCESS) + log_msg_exit ("optparse_set (PRINT_SUBCMDS)"); + + if ((optindex = optparse_parse_args (p, argc, argv)) < 0) + exit (1); + + if ((exitval = optparse_run_subcommand (p, argc, argv)) < 0) + exit (1); + optparse_destroy (p); log_fini (); - return 0; + return (exitval); } char *sha1 (const char *path) @@ -194,15 +244,18 @@ void parse_modarg (const char *arg, char **name, char **path) *path = modpath; } -void mod_info (flux_t *h, opt_t opt) +int cmd_info (optparse_t *p, int argc, char **argv) { char *modpath = NULL; char *modname = NULL; char *digest = NULL; + int n; - if (opt.argc != 1) - usage (); - parse_modarg (opt.argv[0], &modname, &modpath); + if ((n = optparse_option_index (p)) != argc - 1) { + optparse_print_usage (p); + exit (1); + } + parse_modarg (argv[n], &modname, &modpath); digest = sha1 (modpath); printf ("Module name: %s\n", modname); printf ("Module path: %s\n", modpath); @@ -212,6 +265,21 @@ void mod_info (flux_t *h, opt_t opt) free (modpath); free (modname); free (digest); + return (0); +} + +int parse_nodeset (flux_t *h, optparse_t *p, const char **nsp) +{ + const char *ns; + + ns = flux_get_nodeset (h, optparse_get_str (p, "rank", "self"), + optparse_get_str (p, "exclude", NULL)); + if (!ns) + log_err_exit ("target nodeset"); + if (strlen (ns) == 0) + return -1; + *nsp = ns; + return 0; } /* Derive name of module loading service from module name. @@ -228,32 +296,42 @@ char *getservice (const char *modname) return service; } -void mod_insmod (flux_t *h, opt_t opt) +int cmd_load (optparse_t *p, int argc, char **argv) { char *modname; char *modpath; int errors = 0; + int n; + flux_t *h; + const char *ns; + flux_rpc_t *r = NULL; - if (opt.argc < 1) - usage (); - parse_modarg (opt.argv[0], &modname, &modpath); - opt.argv++; - opt.argc--; + if ((n = optparse_option_index (p)) == argc) { + optparse_print_usage (p); + exit (1); + } + parse_modarg (argv[n++], &modname, &modpath); char *service = getservice (modname); char *topic = xasprintf ("%s.insmod", service); - char *json_str = flux_insmod_json_encode (modpath, opt.argc, opt.argv); + char *json_str = flux_insmod_json_encode (modpath, argc - n, argv + n); assert (json_str != NULL); - flux_rpc_t *r = flux_rpc_multi (h, topic, json_str, opt.nodeset, 0); - if (!r) + + if (!(h = flux_open (NULL, 0))) + log_err_exit ("flux_open"); + if (parse_nodeset (h, p, &ns) < 0) + goto done; + if (!(r = flux_rpc_multi (h, topic, json_str, ns, 0))) log_err_exit ("%s", topic); do { - uint32_t nodeid = FLUX_NODEID_ANY; - if (flux_rpc_get_nodeid (r, &nodeid) < 0 - || flux_rpc_get (r, NULL) < 0) { + if (flux_rpc_get (r, NULL) < 0) { + uint32_t nodeid = FLUX_NODEID_ANY; + int saved_errno = errno; + (void)flux_rpc_get_nodeid (r, &nodeid); + errno = saved_errno; if (errno == EEXIST && nodeid != FLUX_NODEID_ANY) log_msg ("%s[%" PRIu32 "]: %s module/service is in use", - topic, nodeid, modname); + topic, nodeid, modname); else if (nodeid != FLUX_NODEID_ANY) log_err ("%s[%" PRIu32 "]", topic, nodeid); else @@ -261,42 +339,60 @@ void mod_insmod (flux_t *h, opt_t opt) errors++; } } while (flux_rpc_next (r) == 0); +done: flux_rpc_destroy (r); free (topic); free (service); free (json_str); free (modpath); free (modname); - if (errors) - exit (1); + flux_close (h); + return (errors ? 1 : 0); } -void mod_rmmod (flux_t *h, opt_t opt) +int cmd_remove (optparse_t *p, int argc, char **argv) { - char *modname = NULL; + char *modname; + const char *ns; + flux_t *h; + flux_rpc_t *r = NULL; + int n; - if (opt.argc != 1) - usage (); - modname = opt.argv[0]; + if ((n = optparse_option_index (p)) != argc - 1) { + optparse_print_usage (p); + exit (1); + } + modname = argv[n++]; char *service = getservice (modname); char *topic = xasprintf ("%s.rmmod", service); char *json_str = flux_rmmod_json_encode (modname); assert (json_str != NULL); - flux_rpc_t *r = flux_rpc_multi (h, topic, json_str, opt.nodeset, 0); - if (!r) + + if (!(h = flux_open (NULL, 0))) + log_err_exit ("flux_open"); + if (parse_nodeset (h, p, &ns) < 0) + goto done; + if (!(r = flux_rpc_multi (h, topic, json_str, ns, 0))) log_err_exit ("%s %s", topic, modname); do { - uint32_t nodeid = FLUX_NODEID_ANY; - if (flux_rpc_get_nodeid (r, &nodeid) < 0 || flux_rpc_get (r, NULL) < 0) + if (flux_rpc_get (r, NULL) < 0) { + uint32_t nodeid = FLUX_NODEID_ANY; + int saved_errno = errno; + (void)flux_rpc_get_nodeid (r, &nodeid); + errno = saved_errno; log_err ("%s[%d] %s", topic, nodeid == FLUX_NODEID_ANY ? -1 : nodeid, modname); + } } while (flux_rpc_next (r) == 0); +done: flux_rpc_destroy (r); free (topic); free (service); free (json_str); + flux_close (h); + return (0); } int lsmod_print_cb (const char *name, int size, const char *digest, int idle, @@ -425,22 +521,33 @@ int lsmod_merge_result (uint32_t nodeid, const char *json_str, zhash_t *mods) return rc; } -void mod_lsmod (flux_t *h, opt_t opt) +int cmd_list (optparse_t *p, int argc, char **argv) { char *service = "cmb"; - - if (opt.argc > 1) - usage (); - if (opt.argc == 1) - service = opt.argv[0]; - printf ("%-20s %-7s %-7s %4s %c %s\n", - "Module", "Size", "Digest", "Idle", 'S', "Nodeset"); + char *topic = NULL; + const char *ns; + flux_rpc_t *r = NULL; + flux_t *h; + int n; zhash_t *mods = zhash_new (); + if (!mods) oom (); - char *topic = xasprintf ("%s.lsmod", service); - flux_rpc_t *r = flux_rpc_multi (h, topic, NULL, opt.nodeset, 0); - if (!r) + if ((n = optparse_option_index (p)) < argc - 1) { + optparse_print_usage (p); + exit (1); + } + if (n < argc) + service = argv[n++]; + if (!(h = flux_open (NULL, 0))) + log_err_exit ("flux_open"); + if (parse_nodeset (h, p, &ns) < 0) + goto done; + + printf ("%-20s %-7s %-7s %4s %c %s\n", + "Module", "Size", "Digest", "Idle", 'S', "Nodeset"); + topic = xasprintf ("%s.lsmod", service); + if (!(r = flux_rpc_multi (h, topic, NULL, ns, 0))) log_err_exit ("%s", topic); do { const char *json_str; @@ -454,12 +561,156 @@ void mod_lsmod (flux_t *h, opt_t opt) log_err ("%s", topic); } } while (flux_rpc_next (r) == 0); +done: flux_rpc_destroy (r); lsmod_map_hash (mods, lsmod_print_cb, NULL); zhash_destroy (&mods); free (topic); + flux_close (h); + return (0); +} + +static void parse_json (optparse_t *p, const char *json_str) +{ + json_t *obj, *o; + const char *objname, *typestr; + double scale; + + if (!(obj = json_loads (json_str, 0, NULL))) + log_msg_exit ("error parsing JSON response"); + + /* If --parse OBJNAME was provided, walk to that + * portion of the returned object. + */ + o = obj; + if ((objname = optparse_get_str (p, "parse", NULL))) { + char *cpy = xstrdup (objname); + char *name, *saveptr = NULL, *a1 = cpy; + while ((name = strtok_r (a1, ".", &saveptr))) { + if (!(o = json_object_get (o, name))) + log_msg_exit ("`%s' not found in response", objname); + a1 = NULL; + } + free (cpy); + } + + /* Display the resulting object/value, optionally forcing + * the type to int or dobule, and optionally scaling the result. + */ + scale = optparse_get_double (p, "scale", 1.0); + typestr = optparse_get_str (p, "type", NULL); + if (json_typeof (o) == JSON_INTEGER || (typestr && !strcmp (typestr, "int"))) { + double d = json_number_value (o); + printf ("%d\n", (int)(d * scale)); + } else if (json_typeof (o) == JSON_REAL || (typestr && !strcmp (typestr, "double"))) { + double d = json_number_value (o); + printf ("%lf\n", d * scale); + } else { + char *s; + s = json_dumps (o, JSON_INDENT(1) | JSON_ENCODE_ANY); + printf ("%s\n", s ? s : "Error encoding JSON"); + free (s); + } + + json_decref (obj); +} + +int cmd_stats (optparse_t *p, int argc, char **argv) +{ + int n; + char *topic = NULL; + char *service; + uint32_t nodeid; + const char *json_str; + flux_rpc_t *r = NULL; + flux_t *h; + + if ((n = optparse_option_index (p)) < argc - 1) { + optparse_print_usage (p); + exit (1); + } + service = n < argc ? argv[n++] : "cmb"; + nodeid = optparse_get_int (p, "rank", FLUX_NODEID_ANY); + + if (!(h = flux_open (NULL, 0))) + log_err_exit ("flux_open"); + + if (optparse_hasopt (p, "clear")) { + topic = xasprintf ("%s.stats.clear", service); + if (!(r = flux_rpc (h, topic, NULL, nodeid, 0))) + log_err_exit ("%s", topic); + if (flux_rpc_get (r, NULL) < 0) + log_err_exit ("%s", topic); + } else if (optparse_hasopt (p, "clear-all")) { + topic = xasprintf ("%s.stats.clear", service); + flux_msg_t *msg = flux_event_encode (topic, NULL); + if (!msg) + log_err_exit ("creating event"); + if (flux_send (h, msg, 0) < 0) + log_err_exit ("sending event"); + flux_msg_destroy (msg); + } else if (optparse_hasopt (p, "rusage")) { + topic = xasprintf ("%s.rusage", service); + if (!(r = flux_rpc (h, topic, NULL, nodeid, 0))) + log_err_exit ("%s", topic); + if (flux_rpc_get (r, &json_str) < 0) + log_err_exit ("%s", topic); + parse_json (p, json_str); + } else { + topic = xasprintf ("%s.stats.get", service); + if (!(r = flux_rpc (h, topic, NULL, nodeid, 0))) + log_err_exit ("%s", topic); + if (flux_rpc_get (r, &json_str) < 0) + log_err_exit ("%s", topic); + parse_json (p, json_str); + } + free (topic); + flux_rpc_destroy (r); + flux_close (h); + return (0); +} + +int cmd_debug (optparse_t *p, int argc, char **argv) +{ + int n; + flux_t *h; + char *topic = NULL; + const char *op = "setbit"; + int flags = 0; + flux_rpc_t *rpc = NULL; + + if ((n = optparse_option_index (p)) != argc - 1) + log_msg_exit ("flux-debug requires service argument"); + topic = xasprintf ("%s.debug", argv[n]); + + if (!(h = flux_open (NULL, 0))) + log_err_exit ("flux_open"); + if (optparse_hasopt (p, "clear")) { + op = "clr"; + } + else if (optparse_hasopt (p, "set")) { + op = "set"; + flags = strtoul (optparse_get_str (p, "set", NULL), NULL, 0); + } + else if (optparse_hasopt (p, "clearbit")) { + op = "clrbit"; + flags = strtoul (optparse_get_str (p, "clearbit", NULL), NULL, 0); + } + else if (optparse_hasopt (p, "setbit")) { + op = "setbit"; + flags = strtoul (optparse_get_str (p, "setbit", NULL), NULL, 0); + } + if (!(rpc = flux_rpcf (h, topic, FLUX_NODEID_ANY, 0, "{s:s s:i}", + "op", op, "flags", flags))) + log_err_exit ("%s", topic); + if (flux_rpc_getf (rpc, "{s:i}", "flags", &flags) < 0) + log_err_exit ("%s", topic); + printf ("0x%x\n", flags); + flux_close (h); + free (topic); + return (0); } /* - * vi:tabstop=4 shiftwidth=4 expandtab + * vi: ts=4 sw=4 expandtab */ diff --git a/src/connectors/local/local.c b/src/connectors/local/local.c index 0b6783213e42..b9b4208e0ce9 100644 --- a/src/connectors/local/local.c +++ b/src/connectors/local/local.c @@ -169,14 +169,12 @@ static int env_getint (char *name, int dflt) return s ? strtol (s, NULL, 10) : dflt; } -/* Path is interpreted as the directory containing the unix domain socket - * and broker pid. +/* Path is interpreted as the directory containing the unix domain socket. */ flux_t *connector_init (const char *path, int flags) { local_ctx_t *c = NULL; struct sockaddr_un addr; - char pidfile[PATH_MAX + 1]; char sockfile[PATH_MAX + 1]; int n, count; @@ -189,12 +187,6 @@ flux_t *connector_init (const char *path, int flags) errno = EINVAL; goto error; } - n = snprintf (pidfile, sizeof (pidfile), "%s/broker.pid", path); - if (n >= sizeof (pidfile)) { - errno = EINVAL; - goto error; - } - if (!(c = malloc (sizeof (*c)))) { errno = ENOMEM; goto error; @@ -217,6 +209,20 @@ flux_t *connector_init (const char *path, int flags) break; usleep (100*1000); } + /* read 1 byte indicating success or failure of auth */ + unsigned char e; + int rc; + rc = read (c->fd, &e, 1); + if (rc < 0) + goto error; + if (rc == 0) { + errno = ECONNRESET; + goto error; + } + if (e != 0) { + errno = e; + goto error; + } flux_msg_iobuf_init (&c->outbuf); flux_msg_iobuf_init (&c->inbuf); if (!(c->h = flux_handle_create (c, &handle_ops, flags))) diff --git a/src/modules/connector-local/local.c b/src/modules/connector-local/local.c index a5e4361536d0..21a25dee0f26 100644 --- a/src/modules/connector-local/local.c +++ b/src/modules/connector-local/local.c @@ -44,10 +44,7 @@ #include #include -#include "src/common/libutil/xzmalloc.h" #include "src/common/libutil/cleanup.h" -#include "src/common/libutil/shortjson.h" -#include "src/common/libutil/log.h" #define LISTEN_BACKLOG 5 @@ -101,29 +98,40 @@ static void client_write_cb (flux_reactor_t *r, flux_watcher_t *w, static void freectx (void *arg) { mod_local_ctx_t *ctx = arg; - zlist_destroy (&ctx->clients); - zhash_destroy (&ctx->subscriptions); - free (ctx); + if (ctx) { + zlist_destroy (&ctx->clients); + zhash_destroy (&ctx->subscriptions); + free (ctx); + } } static mod_local_ctx_t *getctx (flux_t *h) { - mod_local_ctx_t *ctx = (mod_local_ctx_t *)flux_aux_get (h, "flux::local_connector"); + mod_local_ctx_t *ctx = flux_aux_get (h, "flux::local_connector"); if (!ctx) { - ctx = xzmalloc (sizeof (*ctx)); + if (!(ctx = calloc (1, sizeof (*ctx)))) { + errno = ENOMEM; + goto error; + } ctx->h = h; if (!(ctx->reactor = flux_get_reactor (h))) - log_err_exit ("flux_get_reactor"); - if (!(ctx->clients = zlist_new ())) - oom (); - if (!(ctx->subscriptions = zhash_new ())) - oom (); + goto error; + if (!(ctx->clients = zlist_new ())) { + errno = ENOMEM; + goto error; + } + if (!(ctx->subscriptions = zhash_new ())) { + errno = ENOMEM; + goto error; + } ctx->session_owner = geteuid (); flux_aux_set (h, "flux::local_connector", ctx, freectx); } - return ctx; +error: + freectx (ctx); + return NULL; } static int set_nonblock (int fd, bool nonblock) @@ -140,53 +148,69 @@ static int set_nonblock (int fd, bool nonblock) return 0; } +static int send_auth_response (int fd, unsigned char e) +{ + return write (fd, &e, 1); +} + static client_t * client_create (mod_local_ctx_t *ctx, int fd) { client_t *c; socklen_t crlen = sizeof (c->ucred); flux_t *h = ctx->h; - c = xzmalloc (sizeof (*c)); - c->fd = fd; - if (!(c->uuid = zuuid_new ())) - oom (); + if (!(c = calloc (1, sizeof (*c)))) { + errno = ENOMEM; + goto error; + } c->ctx = ctx; - if (!(c->disconnect_notify = zhash_new ())) - oom (); - if (!(c->subscriptions = zhash_new ())) - oom (); - if (!(c->outqueue = zlist_new ())) - oom (); - if (getsockopt (fd, SOL_SOCKET, SO_PEERCRED, &c->ucred, &crlen) < 0) { - flux_log_error (h, "getsockopt SO_PEERCRED"); + c->fd = -1; + c->uuid = zuuid_new (); + c->disconnect_notify = zhash_new (); + c->subscriptions = zhash_new (); + c->outqueue = zlist_new (); + if (!c->uuid || !c->disconnect_notify || !c->subscriptions + || !c->outqueue) { + errno = ENOMEM; goto error; } - assert (crlen == sizeof (c->ucred)); + if (getsockopt (fd, SOL_SOCKET, SO_PEERCRED, &c->ucred, &crlen) < 0) + goto error; /* Deny connections by uid other than session owner for now. */ if (c->ucred.uid != ctx->session_owner) { flux_log (h, LOG_ERR, "connect by uid=%d pid=%d denied", c->ucred.uid, (int)c->ucred.pid); + errno = EPERM; goto error; } - c->inw = flux_fd_watcher_create (ctx->reactor, - fd, FLUX_POLLIN, client_read_cb, c); - c->outw = flux_fd_watcher_create (ctx->reactor, - fd, FLUX_POLLOUT, client_write_cb, c); - if (!c->inw || !c->outw) { - flux_log_error (h, "flux_fd_watcher_create"); + int *debug_flags = flux_aux_get (h, "flux::debug_flags"); + if (debug_flags && (*debug_flags & 1)) { + flux_log (h, LOG_ERR, "connect by uid=%d pid=%d denied by debug flag", + c->ucred.uid, (int)c->ucred.pid); + *debug_flags &= ~1; // one shot + errno = EPERM; goto error; } + if (!(c->inw = flux_fd_watcher_create (ctx->reactor, fd, FLUX_POLLIN, + client_read_cb, c)) != 0) + goto error; + if (!(c->outw = flux_fd_watcher_create (ctx->reactor, fd, FLUX_POLLOUT, + client_write_cb, c)) != 0) + goto error; flux_watcher_start (c->inw); flux_msg_iobuf_init (&c->inbuf); flux_msg_iobuf_init (&c->outbuf); - if (set_nonblock (c->fd, true) < 0) { - flux_log_error (h, "set_nonblock"); - goto error; - } - + if (send_auth_response (fd, 0) < 0) + goto error_noresponse; + if (set_nonblock (fd, true) < 0) + goto error_noresponse; + c->fd = fd; return (c); error: + if (send_auth_response (fd, errno) < 0) + goto error_noresponse; +error_noresponse: client_destroy (c); return NULL; } @@ -236,18 +260,28 @@ static int client_send (client_t *c, const flux_msg_t *msg) static subscription_t *subscription_create (const char *topic) { - subscription_t *sub = xzmalloc (sizeof (*sub)); - sub->topic = xstrdup (topic); + subscription_t *sub = calloc (1, sizeof (*sub)); + if (!sub) { + errno = ENOMEM; + return NULL; + } + if (!(sub->topic = strdup (topic))) { + free (sub); + errno = ENOMEM; + return NULL; + } return sub; } static void subscription_destroy (void *data) { subscription_t *sub = data; - if (sub->unsubscribe) - (void) sub->unsubscribe (sub->handle, sub->topic); - free (sub->topic); - free (sub); + if (sub) { + if (sub->unsubscribe) + (void) sub->unsubscribe (sub->handle, sub->topic); + free (sub->topic); + free (sub); + } } static int global_subscribe (mod_local_ctx_t *ctx, const char *topic) @@ -256,12 +290,17 @@ static int global_subscribe (mod_local_ctx_t *ctx, const char *topic) int rc = -1; if (!(sub = zhash_lookup (ctx->subscriptions, topic))) { + if (!(sub = subscription_create (topic))) { + flux_log_error (ctx->h, "%s: subscription_create %s", + __FUNCTION__, topic); + goto done; + } if (flux_event_subscribe (ctx->h, topic) < 0) { flux_log_error (ctx->h, "%s: flux_event_subscribe %s", __FUNCTION__, topic); + subscription_destroy (sub); goto done; } - sub = subscription_create (topic); sub->unsubscribe = (unsubscribe_f) flux_event_unsubscribe; sub->handle = ctx->h; zhash_update (ctx->subscriptions, topic, sub); @@ -298,9 +337,15 @@ static int client_subscribe (client_t *c, const char *topic) int rc = -1; if (!(sub = zhash_lookup (c->subscriptions, topic))) { - if (global_subscribe (c->ctx, topic) < 0) + if (!(sub = subscription_create (topic))) { + flux_log_error (c->ctx->h, "%s: subscription_create %s", + __FUNCTION__, topic); + goto done; + } + if (global_subscribe (c->ctx, topic) < 0) { + subscription_destroy (sub); goto done; - sub = subscription_create (topic); + } sub->unsubscribe = (unsubscribe_f) global_unsubscribe; sub->handle = c->ctx; zhash_update (c->subscriptions, topic, sub); @@ -333,22 +378,16 @@ static int client_unsubscribe (client_t *c, const char *topic) int sub_request (client_t *c, const flux_msg_t *msg, bool subscribe) { - const char *json_str, *topic; - json_object *in = NULL; + const char *topic; int rc = -1; - if (flux_request_decode (msg, NULL, &json_str) < 0) + if (flux_request_decodef (msg, NULL, "{s:s}", "topic", &topic) < 0) goto done; - if (!(in = Jfromstr (json_str)) || !Jget_str (in, "topic", &topic)) { - errno = EPROTO; - goto done; - } if (subscribe) rc = client_subscribe (c, topic); else rc = client_unsubscribe (c, topic); done: - Jput (in); return rc; } @@ -423,52 +462,64 @@ static int disconnect_update (client_t *c, const flux_msg_t *msg) goto done; if (flux_msg_get_nodeid (msg, &nodeid, &flags) < 0) goto done; - svc = xstrdup (topic); + if (!(svc = strdup (topic))) { + errno = ENOMEM; + goto done; + } if ((p = strchr (svc, '.'))) *p = '\0'; - key = xasprintf ("%s:%"PRIu32":%d", svc, nodeid, flags); + if (asprintf (&key, "%s:%"PRIu32":%d", svc, nodeid, flags) < 0) { + errno = ENOMEM; + goto done; + } if (!zhash_lookup (c->disconnect_notify, key)) { - d = xzmalloc (sizeof (*d)); + if (!(d = calloc (1, sizeof (*d)))) { + errno = ENOMEM; + goto done; + } d->c = c; d->nodeid = nodeid; d->flags = flags; - d->topic = xasprintf ("%s.disconnect", svc); + if (asprintf (&d->topic, "%s.disconnect", svc) < 0) { + free (d); + errno = ENOMEM; + goto done; + } zhash_update (c->disconnect_notify, key, d); zhash_freefn (c->disconnect_notify, key, disconnect_destroy); } rc = 0; done: - if (svc) - free (svc); - if (key) - free (key); + free (svc); + free (key); return rc; } static void client_destroy (client_t *c) { - zhash_destroy (&c->disconnect_notify); - zhash_destroy (&c->subscriptions); - if (c->uuid) + if (c) { + zhash_destroy (&c->disconnect_notify); + zhash_destroy (&c->subscriptions); zuuid_destroy (&c->uuid); - if (c->outqueue) { - flux_msg_t *msg; - while ((msg = zlist_pop (c->outqueue))) - flux_msg_destroy (msg); - zlist_destroy (&c->outqueue); - } - flux_watcher_stop (c->outw); - flux_watcher_destroy (c->outw); - flux_msg_iobuf_clean (&c->outbuf); + if (c->outqueue) { + flux_msg_t *msg; + while ((msg = zlist_pop (c->outqueue))) + flux_msg_destroy (msg); + zlist_destroy (&c->outqueue); + } + flux_watcher_stop (c->outw); + flux_watcher_destroy (c->outw); + flux_msg_iobuf_clean (&c->outbuf); - flux_watcher_stop (c->inw); - flux_watcher_destroy (c->inw); - flux_msg_iobuf_clean (&c->inbuf); + flux_watcher_stop (c->inw); + flux_watcher_destroy (c->inw); + flux_msg_iobuf_clean (&c->inbuf); - if (c->fd != -1) - close (c->fd); + if (c->fd != -1) + close (c->fd); - free (c); + free (c); + } } static void client_write_cb (flux_reactor_t *r, flux_watcher_t *w, @@ -498,24 +549,38 @@ static bool internal_request (client_t *c, const flux_msg_t *msg) flux_msg_t *rmsg = NULL; uint32_t matchtag; - if (flux_msg_get_topic (msg, &topic) < 0 - || flux_msg_get_matchtag (msg, &matchtag) < 0) - return false; - else if (!strcmp (topic, "local.sub")) + if (flux_msg_get_topic (msg, &topic) < 0) { + flux_log_error (c->ctx->h, "%s: flux_msg_get_topic", __FUNCTION__); + goto done; // drop + } + if (flux_msg_get_matchtag (msg, &matchtag) < 0) { + flux_log_error (c->ctx->h, "%s: flux_msg_get_matchtag", __FUNCTION__); + goto done; // drop + } + if (!strcmp (topic, "local.sub")) { rc = sub_request (c, msg, true); - else if (!strcmp (topic, "local.unsub")) + goto done_respond; + } + else if (!strcmp (topic, "local.unsub")) { rc = sub_request (c, msg, false); + goto done_respond; + } else - return false; - - /* Respond to client - */ - if (!(rmsg = flux_response_encode (topic, rc < 0 ? errno : 0, NULL)) - || flux_msg_set_matchtag (rmsg, matchtag) < 0) - flux_log_error (c->ctx->h, "%s: encoding response", __FUNCTION__); + return false; // no match - forward to broker - else if (client_send_nocopy (c, &rmsg) < 0) +done_respond: + if (!(rmsg = flux_response_encode (topic, rc < 0 ? errno : 0, NULL))) { + flux_log_error (c->ctx->h, "%s: flux_response_encode", __FUNCTION__); + goto done; + } + if (flux_msg_set_matchtag (rmsg, matchtag) < 0) { + flux_log_error (c->ctx->h, "%s: flux_response_set_patchtag", + __FUNCTION__); + goto done; + } + if (client_send_nocopy (c, &rmsg) < 0) flux_log_error (c->ctx->h, "%s: client_send_nocopy", __FUNCTION__); +done: flux_msg_destroy (rmsg); return true; } @@ -529,7 +594,7 @@ static void client_read_cb (flux_reactor_t *r, flux_watcher_t *w, int type; if (revents & FLUX_POLLERR) - goto disconnect; + goto error_disconnect; if (!(revents & FLUX_POLLIN)) return; /* EPROTO, ECONNRESET are normal disconnect errors @@ -543,11 +608,11 @@ static void client_read_cb (flux_reactor_t *r, flux_watcher_t *w, } if (errno != ECONNRESET && errno != EPROTO) flux_log_error (h, "flux_msg_recvfd"); - goto disconnect; + goto error_disconnect; } if (flux_msg_get_type (msg, &type) < 0) { flux_log_error (h, "flux_msg_get_type"); - goto disconnect; + goto error; } switch (type) { case FLUX_MSGTYPE_REQUEST: @@ -555,29 +620,36 @@ static void client_read_cb (flux_reactor_t *r, flux_watcher_t *w, /* insert disconnect notifier before forwarding request */ if (c->disconnect_notify && disconnect_update (c, msg) < 0) { flux_log_error (h, "disconnect_update"); - goto disconnect; + goto error_disconnect; + } + if (flux_msg_push_route (msg, zuuid_str (c->uuid)) < 0) { + flux_log_error (h, "flux_msg_push_route"); + goto error; + } + if (flux_send (h, msg, 0) < 0) { + flux_log_error (h, "%s: flux_send", __FUNCTION__); + goto error; } - if (flux_msg_push_route (msg, zuuid_str (c->uuid)) < 0) - oom (); /* FIXME */ - if (flux_send (h, msg, 0) < 0) - log_err ("%s: flux_send", __FUNCTION__); } break; case FLUX_MSGTYPE_EVENT: - if (flux_send (h, msg, 0) < 0) - log_err ("%s: flux_send", __FUNCTION__); + if (flux_send (h, msg, 0) < 0) { + flux_log_error (h, "%s: flux_send", __FUNCTION__); + goto error; + } break; default: flux_log (h, LOG_ERR, "drop unexpected %s", flux_msg_typestr (type)); - break; + goto error; } flux_msg_destroy (msg); return; -disconnect: - flux_msg_destroy (msg); +error_disconnect: zlist_remove (c->ctx->clients, c); client_destroy (c); +error: + flux_msg_destroy (msg); } /* Received response message from broker. @@ -592,10 +664,14 @@ static void response_cb (flux_t *h, flux_msg_handler_t *w, client_t *c; flux_msg_t *cpy = flux_msg_copy (msg, true); - if (!cpy) - oom (); - if (flux_msg_pop_route (cpy, &uuid) < 0) + if (!cpy) { + flux_log_error (h, "flux_msg_copy"); goto done; + } + if (flux_msg_pop_route (cpy, &uuid) < 0) { + flux_log_error (h, "flux_msg_pop_route"); + goto done; + } if (!uuid) { const char *topic = NULL; (void) flux_msg_get_topic (msg, &topic); @@ -621,8 +697,7 @@ static void response_cb (flux_t *h, flux_msg_handler_t *w, c = zlist_next (ctx->clients); } done: - if (uuid) - free (uuid); + free (uuid); flux_msg_destroy (cpy); } @@ -682,8 +757,11 @@ static void listener_cb (flux_reactor_t *r, flux_watcher_t *w, close (cfd); goto done; } - if (zlist_append (ctx->clients, c) < 0) - oom (); + if (zlist_append (ctx->clients, c) < 0) { + client_destroy (c); // closes cfd + errno = ENOMEM; + goto done; + } } if (revents & FLUX_POLLERR) { flux_log_error (h, "poll listen fd"); @@ -714,6 +792,10 @@ static int listener_init (mod_local_ctx_t *ctx, char *sockpath) flux_log_error (ctx->h, "bind"); goto error_close; } + if (chmod (sockpath, 0777) < 0) { + flux_log_error (ctx->h, "chmod"); + goto error_close; + } if (listen (fd, LISTEN_BACKLOG) < 0) { flux_log_error (ctx->h, "listen"); goto error_close; @@ -741,6 +823,8 @@ int mod_main (flux_t *h, int argc, char **argv) char *tmpdir; int rc = -1; + if (!ctx) + goto done; if (!(local_uri = flux_attr_get (h, "local-uri", NULL))) { flux_log_error (h, "flux_attr_get local-uri"); goto done; @@ -775,11 +859,12 @@ int mod_main (flux_t *h, int argc, char **argv) */ if (flux_reactor_run (ctx->reactor, 0) < 0) { flux_log_error (h, "flux_reactor_run"); - goto done; + goto done_delvec; } rc = 0; -done: +done_delvec: flux_msg_handler_delvec (htab); +done: flux_watcher_destroy (ctx->listen_w); if (ctx->listen_fd >= 0) { if (close (ctx->listen_fd) < 0) diff --git a/t/t0003-module.t b/t/t0003-module.t index b1fbf1ceab0a..b3eef27bdd57 100755 --- a/t/t0003-module.t +++ b/t/t0003-module.t @@ -132,4 +132,132 @@ test_expect_success 'module: info fails on invalid module' ' ! flux module info nosuchmodule ' +# N.B. avoid setting the actual debug bits - lets reserve LSB +TESTMOD=connector-local + +test_expect_success 'flux module debug gets debug flags' ' + FLAGS=$(flux module debug $TESTMOD) && + test "$FLAGS" = "0x0" +' +test_expect_success 'flux module debug --setbit sets individual debug flags' ' + flux module debug --setbit 0x10000 $TESTMOD && + FLAGS=$(flux module debug $TESTMOD) && + test "$FLAGS" = "0x10000" +' +test_expect_success 'flux module debug --set replaces debug flags' ' + flux module debug --set 0xff00 $TESTMOD && + FLAGS=$(flux module debug $TESTMOD) && + test "$FLAGS" = "0xff00" +' +test_expect_success 'flux module debug --clearbit clears individual debug flags' ' + flux module debug --clearbit 0x1000 $TESTMOD && + FLAGS=$(flux module debug $TESTMOD) && + test "$FLAGS" = "0xef00" +' +test_expect_success 'flux module debug --clear clears debug flags' ' + flux module debug --clear $TESTMOD && + FLAGS=$(flux module debug $TESTMOD) && + test "$FLAGS" = "0x0" +' + +# test stats + +test_expect_success 'flux module stats gets comms statistics' ' + flux module stats $TESTMOD >comms.stats && + grep -q "#request (tx)" comms.stats && + grep -q "#request (rx)" comms.stats && + grep -q "#response (tx)" comms.stats && + grep -q "#response (rx)" comms.stats && + grep -q "#event (tx)" comms.stats && + grep -q "#event (rx)" comms.stats && + grep -q "#keepalive (tx)" comms.stats && + grep -q "#keepalive (rx)" comms.stats +' + +test_expect_success 'flux module stats --parse "#event (tx)" counts events' ' + EVENT_TX=$(flux module stats --parse "#event (tx)" $TESTMOD) && + flux event pub xyz && + EVENT_TX2=$(flux module stats --parse "#event (tx)" $TESTMOD) && + test "$EVENT_TX" = $((${EVENT_TX2}-1)) +' + +test_expect_success 'flux module stats --clear works' ' + flux event pub xyz && + flux module stats --clear $TESTMOD + EVENT_TX2=$(flux module stats --parse "#event (tx)" $TESTMOD) && + test "$EVENT_TX" = 0 +' + +test_expect_success 'flux module stats --clear-all works' ' + flux event pub xyz && + flux module stats --clear-all $TESTMOD + EVENT_TX2=$(flux module stats --parse "#event (tx)" $TESTMOD) && + test "$EVENT_TX" = 0 +' + +test_expect_success 'flux module stats --scale works' ' + flux event pub xyz && + EVENT_TX=$(flux module stats --parse "#event (tx)" $TESTMOD) && + EVENT_TX2=$(flux module stats --parse "#event (tx)" --scale=2 $TESTMOD) && + test "$EVENT_TX2" -eq $((${EVENT_TX}*2)) +' + +test_expect_success 'flux module stats --rusage works' ' + flux module stats --rusage $TESTMOD >rusage.stats && + grep -q utime rusage.stats && + grep -q stime rusage.stats && + grep -q maxrss rusage.stats && + grep -q ixrss rusage.stats && + grep -q idrss rusage.stats && + grep -q isrss rusage.stats && + grep -q minflt rusage.stats && + grep -q majflt rusage.stats && + grep -q nswap rusage.stats && + grep -q inblock rusage.stats && + grep -q oublock rusage.stats && + grep -q msgsnd rusage.stats && + grep -q msgrcv rusage.stats && + grep -q nsignals rusage.stats && + grep -q nvcsw rusage.stats && + grep -q nivcsw rusage.stats +' + +test_expect_success 'flux module stats --rusage --parse maxrss works' ' + RSS=$(flux module stats --rusage --parse maxrss $TESTMOD) && + test "$RSS" -gt 0 +' + +# try to hit some error cases + +test_expect_success 'flux module with no arguments prints usage and fails' ' + ! flux module 2>noargs.help && + grep -q Usage: noargs.help +' + +test_expect_success 'flux module -h lists subcommands' ' + ! flux module -h 2>module.help && + grep -q list module.help && + grep -q remove module.help && + grep -q load module.help && + grep -q info module.help && + grep -q stats module.help && + grep -q debug module.help +' + +test_expect_success 'flux module load "noexist" fails' ' + ! flux module load noexist 2>noexist.out && + grep -q "not found" noexist.out +' + +test_expect_success 'flux module detects bad nodeset' ' + ! flux module load -r smurf kvs 2>badns-load.out && + grep -q "target nodeset" badns-load.out + ! flux module remove -r smurf kvs 2>badns-remove.out && + grep -q "target nodeset" badns-remove.out + ! flux module list -r smurf 2>badns-list.out && + grep -q "target nodeset" badns-list.out +' + + + test_done diff --git a/t/t0011-content-cache.t b/t/t0011-content-cache.t index 4f0e956cdd39..45a96d75da7c 100755 --- a/t/t0011-content-cache.t +++ b/t/t0011-content-cache.t @@ -21,7 +21,7 @@ HASHFUN=`flux getattr content-hash` test_expect_success 'store 100 blobs on rank 0' ' for i in `seq 0 99`; do echo test$i | \ flux content store >/dev/null; done && - TOTAL=`flux comms-stats --type int --parse count content` && + TOTAL=`flux module stats --type int --parse count content` && test $TOTAL -ge 100 ' @@ -136,15 +136,15 @@ test_expect_success 'store on all ranks can be retrieved from rank 0' ' # Backing store is not loaded so all entries on rank 0 should be dirty test_expect_success 'rank 0 cache is all dirty' ' - DIRTY=`flux comms-stats --type int --parse dirty content` && - TOTAL=`flux comms-stats --type int --parse count content` && + DIRTY=`flux module stats --type int --parse dirty content` && + TOTAL=`flux module stats --type int --parse count content` && test $DIRTY -eq $TOTAL ' # Backing store is not loaded so all entries on rank 0 should be valid test_expect_success 'rank 0 cache is all valid' ' - VALID=`flux comms-stats --type int --parse valid content` && - TOTAL=`flux comms-stats --type int --parse count content` && + VALID=`flux module stats --type int --parse valid content` && + TOTAL=`flux module stats --type int --parse count content` && test $VALID -eq $TOTAL ' diff --git a/t/t0012-content-sqlite.t b/t/t0012-content-sqlite.t index 469ef85219cc..47321a12c2a5 100755 --- a/t/t0012-content-sqlite.t +++ b/t/t0012-content-sqlite.t @@ -33,7 +33,7 @@ test_expect_success 'load content-sqlite module on rank 0' ' test_expect_success 'store 100 blobs on rank 0' ' store_junk test 100 && - TOTAL=`flux comms-stats --type int --parse count content` && + TOTAL=`flux module stats --type int --parse count content` && test $TOTAL -ge 100 ' @@ -114,13 +114,13 @@ test_expect_success 'load and verify 1m blob on all ranks' ' test_expect_success 'flush rank 0 cache' ' run_timeout 10 flux content flush && - NDIRTY=`flux comms-stats --type int --parse dirty content` && + NDIRTY=`flux module stats --type int --parse dirty content` && test $NDIRTY -eq 0 ' test_expect_success 'drop rank 0 cache' ' flux content dropcache && - ECOUNT=`flux comms-stats --type int --parse count content` && + ECOUNT=`flux module stats --type int --parse count content` && test $ECOUNT -eq 0 ' @@ -129,8 +129,8 @@ test_expect_success 'unload content-sqlite module' ' ' test_expect_success 'check that content returned dirty' ' - NDIRTY=`flux comms-stats --type int --parse dirty content` && - ECOUNT=`flux comms-stats --type int --parse count content` && + NDIRTY=`flux module stats --type int --parse dirty content` && + ECOUNT=`flux module stats --type int --parse count content` && test $NDIRTY -eq $ECOUNT ' @@ -161,7 +161,7 @@ test_expect_success 'load content-sqlite module on rank 0' ' test_expect_success 'flush rank 0 cache' ' run_timeout 10 flux content flush && - NDIRTY=`flux comms-stats --type int --parse dirty content` && + NDIRTY=`flux module stats --type int --parse dirty content` && test $NDIRTY -eq 0 ' @@ -187,19 +187,19 @@ test_expect_success 'exercise batching of synchronous flush to backing store' ' flux setattr content-flush-batch-limit 5 && store_junk loadunload 200 && flux content flush && - NDIRTY=`flux comms-stats --type int --parse dirty content` && + NDIRTY=`flux module stats --type int --parse dirty content` && test ${NDIRTY} -eq 0 ' test_expect_success 'exercise batching of asynchronous flush to backing store' ' - OLD_COUNT=`flux comms-stats --type int --parse count content` && + OLD_COUNT=`flux module stats --type int --parse count content` && flux module remove --rank 0 content-sqlite && flux module load --rank 0 content-sqlite && flux module remove --rank 0 content-sqlite && flux module load --rank 0 content-sqlite && flux module remove --rank 0 content-sqlite && flux module load --rank 0 content-sqlite && - NEW_COUNT=`flux comms-stats --type int --parse count content` && + NEW_COUNT=`flux module stats --type int --parse count content` && test $OLD_COUNT -le $NEW_COUNT ' diff --git a/t/t0017-security.t b/t/t0017-security.t index e4947fea1624..f40b279f6e54 100755 --- a/t/t0017-security.t +++ b/t/t0017-security.t @@ -10,4 +10,11 @@ test_expect_success 'verify fake munge encoding of messages' ' ${FLUX_BUILD_DIR}/src/test/tmunge --fake ' +test_expect_success 'simulated local connector auth failure returns EPERM' ' + flux comms info && + flux module debug --set 1 connector-local && + test_must_fail flux comms info 2>authfail.out && + grep -q "Operation not permitted" authfail.out +' + test_done diff --git a/t/t1006-apidisconnect.t b/t/t1006-apidisconnect.t index 1cb6e94afff5..b50cc850f6f8 100755 --- a/t/t1006-apidisconnect.t +++ b/t/t1006-apidisconnect.t @@ -12,7 +12,7 @@ test_under_flux ${SIZE} kvs check_kvs_watchers() { local i n for i in `seq 1 $2`; do - n=`flux comms-stats --parse "#watchers" kvs` + n=`flux module stats --parse "#watchers" kvs` echo "Try $i: $n" test $n -eq $1 && return 0 sleep 1 @@ -22,7 +22,7 @@ check_kvs_watchers() { test_expect_success 'kvs watcher gets disconnected on client exit' ' - before_watchers=`flux comms-stats --parse "#watchers" kvs` && + before_watchers=`flux module stats --parse "#watchers" kvs` && echo "waiters before test: $before_watchers" && test_expect_code 142 run_timeout 1 flux kvs watch noexist && check_kvs_watchers $before_watchers 3 diff --git a/t/t2000-wreck.t b/t/t2000-wreck.t index 14ef02f38294..d0df9182037b 100755 --- a/t/t2000-wreck.t +++ b/t/t2000-wreck.t @@ -387,12 +387,12 @@ test_expect_success 'wreck job is linked in lwj-complete after failure' ' test_expect_success 'wreck: no KVS watchers leaked after 10 jobs' ' flux exec -r 1-$(($SIZE-1)) -l \ - flux comms-stats --parse "#watchers" kvs | sort -n >w.before && + flux module stats --parse "#watchers" kvs | sort -n >w.before && for i in `seq 1 10`; do flux wreckrun --ntasks $SIZE /bin/true done && flux exec -r 1-$(($SIZE-1)) -l \ - flux comms-stats --parse "#watchers" kvs | sort -n >w.after && + flux module stats --parse "#watchers" kvs | sort -n >w.after && test_cmp w.before w.after '