diff --git a/README.md b/README.md index 52cbcfd..e8602cb 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ Just make sure you have proper internet access. ```groovy plugins { - id 'nf-float@0.2.0' + id 'nf-float@0.2.1' } ``` @@ -66,9 +66,9 @@ Go to the folder where you just install the `nextflow` command line. Let's call this folder the Nextflow home directory. Create the float plugin folder with: ```bash -mkdir -p .nextflow/plugins/nf-float-0.2.0 +mkdir -p .nextflow/plugins/nf-float-0.2.1 ``` -where `0.2.0` is the version of the float plugin. This version number should +where `0.2.1` is the version of the float plugin. This version number should align with the version in of your plugin and the property in your configuration file. (check the configuration section) @@ -76,7 +76,7 @@ Retrieve your plugin zip file and unzip it in this folder. If everything goes right, you should be able to see two sub-folders: ```bash -$ ll .nextflow/plugins/nf-float-0.2.0/ +$ ll .nextflow/plugins/nf-float-0.2.1/ total 48 drwxr-xr-x 4 ec2-user ec2-user 51 Jan 5 07:17 classes drwxr-xr-x 2 ec2-user ec2-user 25 Jan 5 07:17 META-INF @@ -89,7 +89,7 @@ file with the command line option `-c`. Here is a sample of the configuration. ```groovy plugins { - id 'nf-float@0.2.0' + id 'nf-float@0.2.1' } workDir = '/mnt/memverge/shared' @@ -158,7 +158,7 @@ Unknown config secret 'MMC_USERNAME' To enable s3 as work directory, user need to set work directory to a s3 bucket. ```groovy plugins { - id 'nf-float@0.2.0' + id 'nf-float@0.2.1' } workDir = 's3://bucket/path' @@ -198,6 +198,56 @@ Nextflow looks for AWS credentials in the following order: For detail, check NextFlow's document. https://www.nextflow.io/docs/latest/amazons3.html#security-credentials +Tests done for s3 work directory support: +* trivial sequence and scatter workflow. +* the test profile of nf-core/rnaseq +* the test profile of nf-core/sarek + + +### Configure fusion FS over s3 + +Since release 0.2.1, we support fusion FS over s3. To enable fusion, you need to add following configurations +```groovy +wave.enabled = true // 1 + +fusion { + enabled = true // 2 + exportStorageCredentials = true // 3 + exportAwsAccessKeys = true // 4 +} +``` +1. fusion needs wave support. +2. enable fusion explicitly +3. export the aws credentials as environment variable. +4. same as 3. Different nextflow versions may require different option. Supply both 3 & 4 if you are not sure. + +In additional, you may want to: +* point your work directory to a location in s3. +* specify your s3 credentials in the `aws` scope. + +When fusion is enabled, you can find similar submit command line in your `.nextflow.log` +``` +float -a 34.71.114.123 -u admin -p *** submit --image \ +wave.seqera.io/wt/dfd4c4e2d48d/biocontainers/mulled-v2-***:***-0 \ +--cpu 12 --mem 72 --job /tmp/nextflow5377817282489183149.command.run \ +--env FUSION_WORK=/fusion/s3/cedric-memverge-test/nf-work/work/31/a4b682beb93c944fbd3a342ffc41c5 \ +--env AWS_ACCESS_KEY_ID=*** --env AWS_SECRET_ACCESS_KEY=*** \ +--env FUSION_TAGS=[.command.*|.exitcode|.fusion.*](nextflow.io/metadata=true),[*](nextflow.io/temporary=true) \ +--extraContainerOpts --privileged --customTag nf-job-id:znzjht-4 +``` +* the task image is wrapped by a layer provided by wave. + __note__: releases prior to MMC 2.3.1 has bug that fails the image pull requests to the wave registry. + please upgrade to the latest MMC master. +* `FUSION_WORK` and `FUSION_TAGS` is added as environment variables. +* aws credentials is added as environment variables. +* use `extraContainerOpts` to make sure we run the container in privileged mode. + __note__: this option requires MMC 2.3 or later. + +Tests for the fusion support. +* trivial sequence and scatter workflow. +* the test profile of nf-core/rnaseq +* the test profile of nf-core/sarek + ## Task Sample For each process, users could supply their requirements for the CPU, memory and container image using the standard Nextflow process directives. diff --git a/conf/float-rt.conf b/conf/float-rt.conf index 1d561f9..db67da1 100644 --- a/conf/float-rt.conf +++ b/conf/float-rt.conf @@ -1,5 +1,5 @@ plugins { - id 'nf-float@0.2.0' + id 'nf-float@0.2.1' } workDir = '/mnt/memverge/shared' diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy index d3394cd..ebf2bde 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy @@ -135,7 +135,7 @@ class FloatConf { } private static def warnDeprecated(String deprecated, String replacement) { - log.warn "[flaot] config option `$deprecated` " + + log.warn "[float] config option `$deprecated` " + "is no longer supported, " + "use `$replacement` instead" } diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy index 0a5c4b2..065675c 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy @@ -27,7 +27,6 @@ import nextflow.util.ServiceName import java.nio.file.Path import java.nio.file.StandardCopyOption -import java.util.concurrent.atomic.AtomicInteger import java.util.stream.Collectors /** @@ -202,15 +201,15 @@ class FloatGridExecutor extends AbstractGridExecutor { return ret } - private Map getEnv(FloatTaskHandler handler) { + private Map getEnv(FloatTaskHandler handler) { return isFusionEnabled() - ? handler.fusionLauncher().fusionEnv() - : [:] + ? handler.fusionLauncher().fusionEnv() + : [:] } @Override List getSubmitCommandLine(TaskRun task, Path scriptFile) { - null + return getSubmitCommandLine(new FloatTaskHandler(task, this), scriptFile) } List getSubmitCommandLine(FloatTaskHandler handler, Path scriptFile) { @@ -227,17 +226,18 @@ class FloatGridExecutor extends AbstractGridExecutor { "with `process.container`") } def cmd = getSubmitCmdPrefix(task.index) - cmd << 'sbatch' - for (def vol : getMountVols(task)) { - cmd << '--dataVolume' << vol - } + cmd << 'submit' + getMountVols(task).forEach { cmd << '--dataVolume' << it } cmd << '--image' << task.getContainer() cmd << '--cpu' << task.config.getCpus().toString() cmd << '--mem' << getMemory(task) cmd << '--job' << getScriptFilePath(handler, scriptFile) - def env = getEnv(handler) - if (env) { - cmd << '--env' << env.collect(e -> "${e.key}=${e.value}").join(',') + getEnv(handler).each { key, val -> + cmd << '--env' << "${key}=${val}".toString() + } + if (isFusionEnabled()) { + cmd << '--extraContainerOpts' + cmd << '--privileged' } cmd << '--customTag' << tag cmd.addAll(getExtra(task)) @@ -258,7 +258,7 @@ class FloatGridExecutor extends AbstractGridExecutor { return scriptFile.toString() } - protected String saveFusionScriptFile(FloatTaskHandler handler, Path scriptFile) { + protected static String saveFusionScriptFile(FloatTaskHandler handler, Path scriptFile) { final localTmp = File.createTempFile("nextflow", scriptFile.name) log.info("save fusion launcher script") localTmp.text = '#!/bin/bash\n' + handler.fusionSubmitCli().join(' ') + '\n' @@ -276,7 +276,7 @@ class FloatGridExecutor extends AbstractGridExecutor { } /** - * Parse the string returned by the {@code float sbatch} and extract + * Parse the string returned by the {@code float submit} and extract * the job ID string * * @param text The string returned when submitting the job @@ -331,9 +331,10 @@ class FloatGridExecutor extends AbstractGridExecutor { jobIds.forEach { def id = it.toString() def cmd = getCmdPrefixForJob(id) - cmd << 'scancel' + cmd << 'cancel' cmd << '-j' cmd << id + cmd << '-f' log.info "[float] cancel job: ${toLogStr(cmd)}" ret.add(cmd) } @@ -349,7 +350,7 @@ class FloatGridExecutor extends AbstractGridExecutor { @Override protected List getKillCommand() { def cmd = getCmdPrefix0() - cmd << 'scancel' + cmd << 'cancel' cmd << '-j' log.info "[float] cancel job: ${toLogStr(cmd)}" return cmd @@ -386,7 +387,7 @@ class FloatGridExecutor extends AbstractGridExecutor { private List getQueueCmdOfOC(String oc = "") { def cmd = floatConf.getCliPrefix(oc) - cmd << 'squeue' + cmd << 'list' cmd << '--format' cmd << 'json' log.info "[float] query job status: ${toLogStr(cmd)}" @@ -404,6 +405,7 @@ class FloatGridExecutor extends AbstractGridExecutor { QueueStatus status = STATUS_MAP.getOrDefault(value, QueueStatus.UNKNOWN) ret[key] = status } + log.info "[float] got job status $ret" return ret } diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy index f0129f9..70b5a91 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy @@ -88,29 +88,28 @@ class FloatJobs { def currentSt = job2status.get(jobID, Unknown) def workDir = task2workDir.get(taskID) - if (!workDir) { - return - } - // check the availability of result files - // call list files to update the folder cache - FileHelper.listDirectory(workDir) - def files = ['.command.out', '.command.err', '.exitcode'] - if (currentSt != Completed && st == Completed) { - for (filename in files) { - def name = workDir.resolve(filename) - try { - !FileHelper.checkIfExists(name, [checkIfExists: true]) - } catch (NoSuchFileException ex) { - log.info("[float] job $jobID completed " + - "but file not found: $ex") - return + if (workDir) { + // check the availability of result files + // call list files to update the folder cache + FileHelper.listDirectory(workDir) + def files = ['.command.out', '.command.err', '.exitcode'] + if (currentSt != Completed && st == Completed) { + for (filename in files) { + def name = workDir.resolve(filename) + try { + !FileHelper.checkIfExists(name, [checkIfExists: true]) + } catch (NoSuchFileException ex) { + log.info("[float] job $jobID completed " + + "but file not found: $ex") + return + } } + log.debug("[float] found $files in: $workDir") } - log.debug("[float] found $files in: $workDir") } job2status.put(jobID, st) } - log.debug("[float] update op-center $oc job status: $job2status") + log.debug "[float] update op-center $oc job status: $job2status" return job2status } diff --git a/plugins/nf-float/src/resources/META-INF/MANIFEST.MF b/plugins/nf-float/src/resources/META-INF/MANIFEST.MF index 5baa670..711b9a6 100644 --- a/plugins/nf-float/src/resources/META-INF/MANIFEST.MF +++ b/plugins/nf-float/src/resources/META-INF/MANIFEST.MF @@ -1,6 +1,6 @@ Manifest-Version: 1.0 Plugin-Class: com.memverge.nextflow.FloatPlugin Plugin-Id: nf-float -Plugin-Version: 0.2.0 +Plugin-Version: 0.2.1 Plugin-Provider: MemVerge Corp. Plugin-Requires: >=23.04.0 diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy index ed460e3..0dbe6a8 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy @@ -24,6 +24,7 @@ import spock.lang.Specification import java.nio.file.Path import java.nio.file.Paths +import java.util.concurrent.atomic.AtomicInteger class BaseTest extends Specification { def setEnv(String key, String value) { @@ -52,6 +53,7 @@ class FloatBaseTest extends BaseTest { def script = '/path/job.sh' def workDir = '/mnt/nfs/shared' def taskID = new TaskId(55) + private AtomicInteger taskSerial = new AtomicInteger() class FloatTestExecutor extends FloatGridExecutor { @Override @@ -90,6 +92,8 @@ class FloatBaseTest extends BaseTest { task.processor.getExecutor() >> exec task.config = conf task.id = taskID + task.index = taskSerial.incrementAndGet() + task.workDir = Paths.get(workDir) return task } @@ -101,7 +105,7 @@ class FloatBaseTest extends BaseTest { return ['float', '-a', param.addr ?: addr, '-u', user, '-p', pass, - 'sbatch', + 'submit', '--dataVolume', param.nfs ?: nfs + ':' + workDir, '--image', param.image ?: image, '--cpu', param.cpu ?: cpu.toString(), diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorMultiOCTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorMultiOCTest.groovy index dee52cf..c9a3a1d 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorMultiOCTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorMultiOCTest.groovy @@ -35,11 +35,12 @@ class FloatGridExecutorMultiOCTest extends FloatBaseTest { def "submit job with round robin"() { given: def exec = newTestExecutor() - def task = newTask(exec) when: - def cmd1 = exec.getSubmitCommandLine(task, Paths.get(script)) - def cmd2 = exec.getSubmitCommandLine(task, Paths.get(script)) + def cmd1 = exec.getSubmitCommandLine( + newTask(exec), Paths.get(script)) + def cmd2 = exec.getSubmitCommandLine( + newTask(exec), Paths.get(script)) def expected1 = submitCmd(addr: "fb") def expected2 = submitCmd(addr: "fa") @@ -59,9 +60,9 @@ class FloatGridExecutorMultiOCTest extends FloatBaseTest { then: cmd1.join(' ') == "float -a fa -u ${user} -p ${pass} " + - "squeue --format json" + "list --format json" cmd2.join(' ') == "float -a fb -u ${user} -p ${pass} " + - "squeue --format json" + "list --format json" } def "input multiple addresses as list"() { @@ -70,10 +71,12 @@ class FloatGridExecutorMultiOCTest extends FloatBaseTest { def task = newTask(exec) when: - def cmd1 = exec.getSubmitCommandLine(task, Paths.get(script)) - def cmd2 = exec.getSubmitCommandLine(task, Paths.get(script)) - def expected1 = submitCmd(addr: "fb") - def expected2 = submitCmd(addr: "fa") + def cmd1 = exec.getSubmitCommandLine( + newTask(exec), Paths.get(script)) + def cmd2 = exec.getSubmitCommandLine( + newTask(exec), Paths.get(script)) + def expected1 = submitCmd(addr: "fa") + def expected2 = submitCmd(addr: "fb") then: cmd1.join(' ') == expected1.join(' ') diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy index 859cccd..44f35a7 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy @@ -60,7 +60,7 @@ class FloatGridExecutorTest extends FloatBaseTest { cmd.join(' ') == ['float', '-a', addr, '-u', user, '-p', pass, - 'scancel', '-j', jobID].join(' ') + 'cancel', '-j', jobID].join(' ') } def "kill commands"() { @@ -73,7 +73,7 @@ class FloatGridExecutorTest extends FloatBaseTest { return ['float', '-a', addr, '-u', user, '-p', pass, - 'scancel', '-j', jobID].join(' ') + 'cancel', '-j', jobID, '-f'].join(' ') } then: @@ -354,7 +354,7 @@ class FloatGridExecutorTest extends FloatBaseTest { cmd == ['float', '-a', addr, '-u', user, '-p', pass, - 'squeue', '--format', 'json'] + 'list', '--format', 'json'] } def "retrieve the credentials from env"() {