Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Integrate Fluxion with flux-core's resource module #675

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash -e

# Unload the sched-fluxion-qmanager module unless FLUXION_QMANAGER_RC_NOOP
# is set to a non-empty value.
# NOTE(review): this hunk is shown flattened; the plain `remove` line looks
# like the pre-change version and the `remove -f` line its replacement --
# confirm against the actual script before relying on both being present.
if [ -z ${FLUXION_QMANAGER_RC_NOOP} ]; then
flux module remove sched-fluxion-qmanager
flux module remove -f sched-fluxion-qmanager
fi

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
# load it via FLUX_RC_EXTRA, the in-tree version can be loaded.

# Everything below is skipped when FLUXION_RESOURCE_RC_NOOP is set to a
# non-empty value.
if [ -z ${FLUXION_RESOURCE_RC_NOOP} ]; then
# Unload sched-simple before (re)loading sched-fluxion-resource: the latter
# requires resource.acquire, which is used exclusively by one module
# (presumably held by sched-simple while it is loaded -- TODO confirm).
flux module remove -f sched-simple
# Default module options; a non-empty FLUXION_RESOURCE_OPTIONS from the
# environment takes precedence over this default.
FLUXION_RESOURCE_OPTIONS=${FLUXION_RESOURCE_OPTIONS:-"load-whitelist=node,core,gpu"}
flux module reload -f sched-fluxion-resource ${FLUXION_RESOURCE_OPTIONS}
fi
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash -e

# Unload the sched-fluxion-resource module unless FLUXION_RESOURCE_RC_NOOP
# is set to a non-empty value.
# NOTE(review): this hunk is shown flattened; the plain `remove` line looks
# like the pre-change version and the `remove -f` line its replacement --
# confirm against the actual script before relying on both being present.
if [ -z ${FLUXION_RESOURCE_RC_NOOP} ]; then
flux module remove sched-fluxion-resource
flux module remove -f sched-fluxion-resource
fi

8 changes: 4 additions & 4 deletions etc/Makefile.am
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Scripts distributed and installed as rc1 scripts.
# NOTE(review): the listing below is a flattened diff -- the un-prefixed names
# appear to be the old entries and the NN-prefixed names their replacements.
# The numeric prefixes presumably fix the order in which the scripts run
# (resource before qmanager on start, the reverse on stop) -- confirm against
# how flux-core sequences rc scripts.
dist_fluxrc1_SCRIPTS = \
sched-fluxion-resource-start \
sched-fluxion-qmanager-start
01-sched-fluxion-resource-start \
02-sched-fluxion-qmanager-start
# Scripts distributed and installed as rc3 scripts.
dist_fluxrc3_SCRIPTS = \
sched-fluxion-resource-stop \
sched-fluxion-qmanager-stop
02-sched-fluxion-resource-stop \
01-sched-fluxion-qmanager-stop
# Shipped in the distribution tarball but not installed by the lists above.
EXTRA_DIST = sched-fluxion-qmanager.toml

105 changes: 104 additions & 1 deletion qmanager/modules/qmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,45 @@ using namespace Flux::cplusplus_wrappers;
* *
******************************************************************************/

struct qmanager_ctx_t : public qmanager_cb_ctx_t {
/* State for qmanager's interaction with sched-fluxion-resource.
 *
 * Holds the future of the streaming "sched-fluxion-resource.notify" RPC
 * (notify_f) and the return code recorded by the most recent notify
 * callback (m_notify_rc).
 *
 * Ownership: the destructor destroys notify_f when it is non-null, so this
 * object is the sole owner of that future.  Copying is therefore disabled:
 * a copy would hold the same raw pointer and destroy it a second time.
 * Declaring the copy operations also suppresses the implicit move
 * operations, so the type is neither copyable nor movable.
 */
class fluxion_resource_interface_t {
public:
    /* Explicitly defaulted because declaring the deleted copy constructor
     * below would otherwise suppress the implicit default constructor. */
    fluxion_resource_interface_t () = default;
    fluxion_resource_interface_t (
        const fluxion_resource_interface_t &) = delete;
    fluxion_resource_interface_t &operator= (
        const fluxion_resource_interface_t &) = delete;
    ~fluxion_resource_interface_t ();

    /* Return the recorded notify return code and reset it to 0. */
    int fetch_and_reset_notify_rc ();
    /* Return the recorded notify return code without modifying it. */
    int get_notify_rc () const;
    /* Record rc as the notify return code. */
    void set_notify_rc (int rc);

    /* Future of the notify RPC; nullptr until the RPC has been issued. */
    flux_future_t *notify_f{nullptr};
private:
    /* Return code recorded by the last notify callback; 0 when unset. */
    int m_notify_rc = 0;
};

/* Module-wide qmanager context.
 *
 * Combines the callback context (qmanager_cb_ctx_t) with the
 * sched-fluxion-resource interface state (fluxion_resource_interface_t)
 * and adds the flux handle used for logging, reactor access and RPCs.
 */
struct qmanager_ctx_t : public qmanager_cb_ctx_t,
                        public fluxion_resource_interface_t {
    /* Broker handle.  Initialized to nullptr so that a default-initialized
     * context never carries an indeterminate pointer; the owner is expected
     * to assign it before any use. */
    flux_t *h = nullptr;
};

/* Release the notify RPC future, if one was ever stored in notify_f. */
fluxion_resource_interface_t::~fluxion_resource_interface_t ()
{
    if (!notify_f)
        return;  // no notify RPC future was stored; nothing to release
    flux_future_destroy (notify_f);
}

int fluxion_resource_interface_t::fetch_and_reset_notify_rc ()
{
int rc = m_notify_rc;
m_notify_rc = 0;
return rc;
}

int fluxion_resource_interface_t::get_notify_rc () const
{
return m_notify_rc;
}

void fluxion_resource_interface_t::set_notify_rc (int rc)
{
m_notify_rc = rc;
}

static int process_args (std::shared_ptr<qmanager_ctx_t> &ctx,
int argc, char **argv)
{
Expand Down Expand Up @@ -132,6 +167,64 @@ static void set_default (std::shared_ptr<qmanager_ctx_t> &ctx)
ctx->opts += ct_opts;
}

/* Handle one response of the sched-fluxion-resource.notify RPC.
 *
 * f    future of the notify RPC.
 * arg  the qmanager_ctx_t registered with the future.
 *
 * If the response carries an error, the failure is logged and the reactor
 * is stopped.  Otherwise the schedule loop and the post-schedule-loop step
 * are run for each queue, stopping at the first failure.  In every case the
 * future is reset and the resulting code is recorded on the context via
 * set_notify_rc () for the caller to collect.
 */
static void update_on_resource_response (flux_future_t *f, void *arg)
{
    qmanager_ctx_t *ctx = static_cast<qmanager_ctx_t *> (arg);
    int status = flux_rpc_get (f, NULL);

    if (status < 0) {
        flux_log_error (ctx->h,
                        "%s: exiting due to sched-fluxion-resource.notify failure",
                        __FUNCTION__);
        flux_reactor_stop (flux_get_reactor (ctx->h));
    } else {
        for (auto &entry : ctx->queues) {
            status = entry.second->run_sched_loop (
                         static_cast<void *> (ctx->h), true);
            // Only run the post-loop step when the loop itself succeeded.
            if (status >= 0)
                status = qmanager_safe_cb_t::post_sched_loop (ctx->h,
                                                              ctx->schedutil,
                                                              ctx->queues);
            if (status < 0) {
                flux_log_error (ctx->h, "%s: schedule loop", __FUNCTION__);
                break;
            }
        }
    }

    // Common exit path: reset the future and publish the outcome.
    flux_future_reset (f);
    ctx->set_notify_rc (status);
}

/* Establish the notify channel with sched-fluxion-resource.
 *
 * Issues the streaming "sched-fluxion-resource.notify" RPC and stores its
 * future in ctx->notify_f, invokes update_on_resource_response () once
 * directly, and then registers that same function as the future's
 * continuation with a timeout argument of -1.0.
 *
 * Returns -1 if the RPC could not be created; otherwise a negative code
 * from the direct invocation or from flux_future_then (), or the
 * non-negative result of flux_future_then () on success.
 */
static int handshake_resource (std::shared_ptr<qmanager_ctx_t> &ctx)
{
    ctx->notify_f = flux_rpc (ctx->h, "sched-fluxion-resource.notify",
                              NULL,
                              FLUX_NODEID_ANY,
                              FLUX_RPC_STREAMING);
    if (!ctx->notify_f) {
        flux_log_error (ctx->h, "%s: flux_rpc (notify)", __FUNCTION__);
        return -1;
    }

    // Run the handler by hand once and collect the code it recorded.
    update_on_resource_response (ctx->notify_f, ctx.get ());
    int rc = ctx->fetch_and_reset_notify_rc ();
    if (rc < 0) {
        flux_log_error (ctx->h, "%s: update_on_resource_response",
                        __FUNCTION__);
        return rc;
    }

    // Subsequent responses are delivered through the continuation.
    rc = flux_future_then (ctx->notify_f,
                           -1.0,
                           update_on_resource_response,
                           ctx.get ());
    if (rc < 0)
        flux_log_error (ctx->h, "%s: flux_future_then", __FUNCTION__);
    return rc;
}

static int handshake_jobmanager (std::shared_ptr<qmanager_ctx_t> &ctx)
{
int rc = -1;
Expand Down Expand Up @@ -269,10 +362,20 @@ static int enforce_options (std::shared_ptr<qmanager_ctx_t> &ctx)
flux_log_error (ctx->h, "%s: enforce_queues", __FUNCTION__);
return rc;
}
if ( (rc = handshake_resource (ctx)) < 0) {
flux_log_error (ctx->h, "%s: handshake_resource", __FUNCTION__);
return rc;
}
flux_log (ctx->h, LOG_DEBUG,
"handshaking with sched-fluxion-resource completed");

if ( (rc = handshake_jobmanager (ctx)) < 0) {
flux_log_error (ctx->h, "%s: handshake_jobmanager", __FUNCTION__);
return rc;
}
flux_log (ctx->h, LOG_DEBUG,
"handshaking with job-manager completed");

return rc;
}

Expand Down
15 changes: 15 additions & 0 deletions qmanager/modules/qmanager_callbacks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,21 @@ void qmanager_safe_cb_t::jobmanager_exception_cb (flux_t *h, flux_jobid_t id,
exception_safe_wrapper.get_err_message ());
}

/* Run qmanager_cb_t::post_sched_loop through an eh_wrapper_t.
 *
 * h          flux handle, also used for logging.
 * schedutil  forwarded unchanged to the wrapped function.
 * queues     forwarded unchanged to the wrapped function.
 *
 * Logs the wrapper's error message when the wrapper reports bad ().
 * Returns whatever the wrapper invocation returned.
 */
int qmanager_safe_cb_t::post_sched_loop (flux_t *h, schedutil_t *schedutil,
                                         std::map<std::string,
                                                  std::shared_ptr<
                                                      queue_policy_base_t>> &queues)
{
    eh_wrapper_t guard;
    int result = guard (qmanager_cb_t::post_sched_loop, h, schedutil, queues);

    if (guard.bad ()) {
        flux_log_error (h, "%s: %s", __FUNCTION__,
                        guard.get_err_message ());
    }
    return result;
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
4 changes: 4 additions & 0 deletions qmanager/modules/qmanager_callbacks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ struct qmanager_safe_cb_t : public qmanager_cb_t {
static void jobmanager_exception_cb (flux_t *h, flux_jobid_t id,
const char *type, int severity,
void *arg);
/* Invoke qmanager_cb_t::post_sched_loop through an eh_wrapper_t, logging
 * the wrapper's error message on h when the wrapper reports a bad state.
 * Returns the value produced by the wrapper invocation.
 */
static int post_sched_loop (flux_t *h,
schedutil_t *schedutil,
std::map<std::string, std::shared_ptr<
Flux::queue_manager::queue_policy_base_t>> &queues);
};

#endif // #define QMANAGER_CALLBACKS_HPP
Expand Down
1 change: 1 addition & 0 deletions resource/modules/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ sched_fluxion_resource_la_CXXFLAGS = \
$(FLUX_CORE_CFLAGS)
sched_fluxion_resource_la_LIBADD = \
../libresource.la \
$(FLUX_IDSET_LIBS) \
$(FLUX_CORE_LIBS) \
$(DL_LIBS) \
$(HWLOC_LIBS) \
Expand Down
Loading