From a1640326cd8c69ac62ed0ae591a3f7d317a952e1 Mon Sep 17 00:00:00 2001 From: Daniel Milroy Date: Sat, 13 Apr 2024 14:23:19 -0700 Subject: [PATCH] rv1exec reader: implement update () Problem: issue https://github.com/flux-framework/flux-sched/issues/991 identified the need for an `rv1exec` implementation of update (). The need for the implementation is described in detail in the issue, but the primary motivation is to enable reloading Fluxion when using RV1 without the scheduling key and payload. The reader was not originally implemented due to the lack of information in the format. Examples include edges, exclusivity, paths with subsystems, and vertex sizes. To create a workable implementation, strong assumptions need to be made about resource exclusivity and edges. Add support for update () through helper functions that update vertex and edge metadata and infer exclusivity of node-level resources. --- resource/readers/resource_reader_rv1exec.cpp | 454 +++++++++++++++++-- resource/readers/resource_reader_rv1exec.hpp | 57 ++- 2 files changed, 460 insertions(+), 51 deletions(-) diff --git a/resource/readers/resource_reader_rv1exec.cpp b/resource/readers/resource_reader_rv1exec.cpp index 8ff14f3dd..e2e89a407 100644 --- a/resource/readers/resource_reader_rv1exec.cpp +++ b/resource/readers/resource_reader_rv1exec.cpp @@ -222,6 +222,243 @@ int resource_reader_rv1exec_t::add_cluster_vertex (resource_graph_t &g, return 0; } +vtx_t resource_reader_rv1exec_t::find_vertex (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t parent, int64_t id, + const std::string &subsys, + const std::string &type, + const std::string &basename, + const std::string &name, + int size, int rank) +{ + bool is_root = false; + std::string path = ""; + std::string vtx_name = ""; + vtx_t vtx = boost::graph_traits::null_vertex (); + + // Get properties of the vertex + if (parent == boost::graph_traits::null_vertex ()) + is_root = true; + + std::string idstr = (id != -1)? 
std::to_string (id) : ""; + std::string prefix = is_root ? "" : g[parent].paths[subsys]; + vtx_name = (name != "")? name : basename + idstr; + path = prefix + "/" + vtx_name; + + // Search graph metadata for vertex + auto vtx_iter = m.by_path.find (path); + if (vtx_iter != m.by_path.end ()) { + for (vtx_t v : vtx_iter->second) { + if (g[v].rank == rank) { + vtx = v; + break; + } + } + } else // Not found in by_path map + return boost::graph_traits::null_vertex (); + // Check properties are the same + if (vtx == boost::graph_traits::null_vertex () + || g[vtx].id != id + || g[vtx].size != size + || g[vtx].type != type) + return boost::graph_traits::null_vertex (); + + return vtx; +} + +int resource_reader_rv1exec_t::update_vertex (resource_graph_t &g, + vtx_t vtx, + updater_data &update_data) +{ + + int rc = -1; + int64_t span = -1; + int64_t avail = -1; + planner_t *plans = NULL; + + // Check and update plan + if ( (plans = g[vtx].schedule.plans) == NULL) { + errno = EINVAL; + m_err_msg += __FUNCTION__; + m_err_msg += ": plan for " + g[vtx].name + " is null.\n"; + goto error; + } + if ( (avail = planner_avail_resources_during (plans, + update_data.at, + update_data.duration)) == -1) { + m_err_msg += __FUNCTION__; + m_err_msg += ": planner_avail_resource_during return -1 for "; + m_err_msg += g[vtx].name + ".\n"; + goto error; + } + if (avail < g[vtx].size) { + // if g[v] has already been allocated/reserved, this is an error + m_err_msg += __FUNCTION__; + m_err_msg += ": " + g[vtx].name + " is unavailable.\n"; + goto error; + } + // Update the vertex plan here (not in traverser code). + // Traverser update () will handle aggregate filters and + // exclusivity checking filter. + // Can't update the rank-level (node) vertex yet- we + // don't know if all its children are allocated. + // Note this is a hard-coded option. Support for more flexible + // types may require extending rv1exec. 
+ if (g[vtx].type != "node") { + if ( (span = planner_add_span (plans, + update_data.at, + update_data.duration, + static_cast (g[vtx].size))) == -1) { + m_err_msg += __FUNCTION__; + m_err_msg += ": can't add span into " + g[vtx].name + ".\n"; + goto error; + } + + if (update_data.reserved) + g[vtx].schedule.reservations[update_data.jobid] = span; + else + g[vtx].schedule.allocations[update_data.jobid] = span; + + update_data.updated_vertices[g[vtx].rank].push_back (vtx); + } + + rc = 0; + +error: + return rc; +} + +int resource_reader_rv1exec_t::undo_vertices (resource_graph_t &g, + updater_data &update_data) +{ + + int rc = -1; + int64_t span = -1; + planner_t *plans = NULL; + + for (auto rank : update_data.updated_vertices) { + for (vtx_t vtx : update_data.updated_vertices.at (rank.first)) { + // Check plan + if ( (plans = g[vtx].schedule.plans) == NULL) { + errno = EINVAL; + m_err_msg += __FUNCTION__; + m_err_msg += ": plan for " + g[vtx].name + " is null.\n"; + goto error; + } + // Remove job tags + if (update_data.reserved) { + span = g[vtx].schedule.reservations.at (update_data.jobid); + g[vtx].schedule.reservations.erase (update_data.jobid); + } else { + span = g[vtx].schedule.allocations.at (update_data.jobid); + g[vtx].schedule.allocations.erase (update_data.jobid); + } + // Remove the span. 
+ if (planner_rem_span (plans, span) == -1) { + m_err_msg += __FUNCTION__; + m_err_msg += ": can't remove span from " + g[vtx].name + ".\n"; + goto error; + } + } + } + + rc = 0; + +error: + return rc; +} + +int resource_reader_rv1exec_t::update_edges (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t src, vtx_t dst, + const std::string &subsys, + const std::string &relation, + const std::string &rev_relation, + updater_data &update_data) +{ + edg_t e; + int rc = -1; + bool found = false; + boost::graph_traits::out_edge_iterator ei, ei_end; + + boost::tie (ei, ei_end) = boost::out_edges (src, g); + for (; ei != ei_end; ++ei) { + if (boost::target (*ei, g) == dst) { + e = *ei; + found = true; + break; + } + } + if (!found) { + errno = EINVAL; + m_err_msg += __FUNCTION__; + m_err_msg += ": JGF edge not found in resource graph.\n"; + goto error; + } + g[e].idata.set_for_trav_update (g[dst].size, + true, update_data.token); + + return 0; + +error: + return -1; +} + +int resource_reader_rv1exec_t::update_exclusivity (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t vtx, + updater_data &update_data) +{ + // idata tag and exclusive checker update + int64_t span = -1; + planner_t *plans = NULL; + + auto rank_ex = update_data.updated_vertices.find (g[vtx].rank); + if (rank_ex == update_data.updated_vertices.end ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": rank not found in agfilters map.\n"; + return -1; + } + // This check enforces a rigid constraint on rank equivalence + // between graph initialization and rank in rv1exec string. + auto by_rank = m.by_rank.find (g[vtx].rank); + if (by_rank == m.by_rank.end ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": rank not found in by_rank graph map.\n"; + return -1; + } + + // If all child vertices allocated, allocate this vertex. + // Subtract one since the current node hasn't been added to the + // updated_vertices map. 
+ if (rank_ex->second.size () == (by_rank->second.size () - 1)) { + if ( (plans = g[vtx].schedule.plans) == NULL) { + errno = EINVAL; + m_err_msg += __FUNCTION__; + m_err_msg += ": plan for " + g[vtx].name + " is null.\n"; + return -1; + } + // Update the vertex plan here (not in traverser code). + // Traverser update () will handle aggregate filters and + // exclusivity checking filter. + if ( (span = planner_add_span (plans, update_data.at, update_data.duration, + static_cast (g[vtx].size))) == -1) { + m_err_msg += __FUNCTION__; + m_err_msg += ": can't add span into " + g[vtx].name + ".\n"; + return -1; + } + // Add job tags + if (update_data.reserved) + g[vtx].schedule.reservations[update_data.jobid] = span; + else + g[vtx].schedule.allocations[update_data.jobid] = span; + // Add to the updated vertices vector to undo upon error. + update_data.updated_vertices[g[vtx].rank].push_back (vtx); + } + + return 0; +} + int resource_reader_rv1exec_t::build_rmap (json_t *rlite, std::map &rmap) { @@ -350,7 +587,8 @@ int resource_reader_rv1exec_t::unpack_child (resource_graph_t &g, const char *resource_ids, unsigned rank, std::map &pmap) + properties_t> &pmap, + updater_data &update_data) { int rc = -1; unsigned id; @@ -358,36 +596,73 @@ int resource_reader_rv1exec_t::unpack_child (resource_graph_t &g, if (!resource_type || !resource_ids) { errno = EINVAL; - goto ret; + goto error; } if ( !(ids = idset_decode (resource_ids))) - goto ret; + goto error; id = idset_first (ids); while (id != IDSET_INVALID_ID) { edg_t e; - vtx_t v; + vtx_t vtx; std::string name = resource_type + std::to_string (id); - std::map p; + std::map properties; if (pmap.find (rank) != pmap.end ()) { if (pmap[rank].exist (resource_type)) { - if (pmap[rank].copy (resource_type, p) < 0) - goto ret; + if (pmap[rank].copy (resource_type, properties) < 0) + goto error; } } - v = add_vertex (g, m, parent, id, - "containment", resource_type, - resource_type, name, p, 1, rank); - if (v == 
boost::graph_traits::null_vertex ()) - goto ret; - if (add_edges (g, m, parent, v, "containment", "contains", "in") < 0) - goto ret; + if (!update_data.update) { + vtx = find_vertex (g, m, parent, id, "containment", resource_type, + resource_type, name, 1, rank); + // Shouldn't be found + if (vtx != boost::graph_traits::null_vertex ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": found duplicate vertex in graph for "; + m_err_msg += name + ".\n"; + goto error; + } + // Add resources + vtx = add_vertex (g, m, parent, id, + "containment", resource_type, + resource_type, name, properties, 1, rank); + if (vtx == boost::graph_traits::null_vertex ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": failed to add vertex for "; + m_err_msg += name + ".\n"; + goto error; + } + if (add_edges (g, m, parent, vtx, "containment", + "contains", "in") < 0) { + m_err_msg += __FUNCTION__; + m_err_msg += ": failed to add edges for "; + m_err_msg += name + ".\n"; + goto error; + } + } else { + // Update resources + vtx = find_vertex (g, m, parent, id, "containment", resource_type, + resource_type, name, 1, rank); + // Not found + if (vtx == boost::graph_traits::null_vertex ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": couldn't find vertex in graph for "; + m_err_msg += name + ".\n"; + goto error; + } + if (update_vertex (g, vtx, update_data) == -1) + goto error; + if (update_edges (g, m, parent, vtx, "containment", "contains", + "in", update_data) == -1) + goto error; + } id = idset_next (ids, id); } rc = 0; -ret: +error: idset_destroy (ids); return rc; } @@ -399,7 +674,8 @@ int resource_reader_rv1exec_t::unpack_children (resource_graph_t &g, json_t *children, unsigned rank, std::map &pmap) + properties_t> &pmap, + updater_data &update_data) { json_t *res_ids = nullptr; const char *res_type = nullptr; @@ -415,7 +691,8 @@ int resource_reader_rv1exec_t::unpack_children (resource_graph_t &g, goto error; } const char *ids_str = json_string_value (res_ids); - if (unpack_child 
(g, m, parent, res_type, ids_str, rank, pmap) < 0) + if (unpack_child (g, m, parent, res_type, ids_str, rank, pmap, + update_data) < 0) goto error; } return 0; @@ -432,11 +709,12 @@ int resource_reader_rv1exec_t::unpack_rank (resource_graph_t &g, struct hostlist *hlist, std::map &rmap, std::map &pmap) + properties_t> &pmap, + updater_data &update_data) { edg_t e; - vtx_t v; - int64_t iden; + vtx_t vtx; + int64_t id; const char *hostname = nullptr; std::string basename; std::map properties; @@ -452,7 +730,7 @@ int resource_reader_rv1exec_t::unpack_rank (resource_graph_t &g, if ( !(hostname = hostlist_nth (hlist, static_cast (rmap[rank])))) goto error; - if (get_hostname_suffix (hostname, iden) < 0 + if (get_hostname_suffix (hostname, id) < 0 || get_host_basename (hostname, basename) < 0) { m_err_msg += __FUNCTION__; m_err_msg += ": error splitting hostname="; @@ -465,18 +743,63 @@ int resource_reader_rv1exec_t::unpack_rank (resource_graph_t &g, goto error; } } - - // Create and add a node vertex and link with cluster vertex - v = add_vertex (g, m, parent, iden, "containment", - "node", basename, hostname, properties, 1, rank); - if (v == boost::graph_traits::null_vertex ()) - goto error; - if (add_edges (g, m, parent, v, "containment", "contains", "in") < 0) - goto error; + if (!update_data.update) { + vtx_t tmp_vtx; + tmp_vtx = find_vertex (g, m, parent, id, "containment", "node", basename, + hostname, 1, rank); + // Shouldn't be in graph + if (tmp_vtx != boost::graph_traits::null_vertex ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": found duplicate vertex in graph for "; + m_err_msg += std::string (hostname) + ".\n"; + goto error; + } + // Create and add a node vertex and link with cluster vertex + vtx = add_vertex (g, m, parent, id, "containment", + "node", basename, hostname, properties, 1, rank); + if (vtx == boost::graph_traits::null_vertex ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": failed to add vertex for "; + m_err_msg += std::string 
(hostname) + ".\n"; + goto error; + } + if (add_edges (g, m, parent, vtx, "containment", "contains", "in") < 0) { + m_err_msg += __FUNCTION__; + m_err_msg += ": failed to add edges for "; + m_err_msg += std::string (hostname) + ".\n"; + goto error; + } + } else { + // Update resources + vtx = find_vertex (g, m, parent, id, "containment", "node", basename, + hostname, 1, rank); + // Not found + if (vtx == boost::graph_traits::null_vertex ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": couldn't find vertex in graph for "; + m_err_msg += std::string (hostname) + ".\n"; + goto error; + } + if (update_vertex (g, vtx, update_data) == -1) + goto error; + if (update_edges (g, m, parent, vtx, "containment", "contains", + "in", update_data) == -1) + goto error; + } // Unpack children node-local resources - if (unpack_children (g, m, v, children, rank, pmap) < 0) + if (unpack_children (g, m, vtx, children, rank, pmap, update_data) < 0) goto error; + if (update_data.update) { + // Update the rank's planner if all children allocated + if (update_exclusivity (g, m, vtx, update_data) == -1) { + m_err_msg += __FUNCTION__; + m_err_msg += ": exclusive filter update failed for "; + m_err_msg += std::string (hostname) + ".\n"; + goto error; + } + } + return 0; error: @@ -491,7 +814,8 @@ int resource_reader_rv1exec_t::unpack_rlite_entry (resource_graph_t &g, std::map &rmap, std::map &pmap) + properties_t> &pmap, + updater_data &update_data) { int rc = -1; unsigned rank; @@ -501,29 +825,30 @@ int resource_reader_rv1exec_t::unpack_rlite_entry (resource_graph_t &g, if (!entry || !hlist) { errno = EINVAL; - goto ret; + goto error; } if (json_unpack (entry, "{s:s s:o}", "rank", &ranks, "children", &children) < 0) { errno = EINVAL; - goto ret; + goto error; } if ( !(r_ids = idset_decode (ranks))) - goto ret; + goto error; rank = idset_first (r_ids); while (rank != IDSET_INVALID_ID) { - if (unpack_rank (g, m, parent, rank, children, hlist, rmap, pmap) < 0) - goto ret; + if (unpack_rank 
(g, m, parent, rank, children, hlist, rmap, pmap, + update_data) < 0) + goto error; rank = idset_next (r_ids, rank); } rc = 0; -ret: +error: idset_destroy (r_ids); return rc; } @@ -534,7 +859,8 @@ int resource_reader_rv1exec_t::unpack_rlite (resource_graph_t &g, struct hostlist *hlist, std::map &rmap, std::map &pmap) + properties_t> &pmap, + updater_data &update_data) { size_t index; vtx_t cluster_vtx; @@ -551,9 +877,12 @@ int resource_reader_rv1exec_t::unpack_rlite (resource_graph_t &g, } cluster_vtx = m.roots["containment"]; + // Set the cluster "needs" and make the update shared access to the cluster + m.v_rt_edges["containment"].set_for_trav_update (g[cluster_vtx].size, + false, update_data.token); json_array_foreach (rlite, index, entry) { if (unpack_rlite_entry (g, m, cluster_vtx, - entry, hlist, rmap, pmap) < 0) + entry, hlist, rmap, pmap, update_data) < 0) goto error; } return 0; @@ -564,7 +893,8 @@ int resource_reader_rv1exec_t::unpack_rlite (resource_graph_t &g, int resource_reader_rv1exec_t::unpack_internal (resource_graph_t &g, resource_graph_metadata_t &m, - json_t *rv1) + json_t *rv1, + updater_data &update_data) { int rc = -1; int version; @@ -612,7 +942,7 @@ int resource_reader_rv1exec_t::unpack_internal (resource_graph_t &g, if (hostlist_append (hlist, hlist_str) < 0) goto ret; } - if (unpack_rlite (g, m, rlite, hlist, rmap, pmap) < 0) + if (unpack_rlite (g, m, rlite, hlist, rmap, pmap, update_data) < 0) goto ret; rc = 0; @@ -642,6 +972,9 @@ int resource_reader_rv1exec_t::unpack (resource_graph_t &g, json_error_t error; json_t *rv1 = nullptr; int saved_errno; + updater_data null_data; + // Indicate adding, not updating + null_data.update = false; if (str == "") { errno = EINVAL; @@ -654,7 +987,8 @@ int resource_reader_rv1exec_t::unpack (resource_graph_t &g, errno = ENOMEM; goto ret; } - rc = unpack_internal (g, m, rv1); + + rc = unpack_internal (g, m, rv1, null_data); ret: saved_errno = errno; @@ -682,12 +1016,42 @@ int 
resource_reader_rv1exec_t::remove_subgraph (resource_graph_t &g, int resource_reader_rv1exec_t::update (resource_graph_t &g, resource_graph_metadata_t &m, - const std::string &str, int64_t jobid, + const std::string &R, int64_t jobid, int64_t at, uint64_t dur, bool rsv, uint64_t token) { - errno = ENOTSUP; // RV1Exec reader currently does not support update - return -1; + int rc = -1; + json_error_t error; + json_t *rv1 = nullptr; + int saved_errno; + updater_data update_data; + + if (R == "") { + errno = EINVAL; + goto ret; + } + + if ( !(rv1 = json_loads (R.c_str (), 0, &error))) { + errno = ENOMEM; + goto ret; + } + + update_data.update = true; + update_data.jobid = jobid; + update_data.at = at; + update_data.duration = dur; + update_data.reserved = rsv; + update_data.token = token; + + if ( (rc = unpack_internal (g, m, rv1, update_data)) == -1) { + undo_vertices (g, update_data); + } + +ret: + saved_errno = errno; + json_decref (rv1); + errno = saved_errno; + return rc; } bool resource_reader_rv1exec_t::is_allowlist_supported () diff --git a/resource/readers/resource_reader_rv1exec.hpp b/resource/readers/resource_reader_rv1exec.hpp index 77445918a..e59f7bf30 100644 --- a/resource/readers/resource_reader_rv1exec.hpp +++ b/resource/readers/resource_reader_rv1exec.hpp @@ -21,6 +21,17 @@ extern "C" { namespace Flux { namespace resource_model { +// Struct to track data for update +struct updater_data { + int64_t jobid = 0; + int64_t at = 0; + uint64_t duration = 0; + bool reserved = false; + uint64_t token = 0; + // track updated vertices to undo upon error + std::map> updated_vertices; + bool update = true; // Updating or adding vertex +}; /*! RV1EXEC resource reader class. 
*/ @@ -120,6 +131,34 @@ class resource_reader_rv1exec_t : public resource_reader_base_t { int add_cluster_vertex (resource_graph_t &g, resource_graph_metadata_t &m); + // Update functions + vtx_t find_vertex (resource_graph_t &g, resource_graph_metadata_t &m, + vtx_t parent, int64_t id, + const std::string &subsys, + const std::string &type, + const std::string &basename, + const std::string &name, + int size, int rank); + + int update_vertex (resource_graph_t &g, + vtx_t vtx, + updater_data &update_data); + + int undo_vertices (resource_graph_t &g, updater_data &update_data); + + int update_edges (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t src, vtx_t dst, + const std::string &subsys, + const std::string &relation, + const std::string &rev_relation, + updater_data &update_data); + + int update_exclusivity (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t vtx, + updater_data &update_data); + int build_rmap (json_t *rlite, std::map &rmap); int build_pmap (json_t *properties, @@ -129,35 +168,41 @@ class resource_reader_rv1exec_t : public resource_reader_base_t { resource_graph_metadata_t &m, vtx_t parent, const char *resource_type, const char *resource_ids, unsigned rank, - std::map &pmap); + std::map &pmap, + updater_data &update_data); int unpack_children (resource_graph_t &g, resource_graph_metadata_t &m, vtx_t parent, json_t *children, unsigned rank, - std::map &pmap); + std::map &pmap, + updater_data &update_data); int unpack_rank (resource_graph_t &g, resource_graph_metadata_t &m, vtx_t parent, unsigned rank, json_t *children, struct hostlist *hlist, std::map &rmap, - std::map &pmap); + std::map &pmap, + updater_data &update_data); int unpack_rlite_entry (resource_graph_t &g, resource_graph_metadata_t &m, vtx_t parent, json_t *entry, struct hostlist *hlist, std::map &rmap, - std::map &pmap); + std::map &pmap, + updater_data &update_data); int unpack_rlite (resource_graph_t &g, resource_graph_metadata_t &m, json_t *rlite, struct 
hostlist *hlist, std::map &rmap, - std::map &pmap); + std::map &pmap, + updater_data &update_data); int unpack_internal (resource_graph_t &g, - resource_graph_metadata_t &m, json_t *rv1); + resource_graph_metadata_t &m, json_t *rv1, + updater_data &update_data); }; } // namespace resource_model