From 9760f4fa4ca8a8681c7d82e17679b81e45eb510d Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Fri, 28 Apr 2023 14:26:20 -0500 Subject: [PATCH] Remove dependency on task graph branch Signed-off-by: Ben Sherman --- docs/config.md | 3 - .../src/main/groovy/nextflow/Session.groovy | 9 -- .../groovy/nextflow/dag/ConcreteDAG.groovy | 125 ------------------ .../nextflow/dag/CytoscapeHtmlRenderer.groovy | 2 +- .../nextflow/dag/CytoscapeJsRenderer.groovy | 2 +- .../src/main/groovy/nextflow/dag/DAG.groovy | 2 +- .../groovy/nextflow/dag/DagRenderer.groovy | 15 +-- .../groovy/nextflow/dag/DotRenderer.groovy | 2 +- .../groovy/nextflow/dag/GexfRenderer.groovy | 2 +- .../nextflow/dag/GraphVizRenderer.groovy | 2 +- .../nextflow/dag/MermaidRenderer.groovy | 53 +------- .../groovy/nextflow/dag/NodeMarker.groovy | 6 +- .../nextflow/processor/TaskHandler.groovy | 28 ---- .../nextflow/processor/TaskProcessor.groovy | 2 +- .../groovy/nextflow/processor/TaskRun.groovy | 1 - .../trace/DefaultObserverFactory.groovy | 1 - .../nextflow/trace/GraphObserver.groovy | 66 ++++----- .../trace/TemporaryFileObserver.groovy | 1 - .../nextflow/dag/ConcreteDAGTest.groovy | 86 ------------ .../nextflow/dag/DotRendererTest.groovy | 2 +- .../nextflow/dag/GexfRendererTest.groovy | 2 +- .../nextflow/dag/MermaidRendererTest.groovy | 70 +--------- .../nextflow/processor/TaskHandlerTest.groovy | 25 ---- .../nextflow/trace/GraphObserverTest.groovy | 34 ++--- .../main/nextflow/extension/FilesEx.groovy | 20 --- .../main/nextflow/file/ETagAwareFile.groovy | 29 ---- .../main/nextflow/cloud/aws/nio/S3Path.java | 11 +- .../cloud/azure/nio/AzFileAttributes.groovy | 8 -- .../nextflow/cloud/azure/nio/AzPath.groovy | 8 +- tests/process-named-inputs.groovy | 88 ++++++++++++ tests/process-operator-closure.nf | 36 +++++ 31 files changed, 197 insertions(+), 544 deletions(-) delete mode 100644 modules/nextflow/src/main/groovy/nextflow/dag/ConcreteDAG.groovy delete mode 100644 modules/nextflow/src/test/groovy/nextflow/dag/ConcreteDAGTest.groovy delete mode 100644 modules/nf-commons/src/main/nextflow/file/ETagAwareFile.groovy create mode 100644 tests/process-named-inputs.groovy create mode 100644 tests/process-operator-closure.nf diff --git a/docs/config.md b/docs/config.md index a6bccaca22..b729fb48cd 100644 --- a/docs/config.md +++ b/docs/config.md @@ -483,9 +483,6 @@ The following settings are available: `dag.overwrite` : When `true` overwrites any existing DAG file with the same name. -`dag.type` -: Can be `abstract` to render the abstract (process) DAG or `concrete` to render the concrete (task) DAG (default: `abstract`). - Read the {ref}`dag-visualisation` page to learn more about the execution graph that can be generated by Nextflow. (config-docker)= diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index 836d2f50e7..0020c7e11d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -39,7 +39,6 @@ import nextflow.conda.CondaConfig import nextflow.config.Manifest import nextflow.container.ContainerConfig import nextflow.dag.DAG -import nextflow.dag.ConcreteDAG import nextflow.exception.AbortOperationException import nextflow.exception.AbortSignalException import nextflow.exception.IllegalConfigException @@ -194,8 +193,6 @@ class Session implements ISession { private DAG dag - private ConcreteDAG concreteDag - private CacheDB cache private Barrier processesBarrier = new Barrier() @@ -348,7 +345,6 @@ class Session implements ISession { // -- DAG object this.dag = new DAG() - this.concreteDag = new ConcreteDAG() // -- init work dir this.workDir = ((config.workDir ?: 'work') as Path).complete() @@ -803,8 +799,6 @@ class Session implements ISession { DAG getDag() { this.dag } - ConcreteDAG getConcreteDAG() { this.concreteDag } - ExecutorService getExecService() { execService } /** @@ -1024,9 +1018,6 @@ class Session implements ISession { final trace = handler.safeTraceRecord() cache.putTaskAsync(handler, trace) - // save the task meta file to the task directory - handler.writeMetaFile() - // notify the event to the observers for( int i=0; i - */ -@Slf4j -class ConcreteDAG { - - private Lock sync = new ReentrantLock() - - private Map nodes = new HashMap<>(100) - - Map getNodes() { - nodes - } - - /** - * Add a task to the graph - * - * @param task - */ - void addTask(TaskRun task) { - final hash = task.hash.toString() - final label = "[${hash.substring(0,2)}/${hash.substring(2,8)}] ${task.name}" - final inputs = task.getInputFilesMap() - .collect { name, path -> - new Input(name: name, path: path, predecessor: getPredecessorHash(path)) - } - - sync.lock() - try { - nodes[hash] = new Task( - index: nodes.size(), - label: label, - inputs: inputs - ) - } - finally { - sync.unlock() - } - } - - static public String getPredecessorHash(Path path) { - final pattern = Pattern.compile('.*/([0-9a-f]{2}/[0-9a-f]{30})') - final matcher = pattern.matcher(path.toString()) - - matcher.find() ? matcher.group(1).replace('/', '') : null - } - - /** - * Add a task's outputs to the graph - * - * @param task - */ - void addTaskOutputs(TaskRun task) { - final hash = task.hash.toString() - final outputs = task.getOutputsByType(FileOutParam) - .values() - .flatten() - .collect { path -> - new Output(name: path.name, path: path) - } - - sync.lock() - try { - nodes[hash].outputs = outputs - } - finally { - sync.unlock() - } - } - - @MapConstructor - @ToString(includeNames = true, includes = 'label', includePackage=false) - static protected class Task { - int index - String label - List inputs - List outputs - - String getSlug() { "t${index}" } - } - - @MapConstructor - static protected class Input { - String name - Path path - String predecessor - } - - @MapConstructor - static protected class Output { - String name - Path path - } - -} diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeHtmlRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeHtmlRenderer.groovy index e839824da8..5aff50ffe3 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeHtmlRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeHtmlRenderer.groovy @@ -28,7 +28,7 @@ import java.nio.file.Path class CytoscapeHtmlRenderer implements DagRenderer { @Override - void renderAbstractGraph(DAG dag, Path file) { + void renderDocument(DAG dag, Path file) { String tmplPage = readTemplate() String network = CytoscapeJsRenderer.renderNetwork(dag) file.text = tmplPage.replaceAll(~/\/\* REPLACE_WITH_NETWORK_DATA \*\//, network) diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeJsRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeJsRenderer.groovy index af2a09b98e..dfcb8246df 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeJsRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/CytoscapeJsRenderer.groovy @@ -28,7 +28,7 @@ import java.nio.file.Path class CytoscapeJsRenderer implements DagRenderer { @Override - void renderAbstractGraph(DAG dag, Path file) { + void renderDocument(DAG dag, Path file) { file.text = renderNetwork(dag) } diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/DAG.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/DAG.groovy index da4e5887ce..d1ef0c0666 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/DAG.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/DAG.groovy @@ -43,7 +43,7 @@ import nextflow.script.params.TupleOutParam import java.util.concurrent.atomic.AtomicLong /** - * Model the abstract graph of a pipeline execution. + * Model a direct acyclic graph of the pipeline execution. * * @author Paolo Di Tommaso */ diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/DagRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/DagRenderer.groovy index 768c9ef11e..0d3bfcd252 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/DagRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/DagRenderer.groovy @@ -23,19 +23,10 @@ import java.nio.file.Path * @author Paolo Di Tommaso * @author Mike Smoot */ -trait DagRenderer { +interface DagRenderer { /** - * Render an abstract (process) DAG. + * Render the dag to the specified file. */ - void renderAbstractGraph(DAG dag, Path file) { - throw new UnsupportedOperationException("Abstract graph rendering is not supported for this file format") - } - - /** - * Render a concrete (task) DAG. - */ - void renderConcreteGraph(ConcreteDAG dag, Path file) { - throw new UnsupportedOperationException("Concrete graph rendering is not supported for this file format") - } + void renderDocument(DAG dag, Path file); } diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/DotRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/DotRenderer.groovy index c5411f820b..f9b5a36e41 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/DotRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/DotRenderer.groovy @@ -45,7 +45,7 @@ class DotRenderer implements DagRenderer { static String normalise(String str) { str.replaceAll(/[^0-9_A-Za-z]/,'') } @Override - void renderAbstractGraph(DAG dag, Path file) { + void renderDocument(DAG dag, Path file) { file.text = renderNetwork(dag) } diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/GexfRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/GexfRenderer.groovy index 43aaba6e9c..c2bac51110 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/GexfRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/GexfRenderer.groovy @@ -41,7 +41,7 @@ class GexfRenderer implements DagRenderer { } @Override - void renderAbstractGraph(DAG dag, Path file) { + void renderDocument(DAG dag, Path file) { final Charset charset = Charset.defaultCharset() Writer bw = Files.newBufferedWriter(file, charset) final XMLOutputFactory xof = XMLOutputFactory.newFactory() diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/GraphVizRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/GraphVizRenderer.groovy index 0fddcee58d..3106890b9c 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/GraphVizRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/GraphVizRenderer.groovy @@ -41,7 +41,7 @@ class GraphvizRenderer implements DagRenderer { * See http://www.graphviz.org for more info. */ @Override - void renderAbstractGraph(DAG dag, Path target) { + void renderDocument(DAG dag, Path target) { def result = Files.createTempFile('nxf-',".$format") def temp = Files.createTempFile('nxf-','.dot') // save the DAG as `dot` to a temp file diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/MermaidRenderer.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/MermaidRenderer.groovy index fa8f6f602f..3afdb7cf42 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/MermaidRenderer.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/MermaidRenderer.groovy @@ -26,7 +26,11 @@ import java.nio.file.Path class MermaidRenderer implements DagRenderer { @Override - void renderAbstractGraph(DAG dag, Path file) { + void renderDocument(DAG dag, Path file) { + file.text = renderNetwork(dag) + } + + String renderNetwork(DAG dag) { def lines = [] lines << "flowchart TD" @@ -40,7 +44,7 @@ class MermaidRenderer implements DagRenderer { lines << "" - file.text = lines.join('\n') + return lines.join('\n') } private String renderVertex(DAG.Vertex vertex) { @@ -71,49 +75,4 @@ class MermaidRenderer implements DagRenderer { return "${edge.from.name} -->${label} ${edge.to.name}" } - - @Override - void renderConcreteGraph(ConcreteDAG graph, Path file) { - def renderedOutputs = [] as Set - def numInputs = 0 - def numOutputs = 0 - - def lines = [] - lines << "flowchart TD" - - // render tasks and task inputs - graph.nodes.values().each { task -> - // render task node - lines << " ${task.getSlug()}[\"${task.label}\"]" - - task.inputs.each { input -> - // render task input from predecessor - if( input.predecessor != null ) { - final pred = graph.nodes[input.predecessor] - lines << " ${pred.getSlug()} -->|${input.name}| ${task.getSlug()}" - renderedOutputs << input.path - } - - // render task input from source node - else { - numInputs += 1 - lines << " i${numInputs}(( )) -->|${input.name}| ${task.getSlug()}" - } - } - } - - // render task outputs with sink nodes - graph.nodes.values().each { task -> - task.outputs.each { output -> - if( output.path !in renderedOutputs ) { - numOutputs += 1 - lines << " ${task.getSlug()} -->|${output.name}| o${numOutputs}(( ))" - } - } - } - - lines << "" - - file.text = lines.join('\n') - } } diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/NodeMarker.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/NodeMarker.groovy index 18550b4cc6..68d56a12d5 100644 --- a/modules/nextflow/src/main/groovy/nextflow/dag/NodeMarker.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/dag/NodeMarker.groovy @@ -40,7 +40,7 @@ class NodeMarker { static private Session getSession() { Global.session as Session } /** - * Creates a vertex in the abstract DAG representing a computing `process` + * Creates a new vertex in the DAG representing a computing `process` * * @param label The label associated to the process * @param inputs The list of inputs entering in the process @@ -52,7 +52,7 @@ class NodeMarker { } /** - * Creates a vertex in the abstract DAG representing a dataflow operator + * Creates a new DAG vertex representing a dataflow operator * * @param label The operator label * @param inputs The operator input(s). It can be either a single channel or a list of channels. @@ -66,7 +66,7 @@ class NodeMarker { } /** - * Creates a vertex in the abstract DAG representing a dataflow channel source. + * Creates a vertex in the DAG representing a dataflow channel source. * * @param label The node description * @param source Either a dataflow channel or a list of channel. diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy index 42ee522185..a94ded210f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy @@ -18,14 +18,9 @@ package nextflow.processor import static nextflow.processor.TaskStatus.* -import java.nio.file.Files import java.nio.file.NoSuchFileException -import groovy.json.JsonBuilder import groovy.util.logging.Slf4j -import nextflow.dag.ConcreteDAG -import nextflow.extension.FilesEx -import nextflow.script.params.FileOutParam import nextflow.trace.TraceRecord /** * Actions to handle the underlying job running the user task. @@ -218,29 +213,6 @@ abstract class TaskHandler { return record } - void writeMetaFile() { - final record = [ - hash: task.hash.toString(), - inputs: task.getInputFilesMap().collect { name, path -> - [ - name: name, - path: path.toString(), - predecessor: ConcreteDAG.getPredecessorHash(path) - ] - }, - outputs: task.getOutputsByType(FileOutParam).values().flatten().collect { path -> - [ - name: path.name, - path: path.toString(), - size: Files.size(path), - checksum: FilesEx.getChecksum(path) - ] - } - ] - - task.workDir.resolve(TaskRun.CMD_META).text = new JsonBuilder(record).toString() - } - /** * Determine if a process can be forked i.e. can launch * a parallel task execution. This is only enforced when diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy index 00f38a443a..88e173d33d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy @@ -543,7 +543,7 @@ class TaskProcessor { def invoke = new InvokeTaskAdapter(this, opInputs.size()) session.allOperators << (operator = new DataflowOperator(group, params, invoke)) - // notify the creation of a new process in the abstract DAG + // notify the creation of a new vertex the execution DAG NodeMarker.addProcessNode(this, config.getInputs(), config.getOutputs()) // fix issue #41 diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy index 703109d9c6..a45382a144 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy @@ -550,7 +550,6 @@ class TaskRun implements Cloneable { static final public String CMD_STAGE = '.command.stage' static final public String CMD_TRACE = '.command.trace' static final public String CMD_ENV = '.command.env' - static final public String CMD_META = '.command.meta.json' String toString( ) { diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy index cd7bd72d7e..dac68d9adb 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/DefaultObserverFactory.groovy @@ -94,7 +94,6 @@ class DefaultObserverFactory implements TraceObserverFactory { if( !fileName ) fileName = GraphObserver.DEF_FILE_NAME def traceFile = (fileName as Path).complete() def observer = new GraphObserver(traceFile) - config.navigate('dag.type') { observer.type = it ?: 'abstract' } config.navigate('dag.overwrite') { observer.overwrite = it } result << observer } diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/GraphObserver.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/GraphObserver.groovy index 23c565277d..0a61d43eda 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/GraphObserver.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/GraphObserver.groovy @@ -23,7 +23,6 @@ import groovy.util.logging.Slf4j import nextflow.Session import nextflow.dag.CytoscapeHtmlRenderer import nextflow.dag.DAG -import nextflow.dag.ConcreteDAG import nextflow.dag.DagRenderer import nextflow.dag.DotRenderer import nextflow.dag.GexfRenderer @@ -46,11 +45,7 @@ class GraphObserver implements TraceObserver { private Path file - private String type = 'abstract' - - private DAG abstractDag - - private ConcreteDAG concreteDag + private DAG dag private String name @@ -71,8 +66,7 @@ class GraphObserver implements TraceObserver { @Override void onFlowCreate(Session session) { - this.abstractDag = session.dag - this.concreteDag = session.concreteDag + this.dag = session.dag // check file existance final attrs = FileHelper.readAttributes(file) if( attrs ) { @@ -83,40 +77,16 @@ class GraphObserver implements TraceObserver { } } - @Override - void onProcessSubmit(TaskHandler handler, TraceRecord trace) { - concreteDag.addTask( handler.task ) - } - - @Override - void onProcessComplete(TaskHandler handler, TraceRecord trace) { - concreteDag.addTaskOutputs( handler.task ) - } - - @Override - void onProcessCached(TaskHandler handler, TraceRecord trace) { - concreteDag.addTask( handler.task ) - concreteDag.addTaskOutputs( handler.task ) - } - @Override void onFlowComplete() { - if( type == 'abstract' ) { - // -- normalise the DAG - abstractDag.normalize() - // -- render it to a file - createRenderer().renderAbstractGraph(abstractDag,file) - } - else if( type == 'concrete' ) { - createRenderer().renderConcreteGraph(concreteDag,file) - } - else { - log.warn("Invalid DAG type '${type}', should be 'abstract' or 'concrete'") - } + // -- normalise the DAG + dag.normalize() + // -- render it to a file + createRender().renderDocument(dag,file) } @PackageScope - DagRenderer createRenderer() { + DagRenderer createRender() { if( format == 'dot' ) new DotRenderer(name) @@ -133,6 +103,28 @@ class GraphObserver implements TraceObserver { new GraphvizRenderer(name, format) } + + @Override + void onProcessCreate(TaskProcessor process) { + + } + + + @Override + void onProcessSubmit(TaskHandler handler, TraceRecord trace) { + + } + + @Override + void onProcessStart(TaskHandler handler, TraceRecord trace) { + + } + + @Override + void onProcessComplete(TaskHandler handler, TraceRecord trace) { + + } + @Override boolean enableMetrics() { return false diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy index e487d8746d..33a4327853 100644 --- a/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/trace/TemporaryFileObserver.groovy @@ -25,7 +25,6 @@ import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import nextflow.Session import nextflow.dag.DAG -import nextflow.dag.ConcreteDAG import nextflow.file.FileHelper import nextflow.processor.TaskHandler import nextflow.processor.TaskProcessor diff --git a/modules/nextflow/src/test/groovy/nextflow/dag/ConcreteDAGTest.groovy b/modules/nextflow/src/test/groovy/nextflow/dag/ConcreteDAGTest.groovy deleted file mode 100644 index a666372169..0000000000 --- a/modules/nextflow/src/test/groovy/nextflow/dag/ConcreteDAGTest.groovy +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright 2013-2023, Seqera Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package nextflow.dag - -import java.nio.file.Paths - -import com.google.common.hash.HashCode -import nextflow.processor.TaskRun -import spock.lang.Specification -/** - * - * @author Ben Sherman - */ -class ConcreteDAGTest extends Specification { - - def 'should add task nodes and outputs' () { - - given: - def task1 = Mock(TaskRun) { - getHash() >> HashCode.fromString('00112233445566778899aabbccddeeff') - getName() >> 'foo' - getInputFilesMap() >> [ - 'data.txt': Paths.get('/inputs/data.txt') - ] - getOutputsByType(_) >> [ - 'data.foo': Paths.get('/work/00/112233445566778899aabbccddeeff/data.foo') - ] - } - def task2 = Mock(TaskRun) { - getHash() >> HashCode.fromString('aabbccddeeff00112233445566778899') - getName() >> 'bar' - getInputFilesMap() >> [ - 'data.foo': Paths.get('/work/00/112233445566778899aabbccddeeff/data.foo') - ] - getOutputsByType(_) >> [ - 'data.bar': Paths.get('/work/aa/bbccddeeff00112233445566778899/data.bar') - ] - } - def dag = new ConcreteDAG() - - when: - dag.addTask( task1 ) - dag.addTask( task2 ) - def node1 = dag.nodes['00112233445566778899aabbccddeeff'] - def node2 = dag.nodes['aabbccddeeff00112233445566778899'] - then: - node1.index == 0 - node1.label == '[00/112233] foo' - node1.inputs.size() == 1 - node1.inputs[0].name == 'data.txt' - node1.inputs[0].path == Paths.get('/inputs/data.txt') - node1.inputs[0].predecessor == null - node2.index == 1 - node2.label == '[aa/bbccdd] bar' - node2.inputs.size() == 1 - node2.inputs[0].name == 'data.foo' - node2.inputs[0].path == Paths.get('/work/00/112233445566778899aabbccddeeff/data.foo') - node2.inputs[0].predecessor == '00112233445566778899aabbccddeeff' - - when: - dag.addTaskOutputs( task1 ) - dag.addTaskOutputs( task2 ) - then: - node1.outputs.size() == 1 - node1.outputs[0].name == 'data.foo' - node1.outputs[0].path == Paths.get('/work/00/112233445566778899aabbccddeeff/data.foo') - node2.outputs.size() == 1 - node2.outputs[0].name == 'data.bar' - node2.outputs[0].path == Paths.get('/work/aa/bbccddeeff00112233445566778899/data.bar') - } - -} diff --git a/modules/nextflow/src/test/groovy/nextflow/dag/DotRendererTest.groovy b/modules/nextflow/src/test/groovy/nextflow/dag/DotRendererTest.groovy index 7e5921aeef..2ff86ebaf7 100644 --- a/modules/nextflow/src/test/groovy/nextflow/dag/DotRendererTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/dag/DotRendererTest.groovy @@ -54,7 +54,7 @@ class DotRendererTest extends Specification { dag.normalize() when: - new DotRenderer('TheGraph').renderAbstractGraph(dag, file) + new DotRenderer('TheGraph').renderDocument(dag, file) then: file.text == ''' diff --git a/modules/nextflow/src/test/groovy/nextflow/dag/GexfRendererTest.groovy b/modules/nextflow/src/test/groovy/nextflow/dag/GexfRendererTest.groovy index 6d4a9e3539..f112732b2e 100644 --- a/modules/nextflow/src/test/groovy/nextflow/dag/GexfRendererTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/dag/GexfRendererTest.groovy @@ -47,7 +47,7 @@ class GexfRendererTest extends Specification { dag.normalize() when: - new GexfRenderer('TheGraph').renderAbstractGraph(dag, file.toPath()) + new GexfRenderer('TheGraph').renderDocument(dag, file.toPath()) then: def graph = new XmlSlurper().parse(file); assert graph.name() == 'gexf' diff --git a/modules/nextflow/src/test/groovy/nextflow/dag/MermaidRendererTest.groovy b/modules/nextflow/src/test/groovy/nextflow/dag/MermaidRendererTest.groovy index b0d1226727..220422ac77 100644 --- a/modules/nextflow/src/test/groovy/nextflow/dag/MermaidRendererTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/dag/MermaidRendererTest.groovy @@ -15,14 +15,13 @@ */ package nextflow.dag - import java.nio.file.Files -import java.nio.file.Paths import groovyx.gpars.dataflow.DataflowQueue -import nextflow.Session import spock.lang.Specification +import nextflow.Session + /** * * @author Ben Sherman @@ -33,7 +32,7 @@ class MermaidRendererTest extends Specification { new Session() } - def 'should render an abstract graph using the `mmd` format' () { + def 'should render a graph using the `mmd` format' () { given: def file = Files.createTempFile('test', null) def ch1 = new DataflowQueue() @@ -47,7 +46,7 @@ class MermaidRendererTest extends Specification { dag.normalize() when: - new MermaidRenderer().renderAbstractGraph(dag, file) + new MermaidRenderer().renderDocument(dag, file) then: file.text == ''' @@ -65,65 +64,4 @@ class MermaidRendererTest extends Specification { cleanup: file.delete() } - - def 'should render a concrete graph using the `mmd` format' () { - given: - def file = Files.createTempFile('test', null) - - def dag = Mock(ConcreteDAG) { - nodes >> [ - '012345': new ConcreteDAG.Task( - index: 1, - label: 'foo', - inputs: [ - new ConcreteDAG.Input( - name: 'data.txt', - path: Paths.get('/inputs/data.txt'), - predecessor: null - ) - ], - outputs: [ - new ConcreteDAG.Output( - name: 'data.foo', - path: Paths.get('/work/012345/data.foo'), - ) - ] - ), - 'abcdef': new ConcreteDAG.Task( - index: 2, - label: 'bar', - inputs: [ - new ConcreteDAG.Input( - name: 'data.foo', - path: Paths.get('/work/012345/data.foo'), - predecessor: '012345' - ) - ], - outputs: [ - new ConcreteDAG.Output( - name: 'data.bar', - path: Paths.get('/work/abcdef/data.bar'), - ) - ] - ) - ] - } - - when: - new MermaidRenderer().renderConcreteGraph(dag, file) - then: - file.text == - ''' - flowchart TD - t1["foo"] - i1(( )) -->|data.txt| t1 - t2["bar"] - t1 -->|data.foo| t2 - t2 -->|data.bar| o1(( )) - ''' - .stripIndent().leftTrim() - - cleanup: - file.delete() - } } diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy index ad94ce47a7..1846fb0838 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskHandlerTest.groovy @@ -16,10 +16,8 @@ package nextflow.processor -import java.nio.file.Paths import java.util.concurrent.atomic.LongAdder -import com.google.common.hash.HashCode import nextflow.Session import nextflow.executor.Executor import nextflow.util.Duration @@ -137,29 +135,6 @@ class TaskHandlerTest extends Specification { } - def 'should write meta file' () { - - given: - def folder = File.createTempDir() - def outputFile = new File(folder, 'bar.txt') ; outputFile.text = 'bar' - def task = Mock(TaskRun) { - hash >> HashCode.fromString('aabbccddeeff00112233445566778899') - workDir >> folder.toPath() - getInputFilesMap() >> [ 'foo.txt': Paths.get('/tmp/00/112233445566778899aabbccddeeff/foo.txt') ] - getOutputsByType(_) >> [ 'bar.txt': outputFile.toPath() ] - } - def handler = [:] as TaskHandler - handler.task = task - - when: - handler.writeMetaFile() - then: - task.workDir.resolve(TaskRun.CMD_META).text == """{"hash":"aabbccddeeff00112233445566778899","inputs":[{"name":"foo.txt","path":"/tmp/00/112233445566778899aabbccddeeff/foo.txt","predecessor":"00112233445566778899aabbccddeeff"}],"outputs":[{"name":"bar.txt","path":"${folder}/bar.txt","size":3,"checksum":"37b51d194a7513e45b56f6524f2d51f2"}]}""" - - cleanup: - folder.delete() - } - LongAdder _adder(Integer x) { if( x != null ) { def adder = new LongAdder() diff --git a/modules/nextflow/src/test/groovy/nextflow/trace/GraphObserverTest.groovy b/modules/nextflow/src/test/groovy/nextflow/trace/GraphObserverTest.groovy index 384c737944..f3d13bc507 100644 --- a/modules/nextflow/src/test/groovy/nextflow/trace/GraphObserverTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/trace/GraphObserverTest.groovy @@ -35,7 +35,7 @@ import test.TestHelper */ class GraphObserverTest extends Specification { - DAG dag + DAG test_dag def setup() { new Session() @@ -45,28 +45,28 @@ class GraphObserverTest extends Specification { def ch2 = new DataflowQueue() def ch3 = new DataflowQueue() - dag = new DAG() + test_dag = new DAG() - dag.addVertex( + test_dag.addVertex( DAG.Type.ORIGIN, 'Source', null, [ new DAG.ChannelHandler(channel: src, label: 'src') ] ) - dag.addVertex( + test_dag.addVertex( DAG.Type.PROCESS, 'Process 1', [ new DAG.ChannelHandler(channel: src, label: 'Source') ], [ new DAG.ChannelHandler(channel: ch1, label: 'Channel 1') ] ) - dag.addVertex( + test_dag.addVertex( DAG.Type.OPERATOR, 'Filter', [ new DAG.ChannelHandler(channel: ch1, label: 'Channel 1') ], [ new DAG.ChannelHandler(channel: ch2, label: 'Channel 2') ] ) - dag.addVertex( + test_dag.addVertex( DAG.Type.PROCESS, 'Process 2', [ new DAG.ChannelHandler(channel: ch2, label: 'Channel 2') ], @@ -78,7 +78,7 @@ class GraphObserverTest extends Specification { given: def file = Files.createTempFile('nxf_','.dot') def gr = new GraphObserver(file) - gr.abstractDag = dag + gr.dag = test_dag when: gr.onFlowComplete() @@ -105,7 +105,7 @@ class GraphObserverTest extends Specification { given: def file = Files.createTempFile('nxf-','.html') def gr = new GraphObserver(file) - gr.abstractDag = dag + gr.dag = test_dag when: gr.onFlowComplete() @@ -133,7 +133,7 @@ class GraphObserverTest extends Specification { given: def file = Files.createTempFile('nxf-','.svg') def gr = new GraphObserver(file) - gr.abstractDag = dag + gr.dag = test_dag when: gr.onFlowComplete() @@ -151,7 +151,7 @@ class GraphObserverTest extends Specification { given: def file = Files.createTempFile('nxf-','.png') def gr = new GraphObserver(file) - gr.abstractDag = dag + gr.dag = test_dag when: gr.onFlowComplete() @@ -168,7 +168,7 @@ class GraphObserverTest extends Specification { given: def file = Files.createTempFile('nxf-','.pdf') def gr = new GraphObserver(file) - gr.abstractDag = dag + gr.dag = test_dag when: gr.onFlowComplete() @@ -185,7 +185,7 @@ class GraphObserverTest extends Specification { def folder = Files.createTempDirectory('test') def file = folder.resolve('nope') def gr = new GraphObserver(file) - gr.abstractDag = dag + gr.dag = test_dag when: gr.onFlowComplete() @@ -216,34 +216,34 @@ class GraphObserverTest extends Specification { then: observer.name == 'hello-world' observer.format == 'dot' - observer.createRenderer() instanceof DotRenderer + observer.createRender() instanceof DotRenderer when: observer = new GraphObserver(Paths.get('/path/to/TheGraph.html')) then: observer.name == 'TheGraph' observer.format == 'html' - observer.createRenderer() instanceof CytoscapeHtmlRenderer + observer.createRender() instanceof CytoscapeHtmlRenderer when: observer = new GraphObserver(Paths.get('/path/to/TheGraph.mmd')) then: observer.name == 'TheGraph' observer.format == 'mmd' - observer.createRenderer() instanceof MermaidRenderer + observer.createRender() instanceof MermaidRenderer when: observer = new GraphObserver(Paths.get('/path/to/TheGraph.SVG')) then: observer.name == 'TheGraph' observer.format == 'svg' - observer.createRenderer() instanceof GraphvizRenderer + observer.createRender() instanceof GraphvizRenderer when: observer = new GraphObserver(Paths.get('/path/to/anonymous')) then: observer.name == 'anonymous' observer.format == 'dot' - observer.createRenderer() instanceof DotRenderer + observer.createRender() instanceof DotRenderer } } diff --git a/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy b/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy index 026d0dbc92..389dab5107 100644 --- a/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy +++ b/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy @@ -34,14 +34,12 @@ import java.nio.file.attribute.FileAttribute import java.nio.file.attribute.FileTime import java.nio.file.attribute.PosixFilePermission import java.nio.file.attribute.PosixFilePermissions -import java.security.MessageDigest import groovy.transform.CompileStatic import groovy.transform.PackageScope import groovy.transform.stc.ClosureParams import groovy.transform.stc.FromString import groovy.util.logging.Slf4j -import nextflow.file.ETagAwareFile import nextflow.file.FileHelper import nextflow.file.FileSystemPathFactory import nextflow.io.ByteBufferBackedInputStream @@ -1601,22 +1599,4 @@ class FilesEx { static String getScheme(Path path) { path.getFileSystem().provider().getScheme() } - - static String getChecksum(Path path) { - if( Files.isDirectory(path) ) - return null - - if( path instanceof ETagAwareFile ) - return ((ETagAwareFile)path).getETag() - - final is = Files.newInputStream(path) - final md = MessageDigest.getInstance('MD5') - final buf = new byte[16 << 10] - - int len - while( (len=is.read(buf)) != -1 ) - md.update(buf, 0, len) - - new BigInteger(1, md.digest()).toString(16) - } } diff --git a/modules/nf-commons/src/main/nextflow/file/ETagAwareFile.groovy b/modules/nf-commons/src/main/nextflow/file/ETagAwareFile.groovy deleted file mode 100644 index f1c40073b3..0000000000 --- a/modules/nf-commons/src/main/nextflow/file/ETagAwareFile.groovy +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright 2013-2023, Seqera Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package nextflow.file - -/** - * Defines the interface for files that have an ETag - * - * @author Ben Sherman - */ -interface ETagAwareFile { - - String getETag() - -} diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Path.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Path.java index a7b7a4e1ed..5ce117ac6f 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Path.java +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Path.java @@ -42,14 +42,13 @@ import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import nextflow.file.ETagAwareFile; import nextflow.file.TagAwareFile; import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Iterables.filter; import static com.google.common.collect.Iterables.transform; import static java.lang.String.format; -public class S3Path implements Path, ETagAwareFile, TagAwareFile { +public class S3Path implements Path, TagAwareFile { public static final String PATH_SEPARATOR = "/"; /** @@ -555,14 +554,6 @@ public String getContentType() { return contentType; } - @Override - public String getETag() { - return fileSystem - .getClient() - .getObjectMetadata(getBucket(), getKey()) - .getETag(); - } - public String getStorageClass() { return storageClass; } diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileAttributes.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileAttributes.groovy index ba07cda22f..a955888d70 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileAttributes.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileAttributes.groovy @@ -46,8 +46,6 @@ class AzFileAttributes implements BasicFileAttributes { private String objectId - private String etag - static AzFileAttributes root() { new AzFileAttributes(size: 0, objectId: '/', directory: true) } @@ -62,7 +60,6 @@ class AzFileAttributes implements BasicFileAttributes { updateTime = time(props.getLastModified()) directory = client.blobName.endsWith('/') size = props.getBlobSize() - etag = props.getETag() } AzFileAttributes(String containerName, BlobItem item) { @@ -72,7 +69,6 @@ class AzFileAttributes implements BasicFileAttributes { creationTime = time(item.properties.getCreationTime()) updateTime = time(item.properties.getLastModified()) size = item.properties.getContentLength() - etag = item.properties.getETag() } } @@ -148,10 +144,6 @@ class AzFileAttributes implements BasicFileAttributes { return objectId } - String getETag() { - return etag - } - @Override boolean equals( Object obj ) { if( this.class != obj?.class ) return false diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzPath.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzPath.groovy index 3bdd222f6b..2f654b4ad8 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzPath.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzPath.groovy @@ -29,7 +29,6 @@ import com.azure.storage.blob.models.BlobItem import groovy.transform.CompileStatic import groovy.transform.EqualsAndHashCode import groovy.transform.PackageScope -import nextflow.file.ETagAwareFile /** * Implements Azure path object @@ -38,7 +37,7 @@ import nextflow.file.ETagAwareFile */ @CompileStatic @EqualsAndHashCode(includes = 'fs,path,directory', includeFields = true) -class AzPath implements Path, ETagAwareFile { +class AzPath implements Path { private AzFileSystem fs @@ -306,11 +305,6 @@ class AzPath implements Path, ETagAwareFile { return this.toString() <=> other.toString() } - @Override - String getETag() { - return attributes.getETag() - } - String getContainerName() { if( path.isAbsolute() ) { path.nameCount==0 ? '/' : path.getName(0) diff --git a/tests/process-named-inputs.groovy b/tests/process-named-inputs.groovy new file mode 100644 index 0000000000..61539c5913 --- /dev/null +++ b/tests/process-named-inputs.groovy @@ -0,0 +1,88 @@ +import static nextflow.Nextflow.* + +import org.apache.commons.lang.StringUtils as StringUtils +import groovy.transform.Field as Field +import java.nio.file.Path as Path +import nextflow.Channel as Channel +import nextflow.util.Duration as Duration +import nextflow.util.MemoryUnit as MemoryUnit +import nextflow.io.ValueObject as ValueObject +import nextflow.Channel as channel + +@groovy.transform.BaseScript +public class script1677878444250 extends nextflow.script.BaseScript { + + public script1677878444250() { + nextflow.script.ScriptMeta.get(this).setDsl1ProcessNames(['foo']) + } + + public script1677878444250(final groovy.lang.Binding context) { + super.setBinding(context) + nextflow.script.ScriptMeta.get(this).setDsl1ProcessNames(['foo']) + } + + public static void main(final java.lang.String[] args) { + org.codehaus.groovy.runtime.InvokerHelper.runScript(script1677878444250, args) + } + + @groovy.transform.Generated + protected java.lang.Object runScript() { + nextflow .enable.dsl = 2 + + this.process('foo', { + this._in_val(new nextflow.script.TokenVar('bar')) + this._in_val(new nextflow.script.TokenVar('baz')) + this._out_stdout() + new nextflow.script.BodyDef( + { + """ + echo $bar + echo $baz + """ + }, + '"""\n echo $bar\n echo $baz\n """\n', + 'script', + [ + new nextflow.script.TokenValRef('baz', 13, 11), + new nextflow.script.TokenValRef('bar', 12, 11) + ] + ) + }) + + this.workflow('foo_wrapper', { + this._take_bar() + this._take_baz() + this._emit_$out0() + new nextflow.script.BodyDef( + { + this.foo(['bar': bar , 'baz': baz ]) + $out0 = foo .out + }, + ' take:\n bar\n baz\n main:\n foo(bar: bar, baz: baz)\n emit:\n foo.out\n', + 'workflow', + [ + new nextflow.script.TokenValRef('foo.out', 25, 5), + new nextflow.script.TokenValRef('baz', 23, 24), + new nextflow.script.TokenValRef('bar', 23, 14), + new nextflow.script.TokenValRef('$out0', -1, -1) + ] + ) + }) + + this.workflow({ + new nextflow.script.BodyDef( + { + this.foo_wrapper(['bar': 'bar', 'baz': 'baz']) | view + }, + ' foo_wrapper(bar: \'bar\', baz: \'baz\')\n | view\n', + 'workflow', + [ + new nextflow.script.TokenValRef('view', 31, 7) + ] + ) + }) + } + +} + +// classes are appended here (with imports duplicated) \ No newline at end of file diff --git a/tests/process-operator-closure.nf b/tests/process-operator-closure.nf new file mode 100644 index 0000000000..f26bd1f642 --- /dev/null +++ b/tests/process-operator-closure.nf @@ -0,0 +1,36 @@ + +params.data = "$baseDir/data/hello.txt" + +process foo { + input: + path 'input.txt' + val index + output: + path 'result.txt' + script: + """ + cat input.txt > result.txt + echo "Task ${index} was here" >> result.txt + """ +} + +workflow { + // perform a fixed number of iterations + Channel.of( 1..10 ) + | reduce(file(params.data), foo) // { f, i -> foo(f, i) } + | view { f -> f.text } + + // iterate until some condition is satisfied + foo + .recurse(file(params.data)) + .until { f -> f.size() > 100 } + + foo + .out + .view(f -> f.text) + + Channel.recurse( file(params.data), foo ) + | until { f -> f.size() > 100 } + | last + | view { f -> f.text } +}