From b79fb7c3eb7092aeef83e7dc1d1cf5d1a6313637 Mon Sep 17 00:00:00 2001 From: Dean Roehrich Date: Fri, 22 Sep 2023 08:54:23 -0500 Subject: [PATCH 1/2] Handle DWS fatal errors Set Flags=TeardownFailure in burst_buffer.conf. This tells Slurm to go to Teardown if there are errors during stage_in or stage_out. In the burst_buffer.lua plugin, adjust slurm_bb_job_teardown() to check the workflow for any fatal errors recorded in the drivers array. If any are found, then we know we were called in a fatal error situation and we not only delete the workflow, per usual, but we also call the Slurm `scancel` command to cancel the Slurm job. Signed-off-by: Dean Roehrich --- src/burst_buffer/burst_buffer.conf | 6 ++ src/burst_buffer/burst_buffer.lua | 62 +++++++++++-- .../src/features/test_dws_states.feature | 65 +++++++------- .../src/features/test_environment.feature | 2 +- testsuite/integration/src/pytest.ini | 5 +- testsuite/integration/src/tests/conftest.py | 18 +--- .../tests/dws_bb_plugin/test_dws_states.py | 86 +++++-------------- testsuite/integration/src/tests/slurmctld.py | 45 +++++----- testsuite/unit/src/burst_buffer/dws-test.lua | 4 +- 9 files changed, 149 insertions(+), 144 deletions(-) diff --git a/src/burst_buffer/burst_buffer.conf b/src/burst_buffer/burst_buffer.conf index 67d41cc..04427d2 100644 --- a/src/burst_buffer/burst_buffer.conf +++ b/src/burst_buffer/burst_buffer.conf @@ -3,3 +3,9 @@ # See https://slurm.schedmd.com/burst_buffer.conf.html Directive=DW +# If set, then teardown a burst buffer after file staging error. Otherwise +# preserve the burst buffer for analysis and manual teardown. +# See https://slurm.schedmd.com/burst_buffer.conf.html +# and https://slurm.schedmd.com/burst_buffer.html#states +Flags=TeardownFailure + diff --git a/src/burst_buffer/burst_buffer.lua b/src/burst_buffer/burst_buffer.lua index be0a963..b1c80a8 100644 --- a/src/burst_buffer/burst_buffer.lua +++ b/src/burst_buffer/burst_buffer.lua @@ -280,9 +280,16 @@ end -- DWS:get_driver_errors will collect driver errors from the Workflow resource -- with respect to the given state. -function DWS:get_driver_errors(state) - local error_list = {} - local jsonpath = [[{range .status.drivers[?(@.watchState=="]].. state ..[[")]}==={"\n"}{@.status}{"\n"}{@.driverID}{"\n"}{@.error}{"\n"}{end}]] +-- If all_errors=true then collect all errors from all states in all drivers. +-- On success this returns true and a string with all of the errors. +-- On failure this returns false, an empty string for the errors, and a string +-- explaining why it couldn't collect the errors. +function DWS:get_driver_errors(state, all_errors) + local driver_index = [[?(@.watchState=="]].. state ..[[")]] + if all_errors == true then + driver_index = "*" + end + local jsonpath = [[{range .status.drivers[]] .. driver_index .. [[]}==={"\n"}{@.status}{"\n"}{@.driverID}{"\n"}{@.error}{"\n"}{end}]] local ret, output = self:get_jsonpath(jsonpath) if ret == false then return ret, "", "could not get driver errors: " .. output @@ -442,6 +449,18 @@ function DWS:kubectl(cmd) return self:io_popen(kcmd) end +-- DWS:scancel will run the Slurm scancel command and collect its output. +-- On success this returns true and the output of the command. +-- On failure this returns false and the output of the command. +function DWS:scancel(jobId, hurry) + local hurry_opt = "" + if hurry == true then + hurry_opt = "--hurry " + end + local scmd = "scancel " .. hurry_opt .. 
jobId + return self:io_popen(scmd) +end + -- DWS:io_popen will run the given command and collect its output. -- On success this returns true and the output of the command. -- On failure this returns false and the output of the command. @@ -627,24 +646,51 @@ function slurm_bb_job_teardown(job_id, job_script, hurry) hurry_flag = true end local workflow = DWS(make_workflow_name(job_id)) - local done, err = workflow:set_workflow_state_and_wait("Teardown", hurry_flag) + + local ret = slurm.SUCCESS + -- Does the workflow have a fatal error in it? + -- If so, we'll call scancel as well. + local done, state_errors, err = workflow:get_driver_errors("", true) if done == false then if string.find(err, [["]] .. workflow.name .. [[" not found]]) then -- It's already gone, and that's what we wanted anyway. return slurm.SUCCESS else - slurm.log_error("%s: slurm_bb_job_teardown(), workflow=%s: %s", lua_script_name, workflow.name, err) - return slurm.ERROR, err + slurm.log_error("%s: slurm_bb_job_teardown(), workflow=%s: unable to check driver errors: %s", lua_script_name, workflow.name, err) + ret = slurm.ERROR + -- fall-through, let the Workflow delete happen. end end + done, err = workflow:set_workflow_state_and_wait("Teardown", hurry_flag) + if done == false then + slurm.log_error("%s: slurm_bb_job_teardown(), workflow=%s: %s", lua_script_name, workflow.name, err) + ret = slurm.ERROR + -- fall-through, let the Workflow delete happen. + end + done, err = workflow:delete() if done == false then slurm.log_error("%s: slurm_bb_job_teardown(), workflow=%s, delete: %s", lua_script_name, workflow.name, err) - return slurm.ERROR, err + ret = slurm.ERROR + -- fall-through, let any necessary scancel happen. + end + + if state_errors ~= "" then + -- Now do the scancel. This will terminate this Lua script and will + -- trigger slurm to call our teardown again, but that'll be a no-op + -- when it comes back here. + slurm.log_info("%s: slurm_bb_job_teardown(), workflow=%s: executing scancel --hurry %s, found driver errors: %s", lua_script_name, workflow.name, job_id, state_errors) + _, err = workflow:scancel(job_id, true) + if err == "" then + err = "(no output)" + end end - return slurm.SUCCESS + if ret == slurm.SUCCESS then + err = nil + end + return ret, err end --[[ diff --git a/testsuite/integration/src/features/test_dws_states.feature b/testsuite/integration/src/features/test_dws_states.feature index 92b921b..99ca4f4 100644 --- a/testsuite/integration/src/features/test_dws_states.feature +++ b/testsuite/integration/src/features/test_dws_states.feature @@ -22,6 +22,7 @@ Feature: Data Workflow Services State Progression Verify that the DWS-Slurm Burst Buffer Plugin progresses through Data Workflow Services states + @happy_one Scenario: The DWS-BB Plugin progresses through DWS states Given a job script: #!/bin/bash @@ -44,13 +45,15 @@ Feature: Data Workflow Services State Progression And the Workflow and job progress to the PostRun state And the Workflow and job progress to the DataOut state And the Workflow and job progress to the Teardown state - And the job state is COMPLETED + And the job has eventually been COMPLETED # DWS does not allow spaces in key/value pairs in directives. To skirt around this # constraint, the dws-test-driver replaces underscores ("_") in the message value with # spaces. This ensures that the dws-slurm-plugin can handle whitespace in error messages # It also makes it easier to check that the error is included in scontrol output. 
- Scenario Outline: The DWS-BB Plugin can handle fatal driver errors before being canceled + # This scenario assumes that "Flags=TeardownFailure" is set in burst_buffer.conf. + @fatal_one + Scenario Outline: Report fatal errors from Proposal, Setup, DataIn, PreRun Given a job script: #!/bin/bash @@ -59,12 +62,13 @@ Feature: Data Workflow Services State Progression /bin/hostname When the job is run - Then a Workflow has been created for the job - And the Workflow and job report fatal errors at the state - And the job is canceled - And the Workflow and job progress to the Teardown state - And the job's final system comment contains the following: + And some Workflow has been created for the job + And the Workflow reports fatal errors at the state + Then the job's system comment eventually contains the following: TEST FATAL ERROR + And the Workflow and job progress to the Teardown state + And the job has eventually been CANCELLED + And the Workflow has eventually been deleted Examples: # *** HEADER *** @@ -73,14 +77,15 @@ Feature: Data Workflow Services State Progression | Proposal | | Setup | | DataIn | - | PostRun | - | DataOut | + | PreRun | - # With the exception of PreRun, states will need to be canceled with the - # "--hurry" flag to transition to the Teardown state. If - # "Flags=TeardownFailure" is set in burst_buffer.conf, then all states will - # transition to Teardown without needing to be canceled - Scenario Outline: The DWS-BB Plugin can handle fatal driver errors for PreRun + # DWS does not allow spaces in key/value pairs in directives. To skirt around this + # constraint, the dws-test-driver replaces underscores ("_") in the message value with + # spaces. This ensures that the dws-slurm-plugin can handle whitespace in error messages + # It also makes it easier to check that the error is included in scontrol output. + # This scenario assumes that "Flags=TeardownFailure" is set in burst_buffer.conf. + @fatal_two + Scenario Outline: Report fatal errors from PostRun and DataOut Given a job script: #!/bin/bash @@ -89,22 +94,23 @@ Feature: Data Workflow Services State Progression /bin/hostname When the job is run - Then a Workflow has been created for the job - And the Workflow reports a fatal error in the state - And the Workflow and job progress to the Teardown state - # Slurm moved it from PreRun/Error to Teardown without canceling - # the job. So the driver (this test) must cancel it. - And the job is canceled - And the job's final system comment contains the following: + And some Workflow has been created for the job + And the Workflow reports fatal errors at the state + Then the job's system comment eventually contains the following: TEST FATAL ERROR + And the Workflow and job progress to the Teardown state + And the job has eventually been COMPLETED + And the Workflow has eventually been deleted Examples: # *** HEADER *** | workflowState | # *** VALUES *** - | PreRun | + | PostRun | + | DataOut | - Scenario: The DWS-BB Plugin can handle fatal driver errors during Teardown + @fatal_three + Scenario: Report fatal errors from Teardown Given a job script: #!/bin/bash @@ -112,12 +118,7 @@ Feature: Data Workflow Services State Progression /bin/hostname When the job is run - Then a Workflow has been created for the job - And the Workflow reports a fatal error in the Teardown state - And the job's intermediate system comment contains the following: - TEST FATAL ERROR - # Eventually the driver (this test) must work through the Teardown - # issues and complete that step. 
Slurm has already marked the job - # as completed and is now looping over slurm_bb_job_teardown() in - # burst_buffer.lua. - And the Workflow error is cleared from the Teardown state + And some Workflow has been created for the job + And the Workflow reports fatal errors at the Teardown state + Then the job has eventually been COMPLETED + And the Workflow has eventually been deleted diff --git a/testsuite/integration/src/features/test_environment.feature b/testsuite/integration/src/features/test_environment.feature index dfbd412..4e2f657 100644 --- a/testsuite/integration/src/features/test_environment.feature +++ b/testsuite/integration/src/features/test_environment.feature @@ -36,7 +36,7 @@ Feature: Integration test environment srun -l /bin/hostname srun -l /bin/pwd When the job is run - Then the job state is COMPLETED + Then the job has eventually been COMPLETED Scenario: Kubernetes and slurm are connected Given the kubernetes cluster kube-system UID diff --git a/testsuite/integration/src/pytest.ini b/testsuite/integration/src/pytest.ini index 5399438..f58024e 100644 --- a/testsuite/integration/src/pytest.ini +++ b/testsuite/integration/src/pytest.ini @@ -22,4 +22,7 @@ bdd_features_base_dir = features markers = environment dws_states - + happy_one + fatal_one + fatal_two + fatal_three diff --git a/testsuite/integration/src/tests/conftest.py b/testsuite/integration/src/tests/conftest.py index 255d323..126d9c2 100644 --- a/testsuite/integration/src/tests/conftest.py +++ b/testsuite/integration/src/tests/conftest.py @@ -72,17 +72,7 @@ def _(slurmctld, script_path): # remove the slurm output from the jobs folder slurmctld.remove_job_output(jobId, outputFilePath, errorFilePath) -@then(parsers.parse('the job state is {expectedJobState}')) -def _(slurmctld, jobId, expectedJobState): - """the job state is """ - jobState, out = slurmctld.get_final_job_state(jobId) - - if expectedJobState == "COMPLETED" and jobState == "FAILED": - warnings.warn(ResourceWarning((f"Job {jobId} failed unexpectedly.\n") + \ - "This may happen if Slurm doesn't have enough resources to schedule the job.\n" + \ - "This is not considered a test failure, in this context, since DWS isn't\n" + \ - "dependent on the job's failure or success." 
- )) - return - - assert jobState == expectedJobState, "Unexpected Job State: " + jobState + "\n" + out +@then(parsers.parse('the job has eventually been {job_state:l}')) +def _(slurmctld, jobId, job_state): + """the job has eventually been """ + slurmctld.wait_until_job_has_been_x(jobId, job_state) diff --git a/testsuite/integration/src/tests/dws_bb_plugin/test_dws_states.py b/testsuite/integration/src/tests/dws_bb_plugin/test_dws_states.py index ff0277e..c5433ba 100644 --- a/testsuite/integration/src/tests/dws_bb_plugin/test_dws_states.py +++ b/testsuite/integration/src/tests/dws_bb_plugin/test_dws_states.py @@ -46,21 +46,19 @@ def _(k8s, jobId): except k8sclient.exceptions.ApiException: pass -@when(parsers.parse('the Workflow status becomes {status:l}')) -def _(slurmctld, jobId, status): - """the Workflow status becomes """ - workflowStatus = slurmctld.get_workflow_status(jobId) - assert workflowStatus["status"] == status +@when('some Workflow has been created for the job') +def _(k8s, jobId): + """some Workflow has been created for the job.""" + workflow = Workflow(k8s, jobId) + assert workflow.data is not None, "Workflow for Job: " + str(jobId) + " not found" -@then('the job is canceled') +@then('the Workflow has eventually been deleted') def _(slurmctld, jobId): - """the job is canceled""" - time.sleep(2) # Sleep long enough for bb plugin to poll workflow once or twice - slurmctld.cancel_job(jobId, False) - time.sleep(2) # Sleep long enough for the workflow to be deleted + """the Workflow has eventually been deleted""" + slurmctld.wait_until_workflow_is_gone(jobId) -def verify_job_status(slurmctld, jobId, state, status): - jobStatus = slurmctld.get_workflow_status(jobId) +def verify_job_bbstat(slurmctld, jobId, state, status): + jobStatus = slurmctld.scontrol_show_bbstat(jobId) assert jobStatus["desiredState"] == state, "Incorrect desired state: " + str(jobStatus) assert jobStatus["currentState"] == state, "Incorrect current state: " + str(jobStatus) assert jobStatus["status"] == status, "Incorrect status: " + str(jobStatus) @@ -73,18 +71,18 @@ def _(k8s, slurmctld, jobId, state): workflow = Workflow(k8s, jobId) workflow.wait_until( - f"the workflow transitions to {state}/{expectedStatus}", + f"the workflow transitioned to {state}/{expectedStatus}", lambda wf: wf.data["status"]["state"] == state and wf.data["status"]["status"] == expectedStatus ) - print("job %s progressed to state %s" % (str(jobId),state)) + print(f"job {jobId} progressed to state {state}") - verify_job_status(slurmctld, jobId, state, expectedStatus) + verify_job_bbstat(slurmctld, jobId, state, expectedStatus) # Set driver status to completed so the workflow can progress to the next state foundPendingDriverStatus = False for driverStatus in workflow.data["status"]["drivers"]: if driverStatus["driverID"] == "tester" and driverStatus["watchState"] == state and driverStatus["status"] == "Pending": - print("updating workflow %s to complete state %s" % (str(jobId), state)) + print(f"updating workflow {jobId} to complete state {state}") driverStatus["completed"] = True driverStatus["status"] = "Completed" foundPendingDriverStatus = True @@ -93,27 +91,6 @@ def _(k8s, slurmctld, jobId, state): assert foundPendingDriverStatus, "Driver not found with \"Pending\" status" workflow.save_driver_statuses() -@then(parsers.parse('the Workflow error is cleared from the {state:l} state')) -def _(k8s, slurmctld, jobId, state): - """the Workflow error is cleared from the state.""" - - workflow = Workflow(k8s, jobId) - - # Set driver 
status to completed so the workflow can progress to the next state - foundPendingDriverStatus = False - for driverStatus in workflow.data["status"]["drivers"]: - if driverStatus["driverID"] == "tester" and driverStatus["watchState"] == state and driverStatus["status"] == "Error": - print(f"updating workflow %s to complete state %s" % (str(jobId), state)) - driverStatus["completed"] = True - driverStatus["status"] = "Completed" - # The DWS webhook requires that the error message be cleared as well. - del driverStatus["error"] - foundPendingDriverStatus = True - break - - assert foundPendingDriverStatus, "Driver not found with \"Error\" status" - workflow.save_driver_statuses() - def driver_state_check(workflow, state, expected_status): found_it = False print(f"check drivers for state {state} with status {expected_status}") @@ -127,26 +104,9 @@ def driver_state_check(workflow, state, expected_status): break return found_it -@then(parsers.parse('the Workflow and job report fatal errors at the {state:l} state')) -def _(k8s, slurmctld, jobId, state): - """the Workflow and job report errors at the state.""" - - expected_status = "Error" - - def driver_check(workflow): - return driver_state_check(workflow, state, expected_status) - - workflow = Workflow(k8s, jobId) - workflow.wait_until( - f"the workflow {state} state shows a status of {expected_status}", - lambda wf: driver_check(wf) is True - ) - - verify_job_status(slurmctld, jobId, state, expected_status) - -@then(parsers.parse('the Workflow reports a fatal error in the {state:l} state')) +@when(parsers.parse('the Workflow reports fatal errors at the {state:l} state')) def _(k8s, slurmctld, jobId, state): - """the Workflow reports a fatal error in the state.""" + """the Workflow reports fatal errors at the state.""" expected_status = "Error" @@ -159,13 +119,7 @@ def driver_check(workflow): lambda wf: driver_check(wf) is True ) -@then(parsers.parse("the job's {disposition:l} system comment contains the following:\n{message}")) -def _(slurmctld, jobId, disposition, message): - assert disposition in ["final", "intermediate"], f"unknown disposition: {disposition}" - must_be_gone = True if disposition == "final" else False - _,out = slurmctld.get_final_job_state(jobId, must_be_gone) - m = re.search(r'\n\s+SystemComment=(.*)\n\s+StdErr=', out, re.DOTALL) - assert m is not None, f"Could not find SystemComment in job state from Slurm\n{out}" - if message in m.group(1): - print(f"Found \"{message}\" in SystemComment") - assert message in m.group(1) +@then(parsers.parse("the job's system comment eventually contains the following:\n{message}")) +def _(slurmctld, jobId, message): + print(f"looking for system comment with: {message}") + slurmctld.wait_until_job_system_comment(jobId, message) diff --git a/testsuite/integration/src/tests/slurmctld.py b/testsuite/integration/src/tests/slurmctld.py index f7869d1..4903d86 100644 --- a/testsuite/integration/src/tests/slurmctld.py +++ b/testsuite/integration/src/tests/slurmctld.py @@ -20,6 +20,7 @@ import os import time import docker +import re from tenacity import * # Submitting jobs can fail, occasionally, when the DWS webhook rejects the @@ -59,13 +60,6 @@ def submit_job(self, scriptPath): jobId = int(out.split()[-1]) return jobId, scriptPath + ".out", scriptPath + ".error.out" - def cancel_job(self, jobId, hurry_flag=False): - print("cancel job" + str(jobId)) - cmd = "scancel --hurry %s" % str(jobId) - rc, out = self.exec_run(cmd) - if rc != 0: - raise JobCancelError(out) - def remove_job_output(self, jobId, 
outputFilePath, errorFilePath):
         """
         The creation of the job's output file will sometimes lag behind the
@@ -92,19 +86,7 @@ def wait_until_workflow_is_gone(self, jobId):
             print(f"Workflow {jobId} still exists: " + out)
             raise JobNotCompleteError()
 
-    @retry(
-        wait=wait_fixed(2),
-        stop=stop_after_attempt(30),
-        retry=retry_if_result(lambda state: state[0] not in ["COMPLETED", "FAILED", "CANCELLED"]),
-        retry_error_callback=lambda retry_state: retry_state.outcome.result()
-    )
-    def get_final_job_state(self, jobId, must_be_gone=True):
-        # When the job is finished, the workflow should not exist.
-        if must_be_gone:
-            self.wait_until_workflow_is_gone(jobId)
-        else:
-            time.sleep(5) # wait for workflow info to be transferred to the job
-
+    def scontrol_show_job(self, jobId):
         rc, out = self.exec_run("scontrol show job " + str(jobId))
         assert rc==0, "Could not get job state from Slurm:\n" + out
@@ -119,8 +101,29 @@ def get_final_job_state(self, jobId, must_be_gone=True):
                 print("JobState=" + keyVal[1])
                 return keyVal[1], out
         assert False, "Could not parse state from: " + out
+
+    @retry(
+        wait=wait_fixed(2),
+        stop=stop_after_attempt(5)
+    )
+    def wait_until_job_has_been_x(self, jobId, job_state):
+        found_state, _ = self.scontrol_show_job(jobId)
+        print(f"Found \"{found_state}\" in JobState")
+        assert found_state == job_state, f"Unexpected JobState: {found_state}"
 
-    def get_workflow_status(self, jobId):
+    @retry(
+        wait=wait_fixed(2),
+        stop=stop_after_attempt(5)
+    )
+    def wait_until_job_system_comment(self, jobId, message):
+        _,out = self.scontrol_show_job(jobId)
+        m = re.search(r'\n\s+SystemComment=(.*)\n\s+StdErr=', out, re.DOTALL)
+        assert m is not None, f"Could not find SystemComment in job state from Slurm\n{out}"
+        if message in m.group(1):
+            print(f"Found \"{message}\" in SystemComment")
+        assert message in m.group(1)
+
+    def scontrol_show_bbstat(self, jobId):
         rc, out = self.exec_run("scontrol show bbstat workflow " + str(jobId))
         assert rc == 0, "Could not get job status from Slurm:\n" + out
         # This next check is because the scontrol command does not exit non-zero
diff --git a/testsuite/unit/src/burst_buffer/dws-test.lua b/testsuite/unit/src/burst_buffer/dws-test.lua
index 85cd6ad..2be76de 100644
--- a/testsuite/unit/src/burst_buffer/dws-test.lua
+++ b/testsuite/unit/src/burst_buffer/dws-test.lua
@@ -855,11 +855,13 @@ describe("Slurm API", function()
 	end
 
 	local call_bb_teardown = function(hurry)
+		dwsmq_enqueue(true, "") -- get_driver_errors
+		dwsmq_enqueue(true, "") -- kubectl_cache_home
 		local delete_result = 'workflow.dataworkflowservices.github.io "' .. workflow_name .. 
'" deleted\n' local popen_count = mock_popen_calls("Teardown", "Completed") dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, delete_result) - popen_count = popen_count + 2 + popen_count = popen_count + 4 io.popen:clear() local ret, err = slurm_bb_job_teardown(jobID, job_script_name, hurry) From 42913d166c90dff26d9be9d2df3506b2919b7e9e Mon Sep 17 00:00:00 2001 From: Dean Roehrich Date: Fri, 22 Sep 2023 13:34:10 -0500 Subject: [PATCH 2/2] Reorder some steps to better handle the teardown Signed-off-by: Dean Roehrich --- .../integration/src/features/test_dws_states.feature | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/testsuite/integration/src/features/test_dws_states.feature b/testsuite/integration/src/features/test_dws_states.feature index 99ca4f4..4510826 100644 --- a/testsuite/integration/src/features/test_dws_states.feature +++ b/testsuite/integration/src/features/test_dws_states.feature @@ -67,8 +67,8 @@ Feature: Data Workflow Services State Progression Then the job's system comment eventually contains the following: TEST FATAL ERROR And the Workflow and job progress to the Teardown state - And the job has eventually been CANCELLED And the Workflow has eventually been deleted + And the job has eventually been CANCELLED Examples: # *** HEADER *** @@ -99,8 +99,8 @@ Feature: Data Workflow Services State Progression Then the job's system comment eventually contains the following: TEST FATAL ERROR And the Workflow and job progress to the Teardown state - And the job has eventually been COMPLETED And the Workflow has eventually been deleted + And the job has eventually been COMPLETED Examples: # *** HEADER *** @@ -120,5 +120,5 @@ Feature: Data Workflow Services State Progression When the job is run And some Workflow has been created for the job And the Workflow reports fatal errors at the Teardown state - Then the job has eventually been COMPLETED - And the Workflow has eventually been deleted + Then the Workflow has eventually been deleted + And the job has eventually been COMPLETED