Add support for several process directives
Signed-off-by: Ben Sherman <[email protected]>
bentsherman authored and jealous committed Aug 4, 2023
1 parent 6b30ae3 commit 70672ba
Showing 6 changed files with 197 additions and 63 deletions.
109 changes: 57 additions & 52 deletions README.md
@@ -110,13 +110,20 @@ process {
* `workDir` is where we mount the NFS and where Nextflow puts the process files.
* In the `float` section, users must supply the address of the MMCE operation
center and the proper credentials.
* `address` address of your operation center(s). Separate multiple addresses with `,`.
* `username` and `password` are the credentials for your operation center
* `nfs` points to the location of the NFS.
* `commonExtra` allows the user to specify other submit parameters. This parameter
will be appended to every float submit command.
* In the `process` scope, we assign `float` to `executor` to tell Nextflow to run
the task with the float executor.
* In the `process` scope, we specify `executor = 'float'` to tell Nextflow to execute
tasks with the Float executor.

Available `float` config options:

* `address`: address of your operation center(s). Separate multiple addresses with `,`.
* `username`, `password`: the credentials for your operation center
* `nfs`: the location of the NFS (if using NFS for the work directory)
* `migratePolicy`: the migration policy used by WaveRider, specified as a map. Refer to
the CLI usage for the list of available options.
* `vmPolicy`: the VM creation policy, specified as a map. Refer to the CLI usage
for the list of available options.
* `commonExtra`: allows the user to specify other submit CLI options. This parameter
will be appended to every float submit command.

### Configure with environment variables

@@ -226,14 +233,18 @@ In additional, you may want to:
* specify your s3 credentials in the `aws` scope.

When fusion is enabled, you can find a similar submit command in your `.nextflow.log`:
```
float -a 34.71.114.123 -u admin -p *** submit --image \
wave.seqera.io/wt/dfd4c4e2d48d/biocontainers/mulled-v2-***:***-0 \
--cpu 12 --mem 72 --job /tmp/nextflow5377817282489183149.command.run \
--env FUSION_WORK=/fusion/s3/cedric-memverge-test/nf-work/work/31/a4b682beb93c944fbd3a342ffc41c5 \
--env AWS_ACCESS_KEY_ID=*** --env AWS_SECRET_ACCESS_KEY=*** \
--env FUSION_TAGS=[.command.*|.exitcode|.fusion.*](nextflow.io/metadata=true),[*](nextflow.io/temporary=true) \
--extraContainerOpts --privileged --customTag nf-job-id:znzjht-4
```bash
float -a 34.71.114.123 -u admin -p '***' submit
--image 'wave.seqera.io/wt/dfd4c4e2d48d/biocontainers/mulled-v2-***:***-0'
--cpu 12
--mem 72
--job /tmp/nextflow5377817282489183149.command.run
--env FUSION_WORK=/fusion/s3/cedric-memverge-test/nf-work/work/31/a4b682beb93c944fbd3a342ffc41c5
--env AWS_ACCESS_KEY_ID=***
--env AWS_SECRET_ACCESS_KEY=***
--env 'FUSION_TAGS=[.command.*|.exitcode|.fusion.*](nextflow.io/metadata=true),[*](nextflow.io/temporary=true)'
--extraContainerOpts --privileged
--customTag nf-job-id:znzjht-4
```
* the task image is wrapped by a layer provided by wave.
__note__: releases prior to MMC 2.3.1 have a bug that causes image pull requests to the wave registry to fail.
@@ -248,7 +259,27 @@ Tests for the fusion support.
* the test profile of nf-core/rnaseq
* the test profile of nf-core/sarek

## Task Sample
### Configure VM creation and migration policies

While the VM and migration policies can be specified like any CLI option via `float.commonExtra`,
they can also be specified using the config options `float.vmPolicy` and `float.migratePolicy` as maps:

```groovy
float {
vmPolicy = [
spotFirst: true,
retryLimit: 3,
retryInterval: '10m'
]
migratePolicy = [
cpu: [upperBoundRatio: 90, upperBoundDuration: '10s'],
mem: [lowerBoundRatio: 20, upperBoundRatio: 90]
]
}
```
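
To illustrate how these maps end up on the command line: the executor collapses each map into a bracketed, comma-separated string and passes it to `--vmPolicy` or `--migratePolicy`, with nested keys joined by dots. The stand-alone Groovy sketch below reproduces that format. It is not the plugin's code: `flattenEntries` is a hypothetical helper written for this example, whereas the plugin relies on Nextflow's `toConfigObject()` and `flatten()`.

```groovy
// Hypothetical helper (not part of nf-float): turn a nested map into
// a flat list of "dotted.key=value" strings, preserving insertion order.
List<String> flattenEntries(Map map, String prefix = '') {
    map.collectMany { k, v ->
        if (v instanceof Map)
            return flattenEntries(v as Map, "${prefix}${k}.")
        return ["${prefix}${k}=${v}".toString()]
    }
}

// Wrap the flattened entries in brackets, as the executor does before
// appending the value to `--vmPolicy` or `--migratePolicy`.
String collapseMapToString(Map map) {
    return "[${flattenEntries(map).join(',')}]"
}

def vmPolicy = [spotFirst: true, retryLimit: 3, retryInterval: '10m']
def migratePolicy = [
    cpu: [upperBoundRatio: 90, upperBoundDuration: '10s'],
    mem: [lowerBoundRatio: 20, upperBoundRatio: 90]
]

assert collapseMapToString(vmPolicy) == '[spotFirst=true,retryLimit=3,retryInterval=10m]'
assert collapseMapToString(migratePolicy) == '[cpu.upperBoundRatio=90,cpu.upperBoundDuration=10s,mem.lowerBoundRatio=20,mem.upperBoundRatio=90]'
```

With the configuration above, the submit command would therefore be expected to carry `--vmPolicy [spotFirst=true,retryLimit=3,retryInterval=10m]` and the corresponding `--migratePolicy` value.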

## Process definition

For each process, users can specify the CPU, memory and container image requirements using the standard Nextflow process directives.
Here is an example of a hello world workflow.
@@ -273,12 +304,16 @@ workflow {
}
```

* `executor 'float'` - tells Nextflow to execute tasks with Float.
* `cpus` - specifies the number of cores required by this process.
* `memory` specifies the memory.
* `container` - specifies the container image.
* `extra` - specifies extra parameters for the job. It will be merged with
the `commonExtra` parameter.
The following process directives are supported for specifying task resources:

* `conda` (only when using [Wave](https://seqera.io/wave/))
* `container`
* `cpus`
* `disk` (controls the size of the root volume)
* `machineType`
* `memory`
* `resourceLabels`
* `time`
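
As a rough sketch of how these directives might be combined in one process: the process name, image, and label values below are placeholders chosen for illustration, not values required by the plugin. Based on the executor code in this commit, `machineType` is passed as `--instType`, `disk` as `--rootVolSize` (in GB), `time` as `--timeLimit` (in seconds), and each `resourceLabels` entry as a `--customTag key:value` option.

```groovy
// Illustrative only: names and values are assumptions for this example.
process sayHello {
    executor 'float'
    container 'docker.io/memverge/cactus:latest' // placeholder image
    cpus 2
    memory '4 GB'
    disk '40 GB'                      // submitted as --rootVolSize 40
    machineType 't2.xlarge'           // submitted as --instType t2.xlarge
    time '24h'                        // submitted as --timeLimit 86400s
    resourceLabels project: 'demo'    // submitted as --customTag project:demo

    output:
    stdout

    script:
    """
    echo 'Hello from Float'
    """
}

workflow {
    sayHello | view
}
```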

## Run the Workflow

@@ -289,36 +324,6 @@ file and task file as arguments. Here is an example.
./nextflow run samples/tutorial.nf -c conf/float-rt.conf
```

## Plugin Assets
- `settings.gradle`

Gradle project settings.

- `plugins/nf-float`

The plugin implementation base directory.

- `plugins/nf-float/build.gradle`

Plugin Gradle build file

- `plugins/nf-float/src/resources/META-INF/MANIFEST.MF`

Manifest file defining the plugin attributes.

- `plugins/nf-float/src/resources/META-INF/extensions.idx`

This file declares one or more extension classes provided by the plugin.

- `plugins/nf-float/src/main`

The plugin implementation sources.

- `plugins/nf-float/src/test`

The plugin unit tests.

## Unit testing

Run the following command in the project root directory (ie. where the file `settings.gradle` is located):
22 changes: 20 additions & 2 deletions plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy
@@ -44,6 +44,8 @@ class FloatConf {
String s3secretKey

/** parameters for submitting the tasks */
String vmPolicy
String migratePolicy
String commonExtra

/**
@@ -118,6 +120,13 @@ class FloatConf {
.findAll { it.size() > 0 }
}
this.nfs = floatNode.nfs

if (floatNode.vmPolicy) {
this.vmPolicy = collapseMapToString(floatNode.vmPolicy as Map)
}
if (floatNode.migratePolicy) {
this.migratePolicy = collapseMapToString(floatNode.migratePolicy as Map)
}
this.commonExtra = floatNode.commonExtra

if (floatNode.cpu)
@@ -134,13 +143,22 @@ class FloatConf {
warnDeprecated("float.container", "process.container")
}

private static def warnDeprecated(String deprecated, String replacement) {
private String collapseMapToString(Map map) {
final collapsedStr = map
.toConfigObject()
.flatten()
.collect( (k, v) -> "${k}=${v}" )
.join(',')
return "[${collapsedStr}]"
}

private static void warnDeprecated(String deprecated, String replacement) {
log.warn "[float] config option `$deprecated` " +
"is no longer supported, " +
"use `$replacement` instead"
}

private def initAwsConf(Map conf) {
private void initAwsConf(Map conf) {
def cred = Global.getAwsCredentials(System.getenv(), conf)
if (cred && cred.size() > 1) {
s3accessKey = cred[0]
@@ -207,6 +207,19 @@ class FloatGridExecutor extends AbstractGridExecutor {
: [:]
}

private Map<String,String> getCustomTags(TaskRun task) {
final result = new LinkedHashMap<String,String>(10)
result[FloatConf.NF_JOB_ID] = floatJobs.getJobName(task.id)
result.'nextflow.io/processName' = task.processor.name
result.'nextflow.io/runName' = session.runName
result.'nextflow.io/sessionId' = "uuid-${session.uniqueId}".toString()
result.'nextflow.io/taskName' = task.name
final resourceLabels = task.config.getResourceLabels()
if( resourceLabels )
result.putAll(resourceLabels)
return result
}

@Override
List<String> getSubmitCommandLine(TaskRun task, Path scriptFile) {
return getSubmitCommandLine(new FloatTaskHandler(task, this), scriptFile)
@@ -217,11 +230,9 @@ class FloatGridExecutor extends AbstractGridExecutor {

validate(task)

final jobName = floatJobs.getJobName(task.id)
final String tag = "${FloatConf.NF_JOB_ID}:${jobName}"
final container = task.getContainer()
if (!container) {
throw new AbortOperationException("container is empty." +
throw new AbortOperationException("container is empty. " +
"you can specify a default container image " +
"with `process.container`")
}
@@ -239,7 +250,24 @@ class FloatGridExecutor extends AbstractGridExecutor {
cmd << '--extraContainerOpts'
cmd << '--privileged'
}
cmd << '--customTag' << tag
getCustomTags(task).each { key, val ->
cmd << '--customTag' << "${key}:${val}".toString()
}
if (task.config.getMachineType()) {
cmd << '--instType' << task.config.getMachineType()
}
if (task.config.getDisk()) {
cmd << '--rootVolSize' << task.config.getDisk().toGiga().toString()
}
if (task.config.getTime()) {
cmd << '--timeLimit' << "${task.config.getTime().toSeconds()}s".toString()
}
if (floatConf.vmPolicy) {
cmd << '--vmPolicy' << floatConf.vmPolicy
}
if (floatConf.migratePolicy) {
cmd << '--migratePolicy' << floatConf.migratePolicy
}
cmd.addAll(getExtra(task))
log.info "[float] submit job: ${toLogStr(cmd)}"
return cmd
@@ -94,6 +94,7 @@ class FloatBaseTest extends BaseTest {
task.id = taskID
task.index = taskSerial.incrementAndGet()
task.workDir = Paths.get(workDir)
task.name = "foo (${task.index})"
return task
}

@@ -186,4 +186,26 @@ class FloatConfTest extends BaseTest {
then:
volume == 'nfs://1.2.3.4/work/dir:/local'
}

def "get vm policy from config"() {
expect:
FloatConf.getConf([float: [vmPolicy: CONF]]).vmPolicy == STR

where:
CONF | STR
[spotOnly:true,retryLimit:10,retryInterval:'30s'] | '[spotOnly=true,retryLimit=10,retryInterval=30s]'
[spotOnly:true,priceLimit:0.1] | '[spotOnly=true,priceLimit=0.1]'
}

def "get migrate policy from config"() {
expect:
FloatConf.getConf([float: [migratePolicy: CONF]]).migratePolicy == STR

where:
CONF | STR
[disable:true] | '[disable=true]'
[cpu:[upperBoundRatio:90,upperBoundDuration:'10s']] | '[cpu.upperBoundRatio=90,cpu.upperBoundDuration=10s]'
[cpu:[lowerBoundRatio:30],mem:[upperBoundRatio:90]] | '[cpu.lowerBoundRatio=30,mem.upperBoundRatio=90]'
[cpu:[step:50]] | '[cpu.step=50]'
}
}
@@ -269,13 +269,73 @@ class FloatGridExecutorTest extends FloatBaseTest {
cmd.join(' ') == expected.join(' ')
}

def "use machine type directive"() {
given:
final exec = newTestExecutor()
final task = newTask(exec, new TaskConfig(
container: image,
machineType: 't2.xlarge',
))

when:
final cmd = exec.getSubmitCommandLine(task, Paths.get(script))

then:
cmd.join(' ').contains('--instType t2.xlarge')
}

def "use disk directive"() {
given:
final exec = newTestExecutor()
final task = newTask(exec, new TaskConfig(
container: image,
disk: '40G',
))

when:
final cmd = exec.getSubmitCommandLine(task, Paths.get(script))

then:
cmd.join(' ').contains('--rootVolSize 40')
}

def "use time directive"() {
given:
final exec = newTestExecutor()
final task = newTask(exec, new TaskConfig(
container: image,
time: '24h',
))

when:
final cmd = exec.getSubmitCommandLine(task, Paths.get(script))

then:
cmd.join(' ').contains('--timeLimit 86400s')
}

def "use resourceLabels directive"() {
given:
final exec = newTestExecutor()
final task = newTask(exec, new TaskConfig(
container: image,
resourceLabels: [foo: 'bar'],
))

when:
final cmd = exec.getSubmitCommandLine(task, Paths.get(script))

then:
cmd.join(' ').contains('--customTag foo:bar')
}

private def taskStatus(int i, String st) {
return """
{
"id": "task$i",
"name": "tJob-$i",
"user": "admin",
"imageID": "docker.io/memverge/cactus:latest",
{
"id": "task$i",
"name": "tJob-$i",
"user": "admin",
"imageID": "docker.io/memverge/cactus:latest",
"status": "$st",
"customTags": {
"nf-job-id": "tJob-$i"