Skip to content

Commit

Permalink
Merge pull request #149 from jameshcorbett/disable-draining
Browse files Browse the repository at this point in the history
dws: add option to disable draining
  • Loading branch information
mergify[bot] authored Apr 10, 2024
2 parents 24719bd + 4581755 commit b2d848b
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 7 deletions.
30 changes: 24 additions & 6 deletions src/modules/coral2_dws.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,9 @@ def _workflow_state_change_cb_inner(workflow, jobid, winfo, handle, k8s_api):
WORKFLOWS_IN_TC.discard(winfo)


def drain_offline_nodes(handle, rabbit_name, nodelist):
def drain_offline_nodes(handle, rabbit_name, nodelist, disable_draining):
if disable_draining:
return
offline_nodes = Hostlist()
for compute_node in nodelist:
if compute_node["status"] != "Ready":
Expand Down Expand Up @@ -474,7 +476,7 @@ def mark_rabbit(handle, status, resource_path):
handle.rpc("sched-fluxion-resource.set_status", payload).then(log_rpc_response)


def rabbit_state_change_cb(event, handle, rabbit_rpaths):
def rabbit_state_change_cb(event, handle, rabbit_rpaths, disable_draining):
"""Callback firing when a Storage object changes.
Marks a rabbit as up or down.
Expand All @@ -488,7 +490,9 @@ def rabbit_state_change_cb(event, handle, rabbit_rpaths):
)
return
mark_rabbit(handle, status, rabbit_rpaths[name])
drain_offline_nodes(handle, name, rabbit["status"]["access"]["computes"])
drain_offline_nodes(
handle, name, rabbit["status"]["access"]["computes"], disable_draining
)
# TODO: add some check for whether rabbit capacity has changed
# TODO: update capacity of rabbit in resource graph (mark some slices down?)

Expand All @@ -507,7 +511,7 @@ def map_rabbits_to_fluxion_paths(graph_path):
return rabbit_rpaths


def init_rabbits(k8s_api, handle, watchers, graph_path):
def init_rabbits(k8s_api, handle, watchers, graph_path, disable_draining):
"""Watch every rabbit ('Storage' resources in k8s) known to k8s.
Whenever a Storage resource changes, mark it as 'up' or 'down' in Fluxion.
Expand All @@ -528,7 +532,9 @@ def init_rabbits(k8s_api, handle, watchers, graph_path):
"Encountered an unknown Storage object '%s' in the event stream", name
)
mark_rabbit(handle, rabbit["status"]["status"], rabbit_rpaths[name])
drain_offline_nodes(handle, name, rabbit["status"]["access"]["computes"])
drain_offline_nodes(
handle, name, rabbit["status"]["access"]["computes"], disable_draining
)
watchers.add_watch(
Watch(
k8s_api,
Expand All @@ -537,6 +543,7 @@ def init_rabbits(k8s_api, handle, watchers, graph_path):
rabbit_state_change_cb,
handle,
rabbit_rpaths,
disable_draining,
)
)

Expand Down Expand Up @@ -609,6 +616,11 @@ def setup_parsing():
metavar="N",
help="Minimum allocation size of rabbit allocations, in bytes",
)
parser.add_argument(
"--disable-compute-node-draining",
action="store_true",
help="Disable the draining of compute nodes based on k8s status",
)
return parser


Expand Down Expand Up @@ -713,7 +725,13 @@ def main():
# start watching k8s workflow resources and operate on them when updates occur
# or new RPCs are received
with Watchers(handle, watch_interval=args.watch_interval) as watchers:
init_rabbits(k8s_api, handle, watchers, args.resourcegraph)
init_rabbits(
k8s_api,
handle,
watchers,
args.resourcegraph,
args.disable_compute_node_draining,
)
services = register_services(handle, k8s_api)
watchers.add_watch(
Watch(k8s_api, WORKFLOW_CRD, 0, workflow_state_change_cb, handle, k8s_api)
Expand Down
25 changes: 24 additions & 1 deletion t/t1003-dws-nnf-watch.t
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,30 @@ test_expect_success 'test that flux drains Offline compute nodes' '
-p "[{\"op\":\"replace\", \"path\":\"/status/access/computes/0/status\", \"value\": \"Disabled\"}]" &&
kubectl get storages kind-worker2 -ojson | jq -e ".status.access.computes[0].status == \"Disabled\"" &&
sleep 2.5 && flux resource drain | grep compute-01 &&
flux resource undrain compute-01
flux resource undrain compute-01 &&
test_must_fail bash -c "flux resource drain | grep compute-01"
'

test_expect_success 'exec Storage watching script with --disable-draining' '
flux cancel ${jobid} &&
jobid=$(flux submit \
--setattr=system.alloc-bypass.R="$(cat R.local)" --output=dws.out --error=dws.err \
-o per-resource.type=node flux python ${DWS_MODULE_PATH} -vvv -rR.local \
--disable-compute-node-draining) &&
flux job wait-event -vt 15 -p guest.exec.eventlog ${jobid} shell.start
'

test_expect_success 'test that flux does not drain Offline compute nodes with --disable-draining' '
kubectl get storages kind-worker2 -ojson | jq -e ".spec.mode == \"Testing\"" &&
kubectl patch storage kind-worker2 --subresource=status --type=json \
-p "[{\"op\":\"replace\", \"path\":\"/status/access/computes/0/status\", \"value\": \"Ready\"}]" &&
kubectl get storages kind-worker2 -ojson | jq -e ".status.access.computes[0].status == \"Ready\"" &&
test_must_fail bash -c "flux resource drain | grep compute-01" &&
kubectl patch storage kind-worker2 --subresource=status --type=json \
-p "[{\"op\":\"replace\", \"path\":\"/status/access/computes/0/status\", \"value\": \"Disabled\"}]" &&
kubectl get storages kind-worker2 -ojson | jq -e ".status.access.computes[0].status == \"Disabled\"" &&
sleep 2 &&
test_must_fail bash -c "flux resource drain | grep compute-01"
'

test_expect_success 'return the storage resource to Live mode' '
Expand Down

0 comments on commit b2d848b

Please sign in to comment.