Skip to content

Commit

Permalink
readers: add partial cancel support for JGF
Browse files Browse the repository at this point in the history
  • Loading branch information
milroy committed Jun 24, 2024
1 parent 39072aa commit a34e25e
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 37 deletions.
186 changes: 160 additions & 26 deletions resource/readers/resource_reader_jgf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -725,8 +725,7 @@ int resource_reader_jgf_t::find_vtx (resource_graph_t &g,

int resource_reader_jgf_t::update_vtx_plan (vtx_t v, resource_graph_t &g,
const fetch_helper_t &fetcher,
uint64_t jobid, int64_t at,
uint64_t dur, bool rsv)
jgf_updater_data &update_data)
{
int rc = -1;
int64_t span = -1;
Expand All @@ -739,7 +738,9 @@ int resource_reader_jgf_t::update_vtx_plan (vtx_t v, resource_graph_t &g,
m_err_msg += ": plan for " + g[v].name + " is null.\n";
goto done;
}
if ( (avail = planner_avail_resources_during (plans, at, dur)) == -1) {
if ( (avail = planner_avail_resources_during (plans,
update_data.at,
update_data.duration)) == -1) {
m_err_msg += __FUNCTION__;
m_err_msg += ": planner_avail_resource_during return -1 for ";
m_err_msg += g[v].name + ".\n";
Expand All @@ -749,16 +750,17 @@ int resource_reader_jgf_t::update_vtx_plan (vtx_t v, resource_graph_t &g,
if (fetcher.exclusive) {
// Update the vertex plan here (not in traverser code) so vertices
// that the traverser won't walk still get their plans updated.
if ( (span = planner_add_span (plans, at, dur,
static_cast<const uint64_t> (g[v].size))) == -1) {
if ( (span = planner_add_span (plans, update_data.at,
update_data.duration,
static_cast<const uint64_t> (g[v].size))) == -1) {
m_err_msg += __FUNCTION__;
m_err_msg += ": can't add span into " + g[v].name + ".\n";
goto done;
}
if (rsv)
g[v].schedule.reservations[jobid] = span;
if (update_data.reserved)
g[v].schedule.reservations[update_data.jobid] = span;
else
g[v].schedule.allocations[jobid] = span;
g[v].schedule.allocations[update_data.jobid] = span;
} else {
if (avail < g[v].size) {
// if g[v] has already been allocated/reserved, this is an error
Expand All @@ -773,12 +775,96 @@ int resource_reader_jgf_t::update_vtx_plan (vtx_t v, resource_graph_t &g,
return rc;
}

/* Cancel jobid's allocation on a single vertex: remove its spans from the
 * aggregate (subtree) filter, the exclusive checker, and the vertex plan,
 * then record the freed resource counts and detach the vertex from the
 * by_rank map.
 *
 * \param vtx          vertex whose allocation is being cancelled
 * \param g            resource graph
 * \param m            graph metadata (by_rank map is updated)
 * \param fetcher      fetch helper (currently unused on the cancel path)
 * \param update_data  carries jobid in; freed type counts and touched
 *                     ranks are accumulated out
 * \return             0 on success; -1 on error (errno set to EINVAL for
 *                     missing-span/planner failures, m_err_msg appended
 *                     for by_rank inconsistencies)
 */
int resource_reader_jgf_t::cancel_vtx (vtx_t vtx, resource_graph_t &g,
                                       resource_graph_metadata_t &m,
                                       const fetch_helper_t &fetcher,
                                       jgf_updater_data &update_data)
{
    auto &job2span = g[vtx].idata.job2span;
    auto &x_spans = g[vtx].idata.x_spans;

    // Remove from aggregate filter if present.  Use find () rather than
    // operator[] so a missing "containment" subplan is not default-inserted.
    auto agg_span = job2span.find (update_data.jobid);
    if (agg_span != job2span.end ()) {
        auto subplan_it = g[vtx].idata.subplans.find ("containment");
        if (subplan_it == g[vtx].idata.subplans.end ()
            || subplan_it->second == NULL)
            return -1;
        if (planner_multi_rem_span (subplan_it->second, agg_span->second) != 0)
            return -1;
        // Delete from job2span tracker
        job2span.erase (agg_span);
    }

    // Remove from exclusive filter
    auto xspan_it = x_spans.find (update_data.jobid);
    if (xspan_it == x_spans.end ()) {
        errno = EINVAL;
        return -1;
    }
    // Copy the span out first: erasing the map entry below invalidates
    // xspan_it, so it must not be dereferenced afterwards.
    int64_t x_span = xspan_it->second;
    planner_t *x_checker = g[vtx].idata.x_checker;
    g[vtx].idata.tags.erase (update_data.jobid);
    x_spans.erase (xspan_it);
    if (planner_rem_span (x_checker, x_span) == -1) {
        errno = EINVAL;
        return -1;
    }

    // Remove the allocation span from the vertex plan
    auto span_it = g[vtx].schedule.allocations.find (update_data.jobid);
    if (span_it == g[vtx].schedule.allocations.end ()) {
        errno = EINVAL;
        return -1;
    }
    // Same invalidation hazard as above: save the span before erasing.
    int64_t alloc_span = span_it->second;
    g[vtx].schedule.allocations.erase (span_it);
    planner_t *plans = g[vtx].schedule.plans;
    int64_t prev_avail = planner_avail_resources_at (plans, 0);
    if (planner_rem_span (plans, alloc_span) == -1) {
        errno = EINVAL;
        return -1;
    }
    // Add the newly freed counts; can't assume the removal freed everything.
    update_data.type_to_count[g[vtx].type] +=
        (planner_avail_resources_at (plans, 0) - prev_avail);

    // Detach this vertex from its rank's vertex list so the caller can
    // decide whether the whole rank should be dropped from by_rank.
    auto rank_vect = m.by_rank.find (g[vtx].rank);
    if (rank_vect == m.by_rank.end ()) {
        m_err_msg += __FUNCTION__;
        m_err_msg += ": attempted to cancel rank that does not exist ";
        m_err_msg += "in by_rank map\n";
        return -1;
    }
    auto rank_position = std::find (rank_vect->second.begin (),
                                    rank_vect->second.end (), vtx);
    if (rank_position == rank_vect->second.end ()) {
        m_err_msg += __FUNCTION__;
        m_err_msg += ": attempted to remove vertex which does not exist ";
        m_err_msg += "in by_rank map\n";
        return -1;
    }
    rank_vect->second.erase (rank_position);
    update_data.ranks.insert (g[vtx].rank);

    return 0;
}

int resource_reader_jgf_t::update_vtx (resource_graph_t &g,
resource_graph_metadata_t &m,
std::map<std::string, vmap_val_t> &vmap,
const fetch_helper_t &fetcher,
uint64_t jobid, int64_t at,
uint64_t dur, bool rsv)
jgf_updater_data &update_data)
{
int rc = -1;
std::map<std::string, bool> root_checks;
Expand All @@ -791,16 +877,21 @@ int resource_reader_jgf_t::update_vtx (resource_graph_t &g,
goto done;
if ( (rc = update_vmap (vmap, v, root_checks, fetcher)) != 0)
goto done;
if ( (rc = update_vtx_plan (v, g, fetcher, jobid, at, dur, rsv)) != 0)
goto done;
if (update_data.update) {
if ( (rc = update_vtx_plan (v, g, fetcher, update_data)) != 0)
goto done;
} else {
if ( (rc = cancel_vtx (v, g, m, fetcher, update_data)) != 0)
goto done;
}

done:
return rc;
}

int resource_reader_jgf_t::undo_vertices (resource_graph_t &g,
std::map<std::string, vmap_val_t> &vmap,
uint64_t jobid, bool rsv)
jgf_updater_data &update_data)
{
int rc = 0;
int rc2 = 0;
Expand All @@ -813,12 +904,12 @@ int resource_reader_jgf_t::undo_vertices (resource_graph_t &g,
continue;
try {
v = kv.second.v;
if (rsv) {
span = g[v].schedule.reservations.at (jobid);
g[v].schedule.reservations.erase (jobid);
if (update_data.reserved) {
span = g[v].schedule.reservations.at (update_data.jobid);
g[v].schedule.reservations.erase (update_data.jobid);
} else {
span = g[v].schedule.allocations.at (jobid);
g[v].schedule.allocations.erase (jobid);
span = g[v].schedule.allocations.at (update_data.jobid);
g[v].schedule.allocations.erase (update_data.jobid);
}

plans = g[v].schedule.plans;
Expand Down Expand Up @@ -886,9 +977,8 @@ int resource_reader_jgf_t::update_vertices (resource_graph_t &g,
resource_graph_metadata_t &m,
std::map<std::string,
vmap_val_t> &vmap,
json_t *nodes, int64_t jobid,
int64_t at, uint64_t dur,
bool rsv)
json_t *nodes,
jgf_updater_data &update_data)
{
int rc = -1;
unsigned int i = 0;
Expand All @@ -898,7 +988,7 @@ int resource_reader_jgf_t::update_vertices (resource_graph_t &g,
fetcher.scrub ();
if ( (rc = unpack_vtx (json_array_get (nodes, i), fetcher)) != 0)
goto done;
if ( (rc = update_vtx (g, m, vmap, fetcher, jobid, at, dur, rsv)) != 0)
if ( (rc = update_vtx (g, m, vmap, fetcher, update_data)) != 0)
goto done;
}
rc = 0;
Expand Down Expand Up @@ -1236,6 +1326,7 @@ int resource_reader_jgf_t::update (resource_graph_t &g,
json_t *nodes = NULL;
json_t *edges = NULL;
std::map<std::string, vmap_val_t> vmap;
jgf_updater_data update_data;

if (at < 0 || dur == 0) {
errno = EINVAL;
Expand All @@ -1245,10 +1336,18 @@ int resource_reader_jgf_t::update (resource_graph_t &g,
+ std::to_string (dur) + ").\n";
goto done;
}

// Fill in updater data
update_data.jobid = jobid;
update_data.at = at;
update_data.duration = dur;
update_data.reserved = rsv;
update_data.update = true;

if ( (rc = fetch_jgf (str, &jgf, &nodes, &edges)) != 0)
goto done;
if ( (rc = update_vertices (g, m, vmap, nodes, jobid, at, dur, rsv)) != 0) {
undo_vertices (g, vmap, jobid, rsv);
if ( (rc = update_vertices (g, m, vmap, nodes, update_data)) != 0) {
undo_vertices (g, vmap, update_data);
goto done;
}
if ( (rc = update_edges (g, m, vmap, edges, token)) != 0)
Expand Down Expand Up @@ -1301,8 +1400,43 @@ int resource_reader_jgf_t::partial_cancel (resource_graph_t &g,
std::unordered_map<std::string, int64_t> &type_to_count,
const std::string &R, int64_t jobid)
{
errno = ENOTSUP; // JGF reader does not support partial cancel
return -1;
int rc = -1;
json_t *jgf = NULL;
json_t *nodes = NULL;
json_t *edges = NULL;
std::map<std::string, vmap_val_t> vmap;
jgf_updater_data p_cancel_data;

if (jobid <= 0) {
errno = EINVAL;
m_err_msg += __FUNCTION__;
m_err_msg += ": invalid jobid\n";
goto done;
}

// Fill in updater data
p_cancel_data.jobid = jobid;
p_cancel_data.update = false;

if ( (rc = fetch_jgf (R, &jgf, &nodes, &edges)) != 0)
goto done;
if ( (rc = update_vertices (g, m, vmap, nodes, p_cancel_data)) != 0)
goto done;

// Remove from by_rank map if all vertices in the rank were
// cancelled.
for (auto rank : p_cancel_data.ranks) {
auto existing_rank = m.by_rank.find (rank);
// We know it's in by_rank or else update_vertices would have failed
if (existing_rank->second.size () == 0)
m.by_rank.erase (existing_rank);
}

type_to_count = p_cancel_data.type_to_count;

done:
json_decref (jgf);
return rc;
}

bool resource_reader_jgf_t::is_allowlist_supported ()
Expand Down
34 changes: 23 additions & 11 deletions resource/readers/resource_reader_jgf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ struct vmap_val_t;
namespace Flux {
namespace resource_model {

// Struct to track data for updates.  One instance is filled in by the
// caller (update () sets update = true; partial_cancel () sets
// update = false) and threaded through update_vertices () /
// update_vtx () down to update_vtx_plan () or cancel_vtx ().
struct jgf_updater_data {
    int64_t jobid = 0;       // job whose spans are added or removed
    int64_t at = 0;          // time passed to planner span calls (update path)
    uint64_t duration = 0;   // duration passed to planner span calls (update path)
    bool reserved = false;   // true: record span as reservation; false: allocation
    // track counts of resources to be cancelled
    std::unordered_map<std::string, int64_t> type_to_count;
    // track count of rank vertices to determine if rank
    // should be removed from by_rank map
    std::unordered_set<int64_t> ranks;
    bool update = true; // Updating or partial cancel
};

/*! JGF resource reader class.
*/
Expand Down Expand Up @@ -138,27 +151,26 @@ class resource_reader_jgf_t : public resource_reader_base_t {
std::map<std::string, vmap_val_t> &vmap,
const fetch_helper_t &fetcher, vtx_t &ret_v);
int update_vtx_plan (vtx_t v, resource_graph_t &g,
const fetch_helper_t &fetcher, uint64_t jobid,
int64_t at, uint64_t dur, bool rsv);
const fetch_helper_t &fetcher,
jgf_updater_data &update_data);
int cancel_vtx (vtx_t v, resource_graph_t &g,
resource_graph_metadata_t &m,
const fetch_helper_t &fetcher,
jgf_updater_data &update_data);
int update_vtx (resource_graph_t &g, resource_graph_metadata_t &m,
std::map<std::string, vmap_val_t> &vmap,
const fetch_helper_t &fetcher, uint64_t jobid, int64_t at,
uint64_t dur, bool rsv);
const fetch_helper_t &fetcher,
jgf_updater_data &updater_data);
int unpack_vertices (resource_graph_t &g, resource_graph_metadata_t &m,
std::map<std::string, vmap_val_t> &vmap,
json_t *nodes,
std::unordered_set<std::string> &added_vtcs);
int undo_vertices (resource_graph_t &g,
std::map<std::string, vmap_val_t> &vmap,
uint64_t jobid, bool rsv);
int update_vertices (resource_graph_t &g, resource_graph_metadata_t &m,
std::map<std::string, vmap_val_t> &vmap,
json_t *nodes, int64_t jobid, int64_t at,
uint64_t dur, bool rsv);
jgf_updater_data &updater_data);
int update_vertices (resource_graph_t &g, resource_graph_metadata_t &m,
std::map<std::string, vmap_val_t> &vmap,
json_t *nodes, int64_t jobid, int64_t at,
uint64_t dur);
json_t *nodes, jgf_updater_data &updater_data);
int unpack_edge (json_t *element, std::map<std::string, vmap_val_t> &vmap,
std::string &source, std::string &target, json_t **name);
int update_src_edge (resource_graph_t &g, resource_graph_metadata_t &m,
Expand Down

0 comments on commit a34e25e

Please sign in to comment.