Skip to content

Commit

Permalink
Merge pull request #1241 from morrone/broker_cleanup_3
Browse files Browse the repository at this point in the history
Broker cleanup 3
  • Loading branch information
garlick authored Oct 19, 2017
2 parents 778e42b + 1adc7bb commit 2095397
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 103 deletions.
128 changes: 38 additions & 90 deletions src/broker/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,6 @@ typedef enum {
ERROR_MODE_RETURN,
} request_error_mode_t;

/* Parameters describing this broker's position in the k-ary TBON.
 * The broker publishes these as the "tbon.arity", "tbon.level",
 * "tbon.maxlevel" and "tbon.descendants" attributes.
 */
struct tbon_param {
int k; /* tree arity; set by --k-ary, defaults to 2 (binary) */
int level; /* kary_levelof (k, rank) for this broker's rank */
int maxlevel; /* kary_levelof (k, size - 1), i.e. level of the last rank */
int descendants; /* kary_sum_descendants (k, size, rank) */
};

typedef struct {
/* 0MQ
*/
Expand All @@ -105,7 +98,6 @@ typedef struct {
*/
flux_t *h;
flux_reactor_t *reactor;
zlist_t *sigwatchers;

/* Sockets.
*/
Expand All @@ -123,8 +115,6 @@ typedef struct {
/* Misc
*/
bool verbose;
bool quiet;
pid_t pid;
int event_recv_seq;
int event_send_seq;
bool event_active; /* primary event source is active */
Expand All @@ -134,7 +124,7 @@ typedef struct {
double shutdown_grace;
zlist_t *subscriptions; /* subscripts for internal services */
content_cache_t *cache;
struct tbon_param tbon;
int tbon_k;
/* Bootstrap
*/
hello_t *hello;
Expand Down Expand Up @@ -185,12 +175,9 @@ static int create_persistdir (attr_t *attrs, uint32_t rank);
static int create_rundir (attr_t *attrs);
static int create_dummyattrs (flux_t *h, uint32_t rank, uint32_t size);

static char *calc_endpoint (attr_t *attrs, const char *endpoint);
static char *format_endpoint (attr_t *attrs, const char *endpoint);

static int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k,
double *elapsed_sec);

static int attr_get_overlay (const char *name, const char **val, void *arg);
static int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k);

static void init_attrs (attr_t *attrs, pid_t pid);

Expand Down Expand Up @@ -253,18 +240,17 @@ void parse_command_line_arguments(int argc, char *argv[],
ctx->verbose = true;
break;
case 'q': /* --quiet */
ctx->quiet = true;
break;
case 'X': /* --module-path PATH */
if (attr_set (ctx->attrs, "conf.module_path", optarg, true) < 0)
log_err_exit ("setting conf.module_path attribute");
break;
case 'k': /* --k-ary k */
errno = 0;
ctx->tbon.k = strtoul (optarg, &endptr, 10);
ctx->tbon_k = strtoul (optarg, &endptr, 10);
if (errno || *endptr != '\0')
log_err_exit ("k-ary '%s'", optarg);
if (ctx->tbon.k < 1)
if (ctx->tbon_k < 1)
usage ();
break;
case 'H': /* --heartrate SECS */
Expand Down Expand Up @@ -336,7 +322,7 @@ int main (int argc, char *argv[])
log_err_exit ("service_switch_create");
ctx.overlay = overlay_create ();
ctx.hello = hello_create ();
ctx.tbon.k = 2; /* binary TBON is default */
ctx.tbon_k = 2; /* binary TBON is default */
ctx.heartbeat = heartbeat_create ();
ctx.shutdown = shutdown_create ();
ctx.attrs = attr_create ();
Expand All @@ -347,9 +333,7 @@ int main (int argc, char *argv[])
if (!(ctx.runlevel = runlevel_create ()))
oom ();

ctx.pid = getpid();

init_attrs (ctx.attrs, ctx.pid);
init_attrs (ctx.attrs, getpid());

if (!(ctx.sm = subprocess_manager_create ()))
oom ();
Expand Down Expand Up @@ -435,18 +419,17 @@ int main (int argc, char *argv[])
/* Boot with PMI.
*/
double pmi_elapsed_sec;
if (boot_pmi (ctx.overlay, ctx.attrs, ctx.tbon.k, &pmi_elapsed_sec) < 0)
struct timespec pmi_start_time;
monotime (&pmi_start_time);
if (boot_pmi (ctx.overlay, ctx.attrs, ctx.tbon_k) < 0)
log_msg_exit ("bootstrap failed");
pmi_elapsed_sec = monotime_since (pmi_start_time) / 1000;
uint32_t rank = overlay_get_rank(ctx.overlay);
uint32_t size = overlay_get_size(ctx.overlay);

assert (size > 0);
assert (attr_get (ctx.attrs, "session-id", NULL, NULL) == 0);

ctx.tbon.level = kary_levelof (ctx.tbon.k, rank);
ctx.tbon.maxlevel = kary_levelof (ctx.tbon.k, size - 1);
ctx.tbon.descendants = kary_sum_descendants (ctx.tbon.k, size, rank);

if (ctx.verbose) {
const char *sid = "unknown";
(void)attr_get (ctx.attrs, "session-id", &sid, NULL);
Expand Down Expand Up @@ -479,8 +462,6 @@ int main (int argc, char *argv[])
if (create_dummyattrs (ctx.h, rank, size) < 0)
log_err_exit ("creating dummy attributes");

overlay_set_rank (ctx.overlay, rank);

/* Registers message handlers and obtains rank.
*/
if (content_cache_set_flux (ctx.cache, ctx.h) < 0)
Expand All @@ -490,28 +471,10 @@ int main (int argc, char *argv[])

/* Configure attributes.
*/
if (attr_add_active (ctx.attrs, "tbon.parent-endpoint", 0,
attr_get_overlay, NULL, ctx.overlay) < 0
|| attr_add_active (ctx.attrs, "mcast.relay-endpoint",
FLUX_ATTRFLAG_IMMUTABLE,
attr_get_overlay, NULL, ctx.overlay) < 0
|| attr_add_uint32 (ctx.attrs, "rank", rank,
FLUX_ATTRFLAG_IMMUTABLE) < 0
|| attr_add_uint32 (ctx.attrs, "size", size,
FLUX_ATTRFLAG_IMMUTABLE) < 0
|| attr_add_int (ctx.attrs, "tbon.arity", ctx.tbon.k,
FLUX_ATTRFLAG_IMMUTABLE) < 0
|| attr_add_int (ctx.attrs, "tbon.level", ctx.tbon.level,
FLUX_ATTRFLAG_IMMUTABLE) < 0
|| attr_add_int (ctx.attrs, "tbon.maxlevel",
ctx.tbon.maxlevel,
FLUX_ATTRFLAG_IMMUTABLE) < 0
|| attr_add_int (ctx.attrs, "tbon.descendants",
ctx.tbon.descendants,
FLUX_ATTRFLAG_IMMUTABLE) < 0
|| hello_register_attrs (ctx.hello, ctx.attrs) < 0) {
if (overlay_register_attrs(ctx.overlay, ctx.attrs) < 0)
log_err_exit ("registering overlay attributes");
if (hello_register_attrs (ctx.hello, ctx.attrs) < 0)
log_err_exit ("configuring attributes");
}

if (rank == 0) {
if (runlevel_register_attrs (ctx.runlevel, ctx.attrs) < 0)
Expand Down Expand Up @@ -1038,7 +1001,7 @@ static int create_persistdir (attr_t *attrs, uint32_t rank)
*
* Caller is responsible for freeing memory of returned value.
*/
static char * calc_endpoint (attr_t *attrs, const char *endpoint)
static char * format_endpoint (attr_t *attrs, const char *endpoint)
{
char ipaddr[HOST_NAME_MAX + 1];
char *ptr, *buf, *rv = NULL;
Expand Down Expand Up @@ -1102,27 +1065,31 @@ static char * calc_endpoint (attr_t *attrs, const char *endpoint)
return (rv);
}

static int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k,
double *elapsed_sec)
static int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k)
{
int spawned, size, rank, appnum;
int relay_rank = -1, parent_rank;
int spawned;
int size;
int rank;
int appnum;
int relay_rank = -1;
int parent_rank;
int clique_size;
int *clique_ranks = NULL;
const char *child_uri, *relay_uri;
int kvsname_len, key_len, val_len;
const char *child_uri;
const char *relay_uri;
int kvsname_len;
int key_len;
int val_len;
char *kvsname = NULL;
char *key = NULL;
char *val = NULL;
int e, rc = -1;
struct timespec start_time;
int e;
int rc = -1;
const char *attrtbonendpoint;
char *tbonendpoint = NULL;
const char *attrmcastendpoint;
char *mcastendpoint = NULL;

monotime (&start_time);

if ((e = PMI_Init (&spawned)) != PMI_SUCCESS) {
log_msg ("PMI_Init: %s", pmi_strerror (e));
goto done;
Expand All @@ -1134,18 +1101,18 @@ static int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k,
log_msg ("PMI_Get_size: %s", pmi_strerror (e));
goto done;
}
overlay_set_size (overlay, (uint32_t)size);
if ((e = PMI_Get_rank (&rank)) != PMI_SUCCESS) {
log_msg ("PMI_Get_rank: %s", pmi_strerror (e));
goto done;
}
overlay_set_rank (overlay, (uint32_t)rank);
if ((e = PMI_Get_appnum (&appnum)) != PMI_SUCCESS) {
log_msg ("PMI_Get_appnum: %s", pmi_strerror (e));
goto done;
}

/* Get id string.
overlay_init (overlay, (uint32_t)size, (uint32_t)rank, tbon_k);

/* Set session-id attribute from PMI appnum if not already set.
*/
if (attr_get (attrs, "session-id", NULL, NULL) < 0) {
if (attr_add_int (attrs, "session-id", appnum,
Expand All @@ -1168,8 +1135,8 @@ static int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k,
goto done;
}

if (!(tbonendpoint = calc_endpoint (attrs, attrtbonendpoint))) {
log_msg ("calc_endpoint error");
if (!(tbonendpoint = format_endpoint (attrs, attrtbonendpoint))) {
log_msg ("format_endpoint error");
goto done;
}

Expand All @@ -1185,8 +1152,8 @@ static int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k,
goto done;
}

if (!(mcastendpoint = calc_endpoint (attrs, attrmcastendpoint))) {
log_msg ("calc_endpoint error");
if (!(mcastendpoint = format_endpoint (attrs, attrmcastendpoint))) {
log_msg ("format_endpoint error");
goto done;
}

Expand Down Expand Up @@ -1363,7 +1330,6 @@ static int boot_pmi (overlay_t *overlay, attr_t *attrs, int tbon_k,
PMI_Finalize ();
rc = 0;
done:
*elapsed_sec = monotime_since (start_time) / 1000;
if (clique_ranks)
free (clique_ranks);
if (kvsname)
Expand Down Expand Up @@ -1526,24 +1492,6 @@ static void broker_unhandle_signals (zlist_t *sigwatchers)
}
}

/* Attribute getter backed by the overlay object passed in 'arg'.
 *
 * name - attribute being queried; only "tbon.parent-endpoint" and
 *        "mcast.relay-endpoint" are recognized.
 * val  - receives the string returned by overlay_get_parent() or
 *        overlay_get_relay() respectively.
 * arg  - the overlay_t * to query.
 *
 * Returns 0 on success.  For any other name, returns -1 with errno
 * set to ENOENT and leaves *val untouched.
 */
static int attr_get_overlay (const char *name, const char **val, void *arg)
{
    overlay_t *ov = arg;

    if (strcmp (name, "tbon.parent-endpoint") == 0) {
        *val = overlay_get_parent (ov);
        return 0;
    }
    if (strcmp (name, "mcast.relay-endpoint") == 0) {
        *val = overlay_get_relay (ov);
        return 0;
    }
    /* Unknown attribute name. */
    errno = ENOENT;
    return -1;
}

/**
** Built-in services
**/
Expand Down Expand Up @@ -2134,7 +2082,7 @@ static int broker_request_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg,
rc = service_send (ctx->services, msg);
if (rc < 0)
goto error;
} else if ((gw = kary_child_route (ctx->tbon.k, size, rank, nodeid))
} else if ((gw = kary_child_route (ctx->tbon_k, size, rank, nodeid))
!= KARY_NONE) {
rc = subvert_sendmsg_child (ctx, msg, gw);
if (rc < 0)
Expand Down Expand Up @@ -2170,7 +2118,7 @@ static int broker_response_sendmsg (broker_ctx_t *ctx, const flux_msg_t *msg)
goto done;
}

parent = kary_parentof (ctx->tbon.k, overlay_get_rank(ctx->overlay));
parent = kary_parentof (ctx->tbon_k, overlay_get_rank(ctx->overlay));
snprintf (puuid, sizeof (puuid), "%"PRIu32, parent);

/* See if it should go to the parent (backwards!)
Expand Down
Loading

0 comments on commit 2095397

Please sign in to comment.