From a34e25ee410ff0f0490663d68e06d41a1e0aab2e Mon Sep 17 00:00:00 2001 From: Daniel Milroy Date: Sat, 25 May 2024 23:16:58 -0700 Subject: [PATCH] readers: add partial cancel support for JGF --- resource/readers/resource_reader_jgf.cpp | 186 +++++++++++++++++++---- resource/readers/resource_reader_jgf.hpp | 34 +++-- 2 files changed, 183 insertions(+), 37 deletions(-) diff --git a/resource/readers/resource_reader_jgf.cpp b/resource/readers/resource_reader_jgf.cpp index 59ecc2286..0963744b4 100644 --- a/resource/readers/resource_reader_jgf.cpp +++ b/resource/readers/resource_reader_jgf.cpp @@ -725,8 +725,7 @@ int resource_reader_jgf_t::find_vtx (resource_graph_t &g, int resource_reader_jgf_t::update_vtx_plan (vtx_t v, resource_graph_t &g, const fetch_helper_t &fetcher, - uint64_t jobid, int64_t at, - uint64_t dur, bool rsv) + jgf_updater_data &update_data) { int rc = -1; int64_t span = -1; @@ -739,7 +738,9 @@ int resource_reader_jgf_t::update_vtx_plan (vtx_t v, resource_graph_t &g, m_err_msg += ": plan for " + g[v].name + " is null.\n"; goto done; } - if ( (avail = planner_avail_resources_during (plans, at, dur)) == -1) { + if ( (avail = planner_avail_resources_during (plans, + update_data.at, + update_data.duration)) == -1) { m_err_msg += __FUNCTION__; m_err_msg += ": planner_avail_resource_during return -1 for "; m_err_msg += g[v].name + ".\n"; @@ -749,16 +750,17 @@ int resource_reader_jgf_t::update_vtx_plan (vtx_t v, resource_graph_t &g, if (fetcher.exclusive) { // Update the vertex plan here (not in traverser code) so vertices // that the traverser won't walk still get their plans updated. 
- if ( (span = planner_add_span (plans, at, dur, - static_cast (g[v].size))) == -1) { + if ( (span = planner_add_span (plans, update_data.at, + update_data.duration, + static_cast (g[v].size))) == -1) { m_err_msg += __FUNCTION__; m_err_msg += ": can't add span into " + g[v].name + ".\n"; goto done; } - if (rsv) - g[v].schedule.reservations[jobid] = span; + if (update_data.reserved) + g[v].schedule.reservations[update_data.jobid] = span; else - g[v].schedule.allocations[jobid] = span; + g[v].schedule.allocations[update_data.jobid] = span; } else { if (avail < g[v].size) { // if g[v] has already been allocated/reserved, this is an error @@ -773,12 +775,96 @@ int resource_reader_jgf_t::update_vtx_plan (vtx_t v, resource_graph_t &g, return rc; } +int resource_reader_jgf_t::cancel_vtx (vtx_t vtx, resource_graph_t &g, + resource_graph_metadata_t &m, + const fetch_helper_t &fetcher, + jgf_updater_data &update_data) +{ + int rc = -1; + int64_t span = -1; + int64_t prev_avail = -1; + planner_multi_t *subtree_plan = NULL; + planner_t *x_checker = NULL; + planner_t *plans = NULL; + auto &job2span = g[vtx].idata.job2span; + auto &x_spans = g[vtx].idata.x_spans; + auto &tags = g[vtx].idata.tags; + std::map::iterator span_it; + std::map::iterator xspan_it; + std::map>::iterator rank_vect; + std::vector::iterator rank_position; + // Cancel update_data.jobid on vtx: drop its spans from the filters and the vertex plan, add the freed counts to update_data.type_to_count, and unlink vtx from m.by_rank. Returns 0 on success, -1 on error. + // remove from aggregate filter if present + auto agg_span = job2span.find (update_data.jobid); + if (agg_span != job2span.end ()) { + if ((subtree_plan = g[vtx].idata.subplans["containment"]) == NULL) + goto ret; + if (planner_multi_rem_span (subtree_plan, agg_span->second) != 0) + goto ret; + // Delete from job2span tracker + job2span.erase (update_data.jobid); + } + + // remove from exclusive filter; the span id is copied out first because erasing the x_spans entry invalidates xspan_it + xspan_it = x_spans.find (update_data.jobid); + if (xspan_it == x_spans.end ()) { + errno = EINVAL; + goto ret; + } + span = xspan_it->second; + x_checker = g[vtx].idata.x_checker; + g[vtx].idata.tags.erase (update_data.jobid); + g[vtx].idata.x_spans.erase (update_data.jobid);
+ if (planner_rem_span (x_checker, span) == -1) { + errno = EINVAL; + goto ret; + } + // remove the job's allocation record (saving its span id before the erase), then remove that span from the vertex plan + span_it = g[vtx].schedule.allocations.find (update_data.jobid); + if (span_it == g[vtx].schedule.allocations.end ()) { + errno = EINVAL; + goto ret; + } + span = span_it->second; + g[vtx].schedule.allocations.erase (span_it); + plans = g[vtx].schedule.plans; + prev_avail = planner_avail_resources_at (plans, 0); + if (planner_rem_span (plans, span) == -1) { + errno = EINVAL; + goto ret; + } + // Add the newly freed counts; can't assume the removal freed everything. + update_data.type_to_count[g[vtx].type] += (planner_avail_resources_at (plans, 0) + - prev_avail); + rank_vect = m.by_rank.find (g[vtx].rank); + if (rank_vect == m.by_rank.end ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": attepted to cancel rank that does not exist "; + m_err_msg += "in by_rank map\n"; + goto ret; + } + rank_position = std::find (rank_vect->second.begin (), + rank_vect->second.end (), vtx); + if (rank_position == rank_vect->second.end ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": attepted to remove vertex which does not exist "; + m_err_msg += "in by_rank map\n"; + goto ret; + } + rank_vect->second.erase (rank_position); + update_data.ranks.insert (g[vtx].rank); + + rc = 0; + +ret: + return rc; +} + int resource_reader_jgf_t::update_vtx (resource_graph_t &g, resource_graph_metadata_t &m, std::map &vmap, const fetch_helper_t &fetcher, - uint64_t jobid, int64_t at, - uint64_t dur, bool rsv) + jgf_updater_data &update_data) { int rc = -1; std::map root_checks; @@ -791,8 +877,13 @@ int resource_reader_jgf_t::update_vtx (resource_graph_t &g, goto done; if ( (rc = update_vmap (vmap, v, root_checks, fetcher)) != 0) goto done; - if ( (rc = update_vtx_plan (v, g, fetcher, jobid, at, dur, rsv)) != 0) - goto done; + if (update_data.update) { + if ( (rc = update_vtx_plan (v, g, fetcher, update_data)) != 0) + goto done; + } else { + if ( (rc = cancel_vtx (v, g, m, fetcher, 
update_data)) != 0) + goto done; + } done: return rc; @@ -800,7 +891,7 @@ int resource_reader_jgf_t::update_vtx (resource_graph_t &g, int resource_reader_jgf_t::undo_vertices (resource_graph_t &g, std::map &vmap, - uint64_t jobid, bool rsv) + jgf_updater_data &update_data) { int rc = 0; int rc2 = 0; @@ -813,12 +904,12 @@ int resource_reader_jgf_t::undo_vertices (resource_graph_t &g, continue; try { v = kv.second.v; - if (rsv) { - span = g[v].schedule.reservations.at (jobid); - g[v].schedule.reservations.erase (jobid); + if (update_data.reserved) { + span = g[v].schedule.reservations.at (update_data.jobid); + g[v].schedule.reservations.erase (update_data.jobid); } else { - span = g[v].schedule.allocations.at (jobid); - g[v].schedule.allocations.erase (jobid); + span = g[v].schedule.allocations.at (update_data.jobid); + g[v].schedule.allocations.erase (update_data.jobid); } plans = g[v].schedule.plans; @@ -886,9 +977,8 @@ int resource_reader_jgf_t::update_vertices (resource_graph_t &g, resource_graph_metadata_t &m, std::map &vmap, - json_t *nodes, int64_t jobid, - int64_t at, uint64_t dur, - bool rsv) + json_t *nodes, + jgf_updater_data &update_data) { int rc = -1; unsigned int i = 0; @@ -898,7 +988,7 @@ int resource_reader_jgf_t::update_vertices (resource_graph_t &g, fetcher.scrub (); if ( (rc = unpack_vtx (json_array_get (nodes, i), fetcher)) != 0) goto done; - if ( (rc = update_vtx (g, m, vmap, fetcher, jobid, at, dur, rsv)) != 0) + if ( (rc = update_vtx (g, m, vmap, fetcher, update_data)) != 0) goto done; } rc = 0; @@ -1236,6 +1326,7 @@ int resource_reader_jgf_t::update (resource_graph_t &g, json_t *nodes = NULL; json_t *edges = NULL; std::map vmap; + jgf_updater_data update_data; if (at < 0 || dur == 0) { errno = EINVAL; @@ -1245,10 +1336,18 @@ int resource_reader_jgf_t::update (resource_graph_t &g, + std::to_string (dur) + ").\n"; goto done; } + + // Fill in updater data + update_data.jobid = jobid; + update_data.at = at; + update_data.duration = dur; + 
update_data.reserved = rsv; + update_data.update = true; + if ( (rc = fetch_jgf (str, &jgf, &nodes, &edges)) != 0) goto done; - if ( (rc = update_vertices (g, m, vmap, nodes, jobid, at, dur, rsv)) != 0) { - undo_vertices (g, vmap, jobid, rsv); + if ( (rc = update_vertices (g, m, vmap, nodes, update_data)) != 0) { + undo_vertices (g, vmap, update_data); goto done; } if ( (rc = update_edges (g, m, vmap, edges, token)) != 0) @@ -1301,8 +1400,43 @@ int resource_reader_jgf_t::partial_cancel (resource_graph_t &g, std::unordered_map &type_to_count, const std::string &R, int64_t jobid) { - errno = ENOTSUP; // JGF reader does not support partial cancel - return -1; + int rc = -1; + json_t *jgf = NULL; + json_t *nodes = NULL; + json_t *edges = NULL; + std::map vmap; + jgf_updater_data p_cancel_data; + + if (jobid <= 0) { + errno = EINVAL; + m_err_msg += __FUNCTION__; + m_err_msg += ": invalid jobid\n"; + goto done; + } + + // Fill in updater data + p_cancel_data.jobid = jobid; + p_cancel_data.update = false; + + if ( (rc = fetch_jgf (R, &jgf, &nodes, &edges)) != 0) + goto done; + if ( (rc = update_vertices (g, m, vmap, nodes, p_cancel_data)) != 0) + goto done; + + // Remove from by_rank map if all vertices in the rank were + // cancelled. 
+ for (auto rank : p_cancel_data.ranks) { + auto existing_rank = m.by_rank.find (rank); + // We know it's in by_rank or else update_vertices would have failed + if (existing_rank->second.size () == 0) + m.by_rank.erase (existing_rank); + } + + type_to_count = p_cancel_data.type_to_count; + +done: + json_decref (jgf); + return rc; } bool resource_reader_jgf_t::is_allowlist_supported () diff --git a/resource/readers/resource_reader_jgf.hpp b/resource/readers/resource_reader_jgf.hpp index 01a247205..fe5ccf53c 100644 --- a/resource/readers/resource_reader_jgf.hpp +++ b/resource/readers/resource_reader_jgf.hpp @@ -23,6 +23,19 @@ struct vmap_val_t; namespace Flux { namespace resource_model { +// Inputs and accumulated results passed through one JGF update or partial-cancel pass +struct jgf_updater_data { + int64_t jobid = 0; + int64_t at = 0; + uint64_t duration = 0; + bool reserved = false; + // per-resource-type counts freed during a partial cancel (accumulated by cancel_vtx) + std::unordered_map type_to_count; + // ranks of the vertices cancelled so far; partial_cancel erases a rank + // from the by_rank map once that rank's vertex list is empty + std::unordered_set ranks; + bool update = true; // true: update vertex plans; false: partial cancel +}; /*! JGF resource reader class. 
*/ @@ -138,27 +151,26 @@ class resource_reader_jgf_t : public resource_reader_base_t { std::map &vmap, const fetch_helper_t &fetcher, vtx_t &ret_v); int update_vtx_plan (vtx_t v, resource_graph_t &g, - const fetch_helper_t &fetcher, uint64_t jobid, - int64_t at, uint64_t dur, bool rsv); + const fetch_helper_t &fetcher, + jgf_updater_data &update_data); + int cancel_vtx (vtx_t v, resource_graph_t &g, + resource_graph_metadata_t &m, + const fetch_helper_t &fetcher, + jgf_updater_data &update_data); int update_vtx (resource_graph_t &g, resource_graph_metadata_t &m, std::map &vmap, - const fetch_helper_t &fetcher, uint64_t jobid, int64_t at, - uint64_t dur, bool rsv); + const fetch_helper_t &fetcher, + jgf_updater_data &updater_data); int unpack_vertices (resource_graph_t &g, resource_graph_metadata_t &m, std::map &vmap, json_t *nodes, std::unordered_set &added_vtcs); int undo_vertices (resource_graph_t &g, std::map &vmap, - uint64_t jobid, bool rsv); - int update_vertices (resource_graph_t &g, resource_graph_metadata_t &m, - std::map &vmap, - json_t *nodes, int64_t jobid, int64_t at, - uint64_t dur, bool rsv); + jgf_updater_data &updater_data); int update_vertices (resource_graph_t &g, resource_graph_metadata_t &m, std::map &vmap, - json_t *nodes, int64_t jobid, int64_t at, - uint64_t dur); + json_t *nodes, jgf_updater_data &updater_data); int unpack_edge (json_t *element, std::map &vmap, std::string &source, std::string &target, json_t **name); int update_src_edge (resource_graph_t &g, resource_graph_metadata_t &m,