Skip to content

Commit

Permalink
[GH-49] Support fusion FS.
Browse files Browse the repository at this point in the history
Use privileged mode for podman.
Add fusion envs to the submit command.

Fix an error that job is not properly canceled.
Fix the related unittests.

Add configuration for fusion in readme.
  • Loading branch information
jealous committed Aug 2, 2023
1 parent a06b349 commit 6b30ae3
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 57 deletions.
62 changes: 56 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Just make sure you have proper internet access.

```groovy
plugins {
id '[email protected].0'
id '[email protected].1'
}
```

Expand All @@ -66,17 +66,17 @@ Go to the folder where you just install the `nextflow` command line.
Let's call this folder the Nextflow home directory.
Create the float plugin folder with:
```bash
mkdir -p .nextflow/plugins/nf-float-0.2.0
mkdir -p .nextflow/plugins/nf-float-0.2.1
```
where `0.2.0` is the version of the float plugin. This version number should
where `0.2.1` is the version of the float plugin. This version number should
align with the version in of your plugin and the property in your configuration
file. (check the configuration section)

Retrieve your plugin zip file and unzip it in this folder.
If everything goes right, you should be able to see two sub-folders:

```bash
$ ll .nextflow/plugins/nf-float-0.2.0/
$ ll .nextflow/plugins/nf-float-0.2.1/
total 48
drwxr-xr-x 4 ec2-user ec2-user 51 Jan 5 07:17 classes
drwxr-xr-x 2 ec2-user ec2-user 25 Jan 5 07:17 META-INF
Expand All @@ -89,7 +89,7 @@ file with the command line option `-c`. Here is a sample of the configuration.

```groovy
plugins {
id '[email protected].0'
id '[email protected].1'
}
workDir = '/mnt/memverge/shared'
Expand Down Expand Up @@ -158,7 +158,7 @@ Unknown config secret 'MMC_USERNAME'
To enable s3 as work directory, user need to set work directory to a s3 bucket.
```groovy
plugins {
id '[email protected].0'
id '[email protected].1'
}
workDir = 's3://bucket/path'
Expand Down Expand Up @@ -198,6 +198,56 @@ Nextflow looks for AWS credentials in the following order:
For detail, check NextFlow's document.
https://www.nextflow.io/docs/latest/amazons3.html#security-credentials

Tests done for s3 work directory support:
* trivial sequence and scatter workflow.
* the test profile of nf-core/rnaseq
* the test profile of nf-core/sarek


### Configure fusion FS over s3

Since release 0.2.1, we support fusion FS over s3. To enable fusion, you need to add following configurations
```groovy
wave.enabled = true // 1
fusion {
enabled = true // 2
exportStorageCredentials = true // 3
exportAwsAccessKeys = true // 4
}
```
1. fusion needs wave support.
2. enable fusion explicitly
3. export the aws credentials as environment variable.
4. same as 3. Different nextflow versions may require different option. Supply both 3 & 4 if you are not sure.

In additional, you may want to:
* point your work directory to a location in s3.
* specify your s3 credentials in the `aws` scope.

When fusion is enabled, you can find similar submit command line in your `.nextflow.log`
```
float -a 34.71.114.123 -u admin -p *** submit --image \
wave.seqera.io/wt/dfd4c4e2d48d/biocontainers/mulled-v2-***:***-0 \
--cpu 12 --mem 72 --job /tmp/nextflow5377817282489183149.command.run \
--env FUSION_WORK=/fusion/s3/cedric-memverge-test/nf-work/work/31/a4b682beb93c944fbd3a342ffc41c5 \
--env AWS_ACCESS_KEY_ID=*** --env AWS_SECRET_ACCESS_KEY=*** \
--env FUSION_TAGS=[.command.*|.exitcode|.fusion.*](nextflow.io/metadata=true),[*](nextflow.io/temporary=true) \
--extraContainerOpts --privileged --customTag nf-job-id:znzjht-4
```
* the task image is wrapped by a layer provided by wave.
__note__: releases prior to MMC 2.3.1 has bug that fails the image pull requests to the wave registry.
please upgrade to the latest MMC master.
* `FUSION_WORK` and `FUSION_TAGS` is added as environment variables.
* aws credentials is added as environment variables.
* use `extraContainerOpts` to make sure we run the container in privileged mode.
__note__: this option requires MMC 2.3 or later.

Tests for the fusion support.
* trivial sequence and scatter workflow.
* the test profile of nf-core/rnaseq
* the test profile of nf-core/sarek

## Task Sample

For each process, users could supply their requirements for the CPU, memory and container image using the standard Nextflow process directives.
Expand Down
2 changes: 1 addition & 1 deletion conf/float-rt.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
plugins {
id '[email protected].0'
id '[email protected].1'
}

workDir = '/mnt/memverge/shared'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class FloatConf {
}

private static def warnDeprecated(String deprecated, String replacement) {
log.warn "[flaot] config option `$deprecated` " +
log.warn "[float] config option `$deprecated` " +
"is no longer supported, " +
"use `$replacement` instead"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import nextflow.util.ServiceName

import java.nio.file.Path
import java.nio.file.StandardCopyOption
import java.util.concurrent.atomic.AtomicInteger
import java.util.stream.Collectors

/**
Expand Down Expand Up @@ -202,15 +201,15 @@ class FloatGridExecutor extends AbstractGridExecutor {
return ret
}

private Map<String,String> getEnv(FloatTaskHandler handler) {
private Map<String, String> getEnv(FloatTaskHandler handler) {
return isFusionEnabled()
? handler.fusionLauncher().fusionEnv()
: [:]
? handler.fusionLauncher().fusionEnv()
: [:]
}

@Override
List<String> getSubmitCommandLine(TaskRun task, Path scriptFile) {
null
return getSubmitCommandLine(new FloatTaskHandler(task, this), scriptFile)
}

List<String> getSubmitCommandLine(FloatTaskHandler handler, Path scriptFile) {
Expand All @@ -227,17 +226,18 @@ class FloatGridExecutor extends AbstractGridExecutor {
"with `process.container`")
}
def cmd = getSubmitCmdPrefix(task.index)
cmd << 'sbatch'
for (def vol : getMountVols(task)) {
cmd << '--dataVolume' << vol
}
cmd << 'submit'
getMountVols(task).forEach { cmd << '--dataVolume' << it }
cmd << '--image' << task.getContainer()
cmd << '--cpu' << task.config.getCpus().toString()
cmd << '--mem' << getMemory(task)
cmd << '--job' << getScriptFilePath(handler, scriptFile)
def env = getEnv(handler)
if (env) {
cmd << '--env' << env.collect(e -> "${e.key}=${e.value}").join(',')
getEnv(handler).each { key, val ->
cmd << '--env' << "${key}=${val}".toString()
}
if (isFusionEnabled()) {
cmd << '--extraContainerOpts'
cmd << '--privileged'
}
cmd << '--customTag' << tag
cmd.addAll(getExtra(task))
Expand All @@ -258,7 +258,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
return scriptFile.toString()
}

protected String saveFusionScriptFile(FloatTaskHandler handler, Path scriptFile) {
protected static String saveFusionScriptFile(FloatTaskHandler handler, Path scriptFile) {
final localTmp = File.createTempFile("nextflow", scriptFile.name)
log.info("save fusion launcher script")
localTmp.text = '#!/bin/bash\n' + handler.fusionSubmitCli().join(' ') + '\n'
Expand All @@ -276,7 +276,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
}

/**
* Parse the string returned by the {@code float sbatch} and extract
* Parse the string returned by the {@code float submit} and extract
* the job ID string
*
* @param text The string returned when submitting the job
Expand Down Expand Up @@ -331,9 +331,10 @@ class FloatGridExecutor extends AbstractGridExecutor {
jobIds.forEach {
def id = it.toString()
def cmd = getCmdPrefixForJob(id)
cmd << 'scancel'
cmd << 'cancel'
cmd << '-j'
cmd << id
cmd << '-f'
log.info "[float] cancel job: ${toLogStr(cmd)}"
ret.add(cmd)
}
Expand All @@ -349,7 +350,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
@Override
protected List<String> getKillCommand() {
def cmd = getCmdPrefix0()
cmd << 'scancel'
cmd << 'cancel'
cmd << '-j'
log.info "[float] cancel job: ${toLogStr(cmd)}"
return cmd
Expand Down Expand Up @@ -386,7 +387,7 @@ class FloatGridExecutor extends AbstractGridExecutor {

private List<String> getQueueCmdOfOC(String oc = "") {
def cmd = floatConf.getCliPrefix(oc)
cmd << 'squeue'
cmd << 'list'
cmd << '--format'
cmd << 'json'
log.info "[float] query job status: ${toLogStr(cmd)}"
Expand All @@ -404,6 +405,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
QueueStatus status = STATUS_MAP.getOrDefault(value, QueueStatus.UNKNOWN)
ret[key] = status
}
log.info "[float] got job status $ret"
return ret
}

Expand Down
35 changes: 17 additions & 18 deletions plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -88,29 +88,28 @@ class FloatJobs {

def currentSt = job2status.get(jobID, Unknown)
def workDir = task2workDir.get(taskID)
if (!workDir) {
return
}
// check the availability of result files
// call list files to update the folder cache
FileHelper.listDirectory(workDir)
def files = ['.command.out', '.command.err', '.exitcode']
if (currentSt != Completed && st == Completed) {
for (filename in files) {
def name = workDir.resolve(filename)
try {
!FileHelper.checkIfExists(name, [checkIfExists: true])
} catch (NoSuchFileException ex) {
log.info("[float] job $jobID completed " +
"but file not found: $ex")
return
if (workDir) {
// check the availability of result files
// call list files to update the folder cache
FileHelper.listDirectory(workDir)
def files = ['.command.out', '.command.err', '.exitcode']
if (currentSt != Completed && st == Completed) {
for (filename in files) {
def name = workDir.resolve(filename)
try {
!FileHelper.checkIfExists(name, [checkIfExists: true])
} catch (NoSuchFileException ex) {
log.info("[float] job $jobID completed " +
"but file not found: $ex")
return
}
}
log.debug("[float] found $files in: $workDir")
}
log.debug("[float] found $files in: $workDir")
}
job2status.put(jobID, st)
}
log.debug("[float] update op-center $oc job status: $job2status")
log.debug "[float] update op-center $oc job status: $job2status"
return job2status
}

Expand Down
2 changes: 1 addition & 1 deletion plugins/nf-float/src/resources/META-INF/MANIFEST.MF
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Manifest-Version: 1.0
Plugin-Class: com.memverge.nextflow.FloatPlugin
Plugin-Id: nf-float
Plugin-Version: 0.2.0
Plugin-Version: 0.2.1
Plugin-Provider: MemVerge Corp.
Plugin-Requires: >=23.04.0
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import spock.lang.Specification

import java.nio.file.Path
import java.nio.file.Paths
import java.util.concurrent.atomic.AtomicInteger

class BaseTest extends Specification {
def setEnv(String key, String value) {
Expand Down Expand Up @@ -52,6 +53,7 @@ class FloatBaseTest extends BaseTest {
def script = '/path/job.sh'
def workDir = '/mnt/nfs/shared'
def taskID = new TaskId(55)
private AtomicInteger taskSerial = new AtomicInteger()

class FloatTestExecutor extends FloatGridExecutor {
@Override
Expand Down Expand Up @@ -90,6 +92,8 @@ class FloatBaseTest extends BaseTest {
task.processor.getExecutor() >> exec
task.config = conf
task.id = taskID
task.index = taskSerial.incrementAndGet()
task.workDir = Paths.get(workDir)
return task
}

Expand All @@ -101,7 +105,7 @@ class FloatBaseTest extends BaseTest {
return ['float', '-a', param.addr ?: addr,
'-u', user,
'-p', pass,
'sbatch',
'submit',
'--dataVolume', param.nfs ?: nfs + ':' + workDir,
'--image', param.image ?: image,
'--cpu', param.cpu ?: cpu.toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ class FloatGridExecutorMultiOCTest extends FloatBaseTest {
def "submit job with round robin"() {
given:
def exec = newTestExecutor()
def task = newTask(exec)

when:
def cmd1 = exec.getSubmitCommandLine(task, Paths.get(script))
def cmd2 = exec.getSubmitCommandLine(task, Paths.get(script))
def cmd1 = exec.getSubmitCommandLine(
newTask(exec), Paths.get(script))
def cmd2 = exec.getSubmitCommandLine(
newTask(exec), Paths.get(script))
def expected1 = submitCmd(addr: "fb")
def expected2 = submitCmd(addr: "fa")

Expand All @@ -59,9 +60,9 @@ class FloatGridExecutorMultiOCTest extends FloatBaseTest {

then:
cmd1.join(' ') == "float -a fa -u ${user} -p ${pass} " +
"squeue --format json"
"list --format json"
cmd2.join(' ') == "float -a fb -u ${user} -p ${pass} " +
"squeue --format json"
"list --format json"
}

def "input multiple addresses as list"() {
Expand All @@ -70,10 +71,12 @@ class FloatGridExecutorMultiOCTest extends FloatBaseTest {
def task = newTask(exec)

when:
def cmd1 = exec.getSubmitCommandLine(task, Paths.get(script))
def cmd2 = exec.getSubmitCommandLine(task, Paths.get(script))
def expected1 = submitCmd(addr: "fb")
def expected2 = submitCmd(addr: "fa")
def cmd1 = exec.getSubmitCommandLine(
newTask(exec), Paths.get(script))
def cmd2 = exec.getSubmitCommandLine(
newTask(exec), Paths.get(script))
def expected1 = submitCmd(addr: "fa")
def expected2 = submitCmd(addr: "fb")

then:
cmd1.join(' ') == expected1.join(' ')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class FloatGridExecutorTest extends FloatBaseTest {
cmd.join(' ') == ['float', '-a', addr,
'-u', user,
'-p', pass,
'scancel', '-j', jobID].join(' ')
'cancel', '-j', jobID].join(' ')
}

def "kill commands"() {
Expand All @@ -73,7 +73,7 @@ class FloatGridExecutorTest extends FloatBaseTest {
return ['float', '-a', addr,
'-u', user,
'-p', pass,
'scancel', '-j', jobID].join(' ')
'cancel', '-j', jobID, '-f'].join(' ')
}

then:
Expand Down Expand Up @@ -354,7 +354,7 @@ class FloatGridExecutorTest extends FloatBaseTest {
cmd == ['float', '-a', addr,
'-u', user,
'-p', pass,
'squeue', '--format', 'json']
'list', '--format', 'json']
}

def "retrieve the credentials from env"() {
Expand Down

0 comments on commit 6b30ae3

Please sign in to comment.