Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor broker overlay for topology flexibility #4474

Merged
merged 8 commits into from
Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion doc/man7/flux-broker-attributes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,9 @@ TREE BASED OVERLAY NETWORK
==========================

tbon.fanout [Updates: C]
Branching factor of the tree based overlay network. Default: ``2``.
Branching factor of the tree based overlay network. A value of ``0``
means the topology is "flat" (rank 0 is the parent of all other ranks).
Default: ``2``.

tbon.descendants
Number of descendants "below" this node of the tree based
Expand Down
1 change: 0 additions & 1 deletion etc/flux.service.in
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ ExecStart=/bin/bash -c '\
@X_BINDIR@/flux broker \
--config-path=@X_SYSCONFDIR@/flux/system/conf.d \
-Scron.directory=@X_SYSCONFDIR@/flux/system/cron.d \
-Stbon.fanout=256 \
-Srundir=@X_RUNSTATEDIR@/flux \
-Sstatedir=${STATE_DIRECTORY:-/var/lib/flux} \
-Slocal-uri=local://@X_RUNSTATEDIR@/flux/local \
Expand Down
12 changes: 10 additions & 2 deletions src/broker/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ libbroker_la_SOURCES = \
groups.h \
groups.c \
shutdown.h \
shutdown.c
shutdown.c \
topology.h \
topology.c

flux_broker_LDADD = \
$(builddir)/libbroker.la \
Expand All @@ -88,7 +90,8 @@ TESTS = test_attr.t \
test_pmiutil.t \
test_boot_config.t \
test_runat.t \
test_overlay.t
test_overlay.t \
test_topology.t

test_ldadd = \
$(builddir)/libbroker.la \
Expand Down Expand Up @@ -150,3 +153,8 @@ test_overlay_t_SOURCES = test/overlay.c
test_overlay_t_CPPFLAGS = $(test_cppflags)
test_overlay_t_LDADD = $(test_ldadd)
test_overlay_t_LDFLAGS = $(test_ldflags)

test_topology_t_SOURCES = test/topology.c
test_topology_t_CPPFLAGS = $(test_cppflags)
test_topology_t_LDADD = $(test_ldadd)
test_topology_t_LDFLAGS = $(test_ldflags)
33 changes: 26 additions & 7 deletions src/broker/attr.c
Original file line number Diff line number Diff line change
Expand Up @@ -242,13 +242,14 @@ static int set_int (const char *name, const char *val, void *arg)
errno = EINVAL;
return -1;
}
errno = 0;
n = strtol (val, &endptr, 0);
if (n <= INT_MIN || n >= INT_MAX) {
errno = ERANGE;
if (errno != 0 || *endptr != '\0') {
errno = EINVAL;
return -1;
}
if (*endptr != '\0') {
errno = EINVAL;
if (n <= INT_MIN || n >= INT_MAX) {
errno = ERANGE;
return -1;
}
*i = (int)n;
Expand Down Expand Up @@ -288,10 +289,9 @@ static int set_uint32 (const char *name, const char *val, void *arg)
char *endptr;
unsigned long n;

errno = 0;
n = strtoul (val, &endptr, 0);
if (n == ULONG_MAX) /* ERANGE set by strtol */
return -1;
if (endptr == val || *endptr != '\0') {
if (errno != 0 || *endptr != '\0') {
errno = EINVAL;
return -1;
}
Expand All @@ -314,6 +314,25 @@ int attr_add_active_uint32 (attr_t *attrs, const char *name, uint32_t *val,
return attr_add_active (attrs, name, flags, get_uint32, set_uint32, val);
}

int attr_get_uint32 (attr_t *attrs, const char *name, uint32_t *value)
{
const char *s;
uint32_t i;
char *endptr;

if (attr_get (attrs, name, &s, NULL) < 0)
return -1;

errno = 0;
i = strtoul (s, &endptr, 10);
if (errno != 0 || *endptr != '\0') {
errno = EINVAL;
return -1;
}
*value = i;
return 0;
}

const char *attr_first (attr_t *attrs)
{
struct entry *e = zhash_first (attrs->hash);
Expand Down
4 changes: 4 additions & 0 deletions src/broker/attr.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ int attr_add_active_int (attr_t *attrs, const char *name, int *val,
int attr_add_active_uint32 (attr_t *attrs, const char *name, uint32_t *val,
int flags);

/* Get an attribute and parse it as an integer value.
*/
int attr_get_uint32 (attr_t *attrs, const char *name, uint32_t *value);

/* Iterate over attribute names with internal cursor.
*/
const char *attr_first (attr_t *attrs);
Expand Down
30 changes: 25 additions & 5 deletions src/broker/boot_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@
#include <flux/hostlist.h>

#include "src/common/libutil/log.h"
#include "src/common/libutil/kary.h"
#include "src/common/libutil/errno_safe.h"
#include "src/common/libpmi/clique.h"

#include "attr.h"
#include "overlay.h"
#include "topology.h"
#include "boot_config.h"

#define DEFAULT_FANOUT 0


/* Copy 'fmt' into 'buf', substituting the following tokens:
* - %h host
Expand Down Expand Up @@ -456,8 +458,21 @@ int boot_config (flux_t *h, struct overlay *overlay, attr_t *attrs)
struct boot_conf conf;
uint32_t rank;
uint32_t size;
int fanout = overlay_get_fanout (overlay);
uint32_t fanout;
json_t *hosts = NULL;
struct topology *topo = NULL;

/* Fetch the tbon.fanout attribute and supply a default value if unset.
*/
if (attr_get_uint32 (attrs, "tbon.fanout", &fanout) < 0)
fanout = DEFAULT_FANOUT;
else
(void)attr_delete (attrs, "tbon.fanout", true);
if (attr_add_uint32 (attrs,
"tbon.fanout",
fanout,
FLUX_ATTRFLAG_IMMUTABLE) < 0)
return -1;

/* Ingest the [bootstrap] stanza.
*/
Expand Down Expand Up @@ -492,7 +507,10 @@ int boot_config (flux_t *h, struct overlay *overlay, attr_t *attrs)
/* Tell overlay network this broker's rank and size.
* If a curve certificate was provided, load it.
*/
if (overlay_set_geometry (overlay, size, rank) < 0)
if (!(topo = topology_create (size))
|| topology_set_kary (topo, fanout) < 0
|| topology_set_rank (topo, rank) < 0
|| overlay_set_topology (overlay, topo) < 0)
goto error;
if (conf.curve_cert) {
if (overlay_cert_load (overlay, conf.curve_cert) < 0)
Expand All @@ -509,7 +527,7 @@ int boot_config (flux_t *h, struct overlay *overlay, attr_t *attrs)
* attribute to the URI peers will connect to. If broker has no
* downstream peers, set tbon.endpoint to NULL.
*/
if (kary_childof (fanout, size, rank, 0) != KARY_NONE) {
if (topology_get_child_ranks (topo, NULL, 0) > 0) {
char bind_uri[MAX_URI + 1];
char my_uri[MAX_URI + 1];

Expand Down Expand Up @@ -559,7 +577,7 @@ int boot_config (flux_t *h, struct overlay *overlay, attr_t *attrs)
char parent_uri[MAX_URI + 1];
if (boot_config_geturibyrank (hosts,
&conf,
kary_parentof (fanout, rank),
topology_get_parent (topo),
parent_uri,
sizeof (parent_uri)) < 0)
goto error;
Expand All @@ -584,9 +602,11 @@ int boot_config (flux_t *h, struct overlay *overlay, attr_t *attrs)
goto error;
}
json_decref (hosts);
topology_decref (topo);
return 0;
error:
ERRNO_SAFE_WRAP (json_decref, hosts);
ERRNO_SAFE_WRAP (topology_decref, topo);
return -1;
}

Expand Down
56 changes: 39 additions & 17 deletions src/broker/boot_pmi.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,19 @@
#include "src/common/libutil/log.h"
#include "src/common/libutil/cleanup.h"
#include "src/common/libutil/ipaddr.h"
#include "src/common/libutil/kary.h"
#include "src/common/libutil/errno_safe.h"
#include "src/common/libpmi/pmi.h"
#include "src/common/libpmi/pmi_strerror.h"
#include "src/common/libpmi/clique.h"

#include "attr.h"
#include "overlay.h"
#include "topology.h"
#include "boot_pmi.h"
#include "pmiutil.h"

#define DEFAULT_FANOUT 2


/* If the broker is launched via flux-shell, then the shell may opt
* to set a "flux.instance-level" parameter in the PMI kvs to tell
Expand Down Expand Up @@ -178,8 +180,7 @@ static int set_hostlist_attr (attr_t *attrs, struct hostlist *hl)

int boot_pmi (struct overlay *overlay, attr_t *attrs)
{
int fanout = overlay_get_fanout (overlay);
int rank;
uint32_t fanout;
char key[64];
char val[1024];
char hostname[MAXHOSTNAMELEN + 1];
Expand All @@ -188,10 +189,26 @@ int boot_pmi (struct overlay *overlay, attr_t *attrs)
json_t *o;
struct pmi_handle *pmi;
struct pmi_params pmi_params;
struct topology *topo = NULL;
int child_count;
int *child_ranks = NULL;
int result;
const char *uri;
int i;

/* Fetch the tbon.fanout attribute and supply a default value if unset.
*/
if (attr_get_uint32 (attrs, "tbon.fanout", &fanout) < 0)
fanout = DEFAULT_FANOUT;
else
(void)attr_delete (attrs, "tbon.fanout", true);
if (attr_add_uint32 (attrs,
"tbon.fanout",
fanout,
FLUX_ATTRFLAG_IMMUTABLE) < 0)
return -1;


memset (&pmi_params, 0, sizeof (pmi_params));
if (!(pmi = broker_pmi_create ())) {
log_err ("broker_pmi_create");
Expand All @@ -215,9 +232,10 @@ int boot_pmi (struct overlay *overlay, attr_t *attrs)
log_err ("error setting broker.mapping attribute");
goto error;
}
if (overlay_set_geometry (overlay,
pmi_params.size,
pmi_params.rank) < 0)
if (!(topo = topology_create (pmi_params.size))
|| topology_set_kary (topo, fanout) < 0
|| topology_set_rank (topo, pmi_params.rank) < 0
|| overlay_set_topology (overlay, topo) < 0)
goto error;
if (gethostname (hostname, sizeof (hostname)) < 0) {
log_err ("gethostname");
Expand All @@ -242,15 +260,17 @@ int boot_pmi (struct overlay *overlay, attr_t *attrs)
*/
overlay_set_ipv6 (overlay, 1);

child_count = topology_get_child_ranks (topo, NULL, 0);
if (child_count > 0) {
if (!(child_ranks = calloc (child_count, sizeof (child_ranks[0])))
|| topology_get_child_ranks (topo, child_ranks, child_count) < 0)
goto error;
}

/* If there are to be downstream peers, then bind to socket and extract
* the concretized URI for sharing with other ranks.
* N.B. there are no downstream peers if the 0th child of this rank
* in k-ary tree does not exist.
*/
if (kary_childof (fanout,
pmi_params.size,
pmi_params.rank,
0) != KARY_NONE) {
if (child_count > 0) {
char buf[1024];

if (format_bind_uri (buf, sizeof (buf), attrs, pmi_params.rank) < 0)
Expand Down Expand Up @@ -306,8 +326,8 @@ int boot_pmi (struct overlay *overlay, attr_t *attrs)
if (pmi_params.rank > 0) {
const char *peer_pubkey;
const char *peer_uri;
int rank = topology_get_parent (topo);

rank = kary_parentof (fanout, pmi_params.rank);
if (snprintf (key, sizeof (key), "%d", rank) >= sizeof (key)) {
log_msg ("pmi key string overflow");
goto error;
Expand Down Expand Up @@ -343,12 +363,10 @@ int boot_pmi (struct overlay *overlay, attr_t *attrs)

/* Fetch the business card of children and inform overlay of public keys.
*/
for (i = 0; i < fanout; i++) {
for (i = 0; i < child_count; i++) {
const char *peer_pubkey;
int rank = child_ranks[i];

rank = kary_childof (fanout, pmi_params.size, pmi_params.rank, i);
if (rank == KARY_NONE)
break;
if (snprintf (key, sizeof (key), "%d", rank) >= sizeof (key)) {
log_msg ("pmi key string overflow");
goto error;
Expand Down Expand Up @@ -427,11 +445,15 @@ int boot_pmi (struct overlay *overlay, attr_t *attrs)
free (bizcard);
broker_pmi_destroy (pmi);
hostlist_destroy (hl);
free (child_ranks);
topology_decref (topo);
return 0;
error:
free (bizcard);
broker_pmi_destroy (pmi);
hostlist_destroy (hl);
free (child_ranks);
topology_decref (topo);
return -1;
}

Expand Down
1 change: 0 additions & 1 deletion src/broker/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
#include "src/common/libutil/cleanup.h"
#include "src/common/libidset/idset.h"
#include "src/common/libutil/ipaddr.h"
#include "src/common/libutil/kary.h"
#include "src/common/libpmi/pmi.h"
#include "src/common/libpmi/pmi_strerror.h"
#include "src/common/libutil/fsd.h"
Expand Down
Loading