diff --git a/src/broker/broker.c b/src/broker/broker.c index 220f5fe44fe8..a0eaa1508532 100644 --- a/src/broker/broker.c +++ b/src/broker/broker.c @@ -187,13 +187,15 @@ static int create_persistdir (attr_t *attrs, uint32_t rank); static int create_rundir (attr_t *attrs); static int create_dummyattrs (flux_t *h, uint32_t rank, uint32_t size); -static char *calc_endpoint (broker_ctx_t *ctx, const char *endpoint); +static char *calc_endpoint (attr_t *attrs, const char *endpoint); -static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec); +static int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k, + uint32_t *rank_out, uint32_t *size_out, + double *elapsed_sec); static int attr_get_overlay (const char *name, const char **val, void *arg); -static void init_attrs (broker_ctx_t *ctx); +static void init_attrs (attr_t *attrs, pid_t pid); static const struct flux_handle_ops broker_handle_ops; @@ -228,6 +230,79 @@ static void usage (void) exit (1); } +void parse_command_line_arguments(int argc, char *argv[], + broker_ctx_t *ctx, int *sec_typemask) +{ + int c; + int e; + char *endptr; + + while ((c = getopt_long (argc, argv, OPTIONS, longopts, NULL)) != -1) { + switch (c) { + case 's': /* --security=MODE */ + if (!strcmp (optarg, "none")) { + *sec_typemask = 0; + } else if (!strcmp (optarg, "plain")) { + *sec_typemask |= FLUX_SEC_TYPE_PLAIN; + *sec_typemask &= ~FLUX_SEC_TYPE_CURVE; + } else if (!strcmp (optarg, "curve")) { + *sec_typemask |= FLUX_SEC_TYPE_CURVE; + *sec_typemask &= ~FLUX_SEC_TYPE_PLAIN; + } else { + log_msg_exit ("--security arg must be none|plain|curve"); + } + break; + case 'v': /* --verbose */ + ctx->verbose = true; + break; + case 'q': /* --quiet */ + ctx->quiet = true; + break; + case 'X': /* --module-path PATH */ + if (attr_set (ctx->attrs, "conf.module_path", optarg, true) < 0) + log_err_exit ("setting conf.module_path attribute"); + break; + case 'k': /* --k-ary k */ + errno = 0; + ctx->tbon.k = strtoul (optarg, &endptr, 10); + if (errno || *endptr != '\0') + log_err_exit ("k-ary '%s'", optarg); + if (ctx->tbon.k < 1) + usage (); + break; + case 'H': /* --heartrate SECS */ + if (heartbeat_set_ratestr (ctx->heartbeat, optarg) < 0) + log_err_exit ("heartrate `%s'", optarg); + break; + case 'g': /* --shutdown-grace SECS */ + errno = 0; + ctx->shutdown_grace = strtod (optarg, &endptr); + if (errno || *endptr != '\0') + log_err_exit ("shutdown-grace '%s'", optarg); + if (ctx->shutdown_grace < 0) + usage (); + break; + case 'S': { /* --setattr ATTR=VAL */ + char *val, *attr = xstrdup (optarg); + if ((val = strchr (attr, '='))) + *val++ = '\0'; + if (attr_add (ctx->attrs, attr, val, 0) < 0) + if (attr_set (ctx->attrs, attr, val, true) < 0) + log_err_exit ("setattr %s=%s", attr, val); + free (attr); + break; + } + default: + usage (); + } + } + if (optind < argc) { + if ((e = argz_create (argv + optind, &ctx->init_shell_cmd, + &ctx->init_shell_cmd_len)) != 0) + log_errn_exit (e, "argz_create"); + } +} + static int setup_profiling (const char *program, int rank) { #if HAVE_CALIPER @@ -246,12 +321,9 @@ static int setup_profiling (const char *program, int rank) int main (int argc, char *argv[]) { - int c; broker_ctx_t ctx; zlist_t *sigwatchers; int sec_typemask = FLUX_SEC_TYPE_CURVE | FLUX_SEC_TYPE_MUNGE; - int e; - char *endptr; sigset_t old_sigmask; struct sigaction old_sigact_int; struct sigaction old_sigact_term; @@ -281,76 +353,13 @@ int main (int argc, char *argv[]) ctx.pid = getpid(); - init_attrs (&ctx); + init_attrs (ctx.attrs, ctx.pid); if (!(ctx.sm = subprocess_manager_create ())) oom (); subprocess_manager_set (ctx.sm, SM_WAIT_FLAGS, WNOHANG); - while ((c = getopt_long (argc, argv, OPTIONS, longopts, NULL)) != -1) { - switch (c) { - case 's': /* --security=MODE */ - if (!strcmp (optarg, "none")) { - sec_typemask = 0; - } else if (!strcmp (optarg, "plain")) { - sec_typemask |= FLUX_SEC_TYPE_PLAIN; - sec_typemask &= ~FLUX_SEC_TYPE_CURVE; - } else if (!strcmp (optarg, "curve")) { - sec_typemask |= FLUX_SEC_TYPE_CURVE; - sec_typemask &= ~FLUX_SEC_TYPE_PLAIN; - } else { - log_msg_exit ("--security arg must be none|plain|curve"); - } - break; - case 'v': /* --verbose */ - ctx.verbose = true; - break; - case 'q': /* --quiet */ - ctx.quiet = true; - break; - case 'X': /* --module-path PATH */ - if (attr_set (ctx.attrs, "conf.module_path", optarg, true) < 0) - log_err_exit ("setting conf.module_path attribute"); - break; - case 'k': /* --k-ary k */ - errno = 0; - ctx.tbon.k = strtoul (optarg, &endptr, 10); - if (errno || *endptr != '\0') - log_err_exit ("k-ary '%s'", optarg); - if (ctx.tbon.k < 1) - usage (); - break; - case 'H': /* --heartrate SECS */ - if (heartbeat_set_ratestr (ctx.heartbeat, optarg) < 0) - log_err_exit ("heartrate `%s'", optarg); - break; - case 'g': /* --shutdown-grace SECS */ - errno = 0; - ctx.shutdown_grace = strtod (optarg, &endptr); - if (errno || *endptr != '\0') - log_err_exit ("shutdown-grace '%s'", optarg); - if (ctx.shutdown_grace < 0) - usage (); - break; - case 'S': { /* --setattr ATTR=VAL */ - char *val, *attr = xstrdup (optarg); - if ((val = strchr (attr, '='))) - *val++ = '\0'; - if (attr_add (ctx.attrs, attr, val, 0) < 0) - if (attr_set (ctx.attrs, attr, val, true) < 0) - log_err_exit ("setattr %s=%s", attr, val); - free (attr); - break; - } - default: - usage (); - } - } - if (optind < argc) { - if ((e = argz_create (argv + optind, &ctx.init_shell_cmd, - &ctx.init_shell_cmd_len)) != 0) - log_errn_exit (e, "argz_create"); - } + parse_command_line_arguments(argc, argv, &ctx, &sec_typemask); /* Record the instance owner: the effective uid of the broker. * Set default rolemask for messages sent with flux_send() @@ -430,7 +439,8 @@ int main (int argc, char *argv[]) /* Boot with PMI. */ double pmi_elapsed_sec; - if (boot_pmi (&ctx, &pmi_elapsed_sec) < 0) + if (boot_pmi (ctx.overlay, ctx.attrs, ctx.tbon.k, + &ctx.rank, &ctx.size, &pmi_elapsed_sec) < 0) log_msg_exit ("bootstrap failed"); assert (ctx.rank != FLUX_NODEID_ANY); @@ -592,13 +602,6 @@ int main (int argc, char *argv[]) if (overlay_connect (ctx.overlay) < 0) log_err_exit ("overlay_connect"); - { - const char *rundir; - if (attr_get (ctx.attrs, "broker.rundir", &rundir, NULL) < 0) { - log_msg_exit ("broker.rundir attribute is not set"); - } - } - shutdown_set_handle (ctx.shutdown, ctx.h); shutdown_set_callback (ctx.shutdown, shutdown_cb, &ctx); @@ -759,31 +762,31 @@ static void init_attrs_from_environment (attr_t *attrs) } } -static void init_attrs_overlay (broker_ctx_t *ctx) +static void init_attrs_overlay (attr_t *attrs) { - char *tbonendpoint = "tbon.endpoint"; - char *mcastendpoint = "mcast.endpoint"; + const char *tbonendpoint = "tbon.endpoint"; + const char *mcastendpoint = "mcast.endpoint"; - if (attr_add (ctx->attrs, + if (attr_add (attrs, tbonendpoint, "tcp://%h:*", 0) < 0) log_err_exit ("attr_add %s", tbonendpoint); - if (attr_add (ctx->attrs, + if (attr_add (attrs, mcastendpoint, "tbon", 0) < 0) log_err_exit ("attr_add %s", mcastendpoint); } -static void init_attrs_broker_pid (broker_ctx_t *ctx) +static void init_attrs_broker_pid (attr_t *attrs, pid_t pid) { char *attrname = "broker.pid"; char *pidval; - pidval = xasprintf ("%u", ctx->pid); - if (attr_add (ctx->attrs, + pidval = xasprintf ("%u", pid); + if (attr_add (attrs, attrname, pidval, FLUX_ATTRFLAG_IMMUTABLE) < 0) @@ -791,16 +794,16 @@ static void init_attrs_broker_pid (broker_ctx_t *ctx) free (pidval); } -static void init_attrs (broker_ctx_t *ctx) +static void init_attrs (attr_t *attrs, pid_t pid) { /* Initialize config attrs from environment set up by flux(1) */ - init_attrs_from_environment (ctx->attrs); + init_attrs_from_environment (attrs); /* Initialize other miscellaneous attrs */ - init_attrs_overlay (ctx); - init_attrs_broker_pid (ctx); + init_attrs_overlay (attrs); + init_attrs_broker_pid (attrs, pid); } static void hello_update_cb (hello_t *hello, void *arg) @@ -1037,7 +1040,7 @@ static int create_persistdir (attr_t *attrs, uint32_t rank) * * Caller is responsible for freeing memory of returned value. */ -static char * calc_endpoint (broker_ctx_t *ctx, const char *endpoint) +static char * calc_endpoint (attr_t *attrs, const char *endpoint) { char ipaddr[HOST_NAME_MAX + 1]; char *ptr, *buf, *rv = NULL; @@ -1060,7 +1063,7 @@ static char * calc_endpoint (broker_ctx_t *ctx, const char *endpoint) len += strlen (ipaddr); } else if (*ptr == 'B') { - if (attr_get (ctx->attrs, "broker.rundir", &rundir, NULL) < 0) { + if (attr_get (attrs, "broker.rundir", &rundir, NULL) < 0) { log_msg ("broker.rundir attribute is not set"); goto done; } @@ -1101,7 +1104,9 @@ static char * calc_endpoint (broker_ctx_t *ctx, const char *endpoint) return (rv); } -static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec) +static int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k, + uint32_t *rank_out, uint32_t *size_out, + double *elapsed_sec) { int spawned, size, rank, appnum; int relay_rank = -1, parent_rank; @@ -1137,25 +1142,26 @@ static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec) log_msg ("PMI_Get_rank: %s", pmi_strerror (e)); goto done; } + *rank_out = (uint32_t)rank; if ((e = PMI_Get_appnum (&appnum)) != PMI_SUCCESS) { log_msg ("PMI_Get_appnum: %s", pmi_strerror (e)); goto done; } - ctx->rank = rank; - ctx->size = size; - overlay_set_rank (ctx->overlay, ctx->rank); + *size_out = (uint32_t)size; + + overlay_set_rank (overlay, (uint32_t)rank); /* Get id string. */ - if (attr_get (ctx->attrs, "session-id", NULL, NULL) < 0) { + if (attr_get (attrs, "session-id", NULL, NULL) < 0) { id = xasprintf ("%d", appnum); - if (attr_add (ctx->attrs, "session-id", id, FLUX_ATTRFLAG_IMMUTABLE) < 0) + if (attr_add (attrs, "session-id", id, FLUX_ATTRFLAG_IMMUTABLE) < 0) goto done; } /* Initialize rundir */ - if (create_rundir (ctx->attrs) < 0) { + if (create_rundir (attrs) < 0) { log_err ("could not initialize rundir"); goto done; } @@ -1163,34 +1169,34 @@ static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec) /* Set TBON endpoint and mcast endpoint based on user settings */ - if (attr_get (ctx->attrs, "tbon.endpoint", &attrtbonendpoint, NULL) < 0) { + if (attr_get (attrs, "tbon.endpoint", &attrtbonendpoint, NULL) < 0) { log_err ("tbon.endpoint is not set"); goto done; } - if (!(tbonendpoint = calc_endpoint (ctx, attrtbonendpoint))) { + if (!(tbonendpoint = calc_endpoint (attrs, attrtbonendpoint))) { log_msg ("calc_endpoint error"); goto done; } - if (attr_set (ctx->attrs, "tbon.endpoint", tbonendpoint, true) < 0) { + if (attr_set (attrs, "tbon.endpoint", tbonendpoint, true) < 0) { log_err ("tbon.endpoint could not be set"); goto done; } - overlay_set_child (ctx->overlay, tbonendpoint); + overlay_set_child (overlay, tbonendpoint); - if (attr_get (ctx->attrs, "mcast.endpoint", &attrmcastendpoint, NULL) < 0) { + if (attr_get (attrs, "mcast.endpoint", &attrmcastendpoint, NULL) < 0) { log_err ("mcast.endpoint is not set"); goto done; } - if (!(mcastendpoint = calc_endpoint (ctx, attrmcastendpoint))) { + if (!(mcastendpoint = calc_endpoint (attrs, attrmcastendpoint))) { log_msg ("calc_endpoint error"); goto done; } - if (attr_set (ctx->attrs, "mcast.endpoint", mcastendpoint, true) < 0) { + if (attr_set (attrs, "mcast.endpoint", mcastendpoint, true) < 0) { log_err ("mcast.endpoint could not be set"); goto done; } @@ -1217,17 +1223,17 @@ static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec) for (i = 0; i < clique_size; i++) if (relay_rank == -1 || clique_ranks[i] < relay_rank) relay_rank = clique_ranks[i]; - if (relay_rank >= 0 && ctx->rank == relay_rank) { + if (relay_rank >= 0 && rank == relay_rank) { const char *rundir; char *relayfile = NULL; - if (attr_get (ctx->attrs, "broker.rundir", &rundir, NULL) < 0) { + if (attr_get (attrs, "broker.rundir", &rundir, NULL) < 0) { log_msg ("broker.rundir attribute is not set"); goto done; } relayfile = xasprintf ("%s/relay", rundir); - overlay_set_relay (ctx->overlay, "ipc://%s", relayfile); + overlay_set_relay (overlay, "ipc://%s", relayfile); cleanup_push_string (cleanup_file, relayfile); free (relayfile); } @@ -1260,14 +1266,14 @@ static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec) /* Bind to addresses to expand URI wildcards, so we can exchange * the real addresses. */ - if (overlay_bind (ctx->overlay) < 0) { + if (overlay_bind (overlay) < 0) { log_err ("overlay_bind failed"); /* function is idempotent */ goto done; } /* Write the URI of downstream facing socket under the rank (if any). */ - if ((child_uri = overlay_get_child (ctx->overlay))) { + if ((child_uri = overlay_get_child (overlay))) { if (snprintf (key, key_len, "cmbd.%d.uri", rank) >= key_len) { log_msg ("pmi key string overflow"); goto done; @@ -1286,7 +1292,7 @@ static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec) * (if any). */ if (strcasecmp (mcastendpoint, "tbon") - && (relay_uri = overlay_get_relay (ctx->overlay))) { + && (relay_uri = overlay_get_relay (overlay))) { if (snprintf (key, key_len, "cmbd.%d.relay", rank) >= key_len) { log_msg ("pmi key string overflow"); goto done; @@ -1314,8 +1320,8 @@ static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec) /* Read the uri of our parent, after computing its rank */ - if (ctx->rank > 0) { - parent_rank = kary_parentof (ctx->tbon.k, ctx->rank); + if (rank > 0) { + parent_rank = kary_parentof (tbon_k, (uint32_t)rank); if (snprintf (key, key_len, "cmbd.%d.uri", parent_rank) >= key_len) { log_msg ("pmi key string overflow"); goto done; @@ -1324,7 +1330,7 @@ static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec) log_msg ("pmi_kvs_get: %s", pmi_strerror (e)); goto done; } - overlay_set_parent (ctx->overlay, "%s", val); + overlay_set_parent (overlay, "%s", val); } /* Event distribution (four configurations): @@ -1352,9 +1358,9 @@ static int boot_pmi (broker_ctx_t *ctx, double *elapsed_sec) log_msg ("PMI_KVS_Get: %s", pmi_strerror (e)); goto done; } - overlay_set_event (ctx->overlay, "%s", val); + overlay_set_event (overlay, "%s", val); } else - overlay_set_event (ctx->overlay, mcastendpoint); + overlay_set_event (overlay, mcastendpoint); } if ((e = PMI_Barrier ()) != PMI_SUCCESS) { log_msg ("PMI_Barrier: %s", pmi_strerror (e));