Handle DWS fatal errors #44

Merged: 2 commits, Sep 22, 2023
6 changes: 6 additions & 0 deletions src/burst_buffer/burst_buffer.conf
@@ -3,3 +3,9 @@
# See https://slurm.schedmd.com/burst_buffer.conf.html
Directive=DW

# If set, then teardown a burst buffer after a file staging error. Otherwise
# preserve the burst buffer for analysis and manual teardown.
# See https://slurm.schedmd.com/burst_buffer.conf.html
# and https://slurm.schedmd.com/burst_buffer.html#states
Flags=TeardownFailure

62 changes: 54 additions & 8 deletions src/burst_buffer/burst_buffer.lua
@@ -280,9 +280,16 @@ end

-- DWS:get_driver_errors will collect driver errors from the Workflow resource
-- with respect to the given state.
function DWS:get_driver_errors(state)
local error_list = {}
local jsonpath = [[{range .status.drivers[?(@.watchState=="]].. state ..[[")]}==={"\n"}{@.status}{"\n"}{@.driverID}{"\n"}{@.error}{"\n"}{end}]]
-- If all_errors=true then collect all errors from all states in all drivers.
-- On success this returns true and a string with all of the errors.
-- On failure this returns false, an empty string for the errors, and a string
-- explaining why it couldn't collect the errors.
function DWS:get_driver_errors(state, all_errors)
local driver_index = [[?(@.watchState=="]].. state ..[[")]]
if all_errors == true then
driver_index = "*"
end
local jsonpath = [[{range .status.drivers[]] .. driver_index .. [[]}==={"\n"}{@.status}{"\n"}{@.driverID}{"\n"}{@.error}{"\n"}{end}]]
local ret, output = self:get_jsonpath(jsonpath)
if ret == false then
return ret, "", "could not get driver errors: " .. output
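
For reference, a minimal sketch of the two JSONPath expressions the driver_index logic above produces; the "PreRun" value is only an example state:

local state = "PreRun"  -- example value; any DWS watchState works here
local specific_index = [[?(@.watchState=="]] .. state .. [[")]]
-- all_errors == true uses the "*" index and matches every driver entry:
--   {range .status.drivers[*]}==={"\n"}{@.status}{"\n"}{@.driverID}{"\n"}{@.error}{"\n"}{end}
-- otherwise only drivers watching the given state are selected:
--   {range .status.drivers[?(@.watchState=="PreRun")]}==={"\n"}{@.status}{"\n"}{@.driverID}{"\n"}{@.error}{"\n"}{end}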
@@ -442,6 +449,18 @@ function DWS:kubectl(cmd)
return self:io_popen(kcmd)
end

-- DWS:scancel will run the Slurm scancel command and collect its output.
-- On success this returns true and the output of the command.
-- On failure this returns false and the output of the command.
function DWS:scancel(jobId, hurry)
local hurry_opt = ""
if hurry == true then
hurry_opt = "--hurry "
end
local scmd = "scancel " .. hurry_opt .. jobId
return self:io_popen(scmd)
end
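
A minimal usage sketch of the new helper (the job id 1234 is hypothetical); it mirrors the call made from slurm_bb_job_teardown() below:

local workflow = DWS(make_workflow_name(1234))   -- hypothetical job id
local ok, output = workflow:scancel(1234, true)  -- runs "scancel --hurry 1234" via io_popen
-- with hurry=false (or nil) the command is simply "scancel 1234"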

-- DWS:io_popen will run the given command and collect its output.
-- On success this returns true and the output of the command.
-- On failure this returns false and the output of the command.
@@ -627,24 +646,51 @@ function slurm_bb_job_teardown(job_id, job_script, hurry)
hurry_flag = true
end
local workflow = DWS(make_workflow_name(job_id))
local done, err = workflow:set_workflow_state_and_wait("Teardown", hurry_flag)

local ret = slurm.SUCCESS
-- Does the workflow have a fatal error in it?
-- If so, we'll call scancel as well.
local done, state_errors, err = workflow:get_driver_errors("", true)
if done == false then
if string.find(err, [["]] .. workflow.name .. [[" not found]]) then
-- It's already gone, and that's what we wanted anyway.
return slurm.SUCCESS
else
slurm.log_error("%s: slurm_bb_job_teardown(), workflow=%s: %s", lua_script_name, workflow.name, err)
return slurm.ERROR, err
slurm.log_error("%s: slurm_bb_job_teardown(), workflow=%s: unable to check driver errors: %s", lua_script_name, workflow.name, err)
ret = slurm.ERROR
-- fall-through, let the Workflow delete happen.
end
end

done, err = workflow:set_workflow_state_and_wait("Teardown", hurry_flag)
if done == false then
slurm.log_error("%s: slurm_bb_job_teardown(), workflow=%s: %s", lua_script_name, workflow.name, err)
ret = slurm.ERROR
-- fall-through, let the Workflow delete happen.
end

done, err = workflow:delete()
if done == false then
slurm.log_error("%s: slurm_bb_job_teardown(), workflow=%s, delete: %s", lua_script_name, workflow.name, err)
return slurm.ERROR, err
ret = slurm.ERROR
-- fall-through, let any necessary scancel happen.
end

if state_errors ~= "" then
-- Now do the scancel. This will terminate this Lua script and will
-- trigger slurm to call our teardown again, but that'll be a no-op
-- when it comes back here.
slurm.log_info("%s: slurm_bb_job_teardown(), workflow=%s: executing scancel --hurry %s, found driver errors: %s", lua_script_name, workflow.name, job_id, state_errors)
_, err = workflow:scancel(job_id, true)
if err == "" then
err = "(no output)"
end
end

return slurm.SUCCESS
if ret == slurm.SUCCESS then
err = nil
end
return ret, err
end
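
In outline, the reworked teardown degrades gracefully instead of returning on the first failure; a condensed sketch of the logic above (error logging and the "not found" short-circuit omitted):

-- 1. Collect fatal errors from every driver and state.
local _, state_errors = workflow:get_driver_errors("", true)
-- 2. Always attempt the Teardown transition and the Workflow delete,
--    downgrading the return code on failure instead of bailing out.
if not workflow:set_workflow_state_and_wait("Teardown", hurry_flag) then ret = slurm.ERROR end
if not workflow:delete() then ret = slurm.ERROR end
-- 3. Only then cancel the job; the scancel-triggered teardown re-entry
--    finds no Workflow and returns slurm.SUCCESS.
if state_errors ~= "" then workflow:scancel(job_id, true) end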

--[[
65 changes: 33 additions & 32 deletions testsuite/integration/src/features/test_dws_states.feature
@@ -22,6 +22,7 @@ Feature: Data Workflow Services State Progression
Verify that the DWS-Slurm Burst Buffer Plugin progresses through Data
Workflow Services states

@happy_one
Scenario: The DWS-BB Plugin progresses through DWS states
Given a job script:
#!/bin/bash
@@ -44,13 +45,15 @@
And the Workflow and job progress to the PostRun state
And the Workflow and job progress to the DataOut state
And the Workflow and job progress to the Teardown state
And the job state is COMPLETED
And the job has eventually been COMPLETED

# DWS does not allow spaces in key/value pairs in directives. To skirt around this
# constraint, the dws-test-driver replaces underscores ("_") in the message value with
# spaces. This ensures that the dws-slurm-plugin can handle whitespace in error messages
# It also makes it easier to check that the error is included in scontrol output.
Scenario Outline: The DWS-BB Plugin can handle fatal driver errors before being canceled
# This scenario assumes that "Flags=TeardownFailure" is set in burst_buffer.conf.
@fatal_one
Scenario Outline: Report fatal errors from Proposal, Setup, DataIn, PreRun
Given a job script:
#!/bin/bash

@@ -59,12 +62,13 @@
/bin/hostname

When the job is run
Then a Workflow has been created for the job
And the Workflow and job report fatal errors at the <workflowState> state
And the job is canceled
And the Workflow and job progress to the Teardown state
And the job's final system comment contains the following:
And some Workflow has been created for the job
And the Workflow reports fatal errors at the <workflowState> state
Then the job's system comment eventually contains the following:
TEST FATAL ERROR
And the Workflow and job progress to the Teardown state
And the Workflow has eventually been deleted
And the job has eventually been CANCELLED

Examples:
# *** HEADER ***
Expand All @@ -73,14 +77,15 @@ Feature: Data Workflow Services State Progression
| Proposal |
| Setup |
| DataIn |
| PostRun |
| DataOut |
| PreRun |

# With the exception of PreRun, states will need to be canceled with the
# "--hurry" flag to transition to the Teardown state. If
# "Flags=TeardownFailure" is set in burst_buffer.conf, then all states will
# transition to Teardown without needing to be canceled
Scenario Outline: The DWS-BB Plugin can handle fatal driver errors for PreRun
# DWS does not allow spaces in key/value pairs in directives. To skirt around this
# constraint, the dws-test-driver replaces underscores ("_") in the message value with
# spaces. This ensures that the dws-slurm-plugin can handle whitespace in error messages
# It also makes it easier to check that the error is included in scontrol output.
# This scenario assumes that "Flags=TeardownFailure" is set in burst_buffer.conf.
@fatal_two
Scenario Outline: Report fatal errors from PostRun and DataOut
Given a job script:
#!/bin/bash

Expand All @@ -89,35 +94,31 @@ Feature: Data Workflow Services State Progression
/bin/hostname

When the job is run
Then a Workflow has been created for the job
And the Workflow reports a fatal error in the <workflowState> state
And the Workflow and job progress to the Teardown state
# Slurm moved it from PreRun/Error to Teardown without canceling
# the job. So the driver (this test) must cancel it.
And the job is canceled
And the job's final system comment contains the following:
And some Workflow has been created for the job
And the Workflow reports fatal errors at the <workflowState> state
Then the job's system comment eventually contains the following:
TEST FATAL ERROR
And the Workflow and job progress to the Teardown state
And the Workflow has eventually been deleted
And the job has eventually been COMPLETED

Examples:
# *** HEADER ***
| workflowState |
# *** VALUES ***
| PreRun |
| PostRun |
| DataOut |

Scenario: The DWS-BB Plugin can handle fatal driver errors during Teardown
@fatal_three
Scenario: Report fatal errors from Teardown
Given a job script:
#!/bin/bash

#DW Teardown action=error message=TEST_FATAL_ERROR severity=Fatal
/bin/hostname

When the job is run
Then a Workflow has been created for the job
And the Workflow reports a fatal error in the Teardown state
And the job's intermediate system comment contains the following:
TEST FATAL ERROR
# Eventually the driver (this test) must work through the Teardown
# issues and complete that step. Slurm has already marked the job
# as completed and is now looping over slurm_bb_job_teardown() in
# burst_buffer.lua.
And the Workflow error is cleared from the Teardown state
And some Workflow has been created for the job
And the Workflow reports fatal errors at the Teardown state
Then the Workflow has eventually been deleted
And the job has eventually been COMPLETED
@@ -36,7 +36,7 @@ Feature: Integration test environment
srun -l /bin/hostname
srun -l /bin/pwd
When the job is run
Then the job state is COMPLETED
Then the job has eventually been COMPLETED

Scenario: Kubernetes and slurm are connected
Given the kubernetes cluster kube-system UID
5 changes: 4 additions & 1 deletion testsuite/integration/src/pytest.ini
@@ -22,4 +22,7 @@ bdd_features_base_dir = features
markers =
environment
dws_states

happy_one
fatal_one
fatal_two
fatal_three
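
These markers correspond to the @happy_one, @fatal_one, @fatal_two, and @fatal_three tags added to the feature scenarios above, so an individual scenario can be selected on the command line, for example with "pytest -m fatal_one".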
18 changes: 4 additions & 14 deletions testsuite/integration/src/tests/conftest.py
@@ -72,17 +72,7 @@ def _(slurmctld, script_path):
# remove the slurm output from the jobs folder
slurmctld.remove_job_output(jobId, outputFilePath, errorFilePath)

@then(parsers.parse('the job state is {expectedJobState}'))
def _(slurmctld, jobId, expectedJobState):
"""the job state is <expectedJobState>"""
jobState, out = slurmctld.get_final_job_state(jobId)

if expectedJobState == "COMPLETED" and jobState == "FAILED":
warnings.warn(ResourceWarning((f"Job {jobId} failed unexpectedly.\n") + \
"This may happen if Slurm doesn't have enough resources to schedule the job.\n" + \
"This is not considered a test failure, in this context, since DWS isn't\n" + \
"dependent on the job's failure or success."
))
return

assert jobState == expectedJobState, "Unexpected Job State: " + jobState + "\n" + out
@then(parsers.parse('the job has eventually been {job_state:l}'))
def _(slurmctld, jobId, job_state):
"""the job has eventually been <job_state>"""
slurmctld.wait_until_job_has_been_x(jobId, job_state)
86 changes: 20 additions & 66 deletions testsuite/integration/src/tests/dws_bb_plugin/test_dws_states.py
@@ -46,21 +46,19 @@ def _(k8s, jobId):
except k8sclient.exceptions.ApiException:
pass

@when(parsers.parse('the Workflow status becomes {status:l}'))
def _(slurmctld, jobId, status):
"""the Workflow status becomes <status>"""
workflowStatus = slurmctld.get_workflow_status(jobId)
assert workflowStatus["status"] == status
@when('some Workflow has been created for the job')
def _(k8s, jobId):
"""some Workflow has been created for the job."""
workflow = Workflow(k8s, jobId)
assert workflow.data is not None, "Workflow for Job: " + str(jobId) + " not found"

@then('the job is canceled')
@then('the Workflow has eventually been deleted')
def _(slurmctld, jobId):
"""the job is canceled"""
time.sleep(2) # Sleep long enough for bb plugin to poll workflow once or twice
slurmctld.cancel_job(jobId, False)
time.sleep(2) # Sleep long enough for the workflow to be deleted
"""the Workflow has eventually been deleted"""
slurmctld.wait_until_workflow_is_gone(jobId)

def verify_job_status(slurmctld, jobId, state, status):
jobStatus = slurmctld.get_workflow_status(jobId)
def verify_job_bbstat(slurmctld, jobId, state, status):
jobStatus = slurmctld.scontrol_show_bbstat(jobId)
assert jobStatus["desiredState"] == state, "Incorrect desired state: " + str(jobStatus)
assert jobStatus["currentState"] == state, "Incorrect current state: " + str(jobStatus)
assert jobStatus["status"] == status, "Incorrect status: " + str(jobStatus)
@@ -73,18 +71,18 @@ def _(k8s, slurmctld, jobId, state):

workflow = Workflow(k8s, jobId)
workflow.wait_until(
f"the workflow transitions to {state}/{expectedStatus}",
f"the workflow transitioned to {state}/{expectedStatus}",
lambda wf: wf.data["status"]["state"] == state and wf.data["status"]["status"] == expectedStatus
)
print("job %s progressed to state %s" % (str(jobId),state))
print(f"job {jobId} progressed to state {state}")

verify_job_status(slurmctld, jobId, state, expectedStatus)
verify_job_bbstat(slurmctld, jobId, state, expectedStatus)

# Set driver status to completed so the workflow can progress to the next state
foundPendingDriverStatus = False
for driverStatus in workflow.data["status"]["drivers"]:
if driverStatus["driverID"] == "tester" and driverStatus["watchState"] == state and driverStatus["status"] == "Pending":
print("updating workflow %s to complete state %s" % (str(jobId), state))
print(f"updating workflow {jobId} to complete state {state}")
driverStatus["completed"] = True
driverStatus["status"] = "Completed"
foundPendingDriverStatus = True
@@ -93,27 +91,6 @@ def _(k8s, slurmctld, jobId, state):
assert foundPendingDriverStatus, "Driver not found with \"Pending\" status"
workflow.save_driver_statuses()

@then(parsers.parse('the Workflow error is cleared from the {state:l} state'))
def _(k8s, slurmctld, jobId, state):
"""the Workflow error is cleared from the <state> state."""

workflow = Workflow(k8s, jobId)

# Set driver status to completed so the workflow can progress to the next state
foundPendingDriverStatus = False
for driverStatus in workflow.data["status"]["drivers"]:
if driverStatus["driverID"] == "tester" and driverStatus["watchState"] == state and driverStatus["status"] == "Error":
print(f"updating workflow %s to complete state %s" % (str(jobId), state))
driverStatus["completed"] = True
driverStatus["status"] = "Completed"
# The DWS webhook requires that the error message be cleared as well.
del driverStatus["error"]
foundPendingDriverStatus = True
break

assert foundPendingDriverStatus, "Driver not found with \"Error\" status"
workflow.save_driver_statuses()

def driver_state_check(workflow, state, expected_status):
found_it = False
print(f"check drivers for state {state} with status {expected_status}")
@@ -127,26 +104,9 @@ def driver_state_check(workflow, state, expected_status):
break
return found_it

@then(parsers.parse('the Workflow and job report fatal errors at the {state:l} state'))
def _(k8s, slurmctld, jobId, state):
"""the Workflow and job report errors at the <state> state."""

expected_status = "Error"

def driver_check(workflow):
return driver_state_check(workflow, state, expected_status)

workflow = Workflow(k8s, jobId)
workflow.wait_until(
f"the workflow {state} state shows a status of {expected_status}",
lambda wf: driver_check(wf) is True
)

verify_job_status(slurmctld, jobId, state, expected_status)

@then(parsers.parse('the Workflow reports a fatal error in the {state:l} state'))
@when(parsers.parse('the Workflow reports fatal errors at the {state:l} state'))
def _(k8s, slurmctld, jobId, state):
"""the Workflow reports a fatal error in the <state> state."""
"""the Workflow reports fatal errors at the <state> state."""

expected_status = "Error"

Expand All @@ -159,13 +119,7 @@ def driver_check(workflow):
lambda wf: driver_check(wf) is True
)

@then(parsers.parse("the job's {disposition:l} system comment contains the following:\n{message}"))
def _(slurmctld, jobId, disposition, message):
assert disposition in ["final", "intermediate"], f"unknown disposition: {disposition}"
must_be_gone = True if disposition == "final" else False
_,out = slurmctld.get_final_job_state(jobId, must_be_gone)
m = re.search(r'\n\s+SystemComment=(.*)\n\s+StdErr=', out, re.DOTALL)
assert m is not None, f"Could not find SystemComment in job state from Slurm\n{out}"
if message in m.group(1):
print(f"Found \"{message}\" in SystemComment")
assert message in m.group(1)
@then(parsers.parse("the job's system comment eventually contains the following:\n{message}"))
def _(slurmctld, jobId, message):
print(f"looking for system comment with: {message}")
slurmctld.wait_until_job_system_comment(jobId, message)