From 3e3e5da103dbeb42c23e3f8ca9b1f7cc92ed8ebf Mon Sep 17 00:00:00 2001 From: "Christopher J. Morrone" Date: Fri, 6 Oct 2017 15:43:19 -0700 Subject: [PATCH 1/3] broker: Minor refactoring to improve code isolation Reduce the number of times that the giant semi-global "ctx" is passed to functions that only need some small subset of that data. --- src/broker/broker.c | 94 ++++++++++++++++++++++++--------------------- 1 file changed, 50 insertions(+), 44 deletions(-) diff --git a/src/broker/broker.c b/src/broker/broker.c index 220f5fe44fe8..4f8adab7338f 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -187,13 +187,15 @@ static int create_persistdir (attr_t *attrs, uint32_t rank); static int create_rundir (attr_t *attrs); static int create_dummyattrs (flux_t *h, uint32_t rank, uint32_t size); -static char *calc_endpoint (broker_ctx_t *ctx, const char *endpoint); +static char *calc_endpoint (attr_t *attrs, const char *endpoint); -static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec); +static int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k, + uint32_t *rank_out, uint32_t *size_out, + double *elapsed_sec); static int attr_get_overlay (const char *name, const char **val, void *arg); -static void init_attrs (broker_ctx_t *ctx); +static void init_attrs (attr_t *attrs, pid_t pid); static const struct flux_handle_ops broker_handle_ops; @@ -281,7 +283,7 @@ int main (int argc, char *argv[]) ctx.pid = getpid(); - init_attrs (&ctx); + init_attrs (ctx.attrs, ctx.pid); if (!(ctx.sm = subprocess_manager_create ())) oom (); @@ -430,7 +432,8 @@ int main (int argc, char *argv[]) /* Boot with PMI. */ double pmi_elapsed_sec; - if (boot_pmi (&ctx, &pmi_elapsed_sec) < 0) + if (boot_pmi (ctx.overlay, ctx.attrs, ctx.tbon.k, + &ctx.rank, &ctx.size, &pmi_elapsed_sec) < 0) log_msg_exit ("bootstrap failed"); assert (ctx.rank != FLUX_NODEID_ANY); @@ -759,31 +762,31 @@ static void init_attrs_from_environment (attr_t *attrs) } } -static void init_attrs_overlay (broker_ctx_t *ctx) +static void init_attrs_overlay (attr_t *attrs) { - char *tbonendpoint = "tbon.endpoint"; - char *mcastendpoint = "mcast.endpoint"; + const char *tbonendpoint = "tbon.endpoint"; + const char *mcastendpoint = "mcast.endpoint"; - if (attr_add (ctx->attrs, + if (attr_add (attrs, tbonendpoint, "tcp://%h:*", 0) < 0) log_err_exit ("attr_add %s", tbonendpoint); - if (attr_add (ctx->attrs, + if (attr_add (attrs, mcastendpoint, "tbon", 0) < 0) log_err_exit ("attr_add %s", mcastendpoint); } -static void init_attrs_broker_pid (broker_ctx_t *ctx) +static void init_attrs_broker_pid (attr_t *attrs, pid_t pid) { char *attrname = "broker.pid"; char *pidval; - pidval = xasprintf ("%u", ctx->pid); - if (attr_add (ctx->attrs, + pidval = xasprintf ("%u", pid); + if (attr_add (attrs, attrname, pidval, FLUX_ATTRFLAG_IMMUTABLE) < 0) @@ -791,16 +794,16 @@ static void init_attrs_broker_pid (broker_ctx_t *ctx) free (pidval); } -static void init_attrs (broker_ctx_t *ctx) +static void init_attrs (attr_t *attrs, pid_t pid) { /* Initialize config attrs from environment set up by flux(1) */ - init_attrs_from_environment (ctx->attrs); + init_attrs_from_environment (attrs); /* Initialize other miscellaneous attrs */ - init_attrs_overlay (ctx); - init_attrs_broker_pid (ctx); + init_attrs_overlay (attrs); + init_attrs_broker_pid (attrs, pid); } static void hello_update_cb (hello_t *hello, void *arg) @@ -1037,7 +1040,7 @@ static int create_persistdir (attr_t *attrs, uint32_t rank) * * Caller is responsible for freeing memory of returned value. */ -static char * calc_endpoint (broker_ctx_t *ctx, const char *endpoint) +static char * calc_endpoint (attr_t *attrs, const char *endpoint) { char ipaddr[HOST_NAME_MAX + 1]; char *ptr, *buf, *rv = NULL; @@ -1060,7 +1063,7 @@ static char * calc_endpoint (broker_ctx_t *ctx, const char *endpoint) len += strlen (ipaddr); } else if (*ptr == 'B') { - if (attr_get (ctx->attrs, "broker.rundir", &rundir, NULL) < 0) { + if (attr_get (attrs, "broker.rundir", &rundir, NULL) < 0) { log_msg ("broker.rundir attribute is not set"); goto done; } @@ -1101,7 +1104,9 @@ static char * calc_endpoint (broker_ctx_t *ctx, const char *endpoint) return (rv); } -static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec) +static int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k, + uint32_t *rank_out, uint32_t *size_out, + double *elapsed_sec) { int spawned, size, rank, appnum; int relay_rank = -1, parent_rank; @@ -1137,25 +1142,26 @@ static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec) log_msg ("PMI_Get_rank: %s", pmi_strerror (e)); goto done; } + *rank_out = (uint32_t)rank; if ((e = PMI_Get_appnum (&appnum)) != PMI_SUCCESS) { log_msg ("PMI_Get_appnum: %s", pmi_strerror (e)); goto done; } - ctx->rank = rank; - ctx->size = size; - overlay_set_rank (ctx->overlay, ctx->rank); + *size_out = (uint32_t)size; + + overlay_set_rank (overlay, (uint32_t)rank); /* Get id string. */ - if (attr_get (ctx->attrs, "session-id", NULL, NULL) < 0) { + if (attr_get (attrs, "session-id", NULL, NULL) < 0) { id = xasprintf ("%d", appnum); - if (attr_add (ctx->attrs, "session-id", id, FLUX_ATTRFLAG_IMMUTABLE) < 0) + if (attr_add (attrs, "session-id", id, FLUX_ATTRFLAG_IMMUTABLE) < 0) goto done; } /* Initialize rundir */ - if (create_rundir (ctx->attrs) < 0) { + if (create_rundir (attrs) < 0) { log_err ("could not initialize rundir"); goto done; } @@ -1163,34 +1169,34 @@ static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec) /* Set TBON endpoint and mcast endpoint based on user settings */ - if (attr_get (ctx->attrs, "tbon.endpoint", &attrtbonendpoint, NULL) < 0) { + if (attr_get (attrs, "tbon.endpoint", &attrtbonendpoint, NULL) < 0) { log_err ("tbon.endpoint is not set"); goto done; } - if (!(tbonendpoint = calc_endpoint (ctx, attrtbonendpoint))) { + if (!(tbonendpoint = calc_endpoint (attrs, attrtbonendpoint))) { log_msg ("calc_endpoint error"); goto done; } - if (attr_set (ctx->attrs, "tbon.endpoint", tbonendpoint, true) < 0) { + if (attr_set (attrs, "tbon.endpoint", tbonendpoint, true) < 0) { log_err ("tbon.endpoint could not be set"); goto done; } - overlay_set_child (ctx->overlay, tbonendpoint); + overlay_set_child (overlay, tbonendpoint); - if (attr_get (ctx->attrs, "mcast.endpoint", &attrmcastendpoint, NULL) < 0) { + if (attr_get (attrs, "mcast.endpoint", &attrmcastendpoint, NULL) < 0) { log_err ("mcast.endpoint is not set"); goto done; } - if (!(mcastendpoint = calc_endpoint (ctx, attrmcastendpoint))) { + if (!(mcastendpoint = calc_endpoint (attrs, attrmcastendpoint))) { log_msg ("calc_endpoint error"); goto done; } - if (attr_set (ctx->attrs, "mcast.endpoint", mcastendpoint, true) < 0) { + if (attr_set (attrs, "mcast.endpoint", mcastendpoint, true) < 0) { log_err ("mcast.endpoint could not be set"); goto done; } @@ -1217,17 +1223,17 @@ static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec) for (i = 0; i < clique_size; i++) if (relay_rank == -1 || clique_ranks[i] < relay_rank) relay_rank = clique_ranks[i]; - if (relay_rank >= 0 && ctx->rank == relay_rank) { + if (relay_rank >= 0 && rank == relay_rank) { const char *rundir; char *relayfile = NULL; - if (attr_get (ctx->attrs, "broker.rundir", &rundir, NULL) < 0) { + if (attr_get (attrs, "broker.rundir", &rundir, NULL) < 0) { log_msg ("broker.rundir attribute is not set"); goto done; } relayfile = xasprintf ("%s/relay", rundir); - overlay_set_relay (ctx->overlay, "ipc://%s", relayfile); + overlay_set_relay (overlay, "ipc://%s", relayfile); cleanup_push_string (cleanup_file, relayfile); free (relayfile); } @@ -1260,14 +1266,14 @@ static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec) /* Bind to addresses to expand URI wildcards, so we can exchange * the real addresses. */ - if (overlay_bind (ctx->overlay) < 0) { + if (overlay_bind (overlay) < 0) { log_err ("overlay_bind failed"); /* function is idempotent */ goto done; } /* Write the URI of downstream facing socket under the rank (if any). */ - if ((child_uri = overlay_get_child (ctx->overlay))) { + if ((child_uri = overlay_get_child (overlay))) { if (snprintf (key, key_len, "cmbd.%d.uri", rank) >= key_len) { log_msg ("pmi key string overflow"); goto done; @@ -1286,7 +1292,7 @@ static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec) * (if any). */ if (strcasecmp (mcastendpoint, "tbon") - && (relay_uri = overlay_get_relay (ctx->overlay))) { + && (relay_uri = overlay_get_relay (overlay))) { if (snprintf (key, key_len, "cmbd.%d.relay", rank) >= key_len) { log_msg ("pmi key string overflow"); goto done; @@ -1314,8 +1320,8 @@ static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec) /* Read the uri of our parent, after computing its rank */ - if (ctx->rank > 0) { - parent_rank = kary_parentof (ctx->tbon.k, ctx->rank); + if (rank > 0) { + parent_rank = kary_parentof (tbon_k, (uint32_t)rank); if (snprintf (key, key_len, "cmbd.%d.uri", parent_rank) >= key_len) { log_msg ("pmi key string overflow"); goto done; @@ -1324,7 +1330,7 @@ static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec) log_msg ("pmi_kvs_get: %s", pmi_strerror (e)); goto done; } - overlay_set_parent (ctx->overlay, "%s", val); + overlay_set_parent (overlay, "%s", val); } /* Event distribution (four configurations): @@ -1352,9 +1358,9 @@ static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec) log_msg ("PMI_KVS_Get: %s", pmi_strerror (e)); goto done; } - overlay_set_event (ctx->overlay, "%s", val); + overlay_set_event (overlay, "%s", val); } else - overlay_set_event (ctx->overlay, mcastendpoint); + overlay_set_event (overlay, mcastendpoint); } if ((e = PMI_Barrier ()) != PMI_SUCCESS) { log_msg ("PMI_Barrier: %s", pmi_strerror (e)); From ac97ff3d2be19b3490d1c0306a8eeb153d09abca Mon Sep 17 00:00:00 2001 From: "Christopher J. Morrone" Date: Tue, 10 Oct 2017 11:48:36 -0700 Subject: [PATCH 2/3] broker: Remove dead code This attribute check for "broker.rundir" would appear to be redundant. create_rundir() will return an error if the broker.rundir attribute is not set, and main() will exit long before this check occurs. It does not appear that anything else would change broker.rundir later. --- src/broker/broker.c | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/broker/broker.c b/src/broker/broker.c index 4f8adab7338f..079f84bbf4b4 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -595,13 +595,6 @@ int main (int argc, char *argv[]) if (overlay_connect (ctx.overlay) < 0) log_err_exit ("overlay_connect"); - { - const char *rundir; - if (attr_get (ctx.attrs, "broker.rundir", &rundir, NULL) < 0) { - log_msg_exit ("broker.rundir attribute is not set"); - } - } - shutdown_set_handle (ctx.shutdown, ctx.h); shutdown_set_callback (ctx.shutdown, shutdown_cb, &ctx); From a89023f31c01b1890e4868a721ed35325d69e4d9 Mon Sep 17 00:00:00 2001 From: "Christopher J. Morrone" Date: Tue, 10 Oct 2017 13:50:55 -0700 Subject: [PATCH 3/3] broker: Move command line parsing into a function --- src/broker/broker.c | 141 +++++++++++++++++++++++--------------------- 1 file changed, 74 insertions(+), 67 deletions(-) diff --git a/src/broker/broker.c b/src/broker/broker.c index 079f84bbf4b4..a0eaa1508532 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -230,6 +230,79 @@ static void usage (void) exit (1); } +void parse_command_line_arguments(int argc, char *argv[], + broker_ctx_t *ctx, int *sec_typemask) +{ + int c; + int e; + char *endptr; + + while ((c = getopt_long (argc, argv, OPTIONS, longopts, NULL)) != -1) { + switch (c) { + case 's': /* --security=MODE */ + if (!strcmp (optarg, "none")) { + *sec_typemask = 0; + } else if (!strcmp (optarg, "plain")) { + *sec_typemask |= FLUX_SEC_TYPE_PLAIN; + *sec_typemask &= ~FLUX_SEC_TYPE_CURVE; + } else if (!strcmp (optarg, "curve")) { + *sec_typemask |= FLUX_SEC_TYPE_CURVE; + *sec_typemask &= ~FLUX_SEC_TYPE_PLAIN; + } else { + log_msg_exit ("--security arg must be none|plain|curve"); + } + break; + case 'v': /* --verbose */ + ctx->verbose = true; + break; + case 'q': /* --quiet */ + ctx->quiet = true; + break; + case 'X': /* --module-path PATH */ + if (attr_set (ctx->attrs, "conf.module_path", optarg, true) < 0) + log_err_exit ("setting conf.module_path attribute"); + break; + case 'k': /* --k-ary k */ + errno = 0; + ctx->tbon.k = strtoul (optarg, &endptr, 10); + if (errno || *endptr != '\0') + log_err_exit ("k-ary '%s'", optarg); + if (ctx->tbon.k < 1) + usage (); + break; + case 'H': /* --heartrate SECS */ + if (heartbeat_set_ratestr (ctx->heartbeat, optarg) < 0) + log_err_exit ("heartrate `%s'", optarg); + break; + case 'g': /* --shutdown-grace SECS */ + errno = 0; + ctx->shutdown_grace = strtod (optarg, &endptr); + if (errno || *endptr != '\0') + log_err_exit ("shutdown-grace '%s'", optarg); + if (ctx->shutdown_grace < 0) + usage (); + break; + case 'S': { /* --setattr ATTR=VAL */ + char *val, *attr = xstrdup (optarg); + if ((val = strchr (attr, '='))) + *val++ = '\0'; + if (attr_add (ctx->attrs, attr, val, 0) < 0) + if (attr_set (ctx->attrs, attr, val, true) < 0) + log_err_exit ("setattr %s=%s", attr, val); + free (attr); + break; + } + default: + usage (); + } + } + if (optind < argc) { + if ((e = argz_create (argv + optind, &ctx->init_shell_cmd, + &ctx->init_shell_cmd_len)) != 0) + log_errn_exit (e, "argz_create"); + } +} + static int setup_profiling (const char *program, int rank) { #if HAVE_CALIPER @@ -248,12 +321,9 @@ static int setup_profiling (const char *program, int rank) int main (int argc, char *argv[]) { - int c; broker_ctx_t ctx; zlist_t *sigwatchers; int sec_typemask = FLUX_SEC_TYPE_CURVE | FLUX_SEC_TYPE_MUNGE; - int e; - char *endptr; sigset_t old_sigmask; struct sigaction old_sigact_int; struct sigaction old_sigact_term; @@ -289,70 +359,7 @@ int main (int argc, char *argv[]) oom (); subprocess_manager_set (ctx.sm, SM_WAIT_FLAGS, WNOHANG); - while ((c = getopt_long (argc, argv, OPTIONS, longopts, NULL)) != -1) { - switch (c) { - case 's': /* --security=MODE */ - if (!strcmp (optarg, "none")) { - sec_typemask = 0; - } else if (!strcmp (optarg, "plain")) { - sec_typemask |= FLUX_SEC_TYPE_PLAIN; - sec_typemask &= ~FLUX_SEC_TYPE_CURVE; - } else if (!strcmp (optarg, "curve")) { - sec_typemask |= FLUX_SEC_TYPE_CURVE; - sec_typemask &= ~FLUX_SEC_TYPE_PLAIN; - } else { - log_msg_exit ("--security arg must be none|plain|curve"); - } - break; - case 'v': /* --verbose */ - ctx.verbose = true; - break; - case 'q': /* --quiet */ - ctx.quiet = true; - break; - case 'X': /* --module-path PATH */ - if (attr_set (ctx.attrs, "conf.module_path", optarg, true) < 0) - log_err_exit ("setting conf.module_path attribute"); - break; - case 'k': /* --k-ary k */ - errno = 0; - ctx.tbon.k = strtoul (optarg, &endptr, 10); - if (errno || *endptr != '\0') - log_err_exit ("k-ary '%s'", optarg); - if (ctx.tbon.k < 1) - usage (); - break; - case 'H': /* --heartrate SECS */ - if (heartbeat_set_ratestr (ctx.heartbeat, optarg) < 0) - log_err_exit ("heartrate `%s'", optarg); - break; - case 'g': /* --shutdown-grace SECS */ - errno = 0; - ctx.shutdown_grace = strtod (optarg, &endptr); - if (errno || *endptr != '\0') - log_err_exit ("shutdown-grace '%s'", optarg); - if (ctx.shutdown_grace < 0) - usage (); - break; - case 'S': { /* --setattr ATTR=VAL */ - char *val, *attr = xstrdup (optarg); - if ((val = strchr (attr, '='))) - *val++ = '\0'; - if (attr_add (ctx.attrs, attr, val, 0) < 0) - if (attr_set (ctx.attrs, attr, val, true) < 0) - log_err_exit ("setattr %s=%s", attr, val); - free (attr); - break; - } - default: - usage (); - } - } - if (optind < argc) { - if ((e = argz_create (argv + optind, &ctx.init_shell_cmd, - &ctx.init_shell_cmd_len)) != 0) - log_errn_exit (e, "argz_create"); - } + parse_command_line_arguments(argc, argv, &ctx, &sec_typemask); /* Record the instance owner: the effective uid of the broker. * Set default rolemask for messages sent with flux_send()