From c4b7899d50b8d664026961bdc994fecab3237c21 Mon Sep 17 00:00:00 2001 From: Daniel Milroy Date: Sat, 13 Apr 2024 14:23:19 -0700 Subject: [PATCH] rv1exec reader: implement update () Problem: issue https://github.com/flux-framework/flux-sched/issues/991 identified the need for an `rv1exec` implementation of update (). The need for the implementation is described in detail in the issue, but the primary motivation is to enable reloading Fluxion when using RV1 without the scheduling key and payload. The reader was not originally implemented due to the lack of information in the format. Examples include edges, exclusivity, paths with subsystems, and vertex sizes. To create a workable implementation, strong assumptions need to be made about resource exclusivity and edges. Add support for update () through helper functions that update vertex and edge metadata and infer exclusivity of node-level resources. --- resource/readers/resource_reader_rv1exec.cpp | 442 +++++++++++++++++-- resource/readers/resource_reader_rv1exec.hpp | 67 ++- 2 files changed, 459 insertions(+), 50 deletions(-) diff --git a/resource/readers/resource_reader_rv1exec.cpp b/resource/readers/resource_reader_rv1exec.cpp index 8ff14f3dd..f40c38a1e 100644 --- a/resource/readers/resource_reader_rv1exec.cpp +++ b/resource/readers/resource_reader_rv1exec.cpp @@ -222,6 +222,301 @@ int resource_reader_rv1exec_t::add_cluster_vertex (resource_graph_t &g, return 0; } +vtx_t resource_reader_rv1exec_t::find_vertex (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t parent, int64_t id, + const std::string &subsys, + const std::string &type, + const std::string &basename, + const std::string &name, + int size, int rank) +{ + bool is_root = false; + std::string path = ""; + std::string vtx_name = ""; + vtx_t null_vtx = boost::graph_traits::null_vertex (); + + // Get properties of the vertex + if (parent == boost::graph_traits::null_vertex ()) + is_root = true; + + std::string idstr = (id != -1)? std::to_string (id) : ""; + std::string prefix = is_root ? "" : g[parent].paths[subsys]; + vtx_name = (name != "")? name : basename + idstr; + path = prefix + "/" + vtx_name; + + // Search graph metadata for vertex + const auto &vtx_iter = m.by_path.find (path); + // Not found; return null_vertex + if (vtx_iter == m.by_path.end ()) + return null_vtx; + // Found in by_path + for (vtx_t v : vtx_iter->second) { + if (g[v].rank == rank + && g[v].id == id + && g[v].size == size + && g[v].type == type) { + return v; + } + } + + return null_vtx; +} + +int resource_reader_rv1exec_t::update_vertex (resource_graph_t &g, + vtx_t vtx, + updater_data &update_data) +{ + + int rc = -1; + int64_t span = -1; + int64_t avail = -1; + planner_t *plans = NULL; + + // Check and update plan + if ( (plans = g[vtx].schedule.plans) == NULL) { + errno = EINVAL; + m_err_msg += __FUNCTION__; + m_err_msg += ": plan for " + g[vtx].name + " is null.\n"; + goto error; + } + if ( (avail = planner_avail_resources_during (plans, + update_data.at, + update_data.duration)) == -1) { + m_err_msg += __FUNCTION__; + m_err_msg += ": planner_avail_resource_during return -1 for "; + m_err_msg += g[vtx].name + ".\n"; + goto error; + } + if (avail < g[vtx].size) { + // if g[v] has already been allocated/reserved, this is an error + m_err_msg += __FUNCTION__; + m_err_msg += ": " + g[vtx].name + " is unavailable.\n"; + goto error; + } + // Update the vertex plan here (not in traverser code). + // Traverser update () will handle aggregate filters and + // exclusivity checking filter. + // Can't update the rank-level (node) vertex yet- we + // don't know if all its children are allocated. + // Note this is a hard-coded option. Support for more flexible + // types may require extending rv1exec. + if (g[vtx].type == "node") + return 0; + // Name is anything besides node + if ( (span = planner_add_span (plans, + update_data.at, + update_data.duration, + static_cast (g[vtx].size))) == -1) { + m_err_msg += __FUNCTION__; + m_err_msg += ": can't add span into " + g[vtx].name + ".\n"; + goto error; + } + + if (update_data.reserved) + g[vtx].schedule.reservations[update_data.jobid] = span; + else + g[vtx].schedule.allocations[update_data.jobid] = span; + + update_data.updated_vertices[g[vtx].rank].push_back (vtx); + + rc = 0; + +error: + return rc; +} + +int resource_reader_rv1exec_t::undo_vertices (resource_graph_t &g, + updater_data &update_data) +{ + + int rc = -1; + int64_t span = -1; + planner_t *plans = NULL; + + for (auto &[rank, vertices] : update_data.updated_vertices) { + for (vtx_t vtx : vertices) { + // Check plan + if ( (plans = g[vtx].schedule.plans) == NULL) { + errno = EINVAL; + m_err_msg += __FUNCTION__; + m_err_msg += ": plan for " + g[vtx].name + " is null.\n"; + goto error; + } + // Remove job tags + if (update_data.reserved) { + span = g[vtx].schedule.reservations.at (update_data.jobid); + g[vtx].schedule.reservations.erase (update_data.jobid); + } else { + span = g[vtx].schedule.allocations.at (update_data.jobid); + g[vtx].schedule.allocations.erase (update_data.jobid); + } + // Remove the span. + if (planner_rem_span (plans, span) == -1) { + m_err_msg += __FUNCTION__; + m_err_msg += ": can't remove span from " + g[vtx].name + ".\n"; + goto error; + } + } + } + + rc = 0; + +error: + return rc; +} + +int resource_reader_rv1exec_t::update_edges (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t src, vtx_t dst, + updater_data &update_data) +{ + edg_t e; + int rc = -1; + bool found = false; + boost::graph_traits::out_edge_iterator ei, ei_end; + + boost::tie (ei, ei_end) = boost::out_edges (src, g); + for (; ei != ei_end; ++ei) { + if (boost::target (*ei, g) == dst) { + e = *ei; + found = true; + break; + } + } + if (!found) { + errno = EINVAL; + m_err_msg += __FUNCTION__; + m_err_msg += ": rv1exec edge not found in resource graph.\n"; + goto error; + } + g[e].idata.set_for_trav_update (g[dst].size, + true, update_data.token); + + return 0; + +error: + return -1; +} + +int resource_reader_rv1exec_t::update_exclusivity (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t vtx, + updater_data &update_data) +{ + // idata tag and exclusive checker update + int64_t span = -1; + planner_t *plans = NULL; + + const auto &rank_ex = update_data.updated_vertices.find (g[vtx].rank); + if (rank_ex == update_data.updated_vertices.end ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": rank not found in agfilters map.\n"; + return -1; + } + // This check enforces a rigid constraint on rank equivalence + // between graph initialization and rank in rv1exec string. + const auto &by_rank = m.by_rank.find (g[vtx].rank); + if (by_rank == m.by_rank.end ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": rank not found in by_rank graph map.\n"; + return -1; + } + + // If all child vertices allocated, allocate this vertex. + // Subtract one since the current node hasn't been added to the + // updated_vertices map. + if (rank_ex->second.size () != (by_rank->second.size () - 1)) + return 0; + // Counts indicate exclusive + if ( (plans = g[vtx].schedule.plans) == NULL) { + errno = EINVAL; + m_err_msg += __FUNCTION__; + m_err_msg += ": plan for " + g[vtx].name + " is null.\n"; + return -1; + } + // Update the vertex plan here (not in traverser code). + // Traverser update () will handle aggregate filters and + // exclusivity checking filter. + if ( (span = planner_add_span (plans, update_data.at, update_data.duration, + static_cast (g[vtx].size))) == -1) { + m_err_msg += __FUNCTION__; + m_err_msg += ": can't add span into " + g[vtx].name + ".\n"; + return -1; + } + // Add job tags + if (update_data.reserved) + g[vtx].schedule.reservations[update_data.jobid] = span; + else + g[vtx].schedule.allocations[update_data.jobid] = span; + // Add to the updated vertices vector to undo upon error. + update_data.updated_vertices[g[vtx].rank].push_back (vtx); + + return 0; +} + +vtx_t resource_reader_rv1exec_t::add_or_update (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t parent, int64_t id, + const std::string &subsys, + const std::string &type, + const std::string &basename, + const std::string &name, + int size, int rank, + std::map &properties, + updater_data &update_data) +{ + vtx_t vtx; + vtx_t null_vtx = boost::graph_traits::null_vertex (); + + if (!update_data.update) { + vtx = find_vertex (g, m, parent, id, subsys, type, basename, + name, size, rank); + // Shouldn't be found + if (vtx != boost::graph_traits::null_vertex ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": found duplicate vertex in graph for "; + m_err_msg += name + ".\n"; + return null_vtx; + } + // Add resources + vtx = add_vertex (g, m, parent, id, subsys, type, basename, + name, properties, size, rank); + if (vtx == boost::graph_traits::null_vertex ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": failed to add vertex for "; + m_err_msg += name + ".\n"; + return vtx; + } + if (add_edges (g, m, parent, vtx, subsys, + "contains", "in") < 0) { + m_err_msg += __FUNCTION__; + m_err_msg += ": failed to add edges for "; + m_err_msg += name + ".\n"; + return null_vtx; + } + } else { + // Update resources + vtx = find_vertex (g, m, parent, id, subsys, type, basename, + name, size, rank); + // Not found + if (vtx == boost::graph_traits::null_vertex ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": couldn't find vertex in graph for "; + m_err_msg += name + ".\n"; + return vtx; + } + if (update_vertex (g, vtx, update_data) == -1) + return null_vtx; + // Must be the containment subsystem + if (update_edges (g, m, parent, vtx, update_data) == -1) + return null_vtx; + } + + return vtx; +} + int resource_reader_rv1exec_t::build_rmap (json_t *rlite, std::map &rmap) { @@ -350,7 +645,8 @@ int resource_reader_rv1exec_t::unpack_child (resource_graph_t &g, const char *resource_ids, unsigned rank, std::map &pmap) + properties_t> &pmap, + updater_data &update_data) { int rc = -1; unsigned id; @@ -358,36 +654,39 @@ int resource_reader_rv1exec_t::unpack_child (resource_graph_t &g, if (!resource_type || !resource_ids) { errno = EINVAL; - goto ret; + goto error; } if ( !(ids = idset_decode (resource_ids))) - goto ret; + goto error; id = idset_first (ids); while (id != IDSET_INVALID_ID) { edg_t e; - vtx_t v; + vtx_t vtx; std::string name = resource_type + std::to_string (id); - std::map p; + std::map properties; if (pmap.find (rank) != pmap.end ()) { if (pmap[rank].exist (resource_type)) { - if (pmap[rank].copy (resource_type, p) < 0) - goto ret; + if (pmap[rank].copy (resource_type, properties) < 0) + goto error; } } - v = add_vertex (g, m, parent, id, - "containment", resource_type, - resource_type, name, p, 1, rank); - if (v == boost::graph_traits::null_vertex ()) - goto ret; - if (add_edges (g, m, parent, v, "containment", "contains", "in") < 0) - goto ret; + // Returns the added or updated vertex; null_vertex on error. + vtx = add_or_update (g, m, parent, id, "containment", resource_type, + resource_type, name, 1, rank, properties, + update_data); + if (vtx == boost::graph_traits::null_vertex ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": failed unpacking child for "; + m_err_msg += name + ".\n"; + goto error; + } id = idset_next (ids, id); } rc = 0; -ret: +error: idset_destroy (ids); return rc; } @@ -399,7 +698,8 @@ int resource_reader_rv1exec_t::unpack_children (resource_graph_t &g, json_t *children, unsigned rank, std::map &pmap) + properties_t> &pmap, + updater_data &update_data) { json_t *res_ids = nullptr; const char *res_type = nullptr; @@ -415,7 +715,8 @@ int resource_reader_rv1exec_t::unpack_children (resource_graph_t &g, goto error; } const char *ids_str = json_string_value (res_ids); - if (unpack_child (g, m, parent, res_type, ids_str, rank, pmap) < 0) + if (unpack_child (g, m, parent, res_type, ids_str, rank, pmap, + update_data) < 0) goto error; } return 0; @@ -432,11 +733,12 @@ int resource_reader_rv1exec_t::unpack_rank (resource_graph_t &g, struct hostlist *hlist, std::map &rmap, std::map &pmap) + properties_t> &pmap, + updater_data &update_data) { edg_t e; - vtx_t v; - int64_t iden; + vtx_t vtx; + int64_t id; const char *hostname = nullptr; std::string basename; std::map properties; @@ -452,7 +754,7 @@ int resource_reader_rv1exec_t::unpack_rank (resource_graph_t &g, if ( !(hostname = hostlist_nth (hlist, static_cast (rmap[rank])))) goto error; - if (get_hostname_suffix (hostname, iden) < 0 + if (get_hostname_suffix (hostname, id) < 0 || get_host_basename (hostname, basename) < 0) { m_err_msg += __FUNCTION__; m_err_msg += ": error splitting hostname="; @@ -465,18 +767,29 @@ int resource_reader_rv1exec_t::unpack_rank (resource_graph_t &g, goto error; } } - - // Create and add a node vertex and link with cluster vertex - v = add_vertex (g, m, parent, iden, "containment", - "node", basename, hostname, properties, 1, rank); - if (v == boost::graph_traits::null_vertex ()) - goto error; - if (add_edges (g, m, parent, v, "containment", "contains", "in") < 0) + // Returns the added or updated vertex; null_vtertex on error. + vtx = add_or_update (g, m, parent, id, "containment", "node", basename, + hostname, 1, rank, properties, update_data); + if (vtx == boost::graph_traits::null_vertex ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": failed unpacking rank for "; + m_err_msg += std::string (hostname) + ".\n"; goto error; + } // Unpack children node-local resources - if (unpack_children (g, m, v, children, rank, pmap) < 0) + if (unpack_children (g, m, vtx, children, rank, pmap, update_data) < 0) goto error; + if (update_data.update) { + // Update the rank's planner if all children allocated + if (update_exclusivity (g, m, vtx, update_data) == -1) { + m_err_msg += __FUNCTION__; + m_err_msg += ": exclusive filter update failed for "; + m_err_msg += std::string (hostname) + ".\n"; + goto error; + } + } + return 0; error: @@ -491,7 +804,8 @@ int resource_reader_rv1exec_t::unpack_rlite_entry (resource_graph_t &g, std::map &rmap, std::map &pmap) + properties_t> &pmap, + updater_data &update_data) { int rc = -1; unsigned rank; @@ -501,29 +815,30 @@ int resource_reader_rv1exec_t::unpack_rlite_entry (resource_graph_t &g, if (!entry || !hlist) { errno = EINVAL; - goto ret; + goto error; } if (json_unpack (entry, "{s:s s:o}", "rank", &ranks, "children", &children) < 0) { errno = EINVAL; - goto ret; + goto error; } if ( !(r_ids = idset_decode (ranks))) - goto ret; + goto error; rank = idset_first (r_ids); while (rank != IDSET_INVALID_ID) { - if (unpack_rank (g, m, parent, rank, children, hlist, rmap, pmap) < 0) - goto ret; + if (unpack_rank (g, m, parent, rank, children, hlist, rmap, pmap, + update_data) < 0) + goto error; rank = idset_next (r_ids, rank); } rc = 0; -ret: +error: idset_destroy (r_ids); return rc; } @@ -534,7 +849,8 @@ int resource_reader_rv1exec_t::unpack_rlite (resource_graph_t &g, struct hostlist *hlist, std::map &rmap, std::map &pmap) + properties_t> &pmap, + updater_data &update_data) { size_t index; vtx_t cluster_vtx; @@ -551,9 +867,12 @@ int resource_reader_rv1exec_t::unpack_rlite (resource_graph_t &g, } cluster_vtx = m.roots["containment"]; + // Set the cluster "needs" and make the update shared access to the cluster + m.v_rt_edges["containment"].set_for_trav_update (g[cluster_vtx].size, + false, update_data.token); json_array_foreach (rlite, index, entry) { if (unpack_rlite_entry (g, m, cluster_vtx, - entry, hlist, rmap, pmap) < 0) + entry, hlist, rmap, pmap, update_data) < 0) goto error; } return 0; @@ -564,7 +883,8 @@ int resource_reader_rv1exec_t::unpack_rlite (resource_graph_t &g, int resource_reader_rv1exec_t::unpack_internal (resource_graph_t &g, resource_graph_metadata_t &m, - json_t *rv1) + json_t *rv1, + updater_data &update_data) { int rc = -1; int version; @@ -612,7 +932,7 @@ int resource_reader_rv1exec_t::unpack_internal (resource_graph_t &g, if (hostlist_append (hlist, hlist_str) < 0) goto ret; } - if (unpack_rlite (g, m, rlite, hlist, rmap, pmap) < 0) + if (unpack_rlite (g, m, rlite, hlist, rmap, pmap, update_data) < 0) goto ret; rc = 0; @@ -642,6 +962,9 @@ int resource_reader_rv1exec_t::unpack (resource_graph_t &g, json_error_t error; json_t *rv1 = nullptr; int saved_errno; + updater_data null_data; + // Indicate adding, not updating + null_data.update = false; if (str == "") { errno = EINVAL; @@ -654,7 +977,8 @@ int resource_reader_rv1exec_t::unpack (resource_graph_t &g, errno = ENOMEM; goto ret; } - rc = unpack_internal (g, m, rv1); + + rc = unpack_internal (g, m, rv1, null_data); ret: saved_errno = errno; @@ -682,12 +1006,42 @@ int resource_reader_rv1exec_t::remove_subgraph (resource_graph_t &g, int resource_reader_rv1exec_t::update (resource_graph_t &g, resource_graph_metadata_t &m, - const std::string &str, int64_t jobid, + const std::string &R, int64_t jobid, int64_t at, uint64_t dur, bool rsv, uint64_t token) { - errno = ENOTSUP; // RV1Exec reader currently does not support update - return -1; + int rc = -1; + json_error_t error; + json_t *rv1 = nullptr; + int saved_errno; + updater_data update_data; + + if (R == "") { + errno = EINVAL; + goto ret; + } + + if ( !(rv1 = json_loads (R.c_str (), 0, &error))) { + errno = ENOMEM; + goto ret; + } + + update_data.update = true; + update_data.jobid = jobid; + update_data.at = at; + update_data.duration = dur; + update_data.reserved = rsv; + update_data.token = token; + + if ( (rc = unpack_internal (g, m, rv1, update_data)) == -1) { + undo_vertices (g, update_data); + } + +ret: + saved_errno = errno; + json_decref (rv1); + errno = saved_errno; + return rc; } bool resource_reader_rv1exec_t::is_allowlist_supported () diff --git a/resource/readers/resource_reader_rv1exec.hpp b/resource/readers/resource_reader_rv1exec.hpp index 77445918a..2caf9ff4c 100644 --- a/resource/readers/resource_reader_rv1exec.hpp +++ b/resource/readers/resource_reader_rv1exec.hpp @@ -21,6 +21,17 @@ extern "C" { namespace Flux { namespace resource_model { +// Struct to track data for update +struct updater_data { + int64_t jobid = 0; + int64_t at = 0; + uint64_t duration = 0; + bool reserved = false; + uint64_t token = 0; + // track updated vertices to undo upon error + std::map> updated_vertices; + bool update = true; // Updating or adding vertex +}; /*! RV1EXEC resource reader class. */ @@ -120,6 +131,44 @@ class resource_reader_rv1exec_t : public resource_reader_base_t { int add_cluster_vertex (resource_graph_t &g, resource_graph_metadata_t &m); + // Update functions + vtx_t find_vertex (resource_graph_t &g, resource_graph_metadata_t &m, + vtx_t parent, int64_t id, + const std::string &subsys, + const std::string &type, + const std::string &basename, + const std::string &name, + int size, int rank); + + int update_vertex (resource_graph_t &g, + vtx_t vtx, + updater_data &update_data); + + int undo_vertices (resource_graph_t &g, updater_data &update_data); + + int update_edges (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t src, vtx_t dst, + updater_data &update_data); + + int update_exclusivity (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t vtx, + updater_data &update_data); + + // Returns the added or updated vertex; null_vertex on error. + vtx_t add_or_update (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t parent, int64_t id, + const std::string &subsys, + const std::string &type, + const std::string &basename, + const std::string &name, + int size, int rank, + std::map &properties, + updater_data &update_data); + int build_rmap (json_t *rlite, std::map &rmap); int build_pmap (json_t *properties, @@ -129,35 +178,41 @@ class resource_reader_rv1exec_t : public resource_reader_base_t { resource_graph_metadata_t &m, vtx_t parent, const char *resource_type, const char *resource_ids, unsigned rank, - std::map &pmap); + std::map &pmap, + updater_data &update_data); int unpack_children (resource_graph_t &g, resource_graph_metadata_t &m, vtx_t parent, json_t *children, unsigned rank, - std::map &pmap); + std::map &pmap, + updater_data &update_data); int unpack_rank (resource_graph_t &g, resource_graph_metadata_t &m, vtx_t parent, unsigned rank, json_t *children, struct hostlist *hlist, std::map &rmap, - std::map &pmap); + std::map &pmap, + updater_data &update_data); int unpack_rlite_entry (resource_graph_t &g, resource_graph_metadata_t &m, vtx_t parent, json_t *entry, struct hostlist *hlist, std::map &rmap, - std::map &pmap); + std::map &pmap, + updater_data &update_data); int unpack_rlite (resource_graph_t &g, resource_graph_metadata_t &m, json_t *rlite, struct hostlist *hlist, std::map &rmap, - std::map &pmap); + std::map &pmap, + updater_data &update_data); int unpack_internal (resource_graph_t &g, - resource_graph_metadata_t &m, json_t *rv1); + resource_graph_metadata_t &m, json_t *rv1, + updater_data &update_data); }; } // namespace resource_model