Skip to content

Commit

Permalink
Merge pull request #5512 from grondo/no-shell-standalone
Browse files Browse the repository at this point in the history
shell: drop job shell standalone mode
  • Loading branch information
mergify[bot] authored Oct 20, 2023
2 parents 4bf109f + 9659ef8 commit e90547f
Show file tree
Hide file tree
Showing 22 changed files with 215 additions and 1,062 deletions.
30 changes: 2 additions & 28 deletions doc/man1/flux-shell.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,35 +32,14 @@ even extend the shell itself via simple shell plugins developed directly
in Lua. See the SHELL INITRC section below for details of the ``initrc``
format and features.

Most Flux users will interact with flux-shell(1) indirectly through
the execution of Flux jobs, however flux-shell(1) accepts the following
OPTIONS in standalone mode (``-s, --standalone``), for testing purposes.

OPTIONS
=======

**-h, --help**
Summarize available options.

**-v, --verbose**
Log actions to stderr.

**--initrc**\ =\ *FILE*
Load shell initrc from FILE instead of the system default.

**-R, --resources**\ =\ *FILE*
Load resource set **R** from a file instead of job-info service. This is
used for testing.

**-j, --jobspec**\ =\ *FILE*
Load jobspec from FILE instead of job-info service. This is used for
testing.

**-s, --standalone**
Run as as a local program without Flux instance. Used for testing.
In standalone mode an initrc file is not loaded unless specifically
requested via the ``--initrc`` option or specified in jobspec.

**--reconnect**
Attempt to reconnect if broker connection is lost.

OPERATION
=========
Expand Down Expand Up @@ -485,9 +464,6 @@ supported. Job shell specific functions and tables are described below:
Current flux-shell verbosity. This value may be changed at runtime,
e.g. ``shell.options.verbose = 2`` to set maximum verbosity.

**shell.options.standalone**
True if the shell is running in "standalone" mode for testing.

**shell.info**
Returns a Lua table of shell information obtained via
:man3:`flux_shell_get_info`. This table includes
Expand All @@ -504,8 +480,6 @@ supported. Job shell specific functions and tables are described below:
The service string advertised by the shell.
**options.verbose**
True if the shell is running in verbose mode.
**options.standalone**
True if the shell was run in standalone mode.
**jobspec**
The jobspec of the current job
**R**
Expand Down
4 changes: 0 additions & 4 deletions src/shell/evlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,6 @@ static int log_eventlog_start (flux_plugin_t *p,
flux_shell_t *shell = flux_plugin_get_shell (p);
struct evlog *evlog = NULL;

/* Do not activate eventlogger in standalone mode */
if (shell->standalone)
return 0;

if (!(evlog = evlog_create (shell)))
return -1;
if (flux_plugin_aux_set (p, "evlog", evlog,
Expand Down
9 changes: 3 additions & 6 deletions src/shell/exception.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,17 @@ static int exception_init (flux_plugin_t *p,
flux_t *h;
flux_jobid_t id;
int shell_rank;
int standalone;
flux_shell_t *shell = flux_plugin_get_shell (p);
if (!shell)
return -1;
if (!(h = flux_shell_get_flux (shell)))
return -1;
if (flux_shell_info_unpack (shell,
"{s:I s:i s:{s:b}}",
"{s:I s:i}",
"jobid", &id,
"rank", &shell_rank,
"options",
"standalone", &standalone) < 0)
"rank", &shell_rank) < 0)
return -1;
if (standalone || shell_rank != 0)
if (shell_rank != 0)
return 0;

if (flux_shell_service_register (shell,
Expand Down
165 changes: 31 additions & 134 deletions src/shell/info.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,123 +29,53 @@
#include "info.h"
#include "jobspec.h"

/* Append string 's' to JSON array 'array'.
* Return 0 on success, -1 on failure.
*/
static int array_append_string (json_t *array, const char *s)
{
json_t *o;

if (!(o = json_string (s)) || json_array_append_new (array, o) < 0) {
json_decref (o);
return -1;
}
return 0;
}

/* If either *jobspec or *R is NULL, fetch it from future and assign.
/* Get R and jobspec from job-info.lookup future and assign.
* Return 0 on success, -1 on failure (and log error).
* N.B. assigned values remain valid until future is destroyed.
*/
static int lookup_job_info_get (flux_future_t *f,
char **jobspec,
const char **R)
{
if (*jobspec == NULL) {
flux_error_t error;
const char *J;
if (flux_rpc_get_unpack (f, "{s:s}", "J", &J) < 0)
goto error;
if (!(*jobspec = flux_unwrap_string (J, true, NULL, &error))) {
shell_log_error ("failed to unwrap J: %s", error.text);
return -1;
}
flux_error_t error;
const char *J;
if (flux_rpc_get_unpack (f, "{s:s}", "J", &J) < 0)
goto error;
if (!(*jobspec = flux_unwrap_string (J, true, NULL, &error))) {
shell_log_error ("failed to unwrap J: %s", error.text);
return -1;
}
if (!*R && flux_rpc_get_unpack (f, "{s:s}", "R", R) < 0)
if (flux_rpc_get_unpack (f, "{s:s}", "R", R) < 0)
goto error;
return 0;
error:
shell_log_error ("job-info: %s", future_strerror (f, errno));
return -1;
}

/* If either jobspec or R is NULL, fetch it from the job-info service.
/* Fetch R and J from the job-info service.
* Return future on success or NULL on failure (and log error).
*/
static flux_future_t *lookup_job_info (flux_t *h,
flux_jobid_t jobid,
const char *jobspec,
const char *R)
static flux_future_t *lookup_job_info (flux_t *h, flux_jobid_t jobid)
{
json_t *keys;
flux_future_t *f;

if (!(keys = json_array ())
|| (!R && array_append_string (keys, "R") < 0)
|| (!jobspec && array_append_string (keys, "J") < 0)) {
shell_log_error ("error building json array");
return NULL;
}
f = flux_rpc_pack (h,
"job-info.lookup",
FLUX_NODEID_ANY,
0,
"{s:I s:O s:i}",
"{s:I s:[ss] s:i}",
"id", jobid,
"keys", keys,
"keys", "R", "J",
"flags", 0);
if (!f)
shell_log_error ("error sending job-info request");
json_decref (keys);
return f;
}

/* Read content of file 'optarg' and return it or NULL on failure (log error).
* Caller must free returned result.
*/
static char *parse_arg_file (const char *optarg)
{
int fd;
ssize_t size;
void *buf = NULL;

if (streq (optarg, "-"))
fd = STDIN_FILENO;
else {
if ((fd = open (optarg, O_RDONLY)) < 0) {
shell_log_errno ("error opening %s", optarg);
return NULL;
}
}
if ((size = read_all (fd, &buf)) < 0)
shell_log_errno ("error reading %s", optarg);
if (fd != STDIN_FILENO)
(void)close (fd);
return buf;
}

/* If option 'name' exists, read it as a file and exit on failure.
* O/w, return NULL.
*/
static char *optparse_check_and_loadfile (optparse_t *p, const char *name)
{
char *result = NULL;
const char *path = optparse_get_str (p, name, NULL);
if (path) {
if (!(result = parse_arg_file (path)))
exit (1);
return result;
}
return NULL;
}

/* Fetch jobinfo (jobspec, R) from job-info service if not provided on
* command line, and parse.
*/
static int shell_init_jobinfo (flux_shell_t *shell,
struct shell_info *info,
const char *jobspec_provided,
const char *R_provided)
static int shell_init_jobinfo (flux_shell_t *shell, struct shell_info *info)
{
int rc = -1;
flux_future_t *f_info = NULL;
Expand All @@ -155,53 +85,31 @@ static int shell_init_jobinfo (flux_shell_t *shell,
const char *R;
json_error_t error;

R = R_provided;
if (jobspec_provided && !(jobspec = strdup (jobspec_provided)))
shell_die (1, "Out of memory copying provided jobspec");

/* If shell is not running standalone, fetch hwloc topology
* from resource module to avoid having to load from scratch
* here. The topology XML is then cached for future shell plugin
* use.
/* fetch hwloc topology from resource module to avoid having to
* load from scratch here. The topology XML is then cached for
* future shell plugin use.
*/
if (!shell->standalone
&& !(f_hwloc = flux_rpc (shell->h,
"resource.topo-get",
NULL,
FLUX_NODEID_ANY,
0)))
if (!(f_hwloc = flux_rpc (shell->h,
"resource.topo-get",
NULL,
FLUX_NODEID_ANY,
0)))
goto out;

if (!R || !jobspec) {
/* Fetch missing jobinfo from broker job-info service */
if (shell->standalone) {
shell_log_error ("Invalid arguments: standalone and R/jobspec are unset");
goto out;
}
if (!(f_info = lookup_job_info (shell->h,
shell->jobid,
jobspec_provided,
R_provided)))
goto out;
}
/* fetch jobspec (via J) and R for this job
*/
if (!(f_info = lookup_job_info (shell->h, shell->jobid)))
goto out;

if (f_hwloc) {
if (flux_rpc_get (f_hwloc, &xml) < 0
|| !(info->hwloc_xml = strdup (xml))) {
shell_log_error ("error fetching local hwloc xml");
goto out;
}
}
else {
/* We couldn't fetch hwloc, load a copy manually */
if (flux_rpc_get (f_hwloc, &xml) < 0
|| !(info->hwloc_xml = strdup (xml))) {
shell_log_error ("error fetching local hwloc xml");
if (!(info->hwloc_xml = rhwloc_local_topology_xml (0))) {
shell_log_error ("error loading local hwloc xml");
goto out;
}
}

if (f_info &&
lookup_job_info_get (f_info, &jobspec, &R) < 0) {
if (lookup_job_info_get (f_info, &jobspec, &R) < 0) {
shell_log_error ("error fetching jobspec,R");
goto out;
}
Expand Down Expand Up @@ -295,8 +203,6 @@ int shell_info_set_taskmap (struct shell_info *info,
struct shell_info *shell_info_create (flux_shell_t *shell)
{
struct shell_info *info;
char *R = NULL;
char *jobspec = NULL;
const char *per_resource = NULL;
int per_resource_count = -1;
int broker_rank = shell->broker_rank;
Expand All @@ -308,18 +214,9 @@ struct shell_info *shell_info_create (flux_shell_t *shell)
}
info->jobid = shell->jobid;

/* Check for jobspec and/or R on cmdline:
*/
jobspec = optparse_check_and_loadfile (shell->p, "jobspec");
R = optparse_check_and_loadfile (shell->p, "resources");

if (shell_init_jobinfo (shell, info, jobspec, R) < 0)
if (shell_init_jobinfo (shell, info) < 0)
goto error;

/* Done with potentially allocated jobspec, R strings */
free (jobspec);
free (R);

if (get_per_resource_option (info->jobspec,
&per_resource,
&per_resource_count) < 0)
Expand Down
45 changes: 20 additions & 25 deletions src/shell/input.c
Original file line number Diff line number Diff line change
Expand Up @@ -445,30 +445,27 @@ struct shell_input *shell_input_create (flux_shell_t *shell)
goto error;

if (shell->info->shell_rank == 0) {
/* can't use stdin in standalone, no kvs to write to */
if (!in->shell->standalone) {
if (in->stdin_type == FLUX_INPUT_TYPE_SERVICE) {
if (flux_shell_service_register (in->shell,
"stdin",
shell_input_stdin_cb,
in) < 0)
shell_die_errno (1, "flux_shell_service_register");

/* Do not add a completion reference for the stdin service, we
* don't care if the user ever sends stdin */
}
if (in->stdin_type == FLUX_INPUT_TYPE_SERVICE) {
if (flux_shell_service_register (in->shell,
"stdin",
shell_input_stdin_cb,
in) < 0)
shell_die_errno (1, "flux_shell_service_register");

/* Do not add a completion reference for the stdin service, we
* don't care if the user ever sends stdin */
}

if (shell_input_header (in) < 0)
goto error;
if (shell_input_header (in) < 0)
goto error;

if (in->stdin_type == FLUX_INPUT_TYPE_FILE) {
if (shell_input_type_file_setup (in) < 0)
goto error;
/* Ok to start fd watcher now since shell_input_header()
* synchronously write guest.input header.
*/
flux_watcher_start (in->stdin_file.w);
}
if (in->stdin_type == FLUX_INPUT_TYPE_FILE) {
if (shell_input_type_file_setup (in) < 0)
goto error;
/* Ok to start fd watcher now since shell_input_header()
* synchronously write guest.input header.
*/
flux_watcher_start (in->stdin_file.w);
}
}

Expand Down Expand Up @@ -622,9 +619,7 @@ static int shell_input_task_init (flux_plugin_t *p,
task_input->task = task;

if (task_input->type == FLUX_TASK_INPUT_KVS) {
/* can't read stdin in standalone mode, no KVS to read from */
if (!task_input->in->shell->standalone
&& shell_task_input_kvs_start (task_input) < 0)
if (shell_task_input_kvs_start (task_input) < 0)
shell_die_errno (1, "shell_input_start_task_watch");
}
return 0;
Expand Down
1 change: 0 additions & 1 deletion src/shell/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ struct flux_shell {
int rc;

int verbose;
bool standalone;
int nosetpgrp;

struct aux_item *aux;
Expand Down
Loading

0 comments on commit e90547f

Please sign in to comment.