Add Fusion support #48

Merged
merged 2 commits into from
Aug 2, 2023
62 changes: 56 additions & 6 deletions README.md
Expand Up @@ -53,7 +53,7 @@ Just make sure you have proper internet access.

```groovy
plugins {
id 'nf-float@0.2.0'
id 'nf-float@0.2.1'
}
```

Expand All @@ -66,17 +66,17 @@ Go to the folder where you just install the `nextflow` command line.
Let's call this folder the Nextflow home directory.
Create the float plugin folder with:
```bash
mkdir -p .nextflow/plugins/nf-float-0.2.0
mkdir -p .nextflow/plugins/nf-float-0.2.1
```
where `0.2.0` is the version of the float plugin. This version number should
where `0.2.1` is the version of the float plugin. This version number should
align with the version of your plugin and the property in your configuration
file (see the configuration section).

Retrieve your plugin zip file and unzip it in this folder.
If everything goes right, you should be able to see two sub-folders:

```bash
$ ll .nextflow/plugins/nf-float-0.2.0/
$ ll .nextflow/plugins/nf-float-0.2.1/
total 48
drwxr-xr-x 4 ec2-user ec2-user 51 Jan 5 07:17 classes
drwxr-xr-x 2 ec2-user ec2-user 25 Jan 5 07:17 META-INF
Expand All @@ -89,7 +89,7 @@ file with the command line option `-c`. Here is a sample of the configuration.

```groovy
plugins {
id 'nf-float@0.2.0'
id 'nf-float@0.2.1'
}

workDir = '/mnt/memverge/shared'
Expand Down Expand Up @@ -158,7 +158,7 @@ Unknown config secret 'MMC_USERNAME'
To use s3 as the work directory, users need to set the work directory to an s3 bucket.
```groovy
plugins {
id 'nf-float@0.2.0'
id 'nf-float@0.2.1'
}

workDir = 's3://bucket/path'
Expand Down Expand Up @@ -198,6 +198,56 @@ Nextflow looks for AWS credentials in the following order:
For details, see the Nextflow documentation:
https://www.nextflow.io/docs/latest/amazons3.html#security-credentials

Tests done for s3 work directory support:
* trivial sequence and scatter workflow.
* the test profile of nf-core/rnaseq
* the test profile of nf-core/sarek


### Configure fusion FS over s3

Since release 0.2.1, we support fusion FS over s3. To enable fusion, add the following configuration:
```groovy
wave.enabled = true // 1

fusion {
enabled = true // 2
exportStorageCredentials = true // 3
exportAwsAccessKeys = true // 4
}
```
1. fusion requires wave, so wave must be enabled.
2. enable fusion explicitly.
3. export the aws credentials as environment variables.
4. same as 3. Different Nextflow versions may require a different option name. Supply both 3 and 4 if you are not sure.

In addition, you may want to:
* point your work directory to a location in s3.
* specify your s3 credentials in the `aws` scope.
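
Putting these pieces together, a configuration might look roughly like the sketch below. It is only an illustration: the bucket path, region and credential values are placeholders, and the `float` scope settings (address, credentials) that a real setup also needs are left out. The `aws.accessKey`, `aws.secretKey` and `aws.region` options are standard Nextflow settings, not something added by this plugin.

```groovy
// Sketch only: bucket, region and credential values are placeholders.
plugins {
    id 'nf-float@0.2.1'
}

// work directory located in s3
workDir = 's3://bucket/path'

// fusion needs wave support
wave.enabled = true

fusion {
    enabled = true
    // supply both export options if unsure which one your Nextflow version expects
    exportStorageCredentials = true
    exportAwsAccessKeys = true
}

// s3 credentials in the `aws` scope (placeholder values)
aws {
    accessKey = '<YOUR_ACCESS_KEY>'
    secretKey = '<YOUR_SECRET_KEY>'
    region = 'us-east-1' // assumed region, replace with your own
}
```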

When fusion is enabled, you can find a submit command line similar to the following in your `.nextflow.log`:
```
float -a 34.71.114.123 -u admin -p *** submit --image \
wave.seqera.io/wt/dfd4c4e2d48d/biocontainers/mulled-v2-***:***-0 \
--cpu 12 --mem 72 --job /tmp/nextflow5377817282489183149.command.run \
--env FUSION_WORK=/fusion/s3/cedric-memverge-test/nf-work/work/31/a4b682beb93c944fbd3a342ffc41c5 \
--env AWS_ACCESS_KEY_ID=*** --env AWS_SECRET_ACCESS_KEY=*** \
--env FUSION_TAGS=[.command.*|.exitcode|.fusion.*](nextflow.io/metadata=true),[*](nextflow.io/temporary=true) \
--extraContainerOpts --privileged --customTag nf-job-id:znzjht-4
```
* the task image is wrapped by a layer provided by wave.
__note__: releases prior to MMC 2.3.1 have a bug that causes image pull requests to the wave registry to fail.
Please upgrade to the latest MMC master.
* `FUSION_WORK` and `FUSION_TAGS` are added as environment variables.
* aws credentials are added as environment variables.
* `--extraContainerOpts --privileged` makes sure the container runs in privileged mode.
__note__: this option requires MMC 2.3 or later.
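
The `--env KEY=VALUE` pairs in the command above come from a map of fusion environment variables. The following standalone Groovy sketch shows one way such a map could be expanded into command line arguments. `envArgs` is a hypothetical helper written for illustration, not part of the plugin API, and the values are placeholders.

```groovy
// Hypothetical helper: turn an environment map into repeated `--env KEY=VALUE` arguments.
List<String> envArgs(Map<String, String> env) {
    List<String> result = []
    env.each { key, val ->
        // each entry contributes two arguments: the flag and the KEY=VALUE pair
        result << '--env' << "${key}=${val}".toString()
    }
    return result
}

// Placeholder values, loosely modelled on the log excerpt above.
def cmd = ['float', 'submit']
cmd.addAll(envArgs([
    FUSION_WORK      : '/fusion/s3/bucket/work',
    AWS_ACCESS_KEY_ID: '***',
]))
println cmd.join(' ')
```

Keeping every value as a separate list element avoids shell quoting problems when the command is later passed to a process builder.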

Tests done for fusion support:
* trivial sequence and scatter workflow.
* the test profile of nf-core/rnaseq
* the test profile of nf-core/sarek

## Task Sample

For each process, users can supply their requirements for the CPU, memory and container image using the standard Nextflow process directives.
2 changes: 1 addition & 1 deletion conf/float-rt.conf
@@ -1,5 +1,5 @@
plugins {
id 'nf-float@0.2.0'
id 'nf-float@0.2.1'
}

workDir = '/mnt/memverge/shared'
Expand Up @@ -135,7 +135,7 @@ class FloatConf {
}

private static def warnDeprecated(String deprecated, String replacement) {
log.warn "[flaot] config option `$deprecated` " +
log.warn "[float] config option `$deprecated` " +
"is no longer supported, " +
"use `$replacement` instead"
}
Expand Up @@ -20,13 +20,13 @@ import groovy.util.logging.Slf4j
import nextflow.exception.AbortOperationException
import nextflow.executor.AbstractGridExecutor
import nextflow.file.FileHelper
import nextflow.fusion.FusionHelper
import nextflow.processor.TaskRun
import nextflow.util.Escape
import nextflow.util.ServiceName

import java.nio.file.Path
import java.nio.file.StandardCopyOption
import java.util.concurrent.atomic.AtomicInteger
import java.util.stream.Collectors

/**
Expand All @@ -40,8 +40,6 @@ class FloatGridExecutor extends AbstractGridExecutor {

private FloatJobs _floatJobs

private AtomicInteger serial = new AtomicInteger()

private Path binDir

private FloatConf getFloatConf() {
Expand Down Expand Up @@ -144,10 +142,9 @@ class FloatGridExecutor extends AbstractGridExecutor {
*
* @return
*/
private List<String> getSubmitCmdPrefix() {
final i = serial.incrementAndGet()
private List<String> getSubmitCmdPrefix(Integer index) {
final addresses = floatConf.addresses
final address = addresses[i % (addresses.size())]
final address = addresses[index % (addresses.size())]
return floatConf.getCliPrefix(address)
}
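
Note on the change above: the op-center address is now chosen from the task index instead of a shared counter, so a given task index always maps to the same address. A minimal standalone sketch of that selection follows. `pickAddress` and the sample addresses are made up for illustration and do not exist in the plugin.

```groovy
// Hypothetical helper mirroring `addresses[index % addresses.size()]`.
String pickAddress(List<String> addresses, int index) {
    return addresses[index % addresses.size()]
}

def addresses = ['10.0.0.1', '10.0.0.2', '10.0.0.3'] // placeholder addresses
(1..4).each { idx ->
    println "task $idx -> ${pickAddress(addresses, idx)}"
}
```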

Expand Down Expand Up @@ -189,6 +186,10 @@ class FloatGridExecutor extends AbstractGridExecutor {
}

private List<String> getMountVols(TaskRun task) {
if (isFusionEnabled()) {
return []
}

List<String> volumes = []
volumes << floatConf.getWorkDirVol(workDir.uri)

Expand All @@ -200,8 +201,20 @@ class FloatGridExecutor extends AbstractGridExecutor {
return ret
}

private Map<String, String> getEnv(FloatTaskHandler handler) {
return isFusionEnabled()
? handler.fusionLauncher().fusionEnv()
: [:]
}

@Override
List<String> getSubmitCommandLine(TaskRun task, Path scriptFile) {
return getSubmitCommandLine(new FloatTaskHandler(task, this), scriptFile)
}

List<String> getSubmitCommandLine(FloatTaskHandler handler, Path scriptFile) {
final task = handler.task

validate(task)

final jobName = floatJobs.getJobName(task.id)
Expand All @@ -212,28 +225,46 @@ class FloatGridExecutor extends AbstractGridExecutor {
"you can specify a default container image " +
"with `process.container`")
}
def cmd = getSubmitCmdPrefix()
cmd << 'sbatch'
for (def vol : getMountVols(task)) {
cmd << '--dataVolume' << vol
}
def cmd = getSubmitCmdPrefix(task.index)
cmd << 'submit'
getMountVols(task).forEach { cmd << '--dataVolume' << it }
cmd << '--image' << task.getContainer()
cmd << '--cpu' << task.config.getCpus().toString()
cmd << '--mem' << getMemory(task)
cmd << '--job' << getScriptFilePath(scriptFile)
cmd << '--job' << getScriptFilePath(handler, scriptFile)
getEnv(handler).each { key, val ->
cmd << '--env' << "${key}=${val}".toString()
}
if (isFusionEnabled()) {
cmd << '--extraContainerOpts'
cmd << '--privileged'
}
cmd << '--customTag' << tag
cmd.addAll(getExtra(task))
log.info "[float] submit job: ${toLogStr(cmd)}"
return cmd
}

private String getScriptFilePath(Path scriptFile) {
/**
* TODO: should be removed when float CLI supports stdin script
*/
private String getScriptFilePath(FloatTaskHandler handler, Path scriptFile) {
if (isFusionEnabled()) {
return saveFusionScriptFile(handler, scriptFile)
}
if (workDir.getScheme() == "s3") {
return downloadScriptFile(scriptFile)
}
return scriptFile.toString()
}

protected static String saveFusionScriptFile(FloatTaskHandler handler, Path scriptFile) {
final localTmp = File.createTempFile("nextflow", scriptFile.name)
log.info("save fusion launcher script")
localTmp.text = '#!/bin/bash\n' + handler.fusionSubmitCli().join(' ') + '\n'
return localTmp.getAbsolutePath()
}
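
Note on `saveFusionScriptFile`: with fusion enabled, the job script handed to `float submit` is a small local launcher that only invokes the fusion command line. The standalone sketch below reproduces the idea outside the plugin. `saveLauncher` and the sample command are hypothetical, and `deleteOnExit()` is an addition that the code in this diff does not make.

```groovy
// Hypothetical helper: write a one-line bash launcher into a temporary file.
String saveLauncher(List<String> cli) {
    File tmp = File.createTempFile('nextflow', '.command.run')
    tmp.deleteOnExit() // cleanup added for this sketch only
    tmp.text = '#!/bin/bash\n' + cli.join(' ') + '\n'
    return tmp.absolutePath
}

// Placeholder command; the real one comes from `fusionSubmitCli()`.
def path = saveLauncher(['bash', '/fusion/s3/bucket/work/.command.run'])
println new File(path).text
```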

protected String downloadScriptFile(Path scriptFile) {
final localTmp = File.createTempFile("nextflow", scriptFile.name)
log.info("download $scriptFile to $localTmp")
Expand All @@ -245,7 +276,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
}

/**
* Parse the string returned by the {@code float sbatch} and extract
* Parse the string returned by the {@code float submit} and extract
* the job ID string
*
* @param text The string returned when submitting the job
Expand Down Expand Up @@ -300,9 +331,10 @@ class FloatGridExecutor extends AbstractGridExecutor {
jobIds.forEach {
def id = it.toString()
def cmd = getCmdPrefixForJob(id)
cmd << 'scancel'
cmd << 'cancel'
cmd << '-j'
cmd << id
cmd << '-f'
log.info "[float] cancel job: ${toLogStr(cmd)}"
ret.add(cmd)
}
Expand All @@ -318,7 +350,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
@Override
protected List<String> getKillCommand() {
def cmd = getCmdPrefix0()
cmd << 'scancel'
cmd << 'cancel'
cmd << '-j'
log.info "[float] cancel job: ${toLogStr(cmd)}"
return cmd
Expand Down Expand Up @@ -355,7 +387,7 @@ class FloatGridExecutor extends AbstractGridExecutor {

private List<String> getQueueCmdOfOC(String oc = "") {
def cmd = floatConf.getCliPrefix(oc)
cmd << 'squeue'
cmd << 'list'
cmd << '--format'
cmd << 'json'
log.info "[float] query job status: ${toLogStr(cmd)}"
Expand All @@ -373,6 +405,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
QueueStatus status = STATUS_MAP.getOrDefault(value, QueueStatus.UNKNOWN)
ret[key] = status
}
log.info "[float] got job status $ret"
return ret
}

Expand Down Expand Up @@ -402,4 +435,14 @@ class FloatGridExecutor extends AbstractGridExecutor {
boolean isContainerNative() {
return true
}

@Override
boolean pipeLauncherScript() {
return isFusionEnabled()
}

@Override
boolean isFusionEnabled() {
return FusionHelper.isFusionEnabled(session)
}
}
35 changes: 17 additions & 18 deletions plugins/nf-float/src/main/com/memverge/nextflow/FloatJobs.groovy
Expand Up @@ -88,29 +88,28 @@ class FloatJobs {

def currentSt = job2status.get(jobID, Unknown)
def workDir = task2workDir.get(taskID)
if (!workDir) {
return
}
// check the availability of result files
// call list files to update the folder cache
FileHelper.listDirectory(workDir)
def files = ['.command.out', '.command.err', '.exitcode']
if (currentSt != Completed && st == Completed) {
for (filename in files) {
def name = workDir.resolve(filename)
try {
!FileHelper.checkIfExists(name, [checkIfExists: true])
} catch (NoSuchFileException ex) {
log.info("[float] job $jobID completed " +
"but file not found: $ex")
return
if (workDir) {
// check the availability of result files
// call list files to update the folder cache
FileHelper.listDirectory(workDir)
def files = ['.command.out', '.command.err', '.exitcode']
if (currentSt != Completed && st == Completed) {
for (filename in files) {
def name = workDir.resolve(filename)
try {
!FileHelper.checkIfExists(name, [checkIfExists: true])
} catch (NoSuchFileException ex) {
log.info("[float] job $jobID completed " +
"but file not found: $ex")
return
}
}
log.debug("[float] found $files in: $workDir")
}
log.debug("[float] found $files in: $workDir")
}
job2status.put(jobID, st)
}
log.debug("[float] update op-center $oc job status: $job2status")
log.debug "[float] update op-center $oc job status: $job2status"
return job2status
}

Expand Up @@ -20,6 +20,7 @@ import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.executor.GridTaskHandler
import nextflow.processor.TaskRun

/**
* Float task handler
*/
Expand All @@ -38,7 +39,7 @@ class FloatTaskHandler extends GridTaskHandler {
protected ProcessBuilder createProcessBuilder() {

// -- log the submit command
final cli = executor.getSubmitCommandLine(task, wrapperFile)
final cli = ((FloatGridExecutor)executor).getSubmitCommandLine(this, wrapperFile)
log.trace "start process ${task.name} > cli: ${cli}"

// -- prepare submit process
Expand All @@ -53,4 +54,12 @@ class FloatTaskHandler extends GridTaskHandler {
return builder
}

/**
* Override the grid task handler to make the fusion launcher
* script container-native.
*/
protected String fusionStdinWrapper() {
return '#!/bin/bash\n' + fusionSubmitCli().join(' ') + '\n'
}

}
2 changes: 1 addition & 1 deletion plugins/nf-float/src/resources/META-INF/MANIFEST.MF
@@ -1,6 +1,6 @@
Manifest-Version: 1.0
Plugin-Class: com.memverge.nextflow.FloatPlugin
Plugin-Id: nf-float
Plugin-Version: 0.2.0
Plugin-Version: 0.2.1
Plugin-Provider: MemVerge Corp.
Plugin-Requires: >=23.04.0