
dws: crudely enforce mdt count constraints #181

Merged
51 changes: 6 additions & 45 deletions src/modules/coral2_dws.py
@@ -17,7 +17,6 @@
 import pwd
 import time
 import pathlib
-import math

 import kubernetes as k8s
 from kubernetes.client.rest import ApiException
@@ -285,50 +284,12 @@ def setup_cb(handle, _t, msg, k8s_api):
     # if a breakdown doesn't have a storage field (e.g. persistentdw) directives
     # ignore it and proceed
     if "storage" in breakdown["status"]:
-        allocation_sets = []
-        for alloc_set in breakdown["status"]["storage"]["allocationSets"]:
-            storage_field = []
-            server_alloc_set = {
-                "allocationSize": alloc_set["minimumCapacity"],
-                "label": alloc_set["label"],
-                "storage": storage_field,
-            }
-            if (
-                alloc_set["allocationStrategy"]
-                == directivebreakdown.AllocationStrategy.PER_COMPUTE.value
-            ):
-                # make an allocation on every rabbit attached to compute nodes
-                # in the job
-                for nnf_name, nodecount in nodes_per_nnf.items():
-                    storage_field.append(
-                        {
-                            "allocationCount": nodecount,
-                            "name": nnf_name,
-                        }
-                    )
-            elif (
-                alloc_set["allocationStrategy"]
-                == directivebreakdown.AllocationStrategy.ACROSS_SERVERS.value
-            ):
-                nodecount_gcd = functools.reduce(math.gcd, nodes_per_nnf.values())
-                server_alloc_set["allocationSize"] = math.ceil(
-                    nodecount_gcd * alloc_set["minimumCapacity"] / len(hlist)
-                )
-                # split lustre across every rabbit, weighting the split based on
-                # the number of the job's nodes associated with each rabbit
-                for rabbit_name in nodes_per_nnf:
-                    storage_field.append(
-                        {
-                            "allocationCount": nodes_per_nnf[rabbit_name]
-                            / nodecount_gcd,
-                            "name": rabbit_name,
-                        }
-                    )
-            # enforce the minimum allocation size
-            server_alloc_set["allocationSize"] = max(
-                server_alloc_set["allocationSize"], _MIN_ALLOCATION_SIZE * 1024**3
-            )
-            allocation_sets.append(server_alloc_set)
+        allocation_sets = directivebreakdown.build_allocation_sets(
+            breakdown["status"]["storage"]["allocationSets"],
+            nodes_per_nnf,
+            hlist,
+            _MIN_ALLOCATION_SIZE,
+        )
         k8s_api.patch_namespaced_custom_object(
             SERVER_CRD.group,
             SERVER_CRD.version,
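For orientation, a hedged sketch of calling the relocated helper with hypothetical inputs (the rabbit names, node counts, and 1 GiB minimum are made up, and a plain list stands in for the job's hostlist); the helper multiplies the last argument by 1024**3 internally, so it is assumed to be in GiB:

```python
from flux_k8s import directivebreakdown

nodes_per_nnf = {"rabbit-a": 4, "rabbit-b": 2}  # rabbit name -> attached job nodes
hlist = [f"node{i}" for i in range(6)]          # the job's six compute nodes
breakdown_alloc_sets = [
    {
        "label": "ost",
        "allocationStrategy": "AllocateAcrossServers",
        "minimumCapacity": 12 * 1024**3,
    }
]

allocation_sets = directivebreakdown.build_allocation_sets(
    breakdown_alloc_sets, nodes_per_nnf, hlist, 1  # assumed 1 GiB minimum
)
```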
102 changes: 70 additions & 32 deletions src/python/flux_k8s/directivebreakdown.py
@@ -1,10 +1,17 @@
"""Module defining functions related to DirectiveBreakdown resources."""

import enum
import copy
import functools
import math
import collections

from flux_k8s.crd import DIRECTIVEBREAKDOWN_CRD


class AllocationStrategy(enum.Enum):
"""Enum defining different AllocationStrategies."""

PER_COMPUTE = "AllocatePerCompute"
SINGLE_SERVER = "AllocateSingleServer"
ACROSS_SERVERS = "AllocateAcrossServers"
@@ -14,47 +21,78 @@ class AllocationStrategy(enum.Enum):
 LUSTRE_TYPES = ("ost", "mdt", "mgt", "mgtmdt")


-def build_allocation_sets(allocation_sets, local_allocations, nodes_per_nnf):
-    ret = []
-    for allocation in allocation_sets:
-        alloc_entry = {
-            "allocationSize": 0,
-            "label": allocation["label"],
-            "storage": [],
-        }
+def build_allocation_sets(breakdown_alloc_sets, nodes_per_nnf, hlist, min_alloc_size):
+    """Build the allocationSet for a Server based on its DirectiveBreakdown."""
+    allocation_sets = []
+    for alloc_set in breakdown_alloc_sets:
+        storage_field = []
+        server_alloc_set = {
+            "allocationSize": alloc_set["minimumCapacity"],
+            "label": alloc_set["label"],
+            "storage": storage_field,
+        }
-        max_alloc_size = 0
-        # build the storage field of alloc_entry
-        for nnf_name in local_allocations:
-            if allocation["label"] in PER_COMPUTE_TYPES:
-                alloc_size = int(
-                    local_allocations[nnf_name]
-                    * allocation["percentage_of_total"]
-                    / nodes_per_nnf[nnf_name]
-                )
+        if alloc_set["allocationStrategy"] == AllocationStrategy.PER_COMPUTE.value:
+            # make an allocation on every rabbit attached to compute nodes
+            # in the job
+            for nnf_name, nodecount in nodes_per_nnf.items():
+                storage_field.append(
+                    {
+                        "allocationCount": nodecount,
+                        "name": nnf_name,
+                    }
+                )
-                if alloc_size < allocation["minimumCapacity"]:
-                    raise RuntimeError(
-                        "Expected an allocation size of at least "
-                        f"{allocation['minimumCapacity']}, got {alloc_size}"
-                    )
-                if max_alloc_size == 0:
-                    max_alloc_size = alloc_size
-                else:
-                    max_alloc_size = min(max_alloc_size, alloc_size)
-                alloc_entry["storage"].append(
-                    {"allocationCount": nodes_per_nnf[nnf_name], "name": nnf_name}
-                )
+        elif alloc_set["allocationStrategy"] == AllocationStrategy.ACROSS_SERVERS.value:
+            if "count" in alloc_set.get("constraints", {}):
+                # a specific number of allocations is required (generally for MDTs)
+                count = alloc_set["constraints"]["count"]
+                server_alloc_set["allocationSize"] = math.ceil(
+                    alloc_set["minimumCapacity"] / count
+                )
Member:

probably just a dumb question, but could count ever be 0 in case the key-value lookup doesn't succeed for whatever reason? Could maybe be helpful to check before attempting this division, although Python generally doesn't freak out when attempting division by 0; it just raises an error :) I'll leave it up to you whether that could be helpful.

Member (Author):

I think count could be 0, but then something in the k8s environment must have been misconfigured and would need tweaking by an admin. I think the ZeroDivisionError would be OK here. Good catch though.

Member:

sounds great - just wanted to make sure. thanks!!
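For illustration only, a minimal standalone sketch of the sizing step this thread discusses; the capacity and count are hypothetical:

```python
import math

# hypothetical MDT allocation set: 10 GiB minimum, count constraint of 4
alloc_set = {"minimumCapacity": 10 * 1024**3, "constraints": {"count": 4}}

count = alloc_set["constraints"]["count"]
# each of the `count` allocations gets an equal share, rounded up so the
# shares still sum to at least minimumCapacity
allocation_size = math.ceil(alloc_set["minimumCapacity"] / count)
print(allocation_size)  # 2684354560, i.e. 2.5 GiB per allocation

# if a misconfigured resource ever produced count == 0, the division would
# raise ZeroDivisionError rather than fail silently, which the thread above
# treats as acceptable
```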

+            # place the allocations on the rabbits with the most nodes allocated
+            # to this job (and therefore the largest storage allocations)
+            while count > 0:
+                # count may be greater than the rabbits available, so we may need
+                # to place multiple on a single rabbit (hence the outer while-loop)
+                for name, _ in collections.Counter(nodes_per_nnf).most_common(
+                    count
+                ):
+                    storage_field.append(
+                        {
+                            "allocationCount": 1,
+                            "name": name,
+                        }
+                    )
+                    count -= 1
+                    if count == 0:
+                        break
Member:

if you change the while condition to >= 1, maybe you don't need to add this extra if check at the end of every loop?

Member (Author):

I don't see how >= 1 would be different than > 0? Also, the if check is just to break out of the inner loop, which would otherwise proceed without regard to the value of count; I wanted to make sure that exactly count allocations were made.

Member:

Gotcha - my mistake!! Thanks for the clarification.
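To see why the inner break matters, here is a small standalone sketch of the placement loop with hypothetical rabbit names, showing that exactly count allocations are made even when count exceeds the number of rabbits:

```python
import collections

nodes_per_nnf = {"rabbit-a": 4, "rabbit-b": 2, "rabbit-c": 1}  # hypothetical
count = 5  # more allocations requested than rabbits available

placements = []
while count > 0:
    # most_common(count) yields rabbits ordered by node count, at most `count` of them
    for name, _ in collections.Counter(nodes_per_nnf).most_common(count):
        placements.append(name)
        count -= 1
        if count == 0:
            break  # stop as soon as exactly `count` allocations are placed

print(placements)
# ['rabbit-a', 'rabbit-b', 'rabbit-c', 'rabbit-a', 'rabbit-b']
```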

-            else:
-                raise ValueError(f"{allocation['label']} not currently supported")
-        alloc_entry["allocationSize"] = max_alloc_size
-        ret.append(alloc_entry)
-    return ret
+            else:
+                nodecount_gcd = functools.reduce(math.gcd, nodes_per_nnf.values())
+                server_alloc_set["allocationSize"] = math.ceil(
+                    nodecount_gcd * alloc_set["minimumCapacity"] / len(hlist)
+                )
+                # split lustre across every rabbit, weighting the split based on
+                # the number of the job's nodes associated with each rabbit
+                for rabbit_name in nodes_per_nnf:
+                    storage_field.append(
+                        {
+                            "allocationCount": int(
+                                nodes_per_nnf[rabbit_name] / nodecount_gcd
+                            ),
+                            "name": rabbit_name,
+                        }
+                    )
+        # enforce the minimum allocation size
+        server_alloc_set["allocationSize"] = max(
+            server_alloc_set["allocationSize"], min_alloc_size * 1024**3
+        )
+        allocation_sets.append(server_alloc_set)
+    return allocation_sets
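To make the gcd weighting concrete, a worked example with made-up inputs (two rabbits, six compute nodes) mirroring the else branch above:

```python
import functools
import math

# hypothetical job: 6 compute nodes, spread 4/2 across two rabbits
nodes_per_nnf = {"rabbit-a": 4, "rabbit-b": 2}
num_nodes = 6                     # stands in for len(hlist)
minimum_capacity = 12 * 1024**3   # 12 GiB requested across all servers

nodecount_gcd = functools.reduce(math.gcd, nodes_per_nnf.values())  # gcd(4, 2) == 2
allocation_size = math.ceil(nodecount_gcd * minimum_capacity / num_nodes)  # 4 GiB

# rabbit-a gets 4/2 == 2 allocations and rabbit-b gets 2/2 == 1, so capacity
# splits 8 GiB / 4 GiB, proportional to each rabbit's share of the job's nodes
counts = {name: n // nodecount_gcd for name, n in nodes_per_nnf.items()}
print(allocation_size, counts)  # 4294967296 {'rabbit-a': 2, 'rabbit-b': 1}
```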


 def apply_breakdowns(k8s_api, workflow, old_resources, min_size):
     """Apply all of the directive breakdown information to a jobspec's `resources`."""
     resources = copy.deepcopy(old_resources)
     breakdown_list = list(fetch_breakdowns(k8s_api, workflow))
     per_compute_total = 0  # total bytes of per-compute storage
     if not resources:
         raise ValueError("jobspec resources empty")
     if len(resources) > 1 or resources[0]["type"] != "node":

@@ -99,7 +137,7 @@ def apply_breakdowns(k8s_api, workflow, old_resources, min_size):
 def fetch_breakdowns(k8s_api, workflow):
     """Fetch all of the directive breakdowns associated with a workflow."""
     if not workflow["status"].get("directiveBreakdowns"):
-        return []  # destroy_persistent DW directives have no breakdowns
+        return  # destroy_persistent DW directives have no breakdowns
     for breakdown in workflow["status"]["directiveBreakdowns"]:
         yield k8s_api.get_namespaced_custom_object(
             DIRECTIVEBREAKDOWN_CRD.group,
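A side note on the one-line change above, with my own minimal sketch (not from the PR): because fetch_breakdowns contains a yield it is a generator, so `return []` would not actually hand a list to the caller; a bare return is the idiomatic way to end iteration early:

```python
def gen_with_list():
    return []  # in a generator, the [] is only attached to StopIteration.value
    yield 1

def gen_bare():
    return  # idiomatic early exit from a generator
    yield 1

print(list(gen_with_list()))  # []
print(list(gen_bare()))       # []
# both iterate empty; the bare return just stops implying a list is returned
```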