From a78d5157fdf1c5e261913f89eade6630051c6ffb Mon Sep 17 00:00:00 2001 From: Daniel Milroy Date: Mon, 8 Apr 2024 00:58:11 -0700 Subject: [PATCH] traverser: add partial cancellation functionality Problem: Fluxion issue https://github.com/flux-framework/flux-sched/issues/1151 and flux-core issue https://github.com/flux-framework/flux-core/issues/4312 identified the need for partial release of resources. The current functionality need is to release all resources managed by a single broker rank. In the future support for releasing arbitrary subgraphs will be needed for cloud and converged use cases. Modify the rem_* traverser functions to take a modification type and type_to_count unordered_map. Add logic in the recursive job modification calls to distinguish between a full and partial job cancellation and issue corresponding planner interface calls, handling errors as needed. --- resource/traversers/dfu.cpp | 18 ++ resource/traversers/dfu.hpp | 16 ++ resource/traversers/dfu_impl.hpp | 36 ++- resource/traversers/dfu_impl_update.cpp | 300 +++++++++++++++++++----- 4 files changed, 305 insertions(+), 65 deletions(-) diff --git a/resource/traversers/dfu.cpp b/resource/traversers/dfu.cpp index fe768f099..d4f51b6f0 100644 --- a/resource/traversers/dfu.cpp +++ b/resource/traversers/dfu.cpp @@ -391,6 +391,24 @@ int dfu_traverser_t::remove (int64_t jobid) return detail::dfu_impl_t::remove (root, jobid); } +int dfu_traverser_t::remove (const std::string &R_to_cancel, + std::shared_ptr &reader, + int64_t jobid, bool &full_cancel) +{ + const subsystem_t &dom = get_match_cb ()->dom_subsystem (); + if (!get_graph () || !get_graph_db () + || get_graph_db ()->metadata.roots.find (dom) + == get_graph_db ()->metadata.roots.end () + || !get_match_cb ()) { + errno = EINVAL; + return -1; + } + + vtx_t root = get_graph_db ()->metadata.roots.at (dom); + return detail::dfu_impl_t::remove (root, R_to_cancel, reader, jobid, + full_cancel); +} + int dfu_traverser_t::mark (const std::string &root_path, resource_pool_t::status_t status) { diff --git a/resource/traversers/dfu.hpp b/resource/traversers/dfu.hpp index 8040f69d3..f0dce1a61 100644 --- a/resource/traversers/dfu.hpp +++ b/resource/traversers/dfu.hpp @@ -164,6 +164,22 @@ class dfu_traverser_t : protected detail::dfu_impl_t */ int remove (int64_t jobid); + /*! Remove the allocation/reservation referred to by jobid and update + * the resource state. + * + * \param R_to_cancel deallocation string such as written in JGF. + * \param reader reader object that deserialize str to update the + * graph + * \param jobid job id. + * \param full_cancel bool indicating if the partial cancel cancelled all + * job resources + * \return 0 on success; -1 on error. + * EINVAL: graph, roots or match callback not set. + */ + int remove (const std::string &to_cancel, + std::shared_ptr &reader, + int64_t jobid, bool &full_cancel); + /*! Mark the resource status up|down|etc starting at subtree_root. * * \param root_path path to the root of the subtree to update. diff --git a/resource/traversers/dfu_impl.hpp b/resource/traversers/dfu_impl.hpp index 7a43426d5..09b96aa83 100644 --- a/resource/traversers/dfu_impl.hpp +++ b/resource/traversers/dfu_impl.hpp @@ -292,6 +292,21 @@ class dfu_impl_t { */ int remove (vtx_t root, int64_t jobid); + /*! Remove the allocation/reservation referred to by jobid and update + * the resource state. + * + * \param root root resource vertex. + * \param to_cancel deallocation string such as written in JGF. + * \param reader reader object that deserialize str to update the graph + * \param jobid job id. + * \param full_cancel bool indicating if the partial cancel cancelled all + * job resources + * \return 0 on success; -1 on error. + */ + int remove (vtx_t root, const std::string &to_cancel, + std::shared_ptr &reader, + int64_t jobid, bool &full_cancel); + /*! Update the resource status to up|down|etc starting at subtree_root. * * \param root_path path to the root of the subtree to update. @@ -474,14 +489,19 @@ class dfu_impl_t { unsigned int needs, bool excl, const jobmeta_t &jobmeta, bool full, std::map &to_parent, bool emit_shadow); - - int rem_txfilter (vtx_t u, int64_t jobid, bool &stop); - int rem_agfilter (vtx_t u, int64_t jobid, const std::string &s); - int rem_idata (vtx_t u, int64_t jobid, const std::string &s, bool &stop); - int rem_plan (vtx_t u, int64_t jobid); - int rem_upv (vtx_t u, int64_t jobid); - int rem_dfv (vtx_t u, int64_t jobid); - int rem_exv (int64_t jobid); + bool rem_tag (vtx_t u, int64_t jobid); + int rem_exclusive_filter (vtx_t u, int64_t jobid, + const modify_data_t &mod_data); + int mod_agfilter (vtx_t u, int64_t jobid, const std::string &s, + const modify_data_t &mod_data, bool &stop); + int mod_idata (vtx_t u, int64_t jobid, const std::string &s, + const modify_data_t &mod_data, bool &stop); + int mod_plan (vtx_t u, int64_t jobid, modify_data_t &mod_data); + int mod_upv (vtx_t u, int64_t jobid, const modify_data_t &mod_data); + int mod_dfv (vtx_t u, int64_t jobid, modify_data_t &mod_data); + int mod_exv (int64_t jobid, const modify_data_t &mod_data); + int cancel_vertex (vtx_t vtx, modify_data_t &mod_data, + int64_t jobid); /************************************************************************ diff --git a/resource/traversers/dfu_impl_update.cpp b/resource/traversers/dfu_impl_update.cpp index ea7c6a56a..dc4d3f55e 100644 --- a/resource/traversers/dfu_impl_update.cpp +++ b/resource/traversers/dfu_impl_update.cpp @@ -348,29 +348,28 @@ int dfu_impl_t::upd_dfv (vtx_t u, std::shared_ptr &writers, excl, n_plans, jobmeta, full, dfu, to_parent); } -int dfu_impl_t::rem_txfilter (vtx_t u, int64_t jobid, bool &stop) +int dfu_impl_t::rem_exclusive_filter (vtx_t u, int64_t jobid, + const modify_data_t &mod_data) { int rc = -1; int64_t span = -1; planner_t *x_checker = NULL; - auto &x_spans = (*m_graph)[u].idata.x_spans; - auto &tags = (*m_graph)[u].idata.tags; - if (tags.find (jobid) == tags.end ()) { - stop = true; - rc = 0; - goto done; - } - if (x_spans.find (jobid) == x_spans.end ()) { - m_err_msg += __FUNCTION__; - m_err_msg += ": jobid isn't found in x_spans table.\n "; - goto done; + auto span_it = (*m_graph)[u].idata.x_spans.find (jobid); + if (span_it == (*m_graph)[u].idata.x_spans.end ()) { + if (mod_data.mod_type != job_modify_t::PARTIAL_CANCEL) { + m_err_msg += __FUNCTION__; + m_err_msg += ": jobid isn't found in x_spans table.\n "; + goto done; + } else { + rc = 0; + goto done; + } } x_checker = (*m_graph)[u].idata.x_checker; - (*m_graph)[u].idata.tags.erase (jobid); - span = (*m_graph)[u].idata.x_spans[jobid]; - (*m_graph)[u].idata.x_spans.erase (jobid); + span = span_it->second; + (*m_graph)[u].idata.x_spans.erase (span_it); if ( (rc = planner_rem_span (x_checker, span)) == -1) { m_err_msg += __FUNCTION__; m_err_msg += "planner_rem_span returned -1.\n"; @@ -383,96 +382,206 @@ int dfu_impl_t::rem_txfilter (vtx_t u, int64_t jobid, bool &stop) return rc; } -int dfu_impl_t::rem_agfilter (vtx_t u, int64_t jobid, - const std::string &subsystem) +bool dfu_impl_t::rem_tag (vtx_t u, int64_t jobid) +{ + auto tag_it = (*m_graph)[u].idata.tags.find (jobid); + if (tag_it == (*m_graph)[u].idata.tags.end ()) { + // stop removal + return true; + } else { + (*m_graph)[u].idata.tags.erase (tag_it); + return false; + } +} + +int dfu_impl_t::mod_agfilter (vtx_t u, int64_t jobid, + const std::string &subsystem, + const modify_data_t &mod_data, + bool &stop) { int rc = 0; - int span = -1; + bool removed = false; planner_multi_t *subtree_plan = NULL; auto &job2span = (*m_graph)[u].idata.job2span; + std::map::iterator span_it; - if ((subtree_plan = (*m_graph)[u].idata.subplans[subsystem]) == NULL) + if ( (subtree_plan = (*m_graph)[u].idata.subplans[subsystem]) == NULL) goto done; - if (job2span.find (jobid) == job2span.end ()) + + span_it = job2span.find (jobid); + if (span_it == job2span.end ()) { + if (mod_data.mod_type == job_modify_t::PARTIAL_CANCEL) + stop = true; goto done; - if ((span = job2span[jobid]) == -1) { + } + if (span_it->second == -1) { rc = -1; goto done; } - if ((rc = planner_multi_rem_span (subtree_plan, span)) != 0) { - m_err_msg += __FUNCTION__; - m_err_msg += ": planner_multi_rem_span returned -1.\n"; - m_err_msg += (*m_graph)[u].name + ".\n"; - m_err_msg += strerror (errno); - m_err_msg += "\n"; + if (mod_data.mod_type != job_modify_t::PARTIAL_CANCEL) { + if ( (rc = planner_multi_rem_span (subtree_plan, + span_it->second)) != 0) { + m_err_msg += __FUNCTION__; + m_err_msg += ": planner_multi_rem_span returned -1.\n"; + m_err_msg += (*m_graph)[u].name + ".\n"; + m_err_msg += strerror (errno); + m_err_msg += "\n"; + goto done; + } + job2span.erase (span_it); + } else { // PARTIAL_CANCEL + auto &tags = (*m_graph)[u].idata.tags; + if (tags.find (jobid) == tags.end ()) { + // stop removal + stop = true; + goto done; + } + // If not a default/root rank and the rank is still in the graph + // but don't remove exclusive filter as allocation may be exclusive + // at the subgraph rooted here. + if ( (mod_data.ranks_removed.find ((*m_graph)[u].rank) + == mod_data.ranks_removed.end ()) + && (*m_graph)[u].rank != -1) { + stop = true; + goto done; + } + if (mod_data.type_to_count.size () > 0) { + std::vector reduced_types; + std::vector reduced_counts; + for (auto t2ct_it : mod_data.type_to_count) { + reduced_types.push_back (t2ct_it.first); + reduced_counts.push_back (t2ct_it.second); + } + if ( (rc = planner_multi_reduce_span (subtree_plan, + span_it->second, + &(reduced_counts[0]), + &(reduced_types[0]), + mod_data.type_to_count.size (), + removed)) != 0) { + m_err_msg += __FUNCTION__; + m_err_msg += ": planner_multi_reduce_span returned -1.\n"; + m_err_msg += (*m_graph)[u].name + ".\n"; + m_err_msg += strerror (errno); + m_err_msg += "\n"; + goto done; + } + } else { + m_err_msg += __FUNCTION__; + m_err_msg += ": type_to_count empty.\n"; + m_err_msg += (*m_graph)[u].name + ".\n"; + m_err_msg += strerror (errno); + m_err_msg += "\n"; + rc = -1; + goto done; + } + if (removed) { + // Fully removed; need to remove job2span and tag + job2span.erase (span_it); + rem_tag (u, jobid); + } + rc = rem_exclusive_filter (u, jobid, mod_data); } - job2span.erase (jobid); done: return rc; } -int dfu_impl_t::rem_idata (vtx_t u, int64_t jobid, - const std::string &subsystem, bool &stop) +int dfu_impl_t::mod_idata (vtx_t u, int64_t jobid, + const std::string &subsystem, + const modify_data_t &mod_data, + bool &stop) { - int rc = -1; - - if ( (rc = rem_txfilter (u, jobid, stop)) != 0 || stop) - goto done; - if ( (rc = rem_agfilter (u, jobid, subsystem)) != 0) - goto done; - -done: - return rc; + // Only remove the txfilter span and tag first if we're completely + // cancelling the vertex + if (mod_data.mod_type != job_modify_t::PARTIAL_CANCEL) { + // returns true if stopping + if ( (stop = rem_tag (u, jobid))) + return 0; + if (rem_exclusive_filter (u, jobid, mod_data) != 0) + return -1; + } + // If mod_type == job_modify_t::PARTIAL_CANCEL here, + // job_mod_agfilter determines if all resources are removed. If so, + // job_mod_agfilter will then call rem_tag. + return mod_agfilter (u, jobid, subsystem, mod_data, stop); } -int dfu_impl_t::rem_plan (vtx_t u, int64_t jobid) +int dfu_impl_t::mod_plan (vtx_t u, int64_t jobid, + modify_data_t &mod_data) { int rc = 0; int64_t span = -1; + int64_t prev_count = -1; + int64_t to_remove = 0; + bool removed = false; + std::map::iterator alloc_span; + std::map::iterator res_span; planner_t *plans = NULL; - if ((*m_graph)[u].schedule.allocations.find (jobid) - != (*m_graph)[u].schedule.allocations.end ()) { - span = (*m_graph)[u].schedule.allocations[jobid]; - (*m_graph)[u].schedule.allocations.erase (jobid); - } else if ((*m_graph)[u].schedule.reservations.find (jobid) + alloc_span = (*m_graph)[u].schedule.allocations.find (jobid); + if (alloc_span != (*m_graph)[u].schedule.allocations.end ()) { + span = alloc_span->second; + if (mod_data.mod_type != job_modify_t::PARTIAL_CANCEL) { + (*m_graph)[u].schedule.allocations.erase (alloc_span); + } + } else if ( (res_span = (*m_graph)[u].schedule.reservations.find (jobid)) != (*m_graph)[u].schedule.reservations.end ()) { - span = (*m_graph)[u].schedule.reservations[jobid]; - (*m_graph)[u].schedule.reservations.erase (jobid); + span = res_span->second; + // Can't be PARTIAL_CANCEL + (*m_graph)[u].schedule.reservations.erase (res_span); } else { goto done; } plans = (*m_graph)[u].schedule.plans; - if ( (rc = planner_rem_span (plans, span)) == -1) { + if (mod_data.mod_type != job_modify_t::PARTIAL_CANCEL) { + if (mod_data.mod_type == job_modify_t::VTX_CANCEL) + prev_count = planner_span_resource_count (plans, span); + if ( (rc = planner_rem_span (plans, span)) == -1) { + m_err_msg += __FUNCTION__; + m_err_msg += ": planner_rem_span returned -1.\n"; + m_err_msg += (*m_graph)[u].name + ".\n"; + m_err_msg += strerror (errno); + m_err_msg += ".\n"; + goto done; + } + // Accumulate counts per type to partially remove from filters + if (mod_data.mod_type == job_modify_t::VTX_CANCEL) { + mod_data.type_to_count[(*m_graph)[u].type.c_str ()] += prev_count; + } + } else { // PARTIAL_CANCEL m_err_msg += __FUNCTION__; - m_err_msg += ": planner_rem_span returned -1.\n"; + m_err_msg += ": traverser tried to remove schedule and span"; + m_err_msg += " after vtx_cancel during partial cancel:\n"; m_err_msg += (*m_graph)[u].name + ".\n"; m_err_msg += strerror (errno); m_err_msg += ".\n"; + rc = -1; } done: return rc; } -int dfu_impl_t::rem_upv (vtx_t u, int64_t jobid) +int dfu_impl_t::mod_upv (vtx_t u, int64_t jobid, + const modify_data_t &mod_data) { // NYI: remove schedule data for upwalk return 0; } -int dfu_impl_t::rem_dfv (vtx_t u, int64_t jobid) +int dfu_impl_t::mod_dfv (vtx_t u, int64_t jobid, + modify_data_t &mod_data) { int rc = 0; bool stop = false; const std::string &dom = m_match->dom_subsystem (); f_out_edg_iterator_t ei, ei_end; - if ( (rc = rem_idata (u, jobid, dom, stop)) != 0 || stop) + if ( (rc = mod_idata (u, jobid, dom, mod_data, + stop)) != 0 || stop) goto done; - if ( (rc = rem_plan (u, jobid)) != 0) + if ( (rc = mod_plan (u, jobid, mod_data)) != 0) goto done; for (auto &subsystem : m_match->subsystems ()) { for (tie (ei, ei_end) = out_edges (u, *m_graph); ei != ei_end; ++ei) { @@ -480,16 +589,16 @@ int dfu_impl_t::rem_dfv (vtx_t u, int64_t jobid) continue; vtx_t tgt = target (*ei, *m_graph); if (subsystem == dom) - rc += rem_dfv (tgt, jobid); + rc += mod_dfv (tgt, jobid, mod_data); else - rc += rem_upv (tgt, jobid); + rc += mod_upv (tgt, jobid, mod_data); } } done: return rc; } -int dfu_impl_t::rem_exv (int64_t jobid) +int dfu_impl_t::mod_exv (int64_t jobid, const modify_data_t &mod_data) { int rc = -1; int64_t span = -1; @@ -531,7 +640,23 @@ int dfu_impl_t::rem_exv (int64_t jobid) return (!rc)? 0 : -1; } +int dfu_impl_t::cancel_vertex (vtx_t vtx, modify_data_t &mod_data, + int64_t jobid) +{ + int rc = -1; + bool stop = false; + const std::string &dom = m_match->dom_subsystem (); + + if ( (rc = mod_idata (vtx, jobid, dom, mod_data, stop)) == -1) { + errno = EINVAL; + return rc; + } + if ( (rc = mod_plan (vtx, jobid, mod_data)) == -1) + errno = EINVAL; + return rc; + +} //////////////////////////////////////////////////////////////////////////////// // DFU Traverser Implementation Update API @@ -644,8 +769,69 @@ int dfu_impl_t::remove (vtx_t root, int64_t jobid) { bool root_has_jtag = ((*m_graph)[root].idata.tags.find (jobid) != (*m_graph)[root].idata.tags.end ()); + modify_data_t mod_data; + mod_data.mod_type = job_modify_t::CANCEL; m_color.reset (); - return (root_has_jtag)? rem_dfv (root, jobid) : rem_exv (jobid); + return (root_has_jtag)? mod_dfv (root, jobid, mod_data) + : mod_exv (jobid, mod_data); +} + +int dfu_impl_t::remove (vtx_t root, const std::string &R_to_cancel, + std::shared_ptr &reader, + int64_t jobid, bool &full_cancel) +{ + int rc = -1; + modify_data_t mod_data; + resource_graph_t &g = m_graph_db->resource_graph; + resource_graph_metadata_t &m = m_graph_db->metadata; + + if (reader->partial_cancel (g, m, mod_data, R_to_cancel, + jobid) != 0) { + m_err_msg += __FUNCTION__; + m_err_msg += ": partial_cancel returned error.\n"; + return -1; + } + + // If type_to_count size is 0, reader was not JGF + if (mod_data.type_to_count.size () == 0) { + // Set modify type to be vertex cancel + mod_data.mod_type = job_modify_t::VTX_CANCEL; + for (const int64_t rank : mod_data.ranks_removed) { + auto rank_vector = m.by_rank.find (rank); + if (rank_vector == m.by_rank.end ()) { + m_err_msg += __FUNCTION__; + m_err_msg += ": rank not found in by_rank map.\n"; + return -1; + } + for (vtx_t vtx : rank_vector->second) { + // Cancel the vertex if it has job tag. Not necessary + // but reduces number of checks before function return + if ( (*m_graph)[vtx].idata.tags.find (jobid) + != (*m_graph)[vtx].idata.tags.end ()) { + if ( (rc = cancel_vertex (vtx, mod_data, jobid)) != 0) { + errno = EINVAL; + return rc; + } + } + } + } + } + + bool root_has_jtag = ((*m_graph)[root].idata.tags.find (jobid) + != (*m_graph)[root].idata.tags.end ()); + // Now partial cancel DFV from graph root + mod_data.mod_type = job_modify_t::PARTIAL_CANCEL; + m_color.reset (); + if (root_has_jtag) { + rc = mod_dfv (root, jobid, mod_data); + // Was the root vertex's job tag removed? If so, full_cancel + full_cancel = ((*m_graph)[root].idata.tags.find (jobid) + == (*m_graph)[root].idata.tags.end ()); + } else { + rc = mod_exv (jobid, mod_data); + } + + return rc; } int dfu_impl_t::mark (const std::string &root_path,