Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for several process directives #51

Merged
merged 1 commit into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 57 additions & 52 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,20 @@ process {
* `workDir` is where we mount the NFS and where Nextflow put the process files.
* In the `float` section, users must supply the address of the MMCE operation
center and the proper credentials.
* `address` address of your operation center(s). Separate multiple addresses with `,`.
* `username` and `password` are the credentials for your operation center
* `nfs` points to the location of the NFS.
* `commonExtra` allows the user to specify other submit parameters. This parameter
will be appended to every float submit command.
* In the `process` scope, we assign `float` to `executor` to tell Nextflow to run
the task with the float executor.
* In the `process` scope, we specify `executor = 'float'` to tell Nextflow to execute
tasks with the Float executor.

Available `float` config options:

* `address`: address of your operation center(s). Separate multiple addresses with `,`.
* `username`, `password`: the credentials for your operation center
* `nfs`: the location of the NFS (if using NFS for the work directory)
* `migratePolicy`: the migration policy used by WaveRider, specified as a map. Refer to
the CLI usage for the list of available options.
* `vmPolicy`: the VM creation policy, specified as a map. Refer to the CLI usage
for the list of available options.
* `commonExtra`: allows the user to specify other submit CLI options. This parameter
will be appended to every float submit command.

### Configure with environment variables

Expand Down Expand Up @@ -226,14 +233,18 @@ In additional, you may want to:
* specify your s3 credentials in the `aws` scope.

When fusion is enabled, you can find similar submit command line in your `.nextflow.log`
```
float -a 34.71.114.123 -u admin -p *** submit --image \
wave.seqera.io/wt/dfd4c4e2d48d/biocontainers/mulled-v2-***:***-0 \
--cpu 12 --mem 72 --job /tmp/nextflow5377817282489183149.command.run \
--env FUSION_WORK=/fusion/s3/cedric-memverge-test/nf-work/work/31/a4b682beb93c944fbd3a342ffc41c5 \
--env AWS_ACCESS_KEY_ID=*** --env AWS_SECRET_ACCESS_KEY=*** \
--env FUSION_TAGS=[.command.*|.exitcode|.fusion.*](nextflow.io/metadata=true),[*](nextflow.io/temporary=true) \
--extraContainerOpts --privileged --customTag nf-job-id:znzjht-4
```bash
float -a 34.71.114.123 -u admin -p '***' submit
--image 'wave.seqera.io/wt/dfd4c4e2d48d/biocontainers/mulled-v2-***:***-0'
--cpu 12
--mem 72
--job /tmp/nextflow5377817282489183149.command.run
--env FUSION_WORK=/fusion/s3/cedric-memverge-test/nf-work/work/31/a4b682beb93c944fbd3a342ffc41c5
--env AWS_ACCESS_KEY_ID=***
--env AWS_SECRET_ACCESS_KEY=***
--env 'FUSION_TAGS=[.command.*|.exitcode|.fusion.*](nextflow.io/metadata=true),[*](nextflow.io/temporary=true)'
--extraContainerOpts --privileged
--customTag nf-job-id:znzjht-4
```
* the task image is wrapped by a layer provided by wave.
__note__: releases prior to MMC 2.3.1 has bug that fails the image pull requests to the wave registry.
Expand All @@ -248,7 +259,27 @@ Tests for the fusion support.
* the test profile of nf-core/rnaseq
* the test profile of nf-core/sarek

## Task Sample
### Configure VM creation and migration policies

While the VM and migration policies can be specified like any CLI option via `float.commonExtra`,
they can also be specified using the config options `float.vmPolicy` and `float.migratePolicy` as maps:

```groovy
float {
vmPolicy = [
spotFirst: true,
retryLimit: 3,
retryInterval: '10m'
]

migratePolicy = [
cpu: [upperBoundRatio: 90, upperBoundDuration: '10s'],
mem: [lowerBoundRatio: 20, upperBoundRatio: 90]
]
}
```

## Process definition

For each process, users could supply their requirements for the CPU, memory and container image using the standard Nextflow process directives.
Here is an example of a hello world workflow.
Expand All @@ -273,12 +304,16 @@ workflow {
}
```

* `executor 'float'` - tells Nextflow to execute tasks with Float.
* `cpus` - specifies the number of cores required by this process.
* `memory` specifies the memory.
* `container` - specifies the container image.
* `extra` - specifies extra parameters for the job. It will be merged with
the `commonExtra` parameter.
The following process directives are supported for specifying task resources:

* `conda` (only when using [Wave](https://seqera.io/wave/))
* `container`
* `cpus`
* `disk` (controls the size of the root volume)
* `machineType`
* `memory`
* `resourceLabels`
* `time`

## Run the Workflow

Expand All @@ -289,36 +324,6 @@ file and task file as arguments. Here is an example.
./nextflow run samples/tutorial.nf -c conf/float-rt.conf
```

## Plugin Assets

- `settings.gradle`

Gradle project settings.

- `plugins/nf-float`

The plugin implementation base directory.

- `plugins/nf-float/build.gradle`

Plugin Gradle build file

- `plugins/nf-float/src/resources/META-INF/MANIFEST.MF`

Manifest file defining the plugin attributes.

- `plugins/nf-float/src/resources/META-INF/extensions.idx`

This file declares one or more extension classes provided by the plugin.

- `plugins/nf-float/src/main`

The plugin implementation sources.

- `plugins/nf-float/src/test`

The plugin unit tests.

## Unit testing

Run the following command in the project root directory (ie. where the file `settings.gradle` is located):
Expand Down
22 changes: 20 additions & 2 deletions plugins/nf-float/src/main/com/memverge/nextflow/FloatConf.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class FloatConf {
String s3secretKey

/** parameters for submitting the tasks */
String vmPolicy
String migratePolicy
String commonExtra

/**
Expand Down Expand Up @@ -118,6 +120,13 @@ class FloatConf {
.findAll { it.size() > 0 }
}
this.nfs = floatNode.nfs

if (floatNode.vmPolicy) {
this.vmPolicy = collapseMapToString(floatNode.vmPolicy as Map)
}
if (floatNode.migratePolicy) {
this.migratePolicy = collapseMapToString(floatNode.migratePolicy as Map)
}
this.commonExtra = floatNode.commonExtra

if (floatNode.cpu)
Expand All @@ -134,13 +143,22 @@ class FloatConf {
warnDeprecated("float.container", "process.container")
}

private static def warnDeprecated(String deprecated, String replacement) {
private String collapseMapToString(Map map) {
final collapsedStr = map
.toConfigObject()
.flatten()
.collect( (k, v) -> "${k}=${v}" )
.join(',')
return "[${collapsedStr}]"
}

private static void warnDeprecated(String deprecated, String replacement) {
log.warn "[float] config option `$deprecated` " +
"is no longer supported, " +
"use `$replacement` instead"
}

private def initAwsConf(Map conf) {
private void initAwsConf(Map conf) {
def cred = Global.getAwsCredentials(System.getenv(), conf)
if (cred && cred.size() > 1) {
s3accessKey = cred[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,19 @@ class FloatGridExecutor extends AbstractGridExecutor {
: [:]
}

private Map<String,String> getCustomTags(TaskRun task) {
final result = new LinkedHashMap<String,String>(10)
result[FloatConf.NF_JOB_ID] = floatJobs.getJobName(task.id)
result.'nextflow.io/processName' = task.processor.name
result.'nextflow.io/runName' = session.runName
result.'nextflow.io/sessionId' = "uuid-${session.uniqueId}".toString()
result.'nextflow.io/taskName' = task.name
final resourceLabels = task.config.getResourceLabels()
if( resourceLabels )
result.putAll(resourceLabels)
return result
}

@Override
List<String> getSubmitCommandLine(TaskRun task, Path scriptFile) {
return getSubmitCommandLine(new FloatTaskHandler(task, this), scriptFile)
Expand All @@ -217,11 +230,9 @@ class FloatGridExecutor extends AbstractGridExecutor {

validate(task)

final jobName = floatJobs.getJobName(task.id)
final String tag = "${FloatConf.NF_JOB_ID}:${jobName}"
final container = task.getContainer()
if (!container) {
throw new AbortOperationException("container is empty." +
throw new AbortOperationException("container is empty. " +
"you can specify a default container image " +
"with `process.container`")
}
Expand All @@ -239,7 +250,24 @@ class FloatGridExecutor extends AbstractGridExecutor {
cmd << '--extraContainerOpts'
cmd << '--privileged'
}
cmd << '--customTag' << tag
getCustomTags(task).each { key, val ->
cmd << '--customTag' << "${key}:${val}".toString()
}
if (task.config.getMachineType()) {
cmd << '--instType' << task.config.getMachineType()
}
if (task.config.getDisk()) {
cmd << '--rootVolSize' << task.config.getDisk().toGiga().toString()
}
if (task.config.getTime()) {
cmd << '--timeLimit' << "${task.config.getTime().toSeconds()}s".toString()
}
if (floatConf.vmPolicy) {
cmd << '--vmPolicy' << floatConf.vmPolicy
}
if (floatConf.migratePolicy) {
cmd << '--migratePolicy' << floatConf.migratePolicy
}
cmd.addAll(getExtra(task))
log.info "[float] submit job: ${toLogStr(cmd)}"
return cmd
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class FloatBaseTest extends BaseTest {
task.id = taskID
task.index = taskSerial.incrementAndGet()
task.workDir = Paths.get(workDir)
task.name = "foo (${task.index})"
return task
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,4 +186,26 @@ class FloatConfTest extends BaseTest {
then:
volume == 'nfs://1.2.3.4/work/dir:/local'
}

def "get vm policy from config"() {
expect:
FloatConf.getConf([float: [vmPolicy: CONF]]).vmPolicy == STR

where:
CONF | STR
[spotOnly:true,retryLimit:10,retryInterval:'30s'] | '[spotOnly=true,retryLimit=10,retryInterval=30s]'
[spotOnly:true,priceLimit:0.1] | '[spotOnly=true,priceLimit=0.1]'
}

def "get migrate policy from config"() {
expect:
FloatConf.getConf([float: [migratePolicy: CONF]]).migratePolicy == STR

where:
CONF | STR
[disable:true] | '[disable=true]'
[cpu:[upperBoundRatio:90,upperBoundDuration:'10s']] | '[cpu.upperBoundRatio=90,cpu.upperBoundDuration=10s]'
[cpu:[lowerBoundRatio:30],mem:[upperBoundRatio:90]] | '[cpu.lowerBoundRatio=30,mem.upperBoundRatio=90]'
[cpu:[step:50]] | '[cpu.step=50]'
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -269,13 +269,73 @@ class FloatGridExecutorTest extends FloatBaseTest {
cmd.join(' ') == expected.join(' ')
}

def "use machine type directive"() {
given:
final exec = newTestExecutor()
final task = newTask(exec, new TaskConfig(
container: image,
machineType: 't2.xlarge',
))

when:
final cmd = exec.getSubmitCommandLine(task, Paths.get(script))

then:
cmd.join(' ').contains('--instType t2.xlarge')
}

def "use disk directive"() {
given:
final exec = newTestExecutor()
final task = newTask(exec, new TaskConfig(
container: image,
disk: '40G',
))

when:
final cmd = exec.getSubmitCommandLine(task, Paths.get(script))

then:
cmd.join(' ').contains('--rootVolSize 40')
}

def "use time directive"() {
given:
final exec = newTestExecutor()
final task = newTask(exec, new TaskConfig(
container: image,
time: '24h',
))

when:
final cmd = exec.getSubmitCommandLine(task, Paths.get(script))

then:
cmd.join(' ').contains('--timeLimit 86400s')
}

def "use resourceLabels directive"() {
given:
final exec = newTestExecutor()
final task = newTask(exec, new TaskConfig(
container: image,
resourceLabels: [foo: 'bar'],
))

when:
final cmd = exec.getSubmitCommandLine(task, Paths.get(script))

then:
cmd.join(' ').contains('--customTag foo:bar')
}

private def taskStatus(int i, String st) {
return """
{
"id": "task$i",
"name": "tJob-$i",
"user": "admin",
"imageID": "docker.io/memverge/cactus:latest",
{
"id": "task$i",
"name": "tJob-$i",
"user": "admin",
"imageID": "docker.io/memverge/cactus:latest",
"status": "$st",
"customTags": {
"nf-job-id": "tJob-$i"
Expand Down