Skip to content

Commit

Permalink
Merge pull request #1395 from garlick/libjsc_cleanup
Browse files Browse the repository at this point in the history
libjsc: misc cleanup
  • Loading branch information
grondo authored Mar 28, 2018
2 parents bccf233 + 9428a88 commit f7bde37
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 189 deletions.
5 changes: 2 additions & 3 deletions src/common/libjsc/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ AM_CPPFLAGS = \
-I$(top_srcdir) -I$(top_srcdir)/src/include \
$(ZMQ_CFLAGS)

noinst_LTLIBRARIES = libjsc.la
noinst_LTLIBRARIES = libjsc.la
fluxcoreinclude_HEADERS = jstatctl.h

libjsc_la_SOURCES = \
jstatctl.c \
jstatctl_deprecated.h
jstatctl.c
168 changes: 35 additions & 133 deletions src/common/libjsc/jstatctl.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
#include <flux/core.h>

#include "jstatctl.h"
#include "jstatctl_deprecated.h"
#include "src/common/libutil/log.h"
#include "src/common/libutil/xzmalloc.h"
#include "src/common/libutil/iterators.h"
Expand All @@ -54,7 +53,7 @@ typedef struct {
} stab_t;

typedef struct {
jsc_handler_obj_f cb;
jsc_handler_f cb;
void *arg;
} cb_pair_t;

Expand All @@ -63,7 +62,6 @@ typedef struct {
lru_cache_t *kvs_paths;
flux_msg_handler_t **handlers;
zlist_t *callbacks;
int first_time;
flux_t *h;
} jscctx_t;

Expand Down Expand Up @@ -137,7 +135,6 @@ static jscctx_t *getctx (flux_t *h)
lru_cache_set_free_f (ctx->kvs_paths, free);
if (!(ctx->callbacks = zlist_new ()))
oom ();
ctx->first_time = 1;
ctx->h = h;
flux_aux_set (h, "jstatctrl", ctx, freectx);
}
Expand Down Expand Up @@ -243,36 +240,6 @@ static const char * jscctx_jobid_path (jscctx_t *ctx, int64_t id)
return (path);
}

static inline bool is_jobid (const char *k)
{
return (!strncmp (JSC_JOBID, k, JSC_MAX_ATTR_LEN))? true : false;
}

static inline bool is_state_pair (const char *k)
{
return (!strncmp (JSC_STATE_PAIR, k, JSC_MAX_ATTR_LEN))? true : false;
}

static inline bool is_rdesc (const char *k)
{
return (!strncmp (JSC_RDESC, k, JSC_MAX_ATTR_LEN))? true : false;
}

static inline bool is_rdl (const char *k)
{
return (!strncmp (JSC_RDL, k, JSC_MAX_ATTR_LEN))? true : false;
}

static inline bool is_rdl_alloc (const char *k)
{
return (!strncmp (JSC_RDL_ALLOC, k, JSC_MAX_ATTR_LEN))? true : false;
}

static inline bool is_pdesc (const char *k)
{
return (!strncmp (JSC_PDESC, k, JSC_MAX_ATTR_LEN))? true : false;
}

static int fetch_and_update_state (zhash_t *aj , int64_t j, int64_t ns)
{
int *t = NULL;
Expand All @@ -294,7 +261,6 @@ static int fetch_and_update_state (zhash_t *aj , int64_t j, int64_t ns)
return (intptr_t) t;
}


/******************************************************************************
* *
* Internal JCB Accessors *
Expand Down Expand Up @@ -1023,7 +989,7 @@ static int invoke_cbs (flux_t *h, int64_t j, json_object *jcb, int errnum)
cb_pair_t *c = NULL;
jscctx_t *ctx = getctx (h);
for (c = zlist_first (ctx->callbacks); c; c = zlist_next (ctx->callbacks)) {
if (c->cb (jcb, c->arg, errnum) < 0) {
if (c->cb (Jtostr (jcb), c->arg, errnum) < 0) {
flux_log (h, LOG_DEBUG, "callback returns an error");
rc = -1;
}
Expand Down Expand Up @@ -1178,7 +1144,7 @@ static const struct flux_msg_handler_spec htab[] = {
FLUX_MSGHANDLER_TABLE_END
};

static int notify_status_obj (flux_t *h, jsc_handler_obj_f func, void *d)
int jsc_notify_status (flux_t *h, jsc_handler_f func, void *d)
{
int rc = -1;
cb_pair_t *c = NULL;
Expand Down Expand Up @@ -1217,143 +1183,79 @@ static int notify_status_obj (flux_t *h, jsc_handler_obj_f func, void *d)
return rc;
}

/* deprecated */
int jsc_notify_status_obj (flux_t *h, jsc_handler_obj_f func, void *d)
{
return notify_status_obj (h, func, d);
}

struct callback_wrapper {
jsc_handler_f cb;
void *arg;
};

static int wrap_handler (json_object *base_jcb, void *arg, int errnum)
{
struct callback_wrapper *wrap = arg;
return wrap->cb (Jtostr (base_jcb), wrap->arg, errnum);
}

int jsc_notify_status (flux_t *h, jsc_handler_f func, void *d)
{
int rc = -1;
struct callback_wrapper *wrap = xzmalloc (sizeof (*wrap));

wrap->cb = func;
wrap->arg = d;

rc = notify_status_obj (h, wrap_handler, wrap);
if (rc < 0)
free (wrap);
return rc;
}

static int query_jcb_obj (flux_t *h, int64_t jobid, const char *key,
json_object **jcb)
int jsc_query_jcb (flux_t *h, int64_t jobid, const char *key, char **jcb_str)
{
int rc = -1;
json_object *jcb = NULL;

if (!key) return -1;
if (jobid_exist (h, jobid) != 0) return -1;

if (is_jobid (key)) {
if ( (rc = query_jobid (h, jobid, jcb)) < 0)
if (!strcmp (key, JSC_JOBID)) {
if ( (rc = query_jobid (h, jobid, &jcb)) < 0)
flux_log (h, LOG_ERR, "query_jobid failed");
} else if (is_state_pair (key)) {
if ( (rc = query_state_pair (h, jobid, jcb)) < 0)
} else if (!strcmp (key, JSC_STATE_PAIR)) {
if ( (rc = query_state_pair (h, jobid, &jcb)) < 0)
flux_log (h, LOG_ERR, "query_pdesc failed");
} else if (is_rdesc (key)) {
if ( (rc = query_rdesc (h, jobid, jcb)) < 0)
} else if (!strcmp (key, JSC_RDESC)) {
if ( (rc = query_rdesc (h, jobid, &jcb)) < 0)
flux_log (h, LOG_ERR, "query_rdesc failed");
} else if (is_rdl (key)) {
if ( (rc = query_rdl (h, jobid, jcb)) < 0)
} else if (!strcmp (key, JSC_RDL)) {
if ( (rc = query_rdl (h, jobid, &jcb)) < 0)
flux_log (h, LOG_ERR, "query_rdl failed");
} else if (is_rdl_alloc (key)) {
if ( (rc = query_rdl_alloc (h, jobid, jcb)) < 0)
} else if (!strcmp (key, JSC_RDL_ALLOC)) {
if ( (rc = query_rdl_alloc (h, jobid, &jcb)) < 0)
flux_log (h, LOG_ERR, "query_rdl_alloc failed");
} else if (is_pdesc (key)) {
if ( (rc = query_pdesc (h, jobid, jcb)) < 0)
} else if (!strcmp(key, JSC_PDESC)) {
if ( (rc = query_pdesc (h, jobid, &jcb)) < 0)
flux_log (h, LOG_ERR, "query_pdesc failed");
} else
flux_log (h, LOG_ERR, "key (%s) not understood", key);

return rc;
}

/* deprecated */
int jsc_query_jcb_obj (flux_t *h, int64_t jobid, const char *key,
json_object **jcb)
{
return query_jcb_obj (h, jobid, key, jcb);
}

int jsc_query_jcb (flux_t *h, int64_t jobid, const char *key, char **jcb)
{
int rc;
json_object *o = NULL;

rc = query_jcb_obj (h, jobid, key, &o);
if (rc < 0)
goto done;
*jcb = o ? xstrdup (Jtostr (o)) : NULL;
*jcb_str = jcb ? xstrdup (Jtostr (jcb)) : NULL;
done:
Jput (o);
Jput (jcb);
return rc;
}

static int update_jcb_obj (flux_t *h, int64_t jobid, const char *key,
json_object *jcb)
int jsc_update_jcb (flux_t *h, int64_t jobid, const char *key,
const char *jcb_str)
{
int rc = -1;
json_object *o = NULL;
json_object *jcb = NULL;

if (!jcb) return -1;
if (jobid_exist (h, jobid) != 0) return -1;
if (!jcb_str || !(jcb = Jfromstr (jcb_str))) {
errno = EINVAL;
return -1;
}
if (jobid_exist (h, jobid) != 0)
goto done;

if (is_jobid (key)) {
if (!strcmp(key, JSC_JOBID)) {
flux_log (h, LOG_ERR, "jobid attr cannot be updated");
} else if (is_state_pair (key)) {
} else if (!strcmp (key, JSC_STATE_PAIR)) {
if (Jget_obj (jcb, JSC_STATE_PAIR, &o))
rc = update_state (h, jobid, o);
} else if (is_rdesc (key)) {
} else if (!strcmp (key, JSC_RDESC)) {
if (Jget_obj (jcb, JSC_RDESC, &o))
rc = update_rdesc (h, jobid, o);
} else if (is_rdl (key)) {
} else if (!strcmp (key, JSC_RDL)) {
const char *s = NULL;
if (Jget_str (jcb, JSC_RDL, &s))
rc = update_rdl (h, jobid, s);
} else if (is_rdl_alloc (key)) {
} else if (!strcmp (key, JSC_RDL_ALLOC)) {
if (Jget_obj (jcb, JSC_RDL_ALLOC, &o))
rc = update_rdl_alloc (h, jobid, o);
} else if (is_pdesc (key)) {
} else if (!strcmp (key, JSC_PDESC)) {
if (Jget_obj (jcb, JSC_PDESC, &o))
rc = update_pdesc (h, jobid, o);
}
else
flux_log (h, LOG_ERR, "key (%s) not understood", key);

return rc;
}

/* deprecated */
int jsc_update_jcb_obj (flux_t *h, int64_t jobid, const char *key,
json_object *jcb)
{
return update_jcb_obj (h, jobid, key, jcb);
}

int jsc_update_jcb (flux_t *h, int64_t jobid, const char *key, const char *jcb)
{
int rc = -1;
json_object *o = NULL;

if (!jcb || !(o = Jfromstr (jcb))) {
errno = EINVAL;
goto done;
}
rc = update_jcb_obj (h, jobid, key, o);
done:
Jput (o);
Jput (jcb);
return rc;
}

Expand Down
10 changes: 3 additions & 7 deletions src/common/libjsc/jstatctl.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ typedef int (*jsc_handler_f)(const char *base_jcb, void *arg, int errnum);
/* TODO: find a better way to manage this hierarchical
* JCB attributes space
*/
#define JSC_MAX_ATTR_LEN 32
#define JSC_JOBID "jobid"
#define JSC_STATE_PAIR "state-pair"
# define JSC_STATE_PAIR_OSTATE "ostate"
Expand Down Expand Up @@ -92,8 +91,7 @@ typedef int (*jsc_handler_f)(const char *base_jcb, void *arg, int errnum);
* "d" is arbitrary data that will transparently be passed into "callback."
* However, one should pass its flux_t object as part of this callback data.
* Note that the caller must start its reactor to get an asynchronous status
* change notification via "callback." This is because it uses the KVS-watch
* facility which has the same limitation.
* change notification via "callback."
* One can register mutliple callbacks by calling this function
* multiple times. The callbacks will be invoked in the order
* they are registered. Returns 0 on success; otherwise -1.
Expand All @@ -102,10 +100,8 @@ int jsc_notify_status (flux_t *h, jsc_handler_f callback, void *d);

/**
* Query the "key" attribute of JCB of "jobid." The JCB info on this attribute
* will be passed via "jcb." It is the caller's responsibility to release "jcb."
* All of the ownership associated with the sub-attributes in jcb's hierarchy
* are trasferred to "jcb," so that json_object_put (*jcb) will free this hierarchy
* in its entirety. Returns 0 on success; otherwise -1.
* will be passed via "jcb." It is the caller's responsibility to free "jcb."
* Returns 0 on success; otherwise -1.
*/
int jsc_query_jcb (flux_t *h, int64_t jobid, const char *key, char **jcb);

Expand Down
46 changes: 0 additions & 46 deletions src/common/libjsc/jstatctl_deprecated.h

This file was deleted.

0 comments on commit f7bde37

Please sign in to comment.