Skip to content

Commit

Permalink
[GH-82] Enable concurrent stage in
Browse files Browse the repository at this point in the history
When `scratch` is enabled, use `nxf_parallel` to pull the input
files.
  • Loading branch information
jealous committed Sep 6, 2024
1 parent 4ea7d44 commit eefc99e
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 29 deletions.
47 changes: 45 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ Available `float` config options:
to `maxMemoryFactor` * `memory` of the task.
* `commonExtra`: allows the user to specify other submit CLI options. This parameter
will be appended to every float submit command.
* `maxParallelTransfers`: an integer, defaults to 4. The maximum number of parallel
file transfers performed on the worker node for each task. Note that the actual
concurrency is the minimum of this value and the number of available cores.

### Configure with environment variables

Expand Down Expand Up @@ -182,6 +185,33 @@ If the secret is not available, Nextflow reports error like this:
Unknown config secret 'MMC_USERNAME'
```

### Configuration best practices

When you are using s3, it's recommended to update the aws client configurations
based on your environment and the workload. Here is an example:
```groovy
aws {
// recommended aws client settings
client {
maxConnections = 200 // Increase this number to allow large concurrency
maxErrorRetry = 10 // Increase the number of retries if needed
connectionTimeout = 60000 // timeout in milliseconds, 60 seconds
socketTimeout = 60000 // timeout in milliseconds, 60 seconds
}
}
```

If you are sure that the workflow file is properly composed, it's recommended to
set a proper error strategy and retry limit in the process scope so that
transient failures don't abort the workflow and it can run to completion.
Here is an example:
```groovy
process {
errorStrategy='retry'
maxRetries=5
}
```

### Configure s3 work directory

To enable s3 as the work directory, users need to set the work directory to an s3 bucket.
Expand All @@ -196,6 +226,9 @@ workDir = 's3://bucket/path'
process {
executor = 'float'
container = 'fedora/fedora-minimal'
disk = '120 GB'
scratch = true
stageInMode = 'copy'
}
podman.registry = 'quay.io'
Expand All @@ -204,13 +237,12 @@ float {
address = 'op.center.address'
username = secrets.MMC_USERNAME
password = secrets.MMC_PASSWORD
timeFactor = 2
}
aws {
accessKey = '***'
secretKey = '***'
region = 'us-east-2'
region = 'us-east-2' // optional
}
```

Expand All @@ -233,6 +265,17 @@ Tests done for s3 work directory support:
* the test profile of nf-core/rnaseq
* the test profile of nf-core/sarek

You can also enable `scratch = true` in the process scope. When `scratch` is enabled, the plugin will
use the scratch space of the worker node to store the task files. This is useful when the task files
are large and the network bandwidth is limited.

When `scratch` is enabled, it's strongly recommended to set `stageInMode = 'copy'` in the process scope.
This will make sure the task files are copied to the scratch space before the task starts.

Because the scratch space is local, you need to add a directive such as `disk = '100 GB'` to make
sure the task has enough space to run. The plugin also checks the total input size of the task
and ensures that the allocated disk space is at least 5 times the input size.

### Configure s3fs work directory

To enable s3fs as the work directory, users need to set the work directory to an s3 bucket.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.executor.SimpleFileCopyStrategy
import nextflow.file.FileSystemPathFactory
import nextflow.processor.TaskBean
import nextflow.util.Escape

import java.nio.file.Path
Expand All @@ -29,11 +30,27 @@ import java.nio.file.Path
class FloatFileCopyStrategy extends SimpleFileCopyStrategy {
private FloatConf conf

/**
 * Create the copy strategy used by the float executor.
 *
 * @param conf the float plugin configuration
 * @param bean the task bean describing the task whose files are staged
 */
FloatFileCopyStrategy(FloatConf conf, TaskBean bean) {
    super(bean)
    this.conf = conf
}

/**
 * Wrap the inherited stage-in commands so they can run concurrently.
 *
 * Each individual stage command is appended to the `downloads` bash
 * array (see {@code stageInputFile}); the final `nxf_parallel` call
 * then executes the queued commands with bounded concurrency.
 */
@Override
String getStageInputFilesScript(Map<String,Path> inputFiles) {
    final parts = [
            'downloads=(true)',
            super.getStageInputFilesScript(inputFiles),
            'nxf_parallel "${downloads[@]}"',
    ]
    return parts.join('\n') + '\n'
}

/**
 * {@inheritDoc}
 *
 * Instead of emitting the copy command directly, queue it in the
 * `downloads` bash array so `nxf_parallel` can execute the staging
 * commands concurrently.
 */
@Override
String stageInputFile( Path path, String targetName ) {
    final cmd = super.stageInputFile(path, targetName)
    return 'downloads+=("' + cmd + '")'
}

@Override
String getBeforeStartScript() {
def script = FloatBashLib.script(conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
private static final int DFT_MEM_GB = 1
private static final long FUSION_MIN_VOL_SIZE = 80
private static final long MIN_VOL_SIZE = 40
private static final long DISK_INPUT_FACTOR = 5

private FloatJobs _floatJobs

Expand Down Expand Up @@ -74,7 +75,7 @@ class FloatGridExecutor extends AbstractGridExecutor {

protected BashWrapperBuilder createBashWrapperBuilder(TaskRun task) {
final bean = new TaskBean(task)
final strategy = new FloatFileCopyStrategy(floatConf)
final strategy = new FloatFileCopyStrategy(floatConf, bean)
// creates the wrapper script
final builder = new BashWrapperBuilder(bean, strategy)
// job directives headers
Expand All @@ -94,6 +95,11 @@ class FloatGridExecutor extends AbstractGridExecutor {
result += "export PATH=\$PATH:${binDir}/bin\n"
}

if (floatConf.s3cred.isValid()) {
// if we have s3 credential, make sure aws cli is available in path
result += "export PATH=\$PATH:/opt/aws/dist\n"
result += "export LD_LIBRARY_PATH=\$LIBRARY_PATH:/opt/aws/dist\n"
}
return result
}

Expand Down Expand Up @@ -252,6 +258,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
return ret
}

// get the size of the input files in bytes
private static long getInputFileSize(TaskRun task) {
long ret = 0
for (def src : task.getInputFilesMap().values()) {
Expand All @@ -261,9 +268,10 @@ class FloatGridExecutor extends AbstractGridExecutor {
}

/**
 * Collect the environment variables to pass to the job.
 *
 * Starts from the fusion environment when fusion is enabled (an empty
 * map otherwise), then adds the AWS credential variables when a valid
 * s3 credential is configured.
 *
 * @param handler the task handler of the job
 * @return the environment map for the job
 */
private Map<String, String> getEnv(FloatTaskHandler handler) {
    def ret = isFusionEnabled()
            ? handler.fusionLauncher().fusionEnv()
            : [:]
    return floatConf.s3cred.updateEnvMap(ret)
}

String getRunName() {
Expand Down Expand Up @@ -396,7 +404,7 @@ class FloatGridExecutor extends AbstractGridExecutor {
}

private void addVolSize(List<String> cmd, TaskRun task) {
Long size = MIN_VOL_SIZE
long size = MIN_VOL_SIZE

def disk = task.config.getDisk()
if (disk) {
Expand All @@ -405,9 +413,10 @@ class FloatGridExecutor extends AbstractGridExecutor {
if (isFusionEnabled()) {
size = Math.max(size, FUSION_MIN_VOL_SIZE)
}
if (size > MIN_VOL_SIZE) {
cmd << '--imageVolSize' << size.toString()
}
long inputSizeGB = (long)(getInputFileSize(task) / 1024 / 1024 / 1024)
long minDiskSizeBasedOnInput = inputSizeGB * DISK_INPUT_FACTOR
size = Math.max(size, minDiskSizeBasedOnInput)
cmd << '--imageVolSize' << size.toString()
}

/**
Expand Down
20 changes: 18 additions & 2 deletions plugins/nf-float/src/main/com/memverge/nextflow/Global.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,32 @@ class AWSCred {

/**
 * Add this credential to the given map using the float CLI key names
 * ("accesskey", "secret" and optionally "token").
 *
 * The map is returned unchanged when the credential is invalid or when
 * the keys are already present (case-insensitive check).
 *
 * @param map the options map to update
 * @return the same map instance, possibly updated
 */
def updateMap(Map map) {
    if (!isValid()) {
        return map
    }
    if (hasAllKeyCaseInsensitive(map, ["accesskey", "secret"])) {
        return map
    }
    map.put("accesskey", accessKey)
    map.put("secret", secretKey)
    if (token) {
        map.put("token", token)
    }
    return map
}

/**
 * Populate the given map with the standard AWS environment variables
 * (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and, when a token is set,
 * AWS_SESSION_TOKEN) derived from this credential.
 *
 * Existing AWS keys are left untouched (case-insensitive check); the
 * same map instance is returned.
 *
 * @param map the environment map to update
 * @return the same map instance, possibly updated
 */
Map<String, String> updateEnvMap(Map map) {
    if (!isValid()) {
        return map
    }
    final awsKeys = ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"]
    if (hasAllKeyCaseInsensitive(map, awsKeys)) {
        return map
    }
    map["AWS_ACCESS_KEY_ID"] = accessKey
    map["AWS_SECRET_ACCESS_KEY"] = secretKey
    if (token) {
        map["AWS_SESSION_TOKEN"] = token
    }
    return map
}

List<String> getOpts() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
package com.memverge.nextflow

import nextflow.Session
import nextflow.processor.TaskConfig
import nextflow.processor.TaskId
import nextflow.processor.TaskProcessor
import nextflow.processor.TaskRun
import nextflow.processor.*
import spock.lang.Specification

import java.nio.file.Path
Expand Down Expand Up @@ -85,6 +82,7 @@ class FloatBaseTest extends BaseTest {
task.processor = Mock(TaskProcessor)
task.processor.getSession() >> Mock(Session)
task.processor.getExecutor() >> exec
task.processor.getProcessEnvironment() >> [:]
task.config = conf
task.id = new TaskId(id)
task.index = taskSerial.incrementAndGet()
Expand All @@ -93,6 +91,13 @@ class FloatBaseTest extends BaseTest {
return task
}

/**
 * Build a TaskBean for a fresh mock task, staged in 'copy' mode.
 */
def newTaskBean(FloatTestExecutor exec, int id, TaskConfig conf = null) {
    def bean = new TaskBean(newTask(exec, id, conf))
    bean.stageInMode = 'copy'
    return bean
}

/**
 * Expected float job-ID custom tag for the given task ID.
 */
def jobID(TaskId id) {
    def prefix = FloatConf.NF_JOB_ID
    return "${prefix}:${tJob}-${id}"
}
Expand All @@ -116,6 +121,7 @@ class FloatBaseTest extends BaseTest {
'--customTag', "${FloatConf.NF_SESSION_ID}:uuid-$uuid",
'--customTag', "${FloatConf.NF_TASK_NAME}:foo--$taskIDStr-",
'--customTag', "${FloatConf.FLOAT_INPUT_SIZE}:0",
'--customTag', "${FloatConf.NF_RUN_NAME}:test-run"]
'--customTag', "${FloatConf.NF_RUN_NAME}:test-run",
'--imageVolSize', '40']
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,54 @@ package com.memverge.nextflow

import java.nio.file.Paths

/**
 * Spock tests for FloatFileCopyStrategy: before-start script options,
 * parallel stage-in, and parallel unstage of outputs.
 */
class FloatFileCopyStrategyTest extends FloatBaseTest {

    def "get before script with conf"() {
        given:
        def conf = [float: [maxParallelTransfers: 10]]
        def exec = newTestExecutor()

        when:
        def fConf = FloatConf.getConf(conf)
        def strategy = new FloatFileCopyStrategy(fConf, newTaskBean(exec, 1))
        final script = strategy.beforeStartScript

        then:
        // the configured transfer limit shows up in the nxf_parallel helper
        script.contains('cpus>10')
        !script.contains('\nnull')
    }

    def "get stage input file script"() {
        given:
        def conf = [float: []]
        def exec = newTestExecutor()

        when:
        def fConf = FloatConf.getConf(conf)
        def strategy = new FloatFileCopyStrategy(fConf, newTaskBean(exec, 1))
        final script = strategy.getStageInputFilesScript(
                ['a': Paths.get('/target/A')])

        then:
        // stage-in commands are queued and executed via nxf_parallel
        script.contains('downloads+=("cp -fRL /target/A a")')
        script.contains('nxf_parallel')
        !script.contains('\nnull')
    }

    def "get unstage output file script"() {
        given:
        def conf = [float: []]
        def exec = newTestExecutor()

        when:
        def fConf = FloatConf.getConf(conf)
        def strategy = new FloatFileCopyStrategy(fConf, newTaskBean(exec, 1))
        final script = strategy.getUnstageOutputFilesScript(
                ['a',], Paths.get('/target/A'))

        then:
        // uploads are likewise queued and executed via nxf_parallel
        script.contains('eval "ls -1d a"')
        script.contains('nxf_parallel')
        script.contains('uploads+=("nxf_fs_move "$name" /target/A")')
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ class FloatGridExecutorTest extends FloatBaseTest {
'10']

then:
cmd == expected
cmd == expected
}

def "add specific extras"() {
Expand Down
Loading

0 comments on commit eefc99e

Please sign in to comment.