Skip to content

Commit

Permalink
resource: integrate with resource.acquire RPC
Browse files Browse the repository at this point in the history
Perform the initial resource graph data store
population when the first response
of resource.acquire arrives.

Update the resource graph whenever subsequence
responses arrive.

Introduce a structure to ultimately map the keys
from the responses of resource.acquire to "grow",
"shrink", "up" and "down" -- to reduce churns
in the future.

The target update semantics is the following.
- grow: grow the resource graph by attaching
  the acquired subgraphs to the overall graph and
  the resulting resource graph becomes the basis
  for satisfiability

- shrink: shrink the resource graph by detaching
  the subgraphs from the graph and the resulting
  resource graph becomes the basis for
  satisfiability

- up/down: mark the resource subgraph up or down.
  The resources that are marked down will not
  be used for further scheduling. However, the state
  changes due to the marking do not affect
  satisfiability.

Currently there are a few deficiencies that prevent
this commit from implementing this semantics fully:
- resource.acquire currently reports
  "administrative node exclusion" as "down". Since
  Fluxion can't tell the difference between
  an exclusion event and other types of real
  down events that shouldn't  affect satisfiability,
  satisfiability semantics will not be completely
  correct when a node is administratively
  excluded at runtime via resource config file
  reloading.

- vertex/edge removal required for shrink requires
  more investigations due to Boost Graph Library
  limitation.

- mark() method within traverser has not yet
  been implemented so marking is currently NOOP.
  • Loading branch information
dongahn committed Jun 17, 2020
1 parent 45d893a commit 1be7d51
Showing 1 changed file with 180 additions and 43 deletions.
223 changes: 180 additions & 43 deletions resource/modules/resource_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ extern "C" {
#include "config.h"
#endif
#include <flux/core.h>
#include <flux/idset.h>
#include <jansson.h>
#include "src/common/libutil/shortjansson.h"
}
Expand Down Expand Up @@ -91,6 +92,7 @@ struct resource_ctx_t : public resource_interface_t {
std::shared_ptr<resource_graph_db_t> db; /* Resource graph data store */
std::shared_ptr<f_resource_graph_t> fgraph; /* Filtered graph */
std::shared_ptr<match_writers_t> writers; /* Vertex/Edge writers */
std::shared_ptr<resource_reader_base_t> reader; /* resource reader */
match_perf_t perf; /* Match performance stats */
std::map<uint64_t, std::shared_ptr<job_info_t>> jobs; /* Jobs table */
std::map<uint64_t, uint64_t> allocations; /* Allocation table */
Expand Down Expand Up @@ -227,6 +229,7 @@ static std::shared_ptr<resource_ctx_t> getctx (flux_t *h)
ctx->matcher = nullptr; /* Cannot be allocated at this point */
ctx->fgraph = nullptr; /* Cannot be allocated at this point */
ctx->writers = nullptr; /* Cannot be allocated at this point */
ctx->reader = nullptr; /* Cannot be allocated at this point */
}

done:
Expand Down Expand Up @@ -385,8 +388,7 @@ static json_t *get_string_blocking (flux_t *h, const char *key)
return NULL;
}

static int populate_resource_db_file (std::shared_ptr<resource_ctx_t> &ctx,
std::shared_ptr<resource_reader_base_t> rd)
static int populate_resource_db_file (std::shared_ptr<resource_ctx_t> &ctx)
{
int rc = -1;
std::ifstream in_file;
Expand All @@ -401,9 +403,9 @@ static int populate_resource_db_file (std::shared_ptr<resource_ctx_t> &ctx,
}
buffer << in_file.rdbuf ();
in_file.close ();
if ( (rc = ctx->db->load (buffer.str (), rd)) < 0) {
if ( (rc = ctx->db->load (buffer.str (), ctx->reader)) < 0) {
flux_log (ctx->h, LOG_ERR, "%s: reader: %s",
__FUNCTION__, rd->err_message ().c_str ());
__FUNCTION__, ctx->reader->err_message ().c_str ());
goto done;
}
rc = 0;
Expand All @@ -412,65 +414,200 @@ static int populate_resource_db_file (std::shared_ptr<resource_ctx_t> &ctx,
return rc;
}

static int populate_resource_db_kvs (std::shared_ptr<resource_ctx_t> &ctx,
std::shared_ptr<resource_reader_base_t> rd)
static int grow (std::shared_ptr<resource_ctx_t> &ctx,
vtx_t v, unsigned int rank)
{
int n = -1;
int rc = -1;
char k[64] = {0};
uint32_t rank = 0;
uint32_t size = 0;
json_t *o = NULL;
flux_t *h = ctx->h;
int saved_errno = 0;
const char *hwloc_xml = NULL;
resource_graph_db_t &db = *(ctx->db);
vtx_t v = boost::graph_traits<resource_graph_t>::null_vertex ();

if (flux_get_size (h, &size) == -1) {
flux_log (h, LOG_ERR, "%s: flux_get_size", __FUNCTION__);
goto done;
}

// For 0th rank -- special case to use rd->unpack
rank = 0;
n = snprintf (k, sizeof (k), "resource.hwloc.xml.%" PRIu32 "", rank);
if ((n < 0) || ((unsigned int) n > sizeof (k))) {
errno = ENOMEM;
goto done;
goto ret;
}
if ( !(o = get_string_blocking (ctx->h, k))) {
flux_log_error (ctx->h, "%s: get_string_blocking", __FUNCTION__);
goto ret;
}
o = get_string_blocking (h, k);
hwloc_xml = json_string_value (o);
if ( (rc = db.load (hwloc_xml, rd, rank)) < 0) {
flux_log (ctx->h, LOG_ERR, "%s: reader: %s",
__FUNCTION__, rd->err_message ().c_str ());
goto done;
if (v == boost::graph_traits<resource_graph_t>::null_vertex ()) {
if ( (rc = db.load (hwloc_xml, ctx->reader, rank)) < 0) {
flux_log (ctx->h, LOG_ERR, "%s: reader: %s",
__FUNCTION__, ctx->reader->err_message ().c_str ());
goto freemem_ret;
}
} else {
if ( (rc = db.load (hwloc_xml, ctx->reader, v, rank)) < 0) {
flux_log (ctx->h, LOG_ERR, "%s: reader: %s",
__FUNCTION__, ctx->reader->err_message ().c_str ());
goto freemem_ret;
}
}

freemem_ret:
saved_errno = errno;
json_decref (o);
errno = saved_errno;
ret:
return rc;
}

static int grow_resource_db (std::shared_ptr<resource_ctx_t> &ctx,
struct idset *ids)
{
int rc = -1;
resource_graph_db_t &db = *(ctx->db);
unsigned int rank = idset_first (ids);
vtx_t v = boost::graph_traits<resource_graph_t>::null_vertex ();

if (db.metadata.roots.find ("containment") == db.metadata.roots.end ()) {
// Special case to use rd->unpacpk
if ( (rc = grow (ctx, v, rank)) < 0)
goto done;
}
Jput (o);
if (db.metadata.roots.find ("containment") == db.metadata.roots.end ()) {
rc = -1;
errno = EINVAL;
flux_log (ctx->h, LOG_ERR, "%s: cluster vertex is unavailable",
__FUNCTION__);
goto done;
}
v = db.metadata.roots.at ("containment");

// For the rest of the ranks -- general case
for (rank=1; rank < size; rank++) {
n = snprintf (k, sizeof (k), "resource.hwloc.xml.%" PRIu32 "", rank);
if ((n < 0) || ((unsigned int) n > sizeof (k))) {
errno = ENOMEM;
goto done;
rank = idset_next (ids, rank);
while (rank != IDSET_INVALID_ID) {
// For the rest of the ranks -- general case
if ( (rc = grow (ctx, v, rank)) < 0)
goto done;
rank = idset_next (ids, rank);
}

done:
return rc;
}

static int update_resource_db (std::shared_ptr<resource_ctx_t> &ctx,
struct idset *grow_set,
const char *up,
const char *down)
{
int rc = 0;
if (grow_set && (rc = grow_resource_db (ctx, grow_set)) < 0) {
flux_log_error (ctx->h, "%s: grow_resource_db", __FUNCTION__);
goto done;
}
if (up)
flux_log (ctx->h, LOG_DEBUG, "%s: mark(up) NYI", __FUNCTION__);
if (down)
flux_log (ctx->h, LOG_DEBUG, "%s: mark(down) NYI", __FUNCTION__);

#if 0
// FIXME: once mark() is implemented within traverser, take this off
// FIXME: unpack up into std::set and pass the reference to the set
// object into mark()
if (up && (rc = ctx->traverser->mark (up,
resource_pool_t::status_t::UP)) < 0) {
flux_log_error (ctx->h, "%s: mark (up)", __FUNCTION__);
goto done;
}
if (down
&& (rc = ctx->traverser->mark (down,
resource_pool_t::status_t::DOWN)) < 0) {
flux_log_error (ctx->h, "%s: mark (down)", __FUNCTION__);
goto done;
}
#endif
done:
return rc;
}

static struct idset *get_grow_idset (json_t *o)
{
json_t *v = NULL;
const char *k = NULL;
struct idset *ids = NULL;

if ( !(ids = idset_create (0, IDSET_FLAG_AUTOGROW)))
goto done;

json_object_foreach (o, k, v) {
struct idset *ids_tmp = NULL;
if ( !(ids_tmp = idset_decode (k)))
goto done;
unsigned int id = idset_first (ids_tmp);
while (id != IDSET_INVALID_ID) {
if (idset_set (ids, id) < 0)
goto done;
id = idset_next (ids_tmp, id);
}
o = get_string_blocking (h, k);
hwloc_xml = json_string_value (o);
if ( (rc = db.load (hwloc_xml, rd, v, rank)) < 0) {
flux_log (ctx->h, LOG_ERR, "%s: reader: %s",
__FUNCTION__, rd->err_message ().c_str ());
idset_destroy (ids_tmp);
}
done:
return ids;
}

static void update_resource (flux_future_t *f, void *arg)
{
int rc = -1;
const char *up = NULL;
const char *down = NULL;
json_t *grows = NULL;
struct idset *grow_set = NULL;
std::shared_ptr<resource_ctx_t> ctx = getctx ((flux_t *)arg);

if ( (rc = flux_rpc_get_unpack (f, "{s?:o s?:s s?:s}",
"resources", &grows,
"up", &up,
"down", &down)) < 0) {
flux_log_error (ctx->h, "%s: exiting due to resource.acquire failure",
__FUNCTION__);
flux_reactor_stop (flux_get_reactor (ctx->h));
goto done;
}
if (grows) {
if ( !(grow_set = get_grow_idset (grows))) {
rc = -1;
flux_log_error (ctx->h, "%s: get_grow_idset", __FUNCTION__);
goto done;
}
Jput (o);
}
rc = 0;
if ( (rc = update_resource_db (ctx, grow_set, up, down)) < 0) {
flux_log_error (ctx->h, "%s: update_resource_db", __FUNCTION__);
goto done;
}
done:
idset_destroy (grow_set);
flux_future_reset (f);
ctx->set_update_rc (rc);
}

static int populate_resource_db_kvs (std::shared_ptr<resource_ctx_t> &ctx)
{
int rc = -1;
json_t *o = NULL;

if ( !(ctx->update_f = flux_rpc (ctx->h, "resource.acquire", NULL,
FLUX_NODEID_ANY, FLUX_RPC_STREAMING))) {
flux_log_error (ctx->h, "%s: flux_rpc", __FUNCTION__);
goto done;
}

update_resource (ctx->update_f, static_cast<void *> (ctx->h));
if ( (rc = ctx->fetch_and_reset_update_rc ()) < 0) {
flux_log_error (ctx->h, "%s: update_resource", __FUNCTION__);
goto done;
}

if ( (rc = flux_future_then (ctx->update_f, -1.0, update_resource,
static_cast<void *> (ctx->h))) < 0) {
flux_log_error (ctx->h, "%s: flux_future_then", __FUNCTION__);
goto done;
}
done:
return rc;
}
Expand All @@ -480,19 +617,19 @@ static int populate_resource_db (std::shared_ptr<resource_ctx_t> &ctx)
int rc = -1;
double elapse;
struct timeval st, et;
std::shared_ptr<resource_reader_base_t> rd;

if (ctx->args.reserve_vtx_vec != 0)
ctx->db->resource_graph.m_vertices.reserve (ctx->args.reserve_vtx_vec);
if ( (rd = create_resource_reader (ctx->args.load_format)) == nullptr) {
if ( (ctx->reader = create_resource_reader (
ctx->args.load_format)) == nullptr) {
flux_log (ctx->h, LOG_ERR, "%s: can't create load reader",
__FUNCTION__);
goto done;
}
if (ctx->args.load_whitelist != "") {
if (rd->set_whitelist (ctx->args.load_whitelist) < 0)
if (ctx->reader->set_whitelist (ctx->args.load_whitelist) < 0)
flux_log (ctx->h, LOG_ERR, "%s: setting whitelist", __FUNCTION__);
if (!rd->is_whitelist_supported ())
if (!ctx->reader->is_whitelist_supported ())
flux_log (ctx->h, LOG_WARNING, "%s: whitelist unsupported",
__FUNCTION__);
}
Expand All @@ -501,15 +638,15 @@ static int populate_resource_db (std::shared_ptr<resource_ctx_t> &ctx)
goto done;
}
if (ctx->args.load_file != "") {
if (populate_resource_db_file (ctx, rd) < 0) {
if (populate_resource_db_file (ctx) < 0) {
flux_log (ctx->h, LOG_ERR, "%s: error loading resources from file",
__FUNCTION__);
goto done;
}
flux_log (ctx->h, LOG_INFO, "%s: loaded resources from %s",
__FUNCTION__, ctx->args.load_file.c_str ());
} else {
if (populate_resource_db_kvs (ctx, rd) < 0) {
if (populate_resource_db_kvs (ctx) < 0) {
flux_log (ctx->h, LOG_ERR, "%s: loading resources from the KVS",
__FUNCTION__);
goto done;
Expand Down

0 comments on commit 1be7d51

Please sign in to comment.