From 7bc44f390b9cbcf11d48de09937f3646c67bd38c Mon Sep 17 00:00:00 2001 From: Cedric Zhuang Date: Sat, 2 Sep 2023 21:39:48 +0800 Subject: [PATCH 1/3] Add cpu/memory factor Add float options to tune the cpu & memory settings. Add a new tag to track the total size of the task input. --- .../com/memverge/nextflow/FloatConf.groovy | 11 +- .../nextflow/FloatGridExecutor.groovy | 29 ++++- .../memverge/nextflow/FloatBaseTest.groovy | 7 +- .../nextflow/FloatGridExecutorTest.groovy | 100 ++++++++++++++++++ 4 files changed, 139 insertions(+), 8 deletions(-) diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy index 7c9f2b3..b6b9763 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy @@ -37,6 +37,7 @@ class FloatConf { static final String NF_RUN_NAME = 'nextflow-io-run-name' static final String NF_SESSION_ID = 'nextflow-io-session-id' static final String NF_TASK_NAME = 'nextflow-io-task-name' + static final String NF_INPUT_SIZE = 'nextflow-io-input-size' /** credentials for op center */ String username @@ -54,6 +55,8 @@ class FloatConf { String commonExtra float timeFactor = 1 + float cpuFactor = 1 + float memoryFactory = 1 /** * Create a FloatConf instance and initialize the content from the @@ -146,6 +149,12 @@ class FloatConf { if (floatNode.timeFactor) { this.timeFactor = floatNode.timeFactor as Float } + if (floatNode.cpuFactor) { + this.cpuFactor = floatNode.cpuFactor as Float + } + if (floatNode.memoryFactor) { + this.memoryFactory = floatNode.memoryFactor as Float + } this.commonExtra = floatNode.commonExtra if (floatNode.cpu) @@ -162,7 +171,7 @@ class FloatConf { warnDeprecated("float.container", "process.container") } - private String collapseMapToString(Map map) { + private static String collapseMapToString(Map map) { final collapsedStr = map .toConfigObject() .flatten() diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy index cbece07..bec32c2 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy @@ -122,14 +122,17 @@ class FloatGridExecutor extends AbstractGridExecutor { log.info "[float] sync the float binary, $res" } - private static String getMemory(TaskRun task) { + private String getMemory(TaskRun task) { final mem = task.config.getMemory() - final giga = mem?.toGiga() + Long giga = mem?.toGiga() if (!giga) { log.debug "memory $mem is too small. " + "will use default $DFT_MEM_GB" + giga = DFT_MEM_GB } - return giga ? giga.toString() : DFT_MEM_GB + giga = (long) ((float) (giga) * floatConf.memoryFactory) + giga = Math.max(giga, DFT_MEM_GB) + return giga.toString() } private Collection getExtra(TaskRun task) { @@ -236,6 +239,14 @@ class FloatGridExecutor extends AbstractGridExecutor { return ret } + private static long getInputFileSize(TaskRun task) { + long ret = 0 + for (def src : task.getInputFilesMap().values()) { + ret += src.size() + } + return ret + } + private Map getEnv(FloatTaskHandler handler) { return isFusionEnabled() ? handler.fusionLauncher().fusionEnv() @@ -247,6 +258,7 @@ class FloatGridExecutor extends AbstractGridExecutor { result[FloatConf.NF_JOB_ID] = floatJobs.getNfJobID(task.id) result[FloatConf.NF_SESSION_ID] = "uuid-${session.uniqueId}".toString() result[FloatConf.NF_TASK_NAME] = task.name + result[FloatConf.NF_INPUT_SIZE] = getInputFileSize(task).toString() if (task.processor.name) { result[FloatConf.NF_PROCESS_NAME] = task.processor.name } @@ -274,6 +286,12 @@ class FloatGridExecutor extends AbstractGridExecutor { return value } + private Integer getCpu(TaskRun task) { + int cpu = task.config.getCpus() + int ret = (int) (((float) cpu) * floatConf.cpuFactor) + return Math.max(ret, 1) + } + List getSubmitCommandLine(FloatTaskHandler handler, Path scriptFile) { final task = handler.task @@ -289,7 +307,7 @@ class FloatGridExecutor extends AbstractGridExecutor { cmd << 'submit' getMountVols(task).forEach { cmd << '--dataVolume' << it } cmd << '--image' << task.getContainer() - cmd << '--cpu' << task.config.getCpus().toString() + cmd << '--cpu' << getCpu(task).toString() cmd << '--mem' << getMemory(task) cmd << '--job' << getScriptFilePath(handler, scriptFile) getEnv(handler).each { key, val -> @@ -322,6 +340,9 @@ class FloatGridExecutor extends AbstractGridExecutor { if (floatConf.extraOptions) { cmd << '--extraOptions' << floatConf.extraOptions } + if (task.config.getAttempt() > 1) { + cmd << '--vmPolicy' << '[onDemand=true]' + } cmd.addAll(getExtra(task)) log.info "[float] submit job: ${toLogStr(cmd)}" return cmd diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy index be027ea..c1e25f9 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy @@ -110,8 +110,9 @@ class FloatBaseTest extends BaseTest { '--mem', param.memory ?: mem.toString(), '--job', script, '--customTag', jobID(taskID), - '--customTag', "nextflow-io-session-id:uuid-$uuid", - '--customTag', "nextflow-io-task-name:foo-$taskIndex", - '--customTag', 'nextflow-io-run-name:test-run'] + '--customTag', "${FloatConf.NF_SESSION_ID}:uuid-$uuid", + '--customTag', "${FloatConf.NF_TASK_NAME}:foo-$taskIndex", + '--customTag', "${FloatConf.NF_INPUT_SIZE}:0", + '--customTag', "${FloatConf.NF_RUN_NAME}:test-run"] } } diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy index 2de823c..006852c 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy @@ -334,6 +334,106 @@ class FloatGridExecutorTest extends FloatBaseTest { cmd.join(' ').contains('--timeLimit 3960s') } + def "use on-demand for retry"() { + given: + final exec = newTestExecutor( + [float: [address : addr, + username : user, + password : pass, + nfs : nfs]]) + final task = newTask(exec, new TaskConfig( + container: image, + time: '1h', + attempt: 2, + )) + + when: + final cmd = exec.getSubmitCommandLine(task, Paths.get(script)) + + then: + cmd.join(' ').contains('onDemand=true') + } + + def "use cpu factor"() { + given: + final exec = newTestExecutor( + [float: [address : addr, + username : user, + password : pass, + nfs : nfs, + cpuFactor: 1.5]]) + final task = newTask(exec, new TaskConfig( + container: image, + cpus: 2, + )) + + when: + final cmd = exec.getSubmitCommandLine(task, Paths.get(script)) + + then: + cmd.join(' ').contains('--cpu 3') + } + + def "use invalid cpu"() { + given: + final exec = newTestExecutor( + [float: [address : addr, + username : user, + password : pass, + nfs : nfs, + cpuFactor: 0.2]]) + final task = newTask(exec, new TaskConfig( + container: image, + cpus: 1, + )) + + when: + final cmd = exec.getSubmitCommandLine(task, Paths.get(script)) + + then: + cmd.join(' ').contains('--cpu 1') + } + + def "use memory factor"() { + given: + final exec = newTestExecutor( + [float: [address : addr, + username : user, + password : pass, + nfs : nfs, + memoryFactor: 0.5]]) + final task = newTask(exec, new TaskConfig( + container: image, + memory: "8 GB", + )) + + when: + final cmd = exec.getSubmitCommandLine(task, Paths.get(script)) + + then: + cmd.join(' ').contains('--mem 4') + } + + def "use invalid memory"() { + given: + final exec = newTestExecutor( + [float: [address : addr, + username : user, + password : pass, + nfs : nfs, + memoryFactor: 0.5]]) + final task = newTask(exec, new TaskConfig( + container: image, + memory: 8, + )) + + when: + final cmd = exec.getSubmitCommandLine(task, Paths.get(script)) + + then: + cmd.join(' ').contains('--mem 1') + } + def "use extra options"() { given: final option = """--external 'mnt[]:sm'""" From dbf06b5cae4d6b7236d820a46f947619fb69f687 Mon Sep 17 00:00:00 2001 From: Cedric Zhuang Date: Sun, 3 Sep 2023 02:13:23 +0800 Subject: [PATCH 2/3] Do not update the job status when it's finished Always refresh the work directory when job finished is received. --- .../nextflow/FloatGridExecutor.groovy | 12 +++-- .../com/memverge/nextflow/FloatJob.groovy | 6 +-- .../com/memverge/nextflow/FloatJobs.groovy | 49 ++++++------------- .../memverge/nextflow/FloatTaskHandler.groovy | 6 ++- .../nextflow/FloatGridExecutorTest.groovy | 2 +- .../com/memverge/nextflow/FloatJobTest.groovy | 10 ++-- 6 files changed, 35 insertions(+), 50 deletions(-) diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy index bec32c2..3a3001b 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy @@ -167,6 +167,7 @@ class FloatGridExecutor extends AbstractGridExecutor { if (res.succeeded) { job = FloatJob.parse(res.out) + job = floatJobs.updateJob(job) } } catch (Exception e) { log.warn "[float] failed to retrieve job status $nfJobID, float: ${job.floatJobID}", e @@ -532,15 +533,16 @@ class FloatGridExecutor extends AbstractGridExecutor { if (!job) { return FloatStatus.UNKNOWN } - log.debug "[float] task id: ${task.id}, nf-job-id: $job.nfJobID, " + + log.info "[float] task id: ${task.id}, nf-job-id: $job.nfJobID, " + "float-job-id: $job.floatJobID, float status: $job.status" - if (job.finished) { - floatJobs.refreshWorkDir(job.nfJobID) - task.exitStatus = job.rcCode - } return job.status } + Integer getJobRC(TaskId id) { + def job = getJob(id) + return job.rcCode + } + static private Map STATUS_MAP = new HashMap<>() static { diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatJob.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatJob.groovy index 8b62de6..52ddb06 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatJob.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatJob.groovy @@ -126,8 +126,8 @@ class FloatJob { return status ? status.isFinished() : false } - static Map parseJobMap(String input) { - Map ret = new HashMap<>() + static List parseJobMap(String input) { + List ret = [] try { def parser = new JsonSlurper() def obj = parser.parseText(input) @@ -142,7 +142,7 @@ class FloatJob { job.floatJobID = floatJobID job.status = FloatStatus.of(status) job.rc = i.rc as String - ret[nfJobID] = job + ret.add(job) } } } catch (Exception e) { diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy index e26e54f..26ecaec 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy @@ -15,14 +15,12 @@ */ package com.memverge.nextflow -import groovy.transform.WithReadLock -import groovy.transform.WithWriteLock + import groovy.util.logging.Slf4j import nextflow.file.FileHelper import nextflow.processor.TaskId import org.apache.commons.lang.RandomStringUtils -import java.nio.file.NoSuchFileException import java.nio.file.Path import java.util.concurrent.ConcurrentHashMap @@ -59,7 +57,6 @@ class FloatJobs { return floatJobID2oc.getOrDefault(floatJobID, ocs[0]) } - @WithReadLock Map getNfJobID2job() { return nfJobID2FloatJob } @@ -73,12 +70,18 @@ class FloatJobs { return updateOcStatus(ocs[0], text) } - FloatStatus getJobStatus(String nfJobID) { - FloatJob job = nfJobID2FloatJob.get(nfJobID) - if (job == null) { - return FloatStatus.UNKNOWN + FloatJob updateJob(FloatJob job) { + FloatJob existingJob = nfJobID2job.get(job.nfJobID) + if (existingJob != null && existingJob.finished) { + // job already finished, no need to update + job = existingJob + } else { + nfJobID2job.put(job.nfJobID, job) + } + if (job.finished) { + refreshWorkDir(job.nfJobID) } - return job.status + return job } def refreshWorkDir(String nfJobID) { @@ -89,37 +92,15 @@ class FloatJobs { } } - @WithWriteLock def updateOcStatus(String oc, String text) { - def stMap = FloatJob.parseJobMap(text) - stMap.each { nfJobID, job -> + def jobs = FloatJob.parseJobMap(text) + jobs.each {job -> if (!job.nfJobID || !job.status) { return } floatJobID2oc.put(job.floatJobID, oc) - def currentSt = getJobStatus(nfJobID) - def workDir = nfJobID2workDir.get(job.nfJobID) - if (workDir && job.finished) { - refreshWorkDir(job.nfJobID) - def files = ['.command.out', '.command.err', '.exitcode'] - if (!currentSt.finished && job.finished) { - for (filename in files) { - def name = workDir.resolve(filename) - try { - !FileHelper.checkIfExists(name, [checkIfExists: true]) - } catch (NoSuchFileException ex) { - log.info "[float] job $nfJobID completed " + - "but file not found: ${ex.message}" - job.status = currentSt - return - } - } - log.debug "[float] found $files in: $workDir" - } - } - + updateJob(job) } - nfJobID2FloatJob += stMap log.debug "[float] update op-center $oc job status" return nfJobID2FloatJob } diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatTaskHandler.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatTaskHandler.groovy index fa4e42c..c2f61cb 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatTaskHandler.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatTaskHandler.groovy @@ -74,16 +74,18 @@ class FloatTaskHandler extends GridTaskHandler { final FloatStatus st = floatExecutor.getJobStatus(task) if (st.finished) { status = COMPLETED + task.exitStatus = readExitStatus() if (task.exitStatus == null) { - task.exitStatus = readExitStatus() + task.exitStatus = floatExecutor.getJobRC(task.id) } - // both exit status and job rc code are empty if (task.exitStatus == null) { if (st.isError()) { task.exitStatus = 1 } else { task.exitStatus = 0 } + log.info "both .exitcode and rc are empty for ${task.id}," + + "set exit to ${task.exitStatus}" } task.stdout = outputFile task.stderr = errorFile diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy index 006852c..04be163 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy @@ -535,7 +535,7 @@ class FloatGridExecutorTest extends FloatBaseTest { res['tJob-5'] == qs.ERROR res['tJob-6'] == qs.ERROR res['tJob-7'] == qs.ERROR - res['tJob-8'] == qs.UNKNOWN + res['tJob-8'] == qs.DONE res['tJob-9'] == qs.RUNNING res['tJob-10'] == qs.RUNNING res['tJob-11'] == qs.RUNNING diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatJobTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatJobTest.groovy index 9020ea6..2f21286 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatJobTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatJobTest.groovy @@ -148,9 +148,9 @@ class FloatJobTest extends Specification { ]""" when: - def stMap = FloatJob.parseJobMap(out) - def st1 = stMap['job-1'] - def st2 = stMap['job-3'] + def jobs = FloatJob.parseJobMap(out) + def st1 = jobs.get(0) + def st2 = jobs.get(1) then: st1.status == FloatStatus.ERROR @@ -168,9 +168,9 @@ class FloatJobTest extends Specification { final out = """No jobs""" when: - def stMap = FloatJob.parseJobMap(out) + def jobs = FloatJob.parseJobMap(out) then: - stMap.size() == 0 + jobs.size() == 0 } } From 2b117c46e42eb4ec1b9a1a53930ca12e77954a6d Mon Sep 17 00:00:00 2001 From: Cedric Zhuang Date: Fri, 8 Sep 2023 15:29:11 +0800 Subject: [PATCH 3/3] Handle quoted params in extra. Properly handle the quoted parameters in `extra` or `commonExtra`. --- .../nextflow/FloatGridExecutor.groovy | 27 ++++++- .../nextflow/FloatGridExecutorTest.groovy | 71 ++++++++++++------- 2 files changed, 73 insertions(+), 25 deletions(-) diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy index 3a3001b..6eb6c55 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy @@ -28,6 +28,8 @@ import nextflow.util.ServiceName import java.nio.file.Path import java.nio.file.StandardCopyOption +import java.util.regex.Matcher +import java.util.regex.Pattern import java.util.stream.Collectors /** @@ -142,10 +144,33 @@ class FloatGridExecutor extends AbstractGridExecutor { if (common) { extra = common.trim() + " " + extra.trim() } - def ret = extra.split('\\s+') + def ret = splitWithQuotes(extra) return ret.findAll { it.length() > 0 } } + private static List splitWithQuotes(String input) { + List ret = new ArrayList() + int start = 0 + boolean inQuotes = false + for (int i = 0; i < input.size(); i++) { + if (input[i] == '"') { + inQuotes = !inQuotes + } + if ((input[i] == '\t' || input[i] == ' ') && !inQuotes) { + String token = input.substring(start, i).trim() + if (token.size() > 0) { + ret.add(token) + } + start = i + } + } + String token = input.substring(start).trim() + if (token.size() > 0) { + ret.add(token) + } + return ret + } + private List getCmdPrefixForJob(String floatJobID) { final oc = floatJobs.getOc(floatJobID) return floatConf.getCliPrefix(oc) diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy index 04be163..3eea09d 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy @@ -150,6 +150,29 @@ class FloatGridExecutorTest extends FloatBaseTest { cmd.join(" ") == expected.join(" ") } + def "quoted arguments in extra"() { + given: + final exec = newTestExecutor( + [float: [address : addr, + username : user, + password : pass, + nfs : nfs, + commonExtra: '--dataVolume [opts="-o allow_other"]s3://1.2.3.4:/a:/a --rootVolSize 10']] + ) + final task = newTask(exec) + + when: + final cmd = exec.getSubmitCommandLine(task, Paths.get(script)) + final expected = submitCmd() + [ + '--dataVolume', + '[opts="-o allow_other"]s3://1.2.3.4:/a:/a', + '--rootVolSize', + '10'] + + then: + cmd == expected + } + def "add specific extras"() { given: final exec = newTestExecutor() @@ -317,10 +340,10 @@ class FloatGridExecutorTest extends FloatBaseTest { def "use timeout factor"() { given: final exec = newTestExecutor( - [float: [address : addr, - username : user, - password : pass, - nfs : nfs, + [float: [address : addr, + username : user, + password : pass, + nfs : nfs, timeFactor: 1.1]]) final task = newTask(exec, new TaskConfig( container: image, @@ -337,10 +360,10 @@ class FloatGridExecutorTest extends FloatBaseTest { def "use on-demand for retry"() { given: final exec = newTestExecutor( - [float: [address : addr, - username : user, - password : pass, - nfs : nfs]]) + [float: [address : addr, + username: user, + password: pass, + nfs : nfs]]) final task = newTask(exec, new TaskConfig( container: image, time: '1h', @@ -357,10 +380,10 @@ class FloatGridExecutorTest extends FloatBaseTest { def "use cpu factor"() { given: final exec = newTestExecutor( - [float: [address : addr, - username : user, - password : pass, - nfs : nfs, + [float: [address : addr, + username : user, + password : pass, + nfs : nfs, cpuFactor: 1.5]]) final task = newTask(exec, new TaskConfig( container: image, @@ -377,10 +400,10 @@ class FloatGridExecutorTest extends FloatBaseTest { def "use invalid cpu"() { given: final exec = newTestExecutor( - [float: [address : addr, - username : user, - password : pass, - nfs : nfs, + [float: [address : addr, + username : user, + password : pass, + nfs : nfs, cpuFactor: 0.2]]) final task = newTask(exec, new TaskConfig( container: image, @@ -397,10 +420,10 @@ class FloatGridExecutorTest extends FloatBaseTest { def "use memory factor"() { given: final exec = newTestExecutor( - [float: [address : addr, - username : user, - password : pass, - nfs : nfs, + [float: [address : addr, + username : user, + password : pass, + nfs : nfs, memoryFactor: 0.5]]) final task = newTask(exec, new TaskConfig( container: image, @@ -417,10 +440,10 @@ class FloatGridExecutorTest extends FloatBaseTest { def "use invalid memory"() { given: final exec = newTestExecutor( - [float: [address : addr, - username : user, - password : pass, - nfs : nfs, + [float: [address : addr, + username : user, + password : pass, + nfs : nfs, memoryFactor: 0.5]]) final task = newTask(exec, new TaskConfig( container: image,