diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy index d3394cd..ebf2bde 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy @@ -135,7 +135,7 @@ class FloatConf { } private static def warnDeprecated(String deprecated, String replacement) { - log.warn "[flaot] config option `$deprecated` " + + log.warn "[float] config option `$deprecated` " + "is no longer supported, " + "use `$replacement` instead" } diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy index 0a5c4b2..065675c 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy @@ -27,7 +27,6 @@ import nextflow.util.ServiceName import java.nio.file.Path import java.nio.file.StandardCopyOption -import java.util.concurrent.atomic.AtomicInteger import java.util.stream.Collectors /** @@ -202,15 +201,15 @@ class FloatGridExecutor extends AbstractGridExecutor { return ret } - private Map getEnv(FloatTaskHandler handler) { + private Map getEnv(FloatTaskHandler handler) { return isFusionEnabled() - ? handler.fusionLauncher().fusionEnv() - : [:] + ? handler.fusionLauncher().fusionEnv() + : [:] } @Override List getSubmitCommandLine(TaskRun task, Path scriptFile) { - null + return getSubmitCommandLine(new FloatTaskHandler(task, this), scriptFile) } List getSubmitCommandLine(FloatTaskHandler handler, Path scriptFile) { @@ -227,17 +226,18 @@ class FloatGridExecutor extends AbstractGridExecutor { "with `process.container`") } def cmd = getSubmitCmdPrefix(task.index) - cmd << 'sbatch' - for (def vol : getMountVols(task)) { - cmd << '--dataVolume' << vol - } + cmd << 'submit' + getMountVols(task).forEach { cmd << '--dataVolume' << it } cmd << '--image' << task.getContainer() cmd << '--cpu' << task.config.getCpus().toString() cmd << '--mem' << getMemory(task) cmd << '--job' << getScriptFilePath(handler, scriptFile) - def env = getEnv(handler) - if (env) { - cmd << '--env' << env.collect(e -> "${e.key}=${e.value}").join(',') + getEnv(handler).each { key, val -> + cmd << '--env' << "${key}=${val}".toString() + } + if (isFusionEnabled()) { + cmd << '--extraContainerOpts' + cmd << '--privileged' } cmd << '--customTag' << tag cmd.addAll(getExtra(task)) @@ -258,7 +258,7 @@ class FloatGridExecutor extends AbstractGridExecutor { return scriptFile.toString() } - protected String saveFusionScriptFile(FloatTaskHandler handler, Path scriptFile) { + protected static String saveFusionScriptFile(FloatTaskHandler handler, Path scriptFile) { final localTmp = File.createTempFile("nextflow", scriptFile.name) log.info("save fusion launcher script") localTmp.text = '#!/bin/bash\n' + handler.fusionSubmitCli().join(' ') + '\n' @@ -276,7 +276,7 @@ class FloatGridExecutor extends AbstractGridExecutor { } /** - * Parse the string returned by the {@code float sbatch} and extract + * Parse the string returned by the {@code float submit} and extract * the job ID string * * @param text The string returned when submitting the job @@ -331,9 +331,10 @@ class FloatGridExecutor extends AbstractGridExecutor { jobIds.forEach { def id = it.toString() def cmd = getCmdPrefixForJob(id) - cmd << 'scancel' + cmd << 'cancel' cmd << '-j' cmd << id + cmd << '-f' log.info "[float] cancel job: ${toLogStr(cmd)}" ret.add(cmd) } @@ -349,7 +350,7 @@ class FloatGridExecutor extends AbstractGridExecutor { @Override protected List getKillCommand() { def cmd = getCmdPrefix0() - cmd << 'scancel' + cmd << 'cancel' cmd << '-j' log.info "[float] cancel job: ${toLogStr(cmd)}" return cmd @@ -386,7 +387,7 @@ class FloatGridExecutor extends AbstractGridExecutor { private List getQueueCmdOfOC(String oc = "") { def cmd = floatConf.getCliPrefix(oc) - cmd << 'squeue' + cmd << 'list' cmd << '--format' cmd << 'json' log.info "[float] query job status: ${toLogStr(cmd)}" @@ -404,6 +405,7 @@ class FloatGridExecutor extends AbstractGridExecutor { QueueStatus status = STATUS_MAP.getOrDefault(value, QueueStatus.UNKNOWN) ret[key] = status } + log.info "[float] got job status $ret" return ret } diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy index f0129f9..70b5a91 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy @@ -88,29 +88,28 @@ class FloatJobs { def currentSt = job2status.get(jobID, Unknown) def workDir = task2workDir.get(taskID) - if (!workDir) { - return - } - // check the availability of result files - // call list files to update the folder cache - FileHelper.listDirectory(workDir) - def files = ['.command.out', '.command.err', '.exitcode'] - if (currentSt != Completed && st == Completed) { - for (filename in files) { - def name = workDir.resolve(filename) - try { - !FileHelper.checkIfExists(name, [checkIfExists: true]) - } catch (NoSuchFileException ex) { - log.info("[float] job $jobID completed " + - "but file not found: $ex") - return + if (workDir) { + // check the availability of result files + // call list files to update the folder cache + FileHelper.listDirectory(workDir) + def files = ['.command.out', '.command.err', '.exitcode'] + if (currentSt != Completed && st == Completed) { + for (filename in files) { + def name = workDir.resolve(filename) + try { + !FileHelper.checkIfExists(name, [checkIfExists: true]) + } catch (NoSuchFileException ex) { + log.info("[float] job $jobID completed " + + "but file not found: $ex") + return + } } + log.debug("[float] found $files in: $workDir") } - log.debug("[float] found $files in: $workDir") } job2status.put(jobID, st) } - log.debug("[float] update op-center $oc job status: $job2status") + log.debug "[float] update op-center $oc job status: $job2status" return job2status } diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy index ed460e3..0dbe6a8 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy @@ -24,6 +24,7 @@ import spock.lang.Specification import java.nio.file.Path import java.nio.file.Paths +import java.util.concurrent.atomic.AtomicInteger class BaseTest extends Specification { def setEnv(String key, String value) { @@ -52,6 +53,7 @@ class FloatBaseTest extends BaseTest { def script = '/path/job.sh' def workDir = '/mnt/nfs/shared' def taskID = new TaskId(55) + private AtomicInteger taskSerial = new AtomicInteger() class FloatTestExecutor extends FloatGridExecutor { @Override @@ -90,6 +92,8 @@ class FloatBaseTest extends BaseTest { task.processor.getExecutor() >> exec task.config = conf task.id = taskID + task.index = taskSerial.incrementAndGet() + task.workDir = Paths.get(workDir) return task } @@ -101,7 +105,7 @@ class FloatBaseTest extends BaseTest { return ['float', '-a', param.addr ?: addr, '-u', user, '-p', pass, - 'sbatch', + 'submit', '--dataVolume', param.nfs ?: nfs + ':' + workDir, '--image', param.image ?: image, '--cpu', param.cpu ?: cpu.toString(), diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorMultiOCTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorMultiOCTest.groovy index dee52cf..c9a3a1d 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorMultiOCTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorMultiOCTest.groovy @@ -35,11 +35,12 @@ class FloatGridExecutorMultiOCTest extends FloatBaseTest { def "submit job with round robin"() { given: def exec = newTestExecutor() - def task = newTask(exec) when: - def cmd1 = exec.getSubmitCommandLine(task, Paths.get(script)) - def cmd2 = exec.getSubmitCommandLine(task, Paths.get(script)) + def cmd1 = exec.getSubmitCommandLine( + newTask(exec), Paths.get(script)) + def cmd2 = exec.getSubmitCommandLine( + newTask(exec), Paths.get(script)) def expected1 = submitCmd(addr: "fb") def expected2 = submitCmd(addr: "fa") @@ -59,9 +60,9 @@ class FloatGridExecutorMultiOCTest extends FloatBaseTest { then: cmd1.join(' ') == "float -a fa -u ${user} -p ${pass} " + - "squeue --format json" + "list --format json" cmd2.join(' ') == "float -a fb -u ${user} -p ${pass} " + - "squeue --format json" + "list --format json" } def "input multiple addresses as list"() { @@ -70,10 +71,12 @@ class FloatGridExecutorMultiOCTest extends FloatBaseTest { def task = newTask(exec) when: - def cmd1 = exec.getSubmitCommandLine(task, Paths.get(script)) - def cmd2 = exec.getSubmitCommandLine(task, Paths.get(script)) - def expected1 = submitCmd(addr: "fb") - def expected2 = submitCmd(addr: "fa") + def cmd1 = exec.getSubmitCommandLine( + newTask(exec), Paths.get(script)) + def cmd2 = exec.getSubmitCommandLine( + newTask(exec), Paths.get(script)) + def expected1 = submitCmd(addr: "fa") + def expected2 = submitCmd(addr: "fb") then: cmd1.join(' ') == expected1.join(' ') diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy index 859cccd..44f35a7 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy @@ -60,7 +60,7 @@ class FloatGridExecutorTest extends FloatBaseTest { cmd.join(' ') == ['float', '-a', addr, '-u', user, '-p', pass, - 'scancel', '-j', jobID].join(' ') + 'cancel', '-j', jobID].join(' ') } def "kill commands"() { @@ -73,7 +73,7 @@ class FloatGridExecutorTest extends FloatBaseTest { return ['float', '-a', addr, '-u', user, '-p', pass, - 'scancel', '-j', jobID].join(' ') + 'cancel', '-j', jobID, '-f'].join(' ') } then: @@ -354,7 +354,7 @@ class FloatGridExecutorTest extends FloatBaseTest { cmd == ['float', '-a', addr, '-u', user, '-p', pass, - 'squeue', '--format', 'json'] + 'list', '--format', 'json'] } def "retrieve the credentials from env"() {