Skip to content

Commit

Permalink
job-info: Support job annotations
Browse files Browse the repository at this point in the history
Listen to the newly created job-annotations event and make
job annotations available to be returned by job listing services.
  • Loading branch information
chu11 committed Jul 12, 2020
1 parent 908fa53 commit 90a04e5
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 1 deletion.
5 changes: 5 additions & 0 deletions src/modules/job-info/job-info.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ static const struct flux_msg_handler_spec htab[] = {
.cb = job_state_cb,
.rolemask = 0
},
{ .typemask = FLUX_MSGTYPE_EVENT,
.topic_glob = "job-annotations",
.cb = job_annotations_cb,
.rolemask = 0
},
FLUX_MSGHANDLER_TABLE_END,
};

Expand Down
86 changes: 86 additions & 0 deletions src/modules/job-info/job_state.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ static void job_destroy (void *data)
struct job *job = data;
if (job) {
json_decref (job->exception_context);
json_decref (job->annotations);
json_decref (job->jobspec_job);
json_decref (job->jobspec_cmd);
json_decref (job->R);
Expand Down Expand Up @@ -166,6 +167,11 @@ struct job_state_ctx *job_state_create (flux_t *h)
goto error;
}

if (flux_event_subscribe (h, "job-annotations") < 0) {
flux_log_error (h, "flux_event_subscribe");
goto error;
}

return jsctx;

error:
Expand Down Expand Up @@ -1237,6 +1243,73 @@ void job_state_cb (flux_t *h, flux_msg_handler_t *mh,
return;
}

static void update_annotations (struct info_ctx *ctx, json_t *annotations)
{
struct job_state_ctx *jsctx = ctx->jsctx;
size_t index;
json_t *value;

if (!json_is_array (annotations)) {
flux_log_error (ctx->h, "%s: annotations EPROTO", __FUNCTION__);
return;
}

json_array_foreach (annotations, index, value) {
struct job *job;
json_t *o;
flux_jobid_t id;

if (!json_is_array (value)) {
flux_log_error (jsctx->h, "%s: annotation EPROTO", __FUNCTION__);
return;
}

if (!(o = json_array_get (value, 0))
|| !json_is_integer (o)) {
flux_log_error (jsctx->h, "%s: annotation EPROTO", __FUNCTION__);
return;
}

id = json_integer_value (o);

if (!(o = json_array_get (value, 1))
|| (!json_is_object (o) && !json_is_null (o))) {
flux_log_error (jsctx->h, "%s: annotation EPROTO", __FUNCTION__);
return;
}

if ((job = zhashx_lookup (jsctx->index, &id))) {
json_decref (job->annotations);
if (json_is_null (o))
job->annotations = NULL;
else
job->annotations = json_incref (o);
}
else
flux_log_error (jsctx->h, "%s: job %ju not found",
__FUNCTION__, (uintmax_t)id);
}

}

void job_annotations_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
struct info_ctx *ctx = arg;
json_t *annotations;

if (flux_event_unpack (msg, NULL, "{s:o}",
"annotations",
&annotations) < 0) {
flux_log_error (h, "%s: flux_event_unpack", __FUNCTION__);
return;
}

update_annotations (ctx, annotations);

return;
}

void job_state_pause_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
Expand Down Expand Up @@ -1364,6 +1437,19 @@ static struct job *eventlog_restart_parse (struct info_ctx *ctx,
update_job_state (ctx, job, FLUX_JOB_CLEANUP, timestamp);
}
else if (!strcmp (name, "alloc")) {
/* context not required if no annotations */
if (context) {
json_t *annotations;
if (json_unpack (context,
"{ s:o }",
"annotations", &annotations) < 0) {
flux_log_error (ctx->h, "%s: alloc context for %ju invalid",
__FUNCTION__, (uintmax_t)job->id);
goto error;
}
job->annotations = json_incref (annotations);
}

if (job->state == FLUX_JOB_SCHED)
update_job_state (ctx, job, FLUX_JOB_RUN, timestamp);
}
Expand Down
4 changes: 4 additions & 0 deletions src/modules/job-info/job_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ struct job {
const char *exception_type;
const char *exception_note;
flux_job_result_t result;
json_t *annotations;

/* cache of job information */
json_t *jobspec_job;
Expand Down Expand Up @@ -123,6 +124,9 @@ void job_state_destroy (void *data);
void job_state_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg);

void job_annotations_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg);

void job_state_pause_cb (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg);

Expand Down
5 changes: 5 additions & 0 deletions src/modules/job-info/job_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ json_t *job_to_json (struct job *job, json_t *attrs, job_info_error_t *errp)
continue;
val = json_integer (job->result);
}
else if (!strcmp (attr, "annotations")) {
if (!job->annotations)
continue;
val = json_incref (job->annotations);
}
else {
seterror (errp, "%s is not a valid attribute", attr);
errno = EINVAL;
Expand Down
2 changes: 1 addition & 1 deletion src/modules/job-info/list.c
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ void list_attrs_cb (flux_t *h, flux_msg_handler_t *mh,
"state", "name", "ntasks", "nnodes", "ranks",
"success", "exception_occurred", "exception_type",
"exception_severity", "exception_note", "result",
"expiration", NULL };
"expiration", "annotations", NULL };
json_t *a = NULL;
int i;

Expand Down

0 comments on commit 90a04e5

Please sign in to comment.