Skip to content

Commit

Permalink
readers: add partial cancel support for JGF
Browse files Browse the repository at this point in the history
  • Loading branch information
milroy committed Jun 17, 2024
1 parent c84b978 commit d51891a
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 37 deletions.
186 changes: 160 additions & 26 deletions resource/readers/resource_reader_jgf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -725,8 +725,7 @@ int resource_reader_jgf_t::find_vtx (resource_graph_t &g,

int resource_reader_jgf_t::update_vtx_plan (vtx_t v, resource_graph_t &g,
const fetch_helper_t &fetcher,
uint64_t jobid, int64_t at,
uint64_t dur, bool rsv)
jgf_updater_data &update_data)
{
int rc = -1;
int64_t span = -1;
Expand All @@ -739,7 +738,9 @@ int resource_reader_jgf_t::update_vtx_plan (vtx_t v, resource_graph_t &g,
m_err_msg += ": plan for " + g[v].name + " is null.\n";
goto done;
}
if ( (avail = planner_avail_resources_during (plans, at, dur)) == -1) {
if ( (avail = planner_avail_resources_during (plans,
update_data.at,
update_data.duration)) == -1) {
m_err_msg += __FUNCTION__;
m_err_msg += ": planner_avail_resource_during return -1 for ";
m_err_msg += g[v].name + ".\n";
Expand All @@ -749,16 +750,17 @@ int resource_reader_jgf_t::update_vtx_plan (vtx_t v, resource_graph_t &g,
if (fetcher.exclusive) {
// Update the vertex plan here (not in traverser code) so vertices
// that the traverser won't walk still get their plans updated.
if ( (span = planner_add_span (plans, at, dur,
static_cast<const uint64_t> (g[v].size))) == -1) {
if ( (span = planner_add_span (plans, update_data.at,
update_data.duration,
static_cast<const uint64_t> (g[v].size))) == -1) {
m_err_msg += __FUNCTION__;
m_err_msg += ": can't add span into " + g[v].name + ".\n";
goto done;
}
if (rsv)
g[v].schedule.reservations[jobid] = span;
if (update_data.reserved)
g[v].schedule.reservations[update_data.jobid] = span;

Check warning on line 761 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L761

Added line #L761 was not covered by tests
else
g[v].schedule.allocations[jobid] = span;
g[v].schedule.allocations[update_data.jobid] = span;
} else {
if (avail < g[v].size) {
// if g[v] has already been allocated/reserved, this is an error
Expand All @@ -773,12 +775,98 @@ int resource_reader_jgf_t::update_vtx_plan (vtx_t v, resource_graph_t &g,
return rc;
}

int resource_reader_jgf_t::cancel_vtx (vtx_t vtx, resource_graph_t &g,

Check warning on line 778 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L778

Added line #L778 was not covered by tests
resource_graph_metadata_t &m,
const fetch_helper_t &fetcher,
jgf_updater_data &update_data)
{
int rc = -1;
int64_t span = -1;
int64_t prev_avail = -1;
planner_multi_t *subtree_plan = NULL;
planner_t *x_checker = NULL;
planner_t *plans = NULL;
auto &job2span = g[vtx].idata.job2span;
auto &x_spans = g[vtx].idata.x_spans;
auto &tags = g[vtx].idata.tags;
std::map<int64_t, std::vector<vtx_t>>::iterator rank_vect;
std::vector<vtx_t>::iterator rank_position;

Check warning on line 793 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L783-L793

Added lines #L783 - L793 were not covered by tests

// remove from aggregate filter
auto agg_span = job2span.find (update_data.jobid);
if (agg_span == job2span.end ())
goto ret;
if ((subtree_plan = g[vtx].idata.subplans["containment"]) == NULL)
goto ret;
if (planner_multi_rem_span (subtree_plan, agg_span->second) != 0) {
goto ret;

Check warning on line 802 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L796-L802

Added lines #L796 - L802 were not covered by tests
}
// Delete from job2span tracker
job2span.erase (update_data.jobid);

Check warning on line 805 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L805

Added line #L805 was not covered by tests

// remove from tx filter;
if (x_spans.find (update_data.jobid) == x_spans.end ()) {
errno = EINVAL;
goto ret;

Check warning on line 810 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L808-L810

Added lines #L808 - L810 were not covered by tests
}

x_checker = g[vtx].idata.x_checker;
g[vtx].idata.tags.erase (update_data.jobid);
span = g[vtx].idata.x_spans[update_data.jobid];
g[vtx].idata.x_spans.erase (update_data.jobid);
if (planner_rem_span (x_checker, span) == -1) {
errno = EINVAL;
goto ret;

Check warning on line 819 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L813-L819

Added lines #L813 - L819 were not covered by tests
}

// rem plan
if (g[vtx].schedule.allocations.find (update_data.jobid)
!= g[vtx].schedule.allocations.end ()) {
span = g[vtx].schedule.allocations[update_data.jobid];
g[vtx].schedule.allocations.erase (update_data.jobid);

Check warning on line 826 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L823-L826

Added lines #L823 - L826 were not covered by tests
} else {
errno = EINVAL;
goto ret;

Check warning on line 829 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L828-L829

Added lines #L828 - L829 were not covered by tests
}

plans = g[vtx].schedule.plans;
prev_avail = planner_avail_resources_at (plans, 0);
if (planner_rem_span (plans, span) == -1) {
errno = EINVAL;
goto ret;

Check warning on line 836 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L832-L836

Added lines #L832 - L836 were not covered by tests
}
// Add the newly freed counts, Can't assume it freed everything.
update_data.type_to_count[g[vtx].type] += planner_avail_resources_at (plans, 0)
- prev_avail;
rank_vect = m.by_rank.find (g[vtx].rank);
if (rank_vect == m.by_rank.end ()) {
m_err_msg += __FUNCTION__;
m_err_msg += ": attepted to cancel rank that does not exist ";
m_err_msg += "in by_rank map\n";
goto ret;

Check warning on line 846 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L839-L846

Added lines #L839 - L846 were not covered by tests
}
rank_position = std::find (rank_vect->second.begin (),
rank_vect->second.end (), vtx);
if (rank_position == rank_vect->second.end ()) {
m_err_msg += __FUNCTION__;
m_err_msg += ": attepted to remove vertex which does not exist ";
m_err_msg += "in by_rank map\n";
goto ret;

Check warning on line 854 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L848-L854

Added lines #L848 - L854 were not covered by tests
}
rank_vect->second.erase (rank_position);
update_data.ranks.insert (g[vtx].rank);

Check warning on line 857 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L856-L857

Added lines #L856 - L857 were not covered by tests

rc = 0;

Check warning on line 859 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L859

Added line #L859 was not covered by tests

ret:
return rc;

Check warning on line 862 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L861-L862

Added lines #L861 - L862 were not covered by tests
}

int resource_reader_jgf_t::update_vtx (resource_graph_t &g,
resource_graph_metadata_t &m,
std::map<std::string, vmap_val_t> &vmap,
const fetch_helper_t &fetcher,
uint64_t jobid, int64_t at,
uint64_t dur, bool rsv)
jgf_updater_data &update_data)
{
int rc = -1;
std::map<std::string, bool> root_checks;
Expand All @@ -791,16 +879,21 @@ int resource_reader_jgf_t::update_vtx (resource_graph_t &g,
goto done;
if ( (rc = update_vmap (vmap, v, root_checks, fetcher)) != 0)
goto done;
if ( (rc = update_vtx_plan (v, g, fetcher, jobid, at, dur, rsv)) != 0)
goto done;
if (update_data.update) {
if ( (rc = update_vtx_plan (v, g, fetcher, update_data)) != 0)
goto done;
} else {
if ( (rc = cancel_vtx (v, g, m, fetcher, update_data)) != 0)
goto done;

Check warning on line 887 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L886-L887

Added lines #L886 - L887 were not covered by tests
}

done:
return rc;
}

int resource_reader_jgf_t::undo_vertices (resource_graph_t &g,
std::map<std::string, vmap_val_t> &vmap,
uint64_t jobid, bool rsv)
jgf_updater_data &update_data)
{
int rc = 0;
int rc2 = 0;
Expand All @@ -813,12 +906,12 @@ int resource_reader_jgf_t::undo_vertices (resource_graph_t &g,
continue;
try {
v = kv.second.v;
if (rsv) {
span = g[v].schedule.reservations.at (jobid);
g[v].schedule.reservations.erase (jobid);
if (update_data.reserved) {
span = g[v].schedule.reservations.at (update_data.jobid);
g[v].schedule.reservations.erase (update_data.jobid);

Check warning on line 911 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L910-L911

Added lines #L910 - L911 were not covered by tests
} else {
span = g[v].schedule.allocations.at (jobid);
g[v].schedule.allocations.erase (jobid);
span = g[v].schedule.allocations.at (update_data.jobid);
g[v].schedule.allocations.erase (update_data.jobid);
}

plans = g[v].schedule.plans;
Expand Down Expand Up @@ -886,9 +979,8 @@ int resource_reader_jgf_t::update_vertices (resource_graph_t &g,
resource_graph_metadata_t &m,
std::map<std::string,
vmap_val_t> &vmap,
json_t *nodes, int64_t jobid,
int64_t at, uint64_t dur,
bool rsv)
json_t *nodes,
jgf_updater_data &update_data)
{
int rc = -1;
unsigned int i = 0;
Expand All @@ -898,7 +990,7 @@ int resource_reader_jgf_t::update_vertices (resource_graph_t &g,
fetcher.scrub ();
if ( (rc = unpack_vtx (json_array_get (nodes, i), fetcher)) != 0)
goto done;
if ( (rc = update_vtx (g, m, vmap, fetcher, jobid, at, dur, rsv)) != 0)
if ( (rc = update_vtx (g, m, vmap, fetcher, update_data)) != 0)
goto done;
}
rc = 0;
Expand Down Expand Up @@ -1236,6 +1328,7 @@ int resource_reader_jgf_t::update (resource_graph_t &g,
json_t *nodes = NULL;
json_t *edges = NULL;
std::map<std::string, vmap_val_t> vmap;
jgf_updater_data update_data;

if (at < 0 || dur == 0) {
errno = EINVAL;
Expand All @@ -1245,10 +1338,18 @@ int resource_reader_jgf_t::update (resource_graph_t &g,
+ std::to_string (dur) + ").\n";
goto done;
}

// Fill in updater data
update_data.jobid = jobid;
update_data.at = at;
update_data.duration = dur;
update_data.reserved = rsv;
update_data.update = true;

if ( (rc = fetch_jgf (str, &jgf, &nodes, &edges)) != 0)
goto done;
if ( (rc = update_vertices (g, m, vmap, nodes, jobid, at, dur, rsv)) != 0) {
undo_vertices (g, vmap, jobid, rsv);
if ( (rc = update_vertices (g, m, vmap, nodes, update_data)) != 0) {
undo_vertices (g, vmap, update_data);
goto done;
}
if ( (rc = update_edges (g, m, vmap, edges, token)) != 0)
Expand Down Expand Up @@ -1301,8 +1402,41 @@ int resource_reader_jgf_t::partial_cancel (resource_graph_t &g,
std::unordered_map<std::string, int64_t> &type_to_count,
const std::string &R, int64_t jobid)
{
errno = ENOTSUP; // JGF reader does not support partial cancel
return -1;
int rc = -1;
json_t *jgf = NULL;
json_t *nodes = NULL;
json_t *edges = NULL;
std::map<std::string, vmap_val_t> vmap;
jgf_updater_data p_cancel_data;

Check warning on line 1410 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L1405-L1410

Added lines #L1405 - L1410 were not covered by tests

if (jobid <= 0) {
errno = EINVAL;
m_err_msg += __FUNCTION__;
m_err_msg += ": invalid jobid\n";
goto done;

Check warning on line 1416 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L1412-L1416

Added lines #L1412 - L1416 were not covered by tests
}

// Fill in updater data
p_cancel_data.jobid = jobid;
p_cancel_data.update = false;

Check warning on line 1421 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L1420-L1421

Added lines #L1420 - L1421 were not covered by tests

if ( (rc = fetch_jgf (R, &jgf, &nodes, &edges)) != 0)
goto done;
if ( (rc = update_vertices (g, m, vmap, nodes, p_cancel_data)) != 0)
goto done;

Check warning on line 1426 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L1423-L1426

Added lines #L1423 - L1426 were not covered by tests

// Remove from by_rank map if all vertices in the rank were
// cancelled.
for (auto rank : p_cancel_data.ranks) {
auto existing_rank = m.by_rank.find (rank);

Check warning on line 1431 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L1430-L1431

Added lines #L1430 - L1431 were not covered by tests
// We know it's in by_rank or else update_vertices would have failed
if (existing_rank->second.size () == 0)
m.by_rank.erase (existing_rank);

Check warning on line 1434 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L1433-L1434

Added lines #L1433 - L1434 were not covered by tests
}

done:
json_decref (jgf);
return rc;

Check warning on line 1439 in resource/readers/resource_reader_jgf.cpp

View check run for this annotation

Codecov / codecov/patch

resource/readers/resource_reader_jgf.cpp#L1437-L1439

Added lines #L1437 - L1439 were not covered by tests
}

bool resource_reader_jgf_t::is_allowlist_supported ()
Expand Down
34 changes: 23 additions & 11 deletions resource/readers/resource_reader_jgf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ struct vmap_val_t;
namespace Flux {
namespace resource_model {

// Struct to track data for updates
struct jgf_updater_data {
int64_t jobid = 0;
int64_t at = 0;
uint64_t duration = 0;
bool reserved = false;
// track counts of resources to be cancelled
std::unordered_map<std::string, int64_t> type_to_count;
// track count of rank vertices to determine if rank
// should be removed from by_rank map
std::unordered_set<int64_t> ranks;
bool update = true; // Updating or partial cancel
};

/*! JGF resource reader class.
*/
Expand Down Expand Up @@ -138,27 +151,26 @@ class resource_reader_jgf_t : public resource_reader_base_t {
std::map<std::string, vmap_val_t> &vmap,
const fetch_helper_t &fetcher, vtx_t &ret_v);
int update_vtx_plan (vtx_t v, resource_graph_t &g,
const fetch_helper_t &fetcher, uint64_t jobid,
int64_t at, uint64_t dur, bool rsv);
const fetch_helper_t &fetcher,
jgf_updater_data &update_data);
int cancel_vtx (vtx_t v, resource_graph_t &g,
resource_graph_metadata_t &m,
const fetch_helper_t &fetcher,
jgf_updater_data &update_data);
int update_vtx (resource_graph_t &g, resource_graph_metadata_t &m,
std::map<std::string, vmap_val_t> &vmap,
const fetch_helper_t &fetcher, uint64_t jobid, int64_t at,
uint64_t dur, bool rsv);
const fetch_helper_t &fetcher,
jgf_updater_data &updater_data);
int unpack_vertices (resource_graph_t &g, resource_graph_metadata_t &m,
std::map<std::string, vmap_val_t> &vmap,
json_t *nodes,
std::unordered_set<std::string> &added_vtcs);
int undo_vertices (resource_graph_t &g,
std::map<std::string, vmap_val_t> &vmap,
uint64_t jobid, bool rsv);
int update_vertices (resource_graph_t &g, resource_graph_metadata_t &m,
std::map<std::string, vmap_val_t> &vmap,
json_t *nodes, int64_t jobid, int64_t at,
uint64_t dur, bool rsv);
jgf_updater_data &updater_data);
int update_vertices (resource_graph_t &g, resource_graph_metadata_t &m,
std::map<std::string, vmap_val_t> &vmap,
json_t *nodes, int64_t jobid, int64_t at,
uint64_t dur);
json_t *nodes, jgf_updater_data &updater_data);
int unpack_edge (json_t *element, std::map<std::string, vmap_val_t> &vmap,
std::string &source, std::string &target, json_t **name);
int update_src_edge (resource_graph_t &g, resource_graph_metadata_t &m,
Expand Down

0 comments on commit d51891a

Please sign in to comment.