-
Notifications
You must be signed in to change notification settings - Fork 369
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor code around cloud support #1975
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,30 +1,25 @@ | ||
package picard.nio; | ||
|
||
import com.google.cloud.storage.contrib.nio.CloudStorageFileSystem; | ||
import com.google.cloud.storage.contrib.nio.CloudStoragePath; | ||
import htsjdk.io.IOPath; | ||
import htsjdk.samtools.util.FileExtensions; | ||
import htsjdk.utils.ValidationUtils; | ||
import picard.PicardException; | ||
|
||
import java.io.File; | ||
import java.nio.file.Path; | ||
import java.nio.file.Paths; | ||
import java.util.Arrays; | ||
import java.util.UUID; | ||
|
||
|
||
/** | ||
* Derived from BucketUtils.java in GATK | ||
*/ | ||
public class PicardBucketUtils { | ||
public static final String GOOGLE_CLOUD_STORAGE_FILESYSTEM_SCHEME = "gs"; | ||
public static final String HTTP_FILESYSTEM_PROVIDER_SCHEME = "http"; | ||
public static final String HTTPS_FILESYSTEM_PROVIDER_SCHEME = "https"; | ||
public static final String HDFS_SCHEME = "hdfs"; | ||
public static final String FILE_SCHEME = "file"; | ||
|
||
// This Picard test staging bucket has a TTL of 180 days (DeleteAction with Age = 180) | ||
public static final String GCLOUD_PICARD_STAGING_DIRECTORY = "gs://hellbender-test-logs/staging/picard/"; | ||
public static final String GCLOUD_PICARD_STAGING_DIRECTORY_STR = "gs://hellbender-test-logs/staging/picard/"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These really need to be reconciled with the ones in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for pointing this out. I think it should be handled in a separate PR, if that's alright with you. |
||
public static final PicardHtsPath GCLOUD_PICARD_STAGING_DIRECTORY = new PicardHtsPath(GCLOUD_PICARD_STAGING_DIRECTORY_STR); | ||
|
||
|
||
// slashes omitted since hdfs paths seem to only have 1 slash which would be weirder to include than no slashes | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. git blame seems to implicate me on this comment, but I have no clue what it means ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is a remnant from when we copied over code from GATK. Removed. |
||
private PicardBucketUtils(){} //private so that no one will instantiate this class | ||
|
@@ -41,59 +36,43 @@ private PicardBucketUtils(){} //private so that no one will instantiate this cla | |
* @return a new temporary path of the form [directory]/[prefix][random chars][.extension] | ||
* | ||
*/ | ||
public static PicardHtsPath getTempFilePath(final String directory, String prefix, final String extension){ | ||
public static IOPath getTempFilePath(final IOPath directory, String prefix, final String extension){ | ||
ValidationUtils.validateArg(extension.startsWith("."), "The new extension must start with a period '.'"); | ||
final String defaultPrefix = "tmp"; | ||
|
||
if (directory == null){ | ||
// If directory = null, we are creating a local temp file. | ||
// File#createTempFile requires that the prefix be at least 3 characters long | ||
prefix = prefix.length() >= 3 ? prefix : defaultPrefix; | ||
return new PicardHtsPath(PicardIOUtils.createTempFile(prefix, extension)); | ||
} | ||
|
||
if (isGcsUrl(directory) || isHadoopUrl(directory)){ | ||
final PicardHtsPath path = PicardHtsPath.fromPath(randomRemotePath(directory, prefix, extension)); | ||
PicardIOUtils.deleteOnExit(path.toPath()); | ||
// Mark auxiliary files to be deleted | ||
PicardIOUtils.deleteOnExit(PicardHtsPath.replaceExtension(path, FileExtensions.TRIBBLE_INDEX, true).toPath()); | ||
PicardIOUtils.deleteOnExit(PicardHtsPath.replaceExtension(path, FileExtensions.TABIX_INDEX, true).toPath()); | ||
PicardIOUtils.deleteOnExit(PicardHtsPath.replaceExtension(path, FileExtensions.BAI_INDEX, true).toPath()); // e.g. file.bam.bai | ||
PicardIOUtils.deleteOnExit(PicardHtsPath.replaceExtension(path, FileExtensions.BAI_INDEX, false).toPath()); // e.g. file.bai | ||
PicardIOUtils.deleteOnExit(PicardHtsPath.replaceExtension(path, ".md5", true).toPath()); | ||
return path; | ||
} else { | ||
} else if (PicardBucketUtils.isLocalPath(directory)) { | ||
// Assume the (non-null) directory points to a directory on a local filesystem | ||
prefix = prefix.length() >= 3 ? prefix : defaultPrefix; | ||
return new PicardHtsPath(PicardIOUtils.createTempFileInDirectory(prefix, extension, new File(directory))); | ||
return new PicardHtsPath(PicardIOUtils.createTempFileInDirectory(prefix, extension, directory.toPath().toFile())); | ||
} else { | ||
if (isSupportedCloudFilesystem(directory)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the right way to branch here is to invert the logic and test for a |
||
final IOPath path = randomRemotePath(directory, prefix, extension); | ||
PicardIOUtils.deleteOnExit(path.toPath()); | ||
// Mark auxiliary files to be deleted | ||
PicardIOUtils.deleteOnExit(PicardHtsPath.replaceExtension(path, FileExtensions.TRIBBLE_INDEX, true).toPath()); | ||
PicardIOUtils.deleteOnExit(PicardHtsPath.replaceExtension(path, FileExtensions.TABIX_INDEX, true).toPath()); | ||
PicardIOUtils.deleteOnExit(PicardHtsPath.replaceExtension(path, FileExtensions.BAI_INDEX, true).toPath()); // e.g. file.bam.bai | ||
PicardIOUtils.deleteOnExit(PicardHtsPath.replaceExtension(path, FileExtensions.BAI_INDEX, false).toPath()); // e.g. file.bai | ||
PicardIOUtils.deleteOnExit(PicardHtsPath.replaceExtension(path, ".md5", true).toPath()); | ||
return path; | ||
} else { | ||
throw new PicardException("Unsupported cloud filesystem: " + directory.getURIString()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
} | ||
} | ||
|
||
/** | ||
* This overload of getTempFilePath takes the directory of type PicardHtsPath instead of String. | ||
* | ||
* @see #getTempFilePath(String, String, String) | ||
* | ||
*/ | ||
public static PicardHtsPath getTempFilePath(final IOPath directory, String prefix, final String extension){ | ||
return getTempFilePath(directory.getURIString(), prefix, extension); | ||
} | ||
|
||
/** | ||
* Calls getTempFilePath with the empty string as the prefix. | ||
* | ||
* @see #getTempFilePath(String, String, String) | ||
*/ | ||
public static PicardHtsPath getTempFilePath(String directory, String extension){ | ||
return getTempFilePath(directory, "", extension); | ||
} | ||
|
||
/** | ||
* Creates a temporary file in a local directory. | ||
* | ||
* @see #getTempFilePath(String, String, String) | ||
* @see #getTempFilePath(IOPath, String, String) | ||
*/ | ||
public static PicardHtsPath getLocalTempFilePath(final String prefix, final String extension){ | ||
return getTempFilePath((String) null, prefix, extension); | ||
public static IOPath getLocalTempFilePath(final String prefix, final String extension){ | ||
return getTempFilePath(null, prefix, extension); | ||
} | ||
|
||
/** | ||
|
@@ -110,10 +89,10 @@ public static PicardHtsPath getLocalTempFilePath(final String prefix, final Stri | |
* @param relativePath The relative location for the new "directory" under the harcoded staging bucket with a TTL set e.g. "test/RevertSam/". | ||
* @return A PicardHtsPath object to a randomly generated "directory" e.g. "gs://hellbender-test-logs/staging/picard/test/RevertSam/{randomly-generated-string}/" | ||
*/ | ||
public static PicardHtsPath getRandomGCSDirectory(final String relativePath){ | ||
public static IOPath getRandomGCSDirectory(final String relativePath){ | ||
ValidationUtils.validateArg(relativePath.endsWith("/"), "relativePath must end in backslash '/': " + relativePath); | ||
|
||
return PicardHtsPath.fromPath(PicardBucketUtils.randomRemotePath(GCLOUD_PICARD_STAGING_DIRECTORY + relativePath, "", "/")); | ||
return PicardBucketUtils.randomRemotePath(PicardHtsPath.resolve(GCLOUD_PICARD_STAGING_DIRECTORY, relativePath), "", "/"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Every direct reference to |
||
} | ||
|
||
/** | ||
|
@@ -123,39 +102,14 @@ public static PicardHtsPath getRandomGCSDirectory(final String relativePath){ | |
* @param prefix The beginning of the file name | ||
* @param suffix The end of the file name, e.g. ".tmp" | ||
*/ | ||
public static Path randomRemotePath(String stagingLocation, String prefix, String suffix) { | ||
public static IOPath randomRemotePath(final IOPath stagingLocation, final String prefix, final String suffix) { | ||
if (isGcsUrl(stagingLocation)) { | ||
return getPathOnGcs(stagingLocation).resolve(prefix + UUID.randomUUID() + suffix); | ||
} else if (isHadoopUrl(stagingLocation)) { | ||
return Paths.get(stagingLocation, prefix + UUID.randomUUID() + suffix); | ||
return PicardHtsPath.resolve(stagingLocation, prefix + UUID.randomUUID() + suffix); | ||
} else { | ||
throw new IllegalArgumentException("Staging location is not remote: " + stagingLocation); | ||
} | ||
} | ||
|
||
/** | ||
* String -> Path. This *should* not be necessary (use Paths.get(URI.create(...)) instead) , but it currently is | ||
* on Spark because using the fat, shaded jar breaks the registration of the GCS FilesystemProvider. | ||
* To transform other types of string URLs into Paths, use IOUtils.getPath instead. | ||
*/ | ||
private static CloudStoragePath getPathOnGcs(String gcsUrl) { | ||
// use a split limit of -1 to preserve empty split tokens, especially trailing slashes on directory names | ||
final String[] split = gcsUrl.split("/", -1); | ||
final String BUCKET = split[2]; | ||
final String pathWithoutBucket = String.join("/", Arrays.copyOfRange(split, 3, split.length)); | ||
return CloudStorageFileSystem.forBucket(BUCKET).getPath(pathWithoutBucket); | ||
} | ||
|
||
/** | ||
* | ||
* @param path path to inspect | ||
* @return true if this path represents a gcs location | ||
*/ | ||
private static boolean isGcsUrl(final String path) { | ||
GATKUtils.nonNull(path); | ||
return path.startsWith(GOOGLE_CLOUD_STORAGE_FILESYSTEM_SCHEME + "://"); | ||
} | ||
|
||
/** | ||
* | ||
* Return true if this {@code PicardHTSPath} represents a gcs URI. | ||
|
@@ -168,34 +122,31 @@ public static boolean isGcsUrl(final IOPath pathSpec) { | |
} | ||
|
||
/** | ||
* @param pathSpec specifier to inspect | ||
* @return true if this {@code GATKPath} represents a remote storage system which may benefit from prefetching (gcs or http(s)) | ||
* @param path specifier to inspect | ||
* @return true if this {@code IOPath} represents a remote storage system which may benefit from prefetching (gcs or http(s)) | ||
*/ | ||
public static boolean isEligibleForPrefetching(final IOPath pathSpec) { | ||
GATKUtils.nonNull(pathSpec); | ||
return isEligibleForPrefetching(pathSpec.getScheme()); | ||
} | ||
|
||
/** | ||
* @param path path to inspect | ||
* @return true if this {@code Path} represents a remote storage system which may benefit from prefetching (gcs or http(s)) | ||
*/ | ||
public static boolean isEligibleForPrefetching(final Path path) { | ||
public static boolean isEligibleForPrefetching(final IOPath path) { | ||
GATKUtils.nonNull(path); | ||
return isEligibleForPrefetching(path.toUri().getScheme()); | ||
} | ||
|
||
private static boolean isEligibleForPrefetching(final String scheme){ | ||
final String scheme = path.getScheme(); | ||
return scheme != null | ||
&& (scheme.equals(GOOGLE_CLOUD_STORAGE_FILESYSTEM_SCHEME) | ||
|| scheme.equals(HTTP_FILESYSTEM_PROVIDER_SCHEME) | ||
|| scheme.equals(HTTPS_FILESYSTEM_PROVIDER_SCHEME)); | ||
} | ||
|
||
public static boolean isLocalPath(final IOPath path){ | ||
return path.getScheme().equals(FILE_SCHEME); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should really be an instance method on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This needs to happen in IOPath, because all the objects that would call the instance method isLocalPath have type IOPath, not PicardHtsPath. If you could add it to htsjdk, that would be great. |
||
|
||
/** | ||
* Returns true if the given path is a HDFS (Hadoop filesystem) URL. | ||
* As of August 2024, we only support Google Cloud. | ||
* Will add other filesystems (e.g. Azure, AWS) when ready. | ||
* | ||
* @return whether the cloud filesystem is currently supported by Picard. | ||
*/ | ||
private static boolean isHadoopUrl(String path) { | ||
return path.startsWith(HDFS_SCHEME + "://"); | ||
public static boolean isSupportedCloudFilesystem(final IOPath path){ | ||
ValidationUtils.validateArg(! isLocalPath(path), "isSupportedCloudFilesystem should be called on a cloud path but was given: " + | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't actually think this method should be retained (or adds much value), for several reasons. First, enumerating all of the supported protocol schemes may become difficult as we add additional cloud providers (I think azure, for example, uses multiple schemes). Also, what schemes will work may depend on what additional providers the user has on their classpath. Having said that, if you're going to keep it, it should be moved to PicardHtsPath, and a test added, with test cases at least for Finally, for a predicate method like this, where you're just testing the protocol scheme, it should be pure and not have a side effect like throwing an exception (with the possible exception of requiring that the input is non-null, since the code can't recover from that). I don't see any reason to require the caller to first test if the path has a cloud scheme to prevent it from throwing. It should just return a boolean if the scheme is a supported cloud scheme, and let the caller decide when to throw. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removing sounds good |
||
path.getURIString()); | ||
return isGcsUrl(path); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -182,7 +182,7 @@ public static PicardHtsPath replaceExtension(final IOPath path, final String new | |
/** | ||
* Wrapper for Path.resolve() | ||
*/ | ||
public static PicardHtsPath resolve(final PicardHtsPath absPath, final String relativePath){ | ||
public static PicardHtsPath resolve(final IOPath absPath, final String relativePath){ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the return value for this should also be IOPath, unless there is some reason that would cause problems. (Ideally this would be a method on IOPath, but since it has to return a new object of the subclass type, it can't live there, unless we add some kind of abstract "toSelf" method that each subclass has to implement. For another day). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done, and I see your point. Another day sounds good. |
||
return PicardHtsPath.fromPath(absPath.toPath().resolve(relativePath)); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The name of this method should probably change to reflect the new return type , i.e.,
getIOPath
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done