From 4573f1a2a3bad410d910d3a38a79fafca071f4c1 Mon Sep 17 00:00:00 2001 From: James Corbett Date: Wed, 10 Apr 2024 13:02:48 -0700 Subject: [PATCH 1/2] dws: add option to disable compute-node draining Problem: the coral2-dws script drains nodes aggressively, sometimes more than is desirable. Add a command-line option to disable draining. --- src/modules/coral2_dws.py | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/src/modules/coral2_dws.py b/src/modules/coral2_dws.py index 120ab1b..c42a8a3 100755 --- a/src/modules/coral2_dws.py +++ b/src/modules/coral2_dws.py @@ -443,7 +443,9 @@ def _workflow_state_change_cb_inner(workflow, jobid, winfo, handle, k8s_api): WORKFLOWS_IN_TC.discard(winfo) -def drain_offline_nodes(handle, rabbit_name, nodelist): +def drain_offline_nodes(handle, rabbit_name, nodelist, disable_draining): + if disable_draining: + return offline_nodes = Hostlist() for compute_node in nodelist: if compute_node["status"] != "Ready": @@ -474,7 +476,7 @@ def mark_rabbit(handle, status, resource_path): handle.rpc("sched-fluxion-resource.set_status", payload).then(log_rpc_response) -def rabbit_state_change_cb(event, handle, rabbit_rpaths): +def rabbit_state_change_cb(event, handle, rabbit_rpaths, disable_draining): """Callback firing when a Storage object changes. Marks a rabbit as up or down. @@ -488,7 +490,9 @@ def rabbit_state_change_cb(event, handle, rabbit_rpaths): ) return mark_rabbit(handle, status, rabbit_rpaths[name]) - drain_offline_nodes(handle, name, rabbit["status"]["access"]["computes"]) + drain_offline_nodes( + handle, name, rabbit["status"]["access"]["computes"], disable_draining + ) # TODO: add some check for whether rabbit capacity has changed # TODO: update capacity of rabbit in resource graph (mark some slices down?) 
@@ -507,7 +511,7 @@ def map_rabbits_to_fluxion_paths(graph_path): return rabbit_rpaths -def init_rabbits(k8s_api, handle, watchers, graph_path): +def init_rabbits(k8s_api, handle, watchers, graph_path, disable_draining): """Watch every rabbit ('Storage' resources in k8s) known to k8s. Whenever a Storage resource changes, mark it as 'up' or 'down' in Fluxion. @@ -528,7 +532,9 @@ def init_rabbits(k8s_api, handle, watchers, graph_path): "Encountered an unknown Storage object '%s' in the event stream", name ) mark_rabbit(handle, rabbit["status"]["status"], rabbit_rpaths[name]) - drain_offline_nodes(handle, name, rabbit["status"]["access"]["computes"]) + drain_offline_nodes( + handle, name, rabbit["status"]["access"]["computes"], disable_draining + ) watchers.add_watch( Watch( k8s_api, @@ -537,6 +543,7 @@ def init_rabbits(k8s_api, handle, watchers, graph_path): rabbit_state_change_cb, handle, rabbit_rpaths, + disable_draining, ) ) @@ -609,6 +616,11 @@ def setup_parsing(): metavar="N", help="Minimum allocation size of rabbit allocations, in bytes", ) + parser.add_argument( + "--disable-compute-node-draining", + action="store_true", + help="Disable the draining of compute nodes based on k8s status", + ) return parser @@ -713,7 +725,13 @@ def main(): # start watching k8s workflow resources and operate on them when updates occur # or new RPCs are received with Watchers(handle, watch_interval=args.watch_interval) as watchers: - init_rabbits(k8s_api, handle, watchers, args.resourcegraph) + init_rabbits( + k8s_api, + handle, + watchers, + args.resourcegraph, + args.disable_compute_node_draining, + ) services = register_services(handle, k8s_api) watchers.add_watch( Watch(k8s_api, WORKFLOW_CRD, 0, workflow_state_change_cb, handle, k8s_api) From 45817556bcc994ce673e2bbad1fed7e16e35e1d0 Mon Sep 17 00:00:00 2001 From: James Corbett Date: Wed, 10 Apr 2024 13:06:28 -0700 Subject: [PATCH 2/2] test: add tests for disabling dws node draining Problem: there are no tests for ensuring that 
the command-line option to disable the draining of compute nodes works properly. Add tests. --- t/t1003-dws-nnf-watch.t | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/t/t1003-dws-nnf-watch.t b/t/t1003-dws-nnf-watch.t index aaca5e8..1666b25 100755 --- a/t/t1003-dws-nnf-watch.t +++ b/t/t1003-dws-nnf-watch.t @@ -126,7 +126,30 @@ test_expect_success 'test that flux drains Offline compute nodes' ' -p "[{\"op\":\"replace\", \"path\":\"/status/access/computes/0/status\", \"value\": \"Disabled\"}]" && kubectl get storages kind-worker2 -ojson | jq -e ".status.access.computes[0].status == \"Disabled\"" && sleep 2.5 && flux resource drain | grep compute-01 && - flux resource undrain compute-01 + flux resource undrain compute-01 && + test_must_fail bash -c "flux resource drain | grep compute-01" +' + +test_expect_success 'exec Storage watching script with --disable-compute-node-draining' ' + flux cancel ${jobid} && + jobid=$(flux submit \ + --setattr=system.alloc-bypass.R="$(cat R.local)" --output=dws.out --error=dws.err \ + -o per-resource.type=node flux python ${DWS_MODULE_PATH} -vvv -rR.local \ + --disable-compute-node-draining) && + flux job wait-event -vt 15 -p guest.exec.eventlog ${jobid} shell.start +' + +test_expect_success 'test that flux does not drain Offline compute nodes with --disable-compute-node-draining' ' + kubectl get storages kind-worker2 -ojson | jq -e ".spec.mode == \"Testing\"" && + kubectl patch storage kind-worker2 --subresource=status --type=json \ + -p "[{\"op\":\"replace\", \"path\":\"/status/access/computes/0/status\", \"value\": \"Ready\"}]" && + kubectl get storages kind-worker2 -ojson | jq -e ".status.access.computes[0].status == \"Ready\"" && + test_must_fail bash -c "flux resource drain | grep compute-01" && + kubectl patch storage kind-worker2 --subresource=status --type=json \ + -p "[{\"op\":\"replace\", \"path\":\"/status/access/computes/0/status\", \"value\": \"Disabled\"}]" && + kubectl get storages kind-worker2 -ojson 
| jq -e ".status.access.computes[0].status == \"Disabled\"" && + sleep 2 && + test_must_fail bash -c "flux resource drain | grep compute-01" ' test_expect_success 'return the storage resource to Live mode' '