Skip to content

Commit

Permalink
resource: add support for post partial cancel processing
Browse files Browse the repository at this point in the history
Problem: a partial cancel successfully removes the allocations of the
other resource vertices (especially core, which is installed in all
pruning filters by default) because they have broker ranks. However,
when the final .free RPC fails to remove an ssd vertex allocation the
full cleanup cancel exits with an error when it hits the vertices it
has already cancelled.

Add support for treating the condition as normal behavior if it is
encountered post partial cancel.
  • Loading branch information
milroy committed Oct 17, 2024
1 parent 7c9b277 commit 40881b3
Show file tree
Hide file tree
Showing 12 changed files with 78 additions and 35 deletions.
15 changes: 12 additions & 3 deletions resource/modules/resource_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1908,6 +1908,11 @@ static int run_remove (std::shared_ptr<resource_ctx_t> &ctx,
{
int rc = -1;
dfu_traverser_t &tr = *(ctx->traverser);
std::shared_ptr<job_info_t> info = nullptr;
bool pcanceled = false;

if (is_existent_jobid (ctx, jobid))
info = ctx->jobs[jobid];

if (part_cancel) {
// RV1exec only reader supported in production currently
Expand All @@ -1921,21 +1926,25 @@ static int run_remove (std::shared_ptr<resource_ctx_t> &ctx,
static_cast<intmax_t> (jobid));
goto out;
}
if (info)
info->set_pcanceled ();
rc = tr.remove (R, reader, jobid, full_removal);
} else {
rc = tr.remove (jobid);
if (info)
pcanceled = info->get_pcancel_state ();

rc = tr.remove (jobid, pcanceled);
full_removal = true;
}
if (rc != 0) {
if (is_existent_jobid (ctx, jobid)) {
if (info) {
// When this condition arises, we will be less likely
// to be able to reuse this jobid. Having the errored job
// in the jobs map will prevent us from reusing the jobid
// up front. Note that a same jobid can be reserved and
// removed multiple times by the upper queuing layer
// as part of providing advanced queueing policies
// (e.g., conservative backfill).
std::shared_ptr<job_info_t> info = ctx->jobs[jobid];
info->state = job_lifecycle_t::ERROR;
}
flux_log (ctx->h,
Expand Down
4 changes: 3 additions & 1 deletion resource/planner/c/planner_multi.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,15 @@ int64_t planner_multi_add_span (planner_multi_t *ctx,
* \param ctx opaque multi-planner context returned
* from planner_multi_new.
* \param span_id span_id returned from planner_multi_add_span.
* \param post_pcancel bool indicating whether preceded by one or more partial
* cancels
* \return 0 on success; -1 on error with errno set as follows:
* EINVAL: invalid argument.
* EKEYREJECTED: span could not be removed from
* the planner's internal data structures.
* ERANGE: a resource state became out of a valid range.
*/
int planner_multi_rem_span (planner_multi_t *ctx, int64_t span_id);
int planner_multi_rem_span (planner_multi_t *ctx, int64_t span_id, bool post_pcancel);

/*! Reduce the existing span's resources from the planner.
* This function will be called for a partial release/cancel.
Expand Down
10 changes: 7 additions & 3 deletions resource/planner/c/planner_multi_c_interface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ extern "C" int64_t planner_multi_add_span (planner_multi_t *ctx,
return mspan;
}

extern "C" int planner_multi_rem_span (planner_multi_t *ctx, int64_t span_id)
extern "C" int planner_multi_rem_span (planner_multi_t *ctx, int64_t span_id, bool post_pcancel)
{
size_t i;
int rc = -1;
Expand All @@ -423,8 +423,12 @@ extern "C" int planner_multi_rem_span (planner_multi_t *ctx, int64_t span_id)
goto done;
}
for (i = 0; i < it->second.size (); ++i) {
if (planner_rem_span (ctx->plan_multi->get_planner_at (i), it->second[i]) == -1)
goto done;
if (planner_rem_span (ctx->plan_multi->get_planner_at (i), it->second[i]) == -1) {
// If executed after partial cancel, depending on pruning filter settings
// some spans may no longer exist. In that case there is no error.
if (!post_pcancel)
goto done;
}
}
ctx->plan_multi->get_span_lookup ().erase (it);
rc = 0;
Expand Down
4 changes: 2 additions & 2 deletions resource/planner/test/planner_test02.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ static int test_multi_add_remove ()
span3 = planner_multi_add_span (ctx, 2000, 1000, request3, len);
ok ((span3 != -1), "span added for (%s)", ss.str ().c_str ());

rc = planner_multi_rem_span (ctx, span2);
rc = planner_multi_rem_span (ctx, span2, false);
ok (!rc, "multi_rem_span works");

size = planner_multi_span_size (ctx);
Expand Down Expand Up @@ -582,7 +582,7 @@ static int test_constructors_and_overload ()
bo = (bo || !(planner_multis_equal (ctx, ctx2)));
ok (!bo, "test copy constructor doesn't mutate planner");
// Compare planners after mutation
rc = planner_multi_rem_span (ctx2, span);
rc = planner_multi_rem_span (ctx2, span, false);
size = planner_multi_span_size (ctx2);
ok ((size == 2), "planner_multi_span_size works after copy");

Expand Down
2 changes: 1 addition & 1 deletion resource/readers/resource_reader_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
namespace Flux {
namespace resource_model {

enum class job_modify_t { CANCEL, PARTIAL_CANCEL, VTX_CANCEL };
enum class job_modify_t { CANCEL, POST_PCANCEL, PARTIAL_CANCEL, VTX_CANCEL };

struct modify_data_t {
job_modify_t mod_type = job_modify_t::PARTIAL_CANCEL;
Expand Down
2 changes: 1 addition & 1 deletion resource/readers/resource_reader_jgf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ int resource_reader_jgf_t::cancel_vtx (vtx_t vtx,
if (agg_span != job2span.end ()) {
if ((subtree_plan = g[vtx].idata.subplans[containment_sub]) == NULL)
goto ret;
if (planner_multi_rem_span (subtree_plan, agg_span->second) != 0)
if (planner_multi_rem_span (subtree_plan, agg_span->second, false) != 0)
goto ret;
// Delete from job2span tracker
job2span.erase (update_data.jobid);
Expand Down
27 changes: 17 additions & 10 deletions resource/reapi/bindings/c++/reapi_cli_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -668,18 +668,23 @@ void resource_query_t::set_job (const uint64_t jobid, const std::shared_ptr<job_
int resource_query_t::remove_job (const uint64_t jobid)
{
int rc = -1;
std::shared_ptr<job_info_t> info = nullptr;
bool pcanceled = false;

if (jobid > (uint64_t)std::numeric_limits<int64_t>::max ()) {
errno = EOVERFLOW;
return rc;
}

rc = traverser->remove (static_cast<int64_t> (jobid));
if (jobs.find (jobid) != jobs.end ()) {
info = jobs[jobid];
pcanceled = info->get_pcancel_state ();
}

rc = traverser->remove (static_cast<int64_t> (jobid), pcanceled);
if (rc == 0) {
if (jobs.find (jobid) != jobs.end ()) {
std::shared_ptr<job_info_t> info = jobs[jobid];
if (info)
info->state = job_lifecycle_t::CANCELED;
}
} else {
m_err_msg += traverser->err_message ();
traverser->clear_err_message ();
Expand All @@ -691,6 +696,7 @@ int resource_query_t::remove_job (const uint64_t jobid, const std::string &R, bo
{
int rc = -1;
std::shared_ptr<resource_reader_base_t> reader;
std::shared_ptr<job_info_t> info = nullptr;

if (jobid > (uint64_t)std::numeric_limits<int64_t>::max ()) {
errno = EOVERFLOW;
Expand All @@ -706,14 +712,15 @@ int resource_query_t::remove_job (const uint64_t jobid, const std::string &R, bo
return rc;
}

if (jobs.find (jobid) != jobs.end ()) {
info = jobs[jobid];
info->set_pcanceled ();
}

rc = traverser->remove (R, reader, static_cast<int64_t> (jobid), full_removal);
if (rc == 0) {
if (full_removal) {
auto job_info_it = jobs.find (jobid);
if (job_info_it != jobs.end ()) {
job_info_it->second->state = job_lifecycle_t::CANCELED;
}
}
if (full_removal && info)
info->state = job_lifecycle_t::CANCELED;
} else {
m_err_msg += traverser->err_message ();
traverser->clear_err_message ();
Expand Down
4 changes: 2 additions & 2 deletions resource/traversers/dfu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ int dfu_traverser_t::find (std::shared_ptr<match_writers_t> &writers, const std:
return detail::dfu_impl_t::find (writers, criteria);
}

int dfu_traverser_t::remove (int64_t jobid)
int dfu_traverser_t::remove (int64_t jobid, bool pcanceled)
{
subsystem_t dom = get_match_cb ()->dom_subsystem ();
if (!get_graph () || !get_graph_db ()
Expand All @@ -449,7 +449,7 @@ int dfu_traverser_t::remove (int64_t jobid)
}

vtx_t root = get_graph_db ()->metadata.roots.at (dom);
return detail::dfu_impl_t::remove (root, jobid);
return detail::dfu_impl_t::remove (root, jobid, pcanceled);
}

int dfu_traverser_t::remove (const std::string &R_to_cancel,
Expand Down
4 changes: 3 additions & 1 deletion resource/traversers/dfu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,12 @@ class dfu_traverser_t : protected detail::dfu_impl_t {
* the resource state.
*
* \param jobid job id.
* \param pcanceled bool indicating whether preceded by one or more partial
* cancels
* \return 0 on success; -1 on error.
* EINVAL: graph, roots or match callback not set.
*/
int remove (int64_t jobid);
int remove (int64_t jobid, bool pcanceled);

/*! Remove the allocation/reservation referred to by jobid and update
* the resource state.
Expand Down
4 changes: 3 additions & 1 deletion resource/traversers/dfu_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,9 +284,11 @@ class dfu_impl_t {
*
* \param root root resource vertex.
* \param jobid job id.
* \param pcanceled bool indicating whether preceded by one or more partial
* cancels
* \return 0 on success; -1 on error.
*/
int remove (vtx_t root, int64_t jobid);
int remove (vtx_t root, int64_t jobid, bool pcanceled);

/*! Remove the allocation/reservation referred to by jobid and update
* the resource state.
Expand Down
15 changes: 12 additions & 3 deletions resource/traversers/dfu_impl_update.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,11 @@ int dfu_impl_t::mod_agfilter (vtx_t u,
goto done;
}
if (mod_data.mod_type != job_modify_t::PARTIAL_CANCEL) {
if ((rc = planner_multi_rem_span (subtree_plan, span_it->second)) != 0) {
bool post_pcancel = false;
if (mod_data.mod_type == job_modify_t::POST_PCANCEL)
post_pcancel = true;

if ((rc = planner_multi_rem_span (subtree_plan, span_it->second, post_pcancel)) != 0) {
m_err_msg += __FUNCTION__;
m_err_msg += ": planner_multi_rem_span returned -1.\n";
m_err_msg += (*m_graph)[u].name + ".\n";
Expand Down Expand Up @@ -769,12 +773,17 @@ int dfu_impl_t::update (vtx_t root,
return (rc > 0) ? 0 : -1;
}

int dfu_impl_t::remove (vtx_t root, int64_t jobid)
int dfu_impl_t::remove (vtx_t root, int64_t jobid, bool pcanceled)
{
bool root_has_jtag =
((*m_graph)[root].idata.tags.find (jobid) != (*m_graph)[root].idata.tags.end ());
modify_data_t mod_data;
mod_data.mod_type = job_modify_t::CANCEL;

if (pcanceled)
mod_data.mod_type = job_modify_t::POST_PCANCEL;
else
mod_data.mod_type = job_modify_t::CANCEL;

m_color.reset ();
return (root_has_jtag) ? mod_dfv (root, jobid, mod_data) : mod_exv (jobid, mod_data);
}
Expand Down
22 changes: 15 additions & 7 deletions resource/utilities/command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,16 @@ command_t commands[] =
static int do_remove (std::shared_ptr<resource_context_t> &ctx, int64_t jobid)
{
int rc = -1;
if ((rc = ctx->traverser->remove ((int64_t)jobid)) == 0) {
if (ctx->jobs.find (jobid) != ctx->jobs.end ()) {
std::shared_ptr<job_info_t> info = ctx->jobs[jobid];
bool pcanceled = false;
std::shared_ptr<job_info_t> info = nullptr;

if (ctx->jobs.find (jobid) != ctx->jobs.end ()) {
info = ctx->jobs[jobid];
pcanceled = info->get_pcancel_state ();
}
if ((rc = ctx->traverser->remove ((int64_t)jobid, pcanceled)) == 0) {
if (info)
info->state = job_lifecycle_t::CANCELED;
}
} else {
std::cout << ctx->traverser->err_message ();
ctx->traverser->clear_err_message ();
Expand All @@ -127,12 +132,15 @@ static int do_partial_remove (std::shared_ptr<resource_context_t> &ctx,
bool &full_cancel)
{
int rc = -1;
std::shared_ptr<job_info_t> info = nullptr;

if (ctx->jobs.find (jobid) != ctx->jobs.end ()) {
info = ctx->jobs[jobid];
info->set_pcanceled ();
}
if ((rc = ctx->traverser->remove (R_cancel, reader, (int64_t)jobid, full_cancel)) == 0) {
if (full_cancel && (ctx->jobs.find (jobid) != ctx->jobs.end ())) {
std::shared_ptr<job_info_t> info = ctx->jobs[jobid];
if (full_cancel && info)
info->state = job_lifecycle_t::CANCELED;
}
} else {
std::cout << ctx->traverser->err_message ();
ctx->traverser->clear_err_message ();
Expand Down

0 comments on commit 40881b3

Please sign in to comment.