Skip to content

Commit

Permalink
Add initial Fusion support
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Sherman <[email protected]>
  • Loading branch information
bentsherman committed Jul 26, 2023
1 parent eb60190 commit 9a80259
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import groovy.util.logging.Slf4j
import nextflow.exception.AbortOperationException
import nextflow.executor.AbstractGridExecutor
import nextflow.file.FileHelper
import nextflow.fusion.FusionHelper
import nextflow.processor.TaskRun
import nextflow.util.Escape
import nextflow.util.ServiceName
Expand All @@ -40,8 +41,6 @@ class FloatGridExecutor extends AbstractGridExecutor {

private FloatJobs _floatJobs

private AtomicInteger serial = new AtomicInteger()

private Path binDir

private FloatConf getFloatConf() {
Expand Down Expand Up @@ -144,10 +143,9 @@ class FloatGridExecutor extends AbstractGridExecutor {
*
* @return
*/
private List<String> getSubmitCmdPrefix() {
final i = serial.incrementAndGet()
private List<String> getSubmitCmdPrefix(Integer index) {
final addresses = floatConf.addresses
final address = addresses[i % (addresses.size())]
final address = addresses[index % (addresses.size())]
return floatConf.getCliPrefix(address)
}

Expand Down Expand Up @@ -189,6 +187,10 @@ class FloatGridExecutor extends AbstractGridExecutor {
}

private List<String> getMountVols(TaskRun task) {
if (isFusionEnabled()) {
return []
}

List<String> volumes = []
volumes << floatConf.getWorkDirVol(workDir.uri)

Expand All @@ -200,8 +202,20 @@ class FloatGridExecutor extends AbstractGridExecutor {
return ret
}

private Map<String,String> getEnv(FloatTaskHandler handler) {
return isFusionEnabled()
? handler.fusionLauncher().fusionEnv()
: [:]
}

@Override
List<String> getSubmitCommandLine(TaskRun task, Path scriptFile) {
null
}

List<String> getSubmitCommandLine(FloatTaskHandler handler, Path scriptFile) {
final task = handler.task

validate(task)

final jobName = floatJobs.getJobName(task.id)
Expand All @@ -212,28 +226,45 @@ class FloatGridExecutor extends AbstractGridExecutor {
"you can specify a default container image " +
"with `process.container`")
}
def cmd = getSubmitCmdPrefix()
def cmd = getSubmitCmdPrefix(task.index)
cmd << 'sbatch'
for (def vol : getMountVols(task)) {
cmd << '--dataVolume' << vol
}
cmd << '--image' << task.getContainer()
cmd << '--cpu' << task.config.getCpus().toString()
cmd << '--mem' << getMemory(task)
cmd << '--job' << getScriptFilePath(scriptFile)
cmd << '--job' << getScriptFilePath(handler, scriptFile)
def env = getEnv(handler)
if (env) {
cmd << '--env' << env.collect(e -> "${e.key}=${e.value}").join(',')
}
cmd << '--customTag' << tag
cmd.addAll(getExtra(task))
log.info "[float] submit job: ${toLogStr(cmd)}"
return cmd
}

private String getScriptFilePath(Path scriptFile) {
/**
* TODO: should be removed when float CLI supports stdin script
*/
private String getScriptFilePath(FloatTaskHandler handler, Path scriptFile) {
if (isFusionEnabled()) {
return saveFusionScriptFile(handler, scriptFile)
}
if (workDir.getScheme() == "s3") {
return downloadScriptFile(scriptFile)
}
return scriptFile.toString()
}

protected String saveFusionScriptFile(FloatTaskHandler handler, Path scriptFile) {
final localTmp = File.createTempFile("nextflow", scriptFile.name)
log.info("save fusion launcher script")
localTmp.text = '#!/bin/bash\n' + handler.fusionSubmitCli().join(' ') + '\n'
return localTmp.getAbsolutePath()
}

protected String downloadScriptFile(Path scriptFile) {
final localTmp = File.createTempFile("nextflow", scriptFile.name)
log.info("download $scriptFile to $localTmp")
Expand Down Expand Up @@ -402,4 +433,14 @@ class FloatGridExecutor extends AbstractGridExecutor {
boolean isContainerNative() {
return true
}

@Override
boolean pipeLauncherScript() {
return isFusionEnabled()
}

@Override
boolean isFusionEnabled() {
return FusionHelper.isFusionEnabled(session)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.executor.GridTaskHandler
import nextflow.processor.TaskRun

/**
* Float task handler
*/
Expand All @@ -38,7 +39,7 @@ class FloatTaskHandler extends GridTaskHandler {
protected ProcessBuilder createProcessBuilder() {

// -- log the submit command
final cli = executor.getSubmitCommandLine(task, wrapperFile)
final cli = ((FloatGridExecutor)executor).getSubmitCommandLine(this, wrapperFile)
log.trace "start process ${task.name} > cli: ${cli}"

// -- prepare submit process
Expand All @@ -53,4 +54,12 @@ class FloatTaskHandler extends GridTaskHandler {
return builder
}

/**
* Override the grid task handler to make the fusion launcher
* script container-native.
*/
protected String fusionStdinWrapper() {
return '#!/bin/bash\n' + fusionSubmitCli().join(' ') + '\n'
}

}

0 comments on commit 9a80259

Please sign in to comment.