Skip to content

Commit

Permalink
Merge pull request flux-framework#4474 from garlick/overlay_topo
Browse files Browse the repository at this point in the history
refactor broker overlay for topology flexibility
  • Loading branch information
mergify[bot] authored Aug 9, 2022
2 parents 59be0fd + f004baf commit 23beb76
Show file tree
Hide file tree
Showing 15 changed files with 866 additions and 133 deletions.
4 changes: 3 additions & 1 deletion doc/man7/flux-broker-attributes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ TREE BASED OVERLAY NETWORK
==========================

tbon.fanout [Updates: C]
Branching factor of the tree based overlay network. Default: ``2``.
Branching factor of the tree based overlay network. A value of ``0``
means the topology is "flat" (rank 0 is the parent of all other ranks).
Default: ``2``.

tbon.descendants
Number of descendants "below" this node of the tree based
Expand Down
1 change: 0 additions & 1 deletion etc/flux.service.in
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ ExecStart=/bin/bash -c '\
@X_BINDIR@/flux broker \
--config-path=@X_SYSCONFDIR@/flux/system/conf.d \
-Scron.directory=@X_SYSCONFDIR@/flux/system/cron.d \
-Stbon.fanout=256 \
-Srundir=@X_RUNSTATEDIR@/flux \
-Sstatedir=${STATE_DIRECTORY:-/var/lib/flux} \
-Slocal-uri=local://@X_RUNSTATEDIR@/flux/local \
Expand Down
12 changes: 10 additions & 2 deletions src/broker/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ libbroker_la_SOURCES = \
groups.h \
groups.c \
shutdown.h \
shutdown.c
shutdown.c \
topology.h \
topology.c

flux_broker_LDADD = \
$(builddir)/libbroker.la \
Expand All @@ -88,7 +90,8 @@ TESTS = test_attr.t \
test_pmiutil.t \
test_boot_config.t \
test_runat.t \
test_overlay.t
test_overlay.t \
test_topology.t

test_ldadd = \
$(builddir)/libbroker.la \
Expand Down Expand Up @@ -150,3 +153,8 @@ test_overlay_t_SOURCES = test/overlay.c
test_overlay_t_CPPFLAGS = $(test_cppflags)
test_overlay_t_LDADD = $(test_ldadd)
test_overlay_t_LDFLAGS = $(test_ldflags)

test_topology_t_SOURCES = test/topology.c
test_topology_t_CPPFLAGS = $(test_cppflags)
test_topology_t_LDADD = $(test_ldadd)
test_topology_t_LDFLAGS = $(test_ldflags)
33 changes: 26 additions & 7 deletions src/broker/attr.c
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,14 @@ static int set_int (const char *name, const char *val, void *arg)
errno = EINVAL;
return -1;
}
errno = 0;
n = strtol (val, &endptr, 0);
if (n <= INT_MIN || n >= INT_MAX) {
errno = ERANGE;
if (errno != 0 || *endptr != '\0') {
errno = EINVAL;
return -1;
}
if (*endptr != '\0') {
errno = EINVAL;
if (n <= INT_MIN || n >= INT_MAX) {
errno = ERANGE;
return -1;
}
*i = (int)n;
Expand Down Expand Up @@ -288,10 +289,9 @@ static int set_uint32 (const char *name, const char *val, void *arg)
char *endptr;
unsigned long n;

errno = 0;
n = strtoul (val, &endptr, 0);
if (n == ULONG_MAX) /* ERANGE set by strtol */
return -1;
if (endptr == val || *endptr != '\0') {
if (errno != 0 || *endptr != '\0') {
errno = EINVAL;
return -1;
}
Expand All @@ -314,6 +314,25 @@ int attr_add_active_uint32 (attr_t *attrs, const char *name, uint32_t *val,
return attr_add_active (attrs, name, flags, get_uint32, set_uint32, val);
}

/* Look up attribute 'name' and parse its value as a base-10 unsigned
 * 32-bit integer, storing the result in 'value'.
 *
 * Returns 0 on success.  Returns -1 if attr_get() fails, or with
 * errno = EINVAL if the value is missing, empty, or has trailing
 * non-numeric characters, or with errno = ERANGE if the number does not
 * fit in a uint32_t.  'value' is only written on success.
 */
int attr_get_uint32 (attr_t *attrs, const char *name, uint32_t *value)
{
    const char *s;
    unsigned long n;
    char *endptr;

    if (attr_get (attrs, name, &s, NULL) < 0)
        return -1;
    /* Guard against an attribute that exists but has no value, so that
     * strtoul() is never handed a NULL pointer.
     */
    if (!s) {
        errno = EINVAL;
        return -1;
    }

    errno = 0;
    n = strtoul (s, &endptr, 10);
    /* Keep the result in an unsigned long until it has been range checked:
     * where long is 64 bits, assigning directly to a uint32_t would
     * silently truncate large values.
     */
    if (errno == ERANGE || n > UINT32_MAX) {
        errno = ERANGE;
        return -1;
    }
    /* endptr == s means no digits were consumed (e.g. empty string). */
    if (errno != 0 || endptr == s || *endptr != '\0') {
        errno = EINVAL;
        return -1;
    }
    *value = (uint32_t)n;
    return 0;
}

const char *attr_first (attr_t *attrs)
{
struct entry *e = zhash_first (attrs->hash);
Expand Down
4 changes: 4 additions & 0 deletions src/broker/attr.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ int attr_add_active_int (attr_t *attrs, const char *name, int *val,
int attr_add_active_uint32 (attr_t *attrs, const char *name, uint32_t *val,
int flags);

/* Get an attribute and parse it as a base-10 unsigned 32-bit integer.
 * Returns 0 on success, -1 on failure.
 */
int attr_get_uint32 (attr_t *attrs, const char *name, uint32_t *value);

/* Iterate over attribute names with internal cursor.
*/
const char *attr_first (attr_t *attrs);
Expand Down
30 changes: 25 additions & 5 deletions src/broker/boot_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@
#include <flux/hostlist.h>

#include "src/common/libutil/log.h"
#include "src/common/libutil/kary.h"
#include "src/common/libutil/errno_safe.h"
#include "src/common/libpmi/clique.h"

#include "attr.h"
#include "overlay.h"
#include "topology.h"
#include "boot_config.h"

#define DEFAULT_FANOUT 0


/* Copy 'fmt' into 'buf', substituting the following tokens:
* - %h host
Expand Down Expand Up @@ -456,8 +458,21 @@ int boot_config (flux_t *h, struct overlay *overlay, attr_t *attrs)
struct boot_conf conf;
uint32_t rank;
uint32_t size;
int fanout = overlay_get_fanout (overlay);
uint32_t fanout;
json_t *hosts = NULL;
struct topology *topo = NULL;

/* Fetch the tbon.fanout attribute and supply a default value if unset.
*/
if (attr_get_uint32 (attrs, "tbon.fanout", &fanout) < 0)
fanout = DEFAULT_FANOUT;
else
(void)attr_delete (attrs, "tbon.fanout", true);
if (attr_add_uint32 (attrs,
"tbon.fanout",
fanout,
FLUX_ATTRFLAG_IMMUTABLE) < 0)
return -1;

/* Ingest the [bootstrap] stanza.
*/
Expand Down Expand Up @@ -492,7 +507,10 @@ int boot_config (flux_t *h, struct overlay *overlay, attr_t *attrs)
/* Tell overlay network this broker's rank and size.
* If a curve certificate was provided, load it.
*/
if (overlay_set_geometry (overlay, size, rank) < 0)
if (!(topo = topology_create (size))
|| topology_set_kary (topo, fanout) < 0
|| topology_set_rank (topo, rank) < 0
|| overlay_set_topology (overlay, topo) < 0)
goto error;
if (conf.curve_cert) {
if (overlay_cert_load (overlay, conf.curve_cert) < 0)
Expand All @@ -509,7 +527,7 @@ int boot_config (flux_t *h, struct overlay *overlay, attr_t *attrs)
* attribute to the URI peers will connect to. If broker has no
* downstream peers, set tbon.endpoint to NULL.
*/
if (kary_childof (fanout, size, rank, 0) != KARY_NONE) {
if (topology_get_child_ranks (topo, NULL, 0) > 0) {
char bind_uri[MAX_URI + 1];
char my_uri[MAX_URI + 1];

Expand Down Expand Up @@ -559,7 +577,7 @@ int boot_config (flux_t *h, struct overlay *overlay, attr_t *attrs)
char parent_uri[MAX_URI + 1];
if (boot_config_geturibyrank (hosts,
&conf,
kary_parentof (fanout, rank),
topology_get_parent (topo),
parent_uri,
sizeof (parent_uri)) < 0)
goto error;
Expand All @@ -584,9 +602,11 @@ int boot_config (flux_t *h, struct overlay *overlay, attr_t *attrs)
goto error;
}
json_decref (hosts);
topology_decref (topo);
return 0;
error:
ERRNO_SAFE_WRAP (json_decref, hosts);
ERRNO_SAFE_WRAP (topology_decref, topo);
return -1;
}

Expand Down
56 changes: 39 additions & 17 deletions src/broker/boot_pmi.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,19 @@
#include "src/common/libutil/log.h"
#include "src/common/libutil/cleanup.h"
#include "src/common/libutil/ipaddr.h"
#include "src/common/libutil/kary.h"
#include "src/common/libutil/errno_safe.h"
#include "src/common/libpmi/pmi.h"
#include "src/common/libpmi/pmi_strerror.h"
#include "src/common/libpmi/clique.h"

#include "attr.h"
#include "overlay.h"
#include "topology.h"
#include "boot_pmi.h"
#include "pmiutil.h"

#define DEFAULT_FANOUT 2


/* If the broker is launched via flux-shell, then the shell may opt
* to set a "flux.instance-level" parameter in the PMI kvs to tell
Expand Down Expand Up @@ -178,8 +180,7 @@ static int set_hostlist_attr (attr_t *attrs, struct hostlist *hl)

int boot_pmi (struct overlay *overlay, attr_t *attrs)
{
int fanout = overlay_get_fanout (overlay);
int rank;
uint32_t fanout;
char key[64];
char val[1024];
char hostname[MAXHOSTNAMELEN + 1];
Expand All @@ -188,10 +189,26 @@ int boot_pmi (struct overlay *overlay, attr_t *attrs)
json_t *o;
struct pmi_handle *pmi;
struct pmi_params pmi_params;
struct topology *topo = NULL;
int child_count;
int *child_ranks = NULL;
int result;
const char *uri;
int i;

/* Fetch the tbon.fanout attribute and supply a default value if unset.
*/
if (attr_get_uint32 (attrs, "tbon.fanout", &fanout) < 0)
fanout = DEFAULT_FANOUT;
else
(void)attr_delete (attrs, "tbon.fanout", true);
if (attr_add_uint32 (attrs,
"tbon.fanout",
fanout,
FLUX_ATTRFLAG_IMMUTABLE) < 0)
return -1;


memset (&pmi_params, 0, sizeof (pmi_params));
if (!(pmi = broker_pmi_create ())) {
log_err ("broker_pmi_create");
Expand All @@ -215,9 +232,10 @@ int boot_pmi (struct overlay *overlay, attr_t *attrs)
log_err ("error setting broker.mapping attribute");
goto error;
}
if (overlay_set_geometry (overlay,
pmi_params.size,
pmi_params.rank) < 0)
if (!(topo = topology_create (pmi_params.size))
|| topology_set_kary (topo, fanout) < 0
|| topology_set_rank (topo, pmi_params.rank) < 0
|| overlay_set_topology (overlay, topo) < 0)
goto error;
if (gethostname (hostname, sizeof (hostname)) < 0) {
log_err ("gethostname");
Expand All @@ -242,15 +260,17 @@ int boot_pmi (struct overlay *overlay, attr_t *attrs)
*/
overlay_set_ipv6 (overlay, 1);

child_count = topology_get_child_ranks (topo, NULL, 0);
if (child_count > 0) {
if (!(child_ranks = calloc (child_count, sizeof (child_ranks[0])))
|| topology_get_child_ranks (topo, child_ranks, child_count) < 0)
goto error;
}

/* If there are to be downstream peers, then bind to socket and extract
* the concretized URI for sharing with other ranks.
* N.B. there are no downstream peers if the 0th child of this rank
* in k-ary tree does not exist.
*/
if (kary_childof (fanout,
pmi_params.size,
pmi_params.rank,
0) != KARY_NONE) {
if (child_count > 0) {
char buf[1024];

if (format_bind_uri (buf, sizeof (buf), attrs, pmi_params.rank) < 0)
Expand Down Expand Up @@ -306,8 +326,8 @@ int boot_pmi (struct overlay *overlay, attr_t *attrs)
if (pmi_params.rank > 0) {
const char *peer_pubkey;
const char *peer_uri;
int rank = topology_get_parent (topo);

rank = kary_parentof (fanout, pmi_params.rank);
if (snprintf (key, sizeof (key), "%d", rank) >= sizeof (key)) {
log_msg ("pmi key string overflow");
goto error;
Expand Down Expand Up @@ -343,12 +363,10 @@ int boot_pmi (struct overlay *overlay, attr_t *attrs)

/* Fetch the business card of children and inform overlay of public keys.
*/
for (i = 0; i < fanout; i++) {
for (i = 0; i < child_count; i++) {
const char *peer_pubkey;
int rank = child_ranks[i];

rank = kary_childof (fanout, pmi_params.size, pmi_params.rank, i);
if (rank == KARY_NONE)
break;
if (snprintf (key, sizeof (key), "%d", rank) >= sizeof (key)) {
log_msg ("pmi key string overflow");
goto error;
Expand Down Expand Up @@ -427,11 +445,15 @@ int boot_pmi (struct overlay *overlay, attr_t *attrs)
free (bizcard);
broker_pmi_destroy (pmi);
hostlist_destroy (hl);
free (child_ranks);
topology_decref (topo);
return 0;
error:
free (bizcard);
broker_pmi_destroy (pmi);
hostlist_destroy (hl);
free (child_ranks);
topology_decref (topo);
return -1;
}

Expand Down
1 change: 0 additions & 1 deletion src/broker/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
#include "src/common/libutil/cleanup.h"
#include "src/common/libidset/idset.h"
#include "src/common/libutil/ipaddr.h"
#include "src/common/libutil/kary.h"
#include "src/common/libpmi/pmi.h"
#include "src/common/libpmi/pmi_strerror.h"
#include "src/common/libutil/fsd.h"
Expand Down
Loading

0 comments on commit 23beb76

Please sign in to comment.