From 2c6870ded475f1c1dab1f7b5703deebd2c9cf52c Mon Sep 17 00:00:00 2001 From: "Dong H. Ahn" Date: Tue, 16 Jun 2020 23:17:17 -0700 Subject: [PATCH] qmanager: integrate resource notification RPC Send sched-fluxion-resource.notify streaming RPC to get synchronized with sched-fluxion-resource for resource acquision and state change events. Delay its handshaking protocol with job-manager until receiving the first response of sched-fluxion-resource.notify. Upon receiving each response, run schedule loop as it represents a scheduleable resource event. Stop the reactor and module exit when receiving the ECANCELED response of sched-fluxion-resource.notify, which occurs when sched-fluxion-resource is unloaded. --- qmanager/modules/qmanager.cpp | 99 ++++++++++++++++++++++++++++++++++- 1 file changed, 98 insertions(+), 1 deletion(-) diff --git a/qmanager/modules/qmanager.cpp b/qmanager/modules/qmanager.cpp index d812461eb..4122cc4ff 100644 --- a/qmanager/modules/qmanager.cpp +++ b/qmanager/modules/qmanager.cpp @@ -53,10 +53,45 @@ using namespace Flux::cplusplus_wrappers; * * ******************************************************************************/ -struct qmanager_ctx_t : public qmanager_cb_ctx_t { +class fluxion_resource_interface_t { +public: + ~fluxion_resource_interface_t (); + int fetch_and_reset_notify_rc (); + int get_notify_rc () const; + void set_notify_rc (int rc); + flux_future_t *notify_f{nullptr}; +private: + int m_notify_rc = 0; +}; + +struct qmanager_ctx_t : public qmanager_cb_ctx_t, + public fluxion_resource_interface_t { flux_t *h; }; +fluxion_resource_interface_t::~fluxion_resource_interface_t () +{ + if (notify_f) + flux_future_destroy (notify_f); +} + +int fluxion_resource_interface_t::fetch_and_reset_notify_rc () +{ + int rc = m_notify_rc; + m_notify_rc = 0; + return rc; +} + +int fluxion_resource_interface_t::get_notify_rc () const +{ + return m_notify_rc; +} + +void fluxion_resource_interface_t::set_notify_rc (int rc) +{ + m_notify_rc = rc; +} + static int process_args (std::shared_ptr &ctx, int argc, char **argv) { @@ -132,6 +167,64 @@ static void set_default (std::shared_ptr &ctx) ctx->opts += ct_opts; } +static void update_on_resource_response (flux_future_t *f, void *arg) +{ + int rc = -1; + qmanager_ctx_t *ctx = static_cast (arg); + + if ( (rc = flux_rpc_get (f, NULL)) < 0) { + flux_log_error (ctx->h, + "%s: exiting due to sched-fluxion-resource.notify failure", + __FUNCTION__); + flux_reactor_stop (flux_get_reactor (ctx->h)); + goto out; + } + + for (auto &kv : ctx->queues) { + if ( (rc = kv.second->run_sched_loop (static_cast (ctx->h), + true)) < 0 + || (rc = qmanager_safe_cb_t::post_sched_loop (ctx->h, + ctx->schedutil, + ctx->queues)) < 0) { + flux_log_error (ctx->h, "%s: schedule loop", __FUNCTION__); + goto out; + } + } +out: + flux_future_reset (f); + ctx->set_notify_rc (rc); + return; +} + +static int handshake_resource (std::shared_ptr &ctx) +{ + int rc = -1; + + if ( !(ctx->notify_f = flux_rpc (ctx->h, "sched-fluxion-resource.notify", + NULL, + FLUX_NODEID_ANY, + FLUX_RPC_STREAMING))) { + flux_log_error (ctx->h, "%s: flux_rpc (notify)", __FUNCTION__); + goto out; + } + + update_on_resource_response (ctx->notify_f, ctx.get ()); + if ( (rc = ctx->fetch_and_reset_notify_rc ()) < 0) { + flux_log_error (ctx->h, "%s: update_on_resource_response", + __FUNCTION__); + goto out; + } + if ( (rc = flux_future_then (ctx->notify_f, + -1.0, + update_on_resource_response, + ctx.get ())) < 0) { + flux_log_error (ctx->h, "%s: flux_future_then", __FUNCTION__); + goto out; + } +out: + return rc; +} + static int handshake_jobmanager (std::shared_ptr &ctx) { int rc = -1; @@ -269,6 +362,10 @@ static int enforce_options (std::shared_ptr &ctx) flux_log_error (ctx->h, "%s: enforce_queues", __FUNCTION__); return rc; } + if ( (rc = handshake_resource (ctx)) < 0) { + flux_log_error (ctx->h, "%s: handshake_resource", __FUNCTION__); + return rc; + } if ( (rc = handshake_jobmanager (ctx)) < 0) { flux_log_error (ctx->h, "%s: handshake_jobmanager", __FUNCTION__); return rc;