Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overwrite published outputs only if they are stale #4729

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -2230,7 +2230,10 @@ Available options:
- `'symlink'`: Creates an absolute symbolic link in the publish directory for each output file (default).

`overwrite`
: When `true` any existing file in the specified folder will be overridden (default: `true` during normal pipeline execution and `false` when pipeline execution is `resumed`).
: :::{versionchanged} 24.02.0-edge
Prior to this version, the default behavior was `false` if the task was cached on a resumed run and `true` otherwise.
:::
: Determines whether to overwrite a published file if it already exists. By default, existing files are overwritten only if they are stale, i.e. checksum does not match the new file.

`path`
: Specifies the directory where files need to be published. **Note**: the syntax `publishDir '/some/dir'` is a shortcut for `publishDir path: '/some/dir'`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ package nextflow.processor
import java.nio.file.FileAlreadyExistsException
import java.nio.file.FileSystem
import java.nio.file.FileSystems
import java.nio.file.FileVisitResult
import java.nio.file.Files
import java.nio.file.LinkOption
import java.nio.file.NoSuchFileException
import java.nio.file.Path
import java.nio.file.PathMatcher
import java.nio.file.SimpleFileVisitor
import java.nio.file.attribute.BasicFileAttributes
import java.util.concurrent.ExecutorService
import java.util.regex.Pattern

Expand Down Expand Up @@ -68,7 +71,7 @@ class PublishDir {
Path path

/**
* Whenever overwrite existing files
* Whether to overwrite existing files
*/
Boolean overwrite

Expand Down Expand Up @@ -331,10 +334,10 @@ class PublishDir {
}

if( inProcess ) {
safeProcessFile(source, destination)
safeProcessPath(source, destination)
}
else {
threadPool.submit({ safeProcessFile(source, destination) } as Runnable)
threadPool.submit({ safeProcessPath(source, destination) } as Runnable)
}

}
Expand All @@ -357,18 +360,35 @@ class PublishDir {
throw new IllegalArgumentException("Not a valid publish target path: `$target` [${target?.class?.name}]")
}

protected void safeProcessFile(Path source, Path target) {
protected void safeProcessPath(Path source, Path target) {
try {
processFile(source, target)
processPath(source, target)
}
catch( Throwable e ) {
log.warn "Failed to publish file: ${source.toUriString()}; to: ${target.toUriString()} [${mode.toString().toLowerCase()}] -- See log file for details", e
if( NF.strictMode || failOnError){
if( NF.strictMode || failOnError ) {
session?.abort(e)
}
}
}

protected void processPath(Path source, Path target) {

// publish each file in the directory tree
if( Files.isDirectory(source) )
Files.walkFileTree(source, new SimpleFileVisitor<Path>() {
FileVisitResult visitFile(Path sourceFile, BasicFileAttributes attrs) {
final targetFile = target.resolve(source.relativize(sourceFile).toString())
processFile(sourceFile, targetFile)
FileVisitResult.CONTINUE
}
})

// otherwise publish file directly
else
processFile(source, target)
}

protected void processFile( Path source, Path destination ) {

// resolve Fusion symlink if applicable
Expand All @@ -391,8 +411,9 @@ class PublishDir {
// see https://github.com/nextflow-io/nextflow/issues/2177
if( checkSourcePathConflicts(destination))
return

if( overwrite ) {

// overwrite only if explicitly enabled or destination is stale
if( overwrite || (overwrite == null && source.getChecksum() != destination.getChecksum()) ) {
FileHelper.deletePath(destination)
processFileImpl(source, destination)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1324,7 +1324,6 @@ class TaskProcessor {
* Publish output files to a specified target folder
*
* @param task The task whose outputs need to be published
* @param overwrite When {@code true} any existing file will be overwritten, otherwise the publishing is ignored
*/
@CompileStatic
protected void publishOutputs( TaskRun task ) {
Expand All @@ -1340,10 +1339,6 @@ class TaskProcessor {

private void publishOutputs0( TaskRun task, PublishDir publish ) {

if( publish.overwrite == null ) {
publish.overwrite = !task.cached
}

HashSet<Path> files = []
def outputs = task.getOutputsByType(FileOutParam)
for( Map.Entry entry : outputs ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,48 @@ class PublishDirTest extends Specification {

}

def 'should overwrite published files only if they are stale' () {

given:
def session = new Session()
def folder = Files.createTempDirectory('nxf')
def sourceDir = folder.resolve('work-dir')
def targetDir = folder.resolve('pub-dir')
sourceDir.mkdir()
sourceDir.resolve('file1.txt').text = 'aaa'
sourceDir.resolve('file2.bam').text = 'bbb'
targetDir.mkdir()
targetDir.resolve('file1.txt').text = 'aaa'
targetDir.resolve('file2.bam').text = 'bbb (old)'

def task = new TaskRun(workDir: sourceDir, config: new TaskConfig(), name: 'foo')

when:
def outputs = [
sourceDir.resolve('file1.txt'),
sourceDir.resolve('file2.bam')
] as Set
def publisher = new PublishDir(path: targetDir, mode: 'copy')
and:
def timestamp1 = targetDir.resolve('file1.txt').lastModified()
def timestamp2 = targetDir.resolve('file2.bam').lastModified()
and:
publisher.apply( outputs, task )
and:
[email protected](false)

then:
timestamp1 == targetDir.resolve('file1.txt').lastModified()
timestamp2 != targetDir.resolve('file2.bam').lastModified()

targetDir.resolve('file1.txt').text == 'aaa'
targetDir.resolve('file2.bam').text == 'bbb'

cleanup:
folder?.deleteDir()

}

def 'should apply saveAs closure' () {

given:
Expand Down
15 changes: 15 additions & 0 deletions modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,17 @@ import java.nio.file.attribute.FileTime
import java.nio.file.attribute.PosixFilePermission
import java.nio.file.attribute.PosixFilePermissions

import com.google.common.hash.Hashing
import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.transform.stc.ClosureParams
import groovy.transform.stc.FromString
import groovy.util.logging.Slf4j
import nextflow.file.ETagAwareFile
import nextflow.file.FileHelper
import nextflow.file.FileSystemPathFactory
import nextflow.io.ByteBufferBackedInputStream
import nextflow.util.CacheHelper
import nextflow.util.CharsetHelper
import nextflow.util.CheckHelper

Expand Down Expand Up @@ -1599,4 +1602,16 @@ class FilesEx {
static String getScheme(Path path) {
path.getFileSystem().provider().getScheme()
}

static String getChecksum(Path path) {
if( Files.isDirectory(path) )
return null

// use etag if available
if( path instanceof ETagAwareFile )
return path.getETag()

// otherwise compute checksum manually
CacheHelper.hasher(Hashing.md5(), path, CacheHelper.HashMode.DEEP).hash().toString()
}
}
29 changes: 29 additions & 0 deletions modules/nf-commons/src/main/nextflow/file/ETagAwareFile.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.file

/**
* Defines the interface for files that have an ETag
*
* @author Ben Sherman <[email protected]>
*/
interface ETagAwareFile {

String getETag()

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
package nextflow.cloud.aws

import nextflow.cloud.aws.nio.S3FileSystemProvider
import groovy.transform.CompileStatic
import nextflow.cloud.aws.nio.S3FileSystemProvider
import nextflow.file.FileHelper
import nextflow.plugin.BasePlugin
import org.pf4j.PluginWrapper
Expand Down
11 changes: 10 additions & 1 deletion plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Path.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import nextflow.file.ETagAwareFile;
import nextflow.file.TagAwareFile;
import static com.google.common.collect.Iterables.concat;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.transform;
import static java.lang.String.format;

public class S3Path implements Path, TagAwareFile {
public class S3Path implements Path, ETagAwareFile, TagAwareFile {

public static final String PATH_SEPARATOR = "/";
/**
Expand Down Expand Up @@ -566,6 +567,14 @@ public String getStorageClass() {
return storageClass;
}

@Override
public String getETag() {
return fileSystem
.getClient()
.getObjectMetadata(getBucket(), getKey())
.getETag();
}

// ~ helpers methods

private static Function<String, String> strip(final String ... strs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class PublishDirS3Test extends Specification {
when:
spy.apply1(source, true)
then:
1 * spy.safeProcessFile(source, _) >> { sourceFile, s3File ->
1 * spy.safeProcessPath(source, _) >> { sourceFile, s3File ->
assert s3File instanceof S3Path
assert (s3File as S3Path).getTagsList().find{ it.getKey()=='FOO'}.value == 'this'
assert (s3File as S3Path).getTagsList().find{ it.getKey()=='BAR'}.value == 'that'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class AzFileAttributes implements BasicFileAttributes {

private String objectId

private String etag

static AzFileAttributes root() {
new AzFileAttributes(size: 0, objectId: '/', directory: true)
}
Expand All @@ -60,6 +62,7 @@ class AzFileAttributes implements BasicFileAttributes {
updateTime = time(props.getLastModified())
directory = client.blobName.endsWith('/')
size = props.getBlobSize()
etag = props.getETag()

// Support for Azure Data Lake Storage Gen2 with hierarchical namespace enabled
final meta = props.getMetadata()
Expand All @@ -75,6 +78,7 @@ class AzFileAttributes implements BasicFileAttributes {
creationTime = time(item.properties.getCreationTime())
updateTime = time(item.properties.getLastModified())
size = item.properties.getContentLength()
etag = item.properties.getETag()
}
}

Expand Down Expand Up @@ -150,6 +154,10 @@ class AzFileAttributes implements BasicFileAttributes {
return objectId
}

String getETag() {
return etag
}

@Override
boolean equals( Object obj ) {
if( this.class != obj?.class ) return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import com.azure.storage.blob.models.BlobItem
import groovy.transform.CompileStatic
import groovy.transform.EqualsAndHashCode
import groovy.transform.PackageScope
import nextflow.file.ETagAwareFile

/**
* Implements Azure path object
Expand All @@ -37,7 +38,7 @@ import groovy.transform.PackageScope
*/
@CompileStatic
@EqualsAndHashCode(includes = 'fs,path,directory', includeFields = true)
class AzPath implements Path {
class AzPath implements Path, ETagAwareFile {

private AzFileSystem fs

Expand Down Expand Up @@ -333,4 +334,9 @@ class AzPath implements Path {
return result
}

@Override
String getETag() {
return attributes.getETag()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package nextflow.cloud.google

import groovy.transform.CompileStatic
import nextflow.cloud.google.nio.GsFileSystemProvider
import nextflow.file.FileHelper
import nextflow.plugin.BasePlugin
import org.pf4j.PluginWrapper
/**
Expand All @@ -30,4 +32,10 @@ class GoogleCloudPlugin extends BasePlugin {
super(wrapper)
}

@Override
void start() {
super.start()
FileHelper.getOrInstallProvider(GsFileSystemProvider)
}

}
Loading