Merge pull request #204 from jameshcorbett/rabbit-mapping
Dws: use static rabbit layout mapping for JGF
mergify[bot] authored Sep 4, 2024
2 parents 6e101f4 + bcf9be2 commit 5a3d0a1
Showing 8 changed files with 170 additions and 72 deletions.
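
In short: flux-dws2jgf.py no longer queries Kubernetes for DWS Storage resources itself. That query moves into the new src/cmd/flux-rabbitmapping.py, which emits a static JSON mapping of compute nodes to rabbits (the t/data/dws2jgf/rabbits.json fixture below shows its shape), and flux-dws2jgf.py now takes that mapping file as a required positional argument. The tests are updated to generate rabbits.json first and pass it through.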
3 changes: 2 additions & 1 deletion src/cmd/Makefile.am
@@ -1,2 +1,3 @@
dist_fluxcmd_SCRIPTS = \
flux-dws2jgf.py
flux-dws2jgf.py \
flux-rabbitmapping.py
123 changes: 61 additions & 62 deletions src/cmd/flux-dws2jgf.py
@@ -3,22 +3,18 @@
import argparse
import sys
import json
import re
import logging
import itertools
import subprocess
import socket

import flux
from flux.idset import IDset
from flux.hostlist import Hostlist
from flux.idset import IDset
from fluxion.resourcegraph.V1 import (
FluxionResourceGraphV1,
FluxionResourcePoolV1,
FluxionResourceRelationshipV1,
)
import kubernetes as k8s
from kubernetes.client.rest import ApiException


class ElCapResourcePoolV1(FluxionResourcePoolV1):
@@ -53,12 +49,12 @@ class Coral2Graph(FluxionResourceGraphV1):
CORAL2 Graph: extend jsongraph's Graph class
"""

def __init__(self, rv1, nnfs, r_hostlist, chunks_per_nnf, cluster_name):
def __init__(self, rv1, rabbit_mapping, r_hostlist, chunks_per_nnf, cluster_name):
"""Constructor
rv1 -- RV1 Dictionary that conforms to Flux RFC 20:
Resource Set Specification Version 1
"""
self._nnfs = nnfs
self._rabbit_mapping = rabbit_mapping
self._r_hostlist = r_hostlist
self._chunks_per_nnf = chunks_per_nnf
self._rackids = 0
@@ -93,7 +89,7 @@ def _encode_rank(self, parent, rank, children, hName):
for i in IDset(val):
self._encode_child(vtx.get_id(), hPath, rank, str(key), i, {})

def _encode_ssds(self, parent, nnf):
def _encode_ssds(self, parent, capacity):
res_type = "ssd"
for i in range(self._chunks_per_nnf):
res_name = f"{res_type}{i}"
@@ -107,15 +103,15 @@ def _encode_ssds(self, parent, nnf):
-1,
True,
"GiB",
to_gibibytes(nnf["capacity"] // self._chunks_per_nnf),
to_gibibytes(capacity // self._chunks_per_nnf),
{},
f"{parent.path}/{res_name}",
1, # status=1 marks the ssds as 'down' initially
)
edg = ElCapResourceRelationshipV1(parent.get_id(), vtx.get_id())
self._add_and_tick_uniq_id(vtx, edg)

def _encode_rack(self, parent, nnf):
def _encode_rack(self, parent, rabbit_name, entry):
res_type = "rack"
res_name = f"{res_type}{self._rackids}"
vtx = ElCapResourcePoolV1(
@@ -129,30 +125,26 @@ def _encode_rack(self, parent, nnf):
True,
"",
1,
{"rabbit": nnf["metadata"]["name"], "ssdcount": str(self._chunks_per_nnf)},
{"rabbit": rabbit_name, "ssdcount": str(self._chunks_per_nnf)},
f"{parent.path}/{res_name}",
)
edg = ElCapResourceRelationshipV1(parent.get_id(), vtx.get_id())
self._add_and_tick_uniq_id(vtx, edg)
self._encode_ssds(vtx, nnf["status"])
for node in nnf["status"]["access"].get("computes", []):
self._encode_ssds(vtx, entry["capacity"])
for node in Hostlist(entry["hostlist"]):
try:
index = self._r_hostlist.index(node["name"])[0]
index = self._r_hostlist.index(node)[0]
except FileNotFoundError:
pass
else:
self._encode_rank(
vtx, index, self._rank_to_children[index], node["name"]
)
self._encode_rank(vtx, index, self._rank_to_children[index], node)
# if the rabbit itself is in R, add it to the rack as a compute node as well
try:
index = self._r_hostlist.index(nnf["metadata"]["name"])[0]
index = self._r_hostlist.index(rabbit_name)[0]
except FileNotFoundError:
pass
else:
self._encode_rank(
vtx, index, self._rank_to_children[index], nnf["metadata"]["name"]
)
self._encode_rank(vtx, index, self._rank_to_children[index], rabbit_name)
self._rackids += 1

def _encode(self):
@@ -171,15 +163,11 @@ def _encode(self):
f"/{self._cluster_name}0",
)
self._add_and_tick_uniq_id(vtx)
for nnf in self._nnfs:
self._encode_rack(vtx, nnf)
for rabbit_name, entry in self._rabbit_mapping["rabbits"].items():
self._encode_rack(vtx, rabbit_name, entry)
# add nodes not in rabbit racks, making the nodes contained by 'cluster'
dws_computes = set(
compute["name"]
for nnf in self._nnfs
for compute in nnf["status"]["access"].get("computes", [])
)
dws_computes |= set(nnf["metadata"]["name"] for nnf in self._nnfs)
dws_computes = set(self._rabbit_mapping["computes"].keys())
dws_computes |= set(self._rabbit_mapping["rabbits"].keys())
for rank, node in enumerate(self._r_hostlist):
if node not in dws_computes:
self._encode_rank(
@@ -220,33 +208,12 @@ def to_gibibytes(byt):
return byt // (1024**3)


def encode(rv1, nnfs, r_hostlist, chunks_per_nnf, cluster_name):
graph = Coral2Graph(rv1, nnfs, r_hostlist, chunks_per_nnf, cluster_name)
def encode(rv1, rabbit_mapping, r_hostlist, chunks_per_nnf, cluster_name):
graph = Coral2Graph(rv1, rabbit_mapping, r_hostlist, chunks_per_nnf, cluster_name)
rv1["scheduling"] = graph.to_JSON()
return rv1


def get_storage():
k8s_client = k8s.config.new_client_from_config()
try:
api_instance = k8s.client.CustomObjectsApi(k8s_client)
except ApiException as rest_exception:
if rest_exception.status == 403:
raise Exception(
"You must be logged in to the K8s or OpenShift cluster to continue"
)
raise

group = "dataworkflowservices.github.io"
version = "v1alpha2"
plural = "storages"
try:
api_response = api_instance.list_cluster_custom_object(group, version, plural)
except ApiException as e:
print("Exception: %s\n" % e, file=sys.stderr)
return api_response


LOGGER = logging.getLogger("flux-dws2jgf")


@@ -255,7 +222,10 @@ def main():
parser = argparse.ArgumentParser(
prog="flux-dws2jgf",
formatter_class=flux.util.help_formatter(),
description="Print JGF representation of Rabbit nodes",
description=(
"Print JGF representation of Rabbit nodes. Reads R from stdin"
"or a config file passed with the --from-config option."
),
)
parser.add_argument(
"--no-validate",
@@ -281,26 +251,55 @@
"If unspecified, use the hostname stripped of numerics."
),
)
parser.add_argument(
"--from-config",
metavar="FILE",
help=(
"Generate JGF based on a Flux config TOML file containing "
"a resource.config table"
),
)
parser.add_argument(
"rabbitmapping",
metavar="FILE",
help=(
"Path to JSON object giving rabbit layout and capacity, as generated "
"e.g. by the 'flux rabbitmapping' script"
),
)
args = parser.parse_args()
if not args.cluster_name:
args.cluster_name = "".join(
i for i in socket.gethostname() if not i.isdigit()
).rstrip("-")

input_r = json.load(sys.stdin)
nnfs = [x for x in get_storage()["items"]]
if args.from_config is None:
input_r = json.load(sys.stdin)
else:
proc = subprocess.run(
f"flux R parse-config {args.from_config}".split(),
capture_output=True,
check=False,
)
if proc.returncode != 0:
raise ValueError(
f"Could not parse config file {args.from_config!r}, "
"error message was {proc.stderr}"
)
input_r = json.loads(proc.stdout)
with open(args.rabbitmapping, "r", encoding="utf8") as rabbitmap_fd:
rabbit_mapping = json.load(rabbitmap_fd)
r_hostlist = Hostlist(input_r["execution"]["nodelist"])
dws_computes = set(
compute["name"]
for nnf in nnfs
for compute in nnf["status"]["access"].get("computes", [])
)
dws_computes = set(rabbit_mapping["computes"].keys())
if not args.no_validate and not dws_computes <= set(r_hostlist):
raise RuntimeError(
f"Node(s) {dws_computes - set(r_hostlist)} found in DWS but not R from stdin"
f"Node(s) {dws_computes - set(r_hostlist)} found in rabbit_mapping "
"but not R from stdin"
)
json.dump(
encode(input_r, nnfs, r_hostlist, args.chunks_per_nnf, args.cluster_name),
encode(
input_r, rabbit_mapping, r_hostlist, args.chunks_per_nnf, args.cluster_name
),
sys.stdout,
)

72 changes: 72 additions & 0 deletions src/cmd/flux-rabbitmapping.py
@@ -0,0 +1,72 @@
#!/usr/bin/env python3

import argparse
import sys
import json
import subprocess

import flux
from flux.hostlist import Hostlist

import kubernetes as k8s
from kubernetes.client.rest import ApiException


def get_storage(config_file):
k8s_client = k8s.config.new_client_from_config(config_file=config_file)
try:
api_instance = k8s.client.CustomObjectsApi(k8s_client)
except ApiException as rest_exception:
if rest_exception.status == 403:
raise Exception(
"You must be logged in to the K8s or OpenShift cluster to continue"
)
raise

group = "dataworkflowservices.github.io"
version = "v1alpha2"
plural = "storages"
return api_instance.list_cluster_custom_object(group, version, plural)


def main():
parser = argparse.ArgumentParser(
formatter_class=flux.util.help_formatter(),
description=("Create a mapping between compute nodes and rabbit nodes"),
)
parser.add_argument(
"--kubeconfig",
"-k",
default=None,
metavar="FILE",
help="Path to kubeconfig file to use",
)
parser.add_argument(
"--indent",
"-i",
default=None,
type=int,
metavar="N",
help="Number of spaces to indent output JSON document",
)
parser.add_argument(
"--nosort",
action="store_false",
help="Do not sort keys in output JSON document",
)
args = parser.parse_args()
rabbit_mapping = {"computes": {}, "rabbits": {}}
for nnf in get_storage(args.kubeconfig)["items"]:
hlist = Hostlist()
nnf_name = nnf["metadata"]["name"]
for compute in nnf["status"]["access"].get("computes", []):
hlist.append(compute["name"])
rabbit_mapping["computes"][compute["name"]] = nnf_name
rabbit_mapping["rabbits"][nnf_name] = {}
rabbit_mapping["rabbits"][nnf_name]["hostlist"] = hlist.uniq().encode()
rabbit_mapping["rabbits"][nnf_name]["capacity"] = nnf["status"]["capacity"]
json.dump(rabbit_mapping, sys.stdout, indent=args.indent, sort_keys=args.nosort)


if __name__ == "__main__":
main()
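
A minimal sketch of the Hostlist calls the script relies on, with made-up host names: append() accumulates names, uniq() deduplicates, and encode() renders the compact range form seen in the fixture below.

from flux.hostlist import Hostlist

# Made-up names: mirror how flux-rabbitmapping.py builds each rabbit's
# compact hostlist string.
hlist = Hostlist()
for name in ["compute-01", "compute-02", "compute-03", "compute-02"]:
    hlist.append(name)
print(hlist.uniq().encode())  # expected: "compute-[01-03]"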
2 changes: 1 addition & 1 deletion src/python/flux_k8s/directivebreakdown.py
@@ -46,7 +46,7 @@ def validate(self, nodecount):
requested = getattr(self, attr)
allowable = getattr(type(self), attr)
if attr == "lustre":
requested = requested / nodecount
requested = requested // nodecount
if allowable is not None and requested > allowable:
raise ValueError(
f"Requested a total of {requested} GiB of {attr} storage "
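
The one-character change above matters because / yields a float even for integral operands, and the result feeds a GiB comparison and error message; a quick illustration with made-up numbers:

# Made-up numbers: contrast true division with the floor division the fix introduces.
requested = 10  # total GiB of lustre requested
nodecount = 3
print(requested / nodecount)   # 3.3333333333333335 (float)
print(requested // nodecount)  # 3 (integer GiB per node)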
18 changes: 18 additions & 0 deletions t/data/dws2jgf/rabbits.json
@@ -0,0 +1,18 @@
{
"computes": {
"compute-01": "kind-worker2",
"compute-02": "kind-worker2",
"compute-03": "kind-worker2",
"compute-04": "kind-worker3"
},
"rabbits": {
"kind-worker2": {
"capacity": 39582418599936,
"hostlist": "compute-[01-03]"
},
"kind-worker3": {
"capacity": 39582418599936,
"hostlist": "compute-04"
}
}
}
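
As a usage sketch only (the file name and check are illustrative, not part of this commit), a mapping like the one above can be cross-checked for internal consistency, much as flux-dws2jgf.py validates it against R:

import json

from flux.hostlist import Hostlist

# Illustrative check: every compute in "computes" should appear in the
# hostlist of the rabbit it maps to.
with open("rabbits.json", encoding="utf8") as fd:
    mapping = json.load(fd)

for compute, rabbit in mapping["computes"].items():
    hosts = set(Hostlist(mapping["rabbits"][rabbit]["hostlist"]))
    if compute not in hosts:
        raise RuntimeError(f"{compute} missing from {rabbit} hostlist")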
3 changes: 2 additions & 1 deletion t/t1002-dws-workflow-obj.t
@@ -87,8 +87,9 @@ test_expect_success 'job submission with valid DW string works with fluxion-rabb

test_expect_success 'load fluxion with rabbits' '
flux cancel ${DWS_JOBID} &&
flux python ${FLUX_SOURCE_DIR}/src/cmd/flux-rabbitmapping.py > rabbits.json &&
flux R encode -l | flux python ${FLUX_SOURCE_DIR}/src/cmd/flux-dws2jgf.py \
--no-validate | jq . > R.local &&
--no-validate rabbits.json | jq . > R.local &&
flux kvs put resource.R="$(cat R.local)" &&
flux module remove -f sched-fluxion-qmanager &&
flux module remove -f sched-fluxion-resource &&
5 changes: 3 additions & 2 deletions t/t1003-dws-nnf-watch.t
@@ -30,8 +30,9 @@ test_expect_success 'job-manager: load alloc-bypass plugin' '

test_expect_success 'load Fluxion with rabbit resource graph' '
echo $PYTHONPATH >&2 &&
flux python ${FLUX_SOURCE_DIR}/src/cmd/flux-rabbitmapping.py > rabbits.json &&
flux R encode -l | flux python ${FLUX_SOURCE_DIR}/src/cmd/flux-dws2jgf.py \
--no-validate | jq . > R.local &&
--no-validate rabbits.json | jq . > R.local &&
flux kvs put resource.R="$(cat R.local)" &&
flux module remove -f sched-fluxion-qmanager &&
flux module remove -f sched-fluxion-resource &&
@@ -172,7 +173,7 @@ test_expect_success 'exec Storage watching script with invalid --drain-queues ar
test_expect_success 'configure flux with queues' '
flux R encode -l | jq ".execution.properties.debug = \"0\"" | \
flux python ${FLUX_SOURCE_DIR}/src/cmd/flux-dws2jgf.py \
--no-validate | jq . > R.local.queues &&
--no-validate rabbits.json | jq . > R.local.queues &&
flux kvs put resource.R="$(cat R.local.queues)" &&
flux module remove -f sched-fluxion-qmanager &&
flux module remove -f sched-fluxion-resource &&
