From e7022625956d9c883a3f20470decc12647dcf955 Mon Sep 17 00:00:00 2001 From: "Mark A. Grondona" Date: Wed, 8 Dec 2021 20:56:59 -0800 Subject: [PATCH] python: add slurm URI resolver plugin Problem: When running a Flux under Slurm, there is no convenient method to get the URI for instances running as Slurm jobs. Add an experimental "slurm" resolver plugin for the FluxURIResolver class. The slurm plugin works using the following method: * run `scontrol listpids` on the first node of the Slurm job via `srun --overlap --jobid=JOBID` * Try resolving each job PID using the "pid" resolver and return the first URI on success This plugin is best effort and can probably be easily fooled, for example if `flux start` or `flux broker` isn't run direclty as a Slurm job. --- src/bindings/python/flux/Makefile.am | 1 + .../python/flux/uri/resolvers/slurm.py | 107 ++++++++++++++++++ 2 files changed, 108 insertions(+) create mode 100644 src/bindings/python/flux/uri/resolvers/slurm.py diff --git a/src/bindings/python/flux/Makefile.am b/src/bindings/python/flux/Makefile.am index f73ef81b066f..b2988ed12413 100644 --- a/src/bindings/python/flux/Makefile.am +++ b/src/bindings/python/flux/Makefile.am @@ -44,6 +44,7 @@ nobase_fluxpy_PYTHON = \ uri/__init__.py \ uri/resolvers/jobid.py \ uri/resolvers/pid.py \ + uri/resolvers/slurm.py \ utils/parsedatetime/__init__.py \ utils/parsedatetime/parsedatetime.py \ utils/parsedatetime/warns.py \ diff --git a/src/bindings/python/flux/uri/resolvers/slurm.py b/src/bindings/python/flux/uri/resolvers/slurm.py new file mode 100644 index 000000000000..725db9dcaa84 --- /dev/null +++ b/src/bindings/python/flux/uri/resolvers/slurm.py @@ -0,0 +1,107 @@ +############################################################### +# Copyright 2021 Lawrence Livermore National Security, LLC +# (c.f. AUTHORS, NOTICE.LLNS, COPYING) +# +# This file is part of the Flux resource manager framework. +# For details, see https://github.com/flux-framework. +# +# SPDX-License-Identifier: LGPL-3.0 +############################################################### + +import os +import errno +import subprocess +from pathlib import PurePath + +from flux.uri import URIResolverPlugin, FluxURIResolver + + +def slurm_job_pids(jobid): + """Read pids for Slurm jobid using scontrol listpids""" + + pids = [] + sp = subprocess.run( + ["scontrol", "listpids", jobid], stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + if sp.returncode != 0: + raise OSError(errno.ENOENT) + for line in sp.stdout.decode("utf-8").split("\n"): + # Output fields are "PID,JOBID,STEPID,LOCALID,GLOBALID" + # + # The "main" flux-broker should have been directly launched + # from slurmstepd, so its LOCALID should be 0. Thus, only + # process pids with LOCALID 0 so we don't accidentally get + # the FLUX_URI for a subinstance of the main Flux instance + # (avoids the need to query the instance-level attribute) + fields = line.split() + try: + if int(fields[3]) != 0: + continue + pid = int(fields[0]) + if pid != os.getpid(): + pids.append(pid) + except (ValueError, IndexError): + pass + return pids + + +def slurm_resolve_remote(jobid): + """ + Attempt to resolve a Flux job URI for Slurm jobid by running + + srun --overlap --jobid flux uri slurm:jobid + + on the first node of the Slurm job + """ + + # Clear FLUX_URI in srun environment so we don't confuse + # ourselves and return the current FLUX_URI from flux-uri's + # environment + env = os.environ.copy() + if "FLUX_URI" in env: + del env["FLUX_URI"] + sp = subprocess.run( + [ + "srun", + "--overlap", + f"--jobid={jobid}", + "-n1", + "-N1", + "flux", + "uri", + f"slurm:{jobid}", + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env, + ) + if sp.returncode != 0: + raise ValueError(f"Unable to resolve Flux URI for Slurm job {jobid}") + return sp.stdout.decode("utf-8").rstrip() + + +class URIResolver(URIResolverPlugin): + """A URIResolver that can fetch a FLUX_URI from a Slurm job""" + + def describe(self): + return "Get URI for a Flux instance launched under Slurm" + + def resolve(self, uri): + jobid = PurePath(uri.path).parts[0] + + # Get list of local Slurm job pids from scontrol. + # If that fails, then the job might be running remotely, so + # try using srun to run `flux uri slurm:JOBID` on the first + # node of the Slurm job. + try: + pids = slurm_job_pids(jobid) + except OSError: + return slurm_resolve_remote(jobid) + + resolver = FluxURIResolver() + for pid in pids: + try: + return resolver.resolve(f"pid:{pid}").remote + except (ValueError, OSError): + pass + raise ValueError(f"PID {pid} doesn't seem to have a FLUX_URI")