Skip to content

Commit

Permalink
Merge pull request #1234 from morrone/broker_cleanup_2
Browse files Browse the repository at this point in the history
Broker cleanup 2
  • Loading branch information
garlick authored Oct 13, 2017
2 parents 1893b92 + 0e47ed5 commit 2bddc2e
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 61 deletions.
20 changes: 20 additions & 0 deletions src/broker/attr.c
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,17 @@ static int set_int (const char *name, const char *val, void *arg)
return 0;
}

int attr_add_int (attr_t *attrs, const char *name, int val, int flags)
{
char val_string[32];
int n;

n = snprintf (val_string, sizeof (val_string), "%d", val);
assert (n <= sizeof(val_string));

return attr_add (attrs, name, val_string, flags);
}

int attr_add_active_int (attr_t *attrs, const char *name, int *val, int flags)
{
return attr_add_active (attrs, name, flags, get_int, set_int, val);
Expand Down Expand Up @@ -274,6 +285,15 @@ static int set_uint32 (const char *name, const char *val, void *arg)
return 0;
}

int attr_add_uint32 (attr_t *attrs, const char *name, uint32_t val, int flags)
{
char val_string[32];

snprintf (val_string, sizeof (val_string), "%"PRIu32, val);

return attr_add (attrs, name, val_string, flags);
}

int attr_add_active_uint32 (attr_t *attrs, const char *name, uint32_t *val,
int flags)
{
Expand Down
7 changes: 7 additions & 0 deletions src/broker/attr.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ int attr_delete (attr_t *attrs, const char *name, bool force);
*/
int attr_add (attr_t *attrs, const char *name, const char *val, int flags);

/* Helper functions to add a non-string attribute. It performs the conversion
* to a string on the caller's behalf.
*/
int attr_add_int (attr_t *attrs, const char *name, int val, int flags);
int attr_add_uint32 (attr_t *attrs, const char *name, uint32_t val,
int flags);

/* Get/set an attribute.
*/
int attr_get (attr_t *attrs, const char *name, const char **val, int *flags);
Expand Down
108 changes: 51 additions & 57 deletions src/broker/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ typedef struct {

/* Session parameters
*/
uint32_t size; /* session size */
uint32_t rank; /* our rank in session */
attr_t *attrs;
uint32_t userid; /* instance owner */
uint32_t rolemask;
Expand Down Expand Up @@ -190,7 +188,6 @@ static int create_dummyattrs (flux_t *h, uint32_t rank, uint32_t size);
static char *calc_endpoint (attr_t *attrs, const char *endpoint);

static int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k,
uint32_t *rank_out, uint32_t *size_out,
double *elapsed_sec);

static int attr_get_overlay (const char *name, const char **val, void *arg);
Expand Down Expand Up @@ -334,7 +331,6 @@ int main (int argc, char *argv[])
if (!(sigwatchers = zlist_new ()))
oom ();

ctx.rank = FLUX_NODEID_ANY;
ctx.modhash = modhash_create ();
if (!(ctx.services = service_switch_create ()))
log_err_exit ("service_switch_create");
Expand Down Expand Up @@ -439,29 +435,29 @@ int main (int argc, char *argv[])
/* Boot with PMI.
*/
double pmi_elapsed_sec;
if (boot_pmi (ctx.overlay, ctx.attrs, ctx.tbon.k,
&ctx.rank, &ctx.size, &pmi_elapsed_sec) < 0)
if (boot_pmi (ctx.overlay, ctx.attrs, ctx.tbon.k, &pmi_elapsed_sec) < 0)
log_msg_exit ("bootstrap failed");
uint32_t rank = overlay_get_rank(ctx.overlay);
uint32_t size = overlay_get_size(ctx.overlay);

assert (ctx.rank != FLUX_NODEID_ANY);
assert (ctx.size > 0);
assert (size > 0);
assert (attr_get (ctx.attrs, "session-id", NULL, NULL) == 0);

ctx.tbon.level = kary_levelof (ctx.tbon.k, ctx.rank);
ctx.tbon.maxlevel = kary_levelof (ctx.tbon.k, ctx.size - 1);
ctx.tbon.descendants = kary_sum_descendants (ctx.tbon.k, ctx.size, ctx.rank);
ctx.tbon.level = kary_levelof (ctx.tbon.k, rank);
ctx.tbon.maxlevel = kary_levelof (ctx.tbon.k, size - 1);
ctx.tbon.descendants = kary_sum_descendants (ctx.tbon.k, size, rank);

if (ctx.verbose) {
const char *sid = "unknown";
(void)attr_get (ctx.attrs, "session-id", &sid, NULL);
log_msg ("boot: rank=%d size=%d session-id=%s", ctx.rank, ctx.size, sid);
log_msg ("boot: rank=%d size=%d session-id=%s", rank, size, sid);
}

if (attr_set_flags (ctx.attrs, "session-id", FLUX_ATTRFLAG_IMMUTABLE) < 0)
log_err_exit ("attr_set_flags session-id");

// Setup profiling
setup_profiling (argv[0], ctx.rank);
setup_profiling (argv[0], rank);

/* Create/validate runtime directory (this function is idempotent)
*/
Expand All @@ -470,20 +466,20 @@ int main (int argc, char *argv[])
/* If persist-filesystem or persist-directory are set, initialize those,
* but only on rank 0.
*/
if (create_persistdir (ctx.attrs, ctx.rank) < 0)
if (create_persistdir (ctx.attrs, rank) < 0)
log_err_exit ("create_persistdir");

/* Initialize logging.
* OK to call flux_log*() after this.
*/
logbuf_initialize (ctx.h, ctx.rank, ctx.attrs);
logbuf_initialize (ctx.h, rank, ctx.attrs);

/* Allow flux_get_rank() and flux_get_size() to work in the broker.
*/
if (create_dummyattrs (ctx.h, ctx.rank, ctx.size) < 0)
if (create_dummyattrs (ctx.h, rank, size) < 0)
log_err_exit ("creating dummy attributes");

overlay_set_rank (ctx.overlay, ctx.rank);
overlay_set_rank (ctx.overlay, rank);

/* Registers message handlers and obtains rank.
*/
Expand All @@ -499,25 +495,25 @@ int main (int argc, char *argv[])
|| attr_add_active (ctx.attrs, "mcast.relay-endpoint",
FLUX_ATTRFLAG_IMMUTABLE,
attr_get_overlay, NULL, ctx.overlay) < 0
|| attr_add_active_uint32 (ctx.attrs, "rank", &ctx.rank,
|| attr_add_uint32 (ctx.attrs, "rank", rank,
FLUX_ATTRFLAG_IMMUTABLE) < 0
|| attr_add_active_uint32 (ctx.attrs, "size", &ctx.size,
|| attr_add_uint32 (ctx.attrs, "size", size,
FLUX_ATTRFLAG_IMMUTABLE) < 0
|| attr_add_active_int (ctx.attrs, "tbon.arity", &ctx.tbon.k,
|| attr_add_int (ctx.attrs, "tbon.arity", ctx.tbon.k,
FLUX_ATTRFLAG_IMMUTABLE) < 0
|| attr_add_active_int (ctx.attrs, "tbon.level", &ctx.tbon.level,
|| attr_add_int (ctx.attrs, "tbon.level", ctx.tbon.level,
FLUX_ATTRFLAG_IMMUTABLE) < 0
|| attr_add_active_int (ctx.attrs, "tbon.maxlevel",
&ctx.tbon.maxlevel,
|| attr_add_int (ctx.attrs, "tbon.maxlevel",
ctx.tbon.maxlevel,
FLUX_ATTRFLAG_IMMUTABLE) < 0
|| attr_add_active_int (ctx.attrs, "tbon.descendants",
&ctx.tbon.descendants,
|| attr_add_int (ctx.attrs, "tbon.descendants",
ctx.tbon.descendants,
FLUX_ATTRFLAG_IMMUTABLE) < 0
|| hello_register_attrs (ctx.hello, ctx.attrs) < 0) {
log_err_exit ("configuring attributes");
}

if (ctx.rank == 0) {
if (rank == 0) {
if (runlevel_register_attrs (ctx.runlevel, ctx.attrs) < 0)
log_err_exit ("configuring runlevel attributes");
}
Expand All @@ -542,11 +538,11 @@ int main (int argc, char *argv[])
* make a guess.
*/
if (ctx.shutdown_grace == 0) {
if (ctx.size < 16)
if (size < 16)
ctx.shutdown_grace = 1;
else if (ctx.size < 128)
else if (size < 128)
ctx.shutdown_grace = 2;
else if (ctx.size < 1024)
else if (size < 1024)
ctx.shutdown_grace = 4;
else
ctx.shutdown_grace = 10;
Expand All @@ -563,9 +559,9 @@ int main (int argc, char *argv[])
log_msg ("relay: %s", relay ? relay : "none");
}

set_proctitle (ctx.rank);
set_proctitle (rank);

if (ctx.rank == 0) {
if (rank == 0) {
const char *rc1, *rc3, *pmi, *uri;
const char *rc2 = ctx.init_shell_cmd;
size_t rc2_len = ctx.init_shell_cmd_len;
Expand All @@ -579,7 +575,7 @@ int main (int argc, char *argv[])
if (attr_get (ctx.attrs, "conf.pmi_library_path", &pmi, NULL) < 0)
log_err_exit ("conf.pmi_library_path is not set");

runlevel_set_size (ctx.runlevel, ctx.size);
runlevel_set_size (ctx.runlevel, size);
runlevel_set_subprocess_manager (ctx.runlevel, ctx.sm);
runlevel_set_callback (ctx.runlevel, runlevel_cb, &ctx);
runlevel_set_io_callback (ctx.runlevel, runlevel_io_cb, &ctx);
Expand Down Expand Up @@ -613,7 +609,7 @@ int main (int argc, char *argv[])
log_msg_exit ("heaptrace_initialize");
if (sequence_hash_initialize (ctx.h) < 0)
log_err_exit ("sequence_hash_initialize");
if (exec_initialize (ctx.h, ctx.sm, ctx.rank, ctx.attrs) < 0)
if (exec_initialize (ctx.h, ctx.sm, rank, ctx.attrs) < 0)
log_err_exit ("exec_initialize");
if (ping_initialize (ctx.h, "cmb") < 0)
log_err_exit ("ping_initialize");
Expand All @@ -626,7 +622,7 @@ int main (int argc, char *argv[])
*/
if (ctx.verbose)
log_msg ("initializing modules");
modhash_set_rank (ctx.modhash, ctx.rank);
modhash_set_rank (ctx.modhash, rank);
modhash_set_flux (ctx.modhash, ctx.h);
modhash_set_heartbeat (ctx.modhash, ctx.heartbeat);
/* Load the local connector module.
Expand All @@ -645,7 +641,7 @@ int main (int argc, char *argv[])
log_err_exit ("initializing heartbeat attributes");
if (heartbeat_start (ctx.heartbeat) < 0)
log_err_exit ("heartbeat_start");
if (ctx.rank == 0 && ctx.verbose)
if (rank == 0 && ctx.verbose)
log_msg ("installing session heartbeat: T=%0.1fs",
heartbeat_get_rate (ctx.heartbeat));

Expand Down Expand Up @@ -812,15 +808,17 @@ static void hello_update_cb (hello_t *hello, void *arg)

if (hello_complete (hello)) {
flux_log (ctx->h, LOG_INFO, "wireup: %d/%d (complete) %.1fs",
hello_get_count (hello), ctx->size, hello_get_time (hello));
hello_get_count (hello), overlay_get_size(ctx->overlay),
hello_get_time (hello));
flux_log (ctx->h, LOG_INFO, "Run level %d starting", 1);
overlay_set_idle_warning (ctx->overlay, 3);
if (runlevel_set_level (ctx->runlevel, 1) < 0)
log_err_exit ("runlevel_set_level 1");
/* FIXME: shutdown hello protocol */
} else {
flux_log (ctx->h, LOG_INFO, "wireup: %d/%d (incomplete) %.1fs",
hello_get_count (hello), ctx->size, hello_get_time (hello));
hello_get_count (hello), overlay_get_size(ctx->overlay),
hello_get_time (hello));
}
}

Expand All @@ -830,7 +828,7 @@ static void shutdown_cb (shutdown_t *s, bool expired, void *arg)
{
broker_ctx_t *ctx = arg;
if (expired) {
if (ctx->rank == 0)
if (overlay_get_rank(ctx->overlay) == 0)
exit_rc = shutdown_get_rc (s);
flux_reactor_stop (flux_get_reactor (ctx->h));
}
Expand Down Expand Up @@ -1105,7 +1103,6 @@ static char * calc_endpoint (attr_t *attrs, const char *endpoint)
}

static int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k,
uint32_t *rank_out, uint32_t *size_out,
double *elapsed_sec)
{
int spawned, size, rank, appnum;
Expand All @@ -1114,7 +1111,6 @@ static int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k,
int *clique_ranks = NULL;
const char *child_uri, *relay_uri;
int kvsname_len, key_len, val_len;
char *id = NULL;
char *kvsname = NULL;
char *key = NULL;
char *val = NULL;
Expand All @@ -1138,24 +1134,22 @@ static int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k,
log_msg ("PMI_Get_size: %s", pmi_strerror (e));
goto done;
}
overlay_set_size (overlay, (uint32_t)size);
if ((e = PMI_Get_rank (&rank)) != PMI_SUCCESS) {
log_msg ("PMI_Get_rank: %s", pmi_strerror (e));
goto done;
}
*rank_out = (uint32_t)rank;
overlay_set_rank (overlay, (uint32_t)rank);
if ((e = PMI_Get_appnum (&appnum)) != PMI_SUCCESS) {
log_msg ("PMI_Get_appnum: %s", pmi_strerror (e));
goto done;
}
*size_out = (uint32_t)size;

overlay_set_rank (overlay, (uint32_t)rank);

/* Get id string.
*/
if (attr_get (attrs, "session-id", NULL, NULL) < 0) {
id = xasprintf ("%d", appnum);
if (attr_add (attrs, "session-id", id, FLUX_ATTRFLAG_IMMUTABLE) < 0)
if (attr_add_int (attrs, "session-id", appnum,
FLUX_ATTRFLAG_IMMUTABLE) < 0)
goto done;
}

Expand Down Expand Up @@ -1370,8 +1364,6 @@ static int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k,
rc = 0;
done:
*elapsed_sec = monotime_since (start_time) / 1000;
if (id)
free (id);
if (clique_ranks)
free (clique_ranks);
if (kvsname)
Expand Down Expand Up @@ -1791,7 +1783,7 @@ static void broker_add_services (broker_ctx_t *ctx)
{
struct internal_service *svc;
for (svc = &services[0]; svc->name != NULL; svc++) {
if (!nodeset_member (svc->nodeset, ctx->rank))
if (!nodeset_member (svc->nodeset, overlay_get_rank(ctx->overlay)))
continue;
if (service_add (ctx->services, svc->name, NULL,
route_to_handle, ctx) < 0)
Expand Down Expand Up @@ -2080,7 +2072,7 @@ static int subvert_sendmsg_child (broker_ctx_t *ctx, const flux_msg_t *msg,
char uuid[16];
int rc = -1;

snprintf (uuid, sizeof (uuid), "%"PRIu32, ctx->rank);
snprintf (uuid, sizeof (uuid), "%"PRIu32, overlay_get_rank(ctx->overlay));
if (flux_msg_push_route (cpy, uuid) < 0)
goto done;
snprintf (uuid, sizeof (uuid), "%"PRIu32, nodeid);
Expand Down Expand Up @@ -2111,14 +2103,16 @@ static int broker_request_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg,
uint32_t nodeid, gw;
int flags;
int rc = -1;
uint32_t rank = overlay_get_rank(ctx->overlay);
uint32_t size = overlay_get_size(ctx->overlay);

if (flux_msg_get_nodeid (msg, &nodeid, &flags) < 0)
goto error;
if ((flags & FLUX_MSGFLAG_UPSTREAM) && nodeid == ctx->rank) {
if ((flags & FLUX_MSGFLAG_UPSTREAM) && nodeid == rank) {
rc = overlay_sendmsg_parent (ctx->overlay, msg);
if (rc < 0)
goto error;
} else if ((flags & FLUX_MSGFLAG_UPSTREAM) && nodeid != ctx->rank) {
} else if ((flags & FLUX_MSGFLAG_UPSTREAM) && nodeid != rank) {
rc = service_send (ctx->services, msg);
if (rc < 0 && errno == ENOSYS) {
rc = overlay_sendmsg_parent (ctx->overlay, msg);
Expand All @@ -2136,12 +2130,12 @@ static int broker_request_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg,
}
if (rc < 0)
goto error;
} else if (nodeid == ctx->rank) {
} else if (nodeid == rank) {
rc = service_send (ctx->services, msg);
if (rc < 0)
goto error;
} else if ((gw = kary_child_route (ctx->tbon.k, ctx->size,
ctx->rank, nodeid)) != KARY_NONE) {
} else if ((gw = kary_child_route (ctx->tbon.k, size, rank, nodeid))
!= KARY_NONE) {
rc = subvert_sendmsg_child (ctx, msg, gw);
if (rc < 0)
goto error;
Expand Down Expand Up @@ -2176,7 +2170,7 @@ static int broker_response_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg)
goto done;
}

parent = kary_parentof (ctx->tbon.k, ctx->rank);
parent = kary_parentof (ctx->tbon.k, overlay_get_rank(ctx->overlay));
snprintf (puuid, sizeof (puuid), "%"PRIu32, parent);

/* See if it should go to the parent (backwards!)
Expand Down Expand Up @@ -2210,7 +2204,7 @@ static int broker_event_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg)

if (!(cpy = flux_msg_copy (msg, true)))
goto done;
if (ctx->rank > 0) {
if (overlay_get_rank(ctx->overlay) > 0) {
if (flux_msg_enable_route (cpy) < 0)
goto done;
rc = overlay_sendmsg_parent (ctx->overlay, cpy);
Expand Down
Loading

0 comments on commit 2bddc2e

Please sign in to comment.