Dws: use static rabbit layout mapping for JGF #204

Merged 5 commits on Sep 4, 2024
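
For context, the flow the updated tests below exercise: generate the static rabbit mapping once with flux-rabbitmapping, then pipe R through flux-dws2jgf together with the mapping file. A sketch assuming a source-tree checkout (paths follow t/t1002-dws-workflow-obj.t and t/t1003-dws-nnf-watch.t):

    flux python src/cmd/flux-rabbitmapping.py > rabbits.json
    flux R encode -l | flux python src/cmd/flux-dws2jgf.py --no-validate rabbits.json | jq . > R.local
    flux kvs put resource.R="$(cat R.local)"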
3 changes: 2 additions & 1 deletion src/cmd/Makefile.am
@@ -1,2 +1,3 @@
dist_fluxcmd_SCRIPTS = \
flux-dws2jgf.py
flux-dws2jgf.py \
flux-rabbitmapping.py
123 changes: 61 additions & 62 deletions src/cmd/flux-dws2jgf.py
@@ -3,22 +3,18 @@
import argparse
import sys
import json
import re
import logging
import itertools
import subprocess
import socket

import flux
from flux.idset import IDset
from flux.hostlist import Hostlist
from flux.idset import IDset
from fluxion.resourcegraph.V1 import (
FluxionResourceGraphV1,
FluxionResourcePoolV1,
FluxionResourceRelationshipV1,
)
import kubernetes as k8s
from kubernetes.client.rest import ApiException


class ElCapResourcePoolV1(FluxionResourcePoolV1):
@@ -53,12 +49,12 @@ class Coral2Graph(FluxionResourceGraphV1):
CORAL2 Graph: extend jsongraph's Graph class
"""

def __init__(self, rv1, nnfs, r_hostlist, chunks_per_nnf, cluster_name):
def __init__(self, rv1, rabbit_mapping, r_hostlist, chunks_per_nnf, cluster_name):
"""Constructor
rv1 -- RV1 Dictionary that conforms to Flux RFC 20:
Resource Set Specification Version 1
"""
self._nnfs = nnfs
self._rabbit_mapping = rabbit_mapping
self._r_hostlist = r_hostlist
self._chunks_per_nnf = chunks_per_nnf
self._rackids = 0
@@ -93,7 +89,7 @@ def _encode_rank(self, parent, rank, children, hName):
for i in IDset(val):
self._encode_child(vtx.get_id(), hPath, rank, str(key), i, {})

def _encode_ssds(self, parent, nnf):
def _encode_ssds(self, parent, capacity):
res_type = "ssd"
for i in range(self._chunks_per_nnf):
res_name = f"{res_type}{i}"
@@ -107,15 +103,15 @@ def _encode_ssds(self, parent, nnf):
-1,
True,
"GiB",
to_gibibytes(nnf["capacity"] // self._chunks_per_nnf),
to_gibibytes(capacity // self._chunks_per_nnf),
{},
f"{parent.path}/{res_name}",
1, # status=1 marks the ssds as 'down' initially
)
edg = ElCapResourceRelationshipV1(parent.get_id(), vtx.get_id())
self._add_and_tick_uniq_id(vtx, edg)

def _encode_rack(self, parent, nnf):
def _encode_rack(self, parent, rabbit_name, entry):
res_type = "rack"
res_name = f"{res_type}{self._rackids}"
vtx = ElCapResourcePoolV1(
@@ -129,30 +125,26 @@ def _encode_rack(self, parent, nnf):
True,
"",
1,
{"rabbit": nnf["metadata"]["name"], "ssdcount": str(self._chunks_per_nnf)},
{"rabbit": rabbit_name, "ssdcount": str(self._chunks_per_nnf)},
f"{parent.path}/{res_name}",
)
edg = ElCapResourceRelationshipV1(parent.get_id(), vtx.get_id())
self._add_and_tick_uniq_id(vtx, edg)
self._encode_ssds(vtx, nnf["status"])
for node in nnf["status"]["access"].get("computes", []):
self._encode_ssds(vtx, entry["capacity"])
for node in Hostlist(entry["hostlist"]):
try:
index = self._r_hostlist.index(node["name"])[0]
index = self._r_hostlist.index(node)[0]
except FileNotFoundError:
pass
else:
self._encode_rank(
vtx, index, self._rank_to_children[index], node["name"]
)
self._encode_rank(vtx, index, self._rank_to_children[index], node)
# if the rabbit itself is in R, add it to the rack as a compute node as well
try:
index = self._r_hostlist.index(nnf["metadata"]["name"])[0]
index = self._r_hostlist.index(rabbit_name)[0]
except FileNotFoundError:
pass
else:
self._encode_rank(
vtx, index, self._rank_to_children[index], nnf["metadata"]["name"]
)
self._encode_rank(vtx, index, self._rank_to_children[index], rabbit_name)
self._rackids += 1

def _encode(self):
@@ -171,15 +163,11 @@ def _encode(self):
f"/{self._cluster_name}0",
)
self._add_and_tick_uniq_id(vtx)
for nnf in self._nnfs:
self._encode_rack(vtx, nnf)
for rabbit_name, entry in self._rabbit_mapping["rabbits"].items():
self._encode_rack(vtx, rabbit_name, entry)
# add nodes not in rabbit racks, making the nodes contained by 'cluster'
dws_computes = set(
compute["name"]
for nnf in self._nnfs
for compute in nnf["status"]["access"].get("computes", [])
)
dws_computes |= set(nnf["metadata"]["name"] for nnf in self._nnfs)
dws_computes = set(self._rabbit_mapping["computes"].keys())
dws_computes |= set(self._rabbit_mapping["rabbits"].keys())
for rank, node in enumerate(self._r_hostlist):
if node not in dws_computes:
self._encode_rank(
@@ -220,33 +208,12 @@ def to_gibibytes(byt):
return byt // (1024**3)


def encode(rv1, nnfs, r_hostlist, chunks_per_nnf, cluster_name):
graph = Coral2Graph(rv1, nnfs, r_hostlist, chunks_per_nnf, cluster_name)
def encode(rv1, rabbit_mapping, r_hostlist, chunks_per_nnf, cluster_name):
graph = Coral2Graph(rv1, rabbit_mapping, r_hostlist, chunks_per_nnf, cluster_name)
rv1["scheduling"] = graph.to_JSON()
return rv1


def get_storage():
k8s_client = k8s.config.new_client_from_config()
try:
api_instance = k8s.client.CustomObjectsApi(k8s_client)
except ApiException as rest_exception:
if rest_exception.status == 403:
raise Exception(
"You must be logged in to the K8s or OpenShift cluster to continue"
)
raise

group = "dataworkflowservices.github.io"
version = "v1alpha2"
plural = "storages"
try:
api_response = api_instance.list_cluster_custom_object(group, version, plural)
except ApiException as e:
print("Exception: %s\n" % e, file=sys.stderr)
return api_response


LOGGER = logging.getLogger("flux-dws2jgf")


@@ -255,7 +222,10 @@ def main():
parser = argparse.ArgumentParser(
prog="flux-dws2jgf",
formatter_class=flux.util.help_formatter(),
description="Print JGF representation of Rabbit nodes",
description=(
"Print JGF representation of Rabbit nodes. Reads R from stdin"
" or a config file passed with the --from-config option."
),
)
parser.add_argument(
"--no-validate",
@@ -281,26 +251,55 @@
"If unspecified, use the hostname stripped of numerics."
),
)
parser.add_argument(
"--from-config",
metavar="FILE",
help=(
"Generate JGF based on a Flux config TOML file containing "
"a resource.config table"
),
)
parser.add_argument(
"rabbitmapping",
metavar="FILE",
help=(
"Path to JSON object giving rabbit layout and capacity, as generated "
"e.g. by the 'flux rabbitmapping' script"
),
)
args = parser.parse_args()
if not args.cluster_name:
args.cluster_name = "".join(
i for i in socket.gethostname() if not i.isdigit()
).rstrip("-")

input_r = json.load(sys.stdin)
nnfs = [x for x in get_storage()["items"]]
if args.from_config is None:
input_r = json.load(sys.stdin)
else:
proc = subprocess.run(
f"flux R parse-config {args.from_config}".split(),
capture_output=True,
check=False,
)
if proc.returncode != 0:
raise ValueError(
f"Could not parse config file {args.from_config!r}, "
f"error message was {proc.stderr}"
)
input_r = json.loads(proc.stdout)
with open(args.rabbitmapping, "r", encoding="utf8") as rabbitmap_fd:
rabbit_mapping = json.load(rabbitmap_fd)
r_hostlist = Hostlist(input_r["execution"]["nodelist"])
dws_computes = set(
compute["name"]
for nnf in nnfs
for compute in nnf["status"]["access"].get("computes", [])
)
dws_computes = set(rabbit_mapping["computes"].keys())
if not args.no_validate and not dws_computes <= set(r_hostlist):
raise RuntimeError(
f"Node(s) {dws_computes - set(r_hostlist)} found in DWS but not R from stdin"
f"Node(s) {dws_computes - set(r_hostlist)} found in rabbit_mapping "
"but not R from stdin"
)
json.dump(
encode(input_r, nnfs, r_hostlist, args.chunks_per_nnf, args.cluster_name),
encode(
input_r, rabbit_mapping, r_hostlist, args.chunks_per_nnf, args.cluster_name
),
sys.stdout,
)

72 changes: 72 additions & 0 deletions src/cmd/flux-rabbitmapping.py
@@ -0,0 +1,72 @@
#!/usr/bin/env python3

import argparse
import sys
import json
import subprocess

import flux
from flux.hostlist import Hostlist

import kubernetes as k8s
from kubernetes.client.rest import ApiException


def get_storage(config_file):
k8s_client = k8s.config.new_client_from_config(config_file=config_file)
try:
api_instance = k8s.client.CustomObjectsApi(k8s_client)
except ApiException as rest_exception:
if rest_exception.status == 403:
raise Exception(
"You must be logged in to the K8s or OpenShift cluster to continue"
)
raise

group = "dataworkflowservices.github.io"
version = "v1alpha2"
plural = "storages"
return api_instance.list_cluster_custom_object(group, version, plural)


def main():
parser = argparse.ArgumentParser(
formatter_class=flux.util.help_formatter(),
description=("Create a mapping between compute nodes and rabbit nodes"),
)
parser.add_argument(
"--kubeconfig",
"-k",
default=None,
metavar="FILE",
help="Path to kubeconfig file to use",
)
parser.add_argument(
"--indent",
"-i",
default=None,
type=int,
metavar="N",
help="Number of spaces to indent output JSON document",
)
parser.add_argument(
"--nosort",
action="store_false",
help="Do not sort keys in output JSON document",
)
args = parser.parse_args()
rabbit_mapping = {"computes": {}, "rabbits": {}}
for nnf in get_storage(args.kubeconfig)["items"]:
hlist = Hostlist()
nnf_name = nnf["metadata"]["name"]
for compute in nnf["status"]["access"].get("computes", []):
hlist.append(compute["name"])
rabbit_mapping["computes"][compute["name"]] = nnf_name
rabbit_mapping["rabbits"][nnf_name] = {}
rabbit_mapping["rabbits"][nnf_name]["hostlist"] = hlist.uniq().encode()
rabbit_mapping["rabbits"][nnf_name]["capacity"] = nnf["status"]["capacity"]
json.dump(rabbit_mapping, sys.stdout, indent=args.indent, sort_keys=args.nosort)


if __name__ == "__main__":
main()
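
A usage sketch for the script above (the kubeconfig path is illustrative; --indent and --nosort only affect the JSON serialization):

    flux python src/cmd/flux-rabbitmapping.py --kubeconfig ~/.kube/config --indent 4 > rabbits.json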
2 changes: 1 addition & 1 deletion src/python/flux_k8s/directivebreakdown.py
@@ -46,7 +46,7 @@ def validate(self, nodecount):
requested = getattr(self, attr)
allowable = getattr(type(self), attr)
if attr == "lustre":
requested = requested / nodecount
requested = requested // nodecount
if allowable is not None and requested > allowable:
raise ValueError(
f"Requested a total of {requested} GiB of {attr} storage "
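The one-character change above replaces true division with floor division when spreading a lustre request across nodes, so the per-node figure stays an integer number of GiB rather than a float (illustrative values only):

    python3 -c 'print(10 / 4, 10 // 4)'  # prints "2.5 2": old behavior vs. new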
18 changes: 18 additions & 0 deletions t/data/dws2jgf/rabbits.json
@@ -0,0 +1,18 @@
{
"computes": {
"compute-01": "kind-worker2",
"compute-02": "kind-worker2",
"compute-03": "kind-worker2",
"compute-04": "kind-worker3"
},
"rabbits": {
"kind-worker2": {
"capacity": 39582418599936,
"hostlist": "compute-[01-03]"
},
"kind-worker3": {
"capacity": 39582418599936,
"hostlist": "compute-04"
}
}
}
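
This fixture matches the shape flux-rabbitmapping.py emits: "computes" maps each compute node to its rabbit, and "rabbits" records each rabbit's compute hostlist and capacity in bytes. A sketch of consuming it via the new --from-config path (config.toml is a hypothetical Flux config containing a resource.config table):

    flux python src/cmd/flux-dws2jgf.py --from-config config.toml t/data/dws2jgf/rabbits.json > R.local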
3 changes: 2 additions & 1 deletion t/t1002-dws-workflow-obj.t
@@ -87,8 +87,9 @@ test_expect_success 'job submission with valid DW string works with fluxion-rabb

test_expect_success 'load fluxion with rabbits' '
flux cancel ${DWS_JOBID} &&
flux python ${FLUX_SOURCE_DIR}/src/cmd/flux-rabbitmapping.py > rabbits.json &&
flux R encode -l | flux python ${FLUX_SOURCE_DIR}/src/cmd/flux-dws2jgf.py \
--no-validate | jq . > R.local &&
--no-validate rabbits.json | jq . > R.local &&
flux kvs put resource.R="$(cat R.local)" &&
flux module remove -f sched-fluxion-qmanager &&
flux module remove -f sched-fluxion-resource &&
5 changes: 3 additions & 2 deletions t/t1003-dws-nnf-watch.t
@@ -30,8 +30,9 @@ test_expect_success 'load Fluxion with rabbit resource graph' '

test_expect_success 'load Fluxion with rabbit resource graph' '
echo $PYTHONPATH >&2 &&
flux python ${FLUX_SOURCE_DIR}/src/cmd/flux-rabbitmapping.py > rabbits.json &&
flux R encode -l | flux python ${FLUX_SOURCE_DIR}/src/cmd/flux-dws2jgf.py \
--no-validate | jq . > R.local &&
--no-validate rabbits.json | jq . > R.local &&
flux kvs put resource.R="$(cat R.local)" &&
flux module remove -f sched-fluxion-qmanager &&
flux module remove -f sched-fluxion-resource &&
@@ -172,7 +173,7 @@ test_expect_success 'exec Storage watching script with invalid --drain-queues ar
test_expect_success 'configure flux with queues' '
flux R encode -l | jq ".execution.properties.debug = \"0\"" | \
flux python ${FLUX_SOURCE_DIR}/src/cmd/flux-dws2jgf.py \
--no-validate | jq . > R.local.queues &&
--no-validate rabbits.json | jq . > R.local.queues &&
flux kvs put resource.R="$(cat R.local.queues)" &&
flux module remove -f sched-fluxion-qmanager &&
flux module remove -f sched-fluxion-resource &&