Skip to content

Commit

Permalink
shell: make task list available before shell.init
Browse files Browse the repository at this point in the history
Problem: It would be convenient to have access to the shell task
list in the shell.init plugin callback, but currently the shell
creates, initializes, and starts tasks in a single loop.

Split task creation and start into separate functions:

 - shell_create_tasks() which creates all local tasks and adds
   them to the shell->tasks list.
 - shell_start_tasks() which completes task initialization and
   starts the tasks.

Move shell_create_tasks() before the shell.init callback is invoked.
  • Loading branch information
grondo committed Dec 25, 2024
1 parent 696350d commit 75f48ca
Showing 1 changed file with 84 additions and 63 deletions.
147 changes: 84 additions & 63 deletions src/shell/shell.c
Original file line number Diff line number Diff line change
Expand Up @@ -1771,11 +1771,86 @@ static int frob_command (flux_shell_t *shell, flux_cmd_t *cmd)
return 0;
}

/* Build the shell's task list.
 *
 * Allocates shell->tasks, then walks the ids in shell->info->taskids in
 * idset order. For each id a task object is created with a zero-based
 * local index and the id itself, its pre-exec hook is pointed at
 * shell_task_exec() with the shell as argument, and the task is appended
 * to shell->tasks. Nothing is initialized further or started here.
 *
 * Every failure is reported through shell_die(); the only value returned
 * by this function is 0.
 */
static int shell_create_tasks (flux_shell_t *shell)
{
    int index = 0;
    int id;

    shell->tasks = zlist_new ();
    if (shell->tasks == NULL)
        shell_die (1, "zlist_new failed");

    for (id = idset_first (shell->info->taskids);
         id != IDSET_INVALID_ID;
         id = idset_next (shell->info->taskids, id)) {
        struct shell_task *new_task = shell_task_create (shell, index, id);

        if (new_task == NULL)
            shell_die (1, "shell_task_create index=%d", index);

        /* Hook invoked with the shell as its argument; set before the
         * task becomes visible on the list.
         */
        new_task->pre_exec_cb = shell_task_exec;
        new_task->pre_exec_arg = shell;

        if (zlist_append (shell->tasks, new_task) < 0)
            shell_die (1, "zlist_append failed");

        /* Local index advances once per task created */
        index++;
    }
    return 0;
}

/* Finish initialization of, and start, every task on shell->tasks.
 *
 * Tasks are visited in list order. While a task is being processed,
 * shell->current_task points at it; it is reset to NULL once the whole
 * list has been handled. For each task this function:
 *   1. runs shell_task_init() (plugin task_init callbacks),
 *   2. runs frob_command() on the task's command,
 *   3. starts the task with task_completion_cb as completion callback,
 *   4. takes a shell completion reference named "task<rank>",
 *   5. runs shell_task_forked() (plugin task_fork callbacks).
 *
 * Every failure is reported through shell_die(); the only value returned
 * by this function is 0.
 * NOTE(review): the code assumes shell_die() does not return - confirm.
 */
static int shell_start_tasks (flux_shell_t *shell)
{
    flux_shell_task_t *task;

    task = zlist_first (shell->tasks);
    while (task) {
        shell->current_task = task;

        /* Call all plugin task_init callbacks:
         */
        if (shell_task_init (shell) < 0)
            shell_die (1, "failed to initialize taskid=%d", task->rank);

        /* Render any mustache templates in command args
         */
        if (frob_command (shell, task->cmd))
            shell_die (1, "failed rendering of mustachioed command args");

        if (shell_task_start (shell, task, task_completion_cb, shell) < 0) {
            /* Capture errno right away: the calls made below while
             * building the error message could otherwise overwrite it
             * before it is examined or formatted.
             */
            int saved_errno = errno;
            int ec = 1;
            const char *arg0;

            /* bash standard, 126 for permission/access denied, 127
             * for command not found. Note that shell only launches
             * local tasks, therefore no need to check for
             * EHOSTUNREACH.
             */
            if (saved_errno == EPERM || saved_errno == EACCES)
                ec = 126;
            else if (saved_errno == ENOENT)
                ec = 127;

            arg0 = flux_cmd_arg (task->cmd, 0);
            shell_die (ec,
                       "task %d (host %s): start failed: %s: %s",
                       task->rank,
                       shell->hostname,
                       arg0,
                       strerror (saved_errno));
        }

        if (flux_shell_add_completion_ref (shell, "task%d", task->rank) < 0)
            shell_die (1, "flux_shell_add_completion_ref");

        /* Call all plugin task_fork callbacks:
         */
        if (shell_task_forked (shell) < 0)
            shell_die (1, "shell_task_forked");

        task = zlist_next (shell->tasks);
    }
    /* Reset current task since we've left task-specific context:
     */
    shell->current_task = NULL;
    return 0;
}

int main (int argc, char *argv[])
{
flux_shell_t shell;
int i;
unsigned int taskid;

/* Initialize locale from environment
*/
Expand Down Expand Up @@ -1848,6 +1923,11 @@ int main (int argc, char *argv[])
if (shell_register_event_context (&shell) < 0)
shell_die (1, "failed to add standard shell event context");

/* Create all tasks
*/
if (shell_create_tasks (&shell) < 0)
shell_die_errno (1, "shell_create_tasks");

/* Call "shell_init" plugins.
*/
if (shell_init (&shell) < 0)
Expand All @@ -1874,70 +1954,11 @@ int main (int argc, char *argv[])
if (shell_post_init (&shell) < 0)
shell_die_errno (1, "shell_post_init");

/* Create tasks
/* Start all tasks
*/
if (!(shell.tasks = zlist_new ()))
if (shell_start_tasks (&shell))
shell_die (1, "zlist_new failed");

i = 0;
taskid = idset_first (shell.info->taskids);
while (taskid != IDSET_INVALID_ID) {
struct shell_task *task;

if (!(task = shell_task_create (&shell, i, taskid)))
shell_die (1, "shell_task_create index=%d", i);

task->pre_exec_cb = shell_task_exec;
task->pre_exec_arg = &shell;
shell.current_task = task;

/* Call all plugin task_init callbacks:
*/
if (shell_task_init (&shell) < 0)
shell_die (1, "failed to initialize taskid=%d", i);

/* Render any mustache templates in command args
*/
if (frob_command (&shell, task->cmd))
shell_die (1, "failed rendering of mustachioed command args");

if (shell_task_start (&shell, task, task_completion_cb, &shell) < 0) {
int ec = 1;
/* bash standard, 126 for permission/access denied, 127
* for command not found. Note that shell only launches
* local tasks, therefore no need to check for
* EHOSTUNREACH.
*/
if (errno == EPERM || errno == EACCES)
ec = 126;
else if (errno == ENOENT)
ec = 127;
shell_die (ec,
"task %d (host %s): start failed: %s: %s",
task->rank,
shell.hostname,
flux_cmd_arg (task->cmd, 0),
strerror (errno));
}

if (zlist_append (shell.tasks, task) < 0)
shell_die (1, "zlist_append failed");

if (flux_shell_add_completion_ref (&shell, "task%d", task->rank) < 0)
shell_die (1, "flux_shell_add_completion_ref");

/* Call all plugin task_fork callbacks:
*/
if (shell_task_forked (&shell) < 0)
shell_die (1, "shell_task_forked");

i++;
taskid = idset_next (shell.info->taskids, taskid);
}
/* Reset current task since we've left task-specific context:
*/
shell.current_task = NULL;

if (shell_start (&shell) < 0)
shell_die_errno (1, "shell.start callback(s) failed");

Expand Down

0 comments on commit 75f48ca

Please sign in to comment.