Skip to content

Commit

Permalink
qmanager: integrate resource notification RPC
Browse files Browse the repository at this point in the history
Send sched-fluxion-resource.notify streaming RPC
to get synchronized with sched-fluxion-resource
for resource acquisition and state change events.

Delay its handshaking protocol with job-manager
until receiving the first response of
sched-fluxion-resource.notify.

Upon receiving each response, run schedule loop
as it represents a schedulable resource event.

Stop the reactor and exit the module when receiving
the ECANCELED response
of sched-fluxion-resource.notify, which
occurs when sched-fluxion-resource is unloaded.
  • Loading branch information
dongahn committed Jun 17, 2020
1 parent e6095f9 commit 2c6870d
Showing 1 changed file with 98 additions and 1 deletion.
99 changes: 98 additions & 1 deletion qmanager/modules/qmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,45 @@ using namespace Flux::cplusplus_wrappers;
* *
******************************************************************************/

struct qmanager_ctx_t : public qmanager_cb_ctx_t {
/* State for the sched-fluxion-resource.notify interaction.
 *
 * Holds the future of the notify RPC (notify_f) together with the
 * return code recorded by the most recent notify response handler.
 * The destructor destroys notify_f when it is non-null, so this object
 * is the owner of that future.
 */
class fluxion_resource_interface_t {
public:
    fluxion_resource_interface_t () = default;

    // Non-copyable: a copy would hold the same notify_f pointer and both
    // destructors would call flux_future_destroy () on it.
    fluxion_resource_interface_t (
        const fluxion_resource_interface_t &) = delete;
    fluxion_resource_interface_t &operator= (
        const fluxion_resource_interface_t &) = delete;

    // Destroys notify_f if it has been set.
    ~fluxion_resource_interface_t ();

    // Returns the stored notify return code and resets it to 0.
    int fetch_and_reset_notify_rc ();

    // Returns the stored notify return code without modifying it.
    int get_notify_rc () const;

    // Stores rc as the notify return code.
    void set_notify_rc (int rc);

    // Future of the notify RPC; nullptr until the RPC has been sent.
    flux_future_t *notify_f{nullptr};

private:
    // Return code recorded by the last call to set_notify_rc ().
    int m_notify_rc = 0;
};

/* Module-wide context: combines the callback context with the
 * sched-fluxion-resource notify state and the handle used for
 * logging, RPCs and reactor access.
 */
struct qmanager_ctx_t : public qmanager_cb_ctx_t,
                        public fluxion_resource_interface_t {
    // Initialized so that a default-initialized context never carries an
    // indeterminate handle (consistent with notify_f{nullptr}).
    flux_t *h{nullptr};
};

/* Release the notify future, if one was ever created. */
fluxion_resource_interface_t::~fluxion_resource_interface_t ()
{
    if (notify_f != nullptr) {
        flux_future_destroy (notify_f);
    }
}

int fluxion_resource_interface_t::fetch_and_reset_notify_rc ()
{
int rc = m_notify_rc;
m_notify_rc = 0;
return rc;
}

int fluxion_resource_interface_t::get_notify_rc () const
{
return m_notify_rc;
}

void fluxion_resource_interface_t::set_notify_rc (int rc)
{
m_notify_rc = rc;
}

static int process_args (std::shared_ptr<qmanager_ctx_t> &ctx,
int argc, char **argv)
{
Expand Down Expand Up @@ -132,6 +167,64 @@ static void set_default (std::shared_ptr<qmanager_ctx_t> &ctx)
ctx->opts += ct_opts;
}

/* Handle one response of the sched-fluxion-resource.notify RPC.
 *
 * f:   future of the notify RPC.
 * arg: pointer to the qmanager_ctx_t that owns the queues.
 *
 * If fetching the response fails, the error is logged and the reactor
 * is stopped. Otherwise the schedule loop and post-schedule step are run
 * for each queue, stopping at the first failure. In every case the
 * future is reset and the resulting code is stored on the context via
 * set_notify_rc ().
 */
static void update_on_resource_response (flux_future_t *f, void *arg)
{
    qmanager_ctx_t *ctx = static_cast<qmanager_ctx_t *> (arg);
    int status = flux_rpc_get (f, NULL);

    if (status < 0) {
        flux_log_error (ctx->h,
                        "%s: exiting due to sched-fluxion-resource.notify failure",
                        __FUNCTION__);
        flux_reactor_stop (flux_get_reactor (ctx->h));
    } else {
        for (auto &entry : ctx->queues) {
            status = entry.second->run_sched_loop (
                         static_cast<void *> (ctx->h), true);
            if (status >= 0) {
                status = qmanager_safe_cb_t::post_sched_loop (ctx->h,
                                                              ctx->schedutil,
                                                              ctx->queues);
            }
            if (status < 0) {
                flux_log_error (ctx->h, "%s: schedule loop", __FUNCTION__);
                break;
            }
        }
    }

    // Always reset the future and publish the outcome for the caller.
    flux_future_reset (f);
    ctx->set_notify_rc (status);
}

/* Start the sched-fluxion-resource.notify streaming RPC.
 *
 * Sends the RPC, processes the first response synchronously by calling
 * update_on_resource_response () directly, and then registers that same
 * function as the continuation for subsequent responses.
 *
 * Returns -1 if the RPC cannot be sent; otherwise the negative code
 * reported by the first-response handling or by flux_future_then () on
 * failure, or the (non-negative) result of flux_future_then () on success.
 */
static int handshake_resource (std::shared_ptr<qmanager_ctx_t> &ctx)
{
    ctx->notify_f = flux_rpc (ctx->h,
                              "sched-fluxion-resource.notify",
                              NULL,
                              FLUX_NODEID_ANY,
                              FLUX_RPC_STREAMING);
    if (!ctx->notify_f) {
        flux_log_error (ctx->h, "%s: flux_rpc (notify)", __FUNCTION__);
        return -1;
    }

    // Consume the first response before installing the continuation.
    update_on_resource_response (ctx->notify_f, ctx.get ());
    int status = ctx->fetch_and_reset_notify_rc ();
    if (status < 0) {
        flux_log_error (ctx->h, "%s: update_on_resource_response",
                        __FUNCTION__);
        return status;
    }

    status = flux_future_then (ctx->notify_f,
                               -1.0,
                               update_on_resource_response,
                               ctx.get ());
    if (status < 0) {
        flux_log_error (ctx->h, "%s: flux_future_then", __FUNCTION__);
    }
    return status;
}

static int handshake_jobmanager (std::shared_ptr<qmanager_ctx_t> &ctx)
{
int rc = -1;
Expand Down Expand Up @@ -269,6 +362,10 @@ static int enforce_options (std::shared_ptr<qmanager_ctx_t> &ctx)
flux_log_error (ctx->h, "%s: enforce_queues", __FUNCTION__);
return rc;
}
if ( (rc = handshake_resource (ctx)) < 0) {
flux_log_error (ctx->h, "%s: handshake_resource", __FUNCTION__);
return rc;
}
if ( (rc = handshake_jobmanager (ctx)) < 0) {
flux_log_error (ctx->h, "%s: handshake_jobmanager", __FUNCTION__);
return rc;
Expand Down

0 comments on commit 2c6870d

Please sign in to comment.