dws2jgf: read from JSON file instead of k8s
Problem: as described in #193, JGF is too unwieldy to be stored in
Ansible. On the other hand, Flux's ability to start up and run
jobs cannot depend on the responsiveness of Kubernetes, so
generating JGF from Kubernetes before starting Flux is not an
option.

Change flux-dws2jgf to read from a static JSON file generated by
the flux-rabbitmapping script instead of from Kubernetes.
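
For reference, a minimal sketch of the mapping file this change expects, inferred from the fields the new code reads (rabbits.<name>.capacity, rabbits.<name>.hostlist, and the keys of computes); the exact schema written by flux-rabbitmapping may differ, and every name and number below is made up:

    import json

    # Hypothetical rabbitmapping contents. The updated flux-dws2jgf reads:
    #   rabbit_mapping["rabbits"][name]["capacity"]  -- bytes, split across ssd chunks
    #   rabbit_mapping["rabbits"][name]["hostlist"]  -- hostlist of attached compute nodes
    #   rabbit_mapping["computes"].keys()            -- every rabbit-attached compute node
    # The values under "computes" are never read in this diff; mapping each
    # compute to its rabbit's name is an assumption.
    rabbit_mapping = {
        "computes": {"compute-01": "rabbit-01", "compute-02": "rabbit-01"},
        "rabbits": {
            "rabbit-01": {
                "capacity": 38654705664000,  # bytes (36000 GiB)
                "hostlist": "compute-[01-02]",
            },
        },
    }

    with open("rabbitmapping.json", "w", encoding="utf8") as fd:
        json.dump(rabbit_mapping, fd, indent=2)

flux-dws2jgf then takes the path to this file as its positional FILE argument and still reads R from stdin, so Flux can start without a live Kubernetes connection.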
jameshcorbett committed Aug 31, 2024
1 parent 68a06db commit beef4ba
Showing 1 changed file with 34 additions and 60 deletions.
94 changes: 34 additions & 60 deletions src/cmd/flux-dws2jgf.py
@@ -3,23 +3,18 @@
import argparse
import sys
import json
import re
import logging
import itertools
import subprocess
import socket

import flux
from flux.idset import IDset
from flux.hostlist import Hostlist
from flux.idset import IDset
from fluxion.resourcegraph.V1 import (
    FluxionResourceGraphV1,
    FluxionResourcePoolV1,
    FluxionResourceRelationshipV1,
)
import kubernetes as k8s
from kubernetes.client.rest import ApiException


class ElCapResourcePoolV1(FluxionResourcePoolV1):
@@ -54,12 +49,12 @@ class Coral2Graph(FluxionResourceGraphV1):
    CORAL2 Graph: extend jsongraph's Graph class
    """

    def __init__(self, rv1, nnfs, r_hostlist, chunks_per_nnf, cluster_name):
    def __init__(self, rv1, rabbit_mapping, r_hostlist, chunks_per_nnf, cluster_name):
        """Constructor
        rv1 -- RV1 Dictionary that conforms to Flux RFC 20:
        Resource Set Specification Version 1
        """
        self._nnfs = nnfs
        self._rabbit_mapping = rabbit_mapping
        self._r_hostlist = r_hostlist
        self._chunks_per_nnf = chunks_per_nnf
        self._rackids = 0
@@ -94,7 +89,7 @@ def _encode_rank(self, parent, rank, children, hName):
        for i in IDset(val):
            self._encode_child(vtx.get_id(), hPath, rank, str(key), i, {})

    def _encode_ssds(self, parent, nnf):
    def _encode_ssds(self, parent, capacity):
        res_type = "ssd"
        for i in range(self._chunks_per_nnf):
            res_name = f"{res_type}{i}"
@@ -108,15 +103,15 @@ def _encode_ssds(self, parent, nnf):
                -1,
                True,
                "GiB",
                to_gibibytes(nnf["capacity"] // self._chunks_per_nnf),
                to_gibibytes(capacity // self._chunks_per_nnf),
                {},
                f"{parent.path}/{res_name}",
                1,  # status=1 marks the ssds as 'down' initially
            )
            edg = ElCapResourceRelationshipV1(parent.get_id(), vtx.get_id())
            self._add_and_tick_uniq_id(vtx, edg)

    def _encode_rack(self, parent, nnf):
    def _encode_rack(self, parent, rabbit_name, entry):
        res_type = "rack"
        res_name = f"{res_type}{self._rackids}"
        vtx = ElCapResourcePoolV1(
@@ -130,30 +125,26 @@ def _encode_rack(self, parent, nnf):
            True,
            "",
            1,
            {"rabbit": nnf["metadata"]["name"], "ssdcount": str(self._chunks_per_nnf)},
            {"rabbit": rabbit_name, "ssdcount": str(self._chunks_per_nnf)},
            f"{parent.path}/{res_name}",
        )
        edg = ElCapResourceRelationshipV1(parent.get_id(), vtx.get_id())
        self._add_and_tick_uniq_id(vtx, edg)
        self._encode_ssds(vtx, nnf["status"])
        for node in nnf["status"]["access"].get("computes", []):
        self._encode_ssds(vtx, entry["capacity"])
        for node in Hostlist(entry["hostlist"]):
            try:
                index = self._r_hostlist.index(node["name"])[0]
                index = self._r_hostlist.index(node)[0]
            except FileNotFoundError:
                pass
            else:
                self._encode_rank(
                    vtx, index, self._rank_to_children[index], node["name"]
                )
                self._encode_rank(vtx, index, self._rank_to_children[index], node)
        # if the rabbit itself is in R, add it to the rack as a compute node as well
        try:
            index = self._r_hostlist.index(nnf["metadata"]["name"])[0]
            index = self._r_hostlist.index(rabbit_name)[0]
        except FileNotFoundError:
            pass
        else:
            self._encode_rank(
                vtx, index, self._rank_to_children[index], nnf["metadata"]["name"]
            )
            self._encode_rank(vtx, index, self._rank_to_children[index], rabbit_name)
        self._rackids += 1

    def _encode(self):
@@ -172,15 +163,11 @@ def _encode(self):
f"/{self._cluster_name}0",
)
self._add_and_tick_uniq_id(vtx)
for nnf in self._nnfs:
self._encode_rack(vtx, nnf)
for rabbit_name, entry in self._rabbit_mapping["rabbits"].items():
self._encode_rack(vtx, rabbit_name, entry)
# add nodes not in rabbit racks, making the nodes contained by 'cluster'
dws_computes = set(
compute["name"]
for nnf in self._nnfs
for compute in nnf["status"]["access"].get("computes", [])
)
dws_computes |= set(nnf["metadata"]["name"] for nnf in self._nnfs)
dws_computes = set(self._rabbit_mapping["computes"].keys())
dws_computes |= set(self._rabbit_mapping["rabbits"].keys())
for rank, node in enumerate(self._r_hostlist):
if node not in dws_computes:
self._encode_rank(
@@ -221,33 +208,12 @@ def to_gibibytes(byt):
    return byt // (1024**3)


def encode(rv1, nnfs, r_hostlist, chunks_per_nnf, cluster_name):
    graph = Coral2Graph(rv1, nnfs, r_hostlist, chunks_per_nnf, cluster_name)
def encode(rv1, rabbit_mapping, r_hostlist, chunks_per_nnf, cluster_name):
    graph = Coral2Graph(rv1, rabbit_mapping, r_hostlist, chunks_per_nnf, cluster_name)
    rv1["scheduling"] = graph.to_JSON()
    return rv1


def get_storage():
    k8s_client = k8s.config.new_client_from_config()
    try:
        api_instance = k8s.client.CustomObjectsApi(k8s_client)
    except ApiException as rest_exception:
        if rest_exception.status == 403:
            raise Exception(
                "You must be logged in to the K8s or OpenShift cluster to continue"
            )
        raise

    group = "dataworkflowservices.github.io"
    version = "v1alpha2"
    plural = "storages"
    try:
        api_response = api_instance.list_cluster_custom_object(group, version, plural)
    except ApiException as e:
        print("Exception: %s\n" % e, file=sys.stderr)
    return api_response


LOGGER = logging.getLogger("flux-dws2jgf")


@@ -293,6 +259,14 @@ def main():
"a resource.config table"
),
)
parser.add_argument(
"rabbitmapping",
metavar="FILE",
help=(
"Path to JSON object giving rabbit layout and capacity, as generated "
"e.g. by the 'flux rabbitmapping' script"
),
)
    args = parser.parse_args()
    if not args.cluster_name:
        args.cluster_name = "".join(
@@ -313,19 +287,19 @@ def main():
"error message was {proc.stderr}"
)
input_r = json.load(proc.stdout)
nnfs = [x for x in get_storage()["items"]]
with open(args.rabbitmapping, "r", encoding="utf8") as rabbitmap_fd:
rabbit_mapping = json.load(rabbitmap_fd)
r_hostlist = Hostlist(input_r["execution"]["nodelist"])
dws_computes = set(
compute["name"]
for nnf in nnfs
for compute in nnf["status"]["access"].get("computes", [])
)
dws_computes = set(rabbit_mapping["computes"].keys())
if not args.no_validate and not dws_computes <= set(r_hostlist):
raise RuntimeError(
f"Node(s) {dws_computes - set(r_hostlist)} found in DWS but not R from stdin"
f"Node(s) {dws_computes - set(r_hostlist)} found in rabbit_mapping "
"but not R from stdin"
)
json.dump(
encode(input_r, nnfs, r_hostlist, args.chunks_per_nnf, args.cluster_name),
encode(
input_r, rabbit_mapping, r_hostlist, args.chunks_per_nnf, args.cluster_name
),
sys.stdout,
)

