// Reviewer notes, placed ahead of the "diff --git" header. Apart from one
// comment typo ("unpacpk" -> "unpack") the patch text below is unchanged.
//
// What the patch does, as far as the visible hunks show:
//   * adds a `reader` member to resource_ctx_t, sets it to nullptr in getctx (),
//     and drops the `rd` parameter of populate_resource_db_file () and
//     populate_resource_db_kvs () in favour of ctx->reader;
//   * replaces the 0..size rank loop of populate_resource_db_kvs () with
//     grow (), which fetches the string stored under "resource.hwloc.xml.<rank>"
//     through get_string_blocking () and hands it to db.load (), and
//     grow_resource_db (), which walks an idset of ranks and calls grow ();
//   * adds update_resource (), which unpacks the optional "resources", "up" and
//     "down" keys of a streaming "resource.acquire" RPC response, builds an
//     idset from the keys of "resources" via get_grow_idset (), and calls
//     update_resource_db (); up/down are only logged as "NYI" for now.
//
// NOTE(review): in this copy the template arguments and <...> include targets
// appear to have been stripped (e.g. bare "#include", "std::shared_ptr db",
// "static_cast (ctx->h)") -- compare against the upstream commit before use.
// NOTE(review): grow_resource_db () always steps past idset_first () before
// its while loop, so when the "containment" root already exists on entry the
// first id is never passed to grow (), and rc stays -1 if it was the only id
// -- confirm whether that is intended.
// NOTE(review): get_grow_idset () returns the partially filled `ids` (not NULL)
// after an idset_decode ()/idset_set () failure, and the idset_set () failure
// path skips idset_destroy (ids_tmp) -- confirm.
// NOTE(review): grow () checks "n > sizeof (k)" after snprintf (); n equal to
// sizeof (k) also means truncation -- confirm.
// NOTE(review): `json_t *o` in the new populate_resource_db_kvs () is never
// referenced in the added lines.
diff --git a/resource/modules/resource_match.cpp b/resource/modules/resource_match.cpp index 5f0e53132..a7e9c8fc8 100644 --- a/resource/modules/resource_match.cpp +++ b/resource/modules/resource_match.cpp @@ -33,6 +33,7 @@ extern "C" { #include "config.h" #endif #include +#include #include #include "src/common/libutil/shortjansson.h" } @@ -91,6 +92,7 @@ struct resource_ctx_t : public resource_interface_t { std::shared_ptr db; /* Resource graph data store */ std::shared_ptr fgraph; /* Filtered graph */ std::shared_ptr writers; /* Vertex/Edge writers */ + std::shared_ptr reader; /* resource reader */ match_perf_t perf; /* Match performance stats */ std::map> jobs; /* Jobs table */ std::map allocations; /* Allocation table */ @@ -227,6 +229,7 @@ static std::shared_ptr getctx (flux_t *h) ctx->matcher = nullptr; /* Cannot be allocated at this point */ ctx->fgraph = nullptr; /* Cannot be allocated at this point */ ctx->writers = nullptr; /* Cannot be allocated at this point */ + ctx->reader = nullptr; /* Cannot be allocated at this point */ } done: @@ -385,8 +388,7 @@ static json_t *get_string_blocking (flux_t *h, const char *key) return NULL; } -static int populate_resource_db_file (std::shared_ptr &ctx, - std::shared_ptr rd) +static int populate_resource_db_file (std::shared_ptr &ctx) { int rc = -1; std::ifstream in_file; @@ -401,9 +403,9 @@ static int populate_resource_db_file (std::shared_ptr &ctx, } buffer << in_file.rdbuf (); in_file.close (); - if ( (rc = ctx->db->load (buffer.str (), rd)) < 0) { + if ( (rc = ctx->db->load (buffer.str (), ctx->reader)) < 0) { flux_log (ctx->h, LOG_ERR, "%s: reader: %s", - __FUNCTION__, rd->err_message ().c_str ()); + __FUNCTION__, ctx->reader->err_message ().c_str ()); goto done; } rc = 0; @@ -412,65 +414,200 @@ static int populate_resource_db_file (std::shared_ptr &ctx, return rc; } -static int populate_resource_db_kvs (std::shared_ptr &ctx, - std::shared_ptr rd) +static int grow (std::shared_ptr &ctx, + vtx_t v, unsigned int 
rank) { int n = -1; int rc = -1; char k[64] = {0}; - uint32_t rank = 0; - uint32_t size = 0; json_t *o = NULL; - flux_t *h = ctx->h; + int saved_errno = 0; const char *hwloc_xml = NULL; resource_graph_db_t &db = *(ctx->db); - vtx_t v = boost::graph_traits::null_vertex (); - - if (flux_get_size (h, &size) == -1) { - flux_log (h, LOG_ERR, "%s: flux_get_size", __FUNCTION__); - goto done; - } - // For 0th rank -- special case to use rd->unpack - rank = 0; n = snprintf (k, sizeof (k), "resource.hwloc.xml.%" PRIu32 "", rank); if ((n < 0) || ((unsigned int) n > sizeof (k))) { errno = ENOMEM; - goto done; + goto ret; + } + if ( !(o = get_string_blocking (ctx->h, k))) { + flux_log_error (ctx->h, "%s: get_string_blocking", __FUNCTION__); + goto ret; } - o = get_string_blocking (h, k); hwloc_xml = json_string_value (o); - if ( (rc = db.load (hwloc_xml, rd, rank)) < 0) { - flux_log (ctx->h, LOG_ERR, "%s: reader: %s", - __FUNCTION__, rd->err_message ().c_str ()); - goto done; + if (v == boost::graph_traits::null_vertex ()) { + if ( (rc = db.load (hwloc_xml, ctx->reader, rank)) < 0) { + flux_log (ctx->h, LOG_ERR, "%s: reader: %s", + __FUNCTION__, ctx->reader->err_message ().c_str ()); + goto freemem_ret; + } + } else { + if ( (rc = db.load (hwloc_xml, ctx->reader, v, rank)) < 0) { + flux_log (ctx->h, LOG_ERR, "%s: reader: %s", + __FUNCTION__, ctx->reader->err_message ().c_str ()); + goto freemem_ret; + } + } + +freemem_ret: + saved_errno = errno; + json_decref (o); + errno = saved_errno; +ret: + return rc; +} + +static int grow_resource_db (std::shared_ptr &ctx, + struct idset *ids) +{ + int rc = -1; + resource_graph_db_t &db = *(ctx->db); + unsigned int rank = idset_first (ids); + vtx_t v = boost::graph_traits::null_vertex (); + + if (db.metadata.roots.find ("containment") == db.metadata.roots.end ()) { + // Special case to use rd->unpack + if ( (rc = grow (ctx, v, rank)) < 0) + goto done; } - Jput (o); if (db.metadata.roots.find ("containment") == db.metadata.roots.end ()) { 
+ rc = -1; + errno = EINVAL; flux_log (ctx->h, LOG_ERR, "%s: cluster vertex is unavailable", __FUNCTION__); goto done; } v = db.metadata.roots.at ("containment"); - // For the rest of the ranks -- general case - for (rank=1; rank < size; rank++) { - n = snprintf (k, sizeof (k), "resource.hwloc.xml.%" PRIu32 "", rank); - if ((n < 0) || ((unsigned int) n > sizeof (k))) { - errno = ENOMEM; - goto done; + rank = idset_next (ids, rank); + while (rank != IDSET_INVALID_ID) { + // For the rest of the ranks -- general case + if ( (rc = grow (ctx, v, rank)) < 0) + goto done; + rank = idset_next (ids, rank); + } + +done: + return rc; +} + +static int update_resource_db (std::shared_ptr &ctx, + struct idset *grow_set, + const char *up, + const char *down) +{ + int rc = 0; + if (grow_set && (rc = grow_resource_db (ctx, grow_set)) < 0) { + flux_log_error (ctx->h, "%s: grow_resource_db", __FUNCTION__); + goto done; + } + if (up) + flux_log (ctx->h, LOG_DEBUG, "%s: mark(up) NYI", __FUNCTION__); + if (down) + flux_log (ctx->h, LOG_DEBUG, "%s: mark(down) NYI", __FUNCTION__); + +#if 0 + // FIXME: once mark() is implemented within traverser, take this off + // FIXME: unpack up into std::set and pass the reference to the set + // object into mark() + if (up && (rc = ctx->traverser->mark (up, + resource_pool_t::status_t::UP)) < 0) { + flux_log_error (ctx->h, "%s: mark (up)", __FUNCTION__); + goto done; + } + if (down + && (rc = ctx->traverser->mark (down, + resource_pool_t::status_t::DOWN)) < 0) { + flux_log_error (ctx->h, "%s: mark (down)", __FUNCTION__); + goto done; + } +#endif +done: + return rc; +} + +static struct idset *get_grow_idset (json_t *o) +{ + json_t *v = NULL; + const char *k = NULL; + struct idset *ids = NULL; + + if ( !(ids = idset_create (0, IDSET_FLAG_AUTOGROW))) + goto done; + + json_object_foreach (o, k, v) { + struct idset *ids_tmp = NULL; + if ( !(ids_tmp = idset_decode (k))) + goto done; + unsigned int id = idset_first (ids_tmp); + while (id != IDSET_INVALID_ID) 
{ + if (idset_set (ids, id) < 0) + goto done; + id = idset_next (ids_tmp, id); } - o = get_string_blocking (h, k); - hwloc_xml = json_string_value (o); - if ( (rc = db.load (hwloc_xml, rd, v, rank)) < 0) { - flux_log (ctx->h, LOG_ERR, "%s: reader: %s", - __FUNCTION__, rd->err_message ().c_str ()); + idset_destroy (ids_tmp); + } +done: + return ids; +} + +static void update_resource (flux_future_t *f, void *arg) +{ + int rc = -1; + const char *up = NULL; + const char *down = NULL; + json_t *grows = NULL; + struct idset *grow_set = NULL; + std::shared_ptr ctx = getctx ((flux_t *)arg); + + if ( (rc = flux_rpc_get_unpack (f, "{s?:o s?:s s?:s}", + "resources", &grows, + "up", &up, + "down", &down)) < 0) { + flux_log_error (ctx->h, "%s: exiting due to resource.acquire failure", + __FUNCTION__); + flux_reactor_stop (flux_get_reactor (ctx->h)); + goto done; + } + if (grows) { + if ( !(grow_set = get_grow_idset (grows))) { + rc = -1; + flux_log_error (ctx->h, "%s: get_grow_idset", __FUNCTION__); goto done; } - Jput (o); } - rc = 0; + if ( (rc = update_resource_db (ctx, grow_set, up, down)) < 0) { + flux_log_error (ctx->h, "%s: update_resource_db", __FUNCTION__); + goto done; + } +done: + idset_destroy (grow_set); + flux_future_reset (f); + ctx->set_update_rc (rc); +} +static int populate_resource_db_kvs (std::shared_ptr &ctx) +{ + int rc = -1; + json_t *o = NULL; + + if ( !(ctx->update_f = flux_rpc (ctx->h, "resource.acquire", NULL, + FLUX_NODEID_ANY, FLUX_RPC_STREAMING))) { + flux_log_error (ctx->h, "%s: flux_rpc", __FUNCTION__); + goto done; + } + + update_resource (ctx->update_f, static_cast (ctx->h)); + if ( (rc = ctx->fetch_and_reset_update_rc ()) < 0) { + flux_log_error (ctx->h, "%s: update_resource", __FUNCTION__); + goto done; + } + + if ( (rc = flux_future_then (ctx->update_f, -1.0, update_resource, + static_cast (ctx->h))) < 0) { + flux_log_error (ctx->h, "%s: flux_future_then", __FUNCTION__); + goto done; + } done: return rc; } @@ -480,19 +617,19 @@ static int 
populate_resource_db (std::shared_ptr &ctx) int rc = -1; double elapse; struct timeval st, et; - std::shared_ptr rd; if (ctx->args.reserve_vtx_vec != 0) ctx->db->resource_graph.m_vertices.reserve (ctx->args.reserve_vtx_vec); - if ( (rd = create_resource_reader (ctx->args.load_format)) == nullptr) { + if ( (ctx->reader = create_resource_reader ( + ctx->args.load_format)) == nullptr) { flux_log (ctx->h, LOG_ERR, "%s: can't create load reader", __FUNCTION__); goto done; } if (ctx->args.load_whitelist != "") { - if (rd->set_whitelist (ctx->args.load_whitelist) < 0) + if (ctx->reader->set_whitelist (ctx->args.load_whitelist) < 0) flux_log (ctx->h, LOG_ERR, "%s: setting whitelist", __FUNCTION__); - if (!rd->is_whitelist_supported ()) + if (!ctx->reader->is_whitelist_supported ()) flux_log (ctx->h, LOG_WARNING, "%s: whitelist unsupported", __FUNCTION__); } @@ -501,7 +638,7 @@ static int populate_resource_db (std::shared_ptr &ctx) goto done; } if (ctx->args.load_file != "") { - if (populate_resource_db_file (ctx, rd) < 0) { + if (populate_resource_db_file (ctx) < 0) { flux_log (ctx->h, LOG_ERR, "%s: error loading resources from file", __FUNCTION__); goto done; @@ -509,7 +646,7 @@ static int populate_resource_db (std::shared_ptr &ctx) flux_log (ctx->h, LOG_INFO, "%s: loaded resources from %s", __FUNCTION__, ctx->args.load_file.c_str ()); } else { - if (populate_resource_db_kvs (ctx, rd) < 0) { + if (populate_resource_db_kvs (ctx) < 0) { flux_log (ctx->h, LOG_ERR, "%s: loading resources from the KVS", __FUNCTION__); goto done;