Skip to content

Commit

Permalink
Merge pull request flux-framework#1211 from trws/process-blocked-in-c…
Browse files Browse the repository at this point in the history
…ancel

qmanager: always process unblocked jobs and make bf reactive during sched loop
  • Loading branch information
mergify[bot] authored Jun 5, 2024
2 parents 9bdd6d6 + 3ca473c commit e90f707
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 145 deletions.
81 changes: 81 additions & 0 deletions .clang-format
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
BasedOnStyle : google
SpaceBeforeParens : Always
IndentWidth : 4
BreakBeforeBraces : Custom
BraceWrapping :
BeforeElse : false
AfterFunction : true
UseTab: Never
AllowShortIfStatementsOnASingleLine : false
ConstructorInitializerAllOnOneLineOrOnePerLine : true
AllowShortFunctionsOnASingleLine : false
AllowShortLoopsOnASingleLine : false
BinPackParameters : false
BinPackArguments : false
AllowAllParametersOfDeclarationOnNextLine : false
AlignTrailingComments : true
ColumnLimit : 100

# do not put all arguments on one line unless it's the same line as the call
PenaltyBreakBeforeFirstCallParameter : 10000000
PenaltyReturnTypeOnItsOwnLine : 65000
PenaltyBreakString : 10

# preserve formatting of arguments to pack/unpack functions
# found by running: rg --only-matching --no-filename --no-line-number '(flux|json)(_[^ ,()]+)?_(un)?pack' | sort -u
WhitespaceSensitiveMacros :
- flux_conf_unpack
- flux_event_pack
- flux_event_publish_pack
- flux_event_unpack
- flux_job_result_get_unpack
- flux_jobspec1_attr_pack
- flux_jobspec1_attr_unpack
- flux_jobspec_info_unpack
- flux_jobtap_event_post_pack
- flux_kvs_lookup_get_unpack
- flux_kvs_lookup_unpack
- flux_kvs_txn_pack
- flux_lookup_get_unpack
- flux_mrpc_pack
- flux_msg_pack
- flux_msg_unpack
- flux_plugin_arg_pack
- flux_plugin_arg_unpack
- flux_plugin_args_unpack
- flux_plugin_conf_unpack
- flux_request_unpack
- flux_respond_pack
- flux_rpc_get_unpack
- flux_rpc_pack
- flux_shell_getopt_unpack
- flux_shell_info_unpack
- flux_shell_jobspec_info_unpack
- flux_shell_rank_info_unpack
- flux_shell_rpc_pack
- flux_shell_setopt_pack
- flux_shell_setopt_unpack
- flux_shell_task_info_unpack
- json_pack
- json_unpack

# treat foreach macros as for loops
ForEachMacros :
- json_array_foreach
- json_object_foreach

SortIncludes : false
BreakBeforeBinaryOperators : NonAssignment
AlignAfterOpenBracket: Align
AlignOperands : true
BreakBeforeTernaryOperators : true
SpaceBeforeSquareBrackets: false
IndentPPDirectives: None
NamespaceIndentation: None
SpaceAfterTemplateKeyword: false
DerivePointerAlignment: false
PointerAlignment: Right

#
# vi:tabstop=4 shiftwidth=4 expandtab ft=yaml
#
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ compile_flags.txt
.vscode
.idea
.clangd
.cache

# Rules to ignore auto-generated test harness scripts
test_*.t
Expand Down
8 changes: 5 additions & 3 deletions qmanager/modules/qmanager_callbacks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ int qmanager_cb_t::post_sched_loop (flux_t *h, schedutil_t *schedutil,
for (auto& kv: queues) {
const std::string &queue_name = kv.first;
std::shared_ptr<queue_policy_base_t> &queue = kv.second;
if (queue->is_sched_loop_active ())
continue;
while ( (job = queue->alloced_pop ()) != nullptr) {
if (schedutil_alloc_respond_success_pack (schedutil, job->msg,
job->schedule.R.c_str (),
Expand Down Expand Up @@ -405,9 +407,7 @@ void qmanager_cb_t::prep_watcher_cb (flux_reactor_t *r, flux_watcher_t *w,
ctx->pls_post_loop = false;
for (auto &kv: ctx->queues) {
std::shared_ptr<queue_policy_base_t> &queue = kv.second;
ctx->pls_sched_loop = ctx->pls_sched_loop
|| (queue->is_schedulable ()
&& !queue->is_sched_loop_active ());
ctx->pls_sched_loop = ctx->pls_sched_loop || queue->is_schedulable ();
ctx->pls_post_loop = ctx->pls_post_loop
|| queue->is_scheduled ();
}
Expand All @@ -429,6 +429,8 @@ void qmanager_cb_t::check_watcher_cb (flux_reactor_t *r, flux_watcher_t *w,
for (auto &kv: ctx->queues) {
std::shared_ptr<queue_policy_base_t> &queue = kv.second;
if (queue->run_sched_loop (static_cast<void *> (ctx->h), true) < 0) {
if (errno == EAGAIN)
continue;
flux_log_error (ctx->h, "%s: run_sched_loop", __FUNCTION__);
return;
}
Expand Down
15 changes: 14 additions & 1 deletion qmanager/policies/base/queue_policy_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,22 @@ class queue_policy_base_t : public resource_model::queue_adapter_base_t
std::string &ret_R) = 0;

/// @brief if a reconsider request is pending, clear it and move all blocked jobs to pending
void reconsider_blocked_jobs () {
void process_provisional_reconsider ()
{
    // Only act when a reconsider request has been recorded; consuming the
    // flag first means each request is applied at most once.
    if (m_pending_reconsider) {
        m_pending_reconsider = false;
        // Splice every blocked entry into the pending map.
        m_pending.merge (m_blocked);
        // Nothing is expected to be left behind in the blocked map.
        assert (m_blocked.empty ());
    }
}

/// @brief request that blocked jobs be moved back to pending
///
/// The request is always recorded. It is applied right away when no
/// sched loop is active; otherwise it stays recorded until a later
/// process_provisional_reconsider () call picks it up.
void reconsider_blocked_jobs ()
{
    m_pending_reconsider = true;
    if (is_sched_loop_active ())
        return;
    process_provisional_reconsider ();
}

/*! Set queue parameters. Can be called multiple times.
*
* \param params comma-delimited key-value pairs string
Expand Down Expand Up @@ -547,6 +558,7 @@ class queue_policy_base_t : public resource_model::queue_adapter_base_t
if (prev && !m_sched_loop_active) {
rc = process_provisional_reprio ();
rc += process_provisional_cancel ();
process_provisional_reconsider ();
}
return rc;
}
Expand Down Expand Up @@ -982,6 +994,7 @@ class queue_policy_base_t : public resource_model::queue_adapter_base_t
std::map<std::vector<double>, flux_jobid_t> m_pending;
std::map<std::vector<double>, flux_jobid_t> m_pending_provisional;
std::map<uint64_t, flux_jobid_t> m_pending_cancel_provisional;
bool m_pending_reconsider = false;
std::map<uint64_t, std::pair<flux_jobid_t,
unsigned int>> m_pending_reprio_provisional;
std::map<uint64_t, flux_jobid_t> m_running;
Expand Down
12 changes: 3 additions & 9 deletions qmanager/policies/queue_policy_bf_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,11 @@ class queue_policy_bf_base_t : public queue_policy_base_t
private:
int cancel_completed_jobs (void *h);
int cancel_reserved_jobs (void *h);
std::map<std::vector<double>, flux_jobid_t>::iterator &
allocate_orelse_reserve (void *h, std::shared_ptr<job_t> job,
bool use_alloced_queue,
std::map<std::vector<double>,
flux_jobid_t>::iterator &iter);
std::map<std::vector<double>, flux_jobid_t>::iterator &
allocate (void *h, std::shared_ptr<job_t> job, bool use_alloced_queue,
std::map<std::vector<double>, flux_jobid_t>::iterator &iter);
int allocate_orelse_reserve_jobs (void *h, bool use_alloced_queue);
std::map<uint64_t, flux_jobid_t> m_reserved;
unsigned int m_reservation_cnt;
int m_reservation_cnt;
int m_scheduled_cnt;
decltype (m_pending)::iterator m_in_progress_iter = m_pending.end();
};

} // namespace Flux::queue_manager::detail
Expand Down
Loading

0 comments on commit e90f707

Please sign in to comment.