From eefc99e9d6c5972291089e2d44a8766646ae1796 Mon Sep 17 00:00:00 2001 From: Cedric Zhuang Date: Tue, 3 Sep 2024 15:52:07 -0700 Subject: [PATCH] [GH-82] Enable concurrent stage in When `scratch` is enabled, use `nxf_parallel` to pull the input files. --- README.md | 47 ++++++++++- .../nextflow/FloatFileCopyStrategy.groovy | 21 ++++- .../nextflow/FloatGridExecutor.groovy | 21 +++-- .../main/com/memverge/nextflow/Global.groovy | 20 ++++- .../memverge/nextflow/FloatBaseTest.groovy | 16 ++-- .../nextflow/FloatFileCopyStrategyTest.groovy | 30 ++++++-- .../nextflow/FloatGridExecutorTest.groovy | 2 +- .../com/memverge/nextflow/GlobalTest.groovy | 77 +++++++++++++++++-- 8 files changed, 205 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 44c85e5..2c0fb30 100644 --- a/README.md +++ b/README.md @@ -146,6 +146,9 @@ Available `float` config options: to `maxMemoryFactor` * `memory` of the task. * `commonExtra`: allows the user to specify other submit CLI options. This parameter will be appended to every float submit command. +* `maxParallelTransfers`: an integer. default to 4. The maximum number of parallel transfers + for the task happened on the worker node. Note the actual concurrency is the minimum of this + value and the number of available cores. ### Configure with environment variables @@ -182,6 +185,33 @@ If the secret is not available, Nextflow reports error like this: Unknown config secret 'MMC_USERNAME' ``` +### Configuration best practices + +When you are using s3, it's recommended to update the aws client configurations +based on your environment and the workload. 
Here is an example: +```groovy +aws { + // recommended aws client settings + client { + maxConnections = 200 // Increase this number to allow large concurrency + maxErrorRetry = 10 // Increase the number of retries if needed + connectionTimeout = 60000 // timeout in milliseconds, 60 seconds + socketTimeout = 60000 // timeout in milliseconds, 60 seconds + } +} +``` + +If you are sure that the workflow file is properly composed, it's recommended to +set proper error strategy and retry limit in the process scope to make sure +the workflow can be completed. +Here is an example: +```groovy +process { + errorStrategy='retry' + maxRetries=5 +} +``` + ### Configure s3 work directory To enable s3 as work directory, user need to set work directory to a s3 bucket. @@ -196,6 +226,9 @@ workDir = 's3://bucket/path' process { executor = 'float' container = 'fedora/fedora-minimal' + disk = '120 GB' + scratch = true + stageInMode = 'copy' } podman.registry = 'quay.io' @@ -204,13 +237,12 @@ float { address = 'op.center.address' username = secrets.MMC_USERNAME password = secrets.MMC_PASSWORD - timeFactor = 2 } aws { accessKey = '***' secretKey = '***' - region = 'us-east-2' + region = 'us-east-2' // optional } ``` @@ -233,6 +265,17 @@ Tests done for s3 work directory support: * the test profile of nf-core/rnaseq * the test profile of nf-core/sarek +You can also enable `scratch = true` in the process scope. When `scratch` is enabled, the plugin will +use the scratch space of the worker node to store the task files. This is useful when the task files +are large and the network bandwidth is limited. + +When `scratch` is enabled, it's strongly recommended to set `stageInMode = 'copy'` in the process scope. +This will make sure the task files are copied to the scratch space before the task starts. + +Because the scratch space is local, you need to add `disk = '100 GB'` to make sure the task has enough +space to run. 
The plugin will also check the total input size of the task and make sure the disk space +is at least 5 times the input size. + ### Configure s3fs work directory To enable s3fs as work directory, user need to set work directory to a s3 bucket. diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatFileCopyStrategy.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatFileCopyStrategy.groovy index f3b403e..7f36032 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatFileCopyStrategy.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatFileCopyStrategy.groovy @@ -19,6 +19,7 @@ import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import nextflow.executor.SimpleFileCopyStrategy import nextflow.file.FileSystemPathFactory +import nextflow.processor.TaskBean import nextflow.util.Escape import java.nio.file.Path @@ -29,11 +30,27 @@ import java.nio.file.Path class FloatFileCopyStrategy extends SimpleFileCopyStrategy { private FloatConf conf - FloatFileCopyStrategy(FloatConf conf) { - super() + FloatFileCopyStrategy(FloatConf conf, TaskBean bean) { + super(bean) this.conf = conf } + @Override + String getStageInputFilesScript(Map inputFiles) { + def result = 'downloads=(true)\n' + result += super.getStageInputFilesScript(inputFiles) + '\n' + result += 'nxf_parallel "${downloads[@]}"\n' + return result + } + + /** + * {@inheritDoc} + */ + @Override + String stageInputFile( Path path, String targetName ) { + return """downloads+=("${super.stageInputFile(path, targetName)}")""" + } + @Override String getBeforeStartScript() { def script = FloatBashLib.script(conf) diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy index eef9c23..7ccc0e2 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/FloatGridExecutor.groovy @@ 
-41,6 +41,7 @@ class FloatGridExecutor extends AbstractGridExecutor { private static final int DFT_MEM_GB = 1 private static final long FUSION_MIN_VOL_SIZE = 80 private static final long MIN_VOL_SIZE = 40 + private static final long DISK_INPUT_FACTOR = 5 private FloatJobs _floatJobs @@ -74,7 +75,7 @@ class FloatGridExecutor extends AbstractGridExecutor { protected BashWrapperBuilder createBashWrapperBuilder(TaskRun task) { final bean = new TaskBean(task) - final strategy = new FloatFileCopyStrategy(floatConf) + final strategy = new FloatFileCopyStrategy(floatConf, bean) // creates the wrapper script final builder = new BashWrapperBuilder(bean, strategy) // job directives headers @@ -94,6 +95,11 @@ class FloatGridExecutor extends AbstractGridExecutor { result += "export PATH=\$PATH:${binDir}/bin\n" } + if (floatConf.s3cred.isValid()) { + // if we have s3 credential, make sure aws cli is available in path + result += "export PATH=\$PATH:/opt/aws/dist\n" + result += "export LD_LIBRARY_PATH=\$LIBRARY_PATH:/opt/aws/dist\n" + } return result } @@ -252,6 +258,7 @@ class FloatGridExecutor extends AbstractGridExecutor { return ret } + // get the size of the input files in bytes private static long getInputFileSize(TaskRun task) { long ret = 0 for (def src : task.getInputFilesMap().values()) { @@ -261,9 +268,10 @@ class FloatGridExecutor extends AbstractGridExecutor { } private Map getEnv(FloatTaskHandler handler) { - return isFusionEnabled() + def ret = isFusionEnabled() ? 
handler.fusionLauncher().fusionEnv() : [:] + return floatConf.s3cred.updateEnvMap(ret) } String getRunName() { @@ -396,7 +404,7 @@ class FloatGridExecutor extends AbstractGridExecutor { } private void addVolSize(List cmd, TaskRun task) { - Long size = MIN_VOL_SIZE + long size = MIN_VOL_SIZE def disk = task.config.getDisk() if (disk) { @@ -405,9 +413,10 @@ class FloatGridExecutor extends AbstractGridExecutor { if (isFusionEnabled()) { size = Math.max(size, FUSION_MIN_VOL_SIZE) } - if (size > MIN_VOL_SIZE) { - cmd << '--imageVolSize' << size.toString() - } + long inputSizeGB = (long)(getInputFileSize(task) / 1024 / 1024 / 1024) + long minDiskSizeBasedOnInput = inputSizeGB * DISK_INPUT_FACTOR + size = Math.max(size, minDiskSizeBasedOnInput) + cmd << '--imageVolSize' << size.toString() } /** diff --git a/plugins/nf-float/src/main/com/memverge/nextflow/Global.groovy b/plugins/nf-float/src/main/com/memverge/nextflow/Global.groovy index 760f999..1251a99 100644 --- a/plugins/nf-float/src/main/com/memverge/nextflow/Global.groovy +++ b/plugins/nf-float/src/main/com/memverge/nextflow/Global.groovy @@ -66,16 +66,32 @@ class AWSCred { def updateMap(Map map) { if (!isValid()) { - return + return map } if (hasAllKeyCaseInsensitive(map, ["accesskey", "secret"])) { - return + return map } map.put("accesskey", accessKey) map.put("secret", secretKey) if (token) { map.put("token", token) } + return map + } + + Map updateEnvMap(Map map) { + if (!isValid()) { + return map + } + if (hasAllKeyCaseInsensitive(map, ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"])) { + return map + } + map.put("AWS_ACCESS_KEY_ID", accessKey) + map.put("AWS_SECRET_ACCESS_KEY", secretKey) + if (token) { + map.put("AWS_SESSION_TOKEN", token) + } + return map } List getOpts() { diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy index 06adddf..200db36 100644 --- 
a/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatBaseTest.groovy @@ -16,10 +16,7 @@ package com.memverge.nextflow import nextflow.Session -import nextflow.processor.TaskConfig -import nextflow.processor.TaskId -import nextflow.processor.TaskProcessor -import nextflow.processor.TaskRun +import nextflow.processor.* import spock.lang.Specification import java.nio.file.Path @@ -85,6 +82,7 @@ class FloatBaseTest extends BaseTest { task.processor = Mock(TaskProcessor) task.processor.getSession() >> Mock(Session) task.processor.getExecutor() >> exec + task.processor.getProcessEnvironment() >> [:] task.config = conf task.id = new TaskId(id) task.index = taskSerial.incrementAndGet() @@ -93,6 +91,13 @@ class FloatBaseTest extends BaseTest { return task } + def newTaskBean(FloatTestExecutor exec, int id, TaskConfig conf = null) { + def task = newTask(exec, id, conf) + def bean = new TaskBean(task) + bean.stageInMode = 'copy' + return bean + } + def jobID(TaskId id) { return "${FloatConf.NF_JOB_ID}:$tJob-$id" } @@ -116,6 +121,7 @@ class FloatBaseTest extends BaseTest { '--customTag', "${FloatConf.NF_SESSION_ID}:uuid-$uuid", '--customTag', "${FloatConf.NF_TASK_NAME}:foo--$taskIDStr-", '--customTag', "${FloatConf.FLOAT_INPUT_SIZE}:0", - '--customTag', "${FloatConf.NF_RUN_NAME}:test-run"] + '--customTag', "${FloatConf.NF_RUN_NAME}:test-run", + '--imageVolSize', '40'] } } diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatFileCopyStrategyTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatFileCopyStrategyTest.groovy index 5d671ec..7e52cdb 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatFileCopyStrategyTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatFileCopyStrategyTest.groovy @@ -17,17 +17,16 @@ package com.memverge.nextflow import java.nio.file.Paths -class FloatFileCopyStrategyTest extends BaseTest { +class 
FloatFileCopyStrategyTest extends FloatBaseTest { def "get before script with conf"() { given: - def conf = [ - float: [maxParallelTransfers: 10] - ] + def conf = [float: [maxParallelTransfers: 10]] + def exec = newTestExecutor() when: def fConf = FloatConf.getConf(conf) - def strategy = new FloatFileCopyStrategy(fConf) + def strategy = new FloatFileCopyStrategy(fConf, newTaskBean(exec, 1)) final script = strategy.beforeStartScript then: @@ -35,18 +34,37 @@ class FloatFileCopyStrategyTest extends BaseTest { !script.contains('\nnull') } + def "get stage input file script"() { + given: + def conf = [float:[]] + def exec = newTestExecutor() + + when: + def fConf = FloatConf.getConf(conf) + def strategy = new FloatFileCopyStrategy(fConf, newTaskBean(exec, 1)) + final script = strategy.getStageInputFilesScript( + ['a': Paths.get('/target/A')]) + + then: + script.contains('downloads+=("cp -fRL /target/A a")') + script.contains('nxf_parallel') + !script.contains('\nnull') + } + def "get unstage output file script"() { given: def conf = [float:[]] + def exec = newTestExecutor() when: def fConf = FloatConf.getConf(conf) - def strategy = new FloatFileCopyStrategy(fConf) + def strategy = new FloatFileCopyStrategy(fConf, newTaskBean(exec, 1)) final script = strategy.getUnstageOutputFilesScript( ['a',], Paths.get('/target/A')) then: script.contains('eval "ls -1d a"') + script.contains('nxf_parallel') script.contains('uploads+=("nxf_fs_move "$name" /target/A")') } } diff --git a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy index 08b5fb6..6de3d9c 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/FloatGridExecutorTest.groovy @@ -211,7 +211,7 @@ class FloatGridExecutorTest extends FloatBaseTest { '10'] then: - cmd == expected + cmd == expected } def "add specific extras"() { diff 
--git a/plugins/nf-float/src/test/com/memverge/nextflow/GlobalTest.groovy b/plugins/nf-float/src/test/com/memverge/nextflow/GlobalTest.groovy index f5ab996..a8829dc 100644 --- a/plugins/nf-float/src/test/com/memverge/nextflow/GlobalTest.groovy +++ b/plugins/nf-float/src/test/com/memverge/nextflow/GlobalTest.groovy @@ -1,5 +1,9 @@ package com.memverge.nextflow +import nextflow.processor.TaskConfig + +import java.nio.file.Paths + class GlobalTest extends FloatBaseTest { def "get env var. of any names"() { given: @@ -78,11 +82,21 @@ class GlobalTest extends FloatBaseTest { setEnv("AWS_SECRET_KEY") } - def "retrieve aws credentials from system env, with token"() { - given: + def setAwsEnvs() { setEnv("AWS_ACCESS_KEY", "A") setEnv("AWS_SECRET_KEY", "B") setEnv("AWS_SESSION_TOKEN", "C") + } + + def clearAwsEnvs() { + setEnv("AWS_ACCESS_KEY") + setEnv("AWS_SECRET_KEY") + setEnv("AWS_SESSION_TOKEN") + } + + def "retrieve aws credentials from system env, with token"() { + given: + setAwsEnvs() def config = [aws:[]] when: @@ -94,8 +108,61 @@ class GlobalTest extends FloatBaseTest { res.opts == ["accesskey=A", "secret=B", "token=C"] cleanup: - setEnv("AWS_ACCESS_KEY") - setEnv("AWS_SECRET_KEY") - setEnv("AWS_SESSION_TOKEN") + clearAwsEnvs() + } + + def "update arg map with aws credential"() { + given: + setAwsEnvs() + def config = [aws:[]] + + when: + def res = Global.getAwsCredentials(config) + def argMap = res.updateMap(['banana':'apple']) + + then: + res.isValid() == true + argMap['accesskey'] == "A" + argMap['secret'] == "B" + argMap['token'] == "C" + argMap['banana'] == "apple" + + cleanup: + clearAwsEnvs() + } + + def "update env map with aws credential"() { + given: + setAwsEnvs() + def config = [aws:[]] + + when: + def res = Global.getAwsCredentials(config) + def envMap = res.updateEnvMap([:]) + + then: + res.isValid() == true + envMap['AWS_ACCESS_KEY_ID'] == "A" + envMap['AWS_SECRET_ACCESS_KEY'] == "B" + envMap['AWS_SESSION_TOKEN'] == "C" + + cleanup: + 
clearAwsEnvs() + } + + def "add aws to path if s3 credential is available"() { + given: + setAwsEnvs() + final exec = newTestExecutor() + final task = newTask(exec, 0) + + when: + def script = exec.getHeaderScript(task) + + then: + script.contains("PATH:/opt/aws/dist") + + cleanup: + clearAwsEnvs() } }