From b7d13726f610854e5b25f3b1c71d2c3e7c69c85d Mon Sep 17 00:00:00 2001 From: "Dong H. Ahn" Date: Tue, 26 May 2015 13:45:57 -0700 Subject: [PATCH 1/8] Add Jadd_int64 support --- src/common/libutil/shortjson.h | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/common/libutil/shortjson.h b/src/common/libutil/shortjson.h index 3f3d144f8985..78c391276666 100644 --- a/src/common/libutil/shortjson.h +++ b/src/common/libutil/shortjson.h @@ -57,6 +57,17 @@ Jadd_int (JSON o, const char *name, int i) json_object_object_add (o, (char *)name, n); } +/* Add 64bit integer to JSON. + */ +static __inline__ void +Jadd_int64 (JSON o, const char *name, int64_t i) +{ + JSON n = json_object_new_int64 (i); + if (!n) + oom (); + json_object_object_add (o, (char *)name, n); +} + /* Add double to JSON. */ static __inline__ void From 1fb9f7c3b20d52c3666b29ee1367e0ffc7c821b3 Mon Sep 17 00:00:00 2001 From: "Dong H. Ahn" Date: Tue, 26 May 2015 13:50:28 -0700 Subject: [PATCH 2/8] Add Job Status and Control (JSC) interface support --- configure.ac | 1 + src/common/core.h | 1 + src/include/flux/core.h | 1 + src/modules/Makefile.am | 2 +- src/modules/libjsc/Makefile.am | 16 + src/modules/libjsc/README.md | 156 ++++++ src/modules/libjsc/jstatctl.c | 925 +++++++++++++++++++++++++++++++++ src/modules/libjsc/jstatctl.h | 125 +++++ 8 files changed, 1226 insertions(+), 1 deletion(-) create mode 100644 src/modules/libjsc/Makefile.am create mode 100644 src/modules/libjsc/README.md create mode 100644 src/modules/libjsc/jstatctl.c create mode 100644 src/modules/libjsc/jstatctl.h diff --git a/configure.ac b/configure.ac index c9abd5ebdc80..f301e62c9325 100644 --- a/configure.ac +++ b/configure.ac @@ -141,6 +141,7 @@ AC_CONFIG_FILES( \ src/modules/libmrpc/Makefile \ src/modules/libzio/Makefile \ src/modules/libkz/Makefile \ + src/modules/libjsc/Makefile \ src/modules/live/Makefile \ src/modules/mecho/Makefile \ src/modules/barrier/Makefile \ diff --git a/src/common/core.h b/src/common/core.h index f2099d0cf71f..6fdcfad1ce0a 100644 --- a/src/common/core.h +++ b/src/common/core.h @@ -34,6 +34,7 @@ #include "core/kvs.h" #include "core/live.h" #include "core/barrier.h" +#include "core/jstatctl.h" #endif /* !_FLUX_CORE_H */ diff --git a/src/include/flux/core.h b/src/include/flux/core.h index 07f718014e1b..088525277512 100644 --- a/src/include/flux/core.h +++ b/src/include/flux/core.h @@ -9,5 +9,6 @@ #include "src/modules/kvs/kvs.h" #include "src/modules/live/live.h" #include "src/modules/barrier/barrier.h" +#include "src/modules/libjsc/jstatctl.h" #endif /* FLUX_CORE_H */ diff --git a/src/modules/Makefile.am b/src/modules/Makefile.am index 50d4270ce275..9fd66a08b79b 100644 --- a/src/modules/Makefile.am +++ b/src/modules/Makefile.am @@ -1,2 +1,2 @@ #This order is *important* -SUBDIRS = api kvs libmrpc libzio libkz modctl live mecho barrier wreck +SUBDIRS = api kvs libmrpc libzio libkz modctl live mecho barrier wreck libjsc diff --git a/src/modules/libjsc/Makefile.am b/src/modules/libjsc/Makefile.am new file mode 100644 index 000000000000..481a4aaeb0c8 --- /dev/null +++ b/src/modules/libjsc/Makefile.am @@ -0,0 +1,16 @@ +AM_CFLAGS = @GCCWARN@ + +AM_CPPFLAGS = \ + $(JSON_CFLAGS) \ + -I$(top_srcdir) -I$(top_srcdir)/src/include + +fluxlib_LTLIBRARIES = libjsc.la +fluxcoreinclude_HEADERS = jstatctl.h + +libjsc_la_SOURCES = \ + jstatctl.c \ + jstatctl.h +libjsc_la_LDFLAGS = -shared -export-dynamic --disable-static \ + $(top_builddir)/src/modules/kvs/libkvs.la \ + $(top_builddir)/src/common/libflux-internal.la \ + $(top_builddir)/src/common/libflux-core.la diff --git a/src/modules/libjsc/README.md b/src/modules/libjsc/README.md new file mode 100644 index 000000000000..a686c688f94b --- /dev/null +++ b/src/modules/libjsc/README.md @@ -0,0 +1,156 @@ +Job Status and Control Application Programming Interface +=================== + +The Job Status and Control (JSC) API is a high-level abstraction that allows its client software to monitor and control the status of Flux jobs. It is designed to expose the job status and control abstraction in a way to hide the underlying data layout of job information stored within Flux's KVS data store. We expect that schedulers and runtime tools will be its main users. This abstraction provides the producers of job information including a task and program execution service module such as `flux-wreckrun` with an opportunity to change and optimize the data layout of jobs within the KVS without presenting major impacts to the implementation of the schedulers and runtime tools. + +1. Design Considerations +------------- +The main design considerations are the following: +>1. Abstract out implementation-dependent job states; +>2. Provide flexible and easily extensible mechanisms to pass job information; +>3. Use a minimalistic API set. + +The first consideration has led us to use a C enumerator (i.e., *job\_state\_t*) to capture the job states. However, because Flux has not yet defined its job schema, the second consideration discouraged us to use a C user-defined type to pass job information with the client software. Instead, JSC uses an JSON to capture the job information and introduce the notion of Job Control Block (JCB) to have a structure on this information. We will try to keep backward compatibility on JCB's structure as we will extend this to keep abreast of the evolution of Flux's job schema. Finally, the third consideration has led us to introduce three simple API calls as the main JSC idioms: job status-change notification as well as JCB query and update. Client software can use the notification call to get the status of a job asynchronously on a state change; the query and update calls to fetch and update a job's JCB, respectively. + +2. Job States +------------- +The JSC API converts the state strings produced by `flux-wreckrun` and the scheduler framework service comms module into a C enumerator: *job\_state\_t*. Its elements are shown in Table 2-1. If the raw state strings are changed in the future, one must accommodate this JCB implementation accordingly--mapping the new strings to these state elements. We expect the state set will further be extended as we will add more advanced services such as elastic job and resource management service. + +| Elements | Comment | +|--------------|-----------------------------------------------------------------| +| J_NULL | No state has been assigned | +| J_RESERVED | Reserved in KVS | +| J_SUBMITTED | Added to KVS | +| J_PENDING | Pending | +| J_SCHEDREQ | Resource selection requested | +| J_ALLOCATED | Resource allocated/contained in the Flux instance | +| J_RUNREQUEST | Requested to be executed | +| J_STARTING | Starting | +| J_STOPPED | Stopped -- including the stop after `exec` under a tool's control | +| J_RUNNING | Running | +| J_CANCELLED | Cancelled | +| J_COMPLETE | Complete | +| J_REAPED | Reaped to the upper-level Flux instance | +| J_FOR_RENT | To be extended | + +**Table 2-1** Job state elements + + +3. Job Control Block +------------- +Job Control Block (JCB) is our data schema containing the information needed to manage a particular job. It contains information such as jobid, resources owned by the job, as well as the processes spawned by the job. The JSC API converts the raw information on a job into an JCB, implemented as an JSON dictionary object. Our current JCB structure is shown in Table 3-1 through 3-6. As Flux's job schema evolves, we will extend JCB while trying our best to keep backward compatibility. + +| Key | Macro | Value Type | Comment | +|------------|----------------|----------------|--------------------------------------------------------------------------------| +| jobid | JSC_JOBID | 64-bit integer | Job id | +| state-pair | JSC_STATE\_PAIR | dictionary | A dictionary containing this old and new states of the job. See Table 3-2. | +| rdesc | JSC_RDESC | dictionary | Information on the resources owned by this job. See Table 3-3. | +| rdl | JSC_RDL | string | RDL binary string allocated to the job | +| rdl_alloc | JSC_RDL\_ALLOC | array of per-cmbd resources | Resource descriptor array (Resources allocated per cmbd - cmbd rank order). See Table 3-4.| +| pdesc | JSC_PDESC | dictionary | Information on the processes spawned by this job. See Table 3-5. | + +**Table 3-1** Keys and values of top-level JCB attributes + + +| Key | Macro | Value Type | Comment | +|------------|-----------------------|----------------|-------------------------------------| +| ostate | JSC_STATE\_PAIR\_OSTATE | 64-bit integer | Old state (a *job\_state\_t* element) | +| nstate | JSC_STATE\_PAIR\_NSTATE | 64-bit integer | New state (a *job\_state\_t* element) | + +**Table 3-2** Keys and values of *state-pair* attribute + + +| Key | Macro | Value Type | Comment | +|------------|------------------|----------------|---------------| +| nnodes | JSC_RDESC\_NNODES | 64-bit integer | Node count | +| ntasks | JSC_RDESC\_NTASKS | 64-bit integer | Process count | + +**Table 3-3** Keys and values of *rdesc* attribute + + +| Key | Macro | Value Type | Comment | +|------------|------------------------------|----------------|---------------------------------| +| contained | JSC_RDL\_ALLOC\_CONTAINED | dictionary | Per cmdb resource containment See Table 3-4-1 | + +**Table 3-4** Keys and values of *rdl\_alloc* attribute + + +| Key | Macro | Value Type | Comment | +|------------|------------------------------|----------------|---------------------------------| +| cmbdncores | JSC_RDL\_ALLOC\_CONTAINED\_NCORES | 64-bit integer | Core count to use for this cmdb | + +**Table 3-4-1** Keys and values of *rsarray* attribute + + +| Key | Macro | Value Type | Comment | +|------------|---------------------|-----------------------------|---------------------------------------------------------------------| +| procsize | JSC_PDESC\_SIZE | 64-bit integer | Process count | +| hostnames | JSC_PDESC\_HOSTNAMES | array of strings | Host name array (Names are current home cmbd rank) | +| executables| JSC_PDESC\_EXECS | array of strings | Executable name array | +| pdarray | JSC_PDESC\_PDARRAY | array of dictionary objects | Process descriptor array (MPI rank order). See Table 3-6 for each pdarray element | + +**Table 3-5** Keys and values of *pdesc* attribute + + +| Key | Macro | Value Type | Comment | +|------------|------------------------------|----------------|--------------------------------------------------------| +| pid | JSC_PDESC\_RANK\_PDARRAY\_PID | 64-bit integer | Process count | +| hindx | JSC_PDESC\_RANK\_PDARRAY\_HINDX | 64-bit integer | Host name (indexing into the hostname array) | +| eindx | JSC_PDESC\_RANK\_PDARRAY\_EINDX | 64-bit integer | Executable name (indexing into the executable name array) | + +**Table 3-6** Keys and values of each *pdarray* element + + +4. Application Programming Interface +------------- +The Job Status and Control API currently provides three main calls: + +> Note: typedef int (JSC_CB_PTR)(json_object \*base_jcb, void \*arg, int errnum); +> +>- int jsc_notify_status (flux_t h, JSC_CB_PTR \*callback, void \*d); +>- int jsc_query_jcb (flux_t h, int64_t jobid, const char \*key, json_object \*\*jcb); +>- int jsc_update_jcb (flux_t h, int64_t jobid, const char \*key, json_object \*jcb); + + + +#### jsc\_notify\_status +Registers a *callback* to the asynchronous status change notification service. +*callback* will be invoked when the state of a job changes. The *jobid* and *state-pair* +as shown in Table 3-2 will be passed as *base_jcb* into the *callback*. +*d* is arbitrary data that will transparently be passed into *callback*. +However, one should pass its *flux_t* object as part of this callback data. +Note that the caller must start its reactor to get an asynchronous status +change notification via *callback*. This is because it uses the KVS-watch +facility which has the same limitation. One can register mutliple callbacks +by calling this function multiple times. The callbacks will be invoked in the order +they are registered. Returns 0 on success; otherwise -1. + + +#### jsc\_query\_jcb +Queries the *key* attribute of JCB of *jobid*. The JCB info on this attribute +will be passed via *jcb*. It is the responsibility of the caller to release *jcb*. +All ownership associated with the sub-attributes in *jcb*'s hierarchy +are transferred to *jcb*, so that json_object_put (\*jcb) will free this hierarchy +in its entirety. Returns 0 on success; otherwise -1. + +####jsc\_update\_jcb +Updates the *key* attribute within the JCB of *jobid*. The top-level attribute of *jcb* should be the same as *key*. Returns 0 on success; otherwise -1. This will not release *jcb* so it is the responsibility of the caller to free *jcb*. + +>**Notes:** + +>1. JCB granularity optimization -- one can optimize the amounts of JCB information piggybacked with each notification (*base_jcb*). One can further extend the single attribute-wise query/update pattern to group-wise ones once the access patterns of JCS API's clients are known. + +>2. JCB producer-consumer synchronization -- currently there is no built-in synchronization between JCB producers and consumers and thus a race condition can occur. When the remote parallel execution changes the state of a job, and the registered callbacks will be invoked. However, when one of the invoked callbacks is trying to read an JCB attribute, nothing prevents the remote execution from modifying the same JCB attribute! Because producers and consumers use the KVS like a distributed shared memory, one must devise ways to guarantee synchronization. One solution is for the producers also use the JSC API and we build some synchronization primitives into this API. But for now, we ignore these synchronization issues. + + +5. Testing +------------- + +To facilitate the testing of this API, we created an utility command: `flux-jstat`. Its usage is the following: + + Usage: flux-jstat notify + flux-jstat query jobid + flux-jstat update jobid + +Further, `flux-core/t/t2001-jsc.t` contains various test cases that use this utility. + diff --git a/src/modules/libjsc/jstatctl.c b/src/modules/libjsc/jstatctl.c new file mode 100644 index 000000000000..1c4fe691b505 --- /dev/null +++ b/src/modules/libjsc/jstatctl.c @@ -0,0 +1,925 @@ +/*****************************************************************************\ + * Copyright (c) 2015 Lawrence Livermore National Security, LLC. Produced at + * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS). + * LLNL-CODE-658032 All rights reserved. + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the license, or (at your option) + * any later version. + * + * Flux is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * See also: http://www.gnu.org/licenses/ +\*****************************************************************************/ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include +#include +#include +#include + +#include "jstatctl.h" +#include "src/common/libutil/log.h" +#include "src/common/libutil/jsonutil.h" +#include "src/common/libutil/xzmalloc.h" +#include "src/common/libutil/shortjson.h" + + +/******************************************************************************* + * * + * Internal User Define Types and Data * + * * + *******************************************************************************/ + +typedef struct { + job_state_t i; + const char *s; +} stab_t; + +typedef struct { + JSC_CB_PTR *cb; + void *arg; +} cb_pair_t; + +typedef struct { + zhash_t *active_jobs; + zlist_t *callbacks; + int first_time; + flux_t h; +} jscctx_t; + +static stab_t job_state_tab[] = { + { J_NULL, "null" }, + { J_RESERVED, "reserved" }, + { J_SUBMITTED, "submitted" }, + { J_PENDING, "pending" }, + { J_SCHEDREQ, "schedreq" }, + { J_SELECTED, "selected" }, + { J_ALLOCATED, "allocated" }, + { J_RUNREQUEST, "runrequest" }, + { J_STARTING, "starting" }, + { J_STOPPED, "stopped" }, + { J_RUNNING, "running" }, + { J_CANCELLED, "cancelled" }, + { J_COMPLETE, "complete" }, + { J_REAPED, "reaped" }, + { J_FOR_RENT, "for_rent" }, + { -1, NULL }, +}; + + +/****************************************************************************** + * * + * Utilities * + * * + ******************************************************************************/ + +const char *jsc_job_num2state (job_state_t i) +{ + stab_t *ss = job_state_tab; + while (ss->s != NULL) { + if (ss->i == i) + return ss->s; + ss++; + } + return NULL; +} + +static int jsc_job_state2num (const char *s) +{ + stab_t *ss = job_state_tab; + while (ss->s != NULL) { + if (!strcmp (ss->s, s)) + return ss->i; + ss++; + } + return -1; +} + +static void freectx (jscctx_t *ctx) +{ + zhash_destroy (&(ctx->active_jobs)); + zlist_destroy (&(ctx->callbacks)); +} + +static jscctx_t *getctx (flux_t h) +{ + jscctx_t *ctx = (jscctx_t *)flux_aux_get (h, "jstatctrl"); + if (!ctx) { + ctx = xzmalloc (sizeof (*ctx)); + if (!(ctx->active_jobs = zhash_new ())) + oom (); + if (!(ctx->callbacks = zlist_new ())) + oom (); + ctx->first_time = 1; + ctx->h = h; + flux_aux_set (h, "jstatctrl", ctx, (FluxFreeFn)freectx); + } + return ctx; +} + +static inline bool is_jobid (const char *k) +{ + return (!strncmp (JSC_JOBID, k, JSC_MAX_ATTR_LEN))? true : false; +} + +static inline bool is_state_pair (const char *k) +{ + return (!strncmp (JSC_STATE_PAIR, k, JSC_MAX_ATTR_LEN))? true : false; +} + +static inline bool is_rdesc (const char *k) +{ + return (!strncmp (JSC_RDESC, k, JSC_MAX_ATTR_LEN))? true : false; +} + +static inline bool is_rdl (const char *k) +{ + return (!strncmp (JSC_RDL, k, JSC_MAX_ATTR_LEN))? true : false; +} + +static inline bool is_rdl_alloc (const char *k) +{ + return (!strncmp (JSC_RDL_ALLOC, k, JSC_MAX_ATTR_LEN))? true : false; +} + +static inline bool is_pdesc (const char *k) +{ + return (!strncmp (JSC_PDESC, k, JSC_MAX_ATTR_LEN))? true : false; +} + +static int fetch_and_update_state (zhash_t *aj , int64_t j, int64_t ns) +{ + int *t = NULL; + char key[20] = {'\0'}; + + if (!aj) return J_FOR_RENT;; + snprintf (key, 20, "%ld", j); + if ( !(t = ((int *)zhash_lookup (aj, (const char *)key)))) + return J_FOR_RENT; + if (ns == J_COMPLETE) + zhash_delete (aj, key); + else + zhash_update (aj, key, (void *)(intptr_t)ns); + + /* safe to convert t to int */ + return (intptr_t) t; +} + + +/****************************************************************************** + * * + * Internal JCB Accessors * + * * + ******************************************************************************/ + +static int parse_jobid (const char *k, int64_t *i) +{ + int rc = -1; + char *kcopy = NULL, *sptr = NULL, *j = NULL, *id = NULL; + + kcopy = xstrdup (k); + j = strtok_r (kcopy, ".", &sptr); + if (strncmp(j, "lwj", 3) != 0) + goto done; + + id = strtok_r (NULL, ".", &sptr); + errno = 0; + *i = strtoul(id, (char **) NULL, 10); + if (errno != 0) + goto done; + rc = 0; + +done: + free (kcopy); + return rc; +} + +static int jobid_exist (flux_t h, int64_t j) +{ + kvsdir_t d; + if (kvs_get_dir (h, &d, "lwj.%ld", j) < 0) { + flux_log (h, LOG_DEBUG, "lwj.%ld doesn't exist", j); + return -1; + } + kvsdir_destroy (d); + return 0; +} + +static bool fetch_rank_pdesc (JSON src, int64_t *p, int64_t *n, const char **c) +{ + if (!src) return false; + if (!Jget_str (src, "command", c)) return false; + if (!Jget_int64 (src, "pid", p)) return false; + if (!Jget_int64 (src, "nodeid", n)) return false; + return true; +} + +static int build_name_array (zhash_t *ha, const char *k, JSON ns) +{ + int i = (intptr_t) zhash_lookup (ha, k); + if ((void *)((intptr_t)i) == NULL) { + char *t = xstrdup (k); + i = json_object_array_length (ns); + Jadd_ar_str (ns, t); + zhash_insert (ha, k, (void *)(intptr_t)i+1); + free (t); + } else + i--; + return i; +} + +static int extract_raw_nnodes (flux_t h, int64_t j, int64_t *nnodes) +{ + int rc = 0; + char key[20] = {'\0'}; + snprintf (key, 20, "lwj.%ld.nnodes", j); + if (kvs_get_int64 (h, key, nnodes) < 0) { + flux_log (h, LOG_ERR, "extract %s: %s", key, strerror (errno)); + rc = -1; + } + else + flux_log (h, LOG_DEBUG, "extract %s: %ld", key, *nnodes); + return rc; +} + +static int extract_raw_ntasks (flux_t h, int64_t j, int64_t *ntasks) +{ + int rc = 0; + char key[20] = {'\0'}; + snprintf (key, 20, "lwj.%ld.ntasks", j); + if (kvs_get_int64 (h, key, ntasks) < 0) { + flux_log (h, LOG_ERR, "extract %s: %s", key, strerror (errno)); + rc = -1; + } + else + flux_log (h, LOG_DEBUG, "extract %s: %ld", key, *ntasks); + return rc; +} + +static int extract_raw_rdl (flux_t h, int64_t j, char **rdlstr) +{ + int rc = 0; + char key[20] = {'\0'}; + snprintf (key, 20, "lwj.%ld.rdl", j); + if (kvs_get_string (h, key, rdlstr) < 0) { + flux_log (h, LOG_ERR, "extract %s: %s", key, strerror (errno)); + rc = -1; + } + else + flux_log (h, LOG_DEBUG, "rdl under %s extracted", key); + return rc; +} + +static int extract_raw_state (flux_t h, int64_t j, int64_t *s) +{ + int rc = 0; + char key[20] = {'\0'}; + char *state = NULL; + snprintf (key, 20, "lwj.%ld.state", j); + if (kvs_get_string (h, key, &state) < 0) { + flux_log (h, LOG_ERR, "extract %s: %s", key, strerror (errno)); + rc = -1; + } + else { + *s = jsc_job_state2num (state); + flux_log (h, LOG_DEBUG, "extract %s: %s", key, state); + } + if (state) + free (state); + return rc; +} + +static int extract_raw_pdesc (flux_t h, int64_t j, int64_t i, JSON *o) +{ + int rc = 0; + char key[20] = {'\0'}; + snprintf (key, 20, "lwj.%ld.%ld.procdesc", j, i); + if (kvs_get (h, key, o) < 0) { + flux_log (h, LOG_ERR, "extract %s: %s", key, strerror (errno)); + rc = -1; + if (*o) + Jput (*o); + } + return rc; +} + +static JSON build_parray_elem (int64_t pid, int64_t eix, int64_t hix) +{ + JSON po = Jnew (); + Jadd_int64 (po, JSC_PDESC_RANK_PDARRAY_PID, pid); + Jadd_int64 (po, JSC_PDESC_RANK_PDARRAY_EINDX, eix); + Jadd_int64 (po, JSC_PDESC_RANK_PDARRAY_HINDX, hix); + return po; +} + +static void add_pdescs_to_jcb (JSON *hns, JSON *ens, JSON *pa, JSON jcb) +{ + json_object_object_add (jcb, JSC_PDESC_HOSTNAMES, *hns); + json_object_object_add (jcb, JSC_PDESC_EXECS, *ens); + json_object_object_add (jcb, JSC_PDESC_PDARRAY, *pa); + /* Because the above transfer ownership, assign NULL should be ok */ + *hns = NULL; + *ens = NULL; + *pa = NULL; +} + +static int extract_raw_pdescs (flux_t h, int64_t j, int64_t n, JSON jcb) +{ + int rc = -1; + int64_t i = 0; + char hnm[20] = {'\0'}; + const char *cmd = NULL; + zhash_t *eh = NULL; /* hash holding a set of unique exec_names */ + zhash_t *hh = NULL; /* hash holding a set of unique host_names */ + JSON o = NULL, po = NULL; + JSON pa = Jnew_ar (); + JSON hns = Jnew_ar (); + JSON ens = Jnew_ar (); + + if (!(eh = zhash_new ()) || !(hh = zhash_new ())) + oom (); + for (i=0; i < (int) n; i++) { + int64_t eix = 0, hix = 0; + int64_t pid = 0, nid = 0; + + if (extract_raw_pdesc (h, j, i, &o) != 0) + goto done; + if (!fetch_rank_pdesc (o, &pid, &nid, &cmd)) + goto done; + + eix = build_name_array (eh, cmd, ens); + /* FIXME: we need a hostname service */ + snprintf (hnm, 20, "%ld", nid); + hix = build_name_array (hh, hnm, hns); + po = build_parray_elem (pid, eix, hix); + json_object_array_add (pa, po); + po = NULL; + Jput (o); + o = NULL; + } + add_pdescs_to_jcb (&hns, &ens, &pa, jcb); + rc = 0; + +done: + if (o) Jput (o); + if (po) Jput (po); + if (pa) Jput (pa); + if (hns) Jput (hns); + if (ens) Jput (ens); + zhash_destroy (&eh); + zhash_destroy (&hh); + return rc; +} + +static int extract_raw_rdl_alloc (flux_t h, int64_t j, JSON jcb) +{ + int i = 0; + char k[20]; + int64_t cores = 0; + JSON ra = Jnew_ar (); + bool processing = true; + + for (i=0; processing; ++i) { + snprintf (k, 20, "lwj.%ld.rank.%d.cores", j, i); + if (kvs_get_int64 (h, k, &cores) < 0) { + if (errno != EINVAL) + flux_log (h, LOG_ERR, "extract %s: %s", k, strerror (errno)); + processing = false; + } else { + JSON elem = Jnew (); + JSON o = Jnew (); + Jadd_int64 (o, JSC_RDL_ALLOC_CONTAINED_NCORES, cores); + json_object_object_add (elem, JSC_RDL_ALLOC_CONTAINED, o); + json_object_array_add (ra, elem); + } + } + json_object_object_add (jcb, JSC_RDL_ALLOC, ra); + return 0; +} + +static int query_jobid (flux_t h, int64_t j, JSON *jcb) +{ + int rc = 0; + if ( ( rc = jobid_exist (h, j)) != 0) + *jcb = NULL; + else { + *jcb = Jnew (); + Jadd_int64 (*jcb, JSC_JOBID, j); + } + return rc; +} + +static int query_state_pair (flux_t h, int64_t j, JSON *jcb) +{ + JSON o = NULL; + int64_t st = (int64_t)J_FOR_RENT;; + + if (extract_raw_state (h, j, &st) < 0) return -1; + + *jcb = Jnew (); + o = Jnew (); + /* Old state is unavailable through the query. + * One should use notification service instead. + */ + Jadd_int64 (o, JSC_STATE_PAIR_OSTATE, st); + Jadd_int64 (o, JSC_STATE_PAIR_NSTATE, st); + json_object_object_add (*jcb, JSC_STATE_PAIR, o); + return 0; +} + +static int query_rdesc (flux_t h, int64_t j, JSON *jcb) +{ + JSON o = NULL; + int64_t nnodes = -1; + int64_t ntasks = -1; + + if (extract_raw_nnodes (h, j, &nnodes) < 0) return -1; + if (extract_raw_ntasks (h, j, &ntasks) < 0) return -1; + + *jcb = Jnew (); + o = Jnew (); + Jadd_int64 (o, JSC_RDESC_NNODES, nnodes); + Jadd_int64 (o, JSC_RDESC_NTASKS, ntasks); + json_object_object_add (*jcb, JSC_RDESC, o); + return 0; +} + +static int query_rdl (flux_t h, int64_t j, JSON *jcb) +{ + char *rdlstr = NULL; + + if (extract_raw_rdl (h, j, &rdlstr) < 0) return -1; + + *jcb = Jnew (); + Jadd_str (*jcb, JSC_RDL, (const char *)rdlstr); + /* Note: seems there is no mechanism to transfer ownership + * of this string to jcb */ + if (rdlstr) + free (rdlstr); + return 0; +} + +static int query_rdl_alloc (flux_t h, int64_t j, JSON *jcb) +{ + *jcb = Jnew (); + return extract_raw_rdl_alloc (h, j, *jcb); +} + +static int query_pdesc (flux_t h, int64_t j, JSON *jcb) +{ + int64_t ntasks = 0; + if (extract_raw_ntasks (h, j, &ntasks) < 0) return -1; + *jcb = Jnew (); + Jadd_int64 (*jcb, JSC_PDESC_SIZE, ntasks); + return extract_raw_pdescs (h, j, ntasks, *jcb); +} + +static int update_state (flux_t h, int64_t j, JSON o) +{ + int rc = -1; + int64_t st = 0; + char key[20] = {'\0'}; + + if (!Jget_int64 (o, JSC_STATE_PAIR_NSTATE, &st)) return -1; + if ((st >= J_FOR_RENT) || (st < J_NULL)) return -1; + + snprintf (key, 20, "lwj.%ld.state", j); + if (kvs_put_string (h, key, jsc_job_num2state ((job_state_t)st)) < 0) + flux_log (h, LOG_ERR, "update %s: %s", key, strerror (errno)); + else if (kvs_commit (h) < 0) + flux_log (h, LOG_ERR, "commit %s: %s", key, strerror (errno)); + else { + flux_log (h, LOG_DEBUG, "job (%ld) assigned new state: %s", j, + jsc_job_num2state ((job_state_t)st)); + rc = 0; + } + + return rc; +} + +static int update_rdesc (flux_t h, int64_t j, JSON o) +{ + int rc = -1; + int64_t nnodes = 0; + int64_t ntasks = 0; + char key1[20] = {'\0'}; + char key2[20] = {'\0'}; + + if (!Jget_int64 (o, JSC_RDESC_NNODES, &nnodes)) return -1; + if (!Jget_int64 (o, JSC_RDESC_NTASKS, &ntasks)) return -1; + if ((nnodes < 0) || (ntasks < 0)) return -1; + + snprintf (key1, 20, "lwj.%ld.nnodes", j); + snprintf (key2, 20, "lwj.%ld.ntasks", j); + if (kvs_put_int64 (h, key1, nnodes) < 0) + flux_log (h, LOG_ERR, "update %s: %s", key1, strerror (errno)); + else if (kvs_put_int64 (h, key2, ntasks) < 0) + flux_log (h, LOG_ERR, "update %s: %s", key2, strerror (errno)); + else if (kvs_commit (h) < 0) + flux_log (h, LOG_ERR, "commit failed"); + else { + flux_log (h, LOG_DEBUG, "job (%ld) assigned new resources.", j); + rc = 0; + } + + return rc; +} + +static int update_rdl (flux_t h, int64_t j, const char *rs) +{ + int rc = -1; + char key[20] = {'\0'}; + + snprintf (key, 20, "lwj.%ld.rdl", j); + if (kvs_put_string (h, key, rs) < 0) + flux_log (h, LOG_ERR, "update %s: %s", key, strerror (errno)); + else if (kvs_commit (h) < 0) + flux_log (h, LOG_ERR, "commit failed"); + else { + flux_log (h, LOG_DEBUG, "job (%ld) assigned new rdl.", j); + rc = 0; + } + + return rc; +} + +static int update_1ra (flux_t h, int r, int64_t j, JSON o) +{ + int rc = 0; + int64_t ncores = 0; + char key[20] = {'\0'}; + JSON c = NULL; + + if (!Jget_obj (o, JSC_RDL_ALLOC_CONTAINED, &c)) return -1; + if (!Jget_int64 (c, JSC_RDL_ALLOC_CONTAINED_NCORES, &ncores)) return -1; + + snprintf (key, 20, "lwj.%ld.rank.%d.cores", j, r); + if ( (rc = kvs_put_int64 (h, key, ncores)) < 0) { + flux_log (h, LOG_ERR, "put %s: %s", key, strerror (errno)); + } + return rc; +} + +static int update_rdl_alloc (flux_t h, int64_t j, JSON o) +{ + int i = 0; + int rc = -1; + int size = 0; + JSON ra_e = NULL; + + if (!Jget_ar_len (o, &size)) return -1; + + for (i=0; i < (int) size; ++i) { + if (!Jget_ar_obj (o, i, &ra_e)) + goto done; + if ( (rc = update_1ra (h, i, j, ra_e)) < 0) + goto done; + } + if (kvs_commit (h) < 0) { + flux_log (h, LOG_ERR, "update_pdesc commit failed"); + goto done; + } + rc = 0; + +done: + return rc; +} + +static int update_1pdesc (flux_t h, int r, int64_t j, JSON o, JSON ha, JSON ea) +{ + int rc = -1; + JSON d = NULL; + char key[20] = {'\0'};; + const char *hn = NULL, *en = NULL; + int64_t pid = 0, hindx = 0, eindx = 0, hrank = 0; + + if (!Jget_int64 (o, JSC_PDESC_RANK_PDARRAY_PID, &pid)) return -1; + if (!Jget_int64 (o, JSC_PDESC_RANK_PDARRAY_HINDX, &hindx)) return -1; + if (!Jget_int64 (o, JSC_PDESC_RANK_PDARRAY_EINDX, &eindx)) return -1; + if (!Jget_ar_str (ha, (int)hindx, &hn)) return -1; + if (!Jget_ar_str (ea, (int)eindx, &en)) return -1; + + snprintf (key, 20, "lwj.%ld.%d.procdesc", j, r); + if (kvs_get (h, key, &d) < 0) { + flux_log (h, LOG_ERR, "extract %s: %s", key, strerror (errno)); + goto done; + } + + Jadd_str (d, "command", en); + Jadd_int64 (d, "pid", pid); + errno = 0; + if ( (hrank = strtoul (hn, NULL, 10)) && errno != 0) { + flux_log (h, LOG_ERR, "invalid hostname %s", hn); + goto done; + } + Jadd_int64 (d, "nodeid", (int64_t)hrank); + if (kvs_put (h, key, d) < 0) { + flux_log (h, LOG_ERR, "put %s: %s", key, strerror (errno)); + goto done; + } + rc = 0; + +done: + if (d) + Jput (d); + return rc; +} + +static int update_pdesc (flux_t h, int64_t j, JSON o) +{ + int i = 0; + int rc = -1; + int64_t size = 0; + JSON h_arr = NULL, e_arr = NULL, pd_arr = NULL, pde = NULL; + + if (!Jget_int64 (o, JSC_PDESC_SIZE, &size)) return -1; + if (!Jget_obj (o, JSC_PDESC_PDARRAY, &pd_arr)) return -1; + if (!Jget_obj (o, JSC_PDESC_HOSTNAMES, &h_arr)) return -1; + if (!Jget_obj (o, JSC_PDESC_EXECS, &e_arr)) return -1; + + for (i=0; i < (int) size; ++i) { + if (!Jget_ar_obj (pd_arr, i, &pde)) + goto done; + if ( (rc = update_1pdesc (h, i, j, pde, h_arr, e_arr)) < 0) + goto done; + } + if (kvs_commit (h) < 0) { + flux_log (h, LOG_ERR, "update_pdesc commit failed"); + goto done; + } + rc = 0; + +done: + return rc; +} + +static inline int chk_errnum (flux_t h, int errnum) +{ + if (errnum > 0) { + /* Ignore ENOENT. It is expected when this cb is called right + * after registration. + */ + if (errnum != ENOENT) { + flux_log (h, LOG_ERR, "in a callback %s", strerror (errnum)); + } + return -1; + } + return 0; +} + +static JSON get_update_jcb (flux_t h, int64_t j) +{ + JSON o = NULL; + JSON ss = NULL; + jscctx_t *ctx = getctx (h); + int64_t ostate = (int64_t) J_FOR_RENT; + int64_t nstate = (int64_t) J_FOR_RENT; + + if (extract_raw_state (h, j, &nstate) < 0) { + flux_log (h, LOG_ERR, "Failed to find %ld's new state", j); + return NULL; + } + if ( (ostate = fetch_and_update_state (ctx->active_jobs, j, nstate)) < 0) { + flux_log (h, LOG_INFO, "%ld's old state unavailable", j); + ostate = nstate; + } + o = Jnew (); + ss = Jnew (); + Jadd_int64 (o, JSC_JOBID, j); + Jadd_int64 (ss, JSC_STATE_PAIR_OSTATE , (int64_t) ostate); + Jadd_int64 (ss, JSC_STATE_PAIR_NSTATE, (int64_t) nstate); + json_object_object_add (o, JSC_STATE_PAIR, ss); + return o; +} + + +/****************************************************************************** + * * + * Internal Asynchronous Notification Mechanisms * + * * + ******************************************************************************/ + +static int invoke_cbs (flux_t h, int64_t j, JSON jcb, int errnum) +{ + int rc = 0; + cb_pair_t *c = NULL; + jscctx_t *ctx = getctx (h); + for (c = zlist_first (ctx->callbacks); c; c = zlist_next (ctx->callbacks)) { + if (c->cb (jcb, c->arg, errnum) < 0) { + flux_log (h, LOG_ERR, "callback returns an error"); + rc = -1; + } + } + return rc; +} + +static int job_state_cb (const char *key, const char *val, void *arg, int errnum) +{ + int64_t jobid = -1; + flux_t h = (flux_t) arg; + + if (chk_errnum (h, errnum) < 0) + flux_log (h, LOG_ERR, "job_state_cb: key(%s), val(%s)", key, val); + else if (parse_jobid (key, &jobid) != 0) + flux_log (h, LOG_ERR, "job_state_cb: key ill-formed"); + else if (invoke_cbs (h, jobid, get_update_jcb (h, jobid), errnum) < 0) + flux_log (h, LOG_ERR, "job_state_cb: failed to invoke callbacks"); + + /* always return 0 so that reactor will not return */ + return 0; +} + +static int reg_jobstate_hdlr (flux_t h, const char *path, KVSSetStringF *func) +{ + int rc = 0; + char key[20] = {'\0'}; + + snprintf (key, 20, "%s.state", path); + if (kvs_watch_string (h, key, func, (void *)h) < 0) { + flux_log (h, LOG_ERR, "watch %s: %s.", key, strerror (errno)); + rc = -1; + } else + flux_log (h, LOG_DEBUG, "registered job %s.state CB", path); + return rc; +} + +static int new_job_cb (const char *key, int64_t val, void *arg, int errnum) +{ + int64_t nj = 0; + int64_t js = 0; + JSON ss = NULL; + JSON jcb = NULL; + char k[20] = {'\0'}; + char path[20] = {'\0'}; + flux_t h = (flux_t) arg; + jscctx_t *ctx = getctx (h); + + if (ctx->first_time == 1) { + /* watch is invoked immediately and we shouldn't + * rely on that event at all. + */ + ctx->first_time = 0; + return 0; + } + + if (chk_errnum (h, errnum) < 0) return 0; + + flux_log (h, LOG_DEBUG, "new_job_cb invoked: key(%s), val(%ld)", key, val); + + js = J_NULL; + nj = val-1; + snprintf (k, 20, "%ld", nj); + snprintf (path, 20, "lwj.%ld", nj); + if (zhash_insert (ctx->active_jobs, k, (void *)(intptr_t)js) < 0) { + flux_log (h, LOG_ERR, "new_job_cb: inserting a job to hash failed"); + goto done; + } + + flux_log (h, LOG_DEBUG, "jobstate_hdlr registered"); + jcb = Jnew (); + ss = Jnew (); + Jadd_int64 (jcb, JSC_JOBID, nj); + Jadd_int64 (ss, JSC_STATE_PAIR_OSTATE , (int64_t) js); + Jadd_int64 (ss, JSC_STATE_PAIR_NSTATE, (int64_t) js); + json_object_object_add (jcb, JSC_STATE_PAIR, ss); + + if (reg_jobstate_hdlr (h, path, (KVSSetStringF *) job_state_cb) == -1) { + flux_log (h, LOG_ERR, "new_job_cb: reg_jobstate_hdlr: %s", + strerror (errno)); + } + if (invoke_cbs (h, nj, jcb, errnum) < 0) { + flux_log (h, LOG_ERR, "new_job_cb: failed to invoke callbacks"); + } + +done: + /* always return 0 so that reactor won't return */ + return 0; +} + +static int reg_newjob_hdlr (flux_t h, KVSSetInt64F *func) +{ + if (kvs_watch_int64 (h,"lwj.next-id", func, (void *) h) < 0) { + flux_log (h, LOG_ERR, "watch lwj.next-id: %s", strerror (errno)); + return -1; + } + flux_log (h, LOG_DEBUG, "registered job creation CB"); + return 0; +} + + +/****************************************************************************** + * * + * Public Job Status and Control API * + * * + ******************************************************************************/ + +int jsc_notify_status (flux_t h, JSC_CB_PTR *func, void *d) +{ + int rc = -1; + cb_pair_t *c = NULL; + jscctx_t *ctx = NULL; + + if (!func) + goto done; + if (reg_newjob_hdlr (h, (KVSSetInt64F*)new_job_cb) == -1) { + flux_log (h, LOG_ERR, "jsc_notify_status: reg_newjob_hdlr failed"); + goto done; + } + + ctx = getctx (h); + c = (cb_pair_t *) xzmalloc (sizeof(*c)); + c->cb = func; + c->arg = d; + if (zlist_append (ctx->callbacks, c) < 0) + goto done; + + zlist_freefn (ctx->callbacks, c, free, true); + rc = 0; + +done: + return rc; +} + +int jsc_query_jcb (flux_t h, int64_t jobid, const char *key, JSON *jcb) +{ + int rc = -1; + + if (!key) return -1; + if (jobid_exist (h, jobid) != 0) return -1; + + if (is_jobid (key)) { + if ( (rc = query_jobid (h, jobid, jcb)) < 0) + flux_log (h, LOG_ERR, "query_jobid failed"); + } else if (is_state_pair (key)) { + if ( (rc = query_state_pair (h, jobid, jcb)) < 0) + flux_log (h, LOG_ERR, "query_pdesc failed"); + } else if (is_rdesc (key)) { + if ( (rc = query_rdesc (h, jobid, jcb)) < 0) + flux_log (h, LOG_ERR, "query_rdesc failed"); + } else if (is_rdl (key)) { + if ( (rc = query_rdl (h, jobid, jcb)) < 0) + flux_log (h, LOG_ERR, "query_rdl failed"); + } else if (is_rdl_alloc (key)) { + if ( (rc = query_rdl_alloc (h, jobid, jcb)) < 0) + flux_log (h, LOG_ERR, "query_rdl_alloc failed"); + } else if (is_pdesc (key)) { + if ( (rc = query_pdesc (h, jobid, jcb)) < 0) + flux_log (h, LOG_ERR, "query_pdesc failed"); + } else + flux_log (h, LOG_ERR, "key (%s) not understood", key); + + return rc; +} + +int jsc_update_jcb (flux_t h, int64_t jobid, const char *key, JSON jcb) +{ + int rc = -1; + JSON o = NULL; + + if (!jcb) return -1; + if (jobid_exist (h, jobid) != 0) return -1; + + if (is_jobid (key)) { + flux_log (h, LOG_ERR, "jobid attr cannot be updated"); + } else if (is_state_pair (key)) { + if (Jget_obj (jcb, JSC_STATE_PAIR, &o)) + rc = update_state (h, jobid, o); + } else if (is_rdesc (key)) { + if (Jget_obj (jcb, JSC_RDESC, &o)) + rc = update_rdesc (h, jobid, o); + } else if (is_rdl (key)) { + const char *s = NULL; + if (Jget_str (jcb, JSC_RDL, &s)) + rc = update_rdl (h, jobid, s); + } else if (is_rdl_alloc (key)) { + if (Jget_obj (jcb, JSC_RDL_ALLOC, &o)) + rc = update_rdl_alloc (h, jobid, o); + } else if (is_pdesc (key)) { + if (Jget_obj (jcb, JSC_PDESC, &o)) + rc = update_pdesc (h, jobid, o); + } + else + flux_log (h, LOG_ERR, "key (%s) not understood", key); + + return rc; +} + + +/* + * vi: ts=4 sw=4 expandtab + */ diff --git a/src/modules/libjsc/jstatctl.h b/src/modules/libjsc/jstatctl.h new file mode 100644 index 000000000000..7388efbcbb0b --- /dev/null +++ b/src/modules/libjsc/jstatctl.h @@ -0,0 +1,125 @@ +/*****************************************************************************\ + * Copyright (c) 2015 Lawrence Livermore National Security, LLC. Produced at + * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS). + * LLNL-CODE-658032 All rights reserved. + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the license, or (at your option) + * any later version. + * + * Flux is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * See also: http://www.gnu.org/licenses/ +\*****************************************************************************/ + +#ifndef _FLUX_CORE_JSTATCTRL_H +#define _FLUX_CORE_JSTATCTRL_H 1 + +#include +#include + +/** + * Define the job states (an abstraction independent of + * underlying task and program execution services (RFC 8) + * and scheduler implementation details (e.g., how the + * attributes of a job are stored in KVS.) For more details, + * please refer to README.md + */ +typedef enum { + J_NULL = 1, /*!< The state has yet to be assigned */ + J_RESERVED, /*!< Reserved by the program execution service */ + J_SUBMITTED, /*!< Submitted to the system */ + J_PENDING, /*!< Pending */ + J_SCHEDREQ, /*!< Resources requested to be selected */ + J_SELECTED, /*!< Assigned to requested resource in RDL */ + J_ALLOCATED, /*!< Got allocated/contained by the program executoin service */ + J_RUNREQUEST,/*!< Requested to be executed */ + J_STARTING, /*!< Starting */ + J_STOPPED, /*!< Stopped *including init barrier hit for a tool) */ + J_RUNNING, /*!< Running */ + J_CANCELLED, /*!< Cancelled */ + J_COMPLETE, /*!< Completed */ + J_REAPED, /*!< Reaped */ + J_FOR_RENT /*!< Space For Rent */ +} job_state_t; + +typedef int (JSC_CB_PTR)(json_object *base_jcb, void *arg, int errnum); + +/* TODO: find a better way to manage this hierarchical + * JCB attributes space + */ +#define JSC_MAX_ATTR_LEN 32 +#define JSC_JOBID "jobid" +#define JSC_STATE_PAIR "state-pair" +# define JSC_STATE_PAIR_OSTATE "ostate" +# define JSC_STATE_PAIR_NSTATE "nstate" +#define JSC_RDESC "rdesc" +# define JSC_RDESC_NNODES "nnodes" +# define JSC_RDESC_NTASKS "ntasks" +#define JSC_RDL "rdl" +#define JSC_RDL_ALLOC "rdl_alloc" +# define JSC_RDL_ALLOC_CONTAINED "contained" +# define JSC_RDL_ALLOC_CONTAINED_NCORES "cmbdncores" +#define JSC_PDESC "pdesc" +# define JSC_PDESC_SIZE "procsize" +# define JSC_PDESC_HOSTNAMES "hostnames" +# define JSC_PDESC_EXECS "executables" +# define JSC_PDESC_PDARRAY "pdarray" +# define JSC_PDESC_RANK_PDARRAY_PID "pid" +# define JSC_PDESC_RANK_PDARRAY_HINDX "hindx" +# define JSC_PDESC_RANK_PDARRAY_EINDX "eindx" + +/** + * Register a callback to the asynchronous status change notification service. + * "callback" will be invoked when the state of a job changes. The "jobid" + * and "state-pair" will be passed as "base_jcb" into the callback. + * "d" is arbitrary data that will transparently be passed into "callback." + * However, one should pass its flux_t object as part of this callback data. + * Note that the caller must start its reactor to get an asynchronous status + * change notification via "callback." This is because it uses the KVS-watch + * facility which has the same limitation. + * One can register mutliple callbacks by calling this function + * multiple times. The callbacks will be invoked in the order + * they are registered. Returns 0 on success; otherwise -1. + */ +int jsc_notify_status (flux_t h, JSC_CB_PTR *callback, void *d); + +/** + * Query the "key" attribute of JCB of "jobid." The JCB info on this attribute + * will be passed via "jcb." It is the caller's responsibility to release "jcb." + * All of the ownership associated with the sub-attributes in jcb's hierarchy + * are trasferred to "jcb," so that json_object_put (*jcb) will free this hierarchy + * in its entirety. Returns 0 on success; otherwise -1. + */ +int jsc_query_jcb (flux_t h, int64_t jobid, const char *key, json_object **jcb); + + +/** + * Update the "key" attribute of the JCB of "jobid". The top-level attribute + * of "jcb" should be identical to "key". Return 0 on success; otherwise -1. + * This will not release "jcb," so it is the caller's responsibility to + * free "jcb." + */ +int jsc_update_jcb (flux_t h, int64_t jobid, const char *key, json_object *jcb); + + +/** + * A convenience routine (returning the internal state name correponding to "s.") + */ +const char *jsc_job_num2state (job_state_t s); + +#endif /*! _FLUX_CORE_JSTATCTRL_H */ + +/* + * vi: ts=4 sw=4 expandtab + */ From 3cfb887dd25fbf71a9a7de714e14471b58f3382c Mon Sep 17 00:00:00 2001 From: "Dong H. Ahn" Date: Tue, 26 May 2015 14:11:09 -0700 Subject: [PATCH 3/8] Add flux-jstat --- src/cmd/Makefile.am | 7 +- src/cmd/flux-jstat.c | 307 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 313 insertions(+), 1 deletion(-) create mode 100644 src/cmd/flux-jstat.c diff --git a/src/cmd/Makefile.am b/src/cmd/Makefile.am index f2387ea0ba8b..7926dce7c710 100644 --- a/src/cmd/Makefile.am +++ b/src/cmd/Makefile.am @@ -47,7 +47,8 @@ fluxcmd_PROGRAMS = \ flux-comms-stats \ flux-kvs \ flux-start \ - flux-config + flux-config \ + flux-jstat flux_zio_SOURCES = \ flux-zio.c \ @@ -67,3 +68,7 @@ flux_module_LDADD = \ $(top_builddir)/src/modules/libmrpc/libmrpc.la \ $(fluxcmd_ldadd) \ $(LIBUTIL) +flux_jstat_LDADD = \ + $(top_builddir)/src/modules/libjsc/libjsc.la \ + $(fluxcmd_ldadd) \ + $(LIBUTIL) diff --git a/src/cmd/flux-jstat.c b/src/cmd/flux-jstat.c new file mode 100644 index 000000000000..019c343fa69e --- /dev/null +++ b/src/cmd/flux-jstat.c @@ -0,0 +1,307 @@ +/*****************************************************************************\ + * Copyright (c) 2014 Lawrence Livermore National Security, LLC. Produced at + * the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS). + * LLNL-CODE-658032 All rights reserved. + * + * This file is part of the Flux resource manager framework. + * For details, see https://github.com/flux-framework. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the Free + * Software Foundation; either version 2 of the license, or (at your option) + * any later version. + * + * Flux is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * See also: http://www.gnu.org/licenses/ +\*****************************************************************************/ + +#if HAVE_CONFIG_H +#include "config.h" +#endif +#include +#include +#include +#include +#include +#include +#include +#include + +#include "src/common/libutil/log.h" +//#include "src/modules/libjsc/jstatctl.h" +#include "src/common/libutil/jsonutil.h" +#include "src/common/libutil/xzmalloc.h" +#include "src/common/libutil/shortjson.h" + + +/****************************************************************************** + * * + * Internal types, macros and static variables * + * * + ******************************************************************************/ +typedef struct { + flux_t h; + FILE *op; +} jstatctx_t; + +/* Note: Should only be used for signal handling */ +static flux_t sig_flux_h; + +#define OPTIONS "o:h" +static const struct option longopts[] = { + {"help", no_argument, 0, 'h'}, + {"testout", required_argument, 0, 'o'}, + { 0, 0, 0, 0 }, +}; + + +/****************************************************************************** + * * + * Utilities * + * * + ******************************************************************************/ +static void usage (void) +{ + fprintf (stderr, +"Usage: flux-jstat notify\n" +" flux-jstat query jobid \n" +" flux-jstat update jobid \n" +); + exit (1); +} + +static void freectx (jstatctx_t *ctx) +{ + if (ctx->op) + fclose (ctx->op); +} + +static jstatctx_t *getctx (flux_t h) +{ + jstatctx_t *ctx = (jstatctx_t *)flux_aux_get (h, "jstat"); + if (!ctx) { + ctx = xzmalloc (sizeof (*ctx)); + ctx->h = h; + ctx->op = NULL; + flux_aux_set (h, "jstat", ctx, (FluxFreeFn)freectx); + } + return ctx; +} + +static void sig_handler (int s) +{ + if (s == SIGINT) { + fprintf (stdout, "Exit on INT"); + exit (0); + } +} + +static FILE *open_test_outfile (const char *fn) +{ + FILE *fp; + if (!fn) + fp = NULL; + else if ( !(fp = fopen (fn, "w"))) + fprintf (stderr, "Failed to open %s\n", fn); + return fp; +} + +static JSON parse_json_str (const char *jcbstr) +{ + int len = 0; + JSON jcb = NULL; + struct json_tokener *tok = NULL; + + if (jcbstr) + len = strnlen (jcbstr, sysconf (_SC_ARG_MAX)); + if (!(tok = json_tokener_new ())) + errno = ENOMEM; + else if (!(jcb = json_tokener_parse_ex (tok, jcbstr, len))) + errno = EPROTO; + + if (tok) + json_tokener_free (tok); + + return jcb; +} + +static inline void get_jobid (JSON jcb, int64_t *j) +{ + Jget_int64 (jcb, JSC_JOBID, j); +} + +static inline void get_states (JSON jcb, int64_t *os, int64_t *ns) +{ + JSON o = NULL; + Jget_obj (jcb, JSC_STATE_PAIR, &o); + Jget_int64 (o, JSC_STATE_PAIR_OSTATE, os); + Jget_int64 (o, JSC_STATE_PAIR_NSTATE, ns); +} + + +/****************************************************************************** + * * + * Async notification callback * + * * + ******************************************************************************/ + +static int job_status_cb (JSON jcb, void *arg, int errnum) +{ + int64_t os = 0; + int64_t ns = 0; + int64_t j = 0; + jstatctx_t *ctx = NULL; + flux_t h = (flux_t)arg; + + ctx = getctx (h); + if (errnum > 0) { + flux_log (ctx->h, LOG_ERR, "job_status_cb: errnum passed in"); + return -1; + } + + get_jobid (jcb, &j); + get_states (jcb, &os, &ns); + Jput (jcb); + + fprintf (ctx->op, "%s->%s\n", + jsc_job_num2state ((job_state_t)os), + jsc_job_num2state ((job_state_t)ns)); + fflush (ctx->op); + + return 0; +} + + +/****************************************************************************** + * * + * Top-level Handlers for each of notify, query and update commands * + * * + ******************************************************************************/ + +static int handle_notify_req (flux_t h, const char *ofn) +{ + jstatctx_t *ctx = NULL; + + sig_flux_h = h; + if (signal (SIGINT, sig_handler) == SIG_ERR) + return -1; + + ctx = getctx (h); + ctx->op = (ofn)? open_test_outfile (ofn) : stdout; + + if (jsc_notify_status (h, job_status_cb, (void *)h) != 0) { + flux_log (h, LOG_ERR, "failed to reg a job status change CB"); + return -1; + } + if (flux_reactor_start (h) < 0) + flux_log (h, LOG_ERR, "error in flux_reactor_start"); + + return 0; +} + +static int handle_query_req (flux_t h, int64_t j, const char *k, const char *n) +{ + JSON jcb = NULL; + jstatctx_t *ctx = NULL; + + ctx = getctx (h); + ctx->op = n? open_test_outfile (n) : stdout; + if (jsc_query_jcb (h, j, k, &jcb) != 0) { + flux_log (h, LOG_ERR, "jsc_query_jcb reported an error\n"); + return -1; + } + fprintf (ctx->op, "Job Control Block: attribute %s for job %ld\n", k, j); + fprintf (ctx->op, "%s\n", + json_object_to_json_string_ext (jcb, JSON_C_TO_STRING_PRETTY)); + Jput (jcb); + return 0; +} + +static int handle_update_req (flux_t h, int64_t j, const char *k, + const char *s, const char *n) +{ + JSON jcb = NULL; + jstatctx_t *ctx = NULL; + ctx = getctx (h); + ctx->op = n? open_test_outfile (n) : stdout; + + if (!(jcb = parse_json_str (s))) { + flux_log (h, LOG_ERR, "parse_json_str parse error.\n"); + return -1; + } + return (jsc_update_jcb (ctx->h, j, k, jcb)); +} + + +/****************************************************************************** + * * + * Main entry point * + * * + ******************************************************************************/ + +int main (int argc, char *argv[]) +{ + flux_t h; + int ch = 0; + int rc = 0; + char *cmd = NULL; + const char *j = NULL; + const char *ofn = NULL; + const char *attr = NULL; + const char *jcbstr = NULL; + + log_init ("flux-jstat"); + while ((ch = getopt_long (argc, argv, OPTIONS, longopts, NULL)) != -1) { + switch (ch) { + case 'h': /* --help */ + usage (); + break; + case 'o': /* --testout */ + ofn = xasprintf ("%s", optarg); + break; + default: + usage (); + break; + } + } + if (optind == argc) + usage (); + + if (!(h = flux_open (NULL, 0))) + err_exit ("flux_open"); + + flux_log_set_facility (h, "jstat"); + cmd = argv[optind++]; + + if (!strcmp ("notify", cmd)) + rc = handle_notify_req (h, (const char *)ofn); + else if (!strcmp ("query", cmd)) { + j = (const char *)(*(argv+optind)); + attr = (const char *)(*(argv+optind+1)); + rc = handle_query_req (h, strtol (j, NULL, 10), attr, ofn); + } + else if (!strcmp ("update", cmd)) { + j = (const char *)(*(argv+optind)); + attr = (const char *)(*(argv+optind+1)); + jcbstr = (const char *)(*(argv+optind+2)); + rc = handle_update_req (h, strtol (j, NULL, 10), attr, jcbstr, ofn); + } + else + usage (); + + flux_close (h); + log_fini (); + + return (!rc)? 0: 42; +} + +/* + * vi:tabstop=4 shiftwidth=4 expandtab + */ From 47f9119258b73d690462ee915a0b0f68142456f6 Mon Sep 17 00:00:00 2001 From: "Dong H. Ahn" Date: Tue, 26 May 2015 14:23:28 -0700 Subject: [PATCH 4/8] Add tests for JSC including --long support --- t/Makefile.am | 1 + t/t2001-jsc.t | 252 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 253 insertions(+) create mode 100755 t/t2001-jsc.t diff --git a/t/Makefile.am b/t/Makefile.am index 9e47eccf812d..90a205789a92 100644 --- a/t/Makefile.am +++ b/t/Makefile.am @@ -15,6 +15,7 @@ TESTS = \ t1004-log.t \ t1005-cmddriver.t \ t2000-wreck.t \ + t2001-jsc.t \ lua/t0001-send-recv.t \ lua/t0002-rpc.t \ lua/t0003-events.t \ diff --git a/t/t2001-jsc.t b/t/t2001-jsc.t new file mode 100755 index 000000000000..6e0041e92cb6 --- /dev/null +++ b/t/t2001-jsc.t @@ -0,0 +1,252 @@ +#!/bin/sh + +test_description='Test basic jstat functionality + +Test the basic functionality of job status and control API.' + +. `dirname $0`/sharness.sh + +test_under_flux 4 +if test "$TEST_LONG" = "t"; then + test_set_prereq LONGTEST +fi + +tr1="null->null" +tr2="null->reserved" +tr3="reserved->starting" +tr4="starting->running" +tr5="running->complete" +trans="$tr1 +$tr2 +$tr3 +$tr4 +$tr5" + +run_flux_jstat () { + sess=$1 + rm -f jstat$sess.pid + ( + # run this in a subshell + flux jstat -o output.$sess notify & + p=$! + cat < jstat$sess.pid +$p +HEREDOC + wait $p + #rm -f output.$sess + )& + return 0 +} + +sync_flux_jstat () { + sess=$1 + while [ ! -f output.$sess ] + do + sleep 2 + done + p=`cat jstat$sess.pid` + echo $p +} + +overlap_flux_wreckruns () { + insts=$(($1-1)) + pids="" + for i in `seq 0 $insts`; do + st=$(($insts + 3 - 2*$i)) + flux wreckrun -n4 -N4 sleep $st & + pids="$pids $!" + done + for i in $pids; do + wait $i + done + return 0 +} + +test_expect_success 'jstat 1: notification works for 1 wreckrun' ' + run_flux_jstat 1 && + p=$( sync_flux_jstat 1) && + run_timeout 2 flux wreckrun -n4 -N4 hostname && + cat >expected <<-EOF && +$trans +EOF + cp output.1 output.1.cp && + kill -INT $p && + test_cmp expected output.1.cp +' + +test_expect_success 'jstat 2: jstat back-to-back works' ' + run_flux_jstat 2 && + p=$( sync_flux_jstat 2) && + run_timeout 2 flux wreckrun -n4 -N4 hostname && + cat >expected <<-EOF && +$trans +EOF + cp output.2 output.2.cp && + kill -INT $p && + test_cmp expected output.2.cp +' + +test_expect_success 'jstat 3: notification works for multiple wreckruns' ' + run_flux_jstat 3 && + p=$( sync_flux_jstat 3 ) && + run_timeout 2 flux wreckrun -n4 -N4 hostname && + run_timeout 2 flux wreckrun -n4 -N4 hostname && + run_timeout 2 flux wreckrun -n4 -N4 hostname && + cat >expected <<-EOF && +$trans +$trans +$trans +EOF + cp output.3 output.3.cp && + kill -INT $p && + test_cmp expected output.3.cp +' + +test_expect_success LONGTEST 'jstat 4: notification works under lock-step stress' ' + run_flux_jstat 4 && + p=$( sync_flux_jstat 4 ) && + for i in `seq 1 20`; do + run_timeout 2 flux wreckrun -n4 -N4 hostname + done && + cat >expected <<-EOF && +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +EOF + cp output.4 output.4.cp && + kill -INT $p && + test_cmp expected output.4.cp +' + +test_expect_success 'jstat 5: notification works for overlapping wreckruns' ' + run_flux_jstat 5 && + p=$( sync_flux_jstat 5 ) && + overlap_flux_wreckruns 3 && + cat >expected <<-EOF && +$trans +$trans +$trans +EOF + sort expected > expected.sort && + cp output.5 output.5.cp && + sort output.5.cp > output.5.sort && + kill -INT $p && + test_cmp expected.sort output.5.sort +' + +test_expect_success LONGTEST 'jstat 6: notification works for overlapping stress' ' + run_flux_jstat 6 && + p=$( sync_flux_jstat 6 ) && + overlap_flux_wreckruns 20 && + cat >expected <<-EOF && +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +$trans +EOF + sort expected > expected.sort && + cp output.6 output.6.cp && + sort output.6.cp > output.6.sort && + kill -INT $p && + test_cmp expected.sort output.6.sort +' + +test_expect_success 'jstat 7: basic query works' ' + flux jstat query 1 jobid && + flux jstat query 1 state-pair && + flux jstat query 1 rdesc && + flux jstat query 1 pdesc +' + +test_expect_success 'jstat 8: query detects bad inputs' ' + test_expect_code 42 flux jstat query 0 jobid && + test_expect_code 42 flux jstat query 99999 state-pair && + test_expect_code 42 flux jstat query 1 unknown && + test_expect_code 42 flux jstat query 99999 unknown +' + +test_expect_success 'jstat 9: update state-pair' " + flux jstat update 1 state-pair '{\"state-pair\": {\"ostate\": 13, \"nstate\": 12}}' && + flux kvs get lwj.1.state > output.9.1 && + cat >expected.9.1 <<-EOF && +cancelled +EOF + test_cmp expected.9.1 output.9.1 +" + +test_expect_success 'jstat 10: update procdescs' " + flux kvs get lwj.1.0.procdesc > output.10.1 + flux jstat update 1 pdesc '{\"pdesc\": {\"procsize\":1, \"hostnames\":[\"0\"], \"executables\":[\"fake\"], \"pdarray\":[{\"pid\":8482,\"eindx\":0,\"hindx\":0}]}}' && + flux kvs get lwj.1.0.procdesc > output.10.2 && + test_expect_code 1 diff output.10.1 output.10.2 +" + +test_expect_success 'jstat 11: update rdesc' " + flux jstat update 1 rdesc '{\"rdesc\": {\"nnodes\": 128, \"ntasks\": 128}}' && + flux kvs get lwj.1.ntasks > output.11.1 && + cat > expected.11.1 <<-EOF && +128 +EOF + test_cmp expected.11.1 output.11.1 +" + +test_expect_success 'jstat 12: update rdl' " + flux jstat update 1 rdl '{\"rdl\": \"fake_rdl_string\"}' && + flux kvs get lwj.1.rdl > output.12.1 && + cat > expected.12.1 <<-EOF && +fake_rdl_string +EOF + test_cmp expected.12.1 output.12.1 +" + +test_expect_success 'jstat 13: update rdl_alloc' " + flux jstat update 1 rdl_alloc '{\"rdl_alloc\": [{\"contained\": {\"cmbdncores\": 102}}]}' && + flux kvs get lwj.1.rank.0.cores > output.13.1 && + cat > expected.13.1 <<-EOF && +102 +EOF + test_cmp expected.13.1 output.13.1 +" + +test_expect_success 'jstat 14: update detects bad inputs' " + test_expect_code 42 flux jstat update 1 jobid '{\"jobid\": 1}' && + test_expect_code 42 flux jstat update 0 rdesc '{\"rdesc\": {\"nnodes\": 128, \"ntasks\": 128}}' && + test_expect_code 42 flux jstat update 1 rdesctypo '{\"rdesc\": {\"nnodes\": 128, \"ntasks\": 128}}' && + test_expect_code 42 flux jstat update 1 rdesc '{\"pdesc\": {\"nnodes\": 128, \"ntasks\": 128}}' && + test_expect_code 42 flux jstat update 1 state-pair '{\"unknown\": {\"ostate\": 12, \"nstate\": 11}}' +" + +test_done From 870ec2e5a68f368867c5d7c752f545cd6071ec20 Mon Sep 17 00:00:00 2001 From: "Dong H. Ahn" Date: Wed, 27 May 2015 19:25:23 -0700 Subject: [PATCH 5/8] Change a typedef function pointer name per RFC 7 style revision --- src/modules/libjsc/jstatctl.c | 4 ++-- src/modules/libjsc/jstatctl.h | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/modules/libjsc/jstatctl.c b/src/modules/libjsc/jstatctl.c index 1c4fe691b505..2856bf61d340 100644 --- a/src/modules/libjsc/jstatctl.c +++ b/src/modules/libjsc/jstatctl.c @@ -51,7 +51,7 @@ typedef struct { } stab_t; typedef struct { - JSC_CB_PTR *cb; + jsc_handler_f cb; void *arg; } cb_pair_t; @@ -828,7 +828,7 @@ static int reg_newjob_hdlr (flux_t h, KVSSetInt64F *func) * * ******************************************************************************/ -int jsc_notify_status (flux_t h, JSC_CB_PTR *func, void *d) +int jsc_notify_status (flux_t h, jsc_handler_f func, void *d) { int rc = -1; cb_pair_t *c = NULL; diff --git a/src/modules/libjsc/jstatctl.h b/src/modules/libjsc/jstatctl.h index 7388efbcbb0b..871300811fcc 100644 --- a/src/modules/libjsc/jstatctl.h +++ b/src/modules/libjsc/jstatctl.h @@ -53,7 +53,7 @@ typedef enum { J_FOR_RENT /*!< Space For Rent */ } job_state_t; -typedef int (JSC_CB_PTR)(json_object *base_jcb, void *arg, int errnum); +typedef int (*jsc_handler_f)(json_object *base_jcb, void *arg, int errnum); /* TODO: find a better way to manage this hierarchical * JCB attributes space @@ -92,7 +92,7 @@ typedef int (JSC_CB_PTR)(json_object *base_jcb, void *arg, int errnum); * multiple times. The callbacks will be invoked in the order * they are registered. Returns 0 on success; otherwise -1. */ -int jsc_notify_status (flux_t h, JSC_CB_PTR *callback, void *d); +int jsc_notify_status (flux_t h, jsc_handler_f callback, void *d); /** * Query the "key" attribute of JCB of "jobid." The JCB info on this attribute From 59e7815cb08508b1da7692ac86ead67aaeeec48b Mon Sep 17 00:00:00 2001 From: "Dong H. Ahn" Date: Wed, 27 May 2015 19:25:57 -0700 Subject: [PATCH 6/8] Wrap markdown source lines at 72 Apparently, there is no markdown construct that allows for table row expansion. So some lines corresponding to table rows still remain to be long. --- src/modules/libjsc/README.md | 213 +++++++++++++++++++++++------------ 1 file changed, 140 insertions(+), 73 deletions(-) diff --git a/src/modules/libjsc/README.md b/src/modules/libjsc/README.md index a686c688f94b..99acac81f5d4 100644 --- a/src/modules/libjsc/README.md +++ b/src/modules/libjsc/README.md @@ -1,67 +1,93 @@ Job Status and Control Application Programming Interface =================== -The Job Status and Control (JSC) API is a high-level abstraction that allows its client software to monitor and control the status of Flux jobs. It is designed to expose the job status and control abstraction in a way to hide the underlying data layout of job information stored within Flux's KVS data store. We expect that schedulers and runtime tools will be its main users. This abstraction provides the producers of job information including a task and program execution service module such as `flux-wreckrun` with an opportunity to change and optimize the data layout of jobs within the KVS without presenting major impacts to the implementation of the schedulers and runtime tools. - -1. Design Considerations +The Job Status and Control (JSC) API is a high-level abstraction that +allows its client software to monitor and control the status of Flux +jobs. It is designed to expose the job status and control abstraction +in a way to hide the underlying data layout of job information stored +within Flux's KVS data store. We expect that schedulers and runtime +tools will be its main users. This abstraction provides the producers +of job information including a task and program execution service +module such as `flux-wreckrun` with an opportunity to change and +optimize the data layout of jobs within the KVS without presenting +major impacts to the implementation of the schedulers and runtime tools. + +1. Design Consideration ------------- The main design considerations are the following: ->1. Abstract out implementation-dependent job states; ->2. Provide flexible and easily extensible mechanisms to pass job information; +>1. Abstract out implementation-dependent job states; +>2. Provide flexible and easily extensible mechanisms to pass job +information; >3. Use a minimalistic API set. -The first consideration has led us to use a C enumerator (i.e., *job\_state\_t*) to capture the job states. However, because Flux has not yet defined its job schema, the second consideration discouraged us to use a C user-defined type to pass job information with the client software. Instead, JSC uses an JSON to capture the job information and introduce the notion of Job Control Block (JCB) to have a structure on this information. We will try to keep backward compatibility on JCB's structure as we will extend this to keep abreast of the evolution of Flux's job schema. Finally, the third consideration has led us to introduce three simple API calls as the main JSC idioms: job status-change notification as well as JCB query and update. Client software can use the notification call to get the status of a job asynchronously on a state change; the query and update calls to fetch and update a job's JCB, respectively. +The first consideration has led us to use a C enumerator (i.e., +*job\_state\_t*) to capture the job states. However, because Flux has +not yet defined its job schema, the second consideration discouraged us +to use a Cuser-defined type to pass job information with the client +software. Instead, JSC uses an JSON to capture the job information and +introduce the notion of Job Control Block (JCB) to have a structure on +this information. We will try to keep backward compatibility on JCB's +structure as we will extend this to keep abreast of the evolution of +Flux's job schema. Finally, the third consideration has led us to +introduce three simple API calls as the main JSC idioms: job +status-change notification as well as JCB query and update. Client +software can use the notification call to get the status of a job +asynchronously on a state change; the query and update calls to fetch +and update a job's JCB, respectively. 2. Job States ------------- -The JSC API converts the state strings produced by `flux-wreckrun` and the scheduler framework service comms module into a C enumerator: *job\_state\_t*. Its elements are shown in Table 2-1. If the raw state strings are changed in the future, one must accommodate this JCB implementation accordingly--mapping the new strings to these state elements. We expect the state set will further be extended as we will add more advanced services such as elastic job and resource management service. - -| Elements | Comment | -|--------------|-----------------------------------------------------------------| -| J_NULL | No state has been assigned | -| J_RESERVED | Reserved in KVS | -| J_SUBMITTED | Added to KVS | -| J_PENDING | Pending | -| J_SCHEDREQ | Resource selection requested | -| J_ALLOCATED | Resource allocated/contained in the Flux instance | -| J_RUNREQUEST | Requested to be executed | -| J_STARTING | Starting | -| J_STOPPED | Stopped -- including the stop after `exec` under a tool's control | -| J_RUNNING | Running | -| J_CANCELLED | Cancelled | -| J_COMPLETE | Complete | -| J_REAPED | Reaped to the upper-level Flux instance | -| J_FOR_RENT | To be extended | - -**Table 2-1** Job state elements - +The JSC API converts the state strings produced by `flux-wreckrun` and +the scheduler framework service comms module into a C enumerator: +*job\_state\_t*. Its elements are shown in Table 2-1. If the raw state +strings are changed in the future, one must accommodate this JCB +implementation accordingly--mapping the new strings to these state +elements. We expect the state set will further be extended as we will +add more advanced services such as elastic job and resource management +service. + +| Elements | Comment | +|--------------|------------------------------------------------------| +| J_NULL | No state has been assigned | +| J_RESERVED | Reserved in KVS | +| J_SUBMITTED | Added to KVS | +| J_PENDING | Pending | +| J_SCHEDREQ | Resource selection requested | +| J_ALLOCATED | Resource allocated/contained in the Flux instance | +| J_RUNREQUEST | Requested to be executed | +| J_STARTING | Starting | +| J_STOPPED | Stopped | +| J_RUNNING | Running | +| J_CANCELLED | Cancelled | +| J_COMPLETE | Complete | +| J_REAPED | Reaped to the upper-level Flux instance | +| J_FOR\_RENT | To be extended | + +**Table 2-1** Job state elements -3. Job Control Block -------------- -Job Control Block (JCB) is our data schema containing the information needed to manage a particular job. It contains information such as jobid, resources owned by the job, as well as the processes spawned by the job. The JSC API converts the raw information on a job into an JCB, implemented as an JSON dictionary object. Our current JCB structure is shown in Table 3-1 through 3-6. As Flux's job schema evolves, we will extend JCB while trying our best to keep backward compatibility. | Key | Macro | Value Type | Comment | |------------|----------------|----------------|--------------------------------------------------------------------------------| | jobid | JSC_JOBID | 64-bit integer | Job id | -| state-pair | JSC_STATE\_PAIR | dictionary | A dictionary containing this old and new states of the job. See Table 3-2. | +| state-pair | JSC_STATE\_PAIR| dictionary | A dictionary containing this old and new states of the job. See Table 3-2. | | rdesc | JSC_RDESC | dictionary | Information on the resources owned by this job. See Table 3-3. | | rdl | JSC_RDL | string | RDL binary string allocated to the job | -| rdl_alloc | JSC_RDL\_ALLOC | array of per-cmbd resources | Resource descriptor array (Resources allocated per cmbd - cmbd rank order). See Table 3-4.| +| rdl_alloc | JSC_RDL\_ALLOC | array of per-cmbd resources | Resource descriptor array (Resources allocated per cmbd - cmbd rank order). See Table 3-4.| | pdesc | JSC_PDESC | dictionary | Information on the processes spawned by this job. See Table 3-5. | -**Table 3-1** Keys and values of top-level JCB attributes +**Table 3-1** Keys and values of top-level JCB attributes -| Key | Macro | Value Type | Comment | -|------------|-----------------------|----------------|-------------------------------------| +| Key | Macro | Value Type | Comment | +|------------|-------------------------|----------------|---------------------------------------| | ostate | JSC_STATE\_PAIR\_OSTATE | 64-bit integer | Old state (a *job\_state\_t* element) | | nstate | JSC_STATE\_PAIR\_NSTATE | 64-bit integer | New state (a *job\_state\_t* element) | -**Table 3-2** Keys and values of *state-pair* attribute +**Table 3-2** Keys and values of *state-pair* attribute -| Key | Macro | Value Type | Comment | -|------------|------------------|----------------|---------------| +| Key | Macro | Value Type | Comment | +|------------|------------------|-----------------|---------------| | nnodes | JSC_RDESC\_NNODES | 64-bit integer | Node count | | ntasks | JSC_RDESC\_NTASKS | 64-bit integer | Process count | @@ -70,30 +96,30 @@ Job Control Block (JCB) is our data schema containing the information needed to | Key | Macro | Value Type | Comment | |------------|------------------------------|----------------|---------------------------------| -| contained | JSC_RDL\_ALLOC\_CONTAINED | dictionary | Per cmdb resource containment See Table 3-4-1 | +| contained | JSC_RDL\_ALLOC\_CONTAINED | dictionary | Per cmdb resource containment See Table 3-4-1 | **Table 3-4** Keys and values of *rdl\_alloc* attribute -| Key | Macro | Value Type | Comment | -|------------|------------------------------|----------------|---------------------------------| +| Key | Macro | Value Type | Comment | +|------------|-----------------------------------|----------------|---------------------------------| | cmbdncores | JSC_RDL\_ALLOC\_CONTAINED\_NCORES | 64-bit integer | Core count to use for this cmdb | **Table 3-4-1** Keys and values of *rsarray* attribute -| Key | Macro | Value Type | Comment | -|------------|---------------------|-----------------------------|---------------------------------------------------------------------| +| Key | Macro | Value Type | Comment | +|------------|----------------------|-----------------------------|---------------------------------------------------------------------| | procsize | JSC_PDESC\_SIZE | 64-bit integer | Process count | | hostnames | JSC_PDESC\_HOSTNAMES | array of strings | Host name array (Names are current home cmbd rank) | | executables| JSC_PDESC\_EXECS | array of strings | Executable name array | -| pdarray | JSC_PDESC\_PDARRAY | array of dictionary objects | Process descriptor array (MPI rank order). See Table 3-6 for each pdarray element | +| pdarray | JSC_PDESC\_PDARRAY | array of dictionary objects | Process descriptor array (MPI rank order). See Table 3-6 for each pdarray element | -**Table 3-5** Keys and values of *pdesc* attribute +**Table 3-5** Keys and values of *pdesc* attribute -| Key | Macro | Value Type | Comment | -|------------|------------------------------|----------------|--------------------------------------------------------| +| Key | Macro | Value Type | Comment | +|------------|---------------------------------|----------------|--------------------------------------------------------| | pid | JSC_PDESC\_RANK\_PDARRAY\_PID | 64-bit integer | Process count | | hindx | JSC_PDESC\_RANK\_PDARRAY\_HINDX | 64-bit integer | Host name (indexing into the hostname array) | | eindx | JSC_PDESC\_RANK\_PDARRAY\_EINDX | 64-bit integer | Executable name (indexing into the executable name array) | @@ -101,56 +127,97 @@ Job Control Block (JCB) is our data schema containing the information needed to **Table 3-6** Keys and values of each *pdarray* element +3. Job Control Block +------------- +Job Control Block (JCB) is our data schema containing the information +needed to manage a particular job. It contains information such as +jobid, resources owned by the job, as well as the processes spawned by +the job. The JSC API converts the raw information on a job into an +JCB, implemented as an JSON dictionary object. Our current JCB +structure is shown in Table 3-1 through 3-6. As Flux's job schema +evolves, we will extend JCB while trying our best to keep backward +compatibility. + + 4. Application Programming Interface ------------- The Job Status and Control API currently provides three main calls: -> Note: typedef int (JSC_CB_PTR)(json_object \*base_jcb, void \*arg, int errnum); -> ->- int jsc_notify_status (flux_t h, JSC_CB_PTR \*callback, void \*d); ->- int jsc_query_jcb (flux_t h, int64_t jobid, const char \*key, json_object \*\*jcb); ->- int jsc_update_jcb (flux_t h, int64_t jobid, const char \*key, json_object \*jcb); +> Note: typedef int (\*jsc\_handler\_f)(json\_object \*base\_jcb, void +\*arg, int +errnum); +> +>- int jsc\_notify\_status (flux\_t h, jsc\_handler\_f callback, void +\*d); +>- int jsc\_query\_jcb (flux\_t h, int64\_t jobid, const char \*key, +json\_object +\*\*jcb); +>- int jsc\_update\_jcb (flux\_t h, int64\_t jobid, const char \*key, +json\_object +\*jcb); #### jsc\_notify\_status -Registers a *callback* to the asynchronous status change notification service. -*callback* will be invoked when the state of a job changes. The *jobid* and *state-pair* -as shown in Table 3-2 will be passed as *base_jcb* into the *callback*. -*d* is arbitrary data that will transparently be passed into *callback*. -However, one should pass its *flux_t* object as part of this callback data. -Note that the caller must start its reactor to get an asynchronous status -change notification via *callback*. This is because it uses the KVS-watch -facility which has the same limitation. One can register mutliple callbacks -by calling this function multiple times. The callbacks will be invoked in the order -they are registered. Returns 0 on success; otherwise -1. +Register a *callback* to the asynchronous status change notification +service. *callback* will be invoked when the state of a job changes. +The *jobid* and *state-pair* as shown in Table 3-2 will be passed as +*base_jcb* into the *callback*. *d* is arbitrary data that will +transparently be passed into *callback*. However, one should pass its +*flux_t* object as part of this callback data. Note that the caller +must start its reactor to get an asynchronous status change +notification via *callback*. This is because it uses the KVS-watch +facility which has the same limitation. One can register multiple +callbacks by calling this function multiple times. The callbacks will +be invoked in the order they are registered. Returns 0 on success; +otherwise -1. #### jsc\_query\_jcb -Queries the *key* attribute of JCB of *jobid*. The JCB info on this attribute -will be passed via *jcb*. It is the responsibility of the caller to release *jcb*. -All ownership associated with the sub-attributes in *jcb*'s hierarchy -are transferred to *jcb*, so that json_object_put (\*jcb) will free this hierarchy -in its entirety. Returns 0 on success; otherwise -1. +Query the *key* attribute of JCB of *jobid*. The JCB info on this +attribute will be passed via *jcb*. It is the responsibility of the +caller to release *jcb*. All ownership associated with the +sub-attributes in *jcb*'s hierarchy are transferred to *jcb*, so that +json_object_put (\*jcb) will free this hierarchy in its entirety. +Returns 0 on success; otherwise -1. ####jsc\_update\_jcb -Updates the *key* attribute within the JCB of *jobid*. The top-level attribute of *jcb* should be the same as *key*. Returns 0 on success; otherwise -1. This will not release *jcb* so it is the responsibility of the caller to free *jcb*. +Update the *key* attribute within the JCB of *jobid*. The top-level +attribute of *jcb* should be the same as *key*. Returns 0 on success; +otherwise -1. This will not release *jcb* so it is the responsibility +of the caller to free *jcb*. >**Notes:** ->1. JCB granularity optimization -- one can optimize the amounts of JCB information piggybacked with each notification (*base_jcb*). One can further extend the single attribute-wise query/update pattern to group-wise ones once the access patterns of JCS API's clients are known. - ->2. JCB producer-consumer synchronization -- currently there is no built-in synchronization between JCB producers and consumers and thus a race condition can occur. When the remote parallel execution changes the state of a job, and the registered callbacks will be invoked. However, when one of the invoked callbacks is trying to read an JCB attribute, nothing prevents the remote execution from modifying the same JCB attribute! Because producers and consumers use the KVS like a distributed shared memory, one must devise ways to guarantee synchronization. One solution is for the producers also use the JSC API and we build some synchronization primitives into this API. But for now, we ignore these synchronization issues. +>1. JCB granularity optimization -- one can optimize the amounts of JCB +information piggybacked with each notification (*base_jcb*). One can +further extend the single attribute-wise query/update pattern to +group-wise ones once the access patterns of JCS API's clients are +known. + +>2. JCB producer-consumer synchronization -- currently there is no +built-in synchronization between JCB producers and consumers and thus a +race condition can occur. When the remote parallel execution changes +the state of a job, and the registered callbacks will be invoked. +However, when one of the invoked callbacks is trying to read an JCB +attribute, nothing prevents the remote execution from modifying the +same JCB attribute! Because producers and consumers use the KVS like a +distributed shared memory, one must devise ways to guarantee +synchronization. One solution is for the producers also use the JSC API +and we build some synchronization primitives into this API. But for +now, we ignore these synchronization issues. 5. Testing ------------- -To facilitate the testing of this API, we created an utility command: `flux-jstat`. Its usage is the following: +To facilitate the testing of this API, we created an utility command: +`flux-jstat`. Its usage is the following: Usage: flux-jstat notify flux-jstat query jobid flux-jstat update jobid -Further, `flux-core/t/t2001-jsc.t` contains various test cases that use this utility. +Further, `flux-core/t/t2001-jsc.t` contains various test cases that use +this utility. From 251ae18d1666f930a5ac93f15c5063e639086c76 Mon Sep 17 00:00:00 2001 From: "Dong H. Ahn" Date: Wed, 27 May 2015 21:18:09 -0700 Subject: [PATCH 7/8] Increase run_time to 4 for wreckrun instances --- t/t2001-jsc.t | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/t/t2001-jsc.t b/t/t2001-jsc.t index 6e0041e92cb6..139e01bf67cb 100755 --- a/t/t2001-jsc.t +++ b/t/t2001-jsc.t @@ -65,7 +65,7 @@ overlap_flux_wreckruns () { test_expect_success 'jstat 1: notification works for 1 wreckrun' ' run_flux_jstat 1 && p=$( sync_flux_jstat 1) && - run_timeout 2 flux wreckrun -n4 -N4 hostname && + run_timeout 4 flux wreckrun -n4 -N4 hostname && cat >expected <<-EOF && $trans EOF @@ -77,7 +77,7 @@ EOF test_expect_success 'jstat 2: jstat back-to-back works' ' run_flux_jstat 2 && p=$( sync_flux_jstat 2) && - run_timeout 2 flux wreckrun -n4 -N4 hostname && + run_timeout 4 flux wreckrun -n4 -N4 hostname && cat >expected <<-EOF && $trans EOF @@ -89,9 +89,9 @@ EOF test_expect_success 'jstat 3: notification works for multiple wreckruns' ' run_flux_jstat 3 && p=$( sync_flux_jstat 3 ) && - run_timeout 2 flux wreckrun -n4 -N4 hostname && - run_timeout 2 flux wreckrun -n4 -N4 hostname && - run_timeout 2 flux wreckrun -n4 -N4 hostname && + run_timeout 4 flux wreckrun -n4 -N4 hostname && + run_timeout 4 flux wreckrun -n4 -N4 hostname && + run_timeout 4 flux wreckrun -n4 -N4 hostname && cat >expected <<-EOF && $trans $trans @@ -106,7 +106,7 @@ test_expect_success LONGTEST 'jstat 4: notification works under lock-step stress run_flux_jstat 4 && p=$( sync_flux_jstat 4 ) && for i in `seq 1 20`; do - run_timeout 2 flux wreckrun -n4 -N4 hostname + run_timeout 4 flux wreckrun -n4 -N4 hostname done && cat >expected <<-EOF && $trans From 15b6f87da5ebcb85cb1e75cea1f18304b9f391f0 Mon Sep 17 00:00:00 2001 From: "Dong H. Ahn" Date: Thu, 28 May 2015 08:39:50 -0700 Subject: [PATCH 8/8] Break a race between wreckrun's state change and initial jcb notification --- src/modules/libjsc/jstatctl.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/modules/libjsc/jstatctl.c b/src/modules/libjsc/jstatctl.c index 2856bf61d340..47c0931485fe 100644 --- a/src/modules/libjsc/jstatctl.c +++ b/src/modules/libjsc/jstatctl.c @@ -798,13 +798,13 @@ static int new_job_cb (const char *key, int64_t val, void *arg, int errnum) Jadd_int64 (ss, JSC_STATE_PAIR_NSTATE, (int64_t) js); json_object_object_add (jcb, JSC_STATE_PAIR, ss); + if (invoke_cbs (h, nj, jcb, errnum) < 0) { + flux_log (h, LOG_ERR, "new_job_cb: failed to invoke callbacks"); + } if (reg_jobstate_hdlr (h, path, (KVSSetStringF *) job_state_cb) == -1) { flux_log (h, LOG_ERR, "new_job_cb: reg_jobstate_hdlr: %s", strerror (errno)); } - if (invoke_cbs (h, nj, jcb, errnum) < 0) { - flux_log (h, LOG_ERR, "new_job_cb: failed to invoke callbacks"); - } done: /* always return 0 so that reactor won't return */