Skip to content

Commit

Permalink
Merge pull request #1163 from milroy/partial-cancel
Browse files Browse the repository at this point in the history
Add support for broker rank-based partial release
  • Loading branch information
mergify[bot] authored Jul 10, 2024
2 parents 5e54a68 + a20ad21 commit 7234b37
Show file tree
Hide file tree
Showing 70 changed files with 8,679 additions and 276 deletions.
3 changes: 0 additions & 3 deletions qmanager/modules/qmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -580,9 +580,6 @@ static void qmanager_destroy (std::shared_ptr<qmanager_ctx_t> &ctx)
while ( (job = ctx->queues.at (kv.first)->pending_pop ())
!= nullptr)
flux_respond_error (ctx->h, job->msg, ENOSYS, "unloading");
while ( (job = ctx->queues.at (kv.first)->complete_pop ())
!= nullptr)
flux_respond_error (ctx->h, job->msg, ENOSYS, "unloading");
}
schedutil_destroy (ctx->schedutil);
flux_watcher_destroy (ctx->prep);
Expand Down
25 changes: 19 additions & 6 deletions qmanager/modules/qmanager_callbacks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,29 +292,42 @@ void qmanager_cb_t::jobmanager_free_cb (flux_t *h, const flux_msg_t *msg,
const char *R, void *arg)
{
flux_jobid_t id;
json_t *Res;
const char *Rstr = NULL;
qmanager_cb_ctx_t *ctx = nullptr;
ctx = static_cast<qmanager_cb_ctx_t *> (arg);
std::shared_ptr<queue_policy_base_t> queue;
std::string queue_name;

if (flux_request_unpack (msg, NULL, "{s:I}", "id", &id) < 0) {
if (flux_request_unpack (msg, NULL, "{s:I s:O}",
"id", &id, "R", &Res) < 0) {
flux_log_error (h, "%s: flux_request_unpack", __FUNCTION__);
return;
}
if (!(Rstr = json_dumps (Res, JSON_COMPACT))) {
errno = ENOMEM;
flux_log (h, LOG_ERR, "%s: json_dumps ", __FUNCTION__);
goto done;
}
if (ctx->find_queue (id, queue_name, queue) < 0) {
flux_log_error (h, "%s: can't find queue for job (id=%jd)",
__FUNCTION__, static_cast<intmax_t> (id));
return;
goto done;
}
if ((queue->remove (id)) < 0) {
if ( (queue->remove (static_cast<void *> (h), id, Rstr)) < 0) {
flux_log_error (h, "%s: remove (queue=%s id=%jd)", __FUNCTION__,
queue_name.c_str (), static_cast<intmax_t> (id));
return;
queue_name.c_str (), static_cast<intmax_t> (id));
goto done;
}
if (schedutil_free_respond (ctx->schedutil, msg) < 0) {
flux_log_error (h, "%s: schedutil_free_respond", __FUNCTION__);
return;
goto done;
}

done:
json_decref (Res);
free ((void *)Rstr);
return;
}

void qmanager_cb_t::jobmanager_cancel_cb (flux_t *h, const flux_msg_t *msg,
Expand Down
142 changes: 65 additions & 77 deletions qmanager/policies/base/queue_policy_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,49 +492,80 @@ class queue_policy_base_t : public resource_model::queue_adapter_base_t
* state. This queue becomes "schedulable" if pending job
* queue is not empty: i.e., is_schedulable() returns true;
*
* \param h Opaque handle. How it is used is an implementation
* detail. However, when it is used within a Flux's
* service module such as qmanager, it is expected
* to be a pointer to a flux_t object.
* \param id jobid of flux_jobid_t type.
* \param R Resource set for partial cancel
* \return 0 on success; -1 on error.
* ENOENT: unknown id.
*/
int remove (flux_jobid_t id)
int remove (void *h, flux_jobid_t id, const char *R)
{
int rc = -1;
std::shared_ptr<job_t> job = nullptr;
int rc = -1;
bool full_removal = false;

if (m_jobs.find (id) == m_jobs.end ()) {
errno = ENOENT;
goto out;
}
auto job_it = m_jobs.find (id);
if (job_it == m_jobs.end ()) {
errno = ENOENT;
goto out;
}

job = m_jobs[id];
switch (job->state) {
case job_state_kind_t::PENDING:
this->remove_pending(job.get ());
break;
case job_state_kind_t::ALLOC_RUNNING:
m_alloced.erase (job->t_stamps.running_ts);
// deliberately fall through
case job_state_kind_t::RUNNING:
m_running.erase (job->t_stamps.running_ts);
job->t_stamps.complete_ts = m_cq_cnt++;
job->state = job_state_kind_t::COMPLETE;
m_complete.insert (std::pair<uint64_t,
flux_jobid_t> (job->t_stamps.complete_ts,
job->id));
set_schedulability (true);
break;
default:
switch (job_it->second->state) {
case job_state_kind_t::PENDING:
this->remove_pending(job_it->second.get ());
break;
case job_state_kind_t::ALLOC_RUNNING:
// deliberately fall through
case job_state_kind_t::RUNNING:
if ( (rc = cancel (h, job_it->second->id, R, true, full_removal) != 0))
break;
if (full_removal) {
m_alloced.erase (job_it->second->t_stamps.running_ts);
m_running.erase (job_it->second->t_stamps.running_ts);
job_it->second->t_stamps.complete_ts = m_cq_cnt++;
job_it->second->state = job_state_kind_t::COMPLETE;
m_jobs.erase (job_it);
}
// with a job finishing or being canceled, restart the sched loop
cancel_sched_loop ();
// blocked jobs must be reconsidered after a job completes
// this covers cases where jobs that couldn't run because of an
// existing job's reservation can when it completes early
reconsider_blocked_jobs ();
rc = 0;
out:
return rc;
set_schedulability (true);
break;
default:
break;
}
cancel_sched_loop ();
// blocked jobs must be reconsidered after a job completes
// this covers cases where jobs that couldn't run because of an
// existing job's reservation can when it completes early
reconsider_blocked_jobs ();

rc = 0;
out:
return rc;
}

/*! Remove a job whose jobid is id from any internal queues
* (e.g., pending queue, running queue, and alloced queue.)
* If succeeds, it changes the pending queue or resource
* state. This queue becomes "schedulable" if pending job
* queue is not empty: i.e., is_schedulable() returns true;
*
* \param h Opaque handle. How it is used is an implementation
* detail. However, when it is used within a Flux's
* service module such as qmanager, it is expected
* to be a pointer to a flux_t object.
* \param id jobid of flux_jobid_t type.
* \param R Resource set for partial cancel
* \param noent_ok don't return an error on nonexistent jobid
* \param full_removal bool indictating whether the job is fully canceled
* \return 0 on success; -1 on error.
* ENOENT: unknown id.
*/
virtual int cancel (void *h, flux_jobid_t id, const char *R, bool noent_ok,
bool &full_removal)
{
full_removal = true;
return 0;
}

/*! Return true if this queue has become schedulable since
Expand Down Expand Up @@ -649,26 +680,6 @@ class queue_policy_base_t : public resource_model::queue_adapter_base_t
return job;
}

/*! Pop the first job from the internal completed job queue.
* The popped is completely graduated from the queue policy layer.
* \return a shared pointer pointing to a job_t object
* on success; nullptr when the queue is empty.
*/
std::shared_ptr<job_t> complete_pop () {
std::shared_ptr<job_t> job;
flux_jobid_t id;
if (m_complete.empty ())
return nullptr;
id = m_complete.begin ()->second;
if (m_jobs.find (id) == m_jobs.end ())
return nullptr;
job = m_jobs[id];
m_complete.erase (job->t_stamps.complete_ts);
m_jobs.erase (id);
return job;
}


/*! Pop the first job from the alloced job queue. The popped
* job still remains in the queue policy layer (i.e., in the
* internal running job queue).
Expand Down Expand Up @@ -939,28 +950,6 @@ class queue_policy_base_t : public resource_model::queue_adapter_base_t
return 0;
}

std::map<uint64_t, flux_jobid_t>::iterator
to_complete (std::map<uint64_t, flux_jobid_t>::iterator running_iter)
{
flux_jobid_t id = running_iter->second;
if (m_jobs.find (id) == m_jobs.end ()) {
errno = EINVAL;
return running_iter;
}

std::shared_ptr<job_t> job = m_jobs[id];
job->state = job_state_kind_t::COMPLETE;
job->t_stamps.complete_ts = m_cq_cnt++;
auto res = m_complete.insert (std::pair<uint64_t, flux_jobid_t>(
job->t_stamps.complete_ts, job->id));
if (!res.second) {
errno = ENOMEM;
return running_iter;
}
m_alloced.erase (job->t_stamps.running_ts);
return m_running.erase (running_iter);
}

job_map_iter to_rejected (job_map_iter pending_iter,
const std::string &note)
{
Expand Down Expand Up @@ -1008,7 +997,6 @@ class queue_policy_base_t : public resource_model::queue_adapter_base_t
unsigned int>> m_pending_reprio_provisional;
std::map<uint64_t, flux_jobid_t> m_running;
std::map<uint64_t, flux_jobid_t> m_alloced;
std::map<uint64_t, flux_jobid_t> m_complete;
std::map<uint64_t, flux_jobid_t> m_rejected;
std::map<uint64_t, flux_jobid_t> m_canceled;
std::map<flux_jobid_t, std::shared_ptr<job_t>> m_jobs;
Expand Down
3 changes: 2 additions & 1 deletion qmanager/policies/queue_policy_bf_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ class queue_policy_bf_base_t : public queue_policy_base_t
virtual int handle_match_success (flux_jobid_t jobid, const char *status,
const char *R, int64_t at, double ov);
virtual int handle_match_failure (flux_jobid_t jobid, int errcode);
int cancel (void *h, flux_jobid_t id, const char *R, bool noent_ok,
bool &full_removal) override;

protected:
unsigned int m_reservation_depth;
unsigned int m_max_reservation_depth = MAX_RESERVATION_DEPTH;

private:
int next_match_iter ();
int cancel_completed_jobs (void *h);
int cancel_reserved_jobs (void *h);
int allocate_orelse_reserve_jobs (void *h);
std::map<uint64_t, flux_jobid_t> m_reserved;
Expand Down
24 changes: 7 additions & 17 deletions qmanager/policies/queue_policy_bf_base_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,6 @@ namespace detail {
// Private Methods of Queue Policy Backfill Base
////////////////////////////////////////////////////////////////////////////////

template<class reapi_type>
int queue_policy_bf_base_t<reapi_type>::cancel_completed_jobs (void *h)
{
int rc = 0;
std::shared_ptr<job_t> job;

// Pop newly completed jobs (e.g., per a free request from
// job-manager as received by qmanager) to remove them from the
// resource infrastructure.
while ((job = complete_pop ()) != nullptr)
rc += reapi_type::cancel (h, job->id, true);
return rc;
}

template<class reapi_type>
int queue_policy_bf_base_t<reapi_type>::cancel_reserved_jobs (void *h)
{
Expand Down Expand Up @@ -116,6 +102,13 @@ int queue_policy_bf_base_t<reapi_type>::allocate_orelse_reserve_jobs (void *h)
return 0;
}

template<class reapi_type>
int queue_policy_bf_base_t<reapi_type>::cancel (void *h, flux_jobid_t id,
const char *R, bool noent_ok,
bool &full_removal)
{
return reapi_type::cancel (h, id, R, noent_ok, full_removal);
}

////////////////////////////////////////////////////////////////////////////////
// Public API of Queue Policy Backfill Base
Expand Down Expand Up @@ -227,9 +220,6 @@ int queue_policy_bf_base_t<reapi_type>::run_sched_loop (void *h, bool use_alloce
int rc = 0;
if (!is_sched_loop_active ()) {
set_schedulability (false);
rc = cancel_completed_jobs (h);
if (rc != 0)
return rc;
rc = cancel_reserved_jobs (h);
if (rc != 0)
return rc;
Expand Down
3 changes: 2 additions & 1 deletion qmanager/policies/queue_policy_fcfs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ class queue_policy_fcfs_t : public queue_policy_base_t
virtual int handle_match_success (flux_jobid_t jobid, const char *status,
const char *R, int64_t at, double ov);
virtual int handle_match_failure (flux_jobid_t jobid, int errcode);
int cancel (void *h, flux_jobid_t id, const char *R, bool noent_ok,
bool &full_removal) override;

private:
int cancel_completed_jobs (void *h);
int pack_jobs (json_t *jobs);
int allocate_jobs (void *h, bool use_alloced_queue);
bool m_queue_depth_limit = false;
Expand Down
21 changes: 7 additions & 14 deletions qmanager/policies/queue_policy_fcfs_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,6 @@ namespace detail {
// Private Methods of Queue Policy FCFS
////////////////////////////////////////////////////////////////////////////////

template<class reapi_type>
int queue_policy_fcfs_t<reapi_type>::cancel_completed_jobs (void *h)
{
int rc = 0;
std::shared_ptr<job_t> job;

// Pop newly completed jobs (e.g., per a free request from job-manager
// as received by qmanager) to remove them from the resource infrastructure.
while ((job = complete_pop ()) != nullptr)
rc += reapi_type::cancel (h, job->id, true);
return rc;
}

template<class reapi_type>
int queue_policy_fcfs_t<reapi_type>::pack_jobs (json_t *jobs)
{
Expand Down Expand Up @@ -157,6 +144,13 @@ int queue_policy_fcfs_t<reapi_type>::handle_match_failure (flux_jobid_t jobid, i
return 0;
}

template<class reapi_type>
int queue_policy_fcfs_t<reapi_type>::cancel (void *h, flux_jobid_t id,
const char *R, bool noent_ok,
bool &full_removal)
{
return reapi_type::cancel (h, id, R, noent_ok, full_removal);
}


////////////////////////////////////////////////////////////////////////////////
Expand All @@ -183,7 +177,6 @@ int queue_policy_fcfs_t<reapi_type>::run_sched_loop (void *h,
return 1;
int rc = 0;
set_schedulability (false);
rc = cancel_completed_jobs (h);
rc += allocate_jobs (h, use_alloced_queue);
return rc;
}
Expand Down
Loading

0 comments on commit 7234b37

Please sign in to comment.