Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd: support high-level URIs and JOBID arguments in flux-top and flux-proxy #4004

Merged
merged 11 commits into from
Dec 14, 2021
35 changes: 33 additions & 2 deletions doc/man1/flux-proxy.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,23 @@ flux-proxy(1)
SYNOPSIS
========

**flux** **proxy** [*OPTIONS*] URI [command [args...]]
**flux** **proxy** [*OPTIONS*] TARGET [command [args...]]

DESCRIPTION
===========

**flux proxy** connects to the Flux instance identified by *URI*,
**flux proxy** connects to the Flux instance identified by *TARGET*,
then spawns a shell with FLUX_URI pointing to a local:// socket
managed by the proxy program. As long as the shell is running,
the proxy program routes messages between the instance and the
local:// socket. Once the shell terminates, the proxy program
terminates and removes the socket.

The *TARGET* argument is a URI which can be resolved by ``flux uri``,
including a Flux jobid, a fully-resolved native ``ssh`` or ``local``
URI, or a resolvable URI with a scheme supported by a ``flux uri``
plugin. See :man1:`flux-uri` for details.

The purpose of **flux proxy** is to allow a connection to be reused,
for example where connection establishment has high latency or
requires authentication.
Expand Down Expand Up @@ -50,6 +55,32 @@ Connect to the same job remotely on host foo.com:

$ flux proxy ssh://foo.com/tmp/flux-123456-abcdef/0/local

Connect to a Flux instance running as job ƒQBfmbm in the current instance:

::

$ flux proxy ƒQBfmbm

or

::

$ flux proxy jobid:ƒQBfmbm


Connect to a Flux instance running as job ƒQ8ho35 in ƒQBfmbm

::

$ flux proxy jobid:ƒQBfmbm/ƒQ8ho35


Connect to a Flux instance started in Slurm job 1234

::

$ flux proxy slurm:1234


RESOURCES
=========
Expand Down
5 changes: 3 additions & 2 deletions doc/man1/flux-uri.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ SYNOPSIS
DESCRIPTION
===========

Connections to Flux are established via a Uniform Resource Indicator
Connections to Flux are established via a Uniform Resource Identifier
(URI) which is passed to the :man3:`flux_open` API call. These *native*
URIs indicate the "connector" which will be used to establish the
connection, and are typically either *local*, with a ``local`` URI
Expand Down Expand Up @@ -79,7 +79,8 @@ jobid:ID[/ID...]
``scheme:`` is provided in *TARGET* passed to ``flux uri``, so the
``jobid:`` prefix is optional. A hierarchy of Flux jobids is supported,
so ``f1234/f3456`` will resolve the URI for job ``f3456`` running in
job ``f1234`` in the current instance.
job ``f1234`` in the current instance. This scheme will raise an error
if the target job is not running.

pid:PID
This scheme attempts to read the ``FLUX_URI`` value from the process id
Expand Down
11 changes: 6 additions & 5 deletions src/bindings/python/flux/uri/resolvers/jobid.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ def _do_resolve(self, uri, flux_handle, force_local=False):

# Fetch the jobinfo object for this job
try:
uri = (
job_list_id(flux_handle, jobid, attrs=["annotations"])
.get_jobinfo()
.user.uri
)
job = job_list_id(
flux_handle, jobid, attrs=["state", "annotations"]
).get_jobinfo()
if job.state != "RUN":
raise ValueError(f"jobid {arg} is not running")
uri = job.user.uri
except FileNotFoundError as exc:
raise ValueError(f"jobid {arg} not found") from exc

Expand Down
12 changes: 9 additions & 3 deletions src/cmd/builtin/proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <inttypes.h>

#include "src/common/libutil/cleanup.h"
#include "src/common/libutil/uri.h"
#include "src/common/librouter/usock.h"
#include "src/common/librouter/router.h"

Expand Down Expand Up @@ -256,7 +257,8 @@ static int cmd_proxy (optparse_t *p, int ac, char *av[])
const char *tmpdir = getenv ("TMPDIR");
char workpath[PATH_MAX + 1];
char sockpath[PATH_MAX + 1];
const char *uri;
const char *target;
char *uri;
int optindex;
flux_reactor_t *r;

Expand All @@ -265,11 +267,15 @@ static int cmd_proxy (optparse_t *p, int ac, char *av[])
optindex = optparse_option_index (p);
if (optindex == ac)
optparse_fatal_usage (p, 1, "URI argument is required\n");
uri = av[optindex++];

target = av[optindex++];
if (!(uri = uri_resolve (target)))
log_msg_exit ("Unable to resolve %s to a URI", target);

memset (&ctx, 0, sizeof (ctx));
if (!(ctx.h = flux_open (uri, 0)))
log_err_exit ("%s", uri);
free (uri);
flux_log_set_appname (ctx.h, "proxy");
ctx.proxy_user = getuid ();
if (!(r = flux_reactor_create (SIGCHLD)))
Expand Down Expand Up @@ -338,7 +344,7 @@ int subcommand_proxy_register (optparse_t *p)
optparse_err_t e;

e = optparse_reg_subcommand (p, "proxy", cmd_proxy,
"[OPTIONS] URI [COMMAND...]",
"[OPTIONS] JOBID|URI [COMMAND...]",
"Route messages to/from Flux instance",
0,
proxy_opts);
Expand Down
67 changes: 12 additions & 55 deletions src/cmd/top/top.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <unistd.h>
#include <locale.h>

#include "src/common/libutil/uri.h"
#include "top.h"

static const double job_activity_rate_limit = 2;
Expand Down Expand Up @@ -100,65 +101,23 @@ void refresh_cb (flux_reactor_t *r,
doupdate ();
}

static char * job_get_uri (flux_t *h, flux_jobid_t id)
{
flux_future_t *f;
const char *uri = NULL;
char *cpy;
int state;

if (!(f = flux_rpc_pack (h,
"job-list.list-id",
0,
0,
"{s:I s:[ss]}",
"id", id,
"attrs",
"state", "annotations"))
|| flux_rpc_get_unpack (f,
"{s:{s:i s?{s?{s?s}}}}",
"job",
"state", &state,
"annotations",
"user",
"uri", &uri) < 0) {
if (errno == ENOENT)
fatal (0, "unknown jobid");
fatal (0, "failed to list jobid");
}

if (!(state & FLUX_JOB_STATE_RUNNING))
fatal (0, "job is not running");
if (uri == NULL)
fatal (0, "error reading job remote-uri - not a Flux instance?");
cpy = strdup (uri);
flux_future_destroy (f);
return cpy;
}

/* Get handle to Flux instance to be monitored.
* If id = FLUX_JOBID_ANY, merely call flux_open().
* Otherwise, fetch remote-uri from job and open that.
*/
static flux_t *open_flux_instance (flux_jobid_t id)
static flux_t *open_flux_instance (const char *target)
{
flux_t *h;
flux_future_t *f = NULL;
flux_t *job_h = NULL;
char *uri;
char *uri = NULL;

if (!(h = flux_open (NULL, 0)))
if (target && !(uri = uri_resolve (target)))
fatal (0, "failed to resolve target %s to a Flux URI", target);
if (!(h = flux_open (uri, 0)))
fatal (errno, "error connecting to Flux");
if (id == FLUX_JOBID_ANY)
return h;
if (!(uri = job_get_uri (h, id)))
fatal (0, "out of memory getting uri");
if (!(job_h = flux_open (uri, 0)))
fatal (errno, "error connecting to job");
free (uri);
flux_future_destroy (f);
flux_close (h);
return job_h;
return h;
}

/* Initialize 'stdscr' and register colors.
Expand Down Expand Up @@ -194,13 +153,14 @@ static struct optparse_option cmdopts[] = {
OPTPARSE_TABLE_END,
};

static const char *usage_msg = "[OPTIONS] [JOBID]";
static const char *usage_msg = "[OPTIONS] [TARGET]";

int main (int argc, char *argv[])
{
int optindex;
struct top top;
int reactor_flags = 0;
const char *target = NULL;

memset (&top, 0, sizeof (top));
top.id = FLUX_JOBID_ANY;
Expand All @@ -216,16 +176,13 @@ int main (int argc, char *argv[])

if ((optindex = optparse_parse_args (top.opts, argc, argv)) < 0)
exit (1);
if (optindex < argc) {
const char *s = argv[optindex++];
if (flux_job_id_parse (s, &top.id) < 0)
fatal (errno, "failed to parse JOBID argument");
}
if (optindex < argc)
target = argv[optindex++];
if (optindex != argc) {
optparse_print_usage (top.opts);
exit (1);
}
top.h = open_flux_instance (top.id);
top.h = open_flux_instance (target);
flux_fatal_set (top.h, flux_fatal, &top);
if (!(top.refresh = flux_prepare_watcher_create (flux_get_reactor (top.h),
refresh_cb,
Expand Down
4 changes: 3 additions & 1 deletion src/common/libutil/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ libutil_la_SOURCES = \
digest.c \
digest.h \
jpath.c \
jpath.h
jpath.h \
uri.c \
uri.h

EXTRA_DIST = veb_mach.c

Expand Down
64 changes: 64 additions & 0 deletions src/common/libutil/uri.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/************************************************************\
* Copyright 2021 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <stdlib.h>
#include <string.h>

#include "src/common/libyuarel/yuarel.h"
#include "popen2.h"
#include "read_all.h"
#include "log.h"

static void nullify_newline (char *str)
{
int n;
if (str && (n = strlen (str)) > 0) {
if (str[n-1] == '\n')
str[n-1] = '\0';
}
}

char *uri_resolve (const char *uri)
{
struct popen2_child *child = NULL;
struct yuarel yuri;
char *result = NULL;
char *argv[] = { "flux", "uri", (char *) uri, NULL };

char *cpy = strdup (uri);
if (!cpy)
return NULL;
if (yuarel_parse (&yuri, cpy) == 0) {
if (strcmp (yuri.scheme, "ssh") == 0
|| strcmp (yuri.scheme, "local") == 0) {
free (cpy);
return strdup (uri);
}
}
free (cpy);

if (!(child = popen2 ("flux", argv))
|| (read_all (popen2_get_fd (child), (void **)&result) < 0))
goto out;
nullify_newline (result);
out:
if (pclose2 (child) < 0) {
/* flux-uri returned error */
free (result);
result = NULL;
}
return result;
}


// vi:ts=4 sw=4 expandtab
33 changes: 33 additions & 0 deletions src/common/libutil/uri.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/************************************************************\
* Copyright 2021 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#ifndef _UTIL_URI_H
#define _UTIL_URI_H

/* Resolve a target or "high-level" URI with the flux-uri(1) command,
* returning the result. If the URI is already a native Flux URI (e.g.
* `local://` or `ssh://`), then `flux uri` is *not* called and instead
* the target is returned unmodified to avoid the extra overhead of
* running a subprocess.
*
* On failure, NULL is returned. Stderr is not redirected or consumed,
* so the expectation is that the underlying `flux uri` error will
* already be copied to the callers tty.
*
* Caller must free the returned string on success.
*
* Note: this function uses popen2() to execute flux-uri as a subprocess,
* so care should be taken in when and how this function is called.
*/
char *uri_resolve (const char *target);

#endif /* !_UTIL_URI_H */

// vi:ts=4 sw=4 expandtab
Loading