diff --git a/src/cmd/flux-dws2jgf.py b/src/cmd/flux-dws2jgf.py
index 9f6e818..1c5f785 100755
--- a/src/cmd/flux-dws2jgf.py
+++ b/src/cmd/flux-dws2jgf.py
@@ -3,23 +3,18 @@
 import argparse
 import sys
 import json
-import re
 import logging
-import itertools
 import subprocess
 import socket
 
 import flux
 from flux.idset import IDset
 from flux.hostlist import Hostlist
-from flux.idset import IDset
 from fluxion.resourcegraph.V1 import (
     FluxionResourceGraphV1,
     FluxionResourcePoolV1,
     FluxionResourceRelationshipV1,
 )
-import kubernetes as k8s
-from kubernetes.client.rest import ApiException
 
 
 class ElCapResourcePoolV1(FluxionResourcePoolV1):
@@ -54,12 +49,12 @@ class Coral2Graph(FluxionResourceGraphV1):
     CORAL2 Graph: extend jsongraph's Graph class
     """
 
-    def __init__(self, rv1, nnfs, r_hostlist, chunks_per_nnf, cluster_name):
+    def __init__(self, rv1, rabbit_mapping, r_hostlist, chunks_per_nnf, cluster_name):
         """Constructor
         rv1 -- RV1 Dictorary that conforms to Flux RFC 20:
                    Resource Set Specification Version 1
         """
-        self._nnfs = nnfs
+        self._rabbit_mapping = rabbit_mapping
         self._r_hostlist = r_hostlist
         self._chunks_per_nnf = chunks_per_nnf
         self._rackids = 0
@@ -94,7 +89,7 @@ def _encode_rank(self, parent, rank, children, hName):
             for i in IDset(val):
                 self._encode_child(vtx.get_id(), hPath, rank, str(key), i, {})
 
-    def _encode_ssds(self, parent, nnf):
+    def _encode_ssds(self, parent, capacity):
         res_type = "ssd"
         for i in range(self._chunks_per_nnf):
             res_name = f"{res_type}{i}"
@@ -108,7 +103,7 @@ def _encode_ssds(self, parent, nnf):
                 -1,
                 True,
                 "GiB",
-                to_gibibytes(nnf["capacity"] // self._chunks_per_nnf),
+                to_gibibytes(capacity // self._chunks_per_nnf),
                 {},
                 f"{parent.path}/{res_name}",
                 1,  # status=1 marks the ssds as 'down' initially
@@ -116,7 +111,7 @@ def _encode_ssds(self, parent, nnf):
             edg = ElCapResourceRelationshipV1(parent.get_id(), vtx.get_id())
             self._add_and_tick_uniq_id(vtx, edg)
 
-    def _encode_rack(self, parent, nnf):
+    def _encode_rack(self, parent, rabbit_name, entry):
         res_type = "rack"
         res_name = f"{res_type}{self._rackids}"
         vtx = ElCapResourcePoolV1(
@@ -130,30 +125,26 @@ def _encode_rack(self, parent, nnf):
             True,
             "",
             1,
-            {"rabbit": nnf["metadata"]["name"], "ssdcount": str(self._chunks_per_nnf)},
+            {"rabbit": rabbit_name, "ssdcount": str(self._chunks_per_nnf)},
             f"{parent.path}/{res_name}",
         )
         edg = ElCapResourceRelationshipV1(parent.get_id(), vtx.get_id())
         self._add_and_tick_uniq_id(vtx, edg)
-        self._encode_ssds(vtx, nnf["status"])
-        for node in nnf["status"]["access"].get("computes", []):
+        self._encode_ssds(vtx, entry["capacity"])
+        for node in Hostlist(entry["hostlist"]):
             try:
-                index = self._r_hostlist.index(node["name"])[0]
+                index = self._r_hostlist.index(node)[0]
             except FileNotFoundError:
                 pass
             else:
-                self._encode_rank(
-                    vtx, index, self._rank_to_children[index], node["name"]
-                )
+                self._encode_rank(vtx, index, self._rank_to_children[index], node)
         # if the rabbit itself is in R, add it to the rack as a compute node as well
         try:
-            index = self._r_hostlist.index(nnf["metadata"]["name"])[0]
+            index = self._r_hostlist.index(rabbit_name)[0]
         except FileNotFoundError:
             pass
         else:
-            self._encode_rank(
-                vtx, index, self._rank_to_children[index], nnf["metadata"]["name"]
-            )
+            self._encode_rank(vtx, index, self._rank_to_children[index], rabbit_name)
         self._rackids += 1
 
     def _encode(self):
@@ -172,15 +163,11 @@ def _encode(self):
             f"/{self._cluster_name}0",
         )
         self._add_and_tick_uniq_id(vtx)
-        for nnf in self._nnfs:
-            self._encode_rack(vtx, nnf)
+        for rabbit_name, entry in self._rabbit_mapping["rabbits"].items():
+            self._encode_rack(vtx, rabbit_name, entry)
         # add nodes not in rabbit racks, making the nodes contained by 'cluster'
-        dws_computes = set(
-            compute["name"]
-            for nnf in self._nnfs
-            for compute in nnf["status"]["access"].get("computes", [])
-        )
-        dws_computes |= set(nnf["metadata"]["name"] for nnf in self._nnfs)
+        dws_computes = set(self._rabbit_mapping["computes"].keys())
+        dws_computes |= set(self._rabbit_mapping["rabbits"].keys())
         for rank, node in enumerate(self._r_hostlist):
             if node not in dws_computes:
                 self._encode_rank(
@@ -221,33 +208,12 @@ def to_gibibytes(byt):
     return byt // (1024**3)
 
 
-def encode(rv1, nnfs, r_hostlist, chunks_per_nnf, cluster_name):
-    graph = Coral2Graph(rv1, nnfs, r_hostlist, chunks_per_nnf, cluster_name)
+def encode(rv1, rabbit_mapping, r_hostlist, chunks_per_nnf, cluster_name):
+    graph = Coral2Graph(rv1, rabbit_mapping, r_hostlist, chunks_per_nnf, cluster_name)
     rv1["scheduling"] = graph.to_JSON()
     return rv1
 
 
-def get_storage():
-    k8s_client = k8s.config.new_client_from_config()
-    try:
-        api_instance = k8s.client.CustomObjectsApi(k8s_client)
-    except ApiException as rest_exception:
-        if rest_exception.status == 403:
-            raise Exception(
-                "You must be logged in to the K8s or OpenShift cluster to continue"
-            )
-        raise
-
-    group = "dataworkflowservices.github.io"
-    version = "v1alpha2"
-    plural = "storages"
-    try:
-        api_response = api_instance.list_cluster_custom_object(group, version, plural)
-    except ApiException as e:
-        print("Exception: %s\n" % e, file=sys.stderr)
-    return api_response
-
-
 LOGGER = logging.getLogger("flux-dws2jgf")
 
 
@@ -293,6 +259,14 @@ def main():
             "a resource.config table"
         ),
     )
+    parser.add_argument(
+        "rabbitmapping",
+        metavar="FILE",
+        help=(
+            "Path to JSON object giving rabbit layout and capacity, as generated "
+            "e.g. by the 'flux rabbitmapping' script"
+        ),
+    )
     args = parser.parse_args()
     if not args.cluster_name:
         args.cluster_name = "".join(
@@ -313,19 +287,19 @@ def main():
             "error message was {proc.stderr}"
         )
     input_r = json.load(proc.stdout)
-    nnfs = [x for x in get_storage()["items"]]
+    with open(args.rabbitmapping, "r", encoding="utf8") as rabbitmap_fd:
+        rabbit_mapping = json.load(rabbitmap_fd)
     r_hostlist = Hostlist(input_r["execution"]["nodelist"])
-    dws_computes = set(
-        compute["name"]
-        for nnf in nnfs
-        for compute in nnf["status"]["access"].get("computes", [])
-    )
+    dws_computes = set(rabbit_mapping["computes"].keys())
     if not args.no_validate and not dws_computes <= set(r_hostlist):
         raise RuntimeError(
-            f"Node(s) {dws_computes - set(r_hostlist)} found in DWS but not R from stdin"
+            f"Node(s) {dws_computes - set(r_hostlist)} found in rabbit_mapping "
+            "but not R from stdin"
         )
     json.dump(
-        encode(input_r, nnfs, r_hostlist, args.chunks_per_nnf, args.cluster_name),
+        encode(
+            input_r, rabbit_mapping, r_hostlist, args.chunks_per_nnf, args.cluster_name
+        ),
         sys.stdout,
     )
 
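Note on the new rabbitmapping input: the file is consumed as plain JSON, and its expected shape can be read off the accessors in the patch ("rabbits" maps each rabbit name to an entry carrying "capacity" and "hostlist", while "computes" is keyed by compute hostname and only its keys are read here). The sketch below is illustrative only; the hostnames, capacity value, and the value stored under "computes" are assumptions, not taken from the patch.

# Illustrative sketch of the rabbitmapping object expected by the patched script.
rabbit_mapping = {
    "rabbits": {
        "rabbit1": {
            # capacity in bytes; divided across --chunks-per-nnf ssd vertices
            # and converted with to_gibibytes()
            "capacity": 18000000000000,
            # computes attached to this rabbit, parsed with flux.hostlist.Hostlist
            "hostlist": "compute[1-16]",
        },
    },
    # keyed by compute hostname; the value is a guess, the script reads only the keys
    "computes": {
        "compute1": "rabbit1",
    },
}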