diff --git a/resource/readers/resource_reader_rv1exec.cpp b/resource/readers/resource_reader_rv1exec.cpp index 8ff14f3dd..535816d19 100644 --- a/resource/readers/resource_reader_rv1exec.cpp +++ b/resource/readers/resource_reader_rv1exec.cpp @@ -222,6 +222,239 @@ int resource_reader_rv1exec_t::add_cluster_vertex (resource_graph_t &g, return 0; } +vtx_t resource_reader_rv1exec_t::find_vertex (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t parent, int64_t id, + const std::string &subsys, + const std::string &type, + const std::string &basename, + const std::string &name, + int size, int rank) +{ + bool is_root = false; + std::string path = ""; + std::string vtx_name = ""; + vtx_t vtx = boost::graph_traits::null_vertex (); + + // Get properties of the vertex + if (parent == boost::graph_traits::null_vertex ()) + is_root = true; + + std::string idstr = (id != -1)? std::to_string (id) : ""; + std::string prefix = is_root ? "" : g[parent].paths[subsys]; + vtx_name = (name != "")? name : basename + idstr; + path = prefix + "/" + vtx_name; + + // Search graph metadata for vertex + auto vtx_iter = m.by_path.find (path); + if (vtx_iter != m.by_path.end ()) { + for (vtx_t v : vtx_iter->second) { + if (g[v].rank == rank + && g[v].id == id + && g[v].size == size + && g[v].type == type) { + vtx = v; + break; + } + } + } + + return vtx; +} + +int resource_reader_rv1exec_t::update_vertex (resource_graph_t &g, + vtx_t vtx, + updater_data &update_data) +{ + + int rc = -1; + int64_t span = -1; + int64_t avail = -1; + planner_t *plans = NULL; + + // Check and update plan + if ( (plans = g[vtx].schedule.plans) == NULL) { + errno = EINVAL; + m_err_msg += __FUNCTION__; + m_err_msg += ": plan for " + g[vtx].name + " is null.\n"; + goto error; + } + if ( (avail = planner_avail_resources_during (plans, + update_data.at, + update_data.duration)) == -1) { + m_err_msg += __FUNCTION__; + m_err_msg += ": planner_avail_resource_during return -1 for "; + m_err_msg += g[vtx].name + ".\n"; + goto error; + } + if (avail < g[vtx].size) { + // if g[v] has already been allocated/reserved, this is an error + m_err_msg += __FUNCTION__; + m_err_msg += ": " + g[vtx].name + " is unavailable.\n"; + goto error; + } + // Update the vertex plan here (not in traverser code). + // Traverser update () will handle aggregate filters and + // exclusivity checking filter. + // Can't update the rank-level (node) vertex yet- we + // don't know if all its children are allocated. + // Note this is a hard-coded option. Support for more flexible + // types may require extending rv1exec. + if (g[vtx].type != "node") { + if ( (span = planner_add_span (plans, + update_data.at, + update_data.duration, + static_cast (g[vtx].size))) == -1) { + m_err_msg += __FUNCTION__; + m_err_msg += ": can't add span into " + g[vtx].name + ".\n"; + goto error; + } + + if (update_data.reserved) + g[vtx].schedule.reservations[update_data.jobid] = span; + else + g[vtx].schedule.allocations[update_data.jobid] = span; + + update_data.updated_vertices[g[vtx].rank].push_back (vtx); + } + + rc = 0; + +error: + return rc; +} + +int resource_reader_rv1exec_t::undo_vertices (resource_graph_t &g, + updater_data &update_data) +{ + + int rc = -1; + int64_t span = -1; + planner_t *plans = NULL; + + for (auto rank : update_data.updated_vertices) { + for (vtx_t vtx : update_data.updated_vertices.at (rank.first)) { + // Check plan + if ( (plans = g[vtx].schedule.plans) == NULL) { + errno = EINVAL; + m_err_msg += __FUNCTION__; + m_err_msg += ": plan for " + g[vtx].name + " is null.\n"; + goto error; + } + // Remove job tags + if (update_data.reserved) { + span = g[vtx].schedule.reservations.at (update_data.jobid); + g[vtx].schedule.reservations.erase (update_data.jobid); + } else { + span = g[vtx].schedule.allocations.at (update_data.jobid); + g[vtx].schedule.allocations.erase (update_data.jobid); + } + // Remove the span. + if (planner_rem_span (plans, span) == -1) { + m_err_msg += __FUNCTION__; + m_err_msg += ": can't remove span from " + g[vtx].name + ".\n"; + goto error; + } + } + } + + rc = 0; + +error: + return rc; +} + +int resource_reader_rv1exec_t::update_edges (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t src, vtx_t dst, + const std::string &subsys, + const std::string &relation, + const std::string &rev_relation, + updater_data &update_data) +{ + edg_t e; + int rc = -1; + bool found = false; + boost::graph_traits::out_edge_iterator ei, ei_end; + + boost::tie (ei, ei_end) = boost::out_edges (src, g); + for (; ei != ei_end; ++ei) { + if (boost::target (*ei, g) == dst) { + e = *ei; + found = true; + break; + } + } + if (!found) { + errno = EINVAL; + m_err_msg += __FUNCTION__; + m_err_msg += ": JGF edge not found in resource graph.\n"; + goto error; + } + g[e].idata.set_for_trav_update (g[dst].size, + true, update_data.token); + + return 0; + +error: + return -1; +} + +int resource_reader_rv1exec_t::update_exclusivity (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t vtx, + updater_data &update_data) +{ + // idata tag and exclusive checker update + int64_t span = -1; + planner_t *plans = NULL; + + auto rank_ex = update_data.updated_vertices.find (g[vtx].rank); + if (rank_ex == update_data.updated_vertices.end ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": rank not found in agfilters map.\n"; + return -1; + } + // This check enforces a rigid constraint on rank equivalence + // between graph initialization and rank in rv1exec string. + auto by_rank = m.by_rank.find (g[vtx].rank); + if (by_rank == m.by_rank.end ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": rank not found in by_rank graph map.\n"; + return -1; + } + + // If all child vertices allocated, allocate this vertex. + // Subtract one since the current node hasn't been added to the + // updated_vertices map. + if (rank_ex->second.size () == (by_rank->second.size () - 1)) { + if ( (plans = g[vtx].schedule.plans) == NULL) { + errno = EINVAL; + m_err_msg += __FUNCTION__; + m_err_msg += ": plan for " + g[vtx].name + " is null.\n"; + return -1; + } + // Update the vertex plan here (not in traverser code). + // Traverser update () will handle aggregate filters and + // exclusivity checking filter. + if ( (span = planner_add_span (plans, update_data.at, update_data.duration, + static_cast (g[vtx].size))) == -1) { + m_err_msg += __FUNCTION__; + m_err_msg += ": can't add span into " + g[vtx].name + ".\n"; + return -1; + } + // Add job tags + if (update_data.reserved) + g[vtx].schedule.reservations[update_data.jobid] = span; + else + g[vtx].schedule.allocations[update_data.jobid] = span; + // Add to the updated vertices vector to undo upon error. + update_data.updated_vertices[g[vtx].rank].push_back (vtx); + } + + return 0; +} + int resource_reader_rv1exec_t::build_rmap (json_t *rlite, std::map &rmap) { @@ -350,7 +583,8 @@ int resource_reader_rv1exec_t::unpack_child (resource_graph_t &g, const char *resource_ids, unsigned rank, std::map &pmap) + properties_t> &pmap, + updater_data &update_data) { int rc = -1; unsigned id; @@ -358,36 +592,73 @@ int resource_reader_rv1exec_t::unpack_child (resource_graph_t &g, if (!resource_type || !resource_ids) { errno = EINVAL; - goto ret; + goto error; } if ( !(ids = idset_decode (resource_ids))) - goto ret; + goto error; id = idset_first (ids); while (id != IDSET_INVALID_ID) { edg_t e; - vtx_t v; + vtx_t vtx; std::string name = resource_type + std::to_string (id); - std::map p; + std::map properties; if (pmap.find (rank) != pmap.end ()) { if (pmap[rank].exist (resource_type)) { - if (pmap[rank].copy (resource_type, p) < 0) - goto ret; + if (pmap[rank].copy (resource_type, properties) < 0) + goto error; } } - v = add_vertex (g, m, parent, id, - "containment", resource_type, - resource_type, name, p, 1, rank); - if (v == boost::graph_traits::null_vertex ()) - goto ret; - if (add_edges (g, m, parent, v, "containment", "contains", "in") < 0) - goto ret; + if (!update_data.update) { + vtx = find_vertex (g, m, parent, id, "containment", resource_type, + resource_type, name, 1, rank); + // Shouldn't be found + if (vtx != boost::graph_traits::null_vertex ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": found duplicate vertex in graph for "; + m_err_msg += name + ".\n"; + goto error; + } + // Add resources + vtx = add_vertex (g, m, parent, id, + "containment", resource_type, + resource_type, name, properties, 1, rank); + if (vtx == boost::graph_traits::null_vertex ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": failed to add vertex for "; + m_err_msg += name + ".\n"; + goto error; + } + if (add_edges (g, m, parent, vtx, "containment", + "contains", "in") < 0) { + m_err_msg += __FUNCTION__; + m_err_msg += ": failed to add edges for "; + m_err_msg += name + ".\n"; + goto error; + } + } else { + // Update resources + vtx = find_vertex (g, m, parent, id, "containment", resource_type, + resource_type, name, 1, rank); + // Not found + if (vtx == boost::graph_traits::null_vertex ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": couldn't find vertex in graph for "; + m_err_msg += name + ".\n"; + goto error; + } + if (update_vertex (g, vtx, update_data) == -1) + goto error; + if (update_edges (g, m, parent, vtx, "containment", "contains", + "in", update_data) == -1) + goto error; + } id = idset_next (ids, id); } rc = 0; -ret: +error: idset_destroy (ids); return rc; } @@ -399,7 +670,8 @@ int resource_reader_rv1exec_t::unpack_children (resource_graph_t &g, json_t *children, unsigned rank, std::map &pmap) + properties_t> &pmap, + updater_data &update_data) { json_t *res_ids = nullptr; const char *res_type = nullptr; @@ -415,7 +687,8 @@ int resource_reader_rv1exec_t::unpack_children (resource_graph_t &g, goto error; } const char *ids_str = json_string_value (res_ids); - if (unpack_child (g, m, parent, res_type, ids_str, rank, pmap) < 0) + if (unpack_child (g, m, parent, res_type, ids_str, rank, pmap, + update_data) < 0) goto error; } return 0; @@ -432,11 +705,12 @@ int resource_reader_rv1exec_t::unpack_rank (resource_graph_t &g, struct hostlist *hlist, std::map &rmap, std::map &pmap) + properties_t> &pmap, + updater_data &update_data) { edg_t e; - vtx_t v; - int64_t iden; + vtx_t vtx; + int64_t id; const char *hostname = nullptr; std::string basename; std::map properties; @@ -452,7 +726,7 @@ int resource_reader_rv1exec_t::unpack_rank (resource_graph_t &g, if ( !(hostname = hostlist_nth (hlist, static_cast (rmap[rank])))) goto error; - if (get_hostname_suffix (hostname, iden) < 0 + if (get_hostname_suffix (hostname, id) < 0 || get_host_basename (hostname, basename) < 0) { m_err_msg += __FUNCTION__; m_err_msg += ": error splitting hostname="; @@ -465,18 +739,63 @@ int resource_reader_rv1exec_t::unpack_rank (resource_graph_t &g, goto error; } } - - // Create and add a node vertex and link with cluster vertex - v = add_vertex (g, m, parent, iden, "containment", - "node", basename, hostname, properties, 1, rank); - if (v == boost::graph_traits::null_vertex ()) - goto error; - if (add_edges (g, m, parent, v, "containment", "contains", "in") < 0) - goto error; + if (!update_data.update) { + vtx_t tmp_vtx; + tmp_vtx = find_vertex (g, m, parent, id, "containment", "node", basename, + hostname, 1, rank); + // Shouldn't be in graph + if (tmp_vtx != boost::graph_traits::null_vertex ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": found duplicate vertex in graph for "; + m_err_msg += std::string (hostname) + ".\n"; + goto error; + } + // Create and add a node vertex and link with cluster vertex + vtx = add_vertex (g, m, parent, id, "containment", + "node", basename, hostname, properties, 1, rank); + if (vtx == boost::graph_traits::null_vertex ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": failed to add vertex for "; + m_err_msg += std::string (hostname) + ".\n"; + goto error; + } + if (add_edges (g, m, parent, vtx, "containment", "contains", "in") < 0) { + m_err_msg += __FUNCTION__; + m_err_msg += ": failed to add edges for "; + m_err_msg += std::string (hostname) + ".\n"; + goto error; + } + } else { + // Update resources + vtx = find_vertex (g, m, parent, id, "containment", "node", basename, + hostname, 1, rank); + // Not found + if (vtx == boost::graph_traits::null_vertex ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": couldn't find vertex in graph for "; + m_err_msg += std::string (hostname) + ".\n"; + goto error; + } + if (update_vertex (g, vtx, update_data) == -1) + goto error; + if (update_edges (g, m, parent, vtx, "containment", "contains", + "in", update_data) == -1) + goto error; + } // Unpack children node-local resources - if (unpack_children (g, m, v, children, rank, pmap) < 0) + if (unpack_children (g, m, vtx, children, rank, pmap, update_data) < 0) goto error; + if (update_data.update) { + // Update the rank's planner if all children allocated + if (update_exclusivity (g, m, vtx, update_data) == -1) { + m_err_msg += __FUNCTION__; + m_err_msg += ": exclusive filter update failed for "; + m_err_msg += std::string (hostname) + ".\n"; + goto error; + } + } + return 0; error: @@ -491,7 +810,8 @@ int resource_reader_rv1exec_t::unpack_rlite_entry (resource_graph_t &g, std::map &rmap, std::map &pmap) + properties_t> &pmap, + updater_data &update_data) { int rc = -1; unsigned rank; @@ -501,29 +821,30 @@ int resource_reader_rv1exec_t::unpack_rlite_entry (resource_graph_t &g, if (!entry || !hlist) { errno = EINVAL; - goto ret; + goto error; } if (json_unpack (entry, "{s:s s:o}", "rank", &ranks, "children", &children) < 0) { errno = EINVAL; - goto ret; + goto error; } if ( !(r_ids = idset_decode (ranks))) - goto ret; + goto error; rank = idset_first (r_ids); while (rank != IDSET_INVALID_ID) { - if (unpack_rank (g, m, parent, rank, children, hlist, rmap, pmap) < 0) - goto ret; + if (unpack_rank (g, m, parent, rank, children, hlist, rmap, pmap, + update_data) < 0) + goto error; rank = idset_next (r_ids, rank); } rc = 0; -ret: +error: idset_destroy (r_ids); return rc; } @@ -534,7 +855,8 @@ int resource_reader_rv1exec_t::unpack_rlite (resource_graph_t &g, struct hostlist *hlist, std::map &rmap, std::map &pmap) + properties_t> &pmap, + updater_data &update_data) { size_t index; vtx_t cluster_vtx; @@ -551,9 +873,12 @@ int resource_reader_rv1exec_t::unpack_rlite (resource_graph_t &g, } cluster_vtx = m.roots["containment"]; + // Set the cluster "needs" and make the update shared access to the cluster + m.v_rt_edges["containment"].set_for_trav_update (g[cluster_vtx].size, + false, update_data.token); json_array_foreach (rlite, index, entry) { if (unpack_rlite_entry (g, m, cluster_vtx, - entry, hlist, rmap, pmap) < 0) + entry, hlist, rmap, pmap, update_data) < 0) goto error; } return 0; @@ -564,7 +889,8 @@ int resource_reader_rv1exec_t::unpack_rlite (resource_graph_t &g, int resource_reader_rv1exec_t::unpack_internal (resource_graph_t &g, resource_graph_metadata_t &m, - json_t *rv1) + json_t *rv1, + updater_data &update_data) { int rc = -1; int version; @@ -612,7 +938,7 @@ int resource_reader_rv1exec_t::unpack_internal (resource_graph_t &g, if (hostlist_append (hlist, hlist_str) < 0) goto ret; } - if (unpack_rlite (g, m, rlite, hlist, rmap, pmap) < 0) + if (unpack_rlite (g, m, rlite, hlist, rmap, pmap, update_data) < 0) goto ret; rc = 0; @@ -642,6 +968,9 @@ int resource_reader_rv1exec_t::unpack (resource_graph_t &g, json_error_t error; json_t *rv1 = nullptr; int saved_errno; + updater_data null_data; + // Indicate adding, not updating + null_data.update = false; if (str == "") { errno = EINVAL; @@ -654,7 +983,8 @@ int resource_reader_rv1exec_t::unpack (resource_graph_t &g, errno = ENOMEM; goto ret; } - rc = unpack_internal (g, m, rv1); + + rc = unpack_internal (g, m, rv1, null_data); ret: saved_errno = errno; @@ -682,12 +1012,42 @@ int resource_reader_rv1exec_t::remove_subgraph (resource_graph_t &g, int resource_reader_rv1exec_t::update (resource_graph_t &g, resource_graph_metadata_t &m, - const std::string &str, int64_t jobid, + const std::string &R, int64_t jobid, int64_t at, uint64_t dur, bool rsv, uint64_t token) { - errno = ENOTSUP; // RV1Exec reader currently does not support update - return -1; + int rc = -1; + json_error_t error; + json_t *rv1 = nullptr; + int saved_errno; + updater_data update_data; + + if (R == "") { + errno = EINVAL; + goto ret; + } + + if ( !(rv1 = json_loads (R.c_str (), 0, &error))) { + errno = ENOMEM; + goto ret; + } + + update_data.update = true; + update_data.jobid = jobid; + update_data.at = at; + update_data.duration = dur; + update_data.reserved = rsv; + update_data.token = token; + + if ( (rc = unpack_internal (g, m, rv1, update_data)) == -1) { + undo_vertices (g, update_data); + } + +ret: + saved_errno = errno; + json_decref (rv1); + errno = saved_errno; + return rc; } bool resource_reader_rv1exec_t::is_allowlist_supported () diff --git a/resource/readers/resource_reader_rv1exec.hpp b/resource/readers/resource_reader_rv1exec.hpp index 77445918a..e59f7bf30 100644 --- a/resource/readers/resource_reader_rv1exec.hpp +++ b/resource/readers/resource_reader_rv1exec.hpp @@ -21,6 +21,17 @@ extern "C" { namespace Flux { namespace resource_model { +// Struct to track data for updata +struct updater_data { + int64_t jobid = 0; + int64_t at = 0; + uint64_t duration = 0; + bool reserved = false; + uint64_t token = 0; + // track updated vertices to undo upon error + std::map> updated_vertices; + bool update = true; // Updating or adding vertex +}; /*! RV1EXEC resource reader class. */ @@ -120,6 +131,34 @@ class resource_reader_rv1exec_t : public resource_reader_base_t { int add_cluster_vertex (resource_graph_t &g, resource_graph_metadata_t &m); + // Update functions + vtx_t find_vertex (resource_graph_t &g, resource_graph_metadata_t &m, + vtx_t parent, int64_t id, + const std::string &subsys, + const std::string &type, + const std::string &basename, + const std::string &name, + int size, int rank); + + int update_vertex (resource_graph_t &g, + vtx_t vtx, + updater_data &update_data); + + int undo_vertices (resource_graph_t &g, updater_data &update_data); + + int update_edges (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t src, vtx_t dst, + const std::string &subsys, + const std::string &relation, + const std::string &rev_relation, + updater_data &update_data); + + int update_exclusivity (resource_graph_t &g, + resource_graph_metadata_t &m, + vtx_t vtx, + updater_data &update_data); + int build_rmap (json_t *rlite, std::map &rmap); int build_pmap (json_t *properties, @@ -129,35 +168,41 @@ class resource_reader_rv1exec_t : public resource_reader_base_t { resource_graph_metadata_t &m, vtx_t parent, const char *resource_type, const char *resource_ids, unsigned rank, - std::map &pmap); + std::map &pmap, + updater_data &update_data); int unpack_children (resource_graph_t &g, resource_graph_metadata_t &m, vtx_t parent, json_t *children, unsigned rank, - std::map &pmap); + std::map &pmap, + updater_data &update_data); int unpack_rank (resource_graph_t &g, resource_graph_metadata_t &m, vtx_t parent, unsigned rank, json_t *children, struct hostlist *hlist, std::map &rmap, - std::map &pmap); + std::map &pmap, + updater_data &update_data); int unpack_rlite_entry (resource_graph_t &g, resource_graph_metadata_t &m, vtx_t parent, json_t *entry, struct hostlist *hlist, std::map &rmap, - std::map &pmap); + std::map &pmap, + updater_data &update_data); int unpack_rlite (resource_graph_t &g, resource_graph_metadata_t &m, json_t *rlite, struct hostlist *hlist, std::map &rmap, - std::map &pmap); + std::map &pmap, + updater_data &update_data); int unpack_internal (resource_graph_t &g, - resource_graph_metadata_t &m, json_t *rv1); + resource_graph_metadata_t &m, json_t *rv1, + updater_data &update_data); }; } // namespace resource_model