Skip to content

Commit

Permalink
resource-hwloc: add fwd_count hint to aggregation
Browse files Browse the repository at this point in the history
Use the `fwd_count` hint in the aggregator.push rpc to allow the
aggregator module to more efficiently forward hwloc aggregate
upstream. When fwd_count == 0, the module only forwards aggregates
after a timeout, while with fwd_count the module is able to
immediately forward aggregate entries upstream when all descendants
have added their entries to the aggregate.
  • Loading branch information
grondo committed Jan 21, 2019
1 parent c9ab9dd commit 0c951a4
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 4 deletions.
6 changes: 4 additions & 2 deletions src/modules/resource-hwloc/aggregate.c
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ int aggregate_unpack_to_kvs (flux_future_t *f, const char *path)
return (rc);
}

flux_future_t *aggregator_push_json (flux_t *h, const char *key, json_t *o)
flux_future_t *aggregator_push_json (flux_t *h, int fwd_count,
const char *key, json_t *o)
{
uint32_t size;
uint32_t rank;
Expand All @@ -169,9 +170,10 @@ flux_future_t *aggregator_push_json (flux_t *h, const char *key, json_t *o)
return NULL;

return flux_rpc_pack (h, "aggregator.push", FLUX_NODEID_ANY, 0,
"{s:s,s:i,s:{s:o}}",
"{s:s,s:i,s:i,s:{s:o}}",
"key", key,
"total", size,
"fwd_count", fwd_count,
"entries", rankstr, o);
}

Expand Down
3 changes: 2 additions & 1 deletion src/modules/resource-hwloc/aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
* Push single json object `o` to local aggregator module via RPC.
* Steals the reference to `o`.
*/
flux_future_t *aggregator_push_json (flux_t *h, const char *key, json_t *o);
flux_future_t *aggregator_push_json (flux_t *h, int fwd_count,
const char *key, json_t *o);

/* Fulfill future when aggregate at `key` is "complete", i.e.
* count == total. Use aggreate_wait_get_unpack () to unpack final
Expand Down
13 changes: 12 additions & 1 deletion src/modules/resource-hwloc/resource.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
typedef struct
{
uint32_t rank;
int fwd_count; // Local forward count for reload aggregation
unsigned int reload_count; // Sequence number for reload request
hwloc_topology_t topology;
} resource_ctx_t;
Expand Down Expand Up @@ -130,13 +131,23 @@ static void resource_hwloc_ctx_destroy (resource_ctx_t *ctx)
}
}

static int get_fwd_count (flux_t *h)
{
const char *s = flux_attr_get (h, "tbon.descendants");
long v = strtol (s, NULL, 10);
if (v >= 0)
return ((int) v + 1);
return (0);
}

static resource_ctx_t *resource_hwloc_ctx_create (flux_t *h)
{
resource_ctx_t *ctx = xzmalloc (sizeof(resource_ctx_t));
if (flux_get_rank (h, &ctx->rank) < 0) {
flux_log_error (h, "flux_get_rank");
goto error;
}
ctx->fwd_count = get_fwd_count (h);
if (ctx_hwloc_init (h, ctx)) {
flux_log_error (h, "hwloc context could not be created");
goto error;
Expand Down Expand Up @@ -268,7 +279,7 @@ static int aggregate_push_rank_info (flux_t *h, resource_ctx_t *ctx,
flux_log_error (h, "%s: topo_info_tojson", __FUNCTION__);
goto done;
}
if (!(f = aggregator_push_json (h, key, o))
if (!(f = aggregator_push_json (h, ctx->fwd_count, key, o))
|| (flux_future_get (f, NULL) < 0)) {
flux_log_error (h, "%s: aggregator.push", __FUNCTION__);
goto done;
Expand Down

0 comments on commit 0c951a4

Please sign in to comment.