Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

python/test: update to new tmpdir scheme #5

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/bindings/lua/wreck.lua
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ local function get_filtered_env ()
for k,v in pairs (env) do
if k:match ("SLURM_") then env[k] = nil end
if k:match ("FLUX_API") then env[k] = nil end
if k:match ("FLUX_TMPDIR") then env[k] = nil end
if k:match ("FLUX_URI") then env[k] = nil end
end
-- XXX: MVAPICH2 at least requires MPIRUN_RSH_LAUNCH to be set
-- in the environment or PMI doesn't work (for unknown reason)
Expand Down
14 changes: 5 additions & 9 deletions src/bindings/python/flux/command_helpers.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
from __future__ import print_function
import sys, os, re, flux

def list_instances(list_all, top_only, sid=None):
def list_instances(sid=None):
tmpdir = '/tmp'
tmpdir = os.environ.get('TMPDIR', tmpdir)
if not list_all:
tmpdir = os.environ.get('FLUX_TMPDIR', tmpdir)

fdir = re.compile('flux-(?P<id>[^-]+)-0')
fdir = re.compile('flux-(?P<id>[^-]+)-')

# Sometimes flux tmpdirs end up in sub-directories of previous tmpdirs
for dirname, dirs, files in os.walk(tmpdir, topdown=True):
for m in [fdir.match(d) for d in dirs]:
if not m: continue
if sid is not None and not re.search('flux-' + sid + '-0',
if sid is not None and not re.search('flux-' + sid + '-',
m.string):
continue
job = os.path.join(tmpdir, m.string)
job = os.path.join (os.path.join(tmpdir, m.string), '0')
uri = 'local://' + job
try:
with open(os.path.join(job, 'broker.pid')) as f:
Expand All @@ -26,5 +23,4 @@ def list_instances(list_all, top_only, sid=None):
yield (m.group('id'), uri)
except:
pass
if top_only:
break
break
8 changes: 2 additions & 6 deletions src/bindings/python/test_commands/sideflux.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,15 @@ def __init__(self, size=1):
global flux_exe
self.size = size
self.tmpdir = tempfile.mkdtemp(prefix='flux-sandbox-')
self.flux_tmpdir = os.path.join(self.tmpdir, 'flux-sideflux-0')
self.flux_uri = 'local://' + self.flux_tmpdir
os.makedirs(self.flux_tmpdir)
self.flux_uri = 'local://' + self.tmpdir + '/0'
self.cleaned = False

def start(self):
flux_command = [flux_exe, 'start', '--size={}'.format(self.size), '-o',
'-L,stderr,--sid,sideflux', 'bash']
'-L,stderr,--socket-directory,' + self.tmpdir , 'bash']
# print ' '.join(flux_command)
FNULL = open(os.devnull, 'w+')
self.subenv = os.environ.copy()
self.subenv.pop('FLUX_TMPDIR', None)
self.subenv.pop('FLUX_URI', None)
self.subenv['TMPDIR'] = self.tmpdir
self.sub = subprocess.Popen(
Expand All @@ -72,7 +69,6 @@ def start(self):
print('echo READY', file=self.sub.stdin)

self.env_items = {}
# self.env_items['FLUX_TMPDIR'] = self.flux_tmpdir
self.env_items['FLUX_URI'] = self.flux_uri

while True:
Expand Down
146 changes: 84 additions & 62 deletions src/broker/broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ typedef struct {
uint32_t size; /* session size */
uint32_t rank; /* our rank in session */
char *sid; /* session id */
char *socket_dir;
char *local_uri;
char *parent_uri;

/* Modules
*/
modhash_t *modhash;
Expand Down Expand Up @@ -152,8 +156,8 @@ static void load_modules (ctx_t *ctx, zlist_t *modules, zlist_t *modopts,
zhash_t *modexclude, const char *modpath);

static void update_proctitle (ctx_t *ctx);
static void update_environment (ctx_t *ctx);
static void update_pidfile (ctx_t *ctx, bool force);
static void create_rankdir (ctx_t *ctx);
static void update_pidfile (ctx_t *ctx);
static void rank0_shell (ctx_t *ctx);
static int rank0_shell_exit_handler (struct subprocess *p, void *arg);

Expand All @@ -162,7 +166,7 @@ static void boot_local (ctx_t *ctx);

static const struct flux_handle_ops broker_handle_ops;

#define OPTIONS "t:vqR:S:p:M:X:L:N:Pk:e:r:s:c:fnH:O:x:T:g:"
#define OPTIONS "t:vqR:S:p:M:X:L:N:Pk:e:r:s:c:nH:O:x:T:g:D:"
static const struct option longopts[] = {
{"sid", required_argument, 0, 'N'},
{"child-uri", required_argument, 0, 't'},
Expand All @@ -186,6 +190,7 @@ static const struct option longopts[] = {
{"heartrate", required_argument, 0, 'H'},
{"timeout", required_argument, 0, 'T'},
{"shutdown-grace", required_argument, 0, 'g'},
{"socket-directory",required_argument, 0, 'D'},
{0, 0, 0, 0},
};

Expand All @@ -212,10 +217,10 @@ static void usage (void)
" -k,--k-ary K Wire up in a k-ary tree\n"
" -c,--command string Run command on rank 0\n"
" -n,--noshell Do not spawn a shell even if on a tty\n"
" -f,--force Kill rival broker and start\n"
" -H,--heartrate SECS Set heartrate in seconds (rank 0 only)\n"
" -T,--timeout SECS Set wireup timeout in seconds (rank 0 only)\n"
" -g,--shutdown-grace SECS Set shutdown grace period in seconds\n"
" -D,--socket-directory DIR Create ipc sockets in DIR (local bootstrap)\n"
);
exit (1);
}
Expand All @@ -224,7 +229,6 @@ int main (int argc, char *argv[])
{
int c;
ctx_t ctx;
bool fopt = false;
bool nopt = false;
zlist_t *modules, *modopts;
zhash_t *modexclude;
Expand Down Expand Up @@ -340,9 +344,6 @@ int main (int argc, char *argv[])
case 'n': /* --noshell */
nopt = true;
break;
case 'f': /* --force */
fopt = true;
break;
case 'H': /* --heartrate SECS */
if (heartbeat_set_ratestr (ctx.heartbeat, optarg) < 0)
err_exit ("heartrate `%s'", optarg);
Expand All @@ -353,6 +354,17 @@ int main (int argc, char *argv[])
case 'g': /* --shutdown-grace SECS */
ctx.shutdown_grace = strtod (optarg, NULL);
break;
case 'D': { /* --socket-directory DIR */
struct stat sb;
if (stat (optarg, &sb) < 0)
err_exit ("%s", optarg);
if (!S_ISDIR (sb.st_mode))
msg_exit ("%s: not a directory", optarg);
if ((sb.st_mode & S_IRWXU) != S_IRWXU)
msg_exit ("%s: invalid mode: 0%o", optarg, sb.st_mode);
ctx.socket_dir = xstrdup (optarg);
break;
}
default:
usage ();
}
Expand Down Expand Up @@ -380,7 +392,6 @@ int main (int argc, char *argv[])

/* Process config from the KVS of enclosing instance (if any)
* and not forced to use a config file by the command line.
* (FLUX_TMPDIR has not yet been overridden within this instance)
*/
ctx.cf = flux_conf_create ();
if (!(confdir = getenv ("FLUX_CONF_DIRECTORY")))
Expand All @@ -391,7 +402,7 @@ int main (int argc, char *argv[])
msg ("Loading config from %s", confdir);
if (flux_conf_load (ctx.cf) < 0 && errno != ESRCH)
err_exit ("%s", confdir);
} else if (getenv ("FLUX_TMPDIR")) {
} else if (getenv ("FLUX_URI")) {
flux_t h;
if (ctx.verbose)
msg ("Loading config from KVS");
Expand All @@ -400,6 +411,11 @@ int main (int argc, char *argv[])
if (kvs_conf_load (h, ctx.cf) < 0)
err_exit ("could not load config from KVS");
flux_close (h);
/* Stash FLUX_URI value for later use, but unset it in the environment
* so a connection to the enclosing instance is not made inadvertantly.
*/
ctx.parent_uri = xstrdup (getenv ("FLUX_URI"));
unsetenv ("FLUX_URI");
}

/* Arrange to load config entries into kvs config.*
Expand Down Expand Up @@ -480,6 +496,23 @@ int main (int argc, char *argv[])
}
if (!ctx.sid)
ctx.sid = xstrdup ("0");

/* Now that we know the rank (either from command line or PMI,
* create a subdirectory of socket_dir for the sockets and pidfile
* specific to this rank of the broker.
*/
if (!ctx.socket_dir) {
char *tmpdir = getenv ("TMPDIR");
char *template = xasprintf ("%s/flux-%s-XXXXXX",
tmpdir ? tmpdir : "/tmp", ctx.sid);

if (!(ctx.socket_dir = mkdtemp (template)))
err_exit ("mkdtemp %s", template);
cleanup_push_string (cleanup_directory, ctx.socket_dir);
}

create_rankdir (&ctx);

/* If we're missing the wiring, presume that the session is to be
* started on a single node and compute appropriate ipc:/// sockets.
*/
Expand Down Expand Up @@ -523,8 +556,7 @@ int main (int argc, char *argv[])
}

update_proctitle (&ctx);
update_environment (&ctx);
update_pidfile (&ctx, fopt);
update_pidfile (&ctx);

if (!nopt && ctx.rank == 0 && (isatty (STDIN_FILENO) || ctx.shell_cmd)) {
ctx.shell = subprocess_create (ctx.sm);
Expand Down Expand Up @@ -622,6 +654,11 @@ int main (int argc, char *argv[])
zlist_destroy (&modopts); /* autofree set */
zhash_destroy (&modexclude); /* values const (no destructor) */

if (ctx.parent_uri)
free (ctx.parent_uri);
if (ctx.local_uri)
free (ctx.local_uri);

return 0;
}

Expand Down Expand Up @@ -678,51 +715,32 @@ static void update_proctitle (ctx_t *ctx)
ctx->proctitle = s;
}

static void update_environment (ctx_t *ctx)
/* The 'ranktmp' dir will contain the broker.pid file and local:// socket.
* It will be created in ctx->socket_dir.
*/
static void create_rankdir (ctx_t *ctx)
{
const char *oldtmp = flux_get_tmpdir ();
static char tmpdir[PATH_MAX + 1];

(void)snprintf (tmpdir, sizeof (tmpdir), "%s/flux-%s-%d",
oldtmp, ctx->sid, ctx->rank);
if (mkdir (tmpdir, 0700) < 0 && errno != EEXIST)
err_exit ("mkdir %s", tmpdir);
if (ctx->verbose)
msg ("FLUX_TMPDIR: %s", tmpdir);
if (flux_set_tmpdir (tmpdir) < 0)
err_exit ("flux_set_tmpdir");
cleanup_push_string(cleanup_directory, tmpdir);
char *ranktmp = xasprintf ("%s/%d", ctx->socket_dir, ctx->rank);

if (mkdir (ranktmp, 0700) < 0)
err_exit ("mkdir %s", ranktmp);
cleanup_push_string (cleanup_directory, ranktmp);
ctx->local_uri = xasprintf ("local://%s", ranktmp);
free (ranktmp);
}

static void update_pidfile (ctx_t *ctx, bool force)
static void update_pidfile (ctx_t *ctx)
{
const char *tmpdir = flux_get_tmpdir ();
char *pidfile;
pid_t pid;
char *pidfile = xasprintf ("%s/%d/broker.pid", ctx->socket_dir, ctx->rank);
FILE *f;

if (asprintf (&pidfile, "%s/broker.pid", tmpdir) < 0)
oom ();
if ((f = fopen (pidfile, "r"))) {
if (fscanf (f, "%u", &pid) == 1 && kill (pid, 0) == 0) {
if (force) {
if (kill (pid, SIGKILL) < 0)
err_exit ("kill %d", pid);
msg ("killed broker with pid %d", pid);
} else
msg_exit ("broker is already running in %s, pid %d", tmpdir, pid);
}
(void)fclose (f);
}
if (!(f = fopen (pidfile, "w+")))
err_exit ("%s", pidfile);
if (fprintf (f, "%u", getpid ()) < 0)
err_exit ("%s", pidfile);
if (fclose(f) < 0)
err_exit ("%s", pidfile);
if (ctx->verbose)
msg ("pidfile: %s", pidfile);
cleanup_push_string(cleanup_file, pidfile);
cleanup_push_string (cleanup_file, pidfile);
free (pidfile);
}

Expand Down Expand Up @@ -757,6 +775,7 @@ static void rank0_shell (ctx_t *ctx)
subprocess_argv_append (ctx->shell, ctx->shell_cmd);
}
subprocess_set_environ (ctx->shell, environ);
subprocess_setenv (ctx->shell, "FLUX_URI", ctx->local_uri, 1);

if (!ctx->quiet)
flux_log (ctx->h, LOG_INFO, "starting shell");
Expand Down Expand Up @@ -821,25 +840,24 @@ static void boot_pmi (ctx_t *ctx)

static void boot_local (ctx_t *ctx)
{
const char *tmpdir = flux_get_tmpdir ();
int rrank = ctx->rank == 0 ? ctx->size - 1 : ctx->rank - 1;
char * reqfile = xasprintf("%s/flux-%s-%d-req", tmpdir, ctx->sid, ctx->rank);
char * eventfile = xasprintf("%s/flux-%s-event", tmpdir, ctx->sid);
char *reqfile = xasprintf ("%s/%d/req", ctx->socket_dir, ctx->rank);
char *eventfile = xasprintf ("%s/event", ctx->socket_dir);
overlay_set_child (ctx->overlay, "ipc://%s", reqfile);
cleanup_push_string(cleanup_file, reqfile);
free(reqfile);
cleanup_push_string (cleanup_file, reqfile);
free (reqfile);
if (ctx->rank > 0) {
int prank = ctx->k_ary == 0 ? 0 : (ctx->rank - 1) / ctx->k_ary;
overlay_push_parent (ctx->overlay, "ipc://%s/flux-%s-%d-req",
tmpdir, ctx->sid, prank);
overlay_push_parent (ctx->overlay, "ipc://%s/%d/req",
ctx->socket_dir, prank);
}
overlay_set_event (ctx->overlay, "ipc://%s", eventfile);
if (ctx->rank == 0) {
cleanup_push_string(cleanup_file, eventfile);
cleanup_push_string (cleanup_file, eventfile);
}
free(eventfile);
overlay_set_right (ctx->overlay, "ipc://%s/flux-%s-%d-req",
tmpdir, ctx->sid, rrank);
free (eventfile);
overlay_set_right (ctx->overlay, "ipc://%s/%d/req",
ctx->socket_dir, rrank);
}

static bool nodeset_suffix_member (char *name, uint32_t rank)
Expand Down Expand Up @@ -1183,13 +1201,13 @@ static int cmb_exec_cb (zmsg_t **zmsg, void *arg)
if (val != NULL)
subprocess_setenv (p, iter.key, val, 1);
}
/*
* Override key FLUX environment variables in env array
*/
subprocess_setenv (p, "FLUX_TMPDIR", getenv ("FLUX_TMPDIR"), 1);
}
else
subprocess_set_environ (p, environ);
/*
* Override key FLUX environment variables in env array
*/
subprocess_setenv (p, "FLUX_URI", ctx->local_uri, 1);

if (json_object_object_get_ex (request, "cwd", &o) && o != NULL) {
const char *dir = json_object_get_string (o);
Expand Down Expand Up @@ -1340,10 +1358,14 @@ static int cmb_getattr_cb (zmsg_t **zmsg, void *arg)
}
if (!strcmp (name, "snoop-uri"))
val = snoop_get_uri (ctx->snoop);
else if (!strcmp (name, "parent-uri"))
else if (!strcmp (name, "tbon-parent-uri"))
val = overlay_get_parent (ctx->overlay);
else if (!strcmp (name, "request-uri"))
else if (!strcmp (name, "tbon-request-uri"))
val = overlay_get_child (ctx->overlay);
else if (!strcmp (name, "local-uri"))
val = ctx->local_uri;
else if (!strcmp (name, "parent-uri"))
val = ctx->parent_uri;
else
errno = ENOENT;
if (!val)
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/flux-comms.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ void usage (void)
"Usage: flux-comms [-r N] idle\n"
" flux-comms [-r N] getattr attr\n"
" flux-comms info\n"
" flux-comms [-r N] reparent new-parent-uri\n"
" flux-comms [-r N] reparent new-uri\n"
" flux-comms [-r N] panic [msg ...]\n"
" flux-comms [-r N] failover\n"
" flux-comms [-r N] recover\n"
Expand Down Expand Up @@ -104,7 +104,7 @@ int main (int argc, char *argv[])
} else if (!strcmp (cmd, "getattr")) {
char *s;
if (optind != argc - 1)
msg_exit ("getattr snoop-uri, parent-uri, or request-uri");
msg_exit ("Usage: flux comms getattr attrname");
if (!(s = flux_getattr (h, rank, argv[optind])))
err_exit ("%s", argv[optind]);
printf ("%s\n", s);
Expand Down
Loading