-
Notifications
You must be signed in to change notification settings - Fork 50
/
jobspec.c
309 lines (279 loc) · 10.4 KB
/
jobspec.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
/************************************************************\
* Copyright 2019 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/
#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <limits.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <flux/core.h>
#include <jansson.h>
#include "jobspec.h"
/* One entry of a jobspec resource list, filled in by parse_res_level().
 * 'type' and 'with' are taken from the entry with the non-incrementing
 * jansson unpack specifiers, so they are borrowed from the parsed jobspec.
 */
struct res_level {
    const char *type;   /* resource type name, e.g. "node", "slot", "core" */
    int count;          /* count of this resource per enclosing resource */
    json_t *with;       /* child resource list, or NULL if none was given */
};
/* Format a printf-style message into error->text.
 * A NULL 'error' means the caller does not want details; do nothing.
 */
static void set_error (json_error_t *error, const char *fmt, ...)
{
    va_list args;

    if (!error)
        return;
    va_start (args, fmt);
    (void) vsnprintf (error->text, sizeof (error->text), fmt, args);
    va_end (args);
}
/* Unpack a single jobspec resource entry 'o' found at depth 'level'.
 *
 * The entry must be an object with a string "type" and an integer
 * "count"; an optional "with" member (the child resource list) is
 * returned without taking a reference, and is NULL when absent.
 * A count below 1 is rejected: every count is multiplied into the
 * node/slot/core totals, so zero or negative values would silently
 * produce meaningless totals further down.
 *
 * On success '*resp' is filled in and 0 is returned.  On failure '*resp'
 * is left untouched, 'error' (if non-NULL) is set, and -1 is returned.
 */
static int parse_res_level (json_t *o,
                            int level,
                            struct res_level *resp,
                            json_error_t *error)
{
    json_error_t loc_error;
    struct res_level res;

    if (o == NULL) {
        set_error (error, "level %d: missing", level);
        return -1;
    }
    /* "with" is optional ("s?o"), so pre-clear it for the absent case */
    res.with = NULL;
    if (json_unpack_ex (o, &loc_error, 0,
                        "{s:s s:i s?o}",
                        "type", &res.type,
                        "count", &res.count,
                        "with", &res.with) < 0) {
        set_error (error, "level %d: %s", level, loc_error.text);
        return -1;
    }
    if (res.count < 1) {
        set_error (error,
                   "level %d: count must be >= 1 (got %d)",
                   level,
                   res.count);
        return -1;
    }
    *resp = res;
    return 0;
}
/* Release a struct jobspec returned by jobspec_parse().  NULL is a no-op.
 *
 * environment and options were unpacked with the refcount-incrementing
 * "O" specifier (or freshly created), so they hold their own references
 * and must be released separately from the top-level jobspec object.
 */
void jobspec_destroy (struct jobspec *job)
{
    if (!job)
        return;
    json_decref (job->environment);
    json_decref (job->options);
    json_decref (job->jobspec);
    free (job);
}
/* Validate one level of the jobspec resource tree and recurse below it.
 *
 * 'curr_resource' must be a non-empty JSON array of resource entries.
 * Each entry's count is multiplied by 'with_multiplier' (the product of
 * the counts of its ancestors, restarted at 1 below a slot) to give:
 *   - job->node_count      for the "node" entry,
 *   - job->slot_count      for the "slot" entry,
 *   - job->cores_per_slot  for the "core" entry (counted below the slot).
 * job->slots_per_node is derived when a slot is found below a node.
 *
 * Ordering rules enforced across the whole tree: at most one node, one
 * slot and one core; a node may not follow a slot or core; a slot may
 * not follow a core; a core must follow a slot; and after descending,
 * a node must have slot and core beneath it and a slot must have a core.
 *
 * 'level' is the current depth and is only used in error messages.
 * Returns 0 on success, or -1 with 'error' (if non-NULL) set.
 */
static int recursive_parse_helper (struct jobspec *job,
                                   json_t *curr_resource,
                                   json_error_t *error,
                                   int level,
                                   int with_multiplier)
{
    size_t index;
    json_t *value;
    struct res_level res;

    /* json_array_size() also returns 0 for a non-array, so this single
     * check rejects both a non-list and an empty list.
     */
    if (json_array_size (curr_resource) == 0) {
        set_error (error, "Malformed jobspec: resource entry is not a list");
        return -1;
    }
    json_array_foreach (curr_resource, index, value) {
        long long product;
        int curr_multiplier;
        int is_node;
        int is_slot;

        if (parse_res_level (value, level, &res, error) < 0)
            return -1;

        /* Multiply in a wider type: overflowing a signed int is undefined
         * behavior and would silently corrupt the computed counts.
         */
        product = (long long) with_multiplier * res.count;
        if (product > INT_MAX || product < INT_MIN) {
            set_error (error, "level %d: resource count overflow", level);
            return -1;
        }
        curr_multiplier = (int) product;

        is_node = (strcmp (res.type, "node") == 0);
        is_slot = (strcmp (res.type, "slot") == 0);

        if (is_node) {
            if (job->slot_count > 0) {
                set_error (error, "node resource encountered after slot resource");
                return -1;
            }
            if (job->cores_per_slot > 0) {
                set_error (error, "node resource encountered after core resource");
                return -1;
            }
            if (job->node_count > 0) {
                set_error (error, "node resource encountered after node resource");
                return -1;
            }
            job->node_count = curr_multiplier;
        }
        else if (is_slot) {
            if (job->cores_per_slot > 0) {
                set_error (error, "slot resource encountered after core resource");
                return -1;
            }
            if (job->slot_count > 0) {
                set_error (error, "slot resource encountered after slot resource");
                return -1;
            }
            job->slot_count = curr_multiplier;
            /* Restart the multiplier: below the slot we are computing
             * cores_per_slot, not a total core count.
             */
            curr_multiplier = 1;
            /* If a node was already seen above this slot, derive
             * slots_per_node.  With node strictly ordered before slot
             * (non-V1 resources allowed in between), slot_count is a
             * multiple of node_count, so this division is exact.
             */
            if (job->node_count > 0)
                job->slots_per_node = job->slot_count / job->node_count;
        }
        else if (strcmp (res.type, "core") == 0) {
            if (job->slot_count < 1) {
                set_error (error, "core resource encountered before slot resource");
                return -1;
            }
            if (job->cores_per_slot > 0) {
                set_error (error, "core resource encountered after core resource");
                return -1;
            }
            job->cores_per_slot = curr_multiplier;
            /* Even though node/slot/core may all have been found now, keep
             * descending to make sure there aren't additional erroneous
             * node/slot/core resources deeper in the jobspec.
             */
        }

        if (res.with != NULL) {
            if (recursive_parse_helper (job,
                                        res.with,
                                        error,
                                        level + 1,
                                        curr_multiplier) < 0)
                return -1;
        }

        /* Post-descent checks: the required resources must exist below */
        if (is_node) {
            if (job->slot_count <= 0 || job->cores_per_slot <= 0) {
                set_error (error,
                           "node encountered without slot&core below it");
                return -1;
            }
        }
        else if (is_slot) {
            if (job->cores_per_slot <= 0) {
                set_error (error, "slot encountered without core below it");
                return -1;
            }
        }
    }
    return 0;
}
/* Parse the top-level "resources" list of a V1 jobspec into 'job'.
 *
 * Resources must appear in V1 order, written in shorthand as
 *     ...->[node]->...->slot->...->core
 * where `node` is optional and `...` stands for any non-V1 resources,
 * which may appear before and between the V1 resources.  Multiple
 * resources are permitted at any level, provided the whole jobspec
 * contains only a single node, slot, and core.
 *
 * job->node_count and job->slots_per_node are left at -1 when no node
 * resource is present.  Returns 0 on success, or -1 with 'error' set.
 */
static int recursive_parse_jobspec_resources (struct jobspec *job,
                                              json_t *curr_resource,
                                              json_error_t *error)
{
    int rc;

    if (!curr_resource) {
        set_error (error, "jobspec top-level resources empty");
        return -1;
    }
    /* -1 means "no node resource"; the descent overwrites these values
     * only if it encounters a node.
     */
    job->node_count = -1;
    job->slots_per_node = -1;

    rc = recursive_parse_helper (job, curr_resource, error, 0, 1);
    if (rc != 0)
        return rc;
    if (job->cores_per_slot < 1) {
        set_error (error, "Missing core resource");
        return -1;
    }
    return 0;
}
/* Parse the JSON-encoded V1 jobspec string 'jobspec' into a newly
 * allocated struct jobspec.
 *
 * On success the returned object owns the parsed JSON (job->jobspec) and
 * holds its own references to job->environment and job->options, which
 * are guaranteed to be non-NULL JSON objects.  job->cwd (NULL if absent)
 * and job->command are borrowed from job->jobspec and remain valid until
 * jobspec_destroy().  job->task_count comes from tasks[0].count.total or,
 * if that is absent, equals job->slot_count when count.per_slot is 1.
 *
 * On failure NULL is returned and 'error' (if non-NULL) describes why.
 */
struct jobspec *jobspec_parse (const char *jobspec, json_error_t *error)
{
    struct jobspec *job;
    json_t *tasks;
    json_t *resources;

    if (!(job = calloc (1, sizeof (*job)))) {
        set_error (error, "Out of memory");
        goto error;
    }
    if (!(job->jobspec = json_loads (jobspec, 0, error)))
        goto error;
    /* N.B.: members of jobspec like environment and shell.options may
     * be modified with json_object_update_new() via the shell API
     * calls flux_shell_setenvf(3), flux_shell_unsetenv(3), and
     * flux_shell_setopt(3). Therefore, the refcount of these objects
     * is incremented during unpack (via the "O" specifier), so that
     * the objects have json_decref() called directly on them to
     * avoid potential leaks (the json_decref() of the outer jobspec
     * object itself doesn't seem to catch the changes to these inner
     * json_t * objects)
     */
    if (json_unpack_ex (job->jobspec, error, 0,
                        "{s:i s:o s:o s:{s:{s?:s s?:O s?:{s?:O}}}}",
                        "version", &job->version,
                        "resources", &resources,
                        "tasks", &tasks,
                        "attributes",
                          "system",
                            "cwd", &job->cwd,
                            "environment", &job->environment,
                            "shell", "options", &job->options) < 0) {
        goto error;
    }
    if (job->version != 1) {
        set_error (error, "Invalid jobspec version: expected 1 got %d",
                   job->version);
        goto error;
    }
    if (job->environment && !json_is_object (job->environment)) {
        set_error (error, "attributes.system.environment is not object type");
        goto error;
    }
    /* Shell options are updated in place like the environment, so they
     * must also be an object if present.
     */
    if (job->options && !json_is_object (job->options)) {
        set_error (error, "attributes.system.shell.options is not object type");
        goto error;
    }
    /* Ensure that shell options and environment are never NULL, so a shell
     * component or plugin may set a new option or environment var.
     */
    if ((!job->options && !(job->options = json_object ()))
        || (!job->environment && !(job->environment = json_object ()))) {
        set_error (error, "unable to create empty jobspec options/environment");
        goto error;
    }
    /* recursive_parse_jobspec_resources() sets 'error' on failure */
    if (recursive_parse_jobspec_resources (job, resources, error) < 0)
        goto error;

    /* Set job->task_count: prefer an explicit total; otherwise accept
     * only per_slot == 1, i.e. one task per slot.
     */
    if (json_unpack_ex (tasks, NULL, 0,
                        "[{s:{s:i}}]",
                        "count", "total", &job->task_count) < 0) {
        int per_slot;
        if (json_unpack_ex (tasks, NULL, 0,
                            "[{s:{s:i}}]",
                            "count", "per_slot", &per_slot) < 0) {
            set_error (error, "Unable to parse task count");
            goto error;
        }
        if (per_slot != 1) {
            set_error (error, "per_slot count: expected 1 got %d", per_slot);
            goto error;
        }
        job->task_count = job->slot_count;
    }
    /* Get command (borrowed reference into job->jobspec)
     */
    if (json_unpack_ex (tasks, NULL, 0,
                        "[{s:o}]",
                        "command", &job->command) < 0) {
        set_error (error, "Unable to parse command");
        goto error;
    }
    if (!json_is_array (job->command)) {
        set_error (error, "Malformed command entry");
        goto error;
    }
    return job;
error:
    jobspec_destroy (job);
    return NULL;
}
/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/