From 730ff237d734ad589bb81f557275582a662f64a7 Mon Sep 17 00:00:00 2001 From: Cedric Zhuang Date: Mon, 31 Jul 2023 17:44:27 +0800 Subject: [PATCH] [GH-49] Support fusion FS. Use privileged mode for podman. Add fusion envs to the submit command. Fix an error that job is not properly canceled. Fix the related unittests. Add configuration for fusion in readme. --- README.md | 62 +++++++++++++++++-- conf/float-rt.conf | 2 +- .../com/memverge/nextflow/FloatConf.groovy | 2 +- .../nextflow/FloatGridExecutor.groovy | 36 ++++++----- .../com/memverge/nextflow/FloatJobs.groovy | 35 +++++------ .../src/resources/META-INF/MANIFEST.MF | 2 +- .../memverge/nextflow/FloatBaseTest.groovy | 6 +- .../FloatGridExecutorMultiOCTest.groovy | 21 ++++--- .../nextflow/FloatGridExecutorTest.groovy | 6 +- 9 files changed, 115 insertions(+), 57 deletions(-) diff --git a/README.md b/README.md index 52cbcfd..e8602cb 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ Just make sure you have proper internet access. ```groovy plugins { - id 'nf-float@0.2.0' + id 'nf-float@0.2.1' } ``` @@ -66,9 +66,9 @@ Go to the folder where you just install the `nextflow` command line. Let's call this folder the Nextflow home directory. Create the float plugin folder with: ```bash -mkdir -p .nextflow/plugins/nf-float-0.2.0 +mkdir -p .nextflow/plugins/nf-float-0.2.1 ``` -where `0.2.0` is the version of the float plugin. This version number should +where `0.2.1` is the version of the float plugin. This version number should align with the version in of your plugin and the property in your configuration file. (check the configuration section) @@ -76,7 +76,7 @@ Retrieve your plugin zip file and unzip it in this folder. 
If everything goes right, you should be able to see two sub-folders: ```bash -$ ll .nextflow/plugins/nf-float-0.2.0/ +$ ll .nextflow/plugins/nf-float-0.2.1/ total 48 drwxr-xr-x 4 ec2-user ec2-user 51 Jan 5 07:17 classes drwxr-xr-x 2 ec2-user ec2-user 25 Jan 5 07:17 META-INF @@ -89,7 +89,7 @@ file with the command line option `-c`. Here is a sample of the configuration. ```groovy plugins { - id 'nf-float@0.2.0' + id 'nf-float@0.2.1' } workDir = '/mnt/memverge/shared' @@ -158,7 +158,7 @@ Unknown config secret 'MMC_USERNAME' To enable s3 as work directory, user need to set work directory to a s3 bucket. ```groovy plugins { - id 'nf-float@0.2.0' + id 'nf-float@0.2.1' } workDir = 's3://bucket/path' @@ -198,6 +198,56 @@ Nextflow looks for AWS credentials in the following order: For detail, check NextFlow's document. https://www.nextflow.io/docs/latest/amazons3.html#security-credentials +Tests done for s3 work directory support: +* trivial sequence and scatter workflow. +* the test profile of nf-core/rnaseq +* the test profile of nf-core/sarek + + +### Configure fusion FS over s3 + +Since release 0.2.1, we support fusion FS over s3. To enable fusion, you need to add the following configuration: +```groovy +wave.enabled = true // 1 + +fusion { + enabled = true // 2 + exportStorageCredentials = true // 3 + exportAwsAccessKeys = true // 4 +} +``` +1. fusion needs wave support. +2. enable fusion explicitly. +3. export the AWS credentials as environment variables. +4. same as 3. Different Nextflow versions may require a different option. Supply both 3 & 4 if you are not sure. + +In addition, you may want to: +* point your work directory to a location in s3. +* specify your s3 credentials in the `aws` scope. 
+ +When fusion is enabled, you can find a similar submit command line in your `.nextflow.log` +``` +float -a 34.71.114.123 -u admin -p *** submit --image \ +wave.seqera.io/wt/dfd4c4e2d48d/biocontainers/mulled-v2-***:***-0 \ +--cpu 12 --mem 72 --job /tmp/nextflow5377817282489183149.command.run \ +--env FUSION_WORK=/fusion/s3/cedric-memverge-test/nf-work/work/31/a4b682beb93c944fbd3a342ffc41c5 \ +--env AWS_ACCESS_KEY_ID=*** --env AWS_SECRET_ACCESS_KEY=*** \ +--env FUSION_TAGS=[.command.*|.exitcode|.fusion.*](nextflow.io/metadata=true),[*](nextflow.io/temporary=true) \ +--extraContainerOpts --privileged --customTag nf-job-id:znzjht-4 +``` +* the task image is wrapped by a layer provided by wave. + __note__: releases prior to MMC 2.3.1 have a bug that fails the image pull requests to the wave registry. + please upgrade to the latest MMC master. +* `FUSION_WORK` and `FUSION_TAGS` are added as environment variables. +* AWS credentials are added as environment variables. +* use `extraContainerOpts` to make sure we run the container in privileged mode. + __note__: this option requires MMC 2.3 or later. + +Tests done for fusion support: +* trivial sequence and scatter workflow. +* the test profile of nf-core/rnaseq +* the test profile of nf-core/sarek + ## Task Sample For each process, users could supply their requirements for the CPU, memory and container image using the standard Nextflow process directives. 
diff --git a/conf/float-rt.conf b/conf/float-rt.conf index 1d561f9..db67da1 100644 --- a/conf/float-rt.conf +++ b/conf/float-rt.conf @@ -1,5 +1,5 @@ plugins { - id 'nf-float@0.2.0' + id 'nf-float@0.2.1' } workDir = '/mnt/memverge/shared' diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy index d3394cd..ebf2bde 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy @@ -135,7 +135,7 @@ class FloatConf { } private static def warnDeprecated(String deprecated, String replacement) { - log.warn "[flaot] config option `$deprecated` " + + log.warn "[float] config option `$deprecated` " + "is no longer supported, " + "use `$replacement` instead" } diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy index 0a5c4b2..065675c 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy @@ -27,7 +27,6 @@ import nextflow.util.ServiceName import java.nio.file.Path import java.nio.file.StandardCopyOption -import java.util.concurrent.atomic.AtomicInteger import java.util.stream.Collectors /** @@ -202,15 +201,15 @@ class FloatGridExecutor extends AbstractGridExecutor { return ret } - private Map getEnv(FloatTaskHandler handler) { + private Map getEnv(FloatTaskHandler handler) { return isFusionEnabled() - ? handler.fusionLauncher().fusionEnv() - : [:] + ? 
handler.fusionLauncher().fusionEnv() + : [:] } @Override List getSubmitCommandLine(TaskRun task, Path scriptFile) { - null + return getSubmitCommandLine(new FloatTaskHandler(task, this), scriptFile) } List getSubmitCommandLine(FloatTaskHandler handler, Path scriptFile) { @@ -227,17 +226,18 @@ class FloatGridExecutor extends AbstractGridExecutor { "with `process.container`") } def cmd = getSubmitCmdPrefix(task.index) - cmd << 'sbatch' - for (def vol : getMountVols(task)) { - cmd << '--dataVolume' << vol - } + cmd << 'submit' + getMountVols(task).forEach { cmd << '--dataVolume' << it } cmd << '--image' << task.getContainer() cmd << '--cpu' << task.config.getCpus().toString() cmd << '--mem' << getMemory(task) cmd << '--job' << getScriptFilePath(handler, scriptFile) - def env = getEnv(handler) - if (env) { - cmd << '--env' << env.collect(e -> "${e.key}=${e.value}").join(',') + getEnv(handler).each { key, val -> + cmd << '--env' << "${key}=${val}".toString() + } + if (isFusionEnabled()) { + cmd << '--extraContainerOpts' + cmd << '--privileged' } cmd << '--customTag' << tag cmd.addAll(getExtra(task)) @@ -258,7 +258,7 @@ class FloatGridExecutor extends AbstractGridExecutor { return scriptFile.toString() } - protected String saveFusionScriptFile(FloatTaskHandler handler, Path scriptFile) { + protected static String saveFusionScriptFile(FloatTaskHandler handler, Path scriptFile) { final localTmp = File.createTempFile("nextflow", scriptFile.name) log.info("save fusion launcher script") localTmp.text = '#!/bin/bash\n' + handler.fusionSubmitCli().join(' ') + '\n' @@ -276,7 +276,7 @@ class FloatGridExecutor extends AbstractGridExecutor { } /** - * Parse the string returned by the {@code float sbatch} and extract + * Parse the string returned by the {@code float submit} and extract * the job ID string * * @param text The string returned when submitting the job @@ -331,9 +331,10 @@ class FloatGridExecutor extends AbstractGridExecutor { jobIds.forEach { def id = it.toString() def 
cmd = getCmdPrefixForJob(id) - cmd << 'scancel' + cmd << 'cancel' cmd << '-j' cmd << id + cmd << '-f' log.info "[float] cancel job: ${toLogStr(cmd)}" ret.add(cmd) } @@ -349,7 +350,7 @@ class FloatGridExecutor extends AbstractGridExecutor { @Override protected List getKillCommand() { def cmd = getCmdPrefix0() - cmd << 'scancel' + cmd << 'cancel' cmd << '-j' log.info "[float] cancel job: ${toLogStr(cmd)}" return cmd @@ -386,7 +387,7 @@ class FloatGridExecutor extends AbstractGridExecutor { private List getQueueCmdOfOC(String oc = "") { def cmd = floatConf.getCliPrefix(oc) - cmd << 'squeue' + cmd << 'list' cmd << '--format' cmd << 'json' log.info "[float] query job status: ${toLogStr(cmd)}" @@ -404,6 +405,7 @@ class FloatGridExecutor extends AbstractGridExecutor { QueueStatus status = STATUS_MAP.getOrDefault(value, QueueStatus.UNKNOWN) ret[key] = status } + log.info "[float] got job status $ret" return ret } diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy index f0129f9..70b5a91 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy @@ -88,29 +88,28 @@ class FloatJobs { def currentSt = job2status.get(jobID, Unknown) def workDir = task2workDir.get(taskID) - if (!workDir) { - return - } - // check the availability of result files - // call list files to update the folder cache - FileHelper.listDirectory(workDir) - def files = ['.command.out', '.command.err', '.exitcode'] - if (currentSt != Completed && st == Completed) { - for (filename in files) { - def name = workDir.resolve(filename) - try { - !FileHelper.checkIfExists(name, [checkIfExists: true]) - } catch (NoSuchFileException ex) { - log.info("[float] job $jobID completed " + - "but file not found: $ex") - return + if (workDir) { + // check the availability of result files + // call list files to update the folder cache + 
FileHelper.listDirectory(workDir) + def files = ['.command.out', '.command.err', '.exitcode'] + if (currentSt != Completed && st == Completed) { + for (filename in files) { + def name = workDir.resolve(filename) + try { + !FileHelper.checkIfExists(name, [checkIfExists: true]) + } catch (NoSuchFileException ex) { + log.info("[float] job $jobID completed " + + "but file not found: $ex") + return + } } + log.debug("[float] found $files in: $workDir") } - log.debug("[float] found $files in: $workDir") } job2status.put(jobID, st) } - log.debug("[float] update op-center $oc job status: $job2status") + log.debug "[float] update op-center $oc job status: $job2status" return job2status } diff --git a/plugins/nf-float/src/resources/META-INF/MANIFEST.MF b/plugins/nf-float/src/resources/META-INF/MANIFEST.MF index 5baa670..711b9a6 100644 --- a/plugins/nf-float/src/resources/META-INF/MANIFEST.MF +++ b/plugins/nf-float/src/resources/META-INF/MANIFEST.MF @@ -1,6 +1,6 @@ Manifest-Version: 1.0 Plugin-Class: com.memverge.nextflow.FloatPlugin Plugin-Id: nf-float -Plugin-Version: 0.2.0 +Plugin-Version: 0.2.1 Plugin-Provider: MemVerge Corp. 
Plugin-Requires: >=23.04.0 diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy index ed460e3..0dbe6a8 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy @@ -24,6 +24,7 @@ import spock.lang.Specification import java.nio.file.Path import java.nio.file.Paths +import java.util.concurrent.atomic.AtomicInteger class BaseTest extends Specification { def setEnv(String key, String value) { @@ -52,6 +53,7 @@ class FloatBaseTest extends BaseTest { def script = '/path/job.sh' def workDir = '/mnt/nfs/shared' def taskID = new TaskId(55) + private AtomicInteger taskSerial = new AtomicInteger() class FloatTestExecutor extends FloatGridExecutor { @Override @@ -90,6 +92,8 @@ class FloatBaseTest extends BaseTest { task.processor.getExecutor() >> exec task.config = conf task.id = taskID + task.index = taskSerial.incrementAndGet() + task.workDir = Paths.get(workDir) return task } @@ -101,7 +105,7 @@ class FloatBaseTest extends BaseTest { return ['float', '-a', param.addr ?: addr, '-u', user, '-p', pass, - 'sbatch', + 'submit', '--dataVolume', param.nfs ?: nfs + ':' + workDir, '--image', param.image ?: image, '--cpu', param.cpu ?: cpu.toString(), diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorMultiOCTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorMultiOCTest.groovy index dee52cf..c9a3a1d 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorMultiOCTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorMultiOCTest.groovy @@ -35,11 +35,12 @@ class FloatGridExecutorMultiOCTest extends FloatBaseTest { def "submit job with round robin"() { given: def exec = newTestExecutor() - def task = newTask(exec) when: - def cmd1 = exec.getSubmitCommandLine(task, Paths.get(script)) - def 
cmd2 = exec.getSubmitCommandLine(task, Paths.get(script)) + def cmd1 = exec.getSubmitCommandLine( + newTask(exec), Paths.get(script)) + def cmd2 = exec.getSubmitCommandLine( + newTask(exec), Paths.get(script)) def expected1 = submitCmd(addr: "fb") def expected2 = submitCmd(addr: "fa") @@ -59,9 +60,9 @@ class FloatGridExecutorMultiOCTest extends FloatBaseTest { then: cmd1.join(' ') == "float -a fa -u ${user} -p ${pass} " + - "squeue --format json" + "list --format json" cmd2.join(' ') == "float -a fb -u ${user} -p ${pass} " + - "squeue --format json" + "list --format json" } def "input multiple addresses as list"() { @@ -70,10 +71,12 @@ class FloatGridExecutorMultiOCTest extends FloatBaseTest { def task = newTask(exec) when: - def cmd1 = exec.getSubmitCommandLine(task, Paths.get(script)) - def cmd2 = exec.getSubmitCommandLine(task, Paths.get(script)) - def expected1 = submitCmd(addr: "fb") - def expected2 = submitCmd(addr: "fa") + def cmd1 = exec.getSubmitCommandLine( + newTask(exec), Paths.get(script)) + def cmd2 = exec.getSubmitCommandLine( + newTask(exec), Paths.get(script)) + def expected1 = submitCmd(addr: "fa") + def expected2 = submitCmd(addr: "fb") then: cmd1.join(' ') == expected1.join(' ') diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy index 859cccd..44f35a7 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy @@ -60,7 +60,7 @@ class FloatGridExecutorTest extends FloatBaseTest { cmd.join(' ') == ['float', '-a', addr, '-u', user, '-p', pass, - 'scancel', '-j', jobID].join(' ') + 'cancel', '-j', jobID].join(' ') } def "kill commands"() { @@ -73,7 +73,7 @@ class FloatGridExecutorTest extends FloatBaseTest { return ['float', '-a', addr, '-u', user, '-p', pass, - 'scancel', '-j', jobID].join(' ') + 'cancel', '-j', jobID, 
'-f'].join(' ') } then: @@ -354,7 +354,7 @@ class FloatGridExecutorTest extends FloatBaseTest { cmd == ['float', '-a', addr, '-u', user, '-p', pass, - 'squeue', '--format', 'json'] + 'list', '--format', 'json'] } def "retrieve the credentials from env"() {