From ce8113362afad4c832fa4d4f876b998514c76b4c Mon Sep 17 00:00:00 2001 From: James Corbett Date: Fri, 30 Aug 2024 15:28:58 -0700 Subject: [PATCH] dws2jgf: read from JSON file instead of k8s Problem: as described in #193, JGF is too unwieldy to be stored in Ansible. On the other hand, Flux's ability to start up and run jobs cannot be dependent on the responsiveness of kubernetes, so generating JGF from kubernetes before starting Flux is not an option. Change flux-dws2jgf to read from a static JSON file generated by the flux-rabbitmapping script, instead of from kubernetes. --- src/cmd/flux-dws2jgf.py | 106 ++++++++++++++++------------------------ 1 file changed, 42 insertions(+), 64 deletions(-) diff --git a/src/cmd/flux-dws2jgf.py b/src/cmd/flux-dws2jgf.py index 21b9bbd..4e6f53f 100755 --- a/src/cmd/flux-dws2jgf.py +++ b/src/cmd/flux-dws2jgf.py @@ -3,23 +3,18 @@ import argparse import sys import json -import re import logging -import itertools import subprocess import socket import flux from flux.idset import IDset from flux.hostlist import Hostlist -from flux.idset import IDset from fluxion.resourcegraph.V1 import ( FluxionResourceGraphV1, FluxionResourcePoolV1, FluxionResourceRelationshipV1, ) -import kubernetes as k8s -from kubernetes.client.rest import ApiException class ElCapResourcePoolV1(FluxionResourcePoolV1): @@ -54,12 +49,12 @@ class Coral2Graph(FluxionResourceGraphV1): CORAL2 Graph: extend jsongraph's Graph class """ - def __init__(self, rv1, nnfs, r_hostlist, chunks_per_nnf, cluster_name): + def __init__(self, rv1, rabbit_mapping, r_hostlist, chunks_per_nnf, cluster_name): """Constructor rv1 -- RV1 Dictorary that conforms to Flux RFC 20: Resource Set Specification Version 1 """ - self._nnfs = nnfs + self._rabbit_mapping = rabbit_mapping self._r_hostlist = r_hostlist self._chunks_per_nnf = chunks_per_nnf self._rackids = 0 @@ -94,7 +89,7 @@ def _encode_rank(self, parent, rank, children, hName): for i in IDset(val): self._encode_child(vtx.get_id(), hPath, 
rank, str(key), i, {}) - def _encode_ssds(self, parent, nnf): + def _encode_ssds(self, parent, capacity): res_type = "ssd" for i in range(self._chunks_per_nnf): res_name = f"{res_type}{i}" @@ -108,7 +103,7 @@ def _encode_ssds(self, parent, nnf): -1, True, "GiB", - to_gibibytes(nnf["capacity"] // self._chunks_per_nnf), + to_gibibytes(capacity // self._chunks_per_nnf), {}, f"{parent.path}/{res_name}", 1, # status=1 marks the ssds as 'down' initially @@ -116,7 +111,7 @@ def _encode_ssds(self, parent, nnf): edg = ElCapResourceRelationshipV1(parent.get_id(), vtx.get_id()) self._add_and_tick_uniq_id(vtx, edg) - def _encode_rack(self, parent, nnf): + def _encode_rack(self, parent, rabbit_name, entry): res_type = "rack" res_name = f"{res_type}{self._rackids}" vtx = ElCapResourcePoolV1( @@ -130,30 +125,28 @@ def _encode_rack(self, parent, nnf): True, "", 1, - {"rabbit": nnf["metadata"]["name"], "ssdcount": str(self._chunks_per_nnf)}, + {"rabbit": rabbit_name, "ssdcount": str(self._chunks_per_nnf)}, f"{parent.path}/{res_name}", ) edg = ElCapResourceRelationshipV1(parent.get_id(), vtx.get_id()) self._add_and_tick_uniq_id(vtx, edg) - self._encode_ssds(vtx, nnf["status"]) - for node in nnf["status"]["access"].get("computes", []): + self._encode_ssds(vtx, entry["capacity"]) + for node in Hostlist(entry["hostlist"]): try: - index = self._r_hostlist.index(node["name"])[0] + index = self._r_hostlist.index(node)[0] except FileNotFoundError: pass else: self._encode_rank( - vtx, index, self._rank_to_children[index], node["name"] + vtx, index, self._rank_to_children[index], node ) # if the rabbit itself is in R, add it to the rack as a compute node as well try: - index = self._r_hostlist.index(nnf["metadata"]["name"])[0] + index = self._r_hostlist.index(rabbit_name)[0] except FileNotFoundError: pass else: - self._encode_rank( - vtx, index, self._rank_to_children[index], nnf["metadata"]["name"] - ) + self._encode_rank(vtx, index, self._rank_to_children[index], rabbit_name) self._rackids 
+= 1 def _encode(self): @@ -172,15 +165,11 @@ def _encode(self): f"/{self._cluster_name}0", ) self._add_and_tick_uniq_id(vtx) - for nnf in self._nnfs: - self._encode_rack(vtx, nnf) + for rabbit_name, entry in self._rabbit_mapping["rabbits"].items(): + self._encode_rack(vtx, rabbit_name, entry) # add nodes not in rabbit racks, making the nodes contained by 'cluster' - dws_computes = set( - compute["name"] - for nnf in self._nnfs - for compute in nnf["status"]["access"].get("computes", []) - ) - dws_computes |= set(nnf["metadata"]["name"] for nnf in self._nnfs) + dws_computes = set(self._rabbit_mapping["computes"].keys()) + dws_computes |= set(self._rabbit_mapping["rabbits"].keys()) for rank, node in enumerate(self._r_hostlist): if node not in dws_computes: self._encode_rank( @@ -221,37 +210,16 @@ def to_gibibytes(byt): return byt // (1024**3) -def encode(rv1, nnfs, r_hostlist, chunks_per_nnf, cluster_name): - graph = Coral2Graph(rv1, nnfs, r_hostlist, chunks_per_nnf, cluster_name) +def encode(rv1, rabbit_mapping, r_hostlist, chunks_per_nnf, cluster_name): + graph = Coral2Graph(rv1, rabbit_mapping, r_hostlist, chunks_per_nnf, cluster_name) rv1["scheduling"] = graph.to_JSON() return rv1 -def get_storage(): - k8s_client = k8s.config.new_client_from_config() - try: - api_instance = k8s.client.CustomObjectsApi(k8s_client) - except ApiException as rest_exception: - if rest_exception.status == 403: - raise Exception( - "You must be logged in to the K8s or OpenShift cluster to continue" - ) - raise - - group = "dataworkflowservices.github.io" - version = "v1alpha2" - plural = "storages" - try: - api_response = api_instance.list_cluster_custom_object(group, version, plural) - except ApiException as e: - print("Exception: %s\n" % e, file=sys.stderr) - return api_response - - LOGGER = logging.getLogger("flux-dws2jgf") -@flux.util.CLIMain(LOGGER) +@flux.util.CLIMain(LOGGER) def main(): parser = argparse.ArgumentParser( prog="flux-dws2jgf", @@ -293,6 +261,14 @@ def main(): "a 
resource.config table" ), ) + parser.add_argument( + "rabbitmapping", + metavar="FILE", + help=( + "Path to JSON object giving rabbit layout and capacity, as generated " + "e.g. by the 'flux rabbitmapping' script" + ), + ) args = parser.parse_args() if not args.cluster_name: args.cluster_name = "".join( @@ -302,28 +278,30 @@ def main(): if args.from_config is None: input_r = json.load(sys.stdin) else: - cp = subprocess.run( - f"flux R parse-config {args.from_config}".split(), capture_output=True + proc = subprocess.run( + f"flux R parse-config {args.from_config}".split(), + capture_output=True, + check=False, ) - if cp.returncode != 0: + if proc.returncode != 0: raise ValueError( f"Could not parse config file {args.from_config!r}, " - "error message was {cp.stderr}" + f"error message was {proc.stderr}" ) - input_r = json.load(cp.stdout) - nnfs = [x for x in get_storage()["items"]] + input_r = json.loads(proc.stdout) + with open(args.rabbitmapping, "r", encoding="utf8") as rabbitmap_fd: + rabbit_mapping = json.load(rabbitmap_fd) r_hostlist = Hostlist(input_r["execution"]["nodelist"]) - dws_computes = set( - compute["name"] - for nnf in nnfs - for compute in nnf["status"]["access"].get("computes", []) - ) + dws_computes = set(rabbit_mapping["computes"].keys()) if not args.no_validate and not dws_computes <= set(r_hostlist): raise RuntimeError( - f"Node(s) {dws_computes - set(r_hostlist)} found in DWS but not R from stdin" + f"Node(s) {dws_computes - set(r_hostlist)} found in rabbit_mapping " + "but not R from stdin" ) json.dump( - encode(input_r, nnfs, r_hostlist, args.chunks_per_nnf, args.cluster_name), + encode( + input_r, rabbit_mapping, r_hostlist, args.chunks_per_nnf, args.cluster_name + ), sys.stdout, )