Merge upstream changes, move google nio filesystem to separate branch
Signed-off-by: Ben Sherman <[email protected]>
bentsherman committed May 19, 2024
1 parent f3eb942 commit 9edc796
Showing 26 changed files with 89 additions and 507 deletions.
5 changes: 1 addition & 4 deletions docs/process.md
@@ -2318,10 +2318,7 @@ Available options:
- `'symlink'`: Creates an absolute symbolic link in the publish directory for each output file (default).

`overwrite`
: :::{versionchanged} 24.02.0-edge
Prior to this version, the default behavior was `false` if the task was cached on a resumed run and `true` otherwise.
:::
: Determines whether to overwrite a published file if it already exists. By default, existing files are overwritten only if they are stale, i.e. their checksum does not match the new file.
: When `true`, any existing file in the specified folder will be overwritten (default: `true` during normal pipeline execution and `false` when the pipeline execution is `resumed`); see the usage example below this file's diff.

`path`
: Specifies the directory where files need to be published. **Note**: the syntax `publishDir '/some/dir'` is a shortcut for `publishDir path: '/some/dir'`.
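For context, a minimal usage sketch of the `publishDir` options documented above; the process name, paths, and script are illustrative only. Omitting `overwrite` gives the default behavior described above.

```groovy
process saveResults {
    // publish outputs to a fixed directory, copying rather than symlinking,
    // and never overwrite a file that is already there
    publishDir path: '/some/dir', mode: 'copy', overwrite: false

    output:
    path 'result.txt'

    script:
    """
    echo 'hello' > result.txt
    """
}
```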
@@ -1372,6 +1372,7 @@ class TaskProcessor {
* Publish output files to a specified target folder
*
* @param task The task whose outputs need to be published
* @param overwrite When {@code true}, any existing file will be overwritten, otherwise the publishing is ignored
*/
@CompileStatic
protected void publishOutputs( TaskRun task ) {
@@ -1387,6 +1388,10 @@ class TaskProcessor {

private void publishOutputs0( TaskRun task, PublishDir publish ) {

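// default to overwriting only when the task was not restored from the cache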
if( publish.overwrite == null ) {
publish.overwrite = !task.cached
}

HashSet<Path> files = []
def outputs = task.getOutputsByType(FileOutParam)
for( Map.Entry entry : outputs ) {
@@ -239,7 +239,7 @@ class PublishDirTest extends Specification {
sourceDir.resolve('file1.txt'),
sourceDir.resolve('file2.bam')
] as Set
def publisher = new PublishDir(path: targetDir, mode: 'copy')
def publisher = new PublishDir(path: targetDir, mode: 'copy', overwrite: 'deep')
and:
def timestamp1 = targetDir.resolve('file1.txt').lastModified()
def timestamp2 = targetDir.resolve('file2.bam').lastModified()
15 changes: 0 additions & 15 deletions modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy
@@ -35,17 +35,14 @@ import java.nio.file.attribute.FileTime
import java.nio.file.attribute.PosixFilePermission
import java.nio.file.attribute.PosixFilePermissions

import com.google.common.hash.Hashing
import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.transform.stc.ClosureParams
import groovy.transform.stc.FromString
import groovy.util.logging.Slf4j
import nextflow.file.ETagAwareFile
import nextflow.file.FileHelper
import nextflow.file.FileSystemPathFactory
import nextflow.io.ByteBufferBackedInputStream
import nextflow.util.CacheHelper
import nextflow.util.CharsetHelper
import nextflow.util.CheckHelper

@@ -1602,16 +1599,4 @@ class FilesEx {
static String getScheme(Path path) {
path.getFileSystem().provider().getScheme()
}

static String getChecksum(Path path) {
if( Files.isDirectory(path) )
return null

// use etag if available
if( path instanceof ETagAwareFile )
return path.getETag()

// otherwise compute checksum manually
CacheHelper.hasher(Hashing.md5(), path, CacheHelper.HashMode.DEEP).hash().toString()
}
}
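The removed `getChecksum` helper used an ETag when the path provided one and otherwise computed an MD5 over the file content with Guava via `CacheHelper`. For reference, a self-contained sketch of that fallback, assuming only Guava on the classpath (the method name below is illustrative); in this commit the ETag shortcut instead moves into `HashBuilder.hashFileContent`, shown in the next file.

```groovy
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths

import com.google.common.hash.Funnels
import com.google.common.hash.Hasher
import com.google.common.hash.Hashing

// MD5-hash a file's content; directories have no checksum
String md5Checksum(Path path) {
    if( Files.isDirectory(path) )
        return null
    Hasher hasher = Hashing.md5().newHasher()
    // stream the file bytes straight into the hasher
    Files.copy(path, Funnels.asOutputStream(hasher))
    return hasher.hash().toString()
}

println md5Checksum(Paths.get('README.md'))   // prints the hex digest
```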
7 changes: 7 additions & 0 deletions modules/nf-commons/src/main/nextflow/util/HashBuilder.java
@@ -18,6 +18,7 @@

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -44,6 +45,7 @@
import nextflow.ISession;
import nextflow.extension.Bolts;
import nextflow.extension.FilesEx;
import nextflow.file.ETagAwareFile;
import nextflow.file.FileHolder;
import nextflow.io.SerializableMarker;
import org.slf4j.Logger;
@@ -413,6 +415,11 @@ static private Hasher hashFileMetadata( Hasher hasher, Path file, BasicFileAttri
*/
static private Hasher hashFileContent( Hasher hasher, Path path ) {

// use etag if available
if( path instanceof ETagAwareFile )
hasher.putBytes(((ETagAwareFile)path).getETag().getBytes(StandardCharsets.UTF_8));

// otherwise compute checksum manually
OutputStream output = Funnels.asOutputStream(hasher);
try {
Files.copy(path, output);
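A condensed sketch of the control flow that the added comments describe: use the ETag when the path exposes one, otherwise stream the file content through the hasher. The early `return` is an assumption added here for illustration (so the fallback really is an "otherwise"), and `EtagAware` is a stand-in for `nextflow.file.ETagAwareFile`:

```groovy
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.Path

import com.google.common.hash.Funnels
import com.google.common.hash.Hasher

// stand-in for nextflow.file.ETagAwareFile
interface EtagAware {
    String getETag()
}

Hasher hashFileContent(Hasher hasher, Path path) {
    // use the etag if the storage backend exposes one (e.g. object stores)
    if( path instanceof EtagAware && path.getETag() != null )
        return hasher.putBytes(path.getETag().getBytes(StandardCharsets.UTF_8))
    // otherwise compute the hash from the file content
    Files.copy(path, Funnels.asOutputStream(hasher))
    return hasher
}
```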
@@ -16,8 +16,6 @@
package nextflow.cloud.google

import groovy.transform.CompileStatic
import nextflow.cloud.google.nio.GsFileSystemProvider
import nextflow.file.FileHelper
import nextflow.plugin.BasePlugin
import org.pf4j.PluginWrapper
/**
@@ -32,10 +30,4 @@ class GoogleCloudPlugin extends BasePlugin {
super(wrapper)
}

@Override
void start() {
super.start()
FileHelper.getOrInstallProvider(GsFileSystemProvider)
}

}
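The removed `start()` override eagerly registered the custom `gs` NIO provider via `FileHelper.getOrInstallProvider`. For context, the standard JDK API below lists which `java.nio` file system providers are installed in a JVM; a `gs` scheme appears here once such a provider has been registered:

```groovy
import java.nio.file.spi.FileSystemProvider

// print the URI scheme of every installed NIO file system provider
FileSystemProvider.installedProviders().each { provider ->
    println provider.scheme
}
```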
@@ -22,13 +22,10 @@ import java.nio.file.Paths

import com.google.cloud.batch.v1.GCS
import com.google.cloud.batch.v1.Volume
import com.google.cloud.storage.contrib.nio.CloudStoragePath
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
<<<<<<< HEAD
import nextflow.cloud.google.nio.GsPath
=======
import nextflow.cloud.google.batch.client.BatchConfig
>>>>>>> master
import nextflow.executor.BashWrapperBuilder
import nextflow.extension.FilesEx
import nextflow.processor.TaskBean
@@ -47,12 +44,8 @@ class GoogleBatchScriptLauncher extends BashWrapperBuilder implements GoogleBatc

private static final String MOUNT_ROOT = '/mnt/disks'

<<<<<<< HEAD
private GsPath remoteWorkDir
=======
private BatchConfig config
private CloudStoragePath remoteWorkDir
>>>>>>> master
private Path remoteBinDir
private Set<String> buckets = new HashSet<>()
private PathTrie pathTrie = new PathTrie()
@@ -63,7 +56,7 @@ class GoogleBatchScriptLauncher extends BashWrapperBuilder implements GoogleBatc
GoogleBatchScriptLauncher(TaskBean bean, Path remoteBinDir) {
super(bean)
// keep track of the google storage work dir
this.remoteWorkDir = (GsPath) bean.workDir
this.remoteWorkDir = (CloudStoragePath) bean.workDir
this.remoteBinDir = toContainerMount(remoteBinDir)

// map bean work and target dirs to container mount
@@ -112,7 +105,7 @@ class GoogleBatchScriptLauncher extends BashWrapperBuilder implements GoogleBatc
}

protected Path toContainerMount(Path path, boolean parent=false) {
if( path instanceof GsPath ) {
if( path instanceof CloudStoragePath ) {
buckets.add(path.bucket())
pathTrie.add( (parent ? "/${path.bucket()}${path.parent}" : "/${path.bucket()}${path}").toString() )
final containerMount = containerMountPath(path)
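The `CloudStoragePath` branch above records the bucket and path prefix so the matching GCS volume can be mounted under `MOUNT_ROOT`. A rough sketch of the resulting mapping, assuming the usual layout where `gs://<bucket>/<key>` is exposed inside the container at `/mnt/disks/<bucket>/<key>` (the helper name and values are illustrative, not the plugin's API):

```groovy
// illustrative stand-in for the launcher's container mount mapping
String containerMount(String bucket, String key) {
    final mountRoot = '/mnt/disks'
    return "${mountRoot}/${bucket}/${key}"
}

assert containerMount('my-bucket', 'work/ab/12cd34/.command.sh') ==
        '/mnt/disks/my-bucket/work/ab/12cd34/.command.sh'
```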
@@ -98,12 +98,8 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
super(task)
this.client = executor.getClient()
this.executor = executor
<<<<<<< HEAD
// those files are accessed via the NF runtime, so keep them based on GsPath
=======
this.jobId = customJobName(task) ?: "nf-${task.hashLog.replace('/','')}-${System.currentTimeMillis()}"
// those files are accessed via the NF runtime, so keep them based on CloudStoragePath
>>>>>>> master
this.outputFile = task.workDir.resolve(TaskRun.CMD_OUTFILE)
this.errorFile = task.workDir.resolve(TaskRun.CMD_ERRFILE)
this.exitFile = task.workDir.resolve(TaskRun.CMD_EXIT)
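For reference, the fallback job id built above follows the pattern `nf-<task hash without the slash>-<timestamp>`. A tiny sketch with made-up values:

```groovy
def hashLog = 'a1/b2c3d4'    // task.hashLog, as printed in the run log
def jobId = "nf-${hashLog.replace('/','')}-${System.currentTimeMillis()}"
println jobId                // e.g. nf-a1b2c3d4-<current epoch millis>
```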
@@ -23,7 +23,7 @@ import java.nio.file.Path

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.cloud.google.file.GsBashLib
import nextflow.cloud.google.util.GsBashLib
import nextflow.executor.SimpleFileCopyStrategy
import nextflow.processor.TaskBean
import nextflow.processor.TaskRun