diff --git a/etc/rc1.d/01-sched-fluxion b/etc/rc1.d/01-sched-fluxion
index 2205e6de3..ab9c99d65 100755
--- a/etc/rc1.d/01-sched-fluxion
+++ b/etc/rc1.d/01-sched-fluxion
@@ -1,13 +1,21 @@
 #!/bin/sh -e
 
-test $(flux getattr rank) -eq 0 || exit 0
+if [ -z ${FLUXION_RESOURCE_RC_NOOP} ]; then
+    if [ $(flux getattr rank) -eq 0 ]; then
+        flux module reload -f sched-fluxion-resource \
+            ${FLUXION_RESOURCE_OPTIONS:-"load-allowlist=node,core,gpu"}
+    fi
+fi
 
 if [ -z ${FLUXION_RESOURCE_RC_NOOP} ]; then
-    flux module reload -f sched-fluxion-resource \
+    if [ $(flux getattr rank) -eq 0 ]; then
+        flux module reload -f sched-fluxion-feasibility \
         ${FLUXION_RESOURCE_OPTIONS:-"load-allowlist=node,core,gpu"}
+    fi
 fi
 
 if [ -z ${FLUXION_QMANAGER_RC_NOOP} ]; then
-    flux module reload -f sched-fluxion-qmanager ${FLUXION_QMANAGER_OPTIONS}
+    if [ $(flux getattr rank) -eq 0 ]; then
+        flux module reload -f sched-fluxion-qmanager ${FLUXION_QMANAGER_OPTIONS}
+    fi
 fi
-
diff --git a/etc/rc3.d/01-sched-fluxion b/etc/rc3.d/01-sched-fluxion
index 776d937e9..e9c224e4e 100755
--- a/etc/rc3.d/01-sched-fluxion
+++ b/etc/rc3.d/01-sched-fluxion
@@ -1,6 +1,10 @@
 #!/bin/sh
 
-test $(flux getattr rank) -eq 0 || exit 0
+# Can be loaded on multiple ranks
+flux module remove -f sched-fluxion-feasibility
+
+if [ $(flux getattr rank) -eq 0 ]; then
+    flux module remove -f sched-fluxion-qmanager
+    flux module remove -f sched-fluxion-resource
+fi
 
-flux module remove -f sched-fluxion-qmanager
-flux module remove -f sched-fluxion-resource
diff --git a/qmanager/modules/qmanager.cpp b/qmanager/modules/qmanager.cpp
index 54ee5454a..4a5840c8a 100644
--- a/qmanager/modules/qmanager.cpp
+++ b/qmanager/modules/qmanager.cpp
@@ -305,42 +305,6 @@ static void status_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_
         flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
 }
 
-static void feasibility_request_cb (flux_t *h,
-                                    flux_msg_handler_t *w,
-                                    const flux_msg_t *msg,
-                                    void *arg)
-{
-    size_t size = 0;
-    flux_future_t *f = nullptr;
-    const char *data = nullptr;
-
-    if (flux_request_decode_raw (msg, nullptr, (const void **)&data, &size) < 0)
-        goto error;
-    if (!(f = flux_rpc_raw (h,
-                            "sched-fluxion-resource.satisfiability",
-                            data,
-                            size,
-                            FLUX_NODEID_ANY,
-                            0))) {
-        flux_log_error (h, "%s: flux_rpc (sched-fluxion-resource.satisfiability)", __FUNCTION__);
-        goto error;
-    }
-    if (flux_rpc_get_raw (f, (const void **)&data, &size) < 0)
-        goto error;
-    if (flux_respond_raw (h, msg, (const void *)data, size) < 0) {
-        flux_log_error (h, "%s: flux_respond_raw", __FUNCTION__);
-        goto error;
-    }
-    flux_log (h, LOG_DEBUG, "%s: feasibility succeeded", __FUNCTION__);
-    flux_future_destroy (f);
-    return;
-
-error:
-    if (flux_respond_error (h, msg, errno, flux_future_error_string (f)) < 0)
-        flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
-    flux_future_destroy (f);
-}
-
 static void params_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg)
 {
     int saved_errno;
@@ -599,7 +563,6 @@ static void qmanager_destroy (std::shared_ptr<qmanager_ctx_t> &ctx)
 
 static const struct flux_msg_handler_spec htab[] = {
     {FLUX_MSGTYPE_REQUEST, "sched.resource-status", status_request_cb, FLUX_ROLE_USER},
-    {FLUX_MSGTYPE_REQUEST, "*.feasibility", feasibility_request_cb, FLUX_ROLE_USER},
     {FLUX_MSGTYPE_REQUEST, "*.params", params_request_cb, FLUX_ROLE_USER},
     FLUX_MSGHANDLER_TABLE_END,
 };
diff --git a/resource/modules/CMakeLists.txt b/resource/modules/CMakeLists.txt
index bbf43989f..1b2dba4b9 100644
--- a/resource/modules/CMakeLists.txt
+++ b/resource/modules/CMakeLists.txt
@@ -1,11 +1,27 @@
-flux_add_plugin ( sched-fluxion-resource MODULE
+add_library(sched-fluxion-resource-module SHARED
     ${CMAKE_CURRENT_SOURCE_DIR}/resource_match.cpp
+    ${CMAKE_CURRENT_SOURCE_DIR}/resource_match.hpp
     ${CMAKE_CURRENT_SOURCE_DIR}/resource_match_opts.cpp
     ${CMAKE_CURRENT_SOURCE_DIR}/resource_match_opts.hpp
     )
-target_link_libraries (sched-fluxion-resource PRIVATE
+target_link_libraries (sched-fluxion-resource-module PUBLIC
     resource
     PkgConfig::JANSSON
     PkgConfig::UUID
     cppwrappers
     )
+install(TARGETS sched-fluxion-resource-module LIBRARY)
+
+flux_add_plugin ( sched-fluxion-resource MODULE
+    ${CMAKE_CURRENT_SOURCE_DIR}/resource.cpp
+    )
+flux_add_plugin ( sched-fluxion-feasibility MODULE
+    ${CMAKE_CURRENT_SOURCE_DIR}/feasibility.cpp
+    )
+
+target_link_libraries (sched-fluxion-resource PRIVATE
+    sched-fluxion-resource-module
+    )
+target_link_libraries (sched-fluxion-feasibility PRIVATE
+    sched-fluxion-resource-module
+    )
diff --git a/resource/modules/feasibility.cpp b/resource/modules/feasibility.cpp
new file mode 100644
index 000000000..e397f5f6f
--- /dev/null
+++ b/resource/modules/feasibility.cpp
@@ -0,0 +1,363 @@
+/*****************************************************************************\
+ * Copyright 2014 Lawrence Livermore National Security, LLC
+ * (c.f. AUTHORS, NOTICE.LLNS, LICENSE)
+ *
+ * This file is part of the Flux resource manager framework.
+ * For details, see https://github.com/flux-framework.
+ *
+ * SPDX-License-Identifier: LGPL-3.0
+\*****************************************************************************/
+
+#include "resource_match.hpp"
+
+MOD_NAME ("sched-fluxion-feasibility");
+
+////////////////////////////////////////////////////////////////////////////////
+// Request Handler Prototypes
+////////////////////////////////////////////////////////////////////////////////
+
+static void feasibility_request_cb (flux_t *h,
+                                    flux_msg_handler_t *w,
+                                    const flux_msg_t *msg,
+                                    void *arg);
+
+static const struct flux_msg_handler_spec htab[] =
+    {{FLUX_MSGTYPE_REQUEST, "feasibility.check", feasibility_request_cb, 0},
+     FLUX_MSGHANDLER_TABLE_END};
+
+////////////////////////////////////////////////////////////////////////////////
+// Module Initialization Routines
+////////////////////////////////////////////////////////////////////////////////
+
+static void set_default_args (std::shared_ptr<resource_ctx_t> &ctx)
+{
+    resource_opts_t ct_opts;
+    ct_opts.set_load_format ("rv1exec");
+    ct_opts.set_match_subsystems ("containment");
+    ct_opts.set_match_policy ("first");
+    ct_opts.set_prune_filters ("ALL:core");
+    ct_opts.set_match_format ("rv1_nosched");
+    ct_opts.set_update_interval (0);
+    ctx->opts += ct_opts;
+}
+
+static std::shared_ptr<resource_ctx_t> getctx (flux_t *h)
+{
+    void *d = NULL;
+    std::shared_ptr<resource_ctx_t> ctx = nullptr;
+    if ((d = flux_aux_get (h, mod_name)) != NULL)
+        ctx = *(static_cast<std::shared_ptr<resource_ctx_t> *> (d));
+    if (!ctx) {
+        try {
+            ctx = std::make_shared<resource_ctx_t> ();
+            ctx->traverser = std::make_shared<dfu_traverser_t> ();
+            ctx->db = std::make_shared<resource_graph_db_t> ();
+        } catch (std::bad_alloc &e) {
+            errno = ENOMEM;
+            goto done;
+        }
+        ctx->h = h;
+        ctx->handlers = NULL;
+        set_default_args (ctx);
+        ctx->matcher = nullptr; /* Cannot be allocated at this point */
+        ctx->writers = nullptr; /* Cannot be allocated at this point */
+        ctx->reader = nullptr;  /* Cannot be allocated at this point */
+        ctx->m_resources_updated = false;
+        ctx->m_resources_down_updated = true;
+        ctx->m_resources_alloc_updated = std::chrono::system_clock::now ();
+        ctx->m_get_up_down_updates = false;
+        ctx->m_acquire_resources_from_core = false;
+    }
+
+done:
+    return ctx;
+}
+
+static int process_args (std::shared_ptr<resource_ctx_t> &ctx, int argc, char **argv)
+{
+    int rc = 0;
+    optmgr_kv_t<resource_opts_t> opts_store;
+    std::string info_str = "";
+
+    for (int i = 0; i < argc; i++) {
+        const std::string kv (argv[i]);
+        if ((rc = opts_store.put (kv)) < 0) {
+            flux_log_error (ctx->h, "%s: optmgr_kv_t::put (%s)", __FUNCTION__, argv[i]);
+            return rc;
+        }
+    }
+    if ((rc = opts_store.parse (info_str)) < 0) {
+        flux_log_error (ctx->h, "%s: optmgr_kv_t::parse: %s", __FUNCTION__, info_str.c_str ());
+        return rc;
+    }
+    if (info_str != "") {
+        flux_log (ctx->h, LOG_DEBUG, "%s: %s", __FUNCTION__, info_str.c_str ());
+    }
+    ctx->opts += opts_store.get_opt ();
+    return rc;
+}
+
+static int process_config_file (std::shared_ptr<resource_ctx_t> &ctx)
+{
+    int rc = 0;
+    json_t *conf = nullptr;
+
+    if ((rc = flux_conf_unpack (flux_get_conf (ctx->h), nullptr, "{ s?:o }", mod_name, &conf))
+        < 0) {
+        flux_log_error (ctx->h, "%s: flux_conf_unpack", __FUNCTION__);
+        return rc;
+    }
+
+    const char *k = nullptr;
+    char *tmp = nullptr;
+    json_t *v = nullptr;
+    optmgr_kv_t<resource_opts_t> opts_store;
+    std::string info_str = "";
+    json_object_foreach (conf, k, v) {
+        std::string value;
+        if (!(tmp = json_dumps (v, JSON_ENCODE_ANY | JSON_COMPACT))) {
+            errno = ENOMEM;
+            return -1;
+        }
+        value = tmp;
+        free (tmp);
+        tmp = nullptr;
+        if (json_typeof (v) == JSON_STRING)
+            value = value.substr (1, value.length () - 2);
+        if ((rc = opts_store.put (k, value)) < 0) {
+            flux_log_error (ctx->h,
+                            "%s: optmgr_kv_t::put (%s, %s)",
+                            __FUNCTION__,
+                            k,
+                            value.c_str ());
+            return rc;
+        }
+    }
+    if ((rc = opts_store.parse (info_str)) < 0) {
+        flux_log_error (ctx->h, "%s: optmgr_kv_t::parse: %s", __FUNCTION__, info_str.c_str ());
+        return rc;
+    }
+    if (info_str != "") {
+        flux_log (ctx->h, LOG_DEBUG, "%s: %s", __FUNCTION__, info_str.c_str ());
+    }
+    ctx->opts += opts_store.get_opt ();
+    return rc;
+}
+
+static std::shared_ptr<resource_ctx_t> init_module (flux_t *h, int argc, char **argv)
+{
+    std::shared_ptr<resource_ctx_t> ctx = nullptr;
+    flux_future_t *f = nullptr;
+    uint32_t rank = 1;
+
+    if (!(ctx = getctx (h))) {
+        flux_log (h, LOG_ERR, "%s: can't allocate the context", __FUNCTION__);
+        return nullptr;
+    }
+    if (flux_get_rank (h, &rank) < 0) {
+        flux_log (h, LOG_ERR, "%s: can't determine rank", __FUNCTION__);
+        goto error;
+    }
+    if (process_config_file (ctx) < 0) {
+        flux_log_error (h, "%s: config file parsing", __FUNCTION__);
+        goto error;
+    }
+    if (process_args (ctx, argc, argv) < 0) {
+        flux_log_error (h, "%s: load line argument parsing", __FUNCTION__);
+        goto error;
+    }
+    ctx->opts.canonicalize ();
+
+    // Register feasibility service
+    f = flux_service_register (h, "feasibility");
+    if (flux_future_get (f, NULL) < 0) {
+        flux_log_error (h, "%s: error registering feasibility service", __FUNCTION__);
+        flux_future_destroy (f);
+        goto error;
+    } else {
+        flux_log (h, LOG_DEBUG, "service registered");
+        flux_future_destroy (f);
+    }
+    // Register feasibility handlers
+    if (flux_msg_handler_addvec (h, htab, (void *)h, &ctx->handlers) < 0) {
+        flux_log_error (h, "%s: error registering resource event handler", __FUNCTION__);
+        goto error;
+    }
+    return ctx;
+
+error:
+    return nullptr;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Request Handler Routines
+////////////////////////////////////////////////////////////////////////////////
+
+static void feasibility_request_cb (flux_t *h,
+                                    flux_msg_handler_t *w,
+                                    const flux_msg_t *msg,
+                                    void *arg)
+{
+    int64_t at = 0;
+    int64_t now = 0;
+    double overhead = 0.0f;
+    int saved_errno = 0;
+    std::stringstream R;
+    json_t *jobspec = nullptr;
+    const char *js_str = nullptr;
+    std::string errmsg;
+    flux_error_t error;
+    std::shared_ptr<resource_ctx_t> ctx = getctx ((flux_t *)arg);
+
+    if (flux_request_unpack (msg, NULL, "{s:o}", "jobspec", &jobspec) < 0)
+        goto error;
+    if (!(js_str = json_dumps (jobspec, JSON_INDENT (0)))) {
+        errno = ENOMEM;
+        goto error;
+    }
+    error.text[0] = '\0';
+    if (run_match (ctx, -1, "satisfiability", js_str, &now, &at, &overhead, R, &error) < 0) {
+        if (errno == ENODEV)
+            errmsg = "Unsatisfiable request";
+        else {
+            errmsg = "Internal match error: ";
+            errmsg += error.text;
+        }
+        goto error_memfree;
+    }
+    free ((void *)js_str);
+    if (flux_respond (h, msg, NULL) < 0)
+        flux_log_error (h, "%s: flux_respond", __FUNCTION__);
+    return;
+
+error_memfree:
+    saved_errno = errno;
+    free ((void *)js_str);
+    errno = saved_errno;
+error:
+    if (flux_respond_error (h, msg, errno, errmsg.c_str ()) < 0)
+        flux_log_error (h, "%s: flux_respond_error", __FUNCTION__);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Resource Graph and Traverser Initialization
+////////////////////////////////////////////////////////////////////////////////
+
+static int init_resource_graph (std::shared_ptr<resource_ctx_t> &ctx)
+{
+    int rc = 0;
+
+    // The feasibility module should only use FIRST. Exclusivity has no effect.
+    if (!(ctx->matcher = create_match_cb (FIRST_MATCH))) {
+        flux_log (ctx->h, LOG_ERR, "%s: can't create match callback", __FUNCTION__);
+        return -1;
+    }
+    if ((rc = populate_resource_db (ctx)) != 0) {
+        flux_log (ctx->h, LOG_ERR, "%s: can't populate graph resource database", __FUNCTION__);
+        return rc;
+    }
+    if ((rc = select_subsystems (ctx)) != 0) {
+        flux_log (ctx->h,
+                  LOG_ERR,
+                  "%s: error processing subsystems %s",
+                  __FUNCTION__,
+                  ctx->opts.get_opt ().get_match_subsystems ().c_str ());
+        return rc;
+    }
+
+    // Create a writers object for matched vertices and edges
+    match_format_t format =
+        match_writers_factory_t::get_writers_type (ctx->opts.get_opt ().get_match_format ());
+    if (!(ctx->writers = match_writers_factory_t::create (format)))
+        return -1;
+
+    // TODO remove?
+    if (ctx->opts.get_opt ().is_prune_filters_set ()
+        && ctx->matcher->set_pruning_types_w_spec (ctx->matcher->dom_subsystem (),
+                                                   ctx->opts.get_opt ().get_prune_filters ())
+               < 0) {
+        flux_log (ctx->h,
+                  LOG_ERR,
+                  "%s: error setting pruning types with: %s",
+                  __FUNCTION__,
+                  ctx->opts.get_opt ().get_prune_filters ().c_str ());
+        return -1;
+    }
+
+    // Initialize the DFU traverser
+    if (ctx->traverser->initialize (ctx->db, ctx->matcher) < 0) {
+        flux_log (ctx->h, LOG_ERR, "%s: traverser initialization", __FUNCTION__);
+        return -1;
+    }
+
+    // prevent users from consuming unbounded memory with arbitrary resource types
+    subsystem_t::storage_t::finalize ();
+    resource_type_t::storage_t::finalize ();
+    return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Module Main
+////////////////////////////////////////////////////////////////////////////////
+
+extern "C" int mod_main (flux_t *h, int argc, char **argv)
+{
+    int rc = -1;
+
+    flux_log (h, LOG_INFO, "version %s", PACKAGE_VERSION);
+
+    try {
+        std::shared_ptr<resource_ctx_t> ctx = nullptr;
+        uint32_t rank = 1;
+
+        if (!(ctx = init_module (h, argc, argv))) {
+            flux_log (h, LOG_ERR, "%s: can't initialize feasibility module", __FUNCTION__);
+            goto done;
+        }
+
+        // Because mod_main is always active, the following is safe.
+        flux_aux_set (h, mod_name, &ctx, NULL);
+        flux_log (h, LOG_DEBUG, "%s: feasibility module starting", __FUNCTION__);
+
+        /* Before beginning synchronous resource.acquire RPC, set module status
+         * to 'running' to let flux module load return success.
+         */
+        if ((rc = flux_module_set_running (ctx->h)) < 0) {
+            flux_log_error (ctx->h, "%s: flux_module_set_running", __FUNCTION__);
+            goto done;
+        }
+        if ((rc = init_resource_graph (ctx)) != 0) {
+            flux_log (h, LOG_ERR, "%s: can't initialize resource graph database", __FUNCTION__);
+            goto done;
+        }
+        flux_log (h, LOG_DEBUG, "%s: resource graph database loaded", __FUNCTION__);
+
+        if ((rc = flux_reactor_run (flux_get_reactor (h), 0)) < 0) {
+            flux_log (h, LOG_ERR, "%s: flux_reactor_run: %s", __FUNCTION__, strerror (errno));
+            goto done;
+        }
+
+        // Unregister feasibility service
+        flux_future_t *f = nullptr;
+        f = flux_service_unregister (h, "feasibility");
+        if (flux_future_get (f, NULL) < 0)
+            flux_log_error (h, "Failed to unregister feasibility service");
+        flux_future_destroy (f);
+
+    } catch (std::exception &e) {
+        errno = ENOSYS;
+        flux_log (h, LOG_ERR, "%s: %s", __FUNCTION__, e.what ());
+        return -1;
+    } catch (...) {
+        errno = ENOSYS;
+        flux_log (h, LOG_ERR, "%s: caught unknown exception", __FUNCTION__);
+        return -1;
+    }
+
+done:
+    return rc;
+}
+
+/*
+ * vi:tabstop=4 shiftwidth=4 expandtab
+ */
diff --git a/resource/modules/resource.cpp b/resource/modules/resource.cpp
new file mode 100644
index 000000000..cef34c876
--- /dev/null
+++ b/resource/modules/resource.cpp
@@ -0,0 +1,1420 @@
+/*****************************************************************************\
+ * Copyright 2014 Lawrence Livermore National Security, LLC
+ * (c.f. AUTHORS, NOTICE.LLNS, LICENSE)
+ *
+ * This file is part of the Flux resource manager framework.
+ * For details, see https://github.com/flux-framework.
+ * + * SPDX-License-Identifier: LGPL-3.0 +\*****************************************************************************/ + +#include "resource_match.hpp" + +MOD_NAME ("sched-fluxion-resource"); + +//////////////////////////////////////////////////////////////////////////////// +// Request Handler Prototypes +//////////////////////////////////////////////////////////////////////////////// + +static void match_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg); + +static void match_multi_request_cb (flux_t *h, + flux_msg_handler_t *w, + const flux_msg_t *msg, + void *arg); + +static void update_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg); + +static void cancel_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg); + +static void partial_cancel_request_cb (flux_t *h, + flux_msg_handler_t *w, + const flux_msg_t *msg, + void *arg); + +static void info_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg); + +static void stat_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg); + +static void stat_clear_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg); + +static void next_jobid_request_cb (flux_t *h, + flux_msg_handler_t *w, + const flux_msg_t *msg, + void *arg); + +static void set_property_request_cb (flux_t *h, + flux_msg_handler_t *w, + const flux_msg_t *msg, + void *arg); + +static void get_property_request_cb (flux_t *h, + flux_msg_handler_t *w, + const flux_msg_t *msg, + void *arg); + +static void notify_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg); + +static void disconnect_request_cb (flux_t *h, + flux_msg_handler_t *w, + const flux_msg_t *msg, + void *arg); + +static void find_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg); + +static void status_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg); + +static void ns_info_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg); + +static void params_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg); + +static void set_status_request_cb (flux_t *h, + flux_msg_handler_t *w, + const flux_msg_t *msg, + void *arg); + +static const struct flux_msg_handler_spec htab[] = + {{FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.match", match_request_cb, 0}, + {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.match_multi", match_multi_request_cb, 0}, + {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.update", update_request_cb, 0}, + {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.cancel", cancel_request_cb, 0}, + {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.partial-cancel", partial_cancel_request_cb, 0}, + {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.info", info_request_cb, 0}, + {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.stats-get", stat_request_cb, FLUX_ROLE_USER}, + {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.stats-clear", stat_clear_cb, FLUX_ROLE_USER}, + {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.next_jobid", next_jobid_request_cb, 0}, + {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.set_property", set_property_request_cb, 0}, + {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.get_property", get_property_request_cb, 0}, + {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.notify", notify_request_cb, 0}, + {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.disconnect", disconnect_request_cb, 0}, + {FLUX_MSGTYPE_REQUEST, 
"sched-fluxion-resource.find", find_request_cb, 0}, + {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.status", status_request_cb, 0}, + {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.ns-info", ns_info_request_cb, 0}, + {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.params", params_request_cb, 0}, + {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.set_status", set_status_request_cb, 0}, + FLUX_MSGHANDLER_TABLE_END}; + +//////////////////////////////////////////////////////////////////////////////// +// Module Initialization Routines +//////////////////////////////////////////////////////////////////////////////// + +static void set_default_args (std::shared_ptr &ctx) +{ + resource_opts_t ct_opts; + ct_opts.set_load_format ("rv1exec"); + ct_opts.set_match_subsystems ("containment"); + ct_opts.set_match_policy ("first"); + ct_opts.set_prune_filters ("ALL:core"); + ct_opts.set_match_format ("rv1_nosched"); + ct_opts.set_update_interval (0); + ctx->opts += ct_opts; +} + +static std::shared_ptr getctx (flux_t *h) +{ + void *d = NULL; + std::shared_ptr ctx = nullptr; + if ((d = flux_aux_get (h, mod_name)) != NULL) + ctx = *(static_cast *> (d)); + if (!ctx) { + try { + ctx = std::make_shared (); + ctx->traverser = std::make_shared (); + ctx->db = std::make_shared (); + } catch (std::bad_alloc &e) { + errno = ENOMEM; + goto done; + } + ctx->h = h; + ctx->handlers = NULL; + set_default_args (ctx); + ctx->matcher = nullptr; /* Cannot be allocated at this point */ + ctx->writers = nullptr; /* Cannot be allocated at this point */ + ctx->reader = nullptr; /* Cannot be allocated at this point */ + ctx->m_resources_updated = true; + ctx->m_resources_down_updated = true; + ctx->m_resources_alloc_updated = std::chrono::system_clock::now (); + ctx->m_get_up_down_updates = true; + ctx->m_acquire_resources_from_core = true; + } + +done: + return ctx; +} + +static int process_args (std::shared_ptr &ctx, int argc, char **argv) +{ + int rc = 0; + optmgr_kv_t opts_store; + std::string info_str = ""; + + for (int i = 0; i < argc; i++) { + const std::string kv (argv[i]); + if ((rc = opts_store.put (kv)) < 0) { + flux_log_error (ctx->h, "%s: optmgr_kv_t::put (%s)", __FUNCTION__, argv[i]); + return rc; + } + } + if ((rc = opts_store.parse (info_str)) < 0) { + flux_log_error (ctx->h, "%s: optmgr_kv_t::parse: %s", __FUNCTION__, info_str.c_str ()); + return rc; + } + if (info_str != "") { + flux_log (ctx->h, LOG_DEBUG, "%s: %s", __FUNCTION__, info_str.c_str ()); + } + ctx->opts += opts_store.get_opt (); + return rc; +} + +static int process_config_file (std::shared_ptr &ctx) +{ + int rc = 0; + json_t *conf = nullptr; + + if ((rc = flux_conf_unpack (flux_get_conf (ctx->h), nullptr, "{ s?:o }", mod_name, &conf)) + < 0) { + flux_log_error (ctx->h, "%s: flux_conf_unpack", __FUNCTION__); + return rc; + } + + const char *k = nullptr; + char *tmp = nullptr; + json_t *v = nullptr; + optmgr_kv_t opts_store; + std::string info_str = ""; + flux_log (ctx->h, LOG_DEBUG, "%s: process_config_file", __FUNCTION__); + json_object_foreach (conf, k, v) { + flux_log (ctx->h, LOG_DEBUG, "%s: %s", __FUNCTION__, k); + + std::string value; + if (!(tmp = json_dumps (v, JSON_ENCODE_ANY | JSON_COMPACT))) { + errno = ENOMEM; + return -1; + } + value = tmp; + free (tmp); + tmp = nullptr; + if (json_typeof (v) == JSON_STRING) + value = value.substr (1, value.length () - 2); + if ((rc = opts_store.put (k, value)) < 0) { + flux_log_error (ctx->h, + "%s: optmgr_kv_t::put (%s, %s)", + __FUNCTION__, + k, + value.c_str ()); + return rc; + } + } + if ((rc = 
opts_store.parse (info_str)) < 0) { + flux_log_error (ctx->h, "%s: optmgr_kv_t::parse: %s", __FUNCTION__, info_str.c_str ()); + return rc; + } + if (info_str != "") { + flux_log (ctx->h, LOG_DEBUG, "%s: %s", __FUNCTION__, info_str.c_str ()); + } + ctx->opts += opts_store.get_opt (); + return rc; +} + +static std::shared_ptr init_module (flux_t *h, int argc, char **argv) +{ + std::shared_ptr ctx = nullptr; + uint32_t rank = 1; + + if (!(ctx = getctx (h))) { + flux_log (h, LOG_ERR, "%s: can't allocate the context", __FUNCTION__); + return nullptr; + } + if (flux_get_rank (h, &rank) < 0) { + flux_log (h, LOG_ERR, "%s: can't determine rank", __FUNCTION__); + goto error; + } + if (process_config_file (ctx) < 0) { + flux_log_error (h, "%s: config file parsing", __FUNCTION__); + goto error; + } + if (process_args (ctx, argc, argv) < 0) { + flux_log_error (h, "%s: load line argument parsing", __FUNCTION__); + goto error; + } + ctx->opts.canonicalize (); + + if (rank) { + flux_log (h, LOG_ERR, "%s: resource module must only run on rank 0", __FUNCTION__); + goto error; + } + if (flux_msg_handler_addvec (h, htab, (void *)h, &ctx->handlers) < 0) { + flux_log_error (h, "%s: error registering resource event handler", __FUNCTION__); + goto error; + } + return ctx; + +error: + return nullptr; +} + +//////////////////////////////////////////////////////////////////////////////// +// Request Handler Routines +//////////////////////////////////////////////////////////////////////////////// + +static void update_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) +{ + char *R = NULL; + int64_t at = 0; + double overhead = 0.0f; + int64_t jobid = 0; + uint64_t duration = 0; + std::string status = ""; + std::stringstream o; + std::chrono::time_point start; + std::chrono::duration elapsed; + + std::shared_ptr ctx = getctx ((flux_t *)arg); + if (flux_request_unpack (msg, NULL, "{s:I s:s}", "jobid", &jobid, "R", &R) < 0) { + flux_log_error (ctx->h, "%s: flux_request_unpack", __FUNCTION__); + goto error; + } + if (is_existent_jobid (ctx, jobid)) { + int rc = 0; + start = std::chrono::system_clock::now (); + if ((rc = Rlite_equal (ctx, R, ctx->jobs[jobid]->R.c_str ())) < 0) { + flux_log_error (ctx->h, "%s: Rlite_equal", __FUNCTION__); + goto error; + } else if (rc == 1) { + errno = EINVAL; + flux_log (ctx->h, + LOG_ERR, + "%s: jobid (%jd) with different R exists!", + __FUNCTION__, + static_cast (jobid)); + goto error; + } + elapsed = std::chrono::system_clock::now () - start; + // If a jobid with matching R exists, no need to update + overhead = elapsed.count (); + get_jobstate_str (ctx->jobs[jobid]->state, status); + o << ctx->jobs[jobid]->R; + at = ctx->jobs[jobid]->scheduled_at; + flux_log (ctx->h, + LOG_DEBUG, + "%s: jobid (%jd) with matching R exists", + __FUNCTION__, + static_cast (jobid)); + } else if (run_update (ctx, jobid, R, at, overhead, o) < 0) { + flux_log_error (ctx->h, + "%s: update failed (id=%jd)", + __FUNCTION__, + static_cast (jobid)); + goto error; + } + + if (status == "") + status = get_status_string (at, at); + + if (flux_respond_pack (h, + msg, + "{s:I s:s s:f s:s s:I}", + "jobid", + jobid, + "status", + status.c_str (), + "overhead", + overhead, + "R", + o.str ().c_str (), + "at", + at) + < 0) + flux_log_error (h, "%s", __FUNCTION__); + + return; + +error: + if (flux_respond_error (h, msg, errno, NULL) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); +} + +static void match_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void 
*arg) +{ + int64_t at = 0; + int64_t now = 0; + int64_t jobid = -1; + double overhead = 0.0f; + std::string status = ""; + const char *cmd = NULL; + const char *js_str = NULL; + std::stringstream R; + + std::shared_ptr ctx = getctx ((flux_t *)arg); + if (flux_request_unpack (msg, + NULL, + "{s:s s:I s:s}", + "cmd", + &cmd, + "jobid", + &jobid, + "jobspec", + &js_str) + < 0) + goto error; + if (is_existent_jobid (ctx, jobid)) { + errno = EINVAL; + flux_log_error (h, "%s: existent job (%jd).", __FUNCTION__, (intmax_t)jobid); + goto error; + } + if (run_match (ctx, jobid, cmd, js_str, &now, &at, &overhead, R, NULL) < 0) { + if (errno != EBUSY && errno != ENODEV) + flux_log_error (ctx->h, + "%s: match failed due to match error (id=%jd)", + __FUNCTION__, + (intmax_t)jobid); + // The resources couldn't be allocated *or reserved* + // Kicking back to qmanager, remove from tracking + if (errno == EBUSY) { + ctx->jobs.erase (jobid); + } + goto error; + } + + status = get_status_string (now, at); + if (flux_respond_pack (h, + msg, + "{s:I s:s s:f s:s s:I}", + "jobid", + jobid, + "status", + status.c_str (), + "overhead", + overhead, + "R", + R.str ().c_str (), + "at", + at) + < 0) + flux_log_error (h, "%s", __FUNCTION__); + + return; + +error: + if (flux_respond_error (h, msg, errno, NULL) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); +} + +static void match_multi_request_cb (flux_t *h, + flux_msg_handler_t *w, + const flux_msg_t *msg, + void *arg) +{ + size_t index; + json_t *value; + json_error_t err; + int saved_errno; + json_t *jobs = nullptr; + uint64_t jobid = 0; + std::string errmsg; + const char *cmd = nullptr; + const char *jobs_str = nullptr; + std::shared_ptr ctx = getctx ((flux_t *)arg); + + if (!flux_msg_is_streaming (msg)) { + errno = EPROTO; + goto error; + } + if (flux_request_unpack (msg, NULL, "{s:s s:s}", "cmd", &cmd, "jobs", &jobs_str) < 0) + goto error; + if (!(jobs = json_loads (jobs_str, 0, &err))) { + errno = ENOMEM; + goto error; + } + + json_array_foreach (jobs, index, value) { + const char *js_str; + int64_t at = 0; + int64_t now = 0; + double overhead = 0.0f; + std::string status = ""; + std::stringstream R; + + if (json_unpack (value, "{s:I s:s}", "jobid", &jobid, "jobspec", &js_str) < 0) + goto error; + if (is_existent_jobid (ctx, jobid)) { + errno = EINVAL; + flux_log_error (h, + "%s: existent job (%jd).", + __FUNCTION__, + static_cast (jobid)); + goto error; + } + if (run_match (ctx, jobid, cmd, js_str, &now, &at, &overhead, R, NULL) < 0) { + if (errno != EBUSY && errno != ENODEV) + flux_log_error (ctx->h, + "%s: match failed due to match error (id=%jd)", + __FUNCTION__, + static_cast (jobid)); + // The resources couldn't be allocated *or reserved* + // Kicking back to qmanager, remove from tracking + if (errno == EBUSY) { + ctx->jobs.erase (jobid); + } + goto error; + } + + status = get_status_string (now, at); + if (flux_respond_pack (h, + msg, + "{s:I s:s s:f s:s s:I}", + "jobid", + jobid, + "status", + status.c_str (), + "overhead", + overhead, + "R", + R.str ().c_str (), + "at", + at) + < 0) { + flux_log_error (h, "%s", __FUNCTION__); + goto error; + } + } + errno = ENODATA; + jobid = 0; +error: + if (jobs) { + saved_errno = errno; + json_decref (jobs); + errno = saved_errno; + } + if (jobid != 0) + errmsg += "jobid=" + std::to_string (jobid); + if (flux_respond_error (h, msg, errno, !errmsg.empty () ? 
errmsg.c_str () : nullptr) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); +} + +static void cancel_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) +{ + std::shared_ptr ctx = getctx ((flux_t *)arg); + int64_t jobid = -1; + char *R = NULL; + bool full_removal = true; + + if (flux_request_unpack (msg, NULL, "{s:I}", "jobid", &jobid) < 0) + goto error; + if (ctx->allocations.find (jobid) != ctx->allocations.end ()) + ctx->allocations.erase (jobid); + else if (ctx->reservations.find (jobid) != ctx->reservations.end ()) + ctx->reservations.erase (jobid); + else { + errno = ENOENT; + flux_log (h, LOG_DEBUG, "%s: nonexistent job (id=%jd)", __FUNCTION__, (intmax_t)jobid); + goto error; + } + + if (run_remove (ctx, jobid, R, false, full_removal) < 0) { + flux_log_error (h, + "%s: remove fails due to match error (id=%jd)", + __FUNCTION__, + (intmax_t)jobid); + goto error; + } + if (flux_respond_pack (h, msg, "{}") < 0) + flux_log_error (h, "%s", __FUNCTION__); + + return; + +error: + if (flux_respond_error (h, msg, errno, NULL) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); +} + +static void partial_cancel_request_cb (flux_t *h, + flux_msg_handler_t *w, + const flux_msg_t *msg, + void *arg) +{ + std::shared_ptr ctx = getctx ((flux_t *)arg); + int64_t jobid = -1; + char *R = NULL; + decltype (ctx->allocations)::iterator jobid_it; + bool full_removal = false; + int int_full_removal = 0; + + if (flux_request_unpack (msg, NULL, "{s:I s:s}", "jobid", &jobid, "R", &R) < 0) + goto error; + + jobid_it = ctx->allocations.find (jobid); + if (jobid_it == ctx->allocations.end ()) { + errno = ENOENT; + flux_log (h, + LOG_DEBUG, + "%s: job (id=%jd) not found in allocations", + __FUNCTION__, + (intmax_t)jobid); + goto error; + } + + if (run_remove (ctx, jobid, R, true, full_removal) < 0) { + flux_log_error (h, + "%s: remove fails due to match error (id=%jd)", + __FUNCTION__, + (intmax_t)jobid); + goto error; + } + int_full_removal = full_removal; + if (flux_respond_pack (h, msg, "{s:i}", "full-removal", int_full_removal) < 0) + flux_log_error (h, "%s", __FUNCTION__); + + if (full_removal) + ctx->allocations.erase (jobid_it); + + return; + +error: + if (flux_respond_error (h, msg, errno, NULL) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); +} + +static void info_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) +{ + std::shared_ptr ctx = getctx ((flux_t *)arg); + int64_t jobid = -1; + std::shared_ptr info = NULL; + std::string status = ""; + + if (flux_request_unpack (msg, NULL, "{s:I}", "jobid", &jobid) < 0) + goto error; + if (!is_existent_jobid (ctx, jobid)) { + errno = ENOENT; + flux_log (h, LOG_DEBUG, "%s: nonexistent job (id=%jd)", __FUNCTION__, (intmax_t)jobid); + goto error; + } + + info = ctx->jobs[jobid]; + get_jobstate_str (info->state, status); + if (flux_respond_pack (h, + msg, + "{s:I s:s s:I s:f}", + "jobid", + jobid, + "status", + status.c_str (), + "at", + info->scheduled_at, + "overhead", + info->overhead) + < 0) + flux_log_error (h, "%s", __FUNCTION__); + + return; + +error: + if (flux_respond_error (h, msg, errno, NULL) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); +} + +static int get_stat_by_rank (std::shared_ptr &ctx, json_t *o) +{ + int rc = -1; + int saved_errno = 0; + char *str = nullptr; + struct idset *ids = nullptr; + std::map s2r; + + for (auto &kv : ctx->db->metadata.by_rank) { + if (kv.first == -1) + continue; + if (s2r.find (kv.second.size ()) 
== s2r.end ()) { + if (!(ids = idset_create (0, IDSET_FLAG_AUTOGROW))) + goto done; + s2r[kv.second.size ()] = ids; + } + if ((rc = idset_set (s2r[kv.second.size ()], static_cast (kv.first))) < 0) + goto done; + } + + for (auto &kv : s2r) { + if (!(str = idset_encode (kv.second, IDSET_FLAG_BRACKETS | IDSET_FLAG_RANGE))) { + rc = -1; + goto done; + } + if ((rc = json_object_set_new (o, str, json_integer (static_cast (kv.first)))) + < 0) { + errno = ENOMEM; + goto done; + } + saved_errno = errno; + free (str); + errno = saved_errno; + str = nullptr; + } + +done: + for (auto &kv : s2r) + idset_destroy (kv.second); + saved_errno = errno; + s2r.clear (); + free (str); + errno = saved_errno; + return rc; +} + +static void stat_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) +{ + std::shared_ptr ctx = getctx ((flux_t *)arg); + int saved_errno; + json::value o; + json_t *match_succeeded = nullptr; + json_t *match_failed = nullptr; + double avg = 0.0f; + double min = 0.0f; + double variance = 0.0f; + // Failed match stats + double avg_failed = 0.0f; + double min_failed = 0.0f; + double variance_failed = 0.0f; + int64_t graph_uptime_s = 0; + int64_t time_since_reset_s = 0; + std::chrono::time_point now; + + if (perf.succeeded.njobs_reset > 1) { + avg = perf.succeeded.avg; + min = perf.succeeded.min; + // Welford's online algorithm + variance = perf.succeeded.M2 / (double)perf.succeeded.njobs_reset; + } + if (perf.failed.njobs_reset > 1) { + avg_failed = perf.failed.avg; + min_failed = perf.failed.min; + // Welford's online algorithm + variance_failed = perf.failed.M2 / (double)perf.failed.njobs_reset; + } + if (!(o = json::value::take (json_object ()))) { + errno = ENOMEM; + goto error; + } + if (get_stat_by_rank (ctx, o.get ()) < 0) { + flux_log_error (h, "%s: get_stat_by_rank", __FUNCTION__); + goto error; + } + + if (!(match_succeeded = json_pack ("{s:I s:I s:I s:I s:{s:f s:f s:f s:f}}", + "njobs", + perf.succeeded.njobs, + "njobs-reset", + perf.succeeded.njobs_reset, + "max-match-jobid", + perf.succeeded.max_match_jobid, + "max-match-iters", + perf.succeeded.match_iter_count, + "stats", + "min", + min, + "max", + perf.succeeded.max, + "avg", + avg, + "variance", + variance))) { + errno = ENOMEM; + goto error; + } + if (!(match_failed = json_pack ("{s:I s:I s:I s:I s:{s:f s:f s:f s:f}}", + "njobs", + perf.failed.njobs, + "njobs-reset", + perf.failed.njobs_reset, + "max-match-jobid", + perf.failed.max_match_jobid, + "max-match-iters", + perf.failed.match_iter_count, + "stats", + "min", + min_failed, + "max", + perf.failed.max, + "avg", + avg_failed, + "variance", + variance_failed))) { + errno = ENOMEM; + goto error; + } + now = std::chrono::system_clock::now (); + graph_uptime_s = + std::chrono::duration_cast (now - perf.graph_uptime).count (); + time_since_reset_s = + std::chrono::duration_cast (now - perf.time_of_last_reset).count (); + + if (flux_respond_pack (h, + msg, + "{s:I s:I s:O s:f s:I s:I s:{s:O s:O}}", + "V", + num_vertices (ctx->db->resource_graph), + "E", + num_edges (ctx->db->resource_graph), + "by_rank", + o.get (), + "load-time", + perf.load, + "graph-uptime", + graph_uptime_s, + "time-since-reset", + time_since_reset_s, + "match", + "succeeded", + match_succeeded, + "failed", + match_failed) + < 0) { + flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); + } + json_decref (match_succeeded); + json_decref (match_failed); + + return; + +error: + saved_errno = errno; + json_decref (match_succeeded); + json_decref (match_failed); + errno = 
saved_errno; + if (flux_respond_error (h, msg, errno, NULL) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); +} + +static void stat_clear_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) +{ + std::shared_ptr ctx = getctx ((flux_t *)arg); + + perf.time_of_last_reset = std::chrono::system_clock::now (); + // Clear the jobs-related stats and reset time + perf.succeeded.njobs_reset = 0; + perf.succeeded.max_match_jobid = -1; + perf.succeeded.match_iter_count = -1; + perf.succeeded.min = std::numeric_limits::max (); + perf.succeeded.max = 0.0; + perf.succeeded.avg = 0.0; + perf.succeeded.M2 = 0.0; + // Failed match stats + perf.failed.njobs_reset = 0; + perf.failed.max_match_jobid = -1; + perf.failed.match_iter_count = -1; + perf.failed.min = std::numeric_limits::max (); + perf.failed.max = 0.0; + perf.failed.avg = 0.0; + perf.failed.M2 = 0.0; + + if (flux_respond (h, msg, NULL) < 0) + flux_log_error (h, "%s: flux_respond", __FUNCTION__); +} + +static inline int64_t next_jobid (const std::map> &m) +{ + int64_t jobid = -1; + if (m.empty ()) + jobid = 0; + else if (m.rbegin ()->first < INT64_MAX) + jobid = m.rbegin ()->first + 1; + return jobid; +} + +/* Needed for testing only */ +static void next_jobid_request_cb (flux_t *h, + flux_msg_handler_t *w, + const flux_msg_t *msg, + void *arg) +{ + std::shared_ptr ctx = getctx ((flux_t *)arg); + int64_t jobid = -1; + + if ((jobid = next_jobid (ctx->jobs)) < 0) { + errno = ERANGE; + goto error; + } + if (flux_respond_pack (h, msg, "{s:I}", "jobid", jobid) < 0) + flux_log_error (h, "%s", __FUNCTION__); + + return; + +error: + if (flux_respond_error (h, msg, errno, NULL) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); +} + +static void set_property_request_cb (flux_t *h, + flux_msg_handler_t *w, + const flux_msg_t *msg, + void *arg) +{ + const char *rp = NULL, *kv = NULL; + std::string resource_path = "", keyval = ""; + std::string property_key = "", property_value = ""; + size_t pos; + std::shared_ptr ctx = getctx ((flux_t *)arg); + std::map>::const_iterator it; + std::pair::iterator, bool> ret; + vtx_t v; + + if (flux_request_unpack (msg, NULL, "{s:s s:s}", "sp_resource_path", &rp, "sp_keyval", &kv) < 0) + goto error; + + resource_path = rp; + keyval = kv; + + pos = keyval.find ('='); + + if (pos == 0 || (pos == keyval.size () - 1) || pos == std::string::npos) { + errno = EINVAL; + flux_log_error (h, "%s: Incorrect format.", __FUNCTION__); + flux_log_error (h, "%s: Use set-property PROPERTY=VALUE", __FUNCTION__); + goto error; + } + + property_key = keyval.substr (0, pos); + property_value = keyval.substr (pos + 1); + + it = ctx->db->metadata.by_path.find (resource_path); + + if (it == ctx->db->metadata.by_path.end ()) { + errno = ENOENT; + flux_log_error (h, + "%s: Couldn't find %s in resource graph.", + __FUNCTION__, + resource_path.c_str ()); + goto error; + } + + for (auto &v : it->second) { + ret = ctx->db->resource_graph[v].properties.insert ( + std::pair (property_key, property_value)); + + if (ret.second == false) { + ctx->db->resource_graph[v].properties.erase (property_key); + ctx->db->resource_graph[v].properties.insert ( + std::pair (property_key, property_value)); + } + } + + if (flux_respond_pack (h, msg, "{}") < 0) + flux_log_error (h, "%s", __FUNCTION__); + + return; + +error: + if (flux_respond_error (h, msg, errno, NULL) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); +} + +static void get_property_request_cb (flux_t *h, + flux_msg_handler_t *w, + const 
flux_msg_t *msg, + void *arg) +{ + const char *rp = NULL, *gp_key = NULL; + std::string resource_path = "", property_key = ""; + std::shared_ptr ctx = getctx ((flux_t *)arg); + std::map>::const_iterator it; + std::map::const_iterator p_it; + vtx_t v; + std::vector resp_values; + json_t *resp_array = nullptr; + + if (flux_request_unpack (msg, NULL, "{s:s s:s}", "gp_resource_path", &rp, "gp_key", &gp_key) + < 0) + goto error; + + resource_path = rp; + property_key = gp_key; + + it = ctx->db->metadata.by_path.find (resource_path); + + if (it == ctx->db->metadata.by_path.end ()) { + errno = ENOENT; + flux_log_error (h, + "%s: Couldn't find %s in resource graph.", + __FUNCTION__, + resource_path.c_str ()); + goto error; + } + + for (auto &v : it->second) { + for (p_it = ctx->db->resource_graph[v].properties.begin (); + p_it != ctx->db->resource_graph[v].properties.end (); + p_it++) { + if (property_key.compare (p_it->first) == 0) + resp_values.push_back (p_it->second); + } + } + if (resp_values.empty ()) { + errno = ENOENT; + flux_log_error (h, + "%s: Property %s was not found for resource %s.", + __FUNCTION__, + property_key.c_str (), + resource_path.c_str ()); + goto error; + } + + if (!(resp_array = json_array ())) { + errno = ENOMEM; + goto error; + } + for (auto &resp_value : resp_values) { + json_t *value = nullptr; + if (!(value = json_string (resp_value.c_str ()))) { + errno = EINVAL; + goto error; + } + if (json_array_append_new (resp_array, value) < 0) { + json_decref (value); + errno = EINVAL; + goto error; + } + } + if (flux_respond_pack (h, msg, "{s:o}", "values", resp_array) < 0) + flux_log_error (h, "%s", __FUNCTION__); + + return; + +error: + if (flux_respond_error (h, msg, errno, NULL) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); +} + +static void disconnect_request_cb (flux_t *h, + flux_msg_handler_t *w, + const flux_msg_t *msg, + void *arg) +{ + const char *route; + std::shared_ptr ctx = getctx ((flux_t *)arg); + + if (!(route = flux_msg_route_first (msg))) { + flux_log_error (h, "%s: flux_msg_route_first", __FUNCTION__); + return; + } + if (ctx->notify_msgs.find (route) != ctx->notify_msgs.end ()) { + ctx->notify_msgs.erase (route); + flux_log (h, LOG_DEBUG, "%s: a notify request aborted", __FUNCTION__); + } +} + +static void notify_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) +{ + try { + const char *route; + std::shared_ptr ctx = getctx ((flux_t *)arg); + std::shared_ptr m = std::make_shared (); + + if (flux_request_decode (msg, NULL, NULL) < 0) { + flux_log_error (h, "%s: flux_request_decode", __FUNCTION__); + goto error; + } + if (!flux_msg_is_streaming (msg)) { + errno = EPROTO; + flux_log_error (h, "%s: streaming flag not set", __FUNCTION__); + goto error; + } + if (!(route = flux_msg_route_first (msg))) { + flux_log_error (h, "%s: flux_msg_route_first", __FUNCTION__); + goto error; + } + if (ctx->opts.get_opt ().is_load_file_set ()) { + errno = ENODATA; + // Since m_acquired_resources is null, + flux_log_error (ctx->h, "%s: cannot notify when load-file set", __FUNCTION__); + goto error; + } + + // Respond only after sched-fluxion-resource gets + // resources from its resource.acquire RPC. + // This is guaranteed by the order of mod_main: + // init_resource_graph runs before flux_reactor_run. 
+ if (flux_respond_pack (ctx->h, + msg, + "{s:O s:f}", + "resources", + ctx->m_acquired_resources.get (), + "expiration", + ctx->m_acquired_resources_expiration) + < 0) { + flux_log_error (ctx->h, "%s: flux_respond_pack", __FUNCTION__); + goto error; + } + + // Add msg as a subscriber to resource UP/DOWN updates + m->set_msg (msg); + auto ret = ctx->notify_msgs.insert ( + std::pair> (route, m)); + if (!ret.second) { + errno = EEXIST; + flux_log_error (h, "%s: insert", __FUNCTION__); + goto error; + } + + } catch (std::bad_alloc &e) { + errno = ENOMEM; + } + return; + +error: + if (flux_respond_error (h, msg, errno, NULL) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); +} + +static void find_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) +{ + json_t *R = nullptr; + int saved_errno; + const char *criteria = nullptr; + const char *format_str = "rv1_nosched"; + std::shared_ptr ctx = getctx ((flux_t *)arg); + + if (flux_request_unpack (msg, + nullptr, + "{s:s, s?:s}", + "criteria", + &criteria, + "format", + &format_str) + < 0) + goto error; + + if (run_find (ctx, criteria, format_str, &R) < 0) + goto error; + if (flux_respond_pack (h, msg, "{s:o?}", "R", R) < 0) { + flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); + goto error; + } + + flux_log (h, LOG_DEBUG, "%s: find succeeded", __FUNCTION__); + return; + +error: + saved_errno = errno; + json_decref (R); + errno = saved_errno; + if (flux_respond_error (h, msg, errno, nullptr) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); +} + +static void status_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) +{ + int saved_errno; + json_t *R_all = nullptr; + json_t *R_down = nullptr; + json_t *R_alloc = nullptr; + std::chrono::time_point now; + std::chrono::duration elapsed; + std::shared_ptr ctx = getctx ((flux_t *)arg); + + now = std::chrono::system_clock::now (); + elapsed = now - ctx->m_resources_alloc_updated; + // Get R alloc whenever m_resources_alloc_updated or + // the elapsed time is greater than configured limit + if ((elapsed.count () > static_cast (ctx->opts.get_opt ().get_update_interval ())) + || ctx->m_resources_updated) { + if (run_find (ctx, "sched-now=allocated", "rv1_nosched", &R_alloc) < 0) + goto error; + ctx->m_r_alloc = json_deep_copy (R_alloc); + ctx->m_resources_alloc_updated = std::chrono::system_clock::now (); + } else + R_alloc = json_deep_copy (ctx->m_r_alloc.get ()); + + if (ctx->m_resources_updated) { + if (run_find (ctx, "status=up or status=down", "rv1_nosched", &R_all) < 0) + goto error; + ctx->m_r_all = json::value::take (json_deep_copy (R_all)); + ctx->m_resources_updated = false; + } else + R_all = json_deep_copy (ctx->m_r_all.get ()); + + if (ctx->m_resources_down_updated) { + if (run_find (ctx, "status=down", "rv1_nosched", &R_down) < 0) + goto error; + ctx->m_r_down = json::value::take (json_deep_copy (R_down)); + ctx->m_resources_down_updated = false; + } else + R_down = json_deep_copy (ctx->m_r_down.get ()); + + if (flux_respond_pack (h, + msg, + "{s:o? s:o? 
s:o?}", + "all", + R_all, + "down", + R_down, + "allocated", + R_alloc) + < 0) { + flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); + goto error; + } + return; + +error: + saved_errno = errno; + json_decref (R_all); + json_decref (R_alloc); + json_decref (R_down); + errno = saved_errno; + if (flux_respond_error (h, msg, errno, nullptr) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); +} + +static void ns_info_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) +{ + uint64_t rank, id, remapped_id; + const char *type_name; + std::shared_ptr ctx = getctx ((flux_t *)arg); + + if (flux_request_unpack (msg, + nullptr, + "{s:I s:s s:I}", + "rank", + &rank, + "type-name", + &type_name, + "id", + &id) + < 0) { + flux_log_error (h, "%s: flux_respond_unpack", __FUNCTION__); + goto error; + } + if (ctx->reader->namespace_remapper.query (rank, type_name, id, remapped_id) < 0) { + flux_log_error (h, "%s: namespace_remapper.query", __FUNCTION__); + goto error; + } + if (remapped_id > static_cast (std::numeric_limits::max ())) { + errno = EOVERFLOW; + flux_log_error (h, "%s: remapped id too large", __FUNCTION__); + goto error; + } + if (flux_respond_pack (h, msg, "{s:I}", "id", static_cast (remapped_id)) < 0) { + flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); + goto error; + } + return; + +error: + if (flux_respond_error (h, msg, errno, nullptr) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); +} + +static void params_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) +{ + int saved_errno; + json_error_t jerr; + std::string params; + json_t *o{nullptr}; + std::shared_ptr ctx = getctx ((flux_t *)arg); + + if (ctx->opts.jsonify (params) < 0) + goto error; + if (!(o = json_loads (params.c_str (), 0, &jerr))) { + errno = ENOMEM; + goto error; + } + if (flux_respond_pack (h, msg, "{s:o}", "params", o) < 0) { + flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); + goto error; + } + + flux_log (h, LOG_DEBUG, "%s: params succeeded", __FUNCTION__); + return; + +error: + if (o) { + saved_errno = errno; + json_decref (o); + errno = saved_errno; + } + if (flux_respond_error (h, msg, errno, nullptr) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); +} + +/* + * Mark a vertex as up or down + */ +static void set_status_request_cb (flux_t *h, + flux_msg_handler_t *w, + const flux_msg_t *msg, + void *arg) +{ + const char *rp = NULL, *st = NULL; + std::string resource_path = "", status = "", errmsg = ""; + std::shared_ptr ctx = getctx ((flux_t *)arg); + resource_pool_t::string_to_status sts = resource_pool_t::str_to_status; + std::map>::const_iterator it{}; + resource_pool_t::string_to_status::iterator status_it{}; + + if (flux_request_unpack (msg, NULL, "{s:s, s:s}", "resource_path", &rp, "status", &st) < 0) { + errmsg = "malformed RPC"; + goto error; + } + resource_path = rp; + status = st; + // check that the path/vertex exists + it = ctx->db->metadata.by_path.find (resource_path); + if (it == ctx->db->metadata.by_path.end ()) { + errmsg = "could not find path '" + resource_path + "' in resource graph"; + goto error; + } + // check that the status given is valid ('up' or 'down') + status_it = sts.find (status); + if (status_it == sts.end ()) { + errmsg = "unrecognized status '" + status + "'"; + goto error; + } + // mark the vertex + if (ctx->traverser->mark (resource_path, status_it->second) < 0) { + flux_log_error (h, + "%s: traverser::mark: %s", + __FUNCTION__, + 
ctx->traverser->err_message ().c_str ()); + ctx->traverser->clear_err_message (); + errmsg = "Failed to set status of resource vertex"; + goto error; + } + ctx->m_resources_down_updated = true; + if (flux_respond (h, msg, NULL) < 0) { + flux_log_error (h, "%s: flux_respond", __FUNCTION__); + } + return; + +error: + if (flux_respond_error (h, msg, EINVAL, errmsg.c_str ()) < 0) + flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); + return; +} + +//////////////////////////////////////////////////////////////////////////////// +// Resource Graph and Traverser Initialization +//////////////////////////////////////////////////////////////////////////////// + +static int init_resource_graph (std::shared_ptr &ctx) +{ + int rc = 0; + + // Select the appropriate matcher based on CLI policy. + if (!(ctx->matcher = create_match_cb (ctx->opts.get_opt ().get_match_policy ()))) { + flux_log (ctx->h, LOG_ERR, "%s: can't create match callback", __FUNCTION__); + return -1; + } + if ((rc = populate_resource_db (ctx)) != 0) { + flux_log (ctx->h, LOG_ERR, "%s: can't populate graph resource database", __FUNCTION__); + return rc; + } + if ((rc = select_subsystems (ctx)) != 0) { + flux_log (ctx->h, + LOG_ERR, + "%s: error processing subsystems %s", + __FUNCTION__, + ctx->opts.get_opt ().get_match_subsystems ().c_str ()); + return rc; + } + + // Create a writers object for matched vertices and edges + match_format_t format = + match_writers_factory_t::get_writers_type (ctx->opts.get_opt ().get_match_format ()); + if (!(ctx->writers = match_writers_factory_t::create (format))) + return -1; + + if (ctx->opts.get_opt ().is_prune_filters_set () + && ctx->matcher->set_pruning_types_w_spec (ctx->matcher->dom_subsystem (), + ctx->opts.get_opt ().get_prune_filters ()) + < 0) { + flux_log (ctx->h, + LOG_ERR, + "%s: error setting pruning types with: %s", + __FUNCTION__, + ctx->opts.get_opt ().get_prune_filters ().c_str ()); + return -1; + } + + // Initialize the DFU traverser + if (ctx->traverser->initialize (ctx->db, ctx->matcher) < 0) { + flux_log (ctx->h, LOG_ERR, "%s: traverser initialization", __FUNCTION__); + return -1; + } + + // Perform the initial status marking only when "up" rankset is available + // Rankless reader cases (required for testing e.g., GRUG) must not + // execute the following branch. + // Use ctx->update_f != nullptr to differentiate + if (ctx->update_f) { + if (mark (ctx, "all", resource_pool_t::status_t::DOWN) < 0) { + flux_log (ctx->h, LOG_ERR, "%s: mark (down)", __FUNCTION__); + return -1; + } + if (ctx->is_ups_set ()) { + if (mark (ctx, ctx->get_ups ().c_str (), resource_pool_t::status_t::UP) < 0) { + flux_log (ctx->h, LOG_ERR, "%s: mark (up)", __FUNCTION__); + return -1; + } + } + } + + // prevent users from consuming unbounded memory with arbitrary resource types + subsystem_t::storage_t::finalize (); + resource_type_t::storage_t::finalize (); + return 0; +} + +//////////////////////////////////////////////////////////////////////////////// +// Module Main +//////////////////////////////////////////////////////////////////////////////// + +extern "C" int mod_main (flux_t *h, int argc, char **argv) +{ + int rc = -1; + + flux_log (h, LOG_INFO, "version %s", PACKAGE_VERSION); + + try { + std::shared_ptr ctx = nullptr; + uint32_t rank = 1; + + if (!(ctx = init_module (h, argc, argv))) { + flux_log (h, LOG_ERR, "%s: can't initialize resource module", __FUNCTION__); + goto done; + } + // Because mod_main is always active, the following is safe. 
+ flux_aux_set (h, mod_name, &ctx, NULL); + flux_log (h, LOG_DEBUG, "%s: resource module starting", __FUNCTION__); + + /* Before beginning synchronous resource.acquire RPC, set module status + * to 'running' to let flux module load return success. + */ + if ((rc = flux_module_set_running (ctx->h)) < 0) { + flux_log_error (ctx->h, "%s: flux_module_set_running", __FUNCTION__); + goto done; + } + if ((rc = init_resource_graph (ctx)) != 0) { + flux_log (h, LOG_ERR, "%s: can't initialize resource graph database", __FUNCTION__); + goto done; + } + flux_log (h, LOG_DEBUG, "%s: resource graph database loaded", __FUNCTION__); + + if ((rc = flux_reactor_run (flux_get_reactor (h), 0)) < 0) { + flux_log (h, LOG_ERR, "%s: flux_reactor_run: %s", __FUNCTION__, strerror (errno)); + goto done; + } + } catch (std::exception &e) { + errno = ENOSYS; + flux_log (h, LOG_ERR, "%s: %s", __FUNCTION__, e.what ()); + return -1; + } catch (...) { + errno = ENOSYS; + flux_log (h, LOG_ERR, "%s: caught unknown exception", __FUNCTION__); + return -1; + } + +done: + return rc; +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ diff --git a/resource/modules/resource_match.cpp b/resource/modules/resource_match.cpp index f8554555a..2f99a1890 100644 --- a/resource/modules/resource_match.cpp +++ b/resource/modules/resource_match.cpp @@ -25,6 +25,7 @@ extern "C" { #include #include +#include "resource_match.hpp" #include "resource/schema/resource_graph.hpp" #include "resource/readers/resource_reader_factory.hpp" #include "resource/traversers/dfu.hpp" @@ -44,71 +45,6 @@ extern struct Flux::resource_model::match_perf_t Flux::resource_model::perf; // Resource Matching Service Module Context //////////////////////////////////////////////////////////////////////////////// -class msg_wrap_t { - public: - msg_wrap_t () = default; - msg_wrap_t (const msg_wrap_t &o); - msg_wrap_t &operator= (const msg_wrap_t &o); - ~msg_wrap_t (); - const flux_msg_t *get_msg () const; - void set_msg (const flux_msg_t *msg); - - private: - const flux_msg_t *m_msg = nullptr; -}; - -struct resobj_t { - std::string exec_target_range; - std::vector core; - std::vector gpu; -}; - -class resource_interface_t { - public: - resource_interface_t () = default; - resource_interface_t (const resource_interface_t &o); - resource_interface_t &operator= (const resource_interface_t &o); - - ~resource_interface_t (); - int fetch_and_reset_update_rc (); - int get_update_rc () const; - void set_update_rc (int rc); - - const std::string &get_ups () const; - void set_ups (const char *ups); - bool is_ups_set () const; - flux_future_t *update_f = nullptr; - - private: - std::string m_ups = ""; - int m_update_rc = 0; -}; - -struct resource_ctx_t : public resource_interface_t { - ~resource_ctx_t (); - flux_t *h; /* Flux handle */ - flux_msg_handler_t **handlers; /* Message handlers */ - Flux::opts_manager::optmgr_composer_t - opts; /* Option manager */ - std::shared_ptr matcher; /* Match callback object */ - std::shared_ptr traverser; /* Graph traverser object */ - std::shared_ptr db; /* Resource graph data store */ - std::shared_ptr writers; /* Vertex/Edge writers */ - std::shared_ptr reader; /* resource reader */ - std::map> jobs; /* Jobs table */ - std::map allocations; /* Allocation table */ - std::map reservations; /* Reservation table */ - std::map> notify_msgs; - bool m_resources_updated = true; /* resources have been updated */ - bool m_resources_down_updated = true; /* down resources have been updated */ - /* last time allocated resources search updated */ - 
std::chrono::time_point m_resources_alloc_updated; - /* R caches */ - json::value m_r_all; - json::value m_r_down; - json::value m_r_alloc; -}; - msg_wrap_t::msg_wrap_t (const msg_wrap_t &o) { m_msg = flux_msg_incref (o.m_msg); @@ -201,253 +137,6 @@ resource_ctx_t::~resource_ctx_t () } } -//////////////////////////////////////////////////////////////////////////////// -// Request Handler Prototypes -//////////////////////////////////////////////////////////////////////////////// - -static void match_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg); - -static void match_multi_request_cb (flux_t *h, - flux_msg_handler_t *w, - const flux_msg_t *msg, - void *arg); - -static void update_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg); - -static void cancel_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg); - -static void partial_cancel_request_cb (flux_t *h, - flux_msg_handler_t *w, - const flux_msg_t *msg, - void *arg); - -static void info_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg); - -static void stat_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg); - -static void stat_clear_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg); - -static void next_jobid_request_cb (flux_t *h, - flux_msg_handler_t *w, - const flux_msg_t *msg, - void *arg); - -static void set_property_request_cb (flux_t *h, - flux_msg_handler_t *w, - const flux_msg_t *msg, - void *arg); - -static void get_property_request_cb (flux_t *h, - flux_msg_handler_t *w, - const flux_msg_t *msg, - void *arg); - -static void notify_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg); - -static void disconnect_request_cb (flux_t *h, - flux_msg_handler_t *w, - const flux_msg_t *msg, - void *arg); - -static void find_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg); - -static void status_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg); - -static void ns_info_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg); - -static void satisfiability_request_cb (flux_t *h, - flux_msg_handler_t *w, - const flux_msg_t *msg, - void *arg); - -static void params_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg); - -static void set_status_request_cb (flux_t *h, - flux_msg_handler_t *w, - const flux_msg_t *msg, - void *arg); - -static const struct flux_msg_handler_spec htab[] = - {{FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.match", match_request_cb, 0}, - {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.match_multi", match_multi_request_cb, 0}, - {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.update", update_request_cb, 0}, - {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.cancel", cancel_request_cb, 0}, - {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.partial-cancel", partial_cancel_request_cb, 0}, - {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.info", info_request_cb, 0}, - {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.stats-get", stat_request_cb, FLUX_ROLE_USER}, - {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.stats-clear", stat_clear_cb, FLUX_ROLE_USER}, - {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.next_jobid", next_jobid_request_cb, 0}, - {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.set_property", set_property_request_cb, 0}, - {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.get_property", get_property_request_cb, 0}, - 
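/* The msg_wrap_t copy constructor above leans on flux message reference
 * counting so that streaming requests stashed in ctx->notify_msgs remain
 * valid after the handler returns.  A minimal, self-contained sketch of
 * that RAII pattern (assuming only flux-core's flux_msg_incref () and
 * flux_msg_decref (); msg_ref_t is an illustrative name, not part of
 * this patch):
 */
#include <flux/core.h>

class msg_ref_t {
  public:
    msg_ref_t () = default;
    explicit msg_ref_t (const flux_msg_t *msg) : m_msg (flux_msg_incref (msg)) {}
    msg_ref_t (const msg_ref_t &o) : m_msg (flux_msg_incref (o.m_msg)) {}
    msg_ref_t &operator= (const msg_ref_t &o)
    {
        if (this != &o) {
            flux_msg_decref (m_msg);           // release our reference, if any
            m_msg = flux_msg_incref (o.m_msg);
        }
        return *this;
    }
    ~msg_ref_t () { flux_msg_decref (m_msg); }
    const flux_msg_t *get () const { return m_msg; }

  private:
    const flux_msg_t *m_msg = nullptr;
};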
{FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.notify", notify_request_cb, 0}, - {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.disconnect", disconnect_request_cb, 0}, - {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.find", find_request_cb, 0}, - {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.status", status_request_cb, 0}, - {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.ns-info", ns_info_request_cb, 0}, - {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.satisfiability", satisfiability_request_cb, 0}, - {FLUX_MSGTYPE_REQUEST, "feasibility.check", satisfiability_request_cb, 0}, - {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.params", params_request_cb, 0}, - {FLUX_MSGTYPE_REQUEST, "sched-fluxion-resource.set_status", set_status_request_cb, 0}, - FLUX_MSGHANDLER_TABLE_END}; - -//////////////////////////////////////////////////////////////////////////////// -// Module Initialization Routines -//////////////////////////////////////////////////////////////////////////////// - -static void set_default_args (std::shared_ptr &ctx) -{ - resource_opts_t ct_opts; - ct_opts.set_load_format ("rv1exec"); - ct_opts.set_match_subsystems ("containment"); - ct_opts.set_match_policy ("first"); - ct_opts.set_prune_filters ("ALL:core"); - ct_opts.set_match_format ("rv1_nosched"); - ct_opts.set_update_interval (0); - ctx->opts += ct_opts; -} - -static std::shared_ptr getctx (flux_t *h) -{ - void *d = NULL; - std::shared_ptr ctx = nullptr; - - if ((d = flux_aux_get (h, "sched-fluxion-resource")) != NULL) - ctx = *(static_cast *> (d)); - if (!ctx) { - try { - ctx = std::make_shared (); - ctx->traverser = std::make_shared (); - ctx->db = std::make_shared (); - } catch (std::bad_alloc &e) { - errno = ENOMEM; - goto done; - } - ctx->h = h; - ctx->handlers = NULL; - set_default_args (ctx); - ctx->matcher = nullptr; /* Cannot be allocated at this point */ - ctx->writers = nullptr; /* Cannot be allocated at this point */ - ctx->reader = nullptr; /* Cannot be allocated at this point */ - ctx->m_resources_updated = true; - ctx->m_resources_down_updated = true; - ctx->m_resources_alloc_updated = std::chrono::system_clock::now (); - } - -done: - return ctx; -} - -static int process_args (std::shared_ptr &ctx, int argc, char **argv) -{ - int rc = 0; - optmgr_kv_t opts_store; - std::string info_str = ""; - - for (int i = 0; i < argc; i++) { - const std::string kv (argv[i]); - if ((rc = opts_store.put (kv)) < 0) { - flux_log_error (ctx->h, "%s: optmgr_kv_t::put (%s)", __FUNCTION__, argv[i]); - return rc; - } - } - if ((rc = opts_store.parse (info_str)) < 0) { - flux_log_error (ctx->h, "%s: optmgr_kv_t::parse: %s", __FUNCTION__, info_str.c_str ()); - return rc; - } - if (info_str != "") { - flux_log (ctx->h, LOG_DEBUG, "%s: %s", __FUNCTION__, info_str.c_str ()); - } - ctx->opts += opts_store.get_opt (); - return rc; -} - -static int process_config_file (std::shared_ptr &ctx) -{ - int rc = 0; - json_t *conf = nullptr; - - if ((rc = flux_conf_unpack (flux_get_conf (ctx->h), - nullptr, - "{ s?:o }", - "sched-fluxion-resource", - &conf)) - < 0) { - flux_log_error (ctx->h, "%s: flux_conf_unpack", __FUNCTION__); - return rc; - } - - const char *k = nullptr; - char *tmp = nullptr; - json_t *v = nullptr; - optmgr_kv_t opts_store; - std::string info_str = ""; - json_object_foreach (conf, k, v) { - std::string value; - if (!(tmp = json_dumps (v, JSON_ENCODE_ANY | JSON_COMPACT))) { - errno = ENOMEM; - return -1; - } - value = tmp; - free (tmp); - tmp = nullptr; - if (json_typeof (v) == JSON_STRING) - value = value.substr (1, value.length 
() - 2); - if ((rc = opts_store.put (k, value)) < 0) { - flux_log_error (ctx->h, - "%s: optmgr_kv_t::put (%s, %s)", - __FUNCTION__, - k, - value.c_str ()); - return rc; - } - } - if ((rc = opts_store.parse (info_str)) < 0) { - flux_log_error (ctx->h, "%s: optmgr_kv_t::parse: %s", __FUNCTION__, info_str.c_str ()); - return rc; - } - if (info_str != "") { - flux_log (ctx->h, LOG_DEBUG, "%s: %s", __FUNCTION__, info_str.c_str ()); - } - ctx->opts += opts_store.get_opt (); - return rc; -} - -static std::shared_ptr init_module (flux_t *h, int argc, char **argv) -{ - std::shared_ptr ctx = nullptr; - uint32_t rank = 1; - - if (!(ctx = getctx (h))) { - flux_log (h, LOG_ERR, "%s: can't allocate the context", __FUNCTION__); - return nullptr; - } - if (flux_get_rank (h, &rank) < 0) { - flux_log (h, LOG_ERR, "%s: can't determine rank", __FUNCTION__); - goto error; - } - if (rank) { - flux_log (h, LOG_ERR, "%s: resource module must only run on rank 0", __FUNCTION__); - goto error; - } - if (process_config_file (ctx) < 0) { - flux_log_error (h, "%s: config file parsing", __FUNCTION__); - goto error; - } - if (process_args (ctx, argc, argv) < 0) { - flux_log_error (h, "%s: load line argument parsing", __FUNCTION__); - goto error; - } - ctx->opts.canonicalize (); - if (flux_msg_handler_addvec (h, htab, (void *)h, &ctx->handlers) < 0) { - flux_log_error (h, "%s: error registering resource event handler", __FUNCTION__); - goto error; - } - return ctx; - -error: - return nullptr; -} - //////////////////////////////////////////////////////////////////////////////// // Resource Graph and Traverser Initialization //////////////////////////////////////////////////////////////////////////////// @@ -514,6 +203,177 @@ static int populate_resource_db_file (std::shared_ptr &ctx) return rc; } +static void update_resource (flux_future_t *f, void *arg) +{ + int rc = -1; + const char *up = NULL; + const char *down = NULL; + double expiration = -1.; + json_t *resources = NULL; + + std::shared_ptr &ctx = *(static_cast *> (arg)); + + if (rc = flux_rpc_get_unpack (f, + "{s?:o s?:s s?:s s?:F}", + "resources", + &resources, + "up", + &up, + "down", + &down, + "expiration", + &expiration) + < 0) { + flux_log_error (ctx->h, + ctx->m_acquire_resources_from_core ? "%s: exiting due to resource.acquire " + "failure" + : "%s: exiting due to " + "sched-fluxion-resource.notify " + "failure", + __FUNCTION__); + flux_reactor_stop (flux_get_reactor (ctx->h)); /* Cancels notify msgs */ + goto done; + } + if ((rc = update_resource_db (ctx, resources, up, down)) < 0) { + flux_log_error (ctx->h, "%s: update_resource_db", __FUNCTION__); + goto done; + } + if (expiration >= 0.) { + /* Update graph duration: + */ + ctx->db->metadata.graph_duration.graph_end = + std::chrono::system_clock::from_time_t ((time_t)expiration); + flux_log (ctx->h, LOG_INFO, "resource expiration updated to %.2f", expiration); + } + + if (ctx->m_acquire_resources_from_core) { + // Store initial set of resources to broadcast to other fluxion modules + // via sched-fluxion-resource.notify + if (resources != NULL) + ctx->m_acquired_resources = json::value (resources); + if (expiration >= 0.) + ctx->m_acquired_resources_expiration = expiration; + + // Broadcast UP/DOWN updates to subscribed fluxion modules. 
+ // There are no subscribers until the first notify_request_cb, + // which must happen after the first run of update_resource + // TODO actually broadcast the UP/DOWN info + for (auto &kv : ctx->notify_msgs) { + if (rc += flux_respond (ctx->h, kv.second->get_msg (), NULL) < 0) { + flux_log_error (ctx->h, "%s: flux_respond", __FUNCTION__); + } + } + } +done: + flux_future_reset (f); + ctx->set_update_rc (rc); +} + +static void update_resource_no_up_down (flux_future_t *f, void *arg) +{ + int rc = -1; + + std::shared_ptr &ctx = *(static_cast *> (arg)); + + if (rc = flux_rpc_get (f, NULL) < 0) { + flux_log_error (ctx->h, + "%s: exiting due to sched-fluxion-resource.notify failure", + __FUNCTION__); + flux_reactor_stop (flux_get_reactor (ctx->h)); + } + flux_future_reset (f); + ctx->set_update_rc (rc); +} + +static int populate_resource_db_acquire (std::shared_ptr &ctx) +{ + int rc = -1; + json_t *o = NULL; + + // If this module is not getting resources from core, use + // sched-fluxion-resource.notify instead of resource.acquire to avoid + // using more than one resource.acquire RPC, which is not allowed + if (!(ctx->update_f = flux_rpc (ctx->h, + ctx->m_acquire_resources_from_core ? "resource.acquire" + : "sched-fluxion-resource." + "notify", + NULL, + FLUX_NODEID_ANY, + FLUX_RPC_STREAMING))) { + flux_log_error (ctx->h, "%s: flux_rpc", __FUNCTION__); + goto done; + } + + update_resource (ctx->update_f, static_cast (&ctx)); + + if ((rc = ctx->fetch_and_reset_update_rc ()) < 0) { + flux_log_error (ctx->h, "%s: update_resource", __FUNCTION__); + goto done; + } + // Only add full update_resource callback if the module needs UP/DOWN updates + // Otherwise, add update_resource_no_up_down callback to get error updates + if (ctx->m_get_up_down_updates) { + if (rc = flux_future_then (ctx->update_f, -1.0, update_resource, static_cast (&ctx)) + < 0) { + flux_log_error (ctx->h, "%s: flux_future_then", __FUNCTION__); + goto done; + } + } else { + if (rc = flux_future_then (ctx->update_f, + -1.0, + update_resource_no_up_down, + static_cast (&ctx)) + < 0) { + flux_log_error (ctx->h, "%s: flux_future_then", __FUNCTION__); + goto done; + } + } + +done: + return rc; +} + +int populate_resource_db (std::shared_ptr &ctx) +{ + int rc = -1; + std::chrono::time_point start; + std::chrono::duration elapsed; + + if (ctx->opts.get_opt ().is_reserve_vtx_vec_set ()) + ctx->db->resource_graph.m_vertices.reserve (ctx->opts.get_opt ().get_reserve_vtx_vec ()); + + start = std::chrono::system_clock::now (); + if (ctx->opts.get_opt ().is_load_file_set ()) { + if (populate_resource_db_file (ctx) < 0) + goto done; + flux_log (ctx->h, + LOG_INFO, + "%s: loaded resources from %s", + __FUNCTION__, + ctx->opts.get_opt ().get_load_file ().c_str ()); + } else { + if (populate_resource_db_acquire (ctx) < 0) { + flux_log (ctx->h, LOG_ERR, "%s: populate_resource_db_acquire", __FUNCTION__); + goto done; + } + flux_log (ctx->h, + LOG_DEBUG, + ctx->m_acquire_resources_from_core ? "%s: loaded resources from core's " + "resource.acquire" + : "%s: loaded resources from " + "sched-fluxion-resource.notify", + __FUNCTION__); + } + + elapsed = std::chrono::system_clock::now () - start; + perf.load = elapsed.count (); + perf.graph_uptime = std::chrono::system_clock::now (); + rc = 0; + +done: + return rc; +} + /* Add resources associated with 'rank' execution target, * defined by hwloc_xml. This function may be called with * rank == IDSET_INVALID_ID, to instantiate an empty graph. 
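/* Several of the new callbacks above spell their error checks as
 *     if (rc = flux_rpc_get (f, NULL) < 0)
 * where '<' binds tighter than '=', so rc receives the 0/1 result of the
 * comparison rather than the call's return code; the pre-existing
 * handlers use the fully parenthesized form.  A minimal stand-alone
 * sketch of the difference (fake_rpc_get () is a stand-in, not a Flux
 * API):
 */
#include <cassert>

static int fake_rpc_get ()    // stand-in for a call that fails with -1
{
    return -1;
}

int main ()
{
    int rc = 0;

    rc = fake_rpc_get () < 0;          // '<' evaluates first: rc becomes 1, -1 is lost
    assert (rc == 1);

    if ((rc = fake_rpc_get ()) < 0)    // parenthesized assignment keeps the real code
        assert (rc == -1);

    return 0;
}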
@@ -1217,21 +1077,19 @@ static int mark_now (std::shared_ptr &ctx, return rc; } -static int mark (std::shared_ptr &ctx, - const char *ids, - resource_pool_t::status_t status) +int mark (std::shared_ptr &ctx, const char *ids, resource_pool_t::status_t status) { return (ctx->traverser->is_initialized ()) ? mark_now (ctx, ids, status) : mark_lazy (ctx, ids, status); } -static int update_resource_db (std::shared_ptr &ctx, - json_t *resources, - const char *up, - const char *down) +int update_resource_db (std::shared_ptr &ctx, + json_t *resources, + const char *up, + const char *down) { int rc = 0; - // Will need to get duration update and set graph metadata when + // TODO Will need to get duration update and set graph metadata when // resource.acquire duration update is supported in the future. if (resources && (rc = grow_resource_db (ctx, resources)) < 0) { flux_log_error (ctx->h, "%s: grow_resource_db", __FUNCTION__); @@ -1249,226 +1107,45 @@ static int update_resource_db (std::shared_ptr &ctx, return rc; } -static void update_resource (flux_future_t *f, void *arg) -{ - int rc = -1; - const char *up = NULL; - const char *down = NULL; - double expiration = -1.; - json_t *resources = NULL; - std::shared_ptr ctx = getctx ((flux_t *)arg); - - if ((rc = flux_rpc_get_unpack (f, - "{s?:o s?:s s?:s s?:F}", - "resources", - &resources, - "up", - &up, - "down", - &down, - "expiration", - &expiration)) - < 0) { - flux_log_error (ctx->h, "%s: exiting due to resource.acquire failure", __FUNCTION__); - flux_reactor_stop (flux_get_reactor (ctx->h)); - goto done; - } - if ((rc = update_resource_db (ctx, resources, up, down)) < 0) { - flux_log_error (ctx->h, "%s: update_resource_db", __FUNCTION__); - goto done; - } - if (expiration >= 0.) { - /* Update graph duration: - */ - ctx->db->metadata.graph_duration.graph_end = - std::chrono::system_clock::from_time_t ((time_t)expiration); - flux_log (ctx->h, LOG_INFO, "resource expiration updated to %.2f", expiration); - } - for (auto &kv : ctx->notify_msgs) { - if ((rc += flux_respond (ctx->h, kv.second->get_msg (), NULL)) < 0) { - flux_log_error (ctx->h, "%s: flux_respond", __FUNCTION__); - } - } -done: - flux_future_reset (f); - ctx->set_update_rc (rc); -} - -static int populate_resource_db_acquire (std::shared_ptr &ctx) +int select_subsystems (std::shared_ptr &ctx) { - int rc = -1; - json_t *o = NULL; - - if (!(ctx->update_f = - flux_rpc (ctx->h, "resource.acquire", NULL, FLUX_NODEID_ANY, FLUX_RPC_STREAMING))) { - flux_log_error (ctx->h, "%s: flux_rpc", __FUNCTION__); - goto done; - } + /* + * Format of match_subsystems + * subsystem1[:relation1[:relation2...]],subsystem2[... 
+ */ + int rc = 0; + std::stringstream ss (ctx->opts.get_opt ().get_match_subsystems ()); + subsystem_t subsystem; + std::string token; - update_resource (ctx->update_f, static_cast (ctx->h)); - if ((rc = ctx->fetch_and_reset_update_rc ()) < 0) { - flux_log_error (ctx->h, "%s: update_resource", __FUNCTION__); - goto done; + while (getline (ss, token, ',')) { + size_t found = token.find_first_of (":"); + if (found == std::string::npos) { + subsystem = subsystem_t{token}; + if (!ctx->db->known_subsystem (subsystem)) { + rc = -1; + errno = EINVAL; + goto done; + } + ctx->matcher->add_subsystem (subsystem, "*"); + } else { + subsystem = subsystem_t{token.substr (0, found)}; + if (!ctx->db->known_subsystem (subsystem)) { + rc = -1; + errno = EINVAL; + goto done; + } + std::stringstream relations (token.substr (found + 1, std::string::npos)); + std::string relation; + while (getline (relations, relation, ':')) + ctx->matcher->add_subsystem (subsystem, relation); + } } - if ((rc = flux_future_then (ctx->update_f, -1.0, update_resource, static_cast (ctx->h))) - < 0) { - flux_log_error (ctx->h, "%s: flux_future_then", __FUNCTION__); - goto done; - } done: return rc; } -static int populate_resource_db (std::shared_ptr &ctx) -{ - int rc = -1; - std::chrono::time_point start; - std::chrono::duration elapsed; - - if (ctx->opts.get_opt ().is_reserve_vtx_vec_set ()) - ctx->db->resource_graph.m_vertices.reserve (ctx->opts.get_opt ().get_reserve_vtx_vec ()); - - start = std::chrono::system_clock::now (); - if (ctx->opts.get_opt ().is_load_file_set ()) { - if (populate_resource_db_file (ctx) < 0) - goto done; - flux_log (ctx->h, - LOG_INFO, - "%s: loaded resources from %s", - __FUNCTION__, - ctx->opts.get_opt ().get_load_file ().c_str ()); - } else { - if (populate_resource_db_acquire (ctx) < 0) { - flux_log (ctx->h, - LOG_ERR, - "%s: loading resources using resource.acquire", - __FUNCTION__); - goto done; - } - flux_log (ctx->h, - LOG_INFO, - "%s: loaded resources from core's resource.acquire", - __FUNCTION__); - } - - elapsed = std::chrono::system_clock::now () - start; - perf.load = elapsed.count (); - perf.graph_uptime = std::chrono::system_clock::now (); - rc = 0; - -done: - return rc; -} - -static int select_subsystems (std::shared_ptr &ctx) -{ - /* - * Format of match_subsystems - * subsystem1[:relation1[:relation2...]],subsystem2[... - */ - int rc = 0; - std::stringstream ss (ctx->opts.get_opt ().get_match_subsystems ()); - subsystem_t subsystem; - std::string token; - - while (getline (ss, token, ',')) { - size_t found = token.find_first_of (":"); - if (found == std::string::npos) { - subsystem = subsystem_t{token}; - if (!ctx->db->known_subsystem (subsystem)) { - rc = -1; - errno = EINVAL; - goto done; - } - ctx->matcher->add_subsystem (subsystem, "*"); - } else { - subsystem = subsystem_t{token.substr (0, found)}; - if (!ctx->db->known_subsystem (subsystem)) { - rc = -1; - errno = EINVAL; - goto done; - } - std::stringstream relations (token.substr (found + 1, std::string::npos)); - std::string relation; - while (getline (relations, relation, ':')) - ctx->matcher->add_subsystem (subsystem, relation); - } - } - -done: - return rc; -} - -static int init_resource_graph (std::shared_ptr &ctx) -{ - int rc = 0; - - // Select the appropriate matcher based on CLI policy. 
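/* select_subsystems () above documents its option format as
 *     subsystem1[:relation1[:relation2...]],subsystem2[...]
 * A minimal stand-alone sketch of that tokenization, using the same
 * getline-on-stringstream approach (the subsystem and relation names
 * below are illustrative only):
 */
#include <iostream>
#include <map>
#include <sstream>
#include <string>
#include <vector>

int main ()
{
    std::string opt = "containment,power:draws_from";
    std::map<std::string, std::vector<std::string>> subsystems;

    std::stringstream ss (opt);
    std::string token;
    while (getline (ss, token, ',')) {
        size_t found = token.find_first_of (":");
        if (found == std::string::npos) {
            subsystems[token] = {"*"};          // no relations: match any edge type
        } else {
            std::stringstream relations (token.substr (found + 1));
            std::string relation;
            while (getline (relations, relation, ':'))
                subsystems[token.substr (0, found)].push_back (relation);
        }
    }
    for (const auto &kv : subsystems)
        for (const auto &rel : kv.second)
            std::cout << kv.first << " -> " << rel << "\n";
    return 0;
}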
- if (!(ctx->matcher = create_match_cb (ctx->opts.get_opt ().get_match_policy ()))) { - flux_log (ctx->h, LOG_ERR, "%s: can't create match callback", __FUNCTION__); - return -1; - } - if ((rc = populate_resource_db (ctx)) != 0) { - flux_log (ctx->h, LOG_ERR, "%s: can't populate graph resource database", __FUNCTION__); - return rc; - } - if ((rc = select_subsystems (ctx)) != 0) { - flux_log (ctx->h, - LOG_ERR, - "%s: error processing subsystems %s", - __FUNCTION__, - ctx->opts.get_opt ().get_match_subsystems ().c_str ()); - return rc; - } - - // Create a writers object for matched vertices and edges - match_format_t format = - match_writers_factory_t::get_writers_type (ctx->opts.get_opt ().get_match_format ()); - if (!(ctx->writers = match_writers_factory_t::create (format))) - return -1; - - if (ctx->opts.get_opt ().is_prune_filters_set () - && ctx->matcher->set_pruning_types_w_spec (ctx->matcher->dom_subsystem (), - ctx->opts.get_opt ().get_prune_filters ()) - < 0) { - flux_log (ctx->h, - LOG_ERR, - "%s: error setting pruning types with: %s", - __FUNCTION__, - ctx->opts.get_opt ().get_prune_filters ().c_str ()); - return -1; - } - - // Initialize the DFU traverser - if (ctx->traverser->initialize (ctx->db, ctx->matcher) < 0) { - flux_log (ctx->h, LOG_ERR, "%s: traverser initialization", __FUNCTION__); - return -1; - } - - // Perform the initial status marking only when "up" rankset is available - // Rankless reader cases (required for testing e.g., GRUG) must not - // execute the following branch. - // Use ctx->update_f != nullptr to differentiate - if (ctx->update_f) { - if (mark (ctx, "all", resource_pool_t::status_t::DOWN) < 0) { - flux_log (ctx->h, LOG_ERR, "%s: mark (down)", __FUNCTION__); - return -1; - } - if (ctx->is_ups_set ()) { - if (mark (ctx, ctx->get_ups ().c_str (), resource_pool_t::status_t::UP) < 0) { - flux_log (ctx->h, LOG_ERR, "%s: mark (up)", __FUNCTION__); - return -1; - } - } - } - - // prevent users from consuming unbounded memory with arbitrary resource types - subsystem_t::storage_t::finalize (); - resource_type_t::storage_t::finalize (); - return 0; -} - //////////////////////////////////////////////////////////////////////////////// // Request Handler Routines //////////////////////////////////////////////////////////////////////////////// @@ -1481,16 +1158,6 @@ static void update_match_perf (double elapsed, int64_t jobid, bool match_success perf.failed.update_stats (elapsed, jobid, perf.tmp_iter_count); } -static inline std::string get_status_string (int64_t now, int64_t at) -{ - return (at == now) ? "ALLOCATED" : "RESERVED"; -} - -static inline bool is_existent_jobid (const std::shared_ptr &ctx, uint64_t jobid) -{ - return (ctx->jobs.find (jobid) != ctx->jobs.end ()) ? 
true : false; -} - static int track_schedule_info (std::shared_ptr &ctx, int64_t id, bool reserved, @@ -1596,7 +1263,7 @@ static int parse_R (std::shared_ptr &ctx, return rc; } -static int Rlite_equal (const std::shared_ptr &ctx, const char *R1, const char *R2) +int Rlite_equal (const std::shared_ptr &ctx, const char *R1, const char *R2) { int rc = -1; int saved_errno; @@ -1723,15 +1390,15 @@ static int run (std::shared_ptr &ctx, return rc; } -static int run_match (std::shared_ptr &ctx, - int64_t jobid, - const char *cmd, - const std::string &jstr, - int64_t *now, - int64_t *at, - double *overhead, - std::stringstream &o, - flux_error_t *errp) +int run_match (std::shared_ptr &ctx, + int64_t jobid, + const char *cmd, + const std::string &jstr, + int64_t *now, + int64_t *at, + double *overhead, + std::stringstream &o, + flux_error_t *errp) { int rc = 0; std::chrono::time_point start; @@ -1781,12 +1448,12 @@ static int run_match (std::shared_ptr &ctx, return rc; } -static int run_update (std::shared_ptr &ctx, - int64_t jobid, - const char *R, - int64_t &at, - double &overhead, - std::stringstream &o) +int run_update (std::shared_ptr &ctx, + int64_t jobid, + const char *R, + int64_t &at, + double &overhead, + std::stringstream &o) { int rc = 0; uint64_t duration = 0; @@ -1823,88 +1490,11 @@ static int run_update (std::shared_ptr &ctx, return rc; } -static void update_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) -{ - char *R = NULL; - int64_t at = 0; - double overhead = 0.0f; - int64_t jobid = 0; - uint64_t duration = 0; - std::string status = ""; - std::stringstream o; - std::chrono::time_point start; - std::chrono::duration elapsed; - - std::shared_ptr ctx = getctx ((flux_t *)arg); - if (flux_request_unpack (msg, NULL, "{s:I s:s}", "jobid", &jobid, "R", &R) < 0) { - flux_log_error (ctx->h, "%s: flux_request_unpack", __FUNCTION__); - goto error; - } - if (is_existent_jobid (ctx, jobid)) { - int rc = 0; - start = std::chrono::system_clock::now (); - if ((rc = Rlite_equal (ctx, R, ctx->jobs[jobid]->R.c_str ())) < 0) { - flux_log_error (ctx->h, "%s: Rlite_equal", __FUNCTION__); - goto error; - } else if (rc == 1) { - errno = EINVAL; - flux_log (ctx->h, - LOG_ERR, - "%s: jobid (%jd) with different R exists!", - __FUNCTION__, - static_cast (jobid)); - goto error; - } - elapsed = std::chrono::system_clock::now () - start; - // If a jobid with matching R exists, no need to update - overhead = elapsed.count (); - get_jobstate_str (ctx->jobs[jobid]->state, status); - o << ctx->jobs[jobid]->R; - at = ctx->jobs[jobid]->scheduled_at; - flux_log (ctx->h, - LOG_DEBUG, - "%s: jobid (%jd) with matching R exists", - __FUNCTION__, - static_cast (jobid)); - } else if (run_update (ctx, jobid, R, at, overhead, o) < 0) { - flux_log_error (ctx->h, - "%s: update failed (id=%jd)", - __FUNCTION__, - static_cast (jobid)); - goto error; - } - - if (status == "") - status = get_status_string (at, at); - - if (flux_respond_pack (h, - msg, - "{s:I s:s s:f s:s s:I}", - "jobid", - jobid, - "status", - status.c_str (), - "overhead", - overhead, - "R", - o.str ().c_str (), - "at", - at) - < 0) - flux_log_error (h, "%s", __FUNCTION__); - - return; - -error: - if (flux_respond_error (h, msg, errno, NULL) < 0) - flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); -} - -static int run_remove (std::shared_ptr &ctx, - int64_t jobid, - const char *R, - bool part_cancel, - bool &full_removal) +int run_remove (std::shared_ptr &ctx, + int64_t jobid, + const char *R, + bool part_cancel, + bool 
&full_removal) { int rc = -1; dfu_traverser_t &tr = *(ctx->traverser); @@ -1954,720 +1544,10 @@ static int run_remove (std::shared_ptr &ctx, return rc; } -static void match_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) -{ - int64_t at = 0; - int64_t now = 0; - int64_t jobid = -1; - double overhead = 0.0f; - std::string status = ""; - const char *cmd = NULL; - const char *js_str = NULL; - std::stringstream R; - - std::shared_ptr ctx = getctx ((flux_t *)arg); - if (flux_request_unpack (msg, - NULL, - "{s:s s:I s:s}", - "cmd", - &cmd, - "jobid", - &jobid, - "jobspec", - &js_str) - < 0) - goto error; - if (is_existent_jobid (ctx, jobid)) { - errno = EINVAL; - flux_log_error (h, "%s: existent job (%jd).", __FUNCTION__, (intmax_t)jobid); - goto error; - } - if (run_match (ctx, jobid, cmd, js_str, &now, &at, &overhead, R, NULL) < 0) { - if (errno != EBUSY && errno != ENODEV) - flux_log_error (ctx->h, - "%s: match failed due to match error (id=%jd)", - __FUNCTION__, - (intmax_t)jobid); - // The resources couldn't be allocated *or reserved* - // Kicking back to qmanager, remove from tracking - if (errno == EBUSY) { - ctx->jobs.erase (jobid); - } - goto error; - } - - status = get_status_string (now, at); - if (flux_respond_pack (h, - msg, - "{s:I s:s s:f s:s s:I}", - "jobid", - jobid, - "status", - status.c_str (), - "overhead", - overhead, - "R", - R.str ().c_str (), - "at", - at) - < 0) - flux_log_error (h, "%s", __FUNCTION__); - - return; - -error: - if (flux_respond_error (h, msg, errno, NULL) < 0) - flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); -} - -static void match_multi_request_cb (flux_t *h, - flux_msg_handler_t *w, - const flux_msg_t *msg, - void *arg) -{ - size_t index; - json_t *value; - json_error_t err; - int saved_errno; - json_t *jobs = nullptr; - uint64_t jobid = 0; - std::string errmsg; - const char *cmd = nullptr; - const char *jobs_str = nullptr; - std::shared_ptr ctx = getctx ((flux_t *)arg); - - if (!flux_msg_is_streaming (msg)) { - errno = EPROTO; - goto error; - } - if (flux_request_unpack (msg, NULL, "{s:s s:s}", "cmd", &cmd, "jobs", &jobs_str) < 0) - goto error; - if (!(jobs = json_loads (jobs_str, 0, &err))) { - errno = ENOMEM; - goto error; - } - - json_array_foreach (jobs, index, value) { - const char *js_str; - int64_t at = 0; - int64_t now = 0; - double overhead = 0.0f; - std::string status = ""; - std::stringstream R; - - if (json_unpack (value, "{s:I s:s}", "jobid", &jobid, "jobspec", &js_str) < 0) - goto error; - if (is_existent_jobid (ctx, jobid)) { - errno = EINVAL; - flux_log_error (h, - "%s: existent job (%jd).", - __FUNCTION__, - static_cast (jobid)); - goto error; - } - if (run_match (ctx, jobid, cmd, js_str, &now, &at, &overhead, R, NULL) < 0) { - if (errno != EBUSY && errno != ENODEV) - flux_log_error (ctx->h, - "%s: match failed due to match error (id=%jd)", - __FUNCTION__, - static_cast (jobid)); - // The resources couldn't be allocated *or reserved* - // Kicking back to qmanager, remove from tracking - if (errno == EBUSY) { - ctx->jobs.erase (jobid); - } - goto error; - } - - status = get_status_string (now, at); - if (flux_respond_pack (h, - msg, - "{s:I s:s s:f s:s s:I}", - "jobid", - jobid, - "status", - status.c_str (), - "overhead", - overhead, - "R", - R.str ().c_str (), - "at", - at) - < 0) { - flux_log_error (h, "%s", __FUNCTION__); - goto error; - } - } - errno = ENODATA; - jobid = 0; -error: - if (jobs) { - saved_errno = errno; - json_decref (jobs); - errno = saved_errno; - } - if (jobid != 0) - 
errmsg += "jobid=" + std::to_string (jobid); - if (flux_respond_error (h, msg, errno, !errmsg.empty () ? errmsg.c_str () : nullptr) < 0) - flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); -} - -static void cancel_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) -{ - std::shared_ptr ctx = getctx ((flux_t *)arg); - int64_t jobid = -1; - char *R = NULL; - bool full_removal = true; - - if (flux_request_unpack (msg, NULL, "{s:I}", "jobid", &jobid) < 0) - goto error; - if (ctx->allocations.find (jobid) != ctx->allocations.end ()) - ctx->allocations.erase (jobid); - else if (ctx->reservations.find (jobid) != ctx->reservations.end ()) - ctx->reservations.erase (jobid); - else { - errno = ENOENT; - flux_log (h, LOG_DEBUG, "%s: nonexistent job (id=%jd)", __FUNCTION__, (intmax_t)jobid); - goto error; - } - - if (run_remove (ctx, jobid, R, false, full_removal) < 0) { - flux_log_error (h, - "%s: remove fails due to match error (id=%jd)", - __FUNCTION__, - (intmax_t)jobid); - goto error; - } - if (flux_respond_pack (h, msg, "{}") < 0) - flux_log_error (h, "%s", __FUNCTION__); - - return; - -error: - if (flux_respond_error (h, msg, errno, NULL) < 0) - flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); -} - -static void partial_cancel_request_cb (flux_t *h, - flux_msg_handler_t *w, - const flux_msg_t *msg, - void *arg) -{ - std::shared_ptr ctx = getctx ((flux_t *)arg); - int64_t jobid = -1; - char *R = NULL; - decltype (ctx->allocations)::iterator jobid_it; - bool full_removal = false; - int int_full_removal = 0; - - if (flux_request_unpack (msg, NULL, "{s:I s:s}", "jobid", &jobid, "R", &R) < 0) - goto error; - - jobid_it = ctx->allocations.find (jobid); - if (jobid_it == ctx->allocations.end ()) { - errno = ENOENT; - flux_log (h, - LOG_DEBUG, - "%s: job (id=%jd) not found in allocations", - __FUNCTION__, - (intmax_t)jobid); - goto error; - } - - if (run_remove (ctx, jobid, R, true, full_removal) < 0) { - flux_log_error (h, - "%s: remove fails due to match error (id=%jd)", - __FUNCTION__, - (intmax_t)jobid); - goto error; - } - int_full_removal = full_removal; - if (flux_respond_pack (h, msg, "{s:i}", "full-removal", int_full_removal) < 0) - flux_log_error (h, "%s", __FUNCTION__); - - if (full_removal) - ctx->allocations.erase (jobid_it); - - return; - -error: - if (flux_respond_error (h, msg, errno, NULL) < 0) - flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); -} - -static void info_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) -{ - std::shared_ptr ctx = getctx ((flux_t *)arg); - int64_t jobid = -1; - std::shared_ptr info = NULL; - std::string status = ""; - - if (flux_request_unpack (msg, NULL, "{s:I}", "jobid", &jobid) < 0) - goto error; - if (!is_existent_jobid (ctx, jobid)) { - errno = ENOENT; - flux_log (h, LOG_DEBUG, "%s: nonexistent job (id=%jd)", __FUNCTION__, (intmax_t)jobid); - goto error; - } - - info = ctx->jobs[jobid]; - get_jobstate_str (info->state, status); - if (flux_respond_pack (h, - msg, - "{s:I s:s s:I s:f}", - "jobid", - jobid, - "status", - status.c_str (), - "at", - info->scheduled_at, - "overhead", - info->overhead) - < 0) - flux_log_error (h, "%s", __FUNCTION__); - - return; - -error: - if (flux_respond_error (h, msg, errno, NULL) < 0) - flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); -} - -static int get_stat_by_rank (std::shared_ptr &ctx, json_t *o) -{ - int rc = -1; - int saved_errno = 0; - char *str = nullptr; - struct idset *ids = nullptr; - std::map s2r; - - for 
(auto &kv : ctx->db->metadata.by_rank) { - if (kv.first == -1) - continue; - if (s2r.find (kv.second.size ()) == s2r.end ()) { - if (!(ids = idset_create (0, IDSET_FLAG_AUTOGROW))) - goto done; - s2r[kv.second.size ()] = ids; - } - if ((rc = idset_set (s2r[kv.second.size ()], static_cast (kv.first))) < 0) - goto done; - } - - for (auto &kv : s2r) { - if (!(str = idset_encode (kv.second, IDSET_FLAG_BRACKETS | IDSET_FLAG_RANGE))) { - rc = -1; - goto done; - } - if ((rc = json_object_set_new (o, str, json_integer (static_cast (kv.first)))) - < 0) { - errno = ENOMEM; - goto done; - } - saved_errno = errno; - free (str); - errno = saved_errno; - str = nullptr; - } - -done: - for (auto &kv : s2r) - idset_destroy (kv.second); - saved_errno = errno; - s2r.clear (); - free (str); - errno = saved_errno; - return rc; -} - -static void stat_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) -{ - std::shared_ptr ctx = getctx ((flux_t *)arg); - int saved_errno; - json::value o; - json_t *match_succeeded = nullptr; - json_t *match_failed = nullptr; - double avg = 0.0f; - double min = 0.0f; - double variance = 0.0f; - // Failed match stats - double avg_failed = 0.0f; - double min_failed = 0.0f; - double variance_failed = 0.0f; - int64_t graph_uptime_s = 0; - int64_t time_since_reset_s = 0; - std::chrono::time_point now; - - if (perf.succeeded.njobs_reset > 1) { - avg = perf.succeeded.avg; - min = perf.succeeded.min; - // Welford's online algorithm - variance = perf.succeeded.M2 / (double)perf.succeeded.njobs_reset; - } - if (perf.failed.njobs_reset > 1) { - avg_failed = perf.failed.avg; - min_failed = perf.failed.min; - // Welford's online algorithm - variance_failed = perf.failed.M2 / (double)perf.failed.njobs_reset; - } - if (!(o = json::value::take (json_object ()))) { - errno = ENOMEM; - goto error; - } - if (get_stat_by_rank (ctx, o.get ()) < 0) { - flux_log_error (h, "%s: get_stat_by_rank", __FUNCTION__); - goto error; - } - - if (!(match_succeeded = json_pack ("{s:I s:I s:I s:I s:{s:f s:f s:f s:f}}", - "njobs", - perf.succeeded.njobs, - "njobs-reset", - perf.succeeded.njobs_reset, - "max-match-jobid", - perf.succeeded.max_match_jobid, - "max-match-iters", - perf.succeeded.match_iter_count, - "stats", - "min", - min, - "max", - perf.succeeded.max, - "avg", - avg, - "variance", - variance))) { - errno = ENOMEM; - goto error; - } - if (!(match_failed = json_pack ("{s:I s:I s:I s:I s:{s:f s:f s:f s:f}}", - "njobs", - perf.failed.njobs, - "njobs-reset", - perf.failed.njobs_reset, - "max-match-jobid", - perf.failed.max_match_jobid, - "max-match-iters", - perf.failed.match_iter_count, - "stats", - "min", - min_failed, - "max", - perf.failed.max, - "avg", - avg_failed, - "variance", - variance_failed))) { - errno = ENOMEM; - goto error; - } - now = std::chrono::system_clock::now (); - graph_uptime_s = - std::chrono::duration_cast (now - perf.graph_uptime).count (); - time_since_reset_s = - std::chrono::duration_cast (now - perf.time_of_last_reset).count (); - - if (flux_respond_pack (h, - msg, - "{s:I s:I s:O s:f s:I s:I s:{s:O s:O}}", - "V", - num_vertices (ctx->db->resource_graph), - "E", - num_edges (ctx->db->resource_graph), - "by_rank", - o.get (), - "load-time", - perf.load, - "graph-uptime", - graph_uptime_s, - "time-since-reset", - time_since_reset_s, - "match", - "succeeded", - match_succeeded, - "failed", - match_failed) - < 0) { - flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); - } - json_decref (match_succeeded); - json_decref (match_failed); - - return; 
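/* The stats handler above reports match-time variance as M2 divided by
 * njobs_reset, per the "Welford's online algorithm" comments.  A minimal
 * stand-alone sketch of that running update (running_stats_t is an
 * illustrative name, not part of this patch):
 */
#include <cstdio>

struct running_stats_t {
    long long n = 0;
    double avg = 0.0;
    double M2 = 0.0;             // sum of squared deviations from the running mean

    void update (double x)       // Welford's online update, one sample at a time
    {
        n++;
        double delta = x - avg;
        avg += delta / n;
        M2 += delta * (x - avg);
    }
    double variance () const     // population variance, as in the stats RPC above
    {
        return (n > 1) ? M2 / n : 0.0;
    }
};

int main ()
{
    running_stats_t s;
    for (double t : {0.01, 0.02, 0.04})
        s.update (t);
    std::printf ("avg=%.4f variance=%.6f\n", s.avg, s.variance ());
    return 0;
}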
- -error: - saved_errno = errno; - json_decref (match_succeeded); - json_decref (match_failed); - errno = saved_errno; - if (flux_respond_error (h, msg, errno, NULL) < 0) - flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); -} - -static void stat_clear_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) -{ - std::shared_ptr ctx = getctx ((flux_t *)arg); - - perf.time_of_last_reset = std::chrono::system_clock::now (); - // Clear the jobs-related stats and reset time - perf.succeeded.njobs_reset = 0; - perf.succeeded.max_match_jobid = -1; - perf.succeeded.match_iter_count = -1; - perf.succeeded.min = std::numeric_limits::max (); - perf.succeeded.max = 0.0; - perf.succeeded.avg = 0.0; - perf.succeeded.M2 = 0.0; - // Failed match stats - perf.failed.njobs_reset = 0; - perf.failed.max_match_jobid = -1; - perf.failed.match_iter_count = -1; - perf.failed.min = std::numeric_limits::max (); - perf.failed.max = 0.0; - perf.failed.avg = 0.0; - perf.failed.M2 = 0.0; - - if (flux_respond (h, msg, NULL) < 0) - flux_log_error (h, "%s: flux_respond", __FUNCTION__); -} - -static inline int64_t next_jobid (const std::map> &m) -{ - int64_t jobid = -1; - if (m.empty ()) - jobid = 0; - else if (m.rbegin ()->first < INT64_MAX) - jobid = m.rbegin ()->first + 1; - return jobid; -} - -/* Needed for testing only */ -static void next_jobid_request_cb (flux_t *h, - flux_msg_handler_t *w, - const flux_msg_t *msg, - void *arg) -{ - std::shared_ptr ctx = getctx ((flux_t *)arg); - int64_t jobid = -1; - - if ((jobid = next_jobid (ctx->jobs)) < 0) { - errno = ERANGE; - goto error; - } - if (flux_respond_pack (h, msg, "{s:I}", "jobid", jobid) < 0) - flux_log_error (h, "%s", __FUNCTION__); - - return; - -error: - if (flux_respond_error (h, msg, errno, NULL) < 0) - flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); -} - -static void set_property_request_cb (flux_t *h, - flux_msg_handler_t *w, - const flux_msg_t *msg, - void *arg) -{ - const char *rp = NULL, *kv = NULL; - std::string resource_path = "", keyval = ""; - std::string property_key = "", property_value = ""; - size_t pos; - std::shared_ptr ctx = getctx ((flux_t *)arg); - std::map>::const_iterator it; - std::pair::iterator, bool> ret; - vtx_t v; - - if (flux_request_unpack (msg, NULL, "{s:s s:s}", "sp_resource_path", &rp, "sp_keyval", &kv) < 0) - goto error; - - resource_path = rp; - keyval = kv; - - pos = keyval.find ('='); - - if (pos == 0 || (pos == keyval.size () - 1) || pos == std::string::npos) { - errno = EINVAL; - flux_log_error (h, "%s: Incorrect format.", __FUNCTION__); - flux_log_error (h, "%s: Use set-property PROPERTY=VALUE", __FUNCTION__); - goto error; - } - - property_key = keyval.substr (0, pos); - property_value = keyval.substr (pos + 1); - - it = ctx->db->metadata.by_path.find (resource_path); - - if (it == ctx->db->metadata.by_path.end ()) { - errno = ENOENT; - flux_log_error (h, - "%s: Couldn't find %s in resource graph.", - __FUNCTION__, - resource_path.c_str ()); - goto error; - } - - for (auto &v : it->second) { - ret = ctx->db->resource_graph[v].properties.insert ( - std::pair (property_key, property_value)); - - if (ret.second == false) { - ctx->db->resource_graph[v].properties.erase (property_key); - ctx->db->resource_graph[v].properties.insert ( - std::pair (property_key, property_value)); - } - } - - if (flux_respond_pack (h, msg, "{}") < 0) - flux_log_error (h, "%s", __FUNCTION__); - - return; - -error: - if (flux_respond_error (h, msg, errno, NULL) < 0) - flux_log_error (h, "%s: flux_respond_error", 
__FUNCTION__); -} - -static void get_property_request_cb (flux_t *h, - flux_msg_handler_t *w, - const flux_msg_t *msg, - void *arg) -{ - const char *rp = NULL, *gp_key = NULL; - std::string resource_path = "", property_key = ""; - std::shared_ptr ctx = getctx ((flux_t *)arg); - std::map>::const_iterator it; - std::map::const_iterator p_it; - vtx_t v; - std::vector resp_values; - json_t *resp_array = nullptr; - - if (flux_request_unpack (msg, NULL, "{s:s s:s}", "gp_resource_path", &rp, "gp_key", &gp_key) - < 0) - goto error; - - resource_path = rp; - property_key = gp_key; - - it = ctx->db->metadata.by_path.find (resource_path); - - if (it == ctx->db->metadata.by_path.end ()) { - errno = ENOENT; - flux_log_error (h, - "%s: Couldn't find %s in resource graph.", - __FUNCTION__, - resource_path.c_str ()); - goto error; - } - - for (auto &v : it->second) { - for (p_it = ctx->db->resource_graph[v].properties.begin (); - p_it != ctx->db->resource_graph[v].properties.end (); - p_it++) { - if (property_key.compare (p_it->first) == 0) - resp_values.push_back (p_it->second); - } - } - if (resp_values.empty ()) { - errno = ENOENT; - flux_log_error (h, - "%s: Property %s was not found for resource %s.", - __FUNCTION__, - property_key.c_str (), - resource_path.c_str ()); - goto error; - } - - if (!(resp_array = json_array ())) { - errno = ENOMEM; - goto error; - } - for (auto &resp_value : resp_values) { - json_t *value = nullptr; - if (!(value = json_string (resp_value.c_str ()))) { - errno = EINVAL; - goto error; - } - if (json_array_append_new (resp_array, value) < 0) { - json_decref (value); - errno = EINVAL; - goto error; - } - } - if (flux_respond_pack (h, msg, "{s:o}", "values", resp_array) < 0) - flux_log_error (h, "%s", __FUNCTION__); - - return; - -error: - if (flux_respond_error (h, msg, errno, NULL) < 0) - flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); -} - -static void disconnect_request_cb (flux_t *h, - flux_msg_handler_t *w, - const flux_msg_t *msg, - void *arg) -{ - const char *route; - std::shared_ptr ctx = getctx ((flux_t *)arg); - - if (!(route = flux_msg_route_first (msg))) { - flux_log_error (h, "%s: flux_msg_route_first", __FUNCTION__); - return; - } - if (ctx->notify_msgs.find (route) != ctx->notify_msgs.end ()) { - ctx->notify_msgs.erase (route); - flux_log (h, LOG_DEBUG, "%s: a notify request aborted", __FUNCTION__); - } -} - -static void notify_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) -{ - try { - const char *route; - std::shared_ptr ctx = getctx ((flux_t *)arg); - std::shared_ptr m = std::make_shared (); - - if (flux_request_decode (msg, NULL, NULL) < 0) { - flux_log_error (h, "%s: flux_request_decode", __FUNCTION__); - goto error; - } - if (!flux_msg_is_streaming (msg)) { - errno = EPROTO; - flux_log_error (h, "%s: streaming flag not set", __FUNCTION__); - goto error; - } - if (!(route = flux_msg_route_first (msg))) { - flux_log_error (h, "%s: flux_msg_route_first", __FUNCTION__); - goto error; - } - - m->set_msg (msg); - auto ret = ctx->notify_msgs.insert ( - std::pair> (route, m)); - if (!ret.second) { - errno = EEXIST; - flux_log_error (h, "%s: insert", __FUNCTION__); - goto error; - } - - if (flux_respond (ctx->h, msg, NULL) < 0) { - flux_log_error (ctx->h, "%s: flux_respond", __FUNCTION__); - goto error; - } - } catch (std::bad_alloc &e) { - errno = ENOMEM; - } - return; - -error: - if (flux_respond_error (h, msg, errno, NULL) < 0) - flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); -} - -static int run_find 
(std::shared_ptr &ctx, - const std::string &criteria, - const std::string &format_str, - json_t **R) +int run_find (std::shared_ptr &ctx, + const std::string &criteria, + const std::string &format_str, + json_t **R) { int rc = -1; json_t *o = nullptr; @@ -2697,352 +1577,6 @@ static int run_find (std::shared_ptr &ctx, return rc; } -static void find_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) -{ - json_t *R = nullptr; - int saved_errno; - const char *criteria = nullptr; - const char *format_str = "rv1_nosched"; - std::shared_ptr ctx = getctx ((flux_t *)arg); - - if (flux_request_unpack (msg, - nullptr, - "{s:s, s?:s}", - "criteria", - &criteria, - "format", - &format_str) - < 0) - goto error; - - if (run_find (ctx, criteria, format_str, &R) < 0) - goto error; - if (flux_respond_pack (h, msg, "{s:o?}", "R", R) < 0) { - flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); - goto error; - } - - flux_log (h, LOG_DEBUG, "%s: find succeeded", __FUNCTION__); - return; - -error: - saved_errno = errno; - json_decref (R); - errno = saved_errno; - if (flux_respond_error (h, msg, errno, nullptr) < 0) - flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); -} - -static void status_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) -{ - int saved_errno; - json_t *R_all = nullptr; - json_t *R_down = nullptr; - json_t *R_alloc = nullptr; - std::chrono::time_point now; - std::chrono::duration elapsed; - std::shared_ptr ctx = getctx ((flux_t *)arg); - - now = std::chrono::system_clock::now (); - elapsed = now - ctx->m_resources_alloc_updated; - // Get R alloc whenever m_resources_alloc_updated or - // the elapsed time is greater than configured limit - if ((elapsed.count () > static_cast (ctx->opts.get_opt ().get_update_interval ())) - || ctx->m_resources_updated) { - if (run_find (ctx, "sched-now=allocated", "rv1_nosched", &R_alloc) < 0) - goto error; - ctx->m_r_alloc = json_deep_copy (R_alloc); - ctx->m_resources_alloc_updated = std::chrono::system_clock::now (); - } else - R_alloc = json_deep_copy (ctx->m_r_alloc.get ()); - - if (ctx->m_resources_updated) { - if (run_find (ctx, "status=up or status=down", "rv1_nosched", &R_all) < 0) - goto error; - ctx->m_r_all = json::value::take (json_deep_copy (R_all)); - ctx->m_resources_updated = false; - } else - R_all = json_deep_copy (ctx->m_r_all.get ()); - - if (ctx->m_resources_down_updated) { - if (run_find (ctx, "status=down", "rv1_nosched", &R_down) < 0) - goto error; - ctx->m_r_down = json::value::take (json_deep_copy (R_down)); - ctx->m_resources_down_updated = false; - } else - R_down = json_deep_copy (ctx->m_r_down.get ()); - - if (flux_respond_pack (h, - msg, - "{s:o? s:o? 
s:o?}", - "all", - R_all, - "down", - R_down, - "allocated", - R_alloc) - < 0) { - flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); - goto error; - } - return; - -error: - saved_errno = errno; - json_decref (R_all); - json_decref (R_alloc); - json_decref (R_down); - errno = saved_errno; - if (flux_respond_error (h, msg, errno, nullptr) < 0) - flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); -} - -static void ns_info_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) -{ - uint64_t rank, id, remapped_id; - const char *type_name; - std::shared_ptr ctx = getctx ((flux_t *)arg); - - if (flux_request_unpack (msg, - nullptr, - "{s:I s:s s:I}", - "rank", - &rank, - "type-name", - &type_name, - "id", - &id) - < 0) { - flux_log_error (h, "%s: flux_respond_unpack", __FUNCTION__); - goto error; - } - if (ctx->reader->namespace_remapper.query (rank, type_name, id, remapped_id) < 0) { - flux_log_error (h, "%s: namespace_remapper.query", __FUNCTION__); - goto error; - } - if (remapped_id > static_cast (std::numeric_limits::max ())) { - errno = EOVERFLOW; - flux_log_error (h, "%s: remapped id too large", __FUNCTION__); - goto error; - } - if (flux_respond_pack (h, msg, "{s:I}", "id", static_cast (remapped_id)) < 0) { - flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); - goto error; - } - return; - -error: - if (flux_respond_error (h, msg, errno, nullptr) < 0) - flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); -} - -static void satisfiability_request_cb (flux_t *h, - flux_msg_handler_t *w, - const flux_msg_t *msg, - void *arg) -{ - int64_t at = 0; - int64_t now = 0; - double overhead = 0.0f; - int saved_errno = 0; - std::stringstream R; - json_t *jobspec = nullptr; - const char *js_str = nullptr; - std::string errmsg; - flux_error_t error; - std::shared_ptr ctx = getctx ((flux_t *)arg); - - if (flux_request_unpack (msg, NULL, "{s:o}", "jobspec", &jobspec) < 0) - goto error; - if (!(js_str = json_dumps (jobspec, JSON_INDENT (0)))) { - errno = ENOMEM; - goto error; - } - error.text[0] = '\0'; - if (run_match (ctx, -1, "satisfiability", js_str, &now, &at, &overhead, R, &error) < 0) { - if (errno == ENODEV) - errmsg = "Unsatisfiable request"; - else { - errmsg = "Internal match error: "; - errmsg += error.text; - } - goto error_memfree; - } - free ((void *)js_str); - if (flux_respond (h, msg, NULL) < 0) - flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); - return; - -error_memfree: - saved_errno = errno; - free ((void *)js_str); - errno = saved_errno; -error: - if (flux_respond_error (h, msg, errno, errmsg.c_str ()) < 0) - flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); -} - -static void params_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) -{ - int saved_errno; - json_error_t jerr; - std::string params; - json_t *o{nullptr}; - std::shared_ptr ctx = getctx ((flux_t *)arg); - - if (ctx->opts.jsonify (params) < 0) - goto error; - if (!(o = json_loads (params.c_str (), 0, &jerr))) { - errno = ENOMEM; - goto error; - } - if (flux_respond_pack (h, msg, "{s:o}", "params", o) < 0) { - flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__); - goto error; - } - - flux_log (h, LOG_DEBUG, "%s: params succeeded", __FUNCTION__); - return; - -error: - if (o) { - saved_errno = errno; - json_decref (o); - errno = saved_errno; - } - if (flux_respond_error (h, msg, errno, nullptr) < 0) - flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); -} - -/* - * Mark a vertex as up or down - */ -static 
void set_status_request_cb (flux_t *h, - flux_msg_handler_t *w, - const flux_msg_t *msg, - void *arg) -{ - const char *rp = NULL, *st = NULL; - std::string resource_path = "", status = "", errmsg = ""; - std::shared_ptr ctx = getctx ((flux_t *)arg); - resource_pool_t::string_to_status sts = resource_pool_t::str_to_status; - std::map>::const_iterator it{}; - resource_pool_t::string_to_status::iterator status_it{}; - - if (flux_request_unpack (msg, NULL, "{s:s, s:s}", "resource_path", &rp, "status", &st) < 0) { - errmsg = "malformed RPC"; - goto error; - } - resource_path = rp; - status = st; - // check that the path/vertex exists - it = ctx->db->metadata.by_path.find (resource_path); - if (it == ctx->db->metadata.by_path.end ()) { - errmsg = "could not find path '" + resource_path + "' in resource graph"; - goto error; - } - // check that the status given is valid ('up' or 'down') - status_it = sts.find (status); - if (status_it == sts.end ()) { - errmsg = "unrecognized status '" + status + "'"; - goto error; - } - // mark the vertex - if (ctx->traverser->mark (resource_path, status_it->second) < 0) { - flux_log_error (h, - "%s: traverser::mark: %s", - __FUNCTION__, - ctx->traverser->err_message ().c_str ()); - ctx->traverser->clear_err_message (); - errmsg = "Failed to set status of resource vertex"; - goto error; - } - ctx->m_resources_down_updated = true; - if (flux_respond (h, msg, NULL) < 0) { - flux_log_error (h, "%s: flux_respond", __FUNCTION__); - } - return; - -error: - if (flux_respond_error (h, msg, EINVAL, errmsg.c_str ()) < 0) - flux_log_error (h, "%s: flux_respond_error", __FUNCTION__); - return; -} - -static int register_feasibility (flux_t *h) -{ - flux_future_t *f; - int rc; - - if (!(f = flux_service_register (h, "feasibility"))) - return -1; - rc = flux_future_get (f, NULL); - flux_future_destroy (f); - return rc; -} - -//////////////////////////////////////////////////////////////////////////////// -// Module Main -//////////////////////////////////////////////////////////////////////////////// - -extern "C" int mod_main (flux_t *h, int argc, char **argv) -{ - int rc = -1; - - flux_log (h, LOG_INFO, "version %s", PACKAGE_VERSION); - - try { - std::shared_ptr ctx = nullptr; - uint32_t rank = 1; - - if (!(ctx = init_module (h, argc, argv))) { - flux_log (h, LOG_ERR, "%s: can't initialize resource module", __FUNCTION__); - goto done; - } - // Because mod_main is always active, the following is safe. - flux_aux_set (h, "sched-fluxion-resource", &ctx, NULL); - flux_log (h, LOG_DEBUG, "%s: resource module starting", __FUNCTION__); - - /* Attempt to register the feasibility service. Print a warning - * if this fails (likely because sched-simple is still loaded), but - * do not make it a fatal error. - */ - if (register_feasibility (h) < 0) - flux_log (ctx->h, LOG_WARNING, "unable to register feasibility service"); - - /* Before beginning synchronous resource.acquire RPC, set module status - * to 'running' to let flux module load return success. 
- */ - if ((rc = flux_module_set_running (ctx->h)) < 0) { - flux_log_error (ctx->h, "%s: flux_module_set_running", __FUNCTION__); - goto done; - } - if ((rc = init_resource_graph (ctx)) != 0) { - flux_log (h, LOG_ERR, "%s: can't initialize resource graph database", __FUNCTION__); - goto done; - } - flux_log (h, LOG_DEBUG, "%s: resource graph database loaded", __FUNCTION__); - - if ((rc = flux_reactor_run (flux_get_reactor (h), 0)) < 0) { - flux_log (h, LOG_ERR, "%s: flux_reactor_run: %s", __FUNCTION__, strerror (errno)); - goto done; - } - } catch (std::exception &e) { - errno = ENOSYS; - flux_log (h, LOG_ERR, "%s: %s", __FUNCTION__, e.what ()); - return -1; - } catch (...) { - errno = ENOSYS; - flux_log (h, LOG_ERR, "%s: caught unknown exception", __FUNCTION__); - return -1; - } - -done: - return rc; -} - -MOD_NAME ("sched-fluxion-resource"); - /* * vi:tabstop=4 shiftwidth=4 expandtab */ diff --git a/resource/modules/resource_match.hpp b/resource/modules/resource_match.hpp new file mode 100644 index 000000000..40ccb2af6 --- /dev/null +++ b/resource/modules/resource_match.hpp @@ -0,0 +1,182 @@ +/*****************************************************************************\ + * Copyright 2014 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, LICENSE) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\*****************************************************************************/ + +#ifndef RESOURCE_MATCH_HPP +#define RESOURCE_MATCH_HPP + +extern "C" { +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +} + +#include +#include +#include +#include +#include +#include +#include + +#include "resource_match.hpp" +#include "resource/schema/resource_graph.hpp" +#include "resource/readers/resource_reader_factory.hpp" +#include "resource/traversers/dfu.hpp" +#include "resource/jobinfo/jobinfo.hpp" +#include "resource/policies/dfu_match_policy_factory.hpp" +#include "resource_match_opts.hpp" +#include "resource/schema/perf_data.hpp" +#include + +using namespace Flux::resource_model; +using namespace Flux::opts_manager; + +//////////////////////////////////////////////////////////////////////////////// +// Resource Matching Service Module Context +//////////////////////////////////////////////////////////////////////////////// + +class msg_wrap_t { + public: + msg_wrap_t () = default; + msg_wrap_t (const msg_wrap_t &o); + msg_wrap_t &operator= (const msg_wrap_t &o); + ~msg_wrap_t (); + const flux_msg_t *get_msg () const; + void set_msg (const flux_msg_t *msg); + + private: + const flux_msg_t *m_msg = nullptr; +}; + +struct resobj_t { + std::string exec_target_range; + std::vector core; + std::vector gpu; +}; + +class resource_interface_t { + public: + resource_interface_t () = default; + resource_interface_t (const resource_interface_t &o); + resource_interface_t &operator= (const resource_interface_t &o); + + ~resource_interface_t (); + int fetch_and_reset_update_rc (); + int get_update_rc () const; + void set_update_rc (int rc); + + const std::string &get_ups () const; + void set_ups (const char *ups); + bool is_ups_set () const; + flux_future_t *update_f = nullptr; + + private: + std::string m_ups = ""; + int m_update_rc = 0; +}; + +struct resource_ctx_t : public resource_interface_t { + ~resource_ctx_t (); + flux_t *h; /* Flux handle */ + flux_msg_handler_t **handlers; /* Message handlers */ + Flux::opts_manager::optmgr_composer_t + opts; /* 
Option manager */ + std::shared_ptr matcher; /* Match callback object */ + std::shared_ptr traverser; /* Graph traverser object */ + std::shared_ptr db; /* Resource graph data store */ + std::shared_ptr writers; /* Vertex/Edge writers */ + std::shared_ptr reader; /* resource reader */ + std::map> jobs; /* Jobs table */ + std::map allocations; /* Allocation table */ + std::map reservations; /* Reservation table */ + std::map> notify_msgs; + bool m_resources_updated = true; /* resources have been updated */ + bool m_resources_down_updated = true; /* down resources have been updated */ + /* last time allocated resources search updated */ + std::chrono::time_point m_resources_alloc_updated; + /* R caches */ + json::value m_r_all; + json::value m_r_down; + json::value m_r_alloc; + + /* Resource acquire behavior */ + bool m_get_up_down_updates = true; + bool m_acquire_resources_from_core = false; /* s.-f.-resource only */ + /* All resources from resource.acquire */ + json::value m_acquired_resources = nullptr; + double m_acquired_resources_expiration = -1.; +}; + +//////////////////////////////////////////////////////////////////////////////// +// Request Handler Routines +//////////////////////////////////////////////////////////////////////////////// + +inline std::string get_status_string (int64_t now, int64_t at) +{ + return (at == now) ? "ALLOCATED" : "RESERVED"; +} + +inline bool is_existent_jobid (const std::shared_ptr &ctx, uint64_t jobid) +{ + return (ctx->jobs.find (jobid) != ctx->jobs.end ()) ? true : false; +} + +int Rlite_equal (const std::shared_ptr &ctx, const char *R1, const char *R2); + +int run_match (std::shared_ptr &ctx, + int64_t jobid, + const char *cmd, + const std::string &jstr, + int64_t *now, + int64_t *at, + double *overhead, + std::stringstream &o, + flux_error_t *errp); + +int run_update (std::shared_ptr &ctx, + int64_t jobid, + const char *R, + int64_t &at, + double &overhead, + std::stringstream &o); + +int run_remove (std::shared_ptr &ctx, + int64_t jobid, + const char *R, + bool part_cancel, + bool &full_removal); + +int run_find (std::shared_ptr &ctx, + const std::string &criteria, + const std::string &format_str, + json_t **R); + +//////////////////////////////////////////////////////////////////////////////// +// Resource Graph and Traverser Initialization +//////////////////////////////////////////////////////////////////////////////// + +int populate_resource_db (std::shared_ptr &ctx); + +int mark (std::shared_ptr &ctx, const char *ids, resource_pool_t::status_t status); + +int update_resource_db (std::shared_ptr &ctx, + json_t *resources, + const char *up, + const char *down); + +int select_subsystems (std::shared_ptr &ctx); + +#endif // RESOURCE_MATCH_HPP + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ diff --git a/resource/modules/resource_match_opts.cpp b/resource/modules/resource_match_opts.cpp index 9e6179ae8..23d6236e0 100644 --- a/resource/modules/resource_match_opts.cpp +++ b/resource/modules/resource_match_opts.cpp @@ -454,7 +454,6 @@ resource_opts_t &resource_opts_t::operator+= (const resource_opts_t &src) bool resource_opts_t::operator() (const std::string &k1, const std::string &k2) const { if (m_tab.find (k1) == m_tab.end () || m_tab.find (k2) == m_tab.end ()) - return k1 < k2; return m_tab.at (k1) < m_tab.at (k2); } @@ -509,6 +508,9 @@ int resource_opts_t::parse (const std::string &k, const std::string &v, std::str case static_cast (resource_opts_key_t::LOAD_ALLOWLIST): m_resource_prop.set_load_allowlist (v); + info += k; + info += "; "; + 
diff --git a/resource/modules/resource_match_opts.cpp b/resource/modules/resource_match_opts.cpp
index 9e6179ae8..23d6236e0 100644
--- a/resource/modules/resource_match_opts.cpp
+++ b/resource/modules/resource_match_opts.cpp
@@ -454,7 +454,6 @@ resource_opts_t &resource_opts_t::operator+= (const resource_opts_t &src)
 
 bool resource_opts_t::operator() (const std::string &k1, const std::string &k2) const
 {
     if (m_tab.find (k1) == m_tab.end () || m_tab.find (k2) == m_tab.end ())
-        return k1 < k2;
     return m_tab.at (k1) < m_tab.at (k2);
 }
 
@@ -509,6 +508,9 @@ int resource_opts_t::parse (const std::string &k, const std::string &v, std::str
         case static_cast<int> (resource_opts_key_t::LOAD_ALLOWLIST):
             m_resource_prop.set_load_allowlist (v);
+            info += k;
+            info += "; ";
+            info += v;
             break;
         case static_cast<int> (resource_opts_key_t::MATCH_POLICY):
diff --git a/src/cmd/flux-ion-resource.py b/src/cmd/flux-ion-resource.py
index e184e079b..f80d0d64f 100755
--- a/src/cmd/flux-ion-resource.py
+++ b/src/cmd/flux-ion-resource.py
@@ -109,7 +109,7 @@ def rpc_namespace_info(self, rank, type_name, identity):
 
     def rpc_satisfiability(self, jobspec):
         payload = {"jobspec": jobspec}
-        return self.handle.rpc("sched-fluxion-resource.satisfiability", payload).get()
+        return self.handle.rpc("feasibility.check", payload).get()
 
     def rpc_params(self):
         return self.handle.rpc("sched-fluxion-resource.params").get()
diff --git a/t/CMakeLists.txt b/t/CMakeLists.txt
index 82368ac13..67f5b82eb 100644
--- a/t/CMakeLists.txt
+++ b/t/CMakeLists.txt
@@ -83,6 +83,7 @@ set(ALL_TESTS
     t4011-match-duration.t
     t4012-set-status.t
     t4013-unreservable.sh
+    t4014-match-feasibility.t
     t5000-valgrind.t
     t5100-issues-test-driver.t
     t6000-graph-size.t
diff --git a/t/sharness.d/sched-sharness.sh b/t/sharness.d/sched-sharness.sh
index 48b6961db..116311536 100644
--- a/t/sharness.d/sched-sharness.sh
+++ b/t/sharness.d/sched-sharness.sh
@@ -69,6 +69,10 @@ load_resource () {
     flux module load sched-fluxion-resource "$@"
 }
 
+load_feasibility () {
+    flux module load sched-fluxion-feasibility "$@"
+}
+
 reload_qmanager () {
     flux module reload -f sched-fluxion-qmanager "$@"
 }
@@ -86,6 +90,10 @@ reload_resource () {
     flux module reload -f sched-fluxion-resource "$@"
 }
 
+reload_feasibility () {
+    flux module reload -f sched-fluxion-feasibility "$@"
+}
+
 remove_qmanager () {
     flux module remove sched-fluxion-qmanager "$@"
 }
@@ -94,6 +102,10 @@ remove_resource () {
     flux module remove sched-fluxion-resource "$@"
 }
 
+remove_feasibility () {
+    flux module remove sched-fluxion-feasibility "$@"
+}
+
 # Usage: load_test_resources hwloc-dir
 # where hwloc-dir contains .xml files
 load_test_resources () {
diff --git a/t/t1020-qmanager-feasibility.t b/t/t1020-qmanager-feasibility.t
index 44c7b1fb4..b041b4860 100755
--- a/t/t1020-qmanager-feasibility.t
+++ b/t/t1020-qmanager-feasibility.t
@@ -15,9 +15,10 @@ test_expect_success 'feasibility: loading test resources works' '
     load_test_resources ${excl_1N1B}
 '
 
-test_expect_success 'feasibility: loading resource and qmanager modules works' '
+test_expect_success 'feasibility: loading resource, feasibility, and qmanager modules works' '
     flux module load sched-fluxion-resource prune-filters=ALL:core \
 subsystems=containment policy=low &&
+    load_feasibility &&
     load_qmanager
 '
 
@@ -73,8 +74,9 @@ test_expect_success 'feasibility: cleanup active jobs' '
     cleanup_active_jobs
 '
 
-test_expect_success 'feasibility: removing resource and qmanager modules' '
+test_expect_success 'feasibility: removing resource, feasibility, and qmanager modules' '
     remove_qmanager &&
+    remove_feasibility &&
     remove_resource
 '
 
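
Because flux-ion-resource.py now sends "feasibility.check" instead of "sched-fluxion-resource.satisfiability", `flux ion-resource match satisfiability` only succeeds while something provides the feasibility service (sched-fluxion-feasibility here, or sched-simple in a stock instance). A hedged sketch of checking a jobspec by hand; the jobspec path is illustrative and any valid jobspec file works:

    # Assumes sched-fluxion-resource and sched-fluxion-feasibility are already
    # loaded, e.g. via the load_resource/load_feasibility helpers added above.
    flux ion-resource match satisfiability t/data/resource/jobspecs/basics/test001.yaml
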
diff --git a/t/t4005-match-unsat.t b/t/t4005-match-unsat.t
index 0e5bb861b..fb451200f 100755
--- a/t/t4005-match-unsat.t
+++ b/t/t4005-match-unsat.t
@@ -47,13 +47,6 @@ match allocate_with_satisfiability ${jobspec1} &&
 match allocate_with_satisfiability ${jobspec1}
 '
 
-test_expect_success 'jobspec is still satisfiable even when no available resources' '
-    flux ion-resource match satisfiability ${jobspec1} &&
-    flux ion-resource match satisfiability ${jobspec1} &&
-    flux ion-resource match satisfiability ${jobspec1} &&
-    flux ion-resource match satisfiability ${jobspec1}
-'
-
 test_expect_success 'satisfiability returns ENODEV on unsatisfiable jobspec' '
     test_expect_code 19 flux ion-resource \
 match allocate_with_satisfiability ${jobspec2} &&
@@ -62,9 +55,7 @@ match allocate_with_satisfiability ${jobspec2} &&
     test_expect_code 19 flux ion-resource \
 match allocate_with_satisfiability ${jobspec2} &&
     test_expect_code 19 flux ion-resource \
-match allocate_with_satisfiability ${jobspec2} &&
-    test_expect_code 19 flux ion-resource \
-match satisfiability ${jobspec2}
+match allocate_with_satisfiability ${jobspec2}
 '
 
 test_expect_success 'removing resource works' '
diff --git a/t/t4010-match-conf.t b/t/t4010-match-conf.t
index f25268489..c9a7bf9c4 100755
--- a/t/t4010-match-conf.t
+++ b/t/t4010-match-conf.t
@@ -22,19 +22,27 @@ start_resource () {
     RESOURCE_OPTIONS=$*
     echo $RESOURCE_OPTIONS > options
     flux broker --config-path=${config} bash -c \
+"flux dmesg --clear && "\
 "flux module reload -f sched-fluxion-resource ${RESOURCE_OPTIONS} && "\
 "flux module reload -f sched-fluxion-qmanager && "\
 "flux module stats sched-fluxion-qmanager && "\
-"flux ion-resource params >${outfile}"
+"flux module reload -f sched-fluxion-feasibility ${RESOURCE_OPTIONS} && "\
+"flux ion-resource params >${outfile} && "\
+"flux dmesg && "\
+"flux config get"
 }
 start_resource_noconfig () {
     local outfile=$1; shift
     RESOURCE_OPTIONS=$*
     flux broker bash -c \
+"flux dmesg --clear && "\
 "flux module reload -f sched-fluxion-resource ${RESOURCE_OPTIONS} && "\
 "flux module reload -f sched-fluxion-qmanager && "\
 "flux module stats sched-fluxion-qmanager && "\
-"flux ion-resource params >${outfile}"
+"flux module reload -f sched-fluxion-feasibility ${RESOURCE_OPTIONS} && "\
+"flux ion-resource params >${outfile} && "\
+"flux dmesg && "\
+"flux config get"
 }
 check_load_file(){
     test $(jq '.params."load-file"' ${1}) = ${2}
@@ -71,13 +79,15 @@ test_expect_success 'resource: sched-fluxion-resource loads with no config' '
     check_match_format ${outfile} "\"rv1_nosched\"" &&
     check_match_subsystems ${outfile} "\"containment\"" &&
     check_reserve_vtx_vec ${outfile} 0 &&
-    check_prune_filters ${outfile} "\"ALL:core\""
+    check_prune_filters ${outfile} "\"ALL:core\"" &&
+    cat ${outfile}
 '
 
 test_expect_success 'resource: sched-fluxion-resource loads with valid toml' '
     conf_name="01-default" &&
     outfile=${conf_name}.out &&
     start_resource ${conf_base}/${conf_name} ${outfile} &&
+    cat ${outfile} &&
     check_load_file ${outfile} null &&
     check_load_format ${outfile} "\"rv1exec\"" &&
     check_load_allowlist ${outfile} "\"node,core,gpu\"" &&
@@ -85,7 +95,8 @@ test_expect_success 'resource: sched-fluxion-resource loads with valid toml' '
     check_match_format ${outfile} "\"rv1_nosched\"" &&
     check_match_subsystems ${outfile} "\"containment\"" &&
     check_reserve_vtx_vec ${outfile} 200000 &&
-    check_prune_filters ${outfile} "\"ALL:core,ALL:gpu\""
+    check_prune_filters ${outfile} "\"ALL:core,ALL:gpu\"" &&
+    cat ${outfile}
 '
 
 test_expect_success 'resource: module load options take precedence' '
@@ -93,6 +104,7 @@ test_expect_success 'resource: module load options take precedence' '
     outfile=${conf_name}.out &&
     start_resource ${conf_base}/${conf_name} ${outfile} \
 policy=high match-format=rv1 &&
+    cat ${outfile} &&
     check_load_file ${outfile} null &&
     check_load_format ${outfile} "\"rv1exec\"" &&
     check_load_allowlist ${outfile} "\"node,core,gpu\"" &&
@@ -100,7 +112,8 @@ test_expect_success 'resource: module load options take precedence' '
     check_match_format ${outfile} "\"rv1\"" &&
     check_match_subsystems ${outfile} "\"containment\"" &&
     check_reserve_vtx_vec ${outfile} 200000 &&
-    check_prune_filters ${outfile} "\"ALL:core,ALL:gpu\""
+    check_prune_filters ${outfile} "\"ALL:core,ALL:gpu\"" &&
+    cat ${outfile}
 '
 
 test_expect_success 'resource: sched-fluxion-resource loads with no keys' '
@@ -114,13 +127,15 @@ test_expect_success 'resource: sched-fluxion-resource loads with no keys' '
     check_match_format ${outfile} "\"rv1_nosched\"" &&
     check_match_subsystems ${outfile} "\"containment\"" &&
     check_reserve_vtx_vec ${outfile} 0 &&
-    check_prune_filters ${outfile} "\"ALL:core\""
+    check_prune_filters ${outfile} "\"ALL:core\"" &&
+    cat ${outfile}
 '
 
 test_expect_success 'resource: sched-fluxion-resource loads with extra keys' '
     conf_name="03-extra-keys" &&
     outfile=${conf_name}.out &&
     start_resource ${conf_base}/${conf_name} ${outfile} &&
+    cat ${outfile} &&
     check_load_file ${outfile} null &&
     check_load_format ${outfile} "\"rv1exec\"" &&
     check_load_allowlist ${outfile} "\"node,core,gpu,foo\"" &&
@@ -128,14 +143,16 @@ test_expect_success 'resource: sched-fluxion-resource loads with extra keys' '
     check_match_format ${outfile} "\"rv1_nosched\"" &&
     check_match_subsystems ${outfile} "\"containment\"" &&
     check_reserve_vtx_vec ${outfile} 200000 &&
-    check_prune_filters ${outfile} "\"ALL:core,ALL:gpu\""
+    check_prune_filters ${outfile} "\"ALL:core,ALL:gpu\"" &&
+    cat ${outfile}
 '
 
 test_expect_success 'resource: load must tolerate an invalid policy' '
     conf_name="09-invalid-policy" &&
     outfile=${conf_name}.out &&
     start_resource ${conf_base}/${conf_name} ${outfile} &&
-    check_match_policy ${outfile} "\"first\""
+    check_match_policy ${outfile} "\"first\"" &&
+    cat ${outfile}
 '
 
 test_expect_success 'resource: load must fail on a bad value' '
diff --git a/t/t4012-set-status.t b/t/t4012-set-status.t
index a6089b53b..551cf2ba7 100755
--- a/t/t4012-set-status.t
+++ b/t/t4012-set-status.t
@@ -60,7 +60,6 @@ test_expect_success 'jobs fail when all nodes are marked down' '
     flux ion-resource set-status /tiny0/rack0/node0 down &&
     flux ion-resource set-status /tiny0/rack0/node1 down &&
     flux ion-resource find status=up | grep null &&
-    flux ion-resource match satisfiability $jobspec &&
     test_must_fail flux ion-resource match allocate $jobspec &&
     flux ion-resource set-status /tiny0/rack0/node0 up &&
     flux ion-resource set-status /tiny0/rack0/node1 up &&
@@ -71,8 +70,6 @@ test_expect_success 'jobs fail when all racks are marked down' '
     flux ion-resource find status=down | grep null &&
     flux ion-resource set-status /tiny0/rack0 down &&
     flux ion-resource find status=up | grep null &&
-    flux ion-resource match satisfiability $jobspec &&
-    test_must_fail flux ion-resource match allocate $jobspec &&
    flux ion-resource set-status /tiny0/rack0 up &&
     flux ion-resource find status=down | grep null
 '
diff --git a/t/t4014-match-feasibility.t b/t/t4014-match-feasibility.t
new file mode 100755
index 000000000..55eaa501b
--- /dev/null
+++ b/t/t4014-match-feasibility.t
@@ -0,0 +1,106 @@
+#!/bin/sh
+#set -x
+
+# Adapted from t4005
+
+test_description='Test the basic functionality of match satisfiability
+'
+
+. `dirname $0`/sharness.sh
+
+grug="${SHARNESS_TEST_SRCDIR}/data/resource/grugs/tiny.graphml"
+jobspec1="${SHARNESS_TEST_SRCDIR}/data/resource/jobspecs/basics/test001.yaml"
+jobspec2="${SHARNESS_TEST_SRCDIR}/data/resource/jobspecs/satisfiability/test001.yaml"
+
+test_under_flux 1
+
+test_debug '
+    echo ${grug} &&
+    echo ${jobspec1} &&
+    echo ${jobspec2}
+'
+
+test_expect_success 'loading feasibility module over sched-simple fails' '
+    load_feasibility 2>&1 | grep -q "File exists"
+'
+
+test_expect_success 'removing sched-simple works' '
+    flux module remove sched-simple &&
+    flux dmesg -c | grep -q "rmmod sched-simple"
+'
+
+test_expect_success 'loading feasibility module before resource fails' '
+    load_feasibility &&
+    flux dmesg -c | grep -q "Function not implemented"
+'
+
+test_expect_success 'loading resource module with a tiny machine config works' '
+    load_resource load-file=${grug} load-format=grug \
+prune-filters=ALL:core subsystems=containment policy=high &&
+    test -z "$(flux dmesg -c | grep -q err)"
+'
+
+test_expect_success 'loading feasibility module with a tiny machine config works' '
+    load_feasibility load-file=${grug} load-format=grug \
+prune-filters=ALL:core subsystems=containment policy=high &&
+    test -z "$(flux dmesg -c | grep -q err)"
+'
+
+test_expect_success 'satisfiability works with a 1-node, 1-socket jobspec' '
+    flux ion-resource match allocate_with_satisfiability ${jobspec1} &&
+    flux ion-resource match allocate_with_satisfiability ${jobspec1} &&
+    flux ion-resource match allocate_with_satisfiability ${jobspec1} &&
+    flux ion-resource match allocate_with_satisfiability ${jobspec1}
+'
+
+test_expect_success 'satisfiability returns EBUSY when no available resources' '
+    test_expect_code 16 flux ion-resource \
+match allocate_with_satisfiability ${jobspec1} &&
+    test_expect_code 16 flux ion-resource \
+match allocate_with_satisfiability ${jobspec1} &&
+    test_expect_code 16 flux ion-resource \
+match allocate_with_satisfiability ${jobspec1} &&
+    test_expect_code 16 flux ion-resource \
+match allocate_with_satisfiability ${jobspec1}
+'
+
+test_expect_success 'jobspec is still satisfiable even when no available resources' '
+    flux ion-resource match satisfiability ${jobspec1} &&
+    flux ion-resource match satisfiability ${jobspec1} &&
+    flux ion-resource match satisfiability ${jobspec1} &&
+    flux ion-resource match satisfiability ${jobspec1}
+'
+
+test_expect_success 'removing load-file feasibility module works' '
+    remove_feasibility &&
+    test -z "$(flux dmesg -c | grep -q err)"
+'
+
+# A resource module that has a load-file will not relay those resources to
+# the feasibility module. The feasibility module needs the same load-file.
+test_expect_success 'loading feasibility module from load-file resource module fails' '
+    load_feasibility &&
+    flux dmesg -c | grep -q err &&
+    ! flux module list | grep -q sched-fluxion-feasib
+'
+
+test_expect_success 'removing resource module works' '
+    remove_resource
+'
+
+test_expect_success 'loading non-load-file resource module works' '
+    load_resource &&
+    test -z "$(flux dmesg -c | grep -q err)"
+'
+
+test_expect_success 'loading feasibility from non-load-file resource module works' '
+    load_feasibility &&
+    test -z "$(flux dmesg -c | grep -q err)"
+'
+
+test_expect_success 'removing resource works and removes feasibility' '
+    remove_resource &&
+    flux dmesg -c | grep -q "exiting due to sched-fluxion-resource.notify failure"
+'
+
+test_done
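
t4005 keeps its allocate_with_satisfiability ENODEV assertions but drops the plain `match satisfiability` ones, since satisfiability is now answered by the separate feasibility module; the satisfiable-path coverage reappears in t4014 above. For a quick manual spot-check of the error mapping — a sketch only, with an illustrative jobspec path, not part of the test suite:

    # With sched-fluxion-resource and sched-fluxion-feasibility loaded, an
    # unsatisfiable jobspec should be rejected with ENODEV, which
    # flux ion-resource surfaces as exit code 19.
    flux ion-resource match satisfiability t/data/resource/jobspecs/satisfiability/test001.yaml
    echo "exit code: $?"    # expect 19
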
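To iterate on the new coverage without driving the whole suite, the sharness script can also be run directly; a sketch assuming it is invoked from the t/ directory of a configured build tree:

    ./t4014-match-feasibility.t --verbose --debug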