diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/PublishIndexOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/PublishIndexOp.groovy deleted file mode 100644 index acecd9e9b4..0000000000 --- a/modules/nextflow/src/main/groovy/nextflow/extension/PublishIndexOp.groovy +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright 2013-2024, Seqera Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package nextflow.extension - -import java.nio.file.Path - -import groovy.transform.CompileStatic -import groovy.util.logging.Slf4j -import groovyx.gpars.dataflow.DataflowReadChannel -import nextflow.Global -import nextflow.Session -import nextflow.util.CsvWriter -/** - * Publish an index file describing all files from a source - * channel, including metadata. 
- * - * @author Ben Sherman - */ -@Slf4j -@CompileStatic -class PublishIndexOp { - - private DataflowReadChannel source - - private Path basePath - - private Path path - - private Closure mapper - - private /* boolean | List */ header = false - - private String sep = ',' - - private List records = [] - - private Session getSession() { Global.session as Session } - - PublishIndexOp(DataflowReadChannel source, Path basePath, String indexPath, Map opts) { - this.source = source - this.basePath = basePath - this.path = basePath.resolve(indexPath) - if( opts.mapper ) - this.mapper = opts.mapper as Closure - if( opts.header != null ) - this.header = opts.header - if( opts.sep ) - this.sep = opts.sep as String - } - - void apply() { - final events = new HashMap(2) - events.onNext = this.&onNext - events.onComplete = this.&onComplete - DataflowHelper.subscribeImpl(source, events) - } - - protected void onNext(value) { - final record = mapper != null ? mapper.call(value) : value - final normalized = normalizePaths(record) - log.trace "Normalized record for index file: ${normalized}" - records << normalized - } - - protected void onComplete(nope) { - if( records.size() == 0 ) - return - log.trace "Saving records to index file: ${records}" - new CsvWriter(header: header, sep: sep).apply(records, path) - } - - protected Object normalizePaths(value) { - if( value instanceof Path ) - return List.of(normalizePath(value)) - - if( value instanceof Collection ) { - return value.collect { el -> - if( el instanceof Path ) - return normalizePath(el) - if( el instanceof Collection ) - return normalizePaths(el) - return el - } - } - - if( value instanceof Map ) { - return value.collectEntries { k, v -> - if( v instanceof Path ) - return List.of(k, normalizePath(v)) - if( v instanceof Collection ) - return List.of(k, normalizePaths(v)) - return List.of(k, v) - } - } - - throw new IllegalArgumentException("Index file record must be a list, map, or file: ${value} 
[${value.class.simpleName}]") - } - - private Path normalizePath(Path path) { - final sourceDir = getTaskDir(path) - return basePath.resolve(sourceDir.relativize(path)) - } - - /** - * Given a path try to infer the task directory to which the path below - * ie. the directory starting with a workflow work dir and having at lest - * two sub-directories eg work-dir/xx/yyyyyy/etc - * - * @param path - */ - protected Path getTaskDir(Path path) { - if( path == null ) - return null - return getTaskDir0(path, session.workDir.resolve('tmp')) - ?: getTaskDir0(path, session.workDir) - ?: getTaskDir0(path, session.bucketDir) - } - - private Path getTaskDir0(Path file, Path base) { - if( base == null ) - return null - if( base.fileSystem != file.fileSystem ) - return null - final len = base.nameCount - if( file.startsWith(base) && file.getNameCount() > len+2 ) - return base.resolve(file.subpath(len,len+2)) - return null - } - -} diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy index 45d1adc245..68f4c8dc63 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy @@ -24,6 +24,7 @@ import groovyx.gpars.dataflow.DataflowReadChannel import nextflow.Global import nextflow.Session import nextflow.processor.PublishDir +import nextflow.util.CsvWriter /** * Publish files from a source channel. 
* @@ -37,6 +38,12 @@ class PublishOp { private PublishDir publisher + private Path targetDir + + private IndexOpts indexOpts + + private List indexRecords = [] + private volatile boolean complete private Session getSession() { Global.session as Session } @@ -44,6 +51,9 @@ class PublishOp { PublishOp(DataflowReadChannel source, Map opts) { this.source = source this.publisher = PublishDir.create(opts) + this.targetDir = opts.path as Path + if( opts.index ) + this.indexOpts = new IndexOpts(targetDir, opts.index as Map) } boolean getComplete() { complete } @@ -64,13 +74,32 @@ class PublishOp { final files = entry.value publisher.apply(files, sourceDir) } + + if( indexOpts ) { + final record = indexOpts.mapper != null ? indexOpts.mapper.call(value) : value + final normalized = normalizePaths(record) + log.trace "Normalized record for index file: ${normalized}" + indexRecords << normalized + } } protected void onComplete(nope) { + if( indexOpts && indexRecords.size() > 0 ) { + log.trace "Saving records to index file: ${indexRecords}" + new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(indexRecords, indexOpts.path) + session.notifyFilePublish(indexOpts.path) + } + log.trace "Publish operator complete" this.complete = true } + /** + * Extract files from a received value for publishing. + * + * @param result + * @param value + */ protected Map> collectFiles(Map> result, value) { if( value instanceof Path ) { final sourceDir = getTaskDir(value) @@ -86,9 +115,47 @@ class PublishOp { } /** - * Given a path try to infer the task directory to which the path below - * ie. the directory starting with a workflow work dir and having at lest - * two sub-directories eg work-dir/xx/yyyyyy/etc + * Normalize the paths in a record by converting + * work directory paths to publish paths. 
+ * + * @param value + */ + protected Object normalizePaths(value) { + if( value instanceof Path ) + return List.of(normalizePath(value)) + + if( value instanceof Collection ) { + return value.collect { el -> + if( el instanceof Path ) + return normalizePath(el) + if( el instanceof Collection ) + return normalizePaths(el) + return el + } + } + + if( value instanceof Map ) { + return value.collectEntries { k, v -> + if( v instanceof Path ) + return List.of(k, normalizePath(v)) + if( v instanceof Collection ) + return List.of(k, normalizePaths(v)) + return List.of(k, v) + } + } + + throw new IllegalArgumentException("Index file record must be a list, map, or file: ${value} [${value.class.simpleName}]") + } + + private Path normalizePath(Path path) { + final sourceDir = getTaskDir(path) + return targetDir.resolve(sourceDir.relativize(path)) + } + + /** + * Try to infer the parent task directory to which a path belongs. It + * should be a directory starting with a session work dir and having + * at least two sub-directories, e.g.
work/ab/cdef/etc * * @param path */ @@ -111,4 +178,22 @@ class PublishOp { return null } + static class IndexOpts { + Path path + Closure mapper + def /* boolean | List */ header = false + String sep = ',' + + IndexOpts(Path targetDir, Map opts) { + this.path = targetDir.resolve(opts.path as String) + + if( opts.mapper ) + this.mapper = opts.mapper as Closure + if( opts.header != null ) + this.header = opts.header + if( opts.sep ) + this.sep = opts.sep as String + } + } + } diff --git a/modules/nextflow/src/main/groovy/nextflow/script/OutputDsl.groovy b/modules/nextflow/src/main/groovy/nextflow/script/OutputDsl.groovy index a111babfaa..5a13db1539 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/OutputDsl.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/OutputDsl.groovy @@ -26,7 +26,7 @@ import nextflow.exception.ScriptRuntimeException import nextflow.extension.CH import nextflow.extension.MixOp import nextflow.extension.PublishOp -import nextflow.extension.PublishIndexOp +import nextflow.file.FileHelper /** * Implements the DSL for publishing workflow outputs @@ -43,10 +43,12 @@ class OutputDsl { private Map defaults = [:] + private volatile List ops = [] + void directory(String directory) { if( this.directory ) throw new ScriptRuntimeException("Publish directory cannot be defined more than once in the workflow publish definition") - this.directory = (directory as Path).complete() + this.directory = FileHelper.toCanonicalPath(directory) } void contentType(String value) { @@ -120,22 +122,13 @@ class OutputDsl { : sources.first() final opts = publishOptions(name, publishConfigs[name] ?: [:]) - new PublishOp(CH.getReadChannel(mixed), opts).apply() - - if( opts.index ) { - final basePath = opts.path as Path - final indexOpts = opts.index as Map - final indexPath = indexOpts.path as String - if( !indexPath ) - throw new ScriptRuntimeException("Index file definition for publish target '${name}' is missing `path` option") - new 
PublishIndexOp(CH.getReadChannel(mixed), basePath, indexPath, indexOpts).apply() - } + ops << new PublishOp(CH.getReadChannel(mixed), opts).apply() } } private Map publishOptions(String name, Map overrides) { if( !directory ) - directory = Paths.get('.').complete() + directory = FileHelper.toCanonicalPath('.') final opts = defaults + overrides if( opts.containsKey('ignoreErrors') ) @@ -147,9 +140,20 @@ class OutputDsl { if( path.startsWith('/') ) throw new ScriptRuntimeException("Invalid publish target path '${path}' -- it should be a relative path") opts.path = directory.resolve(path) + + if( opts.index && !(opts.index as Map).path ) + throw new ScriptRuntimeException("Index file definition for publish target '${name}' is missing `path` option") + return opts } + boolean getComplete() { + for( final op : ops ) + if( !op.complete ) + return false + return true + } + static class TargetDsl { private Map opts = [:] diff --git a/modules/nextflow/src/test/groovy/nextflow/extension/PublishOpTest.groovy b/modules/nextflow/src/test/groovy/nextflow/extension/PublishOpTest.groovy deleted file mode 100644 index e8530c9e3b..0000000000 --- a/modules/nextflow/src/test/groovy/nextflow/extension/PublishOpTest.groovy +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2013-2024, Seqera Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package nextflow.extension - -import java.nio.file.Files -import java.util.concurrent.TimeoutException - -import groovyx.gpars.dataflow.DataflowQueue -import nextflow.Channel -import nextflow.Global -import nextflow.Session -import test.BaseSpec -/** - * - * @author Paolo Di Tommaso - */ -class PublishOpTest extends BaseSpec { - - - def 'should publish files' () { - given: - def folder = Files.createTempDirectory('test') - def file1 = folder.resolve('file1.txt'); file1.text = 'Hello' - def file2 = folder.resolve('file2.txt'); file2.text = 'world' - def target = folder.resolve('target/dir') - - - def BASE = folder - def sess = Mock(Session) { - getWorkDir() >> BASE - getConfig() >> [:] - } - Global.session = sess - - and: - def ch = new DataflowQueue() - ch.bind(file1) - ch.bind(file2) - ch.bind(Channel.STOP) - - when: - def now = System.currentTimeMillis() - def op = new PublishOp(ch, [path:target, mode:'symlink']).apply() - while( !op.complete ) { - sleep 100 - if( System.currentTimeMillis() - now > 5_000 ) - throw new TimeoutException() - } - then: - target.resolve('file1.txt').text == 'Hello' - target.resolve('file2.txt').text == 'world' - - cleanup: - folder?.deleteDir() - } - -} diff --git a/modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy b/modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy new file mode 100644 index 0000000000..125e344af0 --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/script/OutputDslTest.groovy @@ -0,0 +1,85 @@ +package nextflow.script + +import java.nio.file.Files +import java.util.concurrent.TimeoutException + +import groovyx.gpars.dataflow.DataflowQueue +import nextflow.Channel +import nextflow.Global +import nextflow.Session +import nextflow.SysEnv +import spock.lang.Specification +/** + * + * @author Ben Sherman + */ +class OutputDslTest extends Specification { + + def 'should publish workflow outputs'() { + given: + def root = Files.createTempDirectory('test') + def 
workDir = root.resolve('work') + def work1 = workDir.resolve('ab/1234'); Files.createDirectories(work1) + def work2 = workDir.resolve('cd/5678'); Files.createDirectories(work2) + def file1 = work1.resolve('file1.txt'); file1.text = 'Hello' + def file2 = work2.resolve('file2.txt'); file2.text = 'world' + def target = root.resolve('results') + and: + def session = Mock(Session) { + getConfig() >> [:] + getWorkDir() >> workDir + } + Global.session = session + and: + def ch1 = new DataflowQueue() + ch1.bind(file1) + ch1.bind(Channel.STOP) + and: + def ch2 = new DataflowQueue() + ch2.bind(file2) + ch2.bind(Channel.STOP) + and: + def targets = [ + (ch1): 'foo', + (ch2): 'bar' + ] + def dsl = new OutputDsl() + and: + SysEnv.push(NXF_FILE_ROOT: root.toString()) + + when: + dsl.directory('results') + dsl.mode('symlink') + dsl.overwrite(true) + dsl.target('bar') { + path('barbar') + index { + path 'index.csv' + } + } + dsl.build(targets) + + def now = System.currentTimeMillis() + while( !dsl.complete ) { + sleep 100 + if( System.currentTimeMillis() - now > 5_000 ) + throw new TimeoutException() + } + + then: + target.resolve('foo/file1.txt').text == 'Hello' + target.resolve('barbar/file2.txt').text == 'world' + target.resolve('barbar/index.csv').text == """\ + "${target}/barbar/file2.txt" + """.stripIndent() + and: + 1 * session.notifyFilePublish(target.resolve('foo/file1.txt'), file1) + 1 * session.notifyFilePublish(target.resolve('barbar/file2.txt'), file2) + 1 * session.notifyFilePublish(target.resolve('barbar/index.csv')) + + cleanup: + SysEnv.pop() + root?.deleteDir() + } + +}