Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for temporary output paths #3818

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,9 @@ The following settings are available:
`dag.overwrite`
: When `true` overwrites any existing DAG file with the same name.

`dag.type`
: Can be `abstract` to render the abstract (process) DAG or `concrete` to render the concrete (task) DAG (default: `abstract`).

Read the {ref}`dag-visualisation` page to learn more about the execution graph that can be generated by Nextflow.

(config-docker)=
Expand Down
21 changes: 21 additions & 0 deletions docs/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,9 @@ Available options:
`maxDepth`
: Maximum number of directory levels to visit (default: no limit)

`temporary`
: When `true` the target files will be deleted once they are no longer needed by downstream tasks.

`type`
: Type of paths returned, either `file`, `dir` or `any` (default: `any`, or `file` if the specified file name pattern contains a double star (`**`))

Expand Down Expand Up @@ -1009,6 +1012,24 @@ One example in which you'd need to manage the naming of output files is when you
To sum up, the use of output files with static names over dynamic ones is preferable whenever possible, because it will result in simpler and more portable code.
:::

### Temporary output files

:::{warning}
This feature is experimental and may change in a future release.
:::

When a `path` output is declared with `temporary: true`, the target files for this output will be automatically deleted during pipeline execution, as soon as they are no longer needed by downstream tasks. This feature is useful for cleaning up large intermediate files in order to free up disk storage.

The lifetime of a temporary file is determined by the downstream tasks that take the file as an input. When all of these tasks finish, the temporary file can be deleted.

The following caveats apply when using temporary outputs:

- Resumability is not currently supported for tasks with temporary outputs. If you try to resume a run with temporary outputs, any tasks whose outputs were deleted will have to be re-run.

- A temporary output should not be forwarded by a downstream process using the `includeInputs` option. In this case, the temporary output will be deleted prematurely, and any process that consumes the forwarded output channel may fail or produce incorrect output. To avoid this problem, only the most downstream output should be declared as temporary.

- If a file captured by a temporary output path is also captured by a regular output path, it will still be treated as a temporary file. If the regular output path is also published, some outputs may be deleted before they can be published.

(process-env)=

### Output type `env`
Expand Down
20 changes: 20 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import nextflow.conda.CondaConfig
import nextflow.config.Manifest
import nextflow.container.ContainerConfig
import nextflow.dag.DAG
import nextflow.dag.ConcreteDAG
import nextflow.exception.AbortOperationException
import nextflow.exception.AbortSignalException
import nextflow.exception.IllegalConfigException
Expand Down Expand Up @@ -193,6 +194,8 @@ class Session implements ISession {

private DAG dag

private ConcreteDAG concreteDag

private CacheDB cache

private Barrier processesBarrier = new Barrier()
Expand Down Expand Up @@ -345,6 +348,7 @@ class Session implements ISession {

// -- DAG object
this.dag = new DAG()
this.concreteDag = new ConcreteDAG()

// -- init work dir
this.workDir = ((config.workDir ?: 'work') as Path).complete()
Expand Down Expand Up @@ -799,6 +803,8 @@ class Session implements ISession {

DAG getDag() { this.dag }

ConcreteDAG getConcreteDAG() { this.concreteDag }

ExecutorService getExecService() { execService }

/**
Expand Down Expand Up @@ -935,6 +941,17 @@ class Session implements ISession {
}
}

void notifyProcessClose(TaskProcessor process) {
observers.each { observer ->
try {
observer.onProcessClose(process)
}
catch( Exception e ) {
log.debug(e.getMessage(), e)
}
}
}

void notifyProcessTerminate(TaskProcessor process) {
for( int i=0; i<observers.size(); i++ ) {
final observer = observers.get(i)
Expand Down Expand Up @@ -1007,6 +1024,9 @@ class Session implements ISession {
final trace = handler.safeTraceRecord()
cache.putTaskAsync(handler, trace)

// save the task meta file to the task directory
handler.writeMetaFile()

// notify the event to the observers
for( int i=0; i<observers.size(); i++ ) {
final observer = observers.get(i)
Expand Down
125 changes: 125 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/dag/ConcreteDAG.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.dag

import java.nio.file.Path
import java.util.regex.Pattern
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock

import groovy.transform.MapConstructor
import groovy.transform.ToString
import groovy.util.logging.Slf4j
import nextflow.processor.TaskRun
import nextflow.script.params.FileOutParam
/**
* Model the conrete (task) graph of a pipeline execution.
*
* @author Ben Sherman <[email protected]>
*/
@Slf4j
class ConcreteDAG {

private Lock sync = new ReentrantLock()

private Map<String,Task> nodes = new HashMap<>(100)

Map<String,Task> getNodes() {
nodes
}

/**
* Add a task to the graph
*
* @param task
*/
void addTask(TaskRun task) {
final hash = task.hash.toString()
final label = "[${hash.substring(0,2)}/${hash.substring(2,8)}] ${task.name}"
final inputs = task.getInputFilesMap()
.collect { name, path ->
new Input(name: name, path: path, predecessor: getPredecessorHash(path))
}

sync.lock()
try {
nodes[hash] = new Task(
index: nodes.size(),
label: label,
inputs: inputs
)
}
finally {
sync.unlock()
}
}

static public String getPredecessorHash(Path path) {
final pattern = Pattern.compile('.*/([0-9a-f]{2}/[0-9a-f]{30})')
final matcher = pattern.matcher(path.toString())

matcher.find() ? matcher.group(1).replace('/', '') : null
}

/**
* Add a task's outputs to the graph
*
* @param task
*/
void addTaskOutputs(TaskRun task) {
final hash = task.hash.toString()
final outputs = task.getOutputsByType(FileOutParam)
.values()
.flatten()
.collect { path ->
new Output(name: path.name, path: path)
}

sync.lock()
try {
nodes[hash].outputs = outputs
}
finally {
sync.unlock()
}
}

@MapConstructor
@ToString(includeNames = true, includes = 'label', includePackage=false)
static protected class Task {
int index
String label
List<Input> inputs
List<Output> outputs

String getSlug() { "t${index}" }
}

@MapConstructor
static protected class Input {
String name
Path path
String predecessor
}

@MapConstructor
static protected class Output {
String name
Path path
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import java.nio.file.Path
class CytoscapeHtmlRenderer implements DagRenderer {

@Override
void renderDocument(DAG dag, Path file) {
void renderAbstractGraph(DAG dag, Path file) {
String tmplPage = readTemplate()
String network = CytoscapeJsRenderer.renderNetwork(dag)
file.text = tmplPage.replaceAll(~/\/\* REPLACE_WITH_NETWORK_DATA \*\//, network)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import java.nio.file.Path
class CytoscapeJsRenderer implements DagRenderer {

@Override
void renderDocument(DAG dag, Path file) {
void renderAbstractGraph(DAG dag, Path file) {
file.text = renderNetwork(dag)
}

Expand Down
2 changes: 1 addition & 1 deletion modules/nextflow/src/main/groovy/nextflow/dag/DAG.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import nextflow.script.params.TupleOutParam
import java.util.concurrent.atomic.AtomicLong

/**
* Model a direct acyclic graph of the pipeline execution.
* Model the abstract graph of a pipeline execution.
*
* @author Paolo Di Tommaso <[email protected]>
*/
Expand Down
15 changes: 12 additions & 3 deletions modules/nextflow/src/main/groovy/nextflow/dag/DagRenderer.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,19 @@ import java.nio.file.Path
* @author Paolo Di Tommaso <[email protected]>
* @author Mike Smoot <[email protected]>
*/
interface DagRenderer {
trait DagRenderer {

/**
* Render the dag to the specified file.
* Render an abstract (process) DAG.
*/
void renderDocument(DAG dag, Path file);
void renderAbstractGraph(DAG dag, Path file) {
throw new UnsupportedOperationException("Abstract graph rendering is not supported for this file format")
}

/**
* Render a concrete (task) DAG.
*/
void renderConcreteGraph(ConcreteDAG dag, Path file) {
throw new UnsupportedOperationException("Concrete graph rendering is not supported for this file format")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class DotRenderer implements DagRenderer {
static String normalise(String str) { str.replaceAll(/[^0-9_A-Za-z]/,'') }

@Override
void renderDocument(DAG dag, Path file) {
void renderAbstractGraph(DAG dag, Path file) {
file.text = renderNetwork(dag)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class GexfRenderer implements DagRenderer {
}

@Override
void renderDocument(DAG dag, Path file) {
void renderAbstractGraph(DAG dag, Path file) {
final Charset charset = Charset.defaultCharset()
Writer bw = Files.newBufferedWriter(file, charset)
final XMLOutputFactory xof = XMLOutputFactory.newFactory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class GraphvizRenderer implements DagRenderer {
* See http://www.graphviz.org for more info.
*/
@Override
void renderDocument(DAG dag, Path target) {
void renderAbstractGraph(DAG dag, Path target) {
def result = Files.createTempFile('nxf-',".$format")
def temp = Files.createTempFile('nxf-','.dot')
// save the DAG as `dot` to a temp file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@ import java.nio.file.Path
class MermaidRenderer implements DagRenderer {

@Override
void renderDocument(DAG dag, Path file) {
file.text = renderNetwork(dag)
}

String renderNetwork(DAG dag) {
void renderAbstractGraph(DAG dag, Path file) {
def lines = []
lines << "flowchart TD"

Expand All @@ -44,7 +40,7 @@ class MermaidRenderer implements DagRenderer {

lines << ""

return lines.join('\n')
file.text = lines.join('\n')
}

private String renderVertex(DAG.Vertex vertex) {
Expand Down Expand Up @@ -75,4 +71,49 @@ class MermaidRenderer implements DagRenderer {

return "${edge.from.name} -->${label} ${edge.to.name}"
}

@Override
void renderConcreteGraph(ConcreteDAG graph, Path file) {
def renderedOutputs = [] as Set<Path>
def numInputs = 0
def numOutputs = 0

def lines = []
lines << "flowchart TD"

// render tasks and task inputs
graph.nodes.values().each { task ->
// render task node
lines << " ${task.getSlug()}[\"${task.label}\"]"

task.inputs.each { input ->
// render task input from predecessor
if( input.predecessor != null ) {
final pred = graph.nodes[input.predecessor]
lines << " ${pred.getSlug()} -->|${input.name}| ${task.getSlug()}"
renderedOutputs << input.path
}

// render task input from source node
else {
numInputs += 1
lines << " i${numInputs}(( )) -->|${input.name}| ${task.getSlug()}"
}
}
}

// render task outputs with sink nodes
graph.nodes.values().each { task ->
task.outputs.each { output ->
if( output.path !in renderedOutputs ) {
numOutputs += 1
lines << " ${task.getSlug()} -->|${output.name}| o${numOutputs}(( ))"
}
}
}

lines << ""

file.text = lines.join('\n')
}
}
Loading