Skip to content

Commit

Permalink
Merge pull request #205 from dongahn/jsc_topics
Browse files Browse the repository at this point in the history
Integrate Job Status and Control (JSC) interface
  • Loading branch information
garlick committed May 29, 2015
2 parents 631ce4d + 15b6f87 commit 48534ac
Show file tree
Hide file tree
Showing 13 changed files with 1,870 additions and 2 deletions.
1 change: 1 addition & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ AC_CONFIG_FILES( \
src/modules/libmrpc/Makefile \
src/modules/libzio/Makefile \
src/modules/libkz/Makefile \
src/modules/libjsc/Makefile \
src/modules/live/Makefile \
src/modules/mecho/Makefile \
src/modules/barrier/Makefile \
Expand Down
7 changes: 6 additions & 1 deletion src/cmd/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ fluxcmd_PROGRAMS = \
flux-comms-stats \
flux-kvs \
flux-start \
flux-config
flux-config \
flux-jstat

flux_zio_SOURCES = \
flux-zio.c \
Expand All @@ -67,3 +68,7 @@ flux_module_LDADD = \
$(top_builddir)/src/modules/libmrpc/libmrpc.la \
$(fluxcmd_ldadd) \
$(LIBUTIL)
flux_jstat_LDADD = \
$(top_builddir)/src/modules/libjsc/libjsc.la \
$(fluxcmd_ldadd) \
$(LIBUTIL)
307 changes: 307 additions & 0 deletions src/cmd/flux-jstat.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,307 @@
/*****************************************************************************\
* Copyright (c) 2014 Lawrence Livermore National Security, LLC. Produced at
* the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS).
* LLNL-CODE-658032 All rights reserved.
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the license, or (at your option)
* any later version.
*
* Flux is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
* See also: http://www.gnu.org/licenses/
\*****************************************************************************/

#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <json.h>
#include <stdio.h>
#include <getopt.h>
#include <libgen.h>
#include <signal.h>
#include <unistd.h>
#include <stdbool.h>
#include <flux/core.h>

#include "src/common/libutil/log.h"
//#include "src/modules/libjsc/jstatctl.h"
#include "src/common/libutil/jsonutil.h"
#include "src/common/libutil/xzmalloc.h"
#include "src/common/libutil/shortjson.h"


/******************************************************************************
* *
* Internal types, macros and static variables *
* *
******************************************************************************/
typedef struct {
flux_t h;
FILE *op;
} jstatctx_t;

/* Note: Should only be used for signal handling */
static flux_t sig_flux_h;

#define OPTIONS "o:h"
static const struct option longopts[] = {
{"help", no_argument, 0, 'h'},
{"testout", required_argument, 0, 'o'},
{ 0, 0, 0, 0 },
};


/******************************************************************************
* *
* Utilities *
* *
******************************************************************************/
static void usage (void)
{
fprintf (stderr,
"Usage: flux-jstat notify\n"
" flux-jstat query jobid <top-level JCB attribute>\n"
" flux-jstat update jobid <top-level JCB attribute> <JCB JSON>\n"
);
exit (1);
}

static void freectx (jstatctx_t *ctx)
{
if (ctx->op)
fclose (ctx->op);
}

static jstatctx_t *getctx (flux_t h)
{
jstatctx_t *ctx = (jstatctx_t *)flux_aux_get (h, "jstat");
if (!ctx) {
ctx = xzmalloc (sizeof (*ctx));
ctx->h = h;
ctx->op = NULL;
flux_aux_set (h, "jstat", ctx, (FluxFreeFn)freectx);
}
return ctx;
}

static void sig_handler (int s)
{
if (s == SIGINT) {
fprintf (stdout, "Exit on INT");
exit (0);
}
}

static FILE *open_test_outfile (const char *fn)
{
FILE *fp;
if (!fn)
fp = NULL;
else if ( !(fp = fopen (fn, "w")))
fprintf (stderr, "Failed to open %s\n", fn);
return fp;
}

static JSON parse_json_str (const char *jcbstr)
{
int len = 0;
JSON jcb = NULL;
struct json_tokener *tok = NULL;

if (jcbstr)
len = strnlen (jcbstr, sysconf (_SC_ARG_MAX));
if (!(tok = json_tokener_new ()))
errno = ENOMEM;
else if (!(jcb = json_tokener_parse_ex (tok, jcbstr, len)))
errno = EPROTO;

if (tok)
json_tokener_free (tok);

return jcb;
}

static inline void get_jobid (JSON jcb, int64_t *j)
{
Jget_int64 (jcb, JSC_JOBID, j);
}

static inline void get_states (JSON jcb, int64_t *os, int64_t *ns)
{
JSON o = NULL;
Jget_obj (jcb, JSC_STATE_PAIR, &o);
Jget_int64 (o, JSC_STATE_PAIR_OSTATE, os);
Jget_int64 (o, JSC_STATE_PAIR_NSTATE, ns);
}


/******************************************************************************
* *
* Async notification callback *
* *
******************************************************************************/

static int job_status_cb (JSON jcb, void *arg, int errnum)
{
int64_t os = 0;
int64_t ns = 0;
int64_t j = 0;
jstatctx_t *ctx = NULL;
flux_t h = (flux_t)arg;

ctx = getctx (h);
if (errnum > 0) {
flux_log (ctx->h, LOG_ERR, "job_status_cb: errnum passed in");
return -1;
}

get_jobid (jcb, &j);
get_states (jcb, &os, &ns);
Jput (jcb);

fprintf (ctx->op, "%s->%s\n",
jsc_job_num2state ((job_state_t)os),
jsc_job_num2state ((job_state_t)ns));
fflush (ctx->op);

return 0;
}


/******************************************************************************
* *
* Top-level Handlers for each of notify, query and update commands *
* *
******************************************************************************/

static int handle_notify_req (flux_t h, const char *ofn)
{
jstatctx_t *ctx = NULL;

sig_flux_h = h;
if (signal (SIGINT, sig_handler) == SIG_ERR)
return -1;

ctx = getctx (h);
ctx->op = (ofn)? open_test_outfile (ofn) : stdout;

if (jsc_notify_status (h, job_status_cb, (void *)h) != 0) {
flux_log (h, LOG_ERR, "failed to reg a job status change CB");
return -1;
}
if (flux_reactor_start (h) < 0)
flux_log (h, LOG_ERR, "error in flux_reactor_start");

return 0;
}

static int handle_query_req (flux_t h, int64_t j, const char *k, const char *n)
{
JSON jcb = NULL;
jstatctx_t *ctx = NULL;

ctx = getctx (h);
ctx->op = n? open_test_outfile (n) : stdout;
if (jsc_query_jcb (h, j, k, &jcb) != 0) {
flux_log (h, LOG_ERR, "jsc_query_jcb reported an error\n");
return -1;
}
fprintf (ctx->op, "Job Control Block: attribute %s for job %ld\n", k, j);
fprintf (ctx->op, "%s\n",
json_object_to_json_string_ext (jcb, JSON_C_TO_STRING_PRETTY));
Jput (jcb);
return 0;
}

static int handle_update_req (flux_t h, int64_t j, const char *k,
const char *s, const char *n)
{
JSON jcb = NULL;
jstatctx_t *ctx = NULL;
ctx = getctx (h);
ctx->op = n? open_test_outfile (n) : stdout;

if (!(jcb = parse_json_str (s))) {
flux_log (h, LOG_ERR, "parse_json_str parse error.\n");
return -1;
}
return (jsc_update_jcb (ctx->h, j, k, jcb));
}


/******************************************************************************
* *
* Main entry point *
* *
******************************************************************************/

int main (int argc, char *argv[])
{
flux_t h;
int ch = 0;
int rc = 0;
char *cmd = NULL;
const char *j = NULL;
const char *ofn = NULL;
const char *attr = NULL;
const char *jcbstr = NULL;

log_init ("flux-jstat");
while ((ch = getopt_long (argc, argv, OPTIONS, longopts, NULL)) != -1) {
switch (ch) {
case 'h': /* --help */
usage ();
break;
case 'o': /* --testout */
ofn = xasprintf ("%s", optarg);
break;
default:
usage ();
break;
}
}
if (optind == argc)
usage ();

if (!(h = flux_open (NULL, 0)))
err_exit ("flux_open");

flux_log_set_facility (h, "jstat");
cmd = argv[optind++];

if (!strcmp ("notify", cmd))
rc = handle_notify_req (h, (const char *)ofn);
else if (!strcmp ("query", cmd)) {
j = (const char *)(*(argv+optind));
attr = (const char *)(*(argv+optind+1));
rc = handle_query_req (h, strtol (j, NULL, 10), attr, ofn);
}
else if (!strcmp ("update", cmd)) {
j = (const char *)(*(argv+optind));
attr = (const char *)(*(argv+optind+1));
jcbstr = (const char *)(*(argv+optind+2));
rc = handle_update_req (h, strtol (j, NULL, 10), attr, jcbstr, ofn);
}
else
usage ();

flux_close (h);
log_fini ();

return (!rc)? 0: 42;
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
1 change: 1 addition & 0 deletions src/common/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "core/kvs.h"
#include "core/live.h"
#include "core/barrier.h"
#include "core/jstatctl.h"

#endif /* !_FLUX_CORE_H */

Expand Down
11 changes: 11 additions & 0 deletions src/common/libutil/shortjson.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,17 @@ Jadd_int (JSON o, const char *name, int i)
json_object_object_add (o, (char *)name, n);
}

/* Add 64bit integer to JSON.
*/
static __inline__ void
Jadd_int64 (JSON o, const char *name, int64_t i)
{
JSON n = json_object_new_int64 (i);
if (!n)
oom ();
json_object_object_add (o, (char *)name, n);
}

/* Add double to JSON.
*/
static __inline__ void
Expand Down
1 change: 1 addition & 0 deletions src/include/flux/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@
#include "src/modules/kvs/kvs.h"
#include "src/modules/live/live.h"
#include "src/modules/barrier/barrier.h"
#include "src/modules/libjsc/jstatctl.h"

#endif /* FLUX_CORE_H */
2 changes: 1 addition & 1 deletion src/modules/Makefile.am
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
#This order is *important*
SUBDIRS = api kvs libmrpc libzio libkz modctl live mecho barrier wreck
SUBDIRS = api kvs libmrpc libzio libkz modctl live mecho barrier wreck libjsc
16 changes: 16 additions & 0 deletions src/modules/libjsc/Makefile.am
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
AM_CFLAGS = @GCCWARN@

AM_CPPFLAGS = \
$(JSON_CFLAGS) \
-I$(top_srcdir) -I$(top_srcdir)/src/include

fluxlib_LTLIBRARIES = libjsc.la
fluxcoreinclude_HEADERS = jstatctl.h

libjsc_la_SOURCES = \
jstatctl.c \
jstatctl.h
libjsc_la_LDFLAGS = -shared -export-dynamic --disable-static \
$(top_builddir)/src/modules/kvs/libkvs.la \
$(top_builddir)/src/common/libflux-internal.la \
$(top_builddir)/src/common/libflux-core.la
Loading

0 comments on commit 48534ac

Please sign in to comment.