diff --git a/src/burst_buffer/burst_buffer.lua b/src/burst_buffer/burst_buffer.lua index f7d7a16..b5acfce 100644 --- a/src/burst_buffer/burst_buffer.lua +++ b/src/burst_buffer/burst_buffer.lua @@ -28,6 +28,8 @@ WLMID_PLACEHOLDER = "slurm" -- The fully-qualified name of the DWS Workflow CRD. local WORKFLOW_CRD = "workflows.dws.cray.hpe.com" +KUBECTL_CACHE_DIR = "/tmp/burst_buffer_kubectl_cache" + lua_script_name="burst_buffer.lua" math.randomseed(os.time()) @@ -237,12 +239,13 @@ function DWS:get_hurry() end -- DWS:wait_for_status_complete will loop until the workflow reports that --- its status is completed. +-- its status is completed. If the max_passes == -1 then this will loop +-- without limit until the state is complete or an error is encountered. -- On success this returns true and a table containing the status. -- On failure this returns false, an empty table, and an error message. function DWS:wait_for_status_complete(max_passes) local empty = {} - while max_passes > 0 do + while max_passes > 0 or max_passes == -1 do local done, status, err = self:get_current_state() if done == false then return false, empty, err @@ -255,7 +258,9 @@ function DWS:wait_for_status_complete(max_passes) return false, empty, string.format("Error in Workflow %s", self.name) end os.execute("sleep 1") - max_passes = max_passes - 1 + if max_passes > 0 then + max_passes = max_passes - 1 + end end return false, empty, "exceeded max wait time" end @@ -288,7 +293,7 @@ function DWS:set_workflow_state_and_wait(new_state, hurry) return done, "set_desired_state: " .. err end - local done, status, err = self:wait_for_status_complete(60) + local done, status, err = self:wait_for_status_complete(-1) if done == false then return done, "wait_for_status_complete: " .. err end @@ -296,6 +301,38 @@ function DWS:set_workflow_state_and_wait(new_state, hurry) return true end +-- DWS:kubectl_cache_home will determine where the kubectl discovery cache +-- and http cache may be located. +-- If the user's HOME dir exists then kubectl expects to use that and this does +-- nothing and returns true and an empty string. +-- Otherwise, this will create a dir in /tmp and will return true and a string +-- that defines the HOME variable with the new dir. +-- If there is an error creating the dir this will return false and a string +-- containing an error message. +function DWS:kubectl_cache_home() + + local dir_exists = function(dname) + local cmd = "test -d " .. dname + local done, _ = self:io_popen(cmd) + if done == false then + end + return done + end + + if dir_exists(os.getenv("HOME")) == false then + if dir_exists(KUBECTL_CACHE_DIR) == false then + local cmd = "mkdir " .. KUBECTL_CACHE_DIR + local done, result = self:io_popen(cmd) + if done == false then + return false, "Unable to create " .. KUBECTL_CACHE_DIR .. " cache dir for kubectl: " .. result + end + end + return true, "HOME=" .. KUBECTL_CACHE_DIR + end + + return true, "" +end + -- DWS:token will find a ServiceAccount token and return a --token argument -- for the kubectl command. If we're not inside the slurm pod, maybe in a test -- env, then this will return an empty string. @@ -314,8 +351,19 @@ end -- On success this returns true and the output of the command. -- On failure this returns false and the output of the command. function DWS:kubectl(cmd) - local kcmd = "kubectl " .. self:token() .. " " .. cmd - local handle = io.popen(kcmd) + local done, homedir_msg = self:kubectl_cache_home() + if done ~= true then + return false, homedir_msg + end + local kcmd = homedir_msg .. " kubectl " .. self:token() .. " " .. cmd + return self:io_popen(kcmd) +end + +-- DWS:io_popen will run the given command and collect its output. +-- On success this returns true and the output of the command. +-- On failure this returns false and the output of the command. +function DWS:io_popen(cmd) + local handle = io.popen(cmd) if handle == nil then -- The io.popen was stubbed by a test. Use the provided -- return values. @@ -534,7 +582,7 @@ function slurm_bb_setup(job_id, uid, gid, pool, bb_size, job_script) -- Wait for proposal state to complete, or pick up any error that may -- be waiting in the Workflow. - local done, status, err = workflow:wait_for_status_complete(60) + local done, status, err = workflow:wait_for_status_complete(-1) if done == false then slurm.log_error("%s: slurm_bb_setup(), workflow=%s, waiting for Proposal state to complete: %s", lua_script_name, workflow_name, err) return slurm.ERROR, err @@ -546,7 +594,7 @@ function slurm_bb_setup(job_id, uid, gid, pool, bb_size, job_script) return done, err end - done, status, err = workflow:wait_for_status_complete(60) + done, status, err = workflow:wait_for_status_complete(-1) if done == err then slurm.log_error("%s: slurm_bb_setup(), workflow=%s, waiting for Setup state to complete: %s", lua_script_name, workflow_name, err) return done, err diff --git a/testsuite/unit/src/burst_buffer/dws-test.lua b/testsuite/unit/src/burst_buffer/dws-test.lua index ee8e6e4..e88fcde 100644 --- a/testsuite/unit/src/burst_buffer/dws-test.lua +++ b/testsuite/unit/src/burst_buffer/dws-test.lua @@ -90,6 +90,83 @@ local function write_job_script(job_script_name, job_text) file:close() end +describe("The dws library kubectl cache", function() + + local iopopen_spy + local workflow_name + local workflow + + before_each(function() + iopopen_spy = spy.on(io, "popen") + + workflow_name = "check" .. math.random(1000) + workflow = DWS(workflow_name) + end) + + after_each(function() + io.popen:revert() + end) + + it("verifies the real HOME dir", function() + local done, result = workflow:kubectl_cache_home() + assert.is_true(done, result) + assert.spy(io.popen).was.called(1) + end) + + context("swap os.getenv", function() + + local created_dir + local osgetenv_spy + local orig_os_getenv + local home_no_home + + before_each(function() + home_no_home = "/DOES/NOT/EXIST" + created_dir = false + + orig_os_getenv = os.getenv + + os.getenv = function(variable) + -- When the user asks for $HOME, we'll return + -- a path that doesn't exist. + return home_no_home + end + + osgetenv_spy = spy.on(os, "getenv") + end) + + after_each(function() + os.getenv:revert() + os.getenv = orig_os_getenv + end) + + after_each(function() + if created_dir then + os.remove(KUBECTL_CACHE_DIR) + end + end) + + it("confirms swap of os.getenv", function() + local val = os.getenv("HOME") + assert.spy(os.getenv).was.called(1) + assert.equals(val, home_no_home) + end) + + it("uses cache dir in /tmp when HOME dir does not exist", function() + local done, result = workflow:kubectl_cache_home() + created_dir = done + assert.is_true(done, result) + assert.spy(io.popen).was.called(3) + + -- Confirm that it re-uses an existing /tmp cache dir. + io.popen:clear() + local done, result = workflow:kubectl_cache_home() + assert.is_true(done, result) + assert.spy(io.popen).was.called(2) + end) + end) +end) + describe("The dws library initializer", function() local workflow @@ -185,7 +262,7 @@ describe("The dws library", function() end) after_each(function() - if resource_exists then + if resource_exists and IS_REAL_K8S then -- If resource_exists is still true here then -- we already have an error condition. Try -- to clean it up but don't bother checking for @@ -203,18 +280,20 @@ describe("The dws library", function() it("can apply and delete a workflow resource", function() local result_wanted = "workflow.dws.cray.hpe.com/" .. workflow_name .. " created\n" + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, result_wanted) local done, err = workflow:apply(yaml_name) resource_exists = done assert.is_true(done, err) if IS_NOT_K8S then - assert.stub(io.popen).was_called() + assert.stub(io.popen).was_called(2) io.popen:clear() end assert.is_equal(err, result_wanted) result_wanted = 'workflow.dws.cray.hpe.com "' .. workflow_name .. '" deleted\n' + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, result_wanted) if IS_NOT_K8S then io.popen:clear() @@ -222,7 +301,7 @@ describe("The dws library", function() done, err = workflow:delete() resource_exists = done if IS_NOT_K8S then - assert.stub(io.popen).was_called() + assert.stub(io.popen).was_called(2) end assert.is_true(done, err) assert.is_equal(err, result_wanted) @@ -264,11 +343,12 @@ describe("The dws library", function() before_each(function() local result_wanted = "workflow.dws.cray.hpe.com/" .. workflow_name .. " created\n" + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, result_wanted) local done, err = workflow:apply(yaml_name) resource_exists = done if IS_NOT_K8S then - assert.stub(io.popen).was_called() + assert.stub(io.popen).was_called(2) io.popen:clear() end assert.is_true(done, err) @@ -280,6 +360,7 @@ describe("The dws library", function() local result_wanted = 'workflow.dws.cray.hpe.com "' .. workflow_name .. '" deleted\n' dwsmq_reset() + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, result_wanted) if IS_NOT_K8S then io.popen:clear() @@ -287,11 +368,11 @@ describe("The dws library", function() local done, err = workflow:delete() resource_exists = done if IS_NOT_K8S then - assert.stub(io.popen).was_called() + assert.stub(io.popen).was_called(2) end assert.is_true(done, err) assert.is_equal(err, result_wanted) - elseif resource_exists then + elseif resource_exists and IS_REAL_K8S then -- We didn't expect to create a resource, but -- we got one. So we're already in an error -- condition. Just try to clean up the mess. @@ -321,11 +402,13 @@ describe("The dws library", function() ret_wanted = false end + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(ret_wanted, result_wanted) local done, err = workflow:set_desired_state(new_state, hurry) if IS_NOT_K8S then - assert.stub(io.popen).was_called() + assert.stub(io.popen).was_called(2) + io.popen:clear() end assert.is_equal(done, ret_wanted) assert.is_equal(err, result_wanted) @@ -335,10 +418,12 @@ describe("The dws library", function() local wait_for_state = function(state) local result_wanted = "desiredState=" .. state .. "\ncurrentState=" .. state .. "\nstatus=Completed\n" + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, result_wanted) local done, status, err = workflow:wait_for_status_complete(60) if IS_NOT_K8S then - assert.stub(io.popen).was_called() + assert.stub(io.popen).was_called(2) + io.popen:clear() end assert.is_true(done, err) assert.is_equal(status["desiredState"], state) @@ -353,10 +438,12 @@ describe("The dws library", function() result_wanted = "true" end + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, result_wanted) local done, hurry = workflow:get_hurry() if IS_NOT_K8S then - assert.stub(io.popen).was_called() + assert.stub(io.popen).was_called(2) + io.popen:clear() end assert.is_true(done, hurry) assert.is_equal(desired_hurry, hurry) @@ -421,7 +508,9 @@ describe("The dws library", function() local set_result_wanted = "workflow.dws.cray.hpe.com/" .. workflow_name .. " patched\n" local wait_result_wanted = "desiredState=" .. new_state .. "\ncurrentState=" .. new_state .. "\nstatus=Completed\n" + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, set_result_wanted) + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, wait_result_wanted) expect_exists = true @@ -430,7 +519,8 @@ describe("The dws library", function() end local done, err = workflow:set_workflow_state_and_wait(new_state) if IS_NOT_K8S then - assert.stub(io.popen).was_called(2) + assert.stub(io.popen).was_called(4) + io.popen:clear() end assert.is_true(done, err) end) @@ -490,18 +580,19 @@ describe("The dws library", function() end) after_each(function() + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(false, result_wanted) local done, err = workflow:apply(yaml_name) resource_exists = done assert.is_not_true(done) if IS_NOT_K8S then - assert.stub(io.popen).was_called() + assert.stub(io.popen).was_called(2) end assert.is_true(string.find(err, result_wanted) ~= nil, err) end) after_each(function() - if resource_exists then + if resource_exists and IS_REAL_K8S then -- If resource_exists is still true here then -- we already have an error condition. Try -- to clean it up but don't bother checking for @@ -627,6 +718,7 @@ describe("Burst buffer helpers", function() local result_wanted = 'workflow.dws.cray.hpe.com "' .. workflow_name .. '" deleted\n' dwsmq_reset() + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, result_wanted) if IS_NOT_K8S then io.popen:clear() @@ -634,12 +726,12 @@ describe("Burst buffer helpers", function() local done, err = workflow:delete() resource_exists = done if IS_NOT_K8S then - assert.stub(io.popen).was_called() + assert.stub(io.popen).was_called(2) end assert.is_true(done, err) assert.is_equal(err, result_wanted) - elseif resource_exists then + elseif resource_exists and IS_REAL_K8S then -- We didn't expect to create a resource, but -- we got one. So we're already in an error -- condition. Just try to clean up the mess. @@ -660,13 +752,14 @@ describe("Burst buffer helpers", function() local result_wanted = "workflow.dws.cray.hpe.com/" .. workflow_name .. " created\n" + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, result_wanted) local done, err = make_workflow(workflow, job_script_name, jobID, userID, groupID) resource_exists = done expect_exists = true if IS_NOT_K8S then - assert.stub(io.popen).was_called() + assert.stub(io.popen).was_called(2) end if err ~= nil then print(err) @@ -689,13 +782,14 @@ describe("Burst buffer helpers", function() -- We'll look for only a small piece of the error -- message here. local result_wanted = "unable to find ruleset" + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(false, result_wanted) local done, err = make_workflow(workflow, job_script_name, jobID, userID, groupID) resource_exists = done expect_exists = false if IS_NOT_K8S then - assert.stub(io.popen).was_called() + assert.stub(io.popen).was_called(2) end print("Expect an error message here: " .. err) assert.is_not_true(done, err) @@ -755,6 +849,7 @@ describe("Slurm API", function() local result_wanted = 'workflow.dws.cray.hpe.com "' .. workflow_name .. '" deleted\n' dwsmq_reset() + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, result_wanted) if IS_NOT_K8S then io.popen:clear() @@ -762,12 +857,12 @@ describe("Slurm API", function() local done, err = workflow:delete() resource_exists = done if IS_NOT_K8S then - assert.stub(io.popen).was_called() + assert.stub(io.popen).was_called(2) end assert.is_true(done, err) assert.is_equal(err, result_wanted) - elseif resource_exists then + elseif resource_exists and IS_REAL_K8S then -- We didn't expect to create a resource, but -- we got one. So we're already in an error -- condition. Just try to clean up the mess. @@ -804,7 +899,7 @@ describe("Slurm API", function() local ret, err = slurm_bb_job_process(job_script_name) if IS_NOT_K8S then - assert.stub(io.popen).was_called(2) + assert.stub(io.popen).was_called(4) end assert.is_equal(ret, slurm.SUCCESS) assert.is_nil(err, err) @@ -822,11 +917,12 @@ describe("Slurm API", function() -- We'll look for only a small piece of the error -- message here. local result_wanted = "unable to find ruleset" + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(false, result_wanted) local ret, err = slurm_bb_job_process(job_script_name) if IS_NOT_K8S then - assert.stub(io.popen).was_called(1) + assert.stub(io.popen).was_called(2) end print("Expect an error message here: " .. err) assert.is_equal(ret, slurm.ERROR) @@ -842,9 +938,13 @@ describe("Slurm API", function() local set_state_result_wanted = "workflow.dws.cray.hpe.com/" .. workflow_name .. " patched\n" local setup_status_complete_result_wanted = "desiredState=Setup\ncurrentState=Setup\nstatus=Completed\n" + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, apply_result_wanted) + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, proposal_status_complete_result_wanted) + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, set_state_result_wanted) + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, setup_status_complete_result_wanted) local ret, err = slurm_bb_setup(jobID, userID, groupID, "pool1", 1, job_script_name) @@ -854,7 +954,8 @@ describe("Slurm API", function() workflow = DWS(workflow_name) end if IS_NOT_K8S then - assert.stub(io.popen).was_called(4) + assert.stub(io.popen).was_called(8) + io.popen:clear() end assert.is_equal(ret, slurm.SUCCESS) assert.is_nil(err, err) @@ -865,8 +966,11 @@ describe("Slurm API", function() local teardown_status_complete_result_wanted = "desiredState=Teardown\ncurrentState=Teardown\nstatus=Completed\n" local delete_result_wanted = 'workflow.dws.cray.hpe.com "' .. workflow_name .. '" deleted\n' + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, set_state_result_wanted) + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, teardown_status_complete_result_wanted) + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, delete_result_wanted) if IS_NOT_K8S then @@ -878,7 +982,8 @@ describe("Slurm API", function() resource_exists = false end if IS_NOT_K8S then - assert.stub(io.popen).was_called(3) + assert.stub(io.popen).was_called(6) + io.popen:clear() end assert.is_equal(ret, slurm.SUCCESS) assert.is_nil(err, err) @@ -891,7 +996,9 @@ describe("Slurm API", function() local set_state_result_wanted = "workflow.dws.cray.hpe.com/" .. workflow_name .. " patched\n" local status_complete_result_wanted = "desiredState=" .. new_state .. "\ncurrentState=" .. new_state .. "\nstatus=Completed\n" + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, set_state_result_wanted) + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, status_complete_result_wanted) if IS_NOT_K8S then @@ -905,11 +1012,12 @@ describe("Slurm API", function() } local ret, err = funcs[new_state](jobID, job_script_name) if IS_NOT_K8S then - assert.stub(io.popen).was_called(2) + assert.stub(io.popen).was_called(4) end assert.is_equal(ret, slurm.SUCCESS) assert.is_nil(err, err) + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(true, status_complete_result_wanted) local bb_status_wanted = "desiredState=" .. new_state .. " currentState=" .. new_state .. " status=Completed" if IS_NOT_K8S then @@ -917,7 +1025,8 @@ describe("Slurm API", function() end local ret, msg = slurm_bb_get_status("workflow", jobID) if IS_NOT_K8S then - assert.stub(io.popen).was_called() + assert.stub(io.popen).was_called(2) + io.popen:clear() end print(msg) assert.is_equal(ret, slurm.SUCCESS) @@ -986,6 +1095,7 @@ describe("Slurm API", function() -- error condition. local call_bb_state_negative = function(new_state) local set_state_result_wanted = 'Error from server (NotFound): workflows.dws.cray.hpe.com "' .. workflow_name .. '" not found\n' + dwsmq_enqueue(true, "") -- kubectl_cache_home dwsmq_enqueue(false, set_state_result_wanted) if IS_NOT_K8S then @@ -1002,7 +1112,7 @@ describe("Slurm API", function() local ret, err = funcs[new_state][1](jobID, job_script_name) if IS_NOT_K8S then - assert.stub(io.popen).was_called(1) + assert.stub(io.popen).was_called(2) end assert.is_equal(ret, slurm.ERROR) assert.is_equal(err, "set_desired_state: " .. set_state_result_wanted)