From f363d2ded92792d7a89240c402e282b66023de65 Mon Sep 17 00:00:00 2001 From: Stephen Herbein Date: Wed, 8 May 2019 22:00:24 -0700 Subject: [PATCH 01/13] python: add copy method to message class --- src/bindings/python/flux/message.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/bindings/python/flux/message.py b/src/bindings/python/flux/message.py index 87d1b83d6cd0..f8a0c97eb5c6 100644 --- a/src/bindings/python/flux/message.py +++ b/src/bindings/python/flux/message.py @@ -118,6 +118,16 @@ def type(self, value): def type_str(self): return msg_typestr(self.type) + def copy(self, payload=True): + """Duplicate message + :param payload: Whether the payload should be included in the message copy + :type payload: boolean + :return type: Message + """ + return Message( + type_id=self.type, handle=self.pimpl.copy(payload), destruct=True + ) + # Residing here to avoid cyclic references From 95f4cdc6f8ed8e3a5a977ea40b58dd90cc12ddfa Mon Sep 17 00:00:00 2001 From: Stephen Herbein Date: Thu, 19 Dec 2019 16:18:43 -0800 Subject: [PATCH 02/13] python/util: refactor `encode_*` funcs to use `six.ensure_binary` --- src/bindings/python/flux/util.py | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/src/bindings/python/flux/util.py b/src/bindings/python/flux/util.py index 833f9c2ccc66..b03be1ef5386 100644 --- a/src/bindings/python/flux/util.py +++ b/src/bindings/python/flux/util.py @@ -53,26 +53,23 @@ def func_wrapper(calling_obj, *args, **kwargs): def encode_payload(payload): + # Convert payload to ffi.NULL or utf-8 string if payload is None or payload == ffi.NULL: - payload = ffi.NULL - elif isinstance(payload, six.text_type): - payload = payload.encode("UTF-8") - elif not isinstance(payload, six.binary_type): - payload = json.dumps(payload, ensure_ascii=False).encode("UTF-8") - return payload - + return ffi.NULL + try: + return six.ensure_binary(payload) + except TypeError: + return json.dumps(payload, ensure_ascii=False).encode("UTF-8") def encode_topic(topic): - # Convert topic to utf-8 binary string + # Convert topic to utf-8 string if topic is None or topic == ffi.NULL: raise EnvironmentError(errno.EINVAL, "Topic must not be None/NULL") - elif isinstance(topic, six.text_type): - topic = topic.encode("UTF-8") - elif not isinstance(topic, six.binary_type): - errmsg = "Topic must be a string, not {}".format(type(topic)) - raise TypeError(errno.EINVAL, errmsg) - return topic - + try: + return six.ensure_binary(topic) + except TypeError: + errmsg = "Topic must be a string, not {}".format(topic, type(topic)) + raise EnvironmentError(errno.EINVAL, errmsg) class CLIMain(object): def __init__(self, logger=None): From f21d37c0d093d5c36db44bd5e049b2ec216a1d3b Mon Sep 17 00:00:00 2001 From: Stephen Herbein Date: Tue, 9 Jul 2019 14:38:35 -0700 Subject: [PATCH 03/13] python: add modfind util function --- src/bindings/python/flux/util.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/bindings/python/flux/util.py b/src/bindings/python/flux/util.py index b03be1ef5386..9ba0fe456776 100644 --- a/src/bindings/python/flux/util.py +++ b/src/bindings/python/flux/util.py @@ -8,6 +8,7 @@ # SPDX-License-Identifier: LGPL-3.0 ############################################################### +import os import re import sys import errno @@ -27,6 +28,7 @@ "encode_topic", "CLIMain", "parse_fsd", + "modfind", ] @@ -71,6 +73,25 @@ def encode_topic(topic): errmsg = "Topic must be a string, not {}".format(topic, type(topic)) raise 
EnvironmentError(errno.EINVAL, errmsg) +def modfind(modname): + """Search FLUX_MODULE_PATH for a shared library (.so) of a given name + + :param modname: name of the module to search for + :type modname: str, bytes, unicode + :returns: path of the first matching shared library in FLUX_MODULE_PATH + """ + searchpath = os.getenv("FLUX_MODULE_PATH") + if searchpath is None: + raise ValueError("FLUX_MODULE_PATH not set") + modname = six.ensure_binary(modname) + ret = raw.modfind(searchpath, modname, ffi.NULL, ffi.NULL) + if ret is None: + raise EnvironmentError( + errno.ENOENT, "{} not found in module search path".format(modname) + ) + return ret + + class CLIMain(object): def __init__(self, logger=None): if logger is None: From 57af899152ed2d00221c5ce64b2ca3a4dc475b2b Mon Sep 17 00:00:00 2001 From: Stephen Herbein Date: Sat, 24 Aug 2019 12:25:56 +0100 Subject: [PATCH 04/13] python: add kvs.get_key_raw for getting job eventlogs --- src/bindings/python/flux/kvs.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/bindings/python/flux/kvs.py b/src/bindings/python/flux/kvs.py index 0f1f33d3fbbb..4fcff763fa87 100644 --- a/src/bindings/python/flux/kvs.py +++ b/src/bindings/python/flux/kvs.py @@ -36,17 +36,18 @@ class KVSWrapper(Wrapper): RAW.flux_kvsitr_next.set_error_check(lambda x: False) -def get_key_direct(flux_handle, key): +def get_key_raw(flux_handle, key): valp = ffi.new("char *[1]") future = RAW.flux_kvs_lookup(flux_handle, None, 0, key) RAW.flux_kvs_lookup_get(future, valp) if valp[0] == ffi.NULL: return None - - ret = json.loads(ffi.string(valp[0]).decode("utf-8")) + ret = ffi.string(valp[0]).decode("utf-8") RAW.flux_future_destroy(future) return ret +def get_key_direct(flux_handle, key): + return json.loads(get_key_raw(flux_handle, key)) def exists(flux_handle, key): try: From 9f34d14cffd72c3ef9e40bbdd6faf19345285649 Mon Sep 17 00:00:00 2001 From: Stephen Herbein Date: Sun, 25 Aug 2019 16:19:35 -0700 Subject: [PATCH 05/13] python: include fluid header in bindings --- src/bindings/python/_flux/Makefile.am | 2 ++ src/bindings/python/flux/constants.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/bindings/python/_flux/Makefile.am b/src/bindings/python/_flux/Makefile.am index 288ebbc43eaa..f73dd5c978bf 100644 --- a/src/bindings/python/_flux/Makefile.am +++ b/src/bindings/python/_flux/Makefile.am @@ -30,7 +30,9 @@ _core_build.py: $(MAKE_BINDING) --modname _core \ --add_sub '.*va_list.*|||' \ --additional_headers src/bindings/python/_flux/callbacks.h \ + src/common/libutil/fluid.h \ --ignore_header 'macros' \ + --extra_source '#include ' \ src/include/flux/core.h BUILT_SOURCES= _core.c diff --git a/src/bindings/python/flux/constants.py b/src/bindings/python/flux/constants.py index 283d910f64fa..911e4f9e4223 100644 --- a/src/bindings/python/flux/constants.py +++ b/src/bindings/python/flux/constants.py @@ -17,7 +17,7 @@ MOD = sys.modules[__name__] # Inject enum/define names matching ^FLUX_[A-Z_]+$ into module ALL_LIST = [] -PATTERN = re.compile("^FLUX_[A-Z_]+") +PATTERN = re.compile("^(FLUX|FLUID)_[A-Z_]+") for k in dir(lib): if PATTERN.match(k): setattr(MOD, k, getattr(lib, k)) From 787e14778c9a404c8c27fc6f5f486d28c9d9fc9c Mon Sep 17 00:00:00 2001 From: Stephen Herbein Date: Sun, 25 Aug 2019 19:05:49 +0100 Subject: [PATCH 06/13] python: add convert_id to job module converts flux ids from/to hex, dec, and kvs --- src/bindings/python/flux/job.py | 58 +++++++++++++++++++++++++++++++++ t/python/t0010-job.py | 47 ++++++++++++++++++++++++++ 2 
files changed, 105 insertions(+) diff --git a/src/bindings/python/flux/job.py b/src/bindings/python/flux/job.py index ee73a0152229..643ce56694f2 100644 --- a/src/bindings/python/flux/job.py +++ b/src/bindings/python/flux/job.py @@ -19,6 +19,7 @@ import six import yaml +import flux.constants from flux.wrapper import Wrapper from flux.util import check_future_error, parse_fsd from flux.future import Future @@ -561,3 +562,60 @@ def from_command( tasks = [{"command": command, "slot": "task", "count": task_count_dict}] attributes = {"system": {"duration": 0}} return cls(resources, tasks, attributes=attributes) + +def convert_id(jobid, src="dec", dst="dec"): + valid_id_types = six.string_types + six.integer_types + if not any((isinstance(jobid, id_type) for id_type in valid_id_types)): + raise TypeError("Jobid must be an integer or string, not {}".format(type(jobid))) + + valid_formats = ("dec", "hex", "kvs", "words") + if src not in valid_formats: + raise EnvironmentError(errno.EINVAL, "src must be one of {}", valid_formats) + if dst not in valid_formats: + raise EnvironmentError(errno.EINVAL, "dst must be one of {}", valid_formats) + + if isinstance(jobid, six.text_type): + jobid = jobid.encode('utf-8') + + if src == dst: + return jobid + + dec_jobid = ffi.new('uint64_t [1]') # uint64_t* + if src == "dec": + dec_jobid = jobid + elif src == "hex": + if (lib.fluid_decode (jobid, dec_jobid, flux.constants.FLUID_STRING_DOTHEX) < 0): + raise EnvironmentError(errno.EINVAL, "malformed jobid: {}".format(src)); + dec_jobid = dec_jobid[0] + elif src == "kvs": + if jobid[0:4] != 'job.': + raise EnvironmentError(errno.EINVAL, "missing 'job.' prefix") + if (lib.fluid_decode (jobid[4:], dec_jobid, flux.constants.FLUID_STRING_DOTHEX) < 0): + raise EnvironmentError(errno.EINVAL, "malformed jobid: {}".format(src)); + dec_jobid = dec_jobid[0] + elif src == "words": + if (lib.fluid_decode (jobid, dec_jobid, flux.constants.FLUID_STRING_MNEMONIC) < 0): + raise EnvironmentError(errno.EINVAL, "malformed jobid: {}".format(src)); + dec_jobid = dec_jobid[0] + + + buf_size = 64 + buf = ffi.new('char []', buf_size) + def encode(id_format): + pass + + if dst == 'dec': + return dec_jobid + elif dst == 'kvs': + key_len = RAW.flux_job_kvs_key(buf, buf_size, dec_jobid, ffi.NULL) + if key_len < 0: + raise RuntimeError("error enconding id") + return ffi.string(buf, key_len).decode('utf-8') + elif dst == 'hex': + if (lib.fluid_encode (buf, buf_size, dec_jobid, flux.constants.FLUID_STRING_DOTHEX) < 0): + raise RuntimeError("error enconding id") + return ffi.string(buf).decode('utf-8') + elif dst == 'words': + if (lib.fluid_encode (buf, buf_size, dec_jobid, flux.constants.FLUID_STRING_MNEMONIC) < 0): + raise RuntimeError("error enconding id") + return ffi.string(buf).decode('utf-8') diff --git a/t/python/t0010-job.py b/t/python/t0010-job.py index 2e263be600cd..94697e4f324e 100755 --- a/t/python/t0010-job.py +++ b/t/python/t0010-job.py @@ -16,6 +16,7 @@ import json import unittest import datetime +import itertools from glob import glob import yaml @@ -137,6 +138,52 @@ def test_11_environment(self): jobspec.environment = new_env self.assertEqual(jobspec.environment, new_env) + def test_12_convert_id(self): + variants = { + "dec": 74859937792, + "hex": "0000.0011.6e00.0000", + "words": "algebra-arizona-susan--album-academy-academy", + } + variants["kvs"] = 'job.{}'.format(variants['hex']) + + for (src_type, src_value), (dest_type, dest_value) in \ + itertools.product(six.iteritems(variants), repeat=2): + + converted_value = 
job.convert_id(src_value, src_type, dest_type) + self.assertEqual( + converted_value, + dest_value, + msg="Failed to convert id of type {} into an id of type {} ({} != {})".format( + src_type, + dest_type, + converted_value, + dest_value, + ) + ) + + def test_13_convert_id_unicode(self): + converted_value = job.convert_id( + u"algebra-arizona-susan--album-academy-academy", + "words", + "hex") + self.assertEqual(converted_value, u"0000.0011.6e00.0000") + self.assertEqual(converted_value, b"0000.0011.6e00.0000") + + def test_05_convert_id_errors(self): + with self.assertRaises(TypeError) as error: + job.convert_id(5.0) + + with self.assertRaises(EnvironmentError) as error: + job.convert_id(74859937792, src="foo") + self.assertEqual(error.exception.errno, errno.EINVAL) + + with self.assertRaises(EnvironmentError) as error: + job.convert_id(74859937792, dst="foo") + self.assertEqual(error.exception.errno, errno.EINVAL) + + with self.assertRaises(EnvironmentError) as error: + job.convert_id("foo.bar", src="kvs") + self.assertEqual(error.exception.errno, errno.EINVAL) if __name__ == "__main__": from subflux import rerun_under_flux From 2c39cdebba66f2107e2c9ef6799653071356f82a Mon Sep 17 00:00:00 2001 From: Stephen Herbein Date: Thu, 25 Apr 2019 21:20:38 -0700 Subject: [PATCH 07/13] job-manager: add debug logging to hello exchange --- src/modules/job-manager/start.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/modules/job-manager/start.c b/src/modules/job-manager/start.c index ad311c3bde8f..85a60ca81f53 100644 --- a/src/modules/job-manager/start.c +++ b/src/modules/job-manager/start.c @@ -106,6 +106,7 @@ static void hello_cb (flux_t *h, flux_msg_handler_t *mh, if (flux_request_unpack (msg, NULL, "{s:s}", "service", &service_name) < 0) goto error; + flux_log (h, LOG_DEBUG, "%s: %s said hello", __FUNCTION__, service_name); /* If existing exec service is loaded, ensure it is idle before * allowing new exec service to override. */ @@ -125,6 +126,8 @@ static void hello_cb (flux_t *h, flux_msg_handler_t *mh, goto error; if (flux_respond (h, msg, NULL) < 0) flux_log_error (h, "%s: flux_respond", __FUNCTION__); + flux_log (h, LOG_DEBUG, "%s: registering %s as the start topic", + __FUNCTION__, start->topic); /* Response has been sent, now take action on jobs in run state. 
*/ job = zhashx_first (ctx->active_jobs); From fbb29bb337135da9be69cf2414a60d56cd54a50f Mon Sep 17 00:00:00 2001 From: Stephen Herbein Date: Wed, 31 Jul 2019 20:05:25 -0700 Subject: [PATCH 08/13] schedutil: add optional idle and busy callbacks add optional callbacks to notify schedutil users when there are no longer any outstanding futures/messages in the schedutil context (i.e., idle) and when the schedutil context goes from idle to busy (i.e., now has an outstanding future/message) useful for simulations where the scheduler needs to accurately respond to a `quiescent` request from the job-manager --- src/common/libschedutil/init.c | 18 ++++++++++++++++++ src/common/libschedutil/init.h | 2 ++ src/common/libschedutil/ops.h | 17 +++++++++++++++++ src/common/libschedutil/schedutil_private.h | 2 ++ src/modules/sched-simple/sched.c | 8 +++++++- t/job-manager/sched-dummy.c | 2 ++ 6 files changed, 48 insertions(+), 1 deletion(-) diff --git a/src/common/libschedutil/init.c b/src/common/libschedutil/init.c index d305689fb701..43762c54fe1a 100644 --- a/src/common/libschedutil/init.c +++ b/src/common/libschedutil/init.c @@ -32,6 +32,8 @@ schedutil_t *schedutil_create (flux_t *h, schedutil_alloc_cb_f *alloc_cb, schedutil_free_cb_f *free_cb, schedutil_exception_cb_f *exception_cb, + schedutil_idle_f *idle_cb, + schedutil_busy_f *busy_cb, void *cb_arg) { schedutil_t *util; @@ -47,6 +49,8 @@ schedutil_t *schedutil_create (flux_t *h, util->alloc_cb = alloc_cb; util->free_cb = free_cb; util->exception_cb = exception_cb; + util->idle_cb = idle_cb; + util->busy_cb = busy_cb; util->cb_arg = cb_arg; if ((util->outstanding_futures = zlistx_new ()) == NULL) goto error; @@ -110,6 +114,13 @@ int schedutil_add_outstanding_future (schedutil_t *util, flux_future_t *fut) { if (zlistx_add_end (util->outstanding_futures, fut) == NULL) return -1; + + // If this is the first future, we have gone from idle to busy, so call the + // corresponding busy cb (if it is set) + if (zlistx_size (util->outstanding_futures) == 1 && util->busy_cb) { + flux_log (util->h, LOG_DEBUG, "schedutil: running idle_cb"); + util->busy_cb (util->h, util->cb_arg); + } return 0; } @@ -119,5 +130,12 @@ int schedutil_remove_outstanding_future (schedutil_t *util, flux_future_t *fut) return -1; if (zlistx_detach_cur (util->outstanding_futures) == NULL) return -1; + + // If this is the last future, we have gone from busy to idle, so call the + // corresponding idle cb (if it is set) + if (zlistx_size (util->outstanding_futures) == 0 && util->idle_cb) { + flux_log (util->h, LOG_DEBUG, "schedutil: running idle_cb"); + util->idle_cb (util->h, util->cb_arg); + } return 0; } diff --git a/src/common/libschedutil/init.h b/src/common/libschedutil/init.h index e2be69a57646..6ac83fa8cf91 100644 --- a/src/common/libschedutil/init.h +++ b/src/common/libschedutil/init.h @@ -27,6 +27,8 @@ schedutil_t *schedutil_create (flux_t *h, schedutil_alloc_cb_f *alloc_cb, schedutil_free_cb_f *free_cb, schedutil_exception_cb_f *exception_cb, + schedutil_idle_f *idle_cb, + schedutil_busy_f *busy_cb, void *cb_arg); /* Destory the handle for the schedutil conveinence library. diff --git a/src/common/libschedutil/ops.h b/src/common/libschedutil/ops.h index 91c7278b8e68..7a2268ea0cf8 100644 --- a/src/common/libschedutil/ops.h +++ b/src/common/libschedutil/ops.h @@ -47,6 +47,23 @@ typedef void (schedutil_exception_cb_f)(flux_t *h, int severity, void *arg); +/* Callback for when the schedutil library becomes idle. 
+ * More specifically, idle means that all oustanding requests or futures within + * the schedutil library have been responded to and fulfilled, respectively. + * Useful for determining when to properly reply to any `quiescent` requests + * sent during a simulation. + */ +typedef void (schedutil_idle_f)(flux_t *h, + void *arg); + +/* Callback for when the schedutil library becomes busy. + * More specifically, bust means that an oustanding request or future now exists + * within the schedutil library. Useful for determining when to delay replying + * to any `quiescent` requests sent during a simulation. + */ +typedef void (schedutil_busy_f)(flux_t *h, + void *arg); + #endif /* !_FLUX_SCHEDUTIL_OPS_H */ /* diff --git a/src/common/libschedutil/schedutil_private.h b/src/common/libschedutil/schedutil_private.h index ec234b19968b..2c9fc304d826 100644 --- a/src/common/libschedutil/schedutil_private.h +++ b/src/common/libschedutil/schedutil_private.h @@ -19,6 +19,8 @@ struct schedutil_ctx { schedutil_alloc_cb_f *alloc_cb; schedutil_free_cb_f *free_cb; schedutil_exception_cb_f *exception_cb; + schedutil_idle_f *idle_cb; + schedutil_busy_f *busy_cb; void *cb_arg; zlistx_t *outstanding_futures; }; diff --git a/src/modules/sched-simple/sched.c b/src/modules/sched-simple/sched.c index 7627ecd228b3..0fdeefbfefce 100644 --- a/src/modules/sched-simple/sched.c +++ b/src/modules/sched-simple/sched.c @@ -355,7 +355,13 @@ int mod_main (flux_t *h, int argc, char **argv) if (process_args (h, ss, argc, argv) < 0) return -1; - ss->util_ctx = schedutil_create (h, alloc_cb, free_cb, exception_cb, ss); + ss->util_ctx = schedutil_create (h, + alloc_cb, + free_cb, + exception_cb, + NULL, + NULL, + ss); if (ss->util_ctx == NULL) { flux_log_error (h, "schedutil_create"); goto done; diff --git a/t/job-manager/sched-dummy.c b/t/job-manager/sched-dummy.c index c3560580df5d..612b0cdf9eeb 100644 --- a/t/job-manager/sched-dummy.c +++ b/t/job-manager/sched-dummy.c @@ -255,6 +255,8 @@ struct sched_ctx *sched_create (flux_t *h, int argc, char **argv) alloc_cb, free_cb, exception_cb, + NULL, + NULL, sc); if (sc->schedutil_ctx == NULL) { flux_log_error (h, "schedutil_create"); From 2de5991c8cac5d38f704b80b8aee2d43b259f4fb Mon Sep 17 00:00:00 2001 From: Stephen Herbein Date: Thu, 25 Apr 2019 17:44:59 -0700 Subject: [PATCH 09/13] job-manager,sched: add `.quiescent` services The simulator can now send a `job-manager.quiescent` request, which will only be responded to when the entire system has quiesced (i.e., in the absence of new events/requests, the system will make no further changes - such as allocating or freeing jobs). For the simple scheduler, this simply means that the schedutil library is idle. The job-manager then sends its own `quiescent` request to the scheduler along with every alloc request. It will only respond to the simulator's request after its own request to the scheduler is responded to. In the future, this protocol will be expanded to include the exec and depend modules. 
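As a reading aid, the exchange looks roughly like the following from the
simulator's side. This is only a sketch of the client half of the protocol,
mirroring what the flux-simulator command added later in this series does;
the standalone layout, the print call, and the fixed current_time value are
illustrative and not part of this patch:

    import flux

    h = flux.Flux()
    current_time = 0.0

    def quiescent_cb(future, arg):
        # The job-manager replies only after its own sched.quiescent request
        # has been answered, so the system has settled and the simulator can
        # safely fast-forward its clock (here we simply stop the reactor).
        print("system quiescent at t={}".format(current_time))
        h.reactor_stop(h.get_reactor())

    # After injecting all events for the current simulated time (job
    # submissions, completions, ...), ask the job-manager to reply once the
    # system is idle, then re-enter the reactor to wait for that response.
    h.rpc("job-manager.quiescent", {"time": current_time}).then(
        quiescent_cb, arg=None
    )
    h.reactor_run(h.get_reactor(), 0)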
--- src/modules/job-manager/Makefile.am | 5 +- src/modules/job-manager/alloc.c | 2 + src/modules/job-manager/job-manager.c | 6 ++ src/modules/job-manager/job-manager.h | 3 + src/modules/job-manager/simulator.c | 130 ++++++++++++++++++++++++++ src/modules/job-manager/simulator.h | 33 +++++++ src/modules/sched-simple/sched.c | 62 +++++++++++- 7 files changed, 238 insertions(+), 3 deletions(-) create mode 100644 src/modules/job-manager/simulator.c create mode 100644 src/modules/job-manager/simulator.h diff --git a/src/modules/job-manager/Makefile.am b/src/modules/job-manager/Makefile.am index 74fc264e132b..defd8c9b4d0b 100644 --- a/src/modules/job-manager/Makefile.am +++ b/src/modules/job-manager/Makefile.am @@ -39,7 +39,9 @@ job_manager_la_SOURCES = \ list.h \ list.c \ priority.h \ - priority.c + priority.c \ + simulator.h \ + simulator.c job_manager_la_LDFLAGS = $(fluxmod_ldflags) -module job_manager_la_LIBADD = $(fluxmod_libadd) \ @@ -60,6 +62,7 @@ test_ldadd = \ $(top_builddir)/src/modules/job-manager/event.o \ $(top_builddir)/src/modules/job-manager/job.o \ $(top_builddir)/src/modules/job-manager/alloc.o \ + $(top_builddir)/src/modules/job-manager/simulator.o \ $(top_builddir)/src/modules/job-manager/start.o \ $(top_builddir)/src/modules/job-manager/drain.o \ $(top_builddir)/src/modules/job-manager/submit.o \ diff --git a/src/modules/job-manager/alloc.c b/src/modules/job-manager/alloc.c index 8254c72ee7b0..718db1277e1a 100644 --- a/src/modules/job-manager/alloc.c +++ b/src/modules/job-manager/alloc.c @@ -87,6 +87,7 @@ #include "job.h" #include "alloc.h" #include "event.h" +#include "simulator.h" typedef enum { SCHED_SINGLE, // only allow one outstanding sched.alloc request @@ -295,6 +296,7 @@ int alloc_request (struct alloc *alloc, struct job *job) if (flux_send (alloc->ctx->h, msg, 0) < 0) goto error; flux_msg_destroy (msg); + sim_sending_sched_request (alloc->ctx->simulator); return 0; error: flux_msg_destroy (msg); diff --git a/src/modules/job-manager/job-manager.c b/src/modules/job-manager/job-manager.c index f7932ad73e67..66772625afd5 100644 --- a/src/modules/job-manager/job-manager.c +++ b/src/modules/job-manager/job-manager.c @@ -27,6 +27,7 @@ #include "event.h" #include "drain.h" #include "wait.h" +#include "simulator.h" #include "job-manager.h" @@ -97,6 +98,10 @@ int mod_main (flux_t *h, int argc, char **argv) flux_log_error (h, "error creating wait interface"); goto done; } + if (!(ctx.simulator = sim_ctx_create (&ctx))) { + flux_log_error (h, "error creating simulator context"); + goto done; + } if (flux_msg_handler_addvec (h, htab, &ctx, &ctx.handlers) < 0) { flux_log_error (h, "flux_msghandler_add"); goto done; @@ -112,6 +117,7 @@ int mod_main (flux_t *h, int argc, char **argv) rc = 0; done: flux_msg_handler_delvec (ctx.handlers); + sim_ctx_destroy (ctx.simulator); wait_ctx_destroy (ctx.wait); drain_ctx_destroy (ctx.drain); start_ctx_destroy (ctx.start); diff --git a/src/modules/job-manager/job-manager.h b/src/modules/job-manager/job-manager.h index 83033224e59e..9c6cb0749286 100644 --- a/src/modules/job-manager/job-manager.h +++ b/src/modules/job-manager/job-manager.h @@ -11,6 +11,8 @@ #ifndef _FLUX_JOB_MANAGER_H #define _FLUX_JOB_MANAGER_H +#include + struct job_manager { flux_t *h; flux_msg_handler_t **handlers; @@ -21,6 +23,7 @@ struct job_manager { struct submit *submit; struct drain *drain; struct waitjob *wait; + struct simulator *simulator; }; #endif /* !_FLUX_JOB_MANAGER_H */ diff --git a/src/modules/job-manager/simulator.c b/src/modules/job-manager/simulator.c new 
file mode 100644 index 000000000000..747295ec2fef --- /dev/null +++ b/src/modules/job-manager/simulator.c @@ -0,0 +1,130 @@ +/************************************************************\ + * Copyright 2019 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +/* simulator.c - sim interface + * + * This interface is primarily built so that the simulator can determine when + * the system has become quiescent (assuming no further events from external + * sources). Before responding to a quiescent request, the job-manager will + * ensure that all of the relevant modules (e.g., sched, exec, and depend) are + * also quiescent. + */ +#include + +#include "simulator.h" + +struct simulator { + struct job_manager* ctx; + flux_msg_handler_t **handlers; + flux_msg_t *sim_req; + flux_future_t *sched_req; +}; + +void sim_ctx_destroy (struct simulator *ctx) +{ + if (ctx) { + int saved_errno = errno;; + flux_msg_handler_delvec (ctx->handlers); + if (ctx->sim_req) + flux_msg_destroy(ctx->sim_req); + free (ctx); + errno = saved_errno; + } +} + +static void sched_quiescent_continuation(flux_future_t *f, void *arg) +{ + struct job_manager *ctx = arg; + struct simulator *simulator = ctx->simulator; + + if (simulator->sim_req == NULL) { + flux_log_error (ctx->h, "%s: sim quiescent request is NULL", __FUNCTION__); + return; + } + if (simulator->sched_req != f) { + flux_log_error (ctx->h, "%s: stored future does not match continuation future", __FUNCTION__); + return; + } + + const char *sched_payload = NULL; + flux_rpc_get (f, &sched_payload); + + const char *sim_payload = NULL; + flux_msg_get_string (simulator->sim_req, &sim_payload); + flux_log (ctx->h, LOG_DEBUG, "receive quiescent from sched (%s), replying to sim with (%s)", sched_payload, sim_payload); + flux_respond (ctx->h, simulator->sim_req, sim_payload); + flux_future_destroy (f); + flux_msg_destroy (simulator->sim_req); + simulator->sched_req = NULL; + simulator->sim_req = NULL; +} + +void sim_sending_sched_request (struct simulator *simulator) +{ + struct job_manager *ctx = simulator->ctx; + if (simulator->sim_req == NULL) { + // either not in a simulation, or if we are, the simulator does not yet + // care about tracking quiescence + return; + } + if (simulator->sched_req != NULL) { + // we are sending the scheduler more work/events before hearing back + // from the previous quiescent request, destroy the future from that + // previous request before sending a new request + flux_future_destroy (simulator->sched_req); + } + + flux_log (ctx->h, LOG_DEBUG, "sending quiescent req to scheduler"); + simulator->sched_req = flux_rpc (ctx->h, "sched.quiescent", NULL, 0, 0); + if (simulator->sched_req == NULL) + flux_respond_error(ctx->h, simulator->sim_req, errno, "job-manager: sim_sending_sched_request: flux_rpc failed"); + if (flux_future_then (simulator->sched_req, -1, sched_quiescent_continuation, ctx) < 0) + flux_respond_error(ctx->h, simulator->sim_req, errno, "job-manager: sim_sending_sched_request: flux_future_then failed"); +} + + +/* Handle a job-manager.quiescent request. We'll first copy the request into + * ctx for later response, and then kick off the process of verifying that all + * relevant modules are quiesced. 
+ */ +static void quiescent_cb (flux_t *h, flux_msg_handler_t *mh, + const flux_msg_t *msg, void *arg) +{ + struct job_manager *ctx = arg; + struct simulator *simulator = ctx->simulator; + flux_log (ctx->h, LOG_DEBUG, "received quiescent request"); + simulator->sim_req = flux_msg_copy (msg, true); + if (simulator->sim_req == NULL) + flux_respond_error(h, msg, errno, "job-manager: quiescent_cb: flux_msg_copy failed"); + + // Check if the scheduler is quiesced + sim_sending_sched_request(simulator); +} + +static const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_REQUEST, "job-manager.quiescent", quiescent_cb, 0}, + FLUX_MSGHANDLER_TABLE_END, +}; + +struct simulator *sim_ctx_create (struct job_manager *ctx) +{ + struct simulator *simulator; + + if (!(simulator = calloc (1, sizeof (*simulator)))) + return NULL; + simulator->ctx = ctx; + simulator->sim_req = NULL; + if (flux_msg_handler_addvec (ctx->h, htab, ctx, &simulator->handlers) < 0) + goto error; + return simulator; +error: + sim_ctx_destroy (simulator); + return NULL; +} diff --git a/src/modules/job-manager/simulator.h b/src/modules/job-manager/simulator.h new file mode 100644 index 000000000000..06bdd9a77cb1 --- /dev/null +++ b/src/modules/job-manager/simulator.h @@ -0,0 +1,33 @@ +/************************************************************\ + * Copyright 2019 Lawrence Livermore National Security, LLC + * (c.f. AUTHORS, NOTICE.LLNS, COPYING) + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * SPDX-License-Identifier: LGPL-3.0 +\************************************************************/ + +#ifndef _FLUX_JOB_MANAGER_SIMULATOR_H +#define _FLUX_JOB_MANAGER_SIMULATOR_H + +#include + +#include "job-manager.h" + +struct simulator; + +void sim_ctx_destroy (struct simulator *ctx); +struct simulator *sim_ctx_create (struct job_manager *ctx); + +/* Call when sending a new RPC/work to the scheduler. + * Triggers a new 'quiescent' RPC to the scheduler and destroys any + * outstanding 'quiescent' requests. + */ +void sim_sending_sched_request (struct simulator *ctx); + +#endif /* ! 
_FLUX_JOB_MANAGER_SIMULATOR_H */ + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ diff --git a/src/modules/sched-simple/sched.c b/src/modules/sched-simple/sched.c index 0fdeefbfefce..b053eea50db5 100644 --- a/src/modules/sched-simple/sched.c +++ b/src/modules/sched-simple/sched.c @@ -32,6 +32,8 @@ struct simple_sched { struct rlist *rlist; /* list of resources */ struct jobreq *job; /* currently processed job */ schedutil_t *util_ctx; + bool idle; + flux_msg_t *quiescent_req; }; static void jobreq_destroy (struct jobreq *job) @@ -66,6 +68,7 @@ jobreq_create (const flux_msg_t *msg, const char *jobspec) static void simple_sched_destroy (flux_t *h, struct simple_sched *ss) { + flux_msg_destroy (ss->quiescent_req); schedutil_destroy (ss->util_ctx); if (ss->job) { flux_respond_error (h, ss->job->msg, ENOSYS, "simple sched exiting"); @@ -81,6 +84,7 @@ static struct simple_sched * simple_sched_create (void) struct simple_sched *ss = calloc (1, sizeof (*ss)); if (ss == NULL) return NULL; + ss->idle = true; return ss; } @@ -267,6 +271,59 @@ static void status_cb (flux_t *h, flux_msg_handler_t *mh, flux_log_error (h, "flux_respond_error"); } +static inline int respond_to_quiescent (flux_t *h, const flux_msg_t *msg) +{ + const char *payload = NULL; + flux_msg_get_string (msg, &payload); + int rc = flux_respond (h, msg, payload); + flux_log (h, LOG_DEBUG, "responding to quiescent request"); + return rc; +} + +static void idle_cb (flux_t *h, void *arg) +{ + struct simple_sched *ss = arg; + + ss->idle = true; + if (ss->quiescent_req) { + if (respond_to_quiescent (h, ss->quiescent_req) < 0) + flux_log (h, LOG_ERR, + "idle_cb: error responding to quiescent request"); + flux_msg_destroy (ss->quiescent_req); + ss->quiescent_req = NULL; + } +} + +static void busy_cb (flux_t *h, void *arg) +{ + struct simple_sched *ss = arg; + ss->idle = false; +} + +static void quiescent_cb (flux_t *h, flux_msg_handler_t *mh, + const flux_msg_t *msg, void *arg) +{ + struct simple_sched *ss = arg; + + // If the schedutil is idle, with no outstanding futures/messages, then + // respond immediately since this scheduler has no outstanding + // futures/messages itself. Otherwise, delay responding until the `idle_cb` + // is called. 
+ if (ss->idle) { + flux_log (h, LOG_DEBUG, + "quiescent_cb: immediately responding to quiescent request " + "since schedutil is idle"); + if (respond_to_quiescent (h, msg) < 0) + flux_log (h, LOG_ERR, + "quiescent_cb: error responding to quiescent request"); + } else { + flux_log (h, LOG_DEBUG, + "quiescent_cb: delaying response to quiescent request " + "until schedutil is idle"); + ss->quiescent_req = flux_msg_copy (msg, true); + } +} + static int simple_sched_init (flux_t *h, struct simple_sched *ss) { int rc = -1; @@ -336,6 +393,7 @@ static int process_args (flux_t *h, struct simple_sched *ss, } static const struct flux_msg_handler_spec htab[] = { + { FLUX_MSGTYPE_REQUEST, "sched.quiescent", quiescent_cb, 0 }, { FLUX_MSGTYPE_REQUEST, "sched-simple.status", status_cb, FLUX_ROLE_USER }, FLUX_MSGHANDLER_TABLE_END, }; @@ -359,8 +417,8 @@ int mod_main (flux_t *h, int argc, char **argv) alloc_cb, free_cb, exception_cb, - NULL, - NULL, + idle_cb, + busy_cb, ss); if (ss->util_ctx == NULL) { flux_log_error (h, "schedutil_create"); From 67d0de0e27d6c14aa36658eba09dc71fecb0aa3d Mon Sep 17 00:00:00 2001 From: Stephen Herbein Date: Sat, 17 Aug 2019 21:28:39 -0700 Subject: [PATCH 10/13] schedutil: ensure work is completed before idle_cb is run --- src/common/libschedutil/alloc.c | 2 +- src/common/libschedutil/ops.c | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/common/libschedutil/alloc.c b/src/common/libschedutil/alloc.c index 65fde2ad821a..733339bf2e01 100644 --- a/src/common/libschedutil/alloc.c +++ b/src/common/libschedutil/alloc.c @@ -118,11 +118,11 @@ static void alloc_continuation (flux_future_t *f, void *arg) flux_log_error (h, "commit R"); goto error; } - schedutil_remove_outstanding_future (util, f); if (schedutil_alloc_respond (h, ctx->msg, 0, ctx->note) < 0) { flux_log_error (h, "alloc response"); goto error; } + schedutil_remove_outstanding_future (util, f); flux_future_destroy (f); return; error: diff --git a/src/common/libschedutil/ops.c b/src/common/libschedutil/ops.c index d574bc584d07..56cfd9d74c36 100644 --- a/src/common/libschedutil/ops.c +++ b/src/common/libschedutil/ops.c @@ -36,9 +36,9 @@ static void alloc_continuation (flux_future_t *f, void *arg) flux_log_error (h, "sched.alloc lookup R"); goto error; } + util->alloc_cb (h, msg, jobspec, util->cb_arg); if (schedutil_remove_outstanding_future (util, f) < 0) flux_log_error (h, "sched.alloc unable to remove outstanding future"); - util->alloc_cb (h, msg, jobspec, util->cb_arg); flux_future_destroy (f); return; error: @@ -105,9 +105,9 @@ static void free_continuation (flux_future_t *f, void *arg) flux_log_error (h, "sched.free lookup R"); goto error; } + util->free_cb (h, msg, R, util->cb_arg); if (schedutil_remove_outstanding_future (util, f) < 0) flux_log_error (h, "sched.free unable to remove outstanding future"); - util->free_cb (h, msg, R, util->cb_arg); flux_future_destroy (f); return; error: From 3940ed0db14230b9440ea4c7998a8cf2d8b348ca Mon Sep 17 00:00:00 2001 From: Stephen Herbein Date: Sat, 17 Aug 2019 22:03:05 -0700 Subject: [PATCH 11/13] job-manager: track outstanding alloc/start requests for quiescense after receiving an alloc response from the scheduler, the job-manager emits an event, which triggers a `start` request to be sent to the exec system. The re-entrance into the reactor loop between the reception of the alloc response and sending the start request means that the job-manager has a chance to "pre-maturely" process the quiescent response from the scheduler. 
This ultimately leads to the simulator receiving an erroneous 'quiescent' response from the job-manager. A similar problem exists for outstanding start requests. To solve these problems, ensure that every alloc response has a corresponding start response before sending a quiescent request. Track the number of outstanding requests in the simulator context of the job manager, which is also the piece responsible for responding to the quiescent request. --- src/modules/job-manager/alloc.c | 2 ++ src/modules/job-manager/simulator.c | 56 +++++++++++++++++++++++------ src/modules/job-manager/simulator.h | 12 +++++++ src/modules/job-manager/start.c | 2 ++ 4 files changed, 62 insertions(+), 10 deletions(-) diff --git a/src/modules/job-manager/alloc.c b/src/modules/job-manager/alloc.c index 718db1277e1a..29138c47284b 100644 --- a/src/modules/job-manager/alloc.c +++ b/src/modules/job-manager/alloc.c @@ -210,6 +210,8 @@ static void alloc_response_cb (flux_t *h, flux_msg_handler_t *mh, const char *note = NULL; struct job *job; + sim_received_alloc_response (ctx->simulator); + if (flux_response_decode (msg, NULL, NULL) < 0) goto teardown; // ENOSYS here if scheduler not loaded/shutting down if (flux_msg_unpack (msg, "{s:I s:i s?:s}", diff --git a/src/modules/job-manager/simulator.c b/src/modules/job-manager/simulator.c index 747295ec2fef..7c3b905bd391 100644 --- a/src/modules/job-manager/simulator.c +++ b/src/modules/job-manager/simulator.c @@ -17,6 +17,7 @@ * also quiescent. */ #include +#include #include "simulator.h" @@ -25,6 +26,7 @@ struct simulator { flux_msg_handler_t **handlers; flux_msg_t *sim_req; flux_future_t *sched_req; + int num_outstanding_job_starts; }; void sim_ctx_destroy (struct simulator *ctx) @@ -39,6 +41,29 @@ void sim_ctx_destroy (struct simulator *ctx) } } +static inline bool is_quiescent (struct simulator *simulator) +{ + return (simulator->sched_req == NULL) && (simulator->num_outstanding_job_starts == 0); +} + +static void check_and_respond_to_quiescent_req (struct simulator *simulator) +{ + if (simulator->sim_req == NULL || !is_quiescent (simulator)) { + // Either not in a simulation or not quiesced + return; + } + + const char *sim_payload = NULL; + flux_msg_get_string (simulator->sim_req, &sim_payload); + flux_log (simulator->ctx->h, + LOG_DEBUG, + "replying to sim quiescent req with (%s)", + sim_payload); + flux_respond (simulator->ctx->h, simulator->sim_req, sim_payload); + flux_msg_destroy (simulator->sim_req); + simulator->sim_req = NULL; +} + static void sched_quiescent_continuation(flux_future_t *f, void *arg) { struct job_manager *ctx = arg; @@ -53,17 +78,10 @@ static void sched_quiescent_continuation(flux_future_t *f, void *arg) return; } - const char *sched_payload = NULL; - flux_rpc_get (f, &sched_payload); - - const char *sim_payload = NULL; - flux_msg_get_string (simulator->sim_req, &sim_payload); - flux_log (ctx->h, LOG_DEBUG, "receive quiescent from sched (%s), replying to sim with (%s)", sched_payload, sim_payload); - flux_respond (ctx->h, simulator->sim_req, sim_payload); - flux_future_destroy (f); - flux_msg_destroy (simulator->sim_req); + flux_log (ctx->h, LOG_DEBUG, "receive quiescent from sched"); simulator->sched_req = NULL; - simulator->sim_req = NULL; + check_and_respond_to_quiescent_req (simulator); + flux_future_destroy (f); } void sim_sending_sched_request (struct simulator *simulator) @@ -89,6 +107,24 @@ void sim_sending_sched_request (struct simulator *simulator) flux_respond_error(ctx->h, simulator->sim_req, errno, "job-manager: 
sim_sending_sched_request: flux_future_then failed"); } +void sim_received_alloc_response (struct simulator *simulator) +{ + simulator->num_outstanding_job_starts++; + flux_log (simulator->ctx->h, + LOG_DEBUG, + "received an alloc response, outstanding job starts == %d", + simulator->num_outstanding_job_starts); +} + +void sim_received_start_response (struct simulator *simulator) +{ + simulator->num_outstanding_job_starts--; + flux_log (simulator->ctx->h, + LOG_DEBUG, + "received a start response, outstanding job starts == %d", + simulator->num_outstanding_job_starts); + check_and_respond_to_quiescent_req (simulator); +} /* Handle a job-manager.quiescent request. We'll first copy the request into * ctx for later response, and then kick off the process of verifying that all diff --git a/src/modules/job-manager/simulator.h b/src/modules/job-manager/simulator.h index 06bdd9a77cb1..ab7ea93ba770 100644 --- a/src/modules/job-manager/simulator.h +++ b/src/modules/job-manager/simulator.h @@ -26,6 +26,18 @@ struct simulator *sim_ctx_create (struct job_manager *ctx); */ void sim_sending_sched_request (struct simulator *ctx); +/* Call after receiving an alloc response from the sched. + * Delays the quiescent response until the job has been started by + * the exec system. + */ +void sim_received_alloc_response (struct simulator *ctx); + +/* Call after receiving a start response from the exec system. + * Triggers a check for quiescense. + */ +void sim_received_start_response (struct simulator *ctx); + + #endif /* ! _FLUX_JOB_MANAGER_SIMULATOR_H */ /* diff --git a/src/modules/job-manager/start.c b/src/modules/job-manager/start.c index 85a60ca81f53..c597bdaa23ea 100644 --- a/src/modules/job-manager/start.c +++ b/src/modules/job-manager/start.c @@ -89,6 +89,7 @@ #include "event.h" #include "start.h" +#include "simulator.h" struct start { struct job_manager *ctx; @@ -201,6 +202,7 @@ static void start_response_cb (flux_t *h, flux_msg_handler_t *mh, goto error; } if (!strcmp (type, "start")) { + sim_received_start_response (ctx->simulator); if (event_job_post_pack (ctx->event, job, "start", NULL) < 0) goto error_post; } From 43f5ba3c2860e10a8ef0b0f82c8ea893372f4936 Mon Sep 17 00:00:00 2001 From: Stephen Herbein Date: Mon, 22 Apr 2019 15:30:59 -0700 Subject: [PATCH 12/13] flux-simulator: intial commit --- src/cmd/flux-simulator.py | 610 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 610 insertions(+) create mode 100755 src/cmd/flux-simulator.py diff --git a/src/cmd/flux-simulator.py b/src/cmd/flux-simulator.py new file mode 100755 index 000000000000..345047706f57 --- /dev/null +++ b/src/cmd/flux-simulator.py @@ -0,0 +1,610 @@ +#!/usr/bin/env python + +from __future__ import print_function +import argparse +import re +import csv +import math +import json +import logging +import heapq +from abc import ABCMeta, abstractmethod +from datetime import datetime, timedelta +from collections import Sequence, namedtuple + +import six +import flux +import flux.job +import flux.util +import flux.kvs +import flux.constants + + +def create_resource(res_type, count, with_child=[]): + assert isinstance(with_child, Sequence), "child resource must be a sequence" + assert not isinstance(with_child, str), "child resource must not be a string" + assert count > 0, "resource count must be > 0" + + res = {"type": res_type, "count": count} + + if len(with_child) > 0: + res["with"] = with_child + return res + + +def create_slot(label, count, with_child): + slot = create_resource("slot", count, with_child) + slot["label"] = 
label + return slot + + +class Job(object): + def __init__(self, nnodes, ncpus, submit_time, elapsed_time, timelimit, exitcode=0): + self.nnodes = nnodes + self.ncpus = ncpus + self.submit_time = submit_time + self.elapsed_time = elapsed_time + self.timelimit = timelimit + self.exitcode = exitcode + self.start_time = None + self.state_transitions = {} + self._jobid = None + self._jobspec = None + self._submit_future = None + self._start_msg = None + + @property + def jobspec(self): + if self._jobspec is not None: + return self._jobspec + + assert self.ncpus % self.nnodes == 0 + core = create_resource("core", self.ncpus / self.nnodes) + slot = create_slot("task", 1, [core]) + if self.nnodes > 0: + resource_section = create_resource("node", self.nnodes, [slot]) + else: + resource_section = slot + + jobspec = { + "version": 1, + "resources": [resource_section], + "tasks": [ + { + "command": ["sleep", "0"], + "slot": "task", + "count": {"per_slot": 1}, + } + ], + "attributes": {"system": {"duration": self.timelimit}}, + } + + self._jobspec = jobspec + return self._jobspec + + def submit(self, flux_handle): + jobspec_json = json.dumps(self.jobspec) + logger.log(9, jobspec_json) + flags = 0 + if logger.isEnabledFor(logging.DEBUG): + logger.debug("Submitting job with FLUX_JOB_DEBUG enabled") + flags = flux.constants.FLUX_JOB_DEBUG + self._submit_future = flux.job.submit_async(flux_handle, jobspec_json, flags=flags) + + @property + def jobid(self): + if self._jobid is None: + if self._submit_future is None: + raise ValueError("Job was not submitted yet. No ID assigned.") + logger.log(9, "Waiting on jobid") + self._jobid = flux.job.submit_get_id(self._submit_future) + self._submit_future = None + logger.log(9, "Received jobid: {}".format(self._jobid)) + return self._jobid + + @property + def complete_time(self): + if self.start_time is None: + raise ValueError("Job has not started yet") + return self.start_time + self.elapsed_time + + def start(self, flux_handle, start_msg, start_time): + self.start_time = start_time + self._start_msg = start_msg.copy() + flux_handle.respond( + self._start_msg, payload={"id": self.jobid, "type": "start", "data": {}} + ) + + def complete(self, flux_handle): + # TODO: emit "finish" event + flux_handle.respond( + self._start_msg, + payload={"id": self.jobid, "type": "finish", "data": {"status" : 0}} + ) + # TODO: emit "done" event + flux_handle.respond( + self._start_msg, + payload={"id": self.jobid, "type": "release", "data": {"ranks" : "all", "final": True}} + ) + + def cancel(self, flux_handle): + flux.job.RAW.cancel(flux_handle, self.jobid, "Canceled by simulator") + + def insert_apriori_events(self, simulation): + # TODO: add priority to `add_event` so that all submits for a given time + # can happen consecutively, followed by the waits for the jobids + simulation.add_event(self.submit_time, lambda: simulation.submit_job(self)) + + def record_state_transition(self, state, time): + self.state_transitions[state] = time + +class EventList(six.Iterator): + def __init__(self): + self.time_heap = [] + self.time_map = {} + self._current_time = None + + def add_event(self, time, callback): + if self._current_time is not None and time <= self._current_time: + logger.warn( + "Adding a new event at a time ({}) <= the current time ({})".format( + time, self._current_time + ) + ) + + if time in self.time_map: + self.time_map[time].append(callback) + else: + new_event_list = [callback] + self.time_map[time] = new_event_list + heapq.heappush(self.time_heap, (time, 
new_event_list)) + + def __len__(self): + return len(self.time_heap) + + def __iter__(self): + return self + + def min(self): + if self.time_heap: + return self.time_heap[0] + else: + return None + + def max(self): + if self.time_heap: + time = max(self.time_map.keys()) + return self.time_map[time] + else: + return None + + def __next__(self): + try: + time, event_list = heapq.heappop(self.time_heap) + self.time_map.pop(time) + self._current_time = time # used for warning messages in `add_event` + return time, event_list + except (IndexError, KeyError): + raise StopIteration() + + +class Simulation(object): + def __init__( + self, + flux_handle, + event_list, + job_map, + submit_job_hook=None, + start_job_hook=None, + complete_job_hook=None, + ): + self.event_list = event_list + self.job_map = job_map + self.current_time = 0 + self.flux_handle = flux_handle + self.pending_inactivations = set() + self.job_manager_quiescent = True + self.submit_job_hook = submit_job_hook + self.start_job_hook = start_job_hook + self.complete_job_hook = complete_job_hook + + def add_event(self, time, callback): + self.event_list.add_event(time, callback) + + def submit_job(self, job): + if self.submit_job_hook: + self.submit_job_hook(self, job) + logger.debug("Submitting a new job") + job.submit(self.flux_handle) + self.job_map[job.jobid] = job + logger.info("Submitted job {}".format(job.jobid)) + + def start_job(self, jobid, start_msg): + job = self.job_map[jobid] + if self.start_job_hook: + self.start_job_hook(self, job) + job.start(self.flux_handle, start_msg, self.current_time) + logger.info("Started job {}".format(job.jobid)) + self.add_event(job.complete_time, lambda: self.complete_job(job)) + logger.debug("Registered job {} to complete at {}".format(job.jobid, job.complete_time)) + + def complete_job(self, job): + if self.complete_job_hook: + self.complete_job_hook(self, job) + job.complete(self.flux_handle) + logger.info("Completed job {}".format(job.jobid)) + self.pending_inactivations.add(job) + + def record_job_state_transition(self, jobid, state): + job = self.job_map[jobid] + job.record_state_transition(state, self.current_time) + if state == 'INACTIVE' and job in self.pending_inactivations: + self.pending_inactivations.remove(job) + if self.is_quiescent(): + self.advance() + + def advance(self): + try: + self.current_time, events_at_time = next(self.event_list) + except StopIteration: + logger.info("No more events in event list, running post-sim analysis") + self.post_verification() + logger.info("Ending simulation") + self.flux_handle.reactor_stop(self.flux_handle.get_reactor()) + return + logger.info("Fast-forwarding time to {}".format(self.current_time)) + for event in events_at_time: + event() + logger.debug("Sending quiescent request for time {}".format(self.current_time)) + self.flux_handle.rpc("job-manager.quiescent", {"time": self.current_time}).then( + lambda fut, arg: arg.quiescent_cb(), arg=self + ) + self.job_manager_quiescent = False + + def is_quiescent(self): + return self.job_manager_quiescent and len(self.pending_inactivations) == 0 + + def quiescent_cb(self): + logger.debug("Received a response indicating the system is quiescent") + self.job_manager_quiescent = True + if self.is_quiescent(): + self.advance() + + def post_verification(self): + for jobid, job in six.iteritems(self.job_map): + if 'INACTIVE' not in job.state_transitions: + job_kvs_dir = flux.job.convert_id(jobid, "dec", "kvs") + logger.warn("Job {} had not reached the inactive state by simulation termination 
time.".format(jobid)) + logger.debug("Job {}'s eventlog:".format(jobid)) + eventlog = flux.kvs.get_key_raw(self.flux_handle, job_kvs_dir + ".eventlog") + for line in eventlog.splitlines(): + json_event = json.loads(line) + logger.debug(json_event) + +def datetime_to_epoch(dt): + return int((dt - datetime(1970, 1, 1)).total_seconds()) + + +re_dhms = re.compile(r"^\s*(\d+)[:-](\d+):(\d+):(\d+)\s*$") +re_hms = re.compile(r"^\s*(\d+):(\d+):(\d+)\s*$") + + +def walltime_str_to_timedelta(walltime_str): + (days, hours, mins, secs) = (0, 0, 0, 0) + match = re_dhms.search(walltime_str) + if match: + days = int(match.group(1)) + hours = int(match.group(2)) + mins = int(match.group(3)) + secs = int(match.group(4)) + else: + match = re_hms.search(walltime_str) + if match: + hours = int(match.group(1)) + mins = int(match.group(2)) + secs = int(match.group(3)) + return timedelta(days=days, hours=hours, minutes=mins, seconds=secs) + + +@six.add_metaclass(ABCMeta) +class JobTraceReader(object): + def __init__(self, tracefile): + self.tracefile = tracefile + + @abstractmethod + def validate_trace(self): + pass + + @abstractmethod + def read_trace(self): + pass + + +def job_from_slurm_row(row): + kwargs = {} + if "ExitCode" in row: + kwargs["exitcode"] = "ExitCode" + + submit_time = datetime_to_epoch( + datetime.strptime(row["Submit"], "%Y-%m-%dT%H:%M:%S") + ) + elapsed = walltime_str_to_timedelta(row["Elapsed"]).total_seconds() + if elapsed <= 0: + logger.warn("Elapsed time ({}) <= 0".format(elapsed)) + timelimit = walltime_str_to_timedelta(row["Timelimit"]).total_seconds() + if elapsed > timelimit: + logger.warn( + "Elapsed time ({}) greater than Timelimit ({})".format(elapsed, timelimit) + ) + nnodes = int(row["NNodes"]) + ncpus = int(row["NCPUS"]) + if nnodes > ncpus: + logger.warn( + "Number of Nodes ({}) greater than Number of CPUs ({}), setting NCPUS = NNodes".format( + nnodes, ncpus + ) + ) + ncpus = nnodes + elif ncpus % nnodes != 0: + old_ncpus = ncpus + ncpus = math.ceil(ncpus / nnodes) * nnodes + logger.warn( + "Number of Nodes ({}) does not evenly divide the Number of CPUs ({}), setting NCPUS to an integer multiple of the number of nodes ({})".format( + nnodes, old_ncpus, ncpus + ) + ) + + return Job(nnodes, ncpus, submit_time, elapsed, timelimit, **kwargs) + + +class SacctReader(JobTraceReader): + required_fields = ["Elapsed", "Timelimit", "Submit", "NNodes", "NCPUS"] + + def __init__(self, tracefile): + super(SacctReader, self).__init__(tracefile) + self.determine_delimiter() + + def determine_delimiter(self): + """ + sacct outputs data with '|' as the delimiter by default, but ',' is a more + common delimiter in general. This is a simple heuristic to figure out if + the job trace is straight from sacct or has had some post-processing + done that converts the delimiter to a comma. + """ + with open(self.tracefile) as infile: + first_line = infile.readline() + self.delim = '|' if '|' in first_line else ',' + + def validate_trace(self): + with open(self.tracefile) as infile: + reader = csv.reader(infile, delimiter=self.delim) + header_fields = set(next(reader)) + for req_field in SacctReader.required_fields: + if req_field not in header_fields: + raise ValueError("Job file is missing '{}'".format(req_field)) + + def read_trace(self): + """ + You can obtain the necessary information from the sacct command using the -o flag. 
+ For example: sacct -o nnodes,ncpus,timelimit,state,submit,elapsed,exitcode + """ + with open(self.tracefile) as infile: + lines = [line for line in infile.readlines() if not line.startswith('#')] + reader = csv.DictReader(lines, delimiter=self.delim) + jobs = [job_from_slurm_row(row) for row in reader] + return jobs + + +def insert_resource_data(flux_handle, num_ranks, cores_per_rank): + """ + Populate the KVS with the resource data of the simulated system + An example of the data format: {"0": {"Package": 7, "Core": 7, "PU": 7, "cpuset": "0-6"}} + """ + if num_ranks <= 0: + raise ValueError("Requires at least one rank") + + kvs_key = "resource.hwloc.by_rank" + resource_dict = {} + for rank in range(num_ranks): + resource_dict[rank] = {} + for key in ["Package", "Core", "PU"]: + resource_dict[rank][key] = cores_per_rank + resource_dict[rank]["cpuset"] = ( + "0-{}".format(cores_per_rank - 1) if cores_per_rank > 1 else "0" + ) + put_rc = flux.kvs.put(flux_handle, kvs_key, resource_dict) + if put_rc < 0: + raise ValueError("Error inserting resource data into KVS, rc={}".format(put_rc)) + flux.kvs.commit(flux_handle) + + +def job_state_cb(flux_handle, watcher, msg, simulation): + ''' + example payload: {u'transitions': [[63652757504, u'CLEANUP'], [63652757504, u'INACTIVE']]} + ''' + logger.log(9, "Received a job state cb. msg payload: {}".format(msg.payload)) + for jobid, state in msg.payload['transitions']: + simulation.record_job_state_transition(jobid, state) + +def get_loaded_modules(flux_handle): + modules = flux_handle.rpc("cmb.lsmod").get() + return modules["mods"] + + +def load_missing_modules(flux_handle): + # TODO: check that necessary modules are loaded + # if not, load them + # return an updated list of loaded modules + loaded_modules = get_loaded_modules(flux_handle) + pass + + +def reload_scheduler(flux_handle): + sched_module = "sched-simple" + # Check if there is a module already loaded providing 'sched' service, + # if so, reload that module + for module in get_loaded_modules(flux_handle): + if "sched" in module["services"]: + sched_module = module["name"] + + logger.debug("Reloading the '{}' module".format(sched_module)) + flux_handle.rpc("cmb.rmmod", payload={"name": "sched-simple"}).get() + path = flux.util.modfind("sched-simple") + flux_handle.rpc("cmb.insmod", payload=json.dumps({"path": path, "args": []})).get() + + +def job_exception_cb(flux_handle, watcher, msg, cb_args): + logger.warn("Detected a job exception, but not handling it") + + +def sim_exec_start_cb(flux_handle, watcher, msg, simulation): + payload = msg.payload + logger.log(9, "Received sim-exec.start request. 
Payload: {}".format(payload)) + jobid = payload["id"] + simulation.start_job(jobid, msg) + + +def exec_hello(flux_handle): + logger.debug("Registering sim-exec with job-manager") + flux_handle.rpc("job-manager.exec-hello", payload={"service": "sim-exec"}).get() + + +def service_add(f, name): + future = f.service_register(name) + return f.future_get(future, None) + + +def service_remove(f, name): + future = f.service_unregister(name) + return f.future_get(future, None) + + +def setup_watchers(flux_handle, simulation): + watchers = [] + services = set() + for type_mask, topic, cb, args in [ + (flux.constants.FLUX_MSGTYPE_EVENT, "job-state", job_state_cb, simulation), + ( + flux.constants.FLUX_MSGTYPE_REQUEST, + "sim-exec.start", + sim_exec_start_cb, + simulation, + ), + ]: + if type_mask == flux.constants.FLUX_MSGTYPE_EVENT: + flux_handle.event_subscribe(topic) + watcher = flux_handle.msg_watcher_create( + cb, type_mask=type_mask, topic_glob=topic, args=args + ) + watcher.start() + watchers.append(watcher) + if type_mask == flux.constants.FLUX_MSGTYPE_REQUEST: + service_name = topic.split(".")[0] + if service_name not in services: + service_add(flux_handle, service_name) + services.add(service_name) + return watchers, services + + +def teardown_watchers(flux_handle, watchers, services): + for watcher in watchers: + watcher.stop() + for service_name in services: + service_remove(flux_handle, service_name) + + +Makespan = namedtuple('Makespan', ['beginning', 'end']) + +class SimpleExec(object): + def __init__(self, num_nodes, cores_per_node): + self.num_nodes = num_nodes + self.cores_per_node = cores_per_node + self.num_free_nodes = num_nodes + self.used_core_hours = 0 + + self.makespan = Makespan( + beginning=float('inf'), + end=-1, + ) + + def update_makespan(self, current_time): + if current_time < self.makespan.beginning: + self.makespan = self.makespan._replace(beginning=current_time) + if current_time > self.makespan.end: + self.makespan = self.makespan._replace(end=current_time) + + def submit_job(self, simulation, job): + self.update_makespan(simulation.current_time) + + def start_job(self, simulation, job): + self.num_free_nodes -= job.nnodes + if self.num_free_nodes < 0: + logger.error("Scheduler over-subscribed nodes") + if (job.ncpus / job.nnodes) > self.cores_per_node: + logger.error("Scheduler over-subscribed cores on the node") + + def complete_job(self, simulation, job): + self.num_free_nodes += job.nnodes + self.used_core_hours += (job.ncpus * job.elapsed_time) / 3600 + self.update_makespan(simulation.current_time) + + def post_analysis(self, simulation): + if self.makespan.beginning > self.makespan.end: + logger.warn("Makespan beginning ({}) greater than end ({})".format( + self.makespan.beginning, + self.makespan.end, + )) + + total_num_cores = self.num_nodes * self.cores_per_node + print("Makespan (hours): {:.1f}".format((self.makespan.end - self.makespan.beginning) / 3600)) + total_core_hours = (total_num_cores * (self.makespan.end - self.makespan.beginning)) / 3600 + print("Total Core-Hours: {:,.1f}".format(total_core_hours)) + print("Used Core-Hours: {:,.1f}".format(self.used_core_hours)) + print("Average Core-Utilization: {:.2f}%".format((self.used_core_hours / total_core_hours) * 100)) + + +logger = logging.getLogger("flux-simulator") + + +@flux.util.CLIMain(logger) +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("job_file") + parser.add_argument("num_ranks", type=int) + parser.add_argument("cores_per_rank", type=int) + 
parser.add_argument("--log-level", type=int) + args = parser.parse_args() + + if args.log_level: + logger.setLevel(args.log_level) + + flux_handle = flux.Flux() + + exec_validator = SimpleExec(args.num_ranks, args.cores_per_rank) + simulation = Simulation( + flux_handle, + EventList(), + {}, + submit_job_hook=exec_validator.submit_job, + start_job_hook=exec_validator.start_job, + complete_job_hook=exec_validator.complete_job, + ) + reader = SacctReader(args.job_file) + reader.validate_trace() + jobs = list(reader.read_trace()) + for job in jobs: + job.insert_apriori_events(simulation) + + load_missing_modules(flux_handle) + insert_resource_data(flux_handle, args.num_ranks, args.cores_per_rank) + reload_scheduler(flux_handle) + + watchers, services = setup_watchers(flux_handle, simulation) + exec_hello(flux_handle) + simulation.advance() + flux_handle.reactor_run(flux_handle.get_reactor(), 0) + teardown_watchers(flux_handle, watchers, services) + exec_validator.post_analysis(simulation) + +if __name__ == "__main__": + main() From ce688137208b7d1e3cdd4f9d9f9c093652d5a92b Mon Sep 17 00:00:00 2001 From: Stephen Herbein Date: Mon, 25 Nov 2019 04:59:50 +0200 Subject: [PATCH 13/13] t: add simple test for flux-simulator --- t/simulator/job-traces/10-multi-node.csv | 11 +++++++ t/simulator/job-traces/10-single-node.csv | 11 +++++++ t/t9000-simulation.t | 36 +++++++++++++++++++++++ 3 files changed, 58 insertions(+) create mode 100644 t/simulator/job-traces/10-multi-node.csv create mode 100644 t/simulator/job-traces/10-single-node.csv create mode 100755 t/t9000-simulation.t diff --git a/t/simulator/job-traces/10-multi-node.csv b/t/simulator/job-traces/10-multi-node.csv new file mode 100644 index 000000000000..8e5758afa765 --- /dev/null +++ b/t/simulator/job-traces/10-multi-node.csv @@ -0,0 +1,11 @@ +JobID,NNodes,NCPUS,Timelimit,Submit,Elapsed + 1,10,160,00:10:00,2019-01-01T00:00:00,00:06:00 + 2,10,160,00:10:00,2019-01-01T00:00:00,00:06:00 + 3,10,160,00:10:00,2019-01-01T00:00:00,00:06:00 + 4,10,160,00:10:00,2019-01-01T00:00:00,00:06:00 + 5,10,160,00:10:00,2019-01-01T00:00:00,00:06:00 + 6,10,160,00:10:00,2019-01-01T00:00:00,00:06:00 + 7,10,160,00:10:00,2019-01-01T00:00:00,00:06:00 + 8,10,160,00:10:00,2019-01-01T00:00:00,00:06:00 + 9,10,160,00:10:00,2019-01-01T00:00:00,00:06:00 +10,10,160,00:10:00,2019-01-01T00:00:00,00:06:00 diff --git a/t/simulator/job-traces/10-single-node.csv b/t/simulator/job-traces/10-single-node.csv new file mode 100644 index 000000000000..d522b6fd5d66 --- /dev/null +++ b/t/simulator/job-traces/10-single-node.csv @@ -0,0 +1,11 @@ +JobID,NNodes,NCPUS,Timelimit,Submit,Elapsed + 1,1,16,00:10:00,2019-01-01T00:00:00,00:06:00 + 2,1,16,00:10:00,2019-01-01T00:00:00,00:06:00 + 3,1,16,00:10:00,2019-01-01T00:00:00,00:06:00 + 4,1,16,00:10:00,2019-01-01T00:00:00,00:06:00 + 5,1,16,00:10:00,2019-01-01T00:00:00,00:06:00 + 6,1,16,00:10:00,2019-01-01T00:00:00,00:06:00 + 7,1,16,00:10:00,2019-01-01T00:00:00,00:06:00 + 8,1,16,00:10:00,2019-01-01T00:00:00,00:06:00 + 9,1,16,00:10:00,2019-01-01T00:00:00,00:06:00 +10,1,16,00:10:00,2019-01-01T00:00:00,00:06:00 diff --git a/t/t9000-simulation.t b/t/t9000-simulation.t new file mode 100755 index 000000000000..189003adddde --- /dev/null +++ b/t/t9000-simulation.t @@ -0,0 +1,36 @@ +#!/bin/sh + +test_description='Test flux simulator command' + +. 
$(dirname $0)/sharness.sh + +test_under_flux 1 + +# Set CLIMain log level to logging.DEBUG (10), to enable stack traces +export FLUX_PYCLI_LOGLEVEL=10 + +flux setattr log-stderr-level 1 + +SIM_JOBTRACES_DIR=${SHARNESS_TEST_SRCDIR}/simulator/job-traces + +test_expect_success 'flux simulator fails with usage message' ' + test_must_fail flux simulator 2>usage.err && + grep -i usage: usage.err +' + +test_expect_success 'flux simulator with single node works' ' + flux simulator $SIM_JOBTRACES_DIR/10-single-node.csv 1 16 >run1.out && + grep -i "utilization: 100" run1.out +' + +test_expect_success 'flux simulator with multiple nodes works' ' + flux simulator $SIM_JOBTRACES_DIR/10-single-node.csv 3 16 >run2.out && + grep -i "utilization: 83" run2.out +' + +test_expect_failure 'flux simulator errors out on unstaisfiable jobs' ' + run_timeout 5 flux simulator $SIM_JOBTRACES_DIR/10-multi-node.csv 1 16 >run3.out 2>run3.err && + grep -i "unsatisfiable" run3.err +' + +test_done \ No newline at end of file