diff --git a/qmanager/modules/qmanager.cpp b/qmanager/modules/qmanager.cpp index 54ee5454a..a0f01765c 100644 --- a/qmanager/modules/qmanager.cpp +++ b/qmanager/modules/qmanager.cpp @@ -280,7 +280,6 @@ static int handshake_jobmanager (std::shared_ptr &ctx) static void status_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_t *msg, void *arg) { - size_t len = 0; const char *payload; flux_future_t *f = NULL; @@ -288,12 +287,12 @@ static void status_request_cb (flux_t *h, flux_msg_handler_t *w, const flux_msg_ flux_log_error (h, "%s: flux_rpc (sched-fluxion-resource.status)", __FUNCTION__); goto out; } - if (flux_rpc_get_raw (f, (const void **)&payload, &len) < 0) { - flux_log_error (h, "%s: flux_rpc_get_raw", __FUNCTION__); + if (flux_rpc_get (f, &payload) < 0) { + flux_log_error (h, "%s: flux_rpc_get", __FUNCTION__); goto out; } - if (flux_respond_raw (h, msg, (const void *)payload, len) < 0) { - flux_log_error (h, "%s: flux_respond_raw", __FUNCTION__); + if (flux_respond (h, msg, payload) < 0) { + flux_log_error (h, "%s: flux_respond", __FUNCTION__); goto out; } flux_future_destroy (f); @@ -310,25 +309,19 @@ static void feasibility_request_cb (flux_t *h, const flux_msg_t *msg, void *arg) { - size_t size = 0; flux_future_t *f = nullptr; const char *data = nullptr; - if (flux_request_decode_raw (msg, nullptr, (const void **)&data, &size) < 0) + if (flux_request_decode (msg, nullptr, &data) < 0) goto error; - if (!(f = flux_rpc_raw (h, - "sched-fluxion-resource.satisfiability", - data, - size, - FLUX_NODEID_ANY, - 0))) { + if (!(f = flux_rpc (h, "sched-fluxion-resource.satisfiability", data, FLUX_NODEID_ANY, 0))) { flux_log_error (h, "%s: flux_rpc (sched-fluxion-resource.satisfiability)", __FUNCTION__); goto error; } - if (flux_rpc_get_raw (f, (const void **)&data, &size) < 0) + if (flux_rpc_get (f, &data) < 0) goto error; - if (flux_respond_raw (h, msg, (const void *)data, size) < 0) { - flux_log_error (h, "%s: flux_respond_raw", __FUNCTION__); + if (flux_respond (h, msg, data) < 0) { + flux_log_error (h, "%s: flux_respond", __FUNCTION__); goto error; } flux_log (h, LOG_DEBUG, "%s: feasibility succeeded", __FUNCTION__); @@ -558,9 +551,14 @@ static std::shared_ptr qmanager_new (flux_t *h) ctx = nullptr; goto done; } + int schedutil_flags = 0; +#ifdef SCHEDUTIL_HELLO_PARTIAL_OK + // flag was added in flux-core 0.70.0 + schedutil_flags |= SCHEDUTIL_HELLO_PARTIAL_OK; +#endif if (!(ctx->schedutil = schedutil_create (ctx->h, - SCHEDUTIL_FREE_NOLOOKUP, + schedutil_flags, &ops, std::static_pointer_cast (ctx).get ()))) { flux_log_error (ctx->h, "%s: schedutil_create", __FUNCTION__); diff --git a/qmanager/modules/qmanager_callbacks.cpp b/qmanager/modules/qmanager_callbacks.cpp index c344aa0fa..4a5ffe5d2 100644 --- a/qmanager/modules/qmanager_callbacks.cpp +++ b/qmanager/modules/qmanager_callbacks.cpp @@ -150,15 +150,21 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg, const unsigned int prio; uint32_t uid; double ts; + const char *free_ranks = NULL; json_t *jobspec = NULL; flux_future_t *f = NULL; + json_t *R_jsontmp = NULL; + json_t *free_ranks_j = NULL; + json_t *sched = NULL; + json_error_t json_err; + const char *R_final = NULL; /* Don't expect jobspec to be set here as it is not currently defined * in RFC 27. However, add it anyway in case the hello protocol * evolves to include it. If it is not set, it must be looked up. */ if (flux_msg_unpack (msg, - "{s:I s:i s:i s:f s?o}", + "{s:I s:i s:i s:f s?s s?o}", "id", &id, "priority", @@ -167,6 +173,8 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg, const &uid, "t_submit", &ts, + "free", + &free_ranks, "jobspec", &jobspec) < 0) { @@ -199,9 +207,34 @@ int qmanager_cb_t::jobmanager_hello_cb (flux_t *h, const flux_msg_t *msg, const queue_name.c_str ()); goto out; } + // if free ranks is populated, insert the free ranks into the scheduling key + if (free_ranks) { + if (!(R_jsontmp = json_loads (R, 0, &json_err))) { + errno = ENOMEM; + flux_log (h, LOG_ERR, "%s: json_loads", __FUNCTION__); + goto out; + } + if ((sched = json_object_get (R_jsontmp, "scheduling")) == NULL) { + R_final = R; + } else { + free_ranks_j = json_string (free_ranks); + json_object_set (sched, "free_ranks", free_ranks_j); + if (!(R_final = json_dumps (R_jsontmp, JSON_COMPACT))) { + errno = ENOMEM; + flux_log (h, LOG_ERR, "%s: json_dumps", __FUNCTION__); + goto out; + } + } + } else { + R_final = R; + } queue = ctx->queues.at (queue_name); - running_job = - std::make_shared (job_state_kind_t::RUNNING, id, uid, calc_priority (prio), ts, R); + running_job = std::make_shared (job_state_kind_t::RUNNING, + id, + uid, + calc_priority (prio), + ts, + R_final); if (queue->reconstruct (static_cast (h), running_job, R_out) < 0) { flux_log_error (h, diff --git a/resource/modules/resource_match.cpp b/resource/modules/resource_match.cpp index f8554555a..c66905b90 100644 --- a/resource/modules/resource_match.cpp +++ b/resource/modules/resource_match.cpp @@ -1528,8 +1528,8 @@ static int parse_R (std::shared_ptr &ctx, int rc = 0; int version = 0; int saved_errno; - uint64_t st = 0; - uint64_t et = 0; + double tstart = 0; + double expiration = 0; json_t *o = NULL; json_t *graph = NULL; json_error_t error; @@ -1541,23 +1541,26 @@ static int parse_R (std::shared_ptr &ctx, errno = EINVAL; goto out; } - if ((rc = json_unpack (o, - "{s:i s:{s:I s:I} s?:o}", - "version", - &version, - "execution", - "starttime", - &st, - "expiration", - &et, - "scheduling", - &graph)) + if ((rc = json_unpack_ex (o, + &error, + 0, + "{s:i s:{s:F s:F} s?:o}", + "version", + &version, + "execution", + "starttime", + &tstart, + "expiration", + &expiration, + "scheduling", + &graph)) < 0) { errno = EINVAL; - flux_log (ctx->h, LOG_ERR, "%s: json_unpack", __FUNCTION__); + flux_log (ctx->h, LOG_ERR, "%s: json_unpack: %s", __FUNCTION__, error.text); goto freemem_out; } - if (version != 1 || st < 0 || et < st) { + if (version != 1 || tstart < 0 || expiration < tstart + || expiration > static_cast (std::numeric_limits::max ())) { rc = -1; errno = EPROTO; flux_log (ctx->h, @@ -1565,8 +1568,8 @@ static int parse_R (std::shared_ptr &ctx, "%s: version=%d, starttime=%jd, expiration=%jd", __FUNCTION__, version, - static_cast (st), - static_cast (et)); + static_cast (tstart), + static_cast (expiration)); goto freemem_out; } if (graph != NULL) { @@ -1585,8 +1588,8 @@ static int parse_R (std::shared_ptr &ctx, format = "rv1exec"; } - starttime = static_cast (st); - duration = et - st; + starttime = static_cast (tstart); + duration = static_cast (expiration - tstart); freemem_out: saved_errno = errno; diff --git a/resource/readers/resource_reader_jgf.cpp b/resource/readers/resource_reader_jgf.cpp index 6f8f6f3c7..2661c0b58 100644 --- a/resource/readers/resource_reader_jgf.cpp +++ b/resource/readers/resource_reader_jgf.cpp @@ -12,6 +12,7 @@ extern "C" { #if HAVE_CONFIG_H #include "config.h" #endif +#include } #include @@ -212,10 +213,16 @@ std::string diff (const resource_pool_t &r, const fetch_helper_t &f) int resource_reader_jgf_t::fetch_jgf (const std::string &str, json_t **jgf_p, json_t **nodes_p, - json_t **edges_p) + json_t **edges_p, + jgf_updater_data &update_data) { int rc = -1; + int64_t rank; json_t *graph = NULL; + json_t *free_ranks = NULL; + struct idset *r_ids = nullptr; + const char *ranks = nullptr; + std::string ranks_stripped; json_error_t json_err; if ((*jgf_p = json_loads (str.c_str (), 0, &json_err)) == NULL) { @@ -234,6 +241,30 @@ int resource_reader_jgf_t::fetch_jgf (const std::string &str, m_err_msg += ": JGF does not contain a required key (graph).\n"; goto done; } + if ((free_ranks = json_object_get (*jgf_p, "free_ranks")) != NULL) { + update_data.isect_ranks = true; + if (!(ranks = json_dumps (free_ranks, JSON_ENCODE_ANY | JSON_COMPACT))) { + errno = ENOMEM; + m_err_msg += __FUNCTION__; + m_err_msg += ": json_dumps failed.\n"; + goto done; + } + // Need to strip double quotes inserted by json_dumps above + ranks_stripped = std::string (ranks); + ranks_stripped.erase (std::remove (ranks_stripped.begin (), ranks_stripped.end (), '"'), + ranks_stripped.end ()); + if ((r_ids = idset_decode (ranks_stripped.c_str ())) == NULL) { + errno = EINVAL; + m_err_msg += __FUNCTION__; + m_err_msg += ": failed to decode ranks.\n"; + goto done; + } + rank = idset_first (r_ids); + while (rank != IDSET_INVALID_ID) { + update_data.ranks.insert (rank); + rank = idset_next (r_ids, rank); + } + } if ((*nodes_p = json_object_get (graph, "nodes")) == NULL) { errno = EINVAL; m_err_msg += __FUNCTION__; @@ -903,6 +934,13 @@ int resource_reader_jgf_t::update_vtx (resource_graph_t &g, goto done; if ((rc = check_root (v, g, root_checks)) != 0) goto done; + // Check if skipping due to previous partial free + if (update_data.isect_ranks && !update_data.ranks.empty ()) { + if (update_data.ranks.find (fetcher.rank) != update_data.ranks.end ()) { + rc = 0; + goto done; + } + } if ((rc = update_vmap (vmap, v, root_checks, fetcher)) != 0) goto done; if (update_data.update) { @@ -1025,7 +1063,8 @@ int resource_reader_jgf_t::unpack_edge (json_t *element, std::map &vmap, std::string &source, std::string &target, - std::string &subsystem) + std::string &subsystem, + jgf_updater_data &update_data) { int rc = -1; json_t *metadata = NULL; @@ -1042,11 +1081,17 @@ int resource_reader_jgf_t::unpack_edge (json_t *element, source = src; target = tgt; if (vmap.find (source) == vmap.end () || vmap.find (target) == vmap.end ()) { - errno = EINVAL; - m_err_msg += __FUNCTION__; - m_err_msg += ": source and/or target vertex not found"; - m_err_msg += source + std::string (" -> ") + target + ".\n"; - goto done; + if (update_data.isect_ranks) { + update_data.skipped = true; + rc = 0; + goto done; + } else { + errno = EINVAL; + m_err_msg += __FUNCTION__; + m_err_msg += ": source and/or target vertex not found"; + m_err_msg += source + std::string (" -> ") + target + ".\n"; + goto done; + } } if ((json_unpack (element, "{ s?{ s?s } }", "metadata", "subsystem", &subsys)) < 0) { errno = EINVAL; @@ -1077,10 +1122,11 @@ int resource_reader_jgf_t::unpack_edges (resource_graph_t &g, std::string source{}; std::string target{}; std::string subsystem{}; + jgf_updater_data update_data; for (i = 0; i < json_array_size (edges); i++) { element = json_array_get (edges, i); - if ((unpack_edge (element, vmap, source, target, subsystem)) != 0) + if ((unpack_edge (element, vmap, source, target, subsystem, update_data)) != 0) goto done; // We only add the edge when it connects at least one newly added vertex if ((added_vtcs.count (source) == 1) || (added_vtcs.count (target) == 1)) { @@ -1184,7 +1230,8 @@ int resource_reader_jgf_t::update_edges (resource_graph_t &g, resource_graph_metadata_t &m, std::map &vmap, json_t *edges, - uint64_t token) + uint64_t token, + jgf_updater_data &update_data) { edg_t e; int rc = -1; @@ -1197,8 +1244,13 @@ int resource_reader_jgf_t::update_edges (resource_graph_t &g, for (i = 0; i < json_array_size (edges); i++) { element = json_array_get (edges, i); // We only check protocol errors in JGF edges in the following... - if ((rc = unpack_edge (element, vmap, source, target, subsystem)) != 0) + update_data.skipped = false; + if ((rc = unpack_edge (element, vmap, source, target, subsystem, update_data)) != 0) goto done; + if (update_data.skipped) { + update_data.skipped = false; + continue; + } if ((rc = update_src_edge (g, m, vmap, source, token)) != 0) goto done; if ((rc = update_tgt_edge (g, m, vmap, source, target, token)) != 0) @@ -1273,6 +1325,7 @@ int resource_reader_jgf_t::unpack (resource_graph_t &g, json_t *edges = NULL; std::map vmap; std::unordered_set added_vtcs; + jgf_updater_data update_data; if (rank != -1) { errno = ENOTSUP; @@ -1280,7 +1333,7 @@ int resource_reader_jgf_t::unpack (resource_graph_t &g, m_err_msg += "rank != -1 unsupported for JGF unpack.\n"; goto done; } - if ((rc = fetch_jgf (str, &jgf, &nodes, &edges)) != 0) + if ((rc = fetch_jgf (str, &jgf, &nodes, &edges, update_data)) != 0) goto done; if ((rc = unpack_vertices (g, m, vmap, nodes, added_vtcs)) != 0) goto done; @@ -1340,13 +1393,13 @@ int resource_reader_jgf_t::update (resource_graph_t &g, update_data.reserved = rsv; update_data.update = true; - if ((rc = fetch_jgf (str, &jgf, &nodes, &edges)) != 0) + if ((rc = fetch_jgf (str, &jgf, &nodes, &edges, update_data)) != 0) goto done; if ((rc = update_vertices (g, m, vmap, nodes, update_data)) != 0) { undo_vertices (g, vmap, update_data); goto done; } - if ((rc = update_edges (g, m, vmap, edges, token)) != 0) + if ((rc = update_edges (g, m, vmap, edges, token, update_data)) != 0) goto done; done: @@ -1414,7 +1467,7 @@ int resource_reader_jgf_t::partial_cancel (resource_graph_t &g, p_cancel_data.jobid = jobid; p_cancel_data.update = false; - if ((rc = fetch_jgf (R, &jgf, &nodes, &edges)) != 0) + if ((rc = fetch_jgf (R, &jgf, &nodes, &edges, p_cancel_data)) != 0) goto done; if ((rc = update_vertices (g, m, vmap, nodes, p_cancel_data)) != 0) goto done; diff --git a/resource/readers/resource_reader_jgf.hpp b/resource/readers/resource_reader_jgf.hpp index c93f4f531..d0c27f1c5 100644 --- a/resource/readers/resource_reader_jgf.hpp +++ b/resource/readers/resource_reader_jgf.hpp @@ -34,7 +34,11 @@ struct jgf_updater_data { // track count of rank vertices to determine if rank // should be removed from by_rank map std::unordered_set ranks; - bool update = true; // Updating or partial cancel + // track vertices that are skipped because their ranks are freed + std::unordered_set skip_vertices; + bool update = true; // Updating or partial cancel + bool isect_ranks = false; // Updating with partial_ok; intersecting with ranks key + bool skipped = false; }; /*! JGF resource reader class. @@ -123,7 +127,11 @@ class resource_reader_jgf_t : public resource_reader_base_t { virtual bool is_allowlist_supported (); private: - int fetch_jgf (const std::string &str, json_t **jgf_p, json_t **nodes_p, json_t **edges_p); + int fetch_jgf (const std::string &str, + json_t **jgf_p, + json_t **nodes_p, + json_t **edges_p, + jgf_updater_data &update_data); int unpack_and_remap_vtx (fetch_helper_t &f, json_t *paths, json_t *properties); int remap_aware_unpack_vtx (fetch_helper_t &f, json_t *paths, json_t *properties); int apply_defaults (fetch_helper_t &f, const char *name); @@ -192,7 +200,8 @@ class resource_reader_jgf_t : public resource_reader_base_t { std::map &vmap, std::string &source, std::string &target, - std::string &subsystem); + std::string &subsystem, + jgf_updater_data &update_data); int update_src_edge (resource_graph_t &g, resource_graph_metadata_t &m, std::map &vmap, @@ -213,7 +222,8 @@ class resource_reader_jgf_t : public resource_reader_base_t { resource_graph_metadata_t &m, std::map &vmap, json_t *edges, - uint64_t token); + uint64_t token, + jgf_updater_data &update_data); int get_subgraph_vertices (resource_graph_t &g, vtx_t node, std::vector &node_list); int get_parent_vtx (resource_graph_t &g, vtx_t node, vtx_t &parent_node); }; diff --git a/t/t1026-rv1-partial-release.t b/t/t1026-rv1-partial-release.t index da315d633..4f02e05d3 100755 --- a/t/t1026-rv1-partial-release.t +++ b/t/t1026-rv1-partial-release.t @@ -30,24 +30,44 @@ hk_wait_for_running () { sleep 0.1 done } - -fluxion_free_cores() { +# Usage: hk_wait_for_allocated_nnodes count +hk_wait_for_allocated_nnodes () { + count=0 + while test $(flux housekeeping list -no {allocated.nnodes}) -ne $1; do + count=$(($count+1)); + test $count -eq 300 && return 1 # max 300 * 0.1s sleep = 30s + sleep 0.1 + done +} +# Usage: fluxion_free ncores|nnodes +fluxion_free () { FLUX_RESOURCE_LIST_RPC=sched.resource-status \ - flux resource list -s free -no {ncores} + flux resource list -s free -no {$1} +} +# Usage: fluxion_allocated ncores|nnodes +fluxion_allocated () { + FLUX_RESOURCE_LIST_RPC=sched.resource-status \ + flux resource list -s allocated -no {$1} } test_expect_success 'load fluxion modules' ' flux module remove -f sched-simple && - flux module load sched-fluxion-resource && - flux module load sched-fluxion-qmanager && + load_resource match-format=rv1_nosched && + load_qmanager_sync && flux resource list && FLUX_RESOURCE_LIST_RPC=sched.resource-status flux resource list ' + +# Check job manager hello debug message for +partial-ok flag +if flux dmesg | grep +partial-ok; then + test_set_prereq HAVE_PARTIAL_OK +fi + test_expect_success 'run a normal job, resources are free' ' flux run -vvv -xN4 /bin/true && - test_debug "echo free=\$(fluxion_free_cores)" && - test $(fluxion_free_cores) -eq $TOTAL_NCORES + test_debug "echo free=\$(fluxion_free ncores)" && + test $(fluxion_free ncores) -eq $TOTAL_NCORES ' test_expect_success 'run 4 single node jobs, resources are free' ' flux submit -v --cc=1-4 -xN1 --wait /bin/true && @@ -56,8 +76,8 @@ test_expect_success 'run 4 single node jobs, resources are free' ' ' test_expect_success 'run 16 single core jobs, resources are free' ' flux submit -v --cc=1-16 -n1 --wait /bin/true && - test_debug "echo free=\$(fluxion_free_cores)" && - test $(fluxion_free_cores) -eq $TOTAL_NCORES + test_debug "echo free=\$(fluxion_free ncores)" && + test $(fluxion_free ncores) -eq $TOTAL_NCORES ' test_expect_success 'clear dmesg buffer' ' flux dmesg -C @@ -65,8 +85,8 @@ test_expect_success 'clear dmesg buffer' ' test_expect_success 'run a job with unequal core distribution, resources are free' ' flux run -vvv -n7 -l flux getattr rank && test_debug "flux job info $(flux job last) R | jq" && - test_debug "echo free=\$(fluxion_free_cores)" && - test $(fluxion_free_cores) -eq $TOTAL_NCORES + test_debug "echo free=\$(fluxion_free ncores)" && + test $(fluxion_free ncores) -eq $TOTAL_NCORES ' test_expect_success 'attempt to ensure dmesg buffer synchronized' ' flux logger test-sentinel && @@ -96,16 +116,91 @@ test_expect_success 'attempt to ensure dmesg buffer synchronized' ' dmesg_wait test-sentinel ' test_expect_success 'all resources free' ' - test_debug "echo free=\$(fluxion_free_cores)" && - test $(fluxion_free_cores) -eq $TOTAL_NCORES + test_debug "echo free=\$(fluxion_free ncores)" && + test $(fluxion_free ncores) -eq $TOTAL_NCORES ' test_expect_success 'no errors from fluxion' ' flux dmesg -H >log2.out && test_must_fail grep "free RPC failed to remove all resources" log.out ' +test_expect_success HAVE_PARTIAL_OK 'reconfigure housekeeping with sticky node' ' + flux config load <<-EOF + [job-manager.housekeeping] + command = [ + "sh", + "-c", + "test \$(flux getattr rank) -eq 0 && sleep inf; exit 0" + ] + release-after = "0s" + EOF +' +test_expect_success HAVE_PARTIAL_OK 'run a job and wait for node to get stuck' ' + flux run -N4 true && + hk_wait_for_running 1 && + hk_wait_for_allocated_nnodes 1 +' +test_expect_success HAVE_PARTIAL_OK 'fluxion shows 1 node allocated' ' + test $(fluxion_allocated nnodes) -eq 1 +' +test_expect_success HAVE_PARTIAL_OK 'reload fluxion modules' ' + remove_qmanager && + reload_resource match-format=rv1_nosched && + load_qmanager_sync && + flux resource list && + FLUX_RESOURCE_LIST_RPC=sched.resource-status flux resource list +' +test_expect_success HAVE_PARTIAL_OK 'fluxion still shows 1 node allocated' ' + test $(fluxion_allocated nnodes) -eq 1 +' +test_expect_success HAVE_PARTIAL_OK 'kill housekeeping' ' + flux housekeeping kill --all +' +test_expect_success HAVE_PARTIAL_OK 'fluxion shows 0 nodes allocated' ' + hk_wait_for_running 0 && + test $(fluxion_allocated nnodes) -eq 0 +' +test_expect_success HAVE_PARTIAL_OK 'reload fluxion modules with match-format=rv1' ' + remove_qmanager && + reload_resource match-format=rv1 && + load_qmanager_sync && + flux resource list && + FLUX_RESOURCE_LIST_RPC=sched.resource-status flux resource list +' +test_expect_success HAVE_PARTIAL_OK 'run a job and wait for node to get stuck' ' + flux run -N4 true && + hk_wait_for_running 1 && + hk_wait_for_allocated_nnodes 1 +' +test_expect_success HAVE_PARTIAL_OK 'fluxion shows 1 nodes allocated' ' + test $(fluxion_allocated nnodes) -eq 1 +' +test_expect_success HAVE_PARTIAL_OK 'run a sleep job on one node' ' + flux submit --wait-event=alloc -N1 sleep 3600 +' +test_expect_success HAVE_PARTIAL_OK 'fluxion shows 2 nodes allocated' ' + test $(fluxion_allocated nnodes) -eq 2 +' +test_expect_success HAVE_PARTIAL_OK 'reload fluxion modules with match-format=rv1' ' + remove_qmanager && + reload_resource match-format=rv1 && + load_qmanager_sync && + flux resource list && + FLUX_RESOURCE_LIST_RPC=sched.resource-status flux resource list +' +test_expect_success HAVE_PARTIAL_OK 'fluxion still shows 2 nodes allocated' ' + test $(fluxion_allocated nnodes) -eq 2 +' +test_expect_success HAVE_PARTIAL_OK 'kill housekeeping' ' + flux housekeeping kill --all +' +test_expect_success HAVE_PARTIAL_OK 'fluxion shows 1 node allocated' ' + hk_wait_for_running 0 && + test $(fluxion_allocated nnodes) -eq 1 +' + test_expect_success 'unload fluxion modules' ' - flux module remove sched-fluxion-qmanager && - flux module remove sched-fluxion-resource && + remove_qmanager && + remove_resource && flux module load sched-simple ' test_done