Skip to content

Commit

Permalink
traverser: add partial cancellation functionality
Browse files Browse the repository at this point in the history
Problem: Fluxion issue
flux-framework#1151 and flux-core
issue flux-framework/flux-core#4312
identified the need for partial release of resources. The current
functionality need is to release all resources managed by a single
broker rank. In the future support for releasing arbitrary subgraphs
will be needed for cloud and converged use cases.

Modify the rem_* traverser functions to take a modification type and
type_to_count unordered_map. Add logic in the recursive job
modification calls to distinguish between a full and partial job
cancellation and issue corresponding planner interface calls, handling
errors as needed.
  • Loading branch information
milroy committed Jun 16, 2024
1 parent 5ebe49f commit 3126501
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 34 deletions.
17 changes: 17 additions & 0 deletions resource/traversers/dfu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,23 @@ int dfu_traverser_t::remove (int64_t jobid)
return detail::dfu_impl_t::remove (root, jobid);
}

int dfu_traverser_t::remove (int64_t jobid,
const std::unordered_map<std::string,
int64_t> &type_to_count)
{
const subsystem_t &dom = get_match_cb ()->dom_subsystem ();
if (!get_graph () || !get_graph_db ()
|| get_graph_db ()->metadata.roots.find (dom)
== get_graph_db ()->metadata.roots.end ()
|| !get_match_cb ()) {
errno = EINVAL;
return -1;
}

vtx_t root = get_graph_db ()->metadata.roots.at (dom);
return detail::dfu_impl_t::remove (root, jobid, type_to_count);
}

int dfu_traverser_t::mark (const std::string &root_path,
resource_pool_t::status_t status)
{
Expand Down
13 changes: 13 additions & 0 deletions resource/traversers/dfu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,19 @@ class dfu_traverser_t : protected detail::dfu_impl_t
*/
int remove (int64_t jobid);

/*! Remove the allocation/reservation referred to by jobid and update
* the resource state.
*
* \param jobid job id.
* \param type_to_count map keyed by resource type and taking values
* corresponding to the number of resources to remove
* during a partial cancellation.
* \return 0 on success; -1 on error.
* EINVAL: graph, roots or match callback not set.
*/
int remove (int64_t jobid,
const std::unordered_map<std::string, int64_t> &type_to_count);

/*! Mark the resource status up|down|etc starting at subtree_root.
*
* \param root_path path to the root of the subtree to update.
Expand Down
31 changes: 25 additions & 6 deletions resource/traversers/dfu_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ namespace detail {

enum class visit_t { DFV, UPV };

enum class job_modify_t { CANCEL, PARTIAL_CANCEL };

enum class match_kind_t { RESOURCE_MATCH,
SLOT_MATCH,
NONE_MATCH,
Expand Down Expand Up @@ -294,6 +296,9 @@ class dfu_impl_t {
*/
int remove (vtx_t root, int64_t jobid);

int remove (vtx_t root, int64_t jobid,
const std::unordered_map<std::string, int64_t> &type_to_count);

/*! Update the resource status to up|down|etc starting at subtree_root.
*
* \param root_path path to the root of the subtree to update.
Expand Down Expand Up @@ -478,12 +483,26 @@ class dfu_impl_t {
bool emit_shadow);

int rem_txfilter (vtx_t u, int64_t jobid, bool &stop);
int rem_agfilter (vtx_t u, int64_t jobid, const std::string &s);
int rem_idata (vtx_t u, int64_t jobid, const std::string &s, bool &stop);
int rem_plan (vtx_t u, int64_t jobid);
int rem_upv (vtx_t u, int64_t jobid);
int rem_dfv (vtx_t u, int64_t jobid);
int rem_exv (int64_t jobid);
int job_mod_agfilter (vtx_t u, int64_t jobid, const std::string &s,
job_modify_t mod_type,
const std::unordered_map<std::string,
int64_t> &type_to_count);
int job_mod_idata (vtx_t u, int64_t jobid, const std::string &s,
bool &stop, job_modify_t mod_type,
const std::unordered_map<std::string,
int64_t> &type_to_count);
int job_mod_plan (vtx_t u, int64_t jobid, job_modify_t mod_type,
const std::unordered_map<std::string,
int64_t> &type_to_count);
int job_mod_upv (vtx_t u, int64_t jobid, job_modify_t mod_type,
const std::unordered_map<std::string,
int64_t> &type_to_count);
int job_mod_dfv (vtx_t u, int64_t jobid, job_modify_t mod_type,
const std::unordered_map<std::string,
int64_t> &type_to_count);
int job_mod_exv (int64_t jobid, job_modify_t mod_type,
const std::unordered_map<std::string,
int64_t> &type_to_count);


/************************************************************************
Expand Down
150 changes: 122 additions & 28 deletions resource/traversers/dfu_impl_update.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,11 +385,15 @@ int dfu_impl_t::rem_txfilter (vtx_t u, int64_t jobid, bool &stop)
return rc;
}

int dfu_impl_t::rem_agfilter (vtx_t u, int64_t jobid,
const std::string &subsystem)
int dfu_impl_t::job_mod_agfilter (vtx_t u, int64_t jobid,
const std::string &subsystem,
job_modify_t mod_type,
const std::unordered_map<std::string,
int64_t> &type_to_count)
{
int rc = 0;
int span = -1;
bool removed = false;
planner_multi_t *subtree_plan = NULL;
auto &job2span = (*m_graph)[u].idata.job2span;

Expand All @@ -401,97 +405,173 @@ int dfu_impl_t::rem_agfilter (vtx_t u, int64_t jobid,
rc = -1;
goto done;
}
if ((rc = planner_multi_rem_span (subtree_plan, span)) != 0) {
m_err_msg += __FUNCTION__;
m_err_msg += ": planner_multi_rem_span returned -1.\n";
m_err_msg += (*m_graph)[u].name + ".\n";
m_err_msg += strerror (errno);
m_err_msg += "\n";
if (mod_type == job_modify_t::CANCEL) {
removed = true;
if ( (rc = planner_multi_rem_span (subtree_plan, span)) != 0) {
m_err_msg += __FUNCTION__;
m_err_msg += ": planner_multi_rem_span returned -1.\n";
m_err_msg += (*m_graph)[u].name + ".\n";
m_err_msg += strerror (errno);
m_err_msg += "\n";
}
} else { // PARTIAL_CANCEL
if (type_to_count.size () > 0) {
std::vector<uint64_t> reduced_counts;
std::vector<const char *> reduced_types;
for (auto it : type_to_count) {
reduced_types.push_back (it.first.c_str ());
reduced_counts.push_back (it.second);
}
if ( (rc = planner_multi_reduce_span (subtree_plan, span,
&(reduced_counts[0]),
&(reduced_types[0]),
type_to_count.size (),
removed)) != 0) {
m_err_msg += __FUNCTION__;
m_err_msg += ": planner_multi_reduce_span returned -1.\n";
m_err_msg += (*m_graph)[u].name + ".\n";
m_err_msg += strerror (errno);
m_err_msg += "\n";
}
} else {
rc = -1;
m_err_msg += __FUNCTION__;
m_err_msg += ": type_to_count empty.\n";
m_err_msg += (*m_graph)[u].name + ".\n";
m_err_msg += strerror (errno);
m_err_msg += "\n";
}
}
job2span.erase (jobid);
if (removed)
job2span.erase (jobid);

done:
return rc;
}

int dfu_impl_t::rem_idata (vtx_t u, int64_t jobid,
const std::string &subsystem, bool &stop)
int dfu_impl_t::job_mod_idata (vtx_t u, int64_t jobid,
const std::string &subsystem, bool &stop,
job_modify_t mod_type,
const std::unordered_map<std::string,
int64_t> &type_to_count)
{
int rc = -1;

if ( (rc = rem_txfilter (u, jobid, stop)) != 0 || stop)
goto done;
if ( (rc = rem_agfilter (u, jobid, subsystem)) != 0)
if ( (rc = job_mod_agfilter (u, jobid, subsystem,
mod_type, type_to_count)) != 0)
goto done;

done:
return rc;
}

int dfu_impl_t::rem_plan (vtx_t u, int64_t jobid)
int dfu_impl_t::job_mod_plan (vtx_t u, int64_t jobid,
job_modify_t mod_type,
const std::unordered_map<std::string,
int64_t> &type_to_count)
{
int rc = 0;
int64_t span = -1;
int64_t to_remove = 0;
bool removed = false;
planner_t *plans = NULL;

if ((*m_graph)[u].schedule.allocations.find (jobid)
!= (*m_graph)[u].schedule.allocations.end ()) {
span = (*m_graph)[u].schedule.allocations[jobid];
(*m_graph)[u].schedule.allocations.erase (jobid);
if (mod_type == job_modify_t::CANCEL)
(*m_graph)[u].schedule.allocations.erase (jobid);
} else if ((*m_graph)[u].schedule.reservations.find (jobid)
!= (*m_graph)[u].schedule.reservations.end ()) {
span = (*m_graph)[u].schedule.reservations[jobid];
// Can't be PARTIAL_CANCEL
(*m_graph)[u].schedule.reservations.erase (jobid);
} else {
goto done;
}

plans = (*m_graph)[u].schedule.plans;
if ( (rc = planner_rem_span (plans, span)) == -1) {
m_err_msg += __FUNCTION__;
m_err_msg += ": planner_rem_span returned -1.\n";
m_err_msg += (*m_graph)[u].name + ".\n";
m_err_msg += strerror (errno);
m_err_msg += ".\n";
if (mod_type == job_modify_t::CANCEL) {
if ( (rc = planner_rem_span (plans, span)) == -1) {
m_err_msg += __FUNCTION__;
m_err_msg += ": planner_rem_span returned -1.\n";
m_err_msg += (*m_graph)[u].name + ".\n";
m_err_msg += strerror (errno);
m_err_msg += ".\n";
}
} else { // PARTIAL_CANCEL
auto res_to_remove = type_to_count.find ((*m_graph)[u].type);
if (res_to_remove != type_to_count.end ()) {
if ( (rc = planner_reduce_span (plans,
span,
res_to_remove->second,
removed)) == -1) {
m_err_msg += __FUNCTION__;
m_err_msg += ": planner_reduce_span returned -1.\n";
m_err_msg += (*m_graph)[u].name + ".\n";
m_err_msg += strerror (errno);
m_err_msg += ".\n";
}
if (removed) // reduce_span removed the entire allocation
(*m_graph)[u].schedule.allocations.erase (jobid);
} else {
rc = -1;
m_err_msg += __FUNCTION__;
m_err_msg += ": resource not found in type_to_count.\n";
m_err_msg += (*m_graph)[u].name + ".\n";
m_err_msg += strerror (errno);
m_err_msg += ".\n";
}
}

done:
return rc;
}

int dfu_impl_t::rem_upv (vtx_t u, int64_t jobid)
int dfu_impl_t::job_mod_upv (vtx_t u, int64_t jobid,
job_modify_t mod_type,
const std::unordered_map<std::string,
int64_t> &type_to_count)
{
// NYI: remove schedule data for upwalk
return 0;
}

int dfu_impl_t::rem_dfv (vtx_t u, int64_t jobid)
int dfu_impl_t::job_mod_dfv (vtx_t u, int64_t jobid,
job_modify_t mod_type,
const std::unordered_map<std::string,
int64_t> &type_to_count)
{
int rc = 0;
bool stop = false;
const std::string &dom = m_match->dom_subsystem ();
f_out_edg_iterator_t ei, ei_end;

if ( (rc = rem_idata (u, jobid, dom, stop)) != 0 || stop)
if ( (rc = job_mod_idata (u, jobid, dom, stop,
mod_type, type_to_count)) != 0 || stop)
goto done;
if ( (rc = rem_plan (u, jobid)) != 0)
if ( (rc = job_mod_plan (u, jobid, mod_type, type_to_count)) != 0)
goto done;
for (auto &subsystem : m_match->subsystems ()) {
for (tie (ei, ei_end) = out_edges (u, *m_graph); ei != ei_end; ++ei) {
if (!in_subsystem (*ei, subsystem) || stop_explore (*ei, subsystem))
continue;
vtx_t tgt = target (*ei, *m_graph);
if (subsystem == dom)
rc += rem_dfv (tgt, jobid);
rc += job_mod_dfv (tgt, jobid, mod_type, type_to_count);
else
rc += rem_upv (tgt, jobid);
rc += job_mod_upv (tgt, jobid, mod_type, type_to_count);
}
}
done:
return rc;
}

int dfu_impl_t::rem_exv (int64_t jobid)
int dfu_impl_t::job_mod_exv (int64_t jobid, job_modify_t mod_type,
const std::unordered_map<std::string,
int64_t> &type_to_count)
{
int rc = -1;
int64_t span = -1;
Expand Down Expand Up @@ -646,8 +726,22 @@ int dfu_impl_t::remove (vtx_t root, int64_t jobid)
{
bool root_has_jtag = ((*m_graph)[root].idata.tags.find (jobid)
!= (*m_graph)[root].idata.tags.end ());
job_modify_t mod_type = job_modify_t::CANCEL;
const std::unordered_map<std::string, int64_t> type_to_count;
m_color.reset ();
return (root_has_jtag)? job_mod_dfv (root, jobid, mod_type, type_to_count)
: job_mod_exv (jobid, mod_type, type_to_count);
}

int dfu_impl_t::remove (vtx_t root, int64_t jobid,
const std::unordered_map<std::string, int64_t> &type_to_count)
{
bool root_has_jtag = ((*m_graph)[root].idata.tags.find (jobid)
!= (*m_graph)[root].idata.tags.end ());
job_modify_t mod_type = job_modify_t::PARTIAL_CANCEL;
m_color.reset ();
return (root_has_jtag)? rem_dfv (root, jobid) : rem_exv (jobid);
return (root_has_jtag)? job_mod_dfv (root, jobid, mod_type, type_to_count)
: job_mod_exv (jobid, mod_type, type_to_count);
}

int dfu_impl_t::mark (const std::string &root_path,
Expand Down

0 comments on commit 3126501

Please sign in to comment.