From 51e7a059c79a5f90aa8d02b6baf28b9b6e40b092 Mon Sep 17 00:00:00 2001
From: James Corbett
Date: Fri, 30 Aug 2024 12:55:01 -0700
Subject: [PATCH 1/5] dws2jgf: add option to use config file

Problem: admins prefer configuring clusters with a config file to doing
so with R. However, the current design of flux-dws2jgf forces the usage
of R as input to generate JGF. When
https://github.com/flux-framework/flux-core/issues/6245 is implemented,
Flux will be able to combine a config file with JGF.

Add an option to use a config file as input to the JGF generation, so
administrators will be able to use a config file + JGF instead of
R + JGF.
---
 src/cmd/flux-dws2jgf.py | 29 +++++++++++++++++++++++++++--
 1 file changed, 27 insertions(+), 2 deletions(-)

diff --git a/src/cmd/flux-dws2jgf.py b/src/cmd/flux-dws2jgf.py
index d042458..9f6e818 100755
--- a/src/cmd/flux-dws2jgf.py
+++ b/src/cmd/flux-dws2jgf.py
@@ -6,6 +6,7 @@
 import re
 import logging
 import itertools
+import subprocess
 import socket
 
 import flux
@@ -255,7 +256,10 @@ def main():
     parser = argparse.ArgumentParser(
         prog="flux-dws2jgf",
         formatter_class=flux.util.help_formatter(),
-        description="Print JGF representation of Rabbit nodes",
+        description=(
+            "Print JGF representation of Rabbit nodes. Reads R from stdin "
+            "or a config file passed with the --from-config option."
+        ),
     )
     parser.add_argument(
         "--no-validate",
@@ -281,13 +285,34 @@ def main():
             "If unspecified, use the hostname stripped of numerics."
         ),
     )
+    parser.add_argument(
+        "--from-config",
+        metavar="FILE",
+        help=(
+            "Generate JGF based on a Flux config TOML file containing "
+            "a resource.config table"
+        ),
+    )
     args = parser.parse_args()
     if not args.cluster_name:
         args.cluster_name = "".join(
             i for i in socket.gethostname() if not i.isdigit()
         ).rstrip("-")
-    input_r = json.load(sys.stdin)
+    if args.from_config is None:
+        input_r = json.load(sys.stdin)
+    else:
+        proc = subprocess.run(
+            f"flux R parse-config {args.from_config}".split(),
+            capture_output=True,
+            check=False,
+        )
+        if proc.returncode != 0:
+            raise ValueError(
+                f"Could not parse config file {args.from_config!r}, "
+                f"error message was {proc.stderr}"
+            )
+        input_r = json.loads(proc.stdout)
 
     nnfs = [x for x in get_storage()["items"]]
     r_hostlist = Hostlist(input_r["execution"]["nodelist"])
     dws_computes = set(

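A rough sketch of how the new option might be exercised (the TOML contents,
file names, and paths below are illustrative, and at this point in the series
the script still needs Kubernetes access to list the DWS Storage resources):

    # Write a minimal Flux config file with a resource.config table
    # (hostnames and core ids are made up for illustration).
    printf '[[resource.config]]\nhosts = "compute-[01-04]"\ncores = "0-63"\n' > resource.toml

    # Generate JGF from the config file instead of reading R from stdin.
    flux python src/cmd/flux-dws2jgf.py --from-config=resource.toml > R.local
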
From 45625fd911f2c65bee32b8477a078f6672501917 Mon Sep 17 00:00:00 2001
From: James Corbett
Date: Fri, 30 Aug 2024 13:54:27 -0700
Subject: [PATCH 2/5] dws2jgf: add script for generating rabbit map

Problem: as described in #193, JGF is too unwieldy to be stored in
Ansible. On the other hand, Flux's ability to start up and run jobs
cannot be dependent on the responsiveness of kubernetes, so generating
JGF from kubernetes before starting Flux is not an option. A solution
would be to store some static rabbit data in ansible, generated by
reading from kubernetes. This data could be read in to generate JGF.

Add a script that generates a JSON file describing which nodes map to
which rabbits and what the capacity of each rabbit is.
---
 src/cmd/Makefile.am           |  3 +-
 src/cmd/flux-rabbitmapping.py | 72 +++++++++++++++++++++++++++++++++++
 2 files changed, 74 insertions(+), 1 deletion(-)
 create mode 100644 src/cmd/flux-rabbitmapping.py

diff --git a/src/cmd/Makefile.am b/src/cmd/Makefile.am
index 019c00a..5158825 100644
--- a/src/cmd/Makefile.am
+++ b/src/cmd/Makefile.am
@@ -1,2 +1,3 @@
 dist_fluxcmd_SCRIPTS = \
-	flux-dws2jgf.py
+	flux-dws2jgf.py \
+	flux-rabbitmapping.py

diff --git a/src/cmd/flux-rabbitmapping.py b/src/cmd/flux-rabbitmapping.py
new file mode 100644
index 0000000..8e1724f
--- /dev/null
+++ b/src/cmd/flux-rabbitmapping.py
@@ -0,0 +1,72 @@
+#!/usr/bin/env python3
+
+import argparse
+import sys
+import json
+import subprocess
+
+import flux
+from flux.hostlist import Hostlist
+
+import kubernetes as k8s
+from kubernetes.client.rest import ApiException
+
+
+def get_storage(config_file):
+    k8s_client = k8s.config.new_client_from_config(config_file=config_file)
+    try:
+        api_instance = k8s.client.CustomObjectsApi(k8s_client)
+    except ApiException as rest_exception:
+        if rest_exception.status == 403:
+            raise Exception(
+                "You must be logged in to the K8s or OpenShift cluster to continue"
+            )
+        raise
+
+    group = "dataworkflowservices.github.io"
+    version = "v1alpha2"
+    plural = "storages"
+    return api_instance.list_cluster_custom_object(group, version, plural)
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        formatter_class=flux.util.help_formatter(),
+        description=("Create a mapping between compute nodes and rabbit nodes"),
+    )
+    parser.add_argument(
+        "--kubeconfig",
+        "-k",
+        default=None,
+        metavar="FILE",
+        help="Path to kubeconfig file to use",
+    )
+    parser.add_argument(
+        "--indent",
+        "-i",
+        default=None,
+        type=int,
+        metavar="N",
+        help="Number of spaces to indent output JSON document",
+    )
+    parser.add_argument(
+        "--nosort",
+        action="store_false",
+        help="Do not sort keys in output JSON document",
+    )
+    args = parser.parse_args()
+    rabbit_mapping = {"computes": {}, "rabbits": {}}
+    for nnf in get_storage(args.kubeconfig)["items"]:
+        hlist = Hostlist()
+        nnf_name = nnf["metadata"]["name"]
+        for compute in nnf["status"]["access"].get("computes", []):
+            hlist.append(compute["name"])
+            rabbit_mapping["computes"][compute["name"]] = nnf_name
+        rabbit_mapping["rabbits"][nnf_name] = {}
+        rabbit_mapping["rabbits"][nnf_name]["hostlist"] = hlist.uniq().encode()
+        rabbit_mapping["rabbits"][nnf_name]["capacity"] = nnf["status"]["capacity"]
+    json.dump(rabbit_mapping, sys.stdout, indent=args.indent, sort_keys=args.nosort)
+
+
+if __name__ == "__main__":
+    main()

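A sketch of how the new script might be run from a node that can reach the
Kubernetes cluster (the kubeconfig path is illustrative). The output JSON has
a "computes" table mapping each compute node to its rabbit and a "rabbits"
table giving each rabbit's hostlist and total capacity; keys are sorted unless
--nosort is given:

    # Generate the static rabbit map once, from a node with k8s access.
    flux python src/cmd/flux-rabbitmapping.py --kubeconfig ~/.kube/config --indent 2 > rabbits.json
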
From 10d961ccd29cf67bde83ab510af8dd36322baf57 Mon Sep 17 00:00:00 2001
From: James Corbett
Date: Fri, 30 Aug 2024 15:28:58 -0700
Subject: [PATCH 3/5] dws2jgf: read from JSON file instead of k8s

Problem: as described in #193, JGF is too unwieldy to be stored in
Ansible. On the other hand, Flux's ability to start up and run jobs
cannot be dependent on the responsiveness of kubernetes, so generating
JGF from kubernetes before starting Flux is not an option.

Change flux-dws2jgf to read from a static JSON file generated by the
flux-rabbitmapping script, instead of from kubernetes.
---
 src/cmd/flux-dws2jgf.py | 94 +++++++++++++++--------------------------
 1 file changed, 34 insertions(+), 60 deletions(-)

diff --git a/src/cmd/flux-dws2jgf.py b/src/cmd/flux-dws2jgf.py
index 9f6e818..1c5f785 100755
--- a/src/cmd/flux-dws2jgf.py
+++ b/src/cmd/flux-dws2jgf.py
@@ -3,23 +3,18 @@
 import argparse
 import sys
 import json
-import re
 import logging
-import itertools
 import subprocess
 import socket
 
 import flux
 from flux.idset import IDset
 from flux.hostlist import Hostlist
-from flux.idset import IDset
 from fluxion.resourcegraph.V1 import (
     FluxionResourceGraphV1,
     FluxionResourcePoolV1,
     FluxionResourceRelationshipV1,
 )
-import kubernetes as k8s
-from kubernetes.client.rest import ApiException
 
 
 class ElCapResourcePoolV1(FluxionResourcePoolV1):
@@ -54,12 +49,12 @@
     """
    CORAL2 Graph: extend jsongraph's Graph class
     """
-    def __init__(self, rv1, nnfs, r_hostlist, chunks_per_nnf, cluster_name):
+    def __init__(self, rv1, rabbit_mapping, r_hostlist, chunks_per_nnf, cluster_name):
         """Constructor
         rv1 -- RV1 Dictorary that conforms to Flux RFC 20: Resource Set
                    Specification Version 1
         """
-        self._nnfs = nnfs
+        self._rabbit_mapping = rabbit_mapping
         self._r_hostlist = r_hostlist
         self._chunks_per_nnf = chunks_per_nnf
         self._rackids = 0
@@ -94,7 +89,7 @@
         for i in IDset(val):
             self._encode_child(vtx.get_id(), hPath, rank, str(key), i, {})
 
-    def _encode_ssds(self, parent, nnf):
+    def _encode_ssds(self, parent, capacity):
         res_type = "ssd"
         for i in range(self._chunks_per_nnf):
             res_name = f"{res_type}{i}"
@@ -108,7 +103,7 @@
                 -1,
                 True,
                 "GiB",
-                to_gibibytes(nnf["capacity"] // self._chunks_per_nnf),
+                to_gibibytes(capacity // self._chunks_per_nnf),
                 {},
                 f"{parent.path}/{res_name}",
                 1,  # status=1 marks the ssds as 'down' initially
@@ -116,7 +111,7 @@
             edg = ElCapResourceRelationshipV1(parent.get_id(), vtx.get_id())
             self._add_and_tick_uniq_id(vtx, edg)
 
-    def _encode_rack(self, parent, nnf):
+    def _encode_rack(self, parent, rabbit_name, entry):
         res_type = "rack"
         res_name = f"{res_type}{self._rackids}"
         vtx = ElCapResourcePoolV1(
@@ -130,30 +125,26 @@
             True,
             "",
             1,
-            {"rabbit": nnf["metadata"]["name"], "ssdcount": str(self._chunks_per_nnf)},
+            {"rabbit": rabbit_name, "ssdcount": str(self._chunks_per_nnf)},
             f"{parent.path}/{res_name}",
         )
         edg = ElCapResourceRelationshipV1(parent.get_id(), vtx.get_id())
         self._add_and_tick_uniq_id(vtx, edg)
-        self._encode_ssds(vtx, nnf["status"])
-        for node in nnf["status"]["access"].get("computes", []):
+        self._encode_ssds(vtx, entry["capacity"])
+        for node in Hostlist(entry["hostlist"]):
             try:
-                index = self._r_hostlist.index(node["name"])[0]
+                index = self._r_hostlist.index(node)[0]
             except FileNotFoundError:
                 pass
             else:
-                self._encode_rank(
-                    vtx, index, self._rank_to_children[index], node["name"]
-                )
+                self._encode_rank(vtx, index, self._rank_to_children[index], node)
         # if the rabbit itself is in R, add it to the rack as a compute node as well
         try:
-            index = self._r_hostlist.index(nnf["metadata"]["name"])[0]
+            index = self._r_hostlist.index(rabbit_name)[0]
         except FileNotFoundError:
             pass
         else:
-            self._encode_rank(
-                vtx, index, self._rank_to_children[index], nnf["metadata"]["name"]
-            )
+            self._encode_rank(vtx, index, self._rank_to_children[index], rabbit_name)
         self._rackids += 1
 
     def _encode(self):
@@ -172,15 +163,11 @@
             f"/{self._cluster_name}0",
         )
         self._add_and_tick_uniq_id(vtx)
-        for nnf in self._nnfs:
-            self._encode_rack(vtx, nnf)
+        for rabbit_name, entry in self._rabbit_mapping["rabbits"].items():
+            self._encode_rack(vtx, rabbit_name, entry)
         # add nodes not in rabbit racks, making the nodes contained by 'cluster'
-        dws_computes = set(
-            compute["name"]
-            for nnf in self._nnfs
-            for compute in nnf["status"]["access"].get("computes", [])
-        )
-        dws_computes |= set(nnf["metadata"]["name"] for nnf in self._nnfs)
+        dws_computes = set(self._rabbit_mapping["computes"].keys())
+        dws_computes |= set(self._rabbit_mapping["rabbits"].keys())
         for rank, node in enumerate(self._r_hostlist):
             if node not in dws_computes:
                 self._encode_rank(
@@ -221,33 +208,12 @@
 def to_gibibytes(byt):
     return byt // (1024**3)
 
 
-def encode(rv1, nnfs, r_hostlist, chunks_per_nnf, cluster_name):
-    graph = Coral2Graph(rv1, nnfs, r_hostlist, chunks_per_nnf, cluster_name)
+def encode(rv1, rabbit_mapping, r_hostlist, chunks_per_nnf, cluster_name):
+    graph = Coral2Graph(rv1, rabbit_mapping, r_hostlist, chunks_per_nnf, cluster_name)
     rv1["scheduling"] = graph.to_JSON()
     return rv1
 
 
-def get_storage():
-    k8s_client = k8s.config.new_client_from_config()
-    try:
-        api_instance = k8s.client.CustomObjectsApi(k8s_client)
-    except ApiException as rest_exception:
-        if rest_exception.status == 403:
-            raise Exception(
-                "You must be logged in to the K8s or OpenShift cluster to continue"
-            )
-        raise
-
-    group = "dataworkflowservices.github.io"
-    version = "v1alpha2"
-    plural = "storages"
-    try:
-        api_response = api_instance.list_cluster_custom_object(group, version, plural)
-    except ApiException as e:
-        print("Exception: %s\n" % e, file=sys.stderr)
-    return api_response
-
-
 LOGGER = logging.getLogger("flux-dws2jgf")
@@ -293,6 +259,14 @@
             "a resource.config table"
         ),
     )
+    parser.add_argument(
+        "rabbitmapping",
+        metavar="FILE",
+        help=(
+            "Path to JSON object giving rabbit layout and capacity, as generated "
+            "e.g. by the 'flux rabbitmapping' script"
+        ),
+    )
     args = parser.parse_args()
     if not args.cluster_name:
         args.cluster_name = "".join(
@@ -313,19 +287,19 @@
                 f"error message was {proc.stderr}"
             )
         input_r = json.loads(proc.stdout)
 
-    nnfs = [x for x in get_storage()["items"]]
+    with open(args.rabbitmapping, "r", encoding="utf8") as rabbitmap_fd:
+        rabbit_mapping = json.load(rabbitmap_fd)
     r_hostlist = Hostlist(input_r["execution"]["nodelist"])
-    dws_computes = set(
-        compute["name"]
-        for nnf in nnfs
-        for compute in nnf["status"]["access"].get("computes", [])
-    )
+    dws_computes = set(rabbit_mapping["computes"].keys())
     if not args.no_validate and not dws_computes <= set(r_hostlist):
         raise RuntimeError(
-            f"Node(s) {dws_computes - set(r_hostlist)} found in DWS but not R from stdin"
+            f"Node(s) {dws_computes - set(r_hostlist)} found in rabbit_mapping "
+            "but not R from stdin"
         )
     json.dump(
-        encode(input_r, nnfs, r_hostlist, args.chunks_per_nnf, args.cluster_name),
+        encode(
+            input_r, rabbit_mapping, r_hostlist, args.chunks_per_nnf, args.cluster_name
+        ),
         sys.stdout,
     )

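The intended end-to-end flow after this change, mirroring the test updates in
the next patch (paths are illustrative):

    # 1. Generate the rabbit map (the only step that needs Kubernetes).
    flux python src/cmd/flux-rabbitmapping.py > rabbits.json

    # 2. Generate JGF from R on stdin plus the rabbit map.
    flux R encode -l | flux python src/cmd/flux-dws2jgf.py --no-validate rabbits.json | jq . > R.local

    # 3. Store the result so Fluxion picks it up when reloaded.
    flux kvs put resource.R="$(cat R.local)"
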
From 692150677066b90f2ef85c6fd00b2d755f66d5d8 Mon Sep 17 00:00:00 2001
From: James Corbett
Date: Fri, 30 Aug 2024 16:25:57 -0700
Subject: [PATCH 4/5] test: update dws2jgf tests to use rabbit mapping

Problem: tests for the flux-dws2jgf script need to be updated now that
the script reads from a mapping generated by flux-rabbitmapping instead
of by polling kubernetes.

Change the tests and add a simple test for flux-rabbitmapping.
---
 t/data/dws2jgf/rabbits.json | 18 ++++++++++++++++++
 t/t1002-dws-workflow-obj.t  |  3 ++-
 t/t1003-dws-nnf-watch.t     |  5 +++--
 t/t2000-dws2jgf.t           | 16 +++++++++++-----
 4 files changed, 34 insertions(+), 8 deletions(-)
 create mode 100644 t/data/dws2jgf/rabbits.json

diff --git a/t/data/dws2jgf/rabbits.json b/t/data/dws2jgf/rabbits.json
new file mode 100644
index 0000000..f66c508
--- /dev/null
+++ b/t/data/dws2jgf/rabbits.json
@@ -0,0 +1,18 @@
+{
+  "computes": {
+    "compute-01": "kind-worker2",
+    "compute-02": "kind-worker2",
+    "compute-03": "kind-worker2",
+    "compute-04": "kind-worker3"
+  },
+  "rabbits": {
+    "kind-worker2": {
+      "capacity": 39582418599936,
+      "hostlist": "compute-[01-03]"
+    },
+    "kind-worker3": {
+      "capacity": 39582418599936,
+      "hostlist": "compute-04"
+    }
+  }
+}
\ No newline at end of file

diff --git a/t/t1002-dws-workflow-obj.t b/t/t1002-dws-workflow-obj.t
index 4488384..7d9fd46 100755
--- a/t/t1002-dws-workflow-obj.t
+++ b/t/t1002-dws-workflow-obj.t
@@ -87,8 +87,9 @@ test_expect_success 'job submission with valid DW string works with fluxion-rabb
 
 test_expect_success 'load fluxion with rabbits' '
         flux cancel ${DWS_JOBID} &&
+        flux python ${FLUX_SOURCE_DIR}/src/cmd/flux-rabbitmapping.py > rabbits.json &&
         flux R encode -l | flux python ${FLUX_SOURCE_DIR}/src/cmd/flux-dws2jgf.py \
-                --no-validate | jq . > R.local &&
+                --no-validate rabbits.json | jq . > R.local &&
         flux kvs put resource.R="$(cat R.local)" &&
         flux module remove -f sched-fluxion-qmanager &&
         flux module remove -f sched-fluxion-resource &&

diff --git a/t/t1003-dws-nnf-watch.t b/t/t1003-dws-nnf-watch.t
index 3db47b3..94f350a 100755
--- a/t/t1003-dws-nnf-watch.t
+++ b/t/t1003-dws-nnf-watch.t
@@ -30,8 +30,9 @@ test_expect_success 'job-manager: load alloc-bypass plugin' '
 
 test_expect_success 'load Fluxion with rabbit resource graph' '
         echo $PYTHONPATH >&2 &&
+        flux python ${FLUX_SOURCE_DIR}/src/cmd/flux-rabbitmapping.py > rabbits.json &&
         flux R encode -l | flux python ${FLUX_SOURCE_DIR}/src/cmd/flux-dws2jgf.py \
-                --no-validate | jq . > R.local &&
+                --no-validate rabbits.json | jq . > R.local &&
         flux kvs put resource.R="$(cat R.local)" &&
         flux module remove -f sched-fluxion-qmanager &&
         flux module remove -f sched-fluxion-resource &&
@@ -172,7 +173,7 @@ test_expect_success 'exec Storage watching script with invalid --drain-queues ar
 test_expect_success 'configure flux with queues' '
         flux R encode -l | jq ".execution.properties.debug = \"0\"" | \
         flux python ${FLUX_SOURCE_DIR}/src/cmd/flux-dws2jgf.py \
-                --no-validate | jq . > R.local.queues &&
+                --no-validate rabbits.json | jq . > R.local.queues &&
         flux kvs put resource.R="$(cat R.local.queues)" &&
         flux module remove -f sched-fluxion-qmanager &&
         flux module remove -f sched-fluxion-resource &&

diff --git a/t/t2000-dws2jgf.t b/t/t2000-dws2jgf.t
index 9ca6ef8..816da09 100755
--- a/t/t2000-dws2jgf.t
+++ b/t/t2000-dws2jgf.t
@@ -33,27 +33,32 @@ test_expect_success HAVE_JQ 'smoke test to ensure the storage resources are expe
         test $(hostname) = compute-01
 '
 
+test_expect_success HAVE_JQ 'flux-rabbitmapping outputs expected mapping' '
+        flux python ${FLUX_SOURCE_DIR}/src/cmd/flux-rabbitmapping.py -i2 > rabbits.json
+        test_cmp ${DATADIR}/rabbits.json rabbits.json
+'
+
 test_expect_success HAVE_JQ 'flux-dws2jgf.py outputs expected JGF for single compute node' '
         flux R encode -Hcompute-01 | flux python ${CMD} --no-validate --cluster-name=ElCapitan \
-                | jq . > actual-compute-01.jgf &&
+                rabbits.json | jq . > actual-compute-01.jgf &&
         test_cmp ${DATADIR}/expected-compute-01.jgf actual-compute-01.jgf
 '
 
 test_expect_success HAVE_JQ 'flux-dws2jgf.py outputs expected JGF for multiple compute nodes' '
         flux R encode -Hcompute-[01-04] -c0-4 | flux python ${CMD} --no-validate --cluster-name=ElCapitan \
-                | jq . > actual-compute-01-04.jgf &&
+                rabbits.json | jq . > actual-compute-01-04.jgf &&
         test_cmp ${DATADIR}/expected-compute-01-04.jgf actual-compute-01-04.jgf
 '
 
 test_expect_success HAVE_JQ 'flux-dws2jgf.py outputs expected JGF for compute nodes not in DWS' '
         flux R encode -Hcompute-[01-04],nodws[0-5] -c0-4 | \
-                flux python ${CMD} --no-validate | jq . > actual-compute-01-nodws.jgf &&
+                flux python ${CMD} --no-validate rabbits.json | jq . > actual-compute-01-nodws.jgf &&
         test_cmp ${DATADIR}/expected-compute-01-nodws.jgf actual-compute-01-nodws.jgf
 '
 
 test_expect_success HAVE_JQ 'flux-dws2jgf.py handles properties correctly' '
         cat ${DATADIR}/R-properties | \
-                flux python ${CMD} --no-validate | jq . > actual-properties.jgf &&
+                flux python ${CMD} --no-validate rabbits.json | jq . > actual-properties.jgf &&
         test_cmp ${DATADIR}/expected-properties.jgf actual-properties.jgf
 '
 
@@ -70,7 +75,8 @@ test_expect_success HAVE_JQ 'fluxion rejects a rack/rabbit job when no rabbits a
 
 test_expect_success HAVE_JQ 'fluxion can be loaded with output of dws2jgf' '
         flux run -n1 hostname &&
-        flux R encode -l | flux python ${CMD} --no-validate --cluster-name=ElCapitan | jq . > R.local &&
+        flux R encode -l | flux python ${CMD} --no-validate --cluster-name=ElCapitan rabbits.json \
+                | jq . > R.local &&
         flux kvs put resource.R="$(cat R.local)" &&
         flux module list &&
         flux module remove -f sched-fluxion-qmanager &&

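The updated sharness file can also be exercised directly for a quick check (a
built source tree is assumed; -v enables verbose output):

    cd t && ./t2000-dws2jgf.t -v
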
From bcf9be2666b2e32f02d1c2c96d63570b22a3f12e Mon Sep 17 00:00:00 2001
From: James Corbett
Date: Fri, 30 Aug 2024 22:30:44 -0700
Subject: [PATCH 5/5] dws: improve error message

Problem: the error message for requesting too much lustre storage gives
a float, but an integer would be better and more consistent with the
other file system types.

Provide an integer in the error message by doing integer division
instead of normal float division.
---
 src/python/flux_k8s/directivebreakdown.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/python/flux_k8s/directivebreakdown.py b/src/python/flux_k8s/directivebreakdown.py
index 1df2991..207284d 100644
--- a/src/python/flux_k8s/directivebreakdown.py
+++ b/src/python/flux_k8s/directivebreakdown.py
@@ -46,7 +46,7 @@ def validate(self, nodecount):
             requested = getattr(self, attr)
             allowable = getattr(type(self), attr)
             if attr == "lustre":
-                requested = requested / nodecount
+                requested = requested // nodecount
             if allowable is not None and requested > allowable:
                 raise ValueError(
                     f"Requested a total of {requested} GiB of {attr} storage "
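A quick illustration of why integer division reads better in the message, for
example 100 GiB of lustre requested across 3 nodes:

    python3 -c 'print(100 / 3)'    # 33.333333333333336  (what the old message interpolated)
    python3 -c 'print(100 // 3)'   # 33                  (what the new message interpolates)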