Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WX-1173 Reopen filesystem for blob storage outside workspace #7178

Merged
merged 40 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
38b5b4b
Initial map rework
kraefrei Jul 14, 2023
30490d9
Move over changes from initial prototype
kraefrei Jul 17, 2023
73825d3
Some cleanup
kraefrei Jul 18, 2023
88ecd81
Modify SAS generation to account for public fs
kraefrei Jul 18, 2023
0737288
Update some comments
kraefrei Jul 18, 2023
dcb4912
Modify the NIO library to assume a single store in a filesystem
kraefrei Jul 18, 2023
bbf0d01
Cleanup single store changes
kraefrei Jul 18, 2023
4962cbf
Fix bug with public filesystems that fit the 'Terra' pattern
kraefrei Jul 19, 2023
5dc6ba9
Clean up
kraefrei Jul 19, 2023
cd83f29
Remove tenant
kraefrei Jul 20, 2023
3de31fd
Add initial AzureFileSystem spec
kraefrei Jul 20, 2023
5121e6e
Test debugging
jgainerdewar Jul 25, 2023
342c956
Test debugging
jgainerdewar Jul 25, 2023
c884b3d
Test debugging
jgainerdewar Jul 25, 2023
95dd0b8
Does this work better?
jgainerdewar Jul 25, 2023
01cf7db
What about now?
jgainerdewar Jul 25, 2023
5ec30fc
Change test build
jgainerdewar Jul 26, 2023
672ffda
Testing
jgainerdewar Aug 1, 2023
86194e5
Restore test to pre-debugging state and ignore
jgainerdewar Aug 4, 2023
8621e8c
Remove unused config
jgainerdewar Aug 4, 2023
2dac0ba
Remove unused config
jgainerdewar Aug 4, 2023
da8a547
tiny change huge impact
THWiseman Aug 14, 2023
19f7ce5
Merge branch 'develop' into WX-1173
THWiseman Aug 15, 2023
5b6f36d
Merge branch 'develop' into WX-1173
THWiseman Aug 15, 2023
a3ff4d7
remove unnecessary config change
THWiseman Aug 15, 2023
925691f
merge
THWiseman Aug 18, 2023
90bf473
merge but better
THWiseman Aug 18, 2023
a9b28f1
unignore
THWiseman Aug 18, 2023
fd9e7f4
still one import problem
THWiseman Aug 18, 2023
1225057
`AzureFileSystem` is-a `FileSystem` and has-a `isExpired`
aednichols Aug 18, 2023
c2df9e5
Revert "`AzureFileSystem` is-a `FileSystem` and has-a `isExpired`"
aednichols Aug 18, 2023
4b890a6
Revert "Revert "`AzureFileSystem` is-a `FileSystem` and has-a `isExpi…
aednichols Aug 18, 2023
8a8a505
`AzureFileSystem` is-a `FileSystem` and has-a `isExpired`
aednichols Aug 18, 2023
6e818c1
`Try.get` in test
aednichols Aug 18, 2023
a0a0979
compile issue
THWiseman Aug 21, 2023
206522c
Merge branch 'develop' into WX-1173
THWiseman Aug 21, 2023
9ced6fb
definitley fix mock issue, maybe fix filesystem issue
THWiseman Aug 21, 2023
9e09bba
all seems well. rename variable
THWiseman Aug 21, 2023
8e36d6f
remove unused params
THWiseman Aug 21, 2023
2869238
remove stale comment
THWiseman Aug 21, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@

package com.azure.storage.blob.nio;

import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobListDetails;
import com.azure.storage.blob.models.ListBlobsOptions;

import java.io.IOException;
import java.nio.file.DirectoryIteratorException;
import java.nio.file.DirectoryStream;
Expand All @@ -18,6 +12,12 @@
import java.util.NoSuchElementException;
import java.util.Set;

import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobListDetails;
import com.azure.storage.blob.models.ListBlobsOptions;

/**
* A type for iterating over the contents of a directory.
*
Expand Down Expand Up @@ -88,7 +88,7 @@ private static class AzureDirectoryIterator implements Iterator<Path> {
if (path.isRoot()) {
String containerName = path.toString().substring(0, path.toString().length() - 1);
AzureFileSystem afs = ((AzureFileSystem) path.getFileSystem());
containerClient = ((AzureFileStore) afs.getFileStore(containerName)).getContainerClient();
containerClient = ((AzureFileStore) afs.getFileStore()).getContainerClient();
} else {
AzureResource azureResource = new AzureResource(path);
listOptions.setPrefix(azureResource.getBlobClient().getBlobName() + AzureFileSystem.PATH_SEPARATOR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,6 @@

package com.azure.storage.blob.nio;

import com.azure.core.credential.AzureSasCredential;
import com.azure.core.http.HttpClient;
import com.azure.core.http.policy.HttpLogDetailLevel;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.implementation.util.BlobUserAgentModificationPolicy;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.common.policy.RetryPolicyType;

import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.FileSystem;
Expand All @@ -27,14 +14,31 @@
import java.nio.file.attribute.FileAttributeView;
import java.nio.file.attribute.UserPrincipalLookupService;
import java.nio.file.spi.FileSystemProvider;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;

import com.azure.core.credential.AzureSasCredential;
import com.azure.core.http.HttpClient;
import com.azure.core.http.policy.HttpLogDetailLevel;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.implementation.util.BlobUserAgentModificationPolicy;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.common.policy.RetryPolicyType;

/**
* Implement's Java's {@link FileSystem} interface for Azure Blob Storage.
Expand Down Expand Up @@ -67,6 +71,11 @@ public final class AzureFileSystem extends FileSystem {
*/
public static final String AZURE_STORAGE_SAS_TOKEN_CREDENTIAL = "AzureStorageSasTokenCredential";

/**
* Expected type: String
*/
public static final String AZURE_STORAGE_PUBLIC_ACCESS_CREDENTIAL = "AzureStoragePublicAccessCredential";

/**
* Expected type: com.azure.core.http.policy.HttpLogLevelDetail
*/
Expand Down Expand Up @@ -159,9 +168,9 @@ public final class AzureFileSystem extends FileSystem {
private final Long putBlobThreshold;
private final Integer maxConcurrencyPerRequest;
private final Integer downloadResumeRetries;
private final Map<String, FileStore> fileStores;
private FileStore defaultFileStore;
private boolean closed;
private Instant expiry;

AzureFileSystem(AzureFileSystemProvider parentFileSystemProvider, String endpoint, Map<String, ?> config)
throws IOException {
Expand All @@ -181,7 +190,7 @@ public final class AzureFileSystem extends FileSystem {
this.downloadResumeRetries = (Integer) config.get(AZURE_STORAGE_DOWNLOAD_RESUME_RETRIES);

// Initialize and ensure access to FileStores.
this.fileStores = this.initializeFileStores(config);
this.defaultFileStore = this.initializeFileStore(config);
} catch (RuntimeException e) {
throw LoggingUtility.logError(LOGGER, new IllegalArgumentException("There was an error parsing the "
+ "configurations map. Please ensure all fields are set to a legal value of the correct type.", e));
Expand Down Expand Up @@ -221,7 +230,7 @@ public FileSystemProvider provider() {
@Override
public void close() throws IOException {
this.closed = true;
this.parentFileSystemProvider.closeFileSystem(this.getFileSystemUrl());
this.parentFileSystemProvider.closeFileSystem(this.getFileSystemUrl() + "/" + defaultFileStore.name());
}

/**
Expand Down Expand Up @@ -282,9 +291,7 @@ public Iterable<Path> getRootDirectories() {
If the file system was set to use all containers in the account, the account will be re-queried and the
list may grow or shrink if containers were added or deleted.
*/
return fileStores.keySet().stream()
.map(name -> this.getPath(name + AzurePath.ROOT_DIR_SUFFIX))
.collect(Collectors.toList());
return Arrays.asList(this.getPath(defaultFileStore.name() + AzurePath.ROOT_DIR_SUFFIX));
}

/**
Expand All @@ -304,7 +311,7 @@ public Iterable<FileStore> getFileStores() {
If the file system was set to use all containers in the account, the account will be re-queried and the
list may grow or shrink if containers were added or deleted.
*/
return this.fileStores.values();
return Arrays.asList(defaultFileStore);
}

/**
Expand Down Expand Up @@ -397,6 +404,12 @@ private BlobServiceClient buildBlobServiceClient(String endpoint, Map<String, ?>
builder.credential((StorageSharedKeyCredential) config.get(AZURE_STORAGE_SHARED_KEY_CREDENTIAL));
} else if (config.containsKey(AZURE_STORAGE_SAS_TOKEN_CREDENTIAL)) {
builder.credential((AzureSasCredential) config.get(AZURE_STORAGE_SAS_TOKEN_CREDENTIAL));
this.setExpiryFromSAS((AzureSasCredential) config.get(AZURE_STORAGE_SAS_TOKEN_CREDENTIAL));
} else if (config.containsKey(AZURE_STORAGE_PUBLIC_ACCESS_CREDENTIAL)) {
// The Blob Service Client Builder requires at least one kind of authentication to make requests
// For public files however, this is unnecessary. This key-value pair is to denote the case
// explicitly when we supply a placeholder SAS credential to bypass this requirement.
builder.credential((AzureSasCredential) config.get(AZURE_STORAGE_PUBLIC_ACCESS_CREDENTIAL));
} else {
throw LoggingUtility.logError(LOGGER, new IllegalArgumentException(String.format("No credentials were "
+ "provided. Please specify one of the following when constructing an AzureFileSystem: %s, %s.",
Expand Down Expand Up @@ -430,23 +443,17 @@ private BlobServiceClient buildBlobServiceClient(String endpoint, Map<String, ?>
return builder.buildClient();
}

private Map<String, FileStore> initializeFileStores(Map<String, ?> config) throws IOException {
String fileStoreNames = (String) config.get(AZURE_STORAGE_FILE_STORES);
if (CoreUtils.isNullOrEmpty(fileStoreNames)) {
private FileStore initializeFileStore(Map<String, ?> config) throws IOException {
String fileStoreName = (String) config.get(AZURE_STORAGE_FILE_STORES);
if (CoreUtils.isNullOrEmpty(fileStoreName)) {
throw LoggingUtility.logError(LOGGER, new IllegalArgumentException("The list of FileStores cannot be "
+ "null."));
}

Boolean skipConnectionCheck = (Boolean) config.get(AZURE_STORAGE_SKIP_INITIAL_CONTAINER_CHECK);
Map<String, FileStore> fileStores = new HashMap<>();
for (String fileStoreName : fileStoreNames.split(",")) {
FileStore fs = new AzureFileStore(this, fileStoreName, skipConnectionCheck);
if (this.defaultFileStore == null) {
this.defaultFileStore = fs;
}
fileStores.put(fileStoreName, fs);
}
return fileStores;
this.defaultFileStore = new AzureFileStore(this, fileStoreName, skipConnectionCheck);
return this.defaultFileStore;
}

@Override
Expand All @@ -470,12 +477,11 @@ Path getDefaultDirectory() {
return this.getPath(this.defaultFileStore.name() + AzurePath.ROOT_DIR_SUFFIX);
}

FileStore getFileStore(String name) throws IOException {
FileStore store = this.fileStores.get(name);
if (store == null) {
throw LoggingUtility.logError(LOGGER, new IOException("Invalid file store: " + name));
FileStore getFileStore() throws IOException {
if (this.defaultFileStore == null) {
throw LoggingUtility.logError(LOGGER, new IOException("FileStore not initialized"));
}
return store;
return defaultFileStore;
}

Long getBlockSize() {
Expand All @@ -489,4 +495,24 @@ Long getPutBlobThreshold() {
Integer getMaxConcurrencyPerRequest() {
return this.maxConcurrencyPerRequest;
}

public Optional<Instant> getExpiry() {
return Optional.ofNullable(expiry);
}

private void setExpiryFromSAS(AzureSasCredential token) {
List<String> strings = Arrays.asList(token.getSignature().split("&"));
Optional<String> expiryString = strings.stream()
.filter(s -> s.startsWith("se"))
.findFirst()
.map(s -> s.replaceFirst("se=",""))
.map(s -> s.replace("%3A", ":"));
this.expiry = expiryString.map(es -> Instant.parse(es)).orElse(null);
}

public boolean isExpired(Duration buffer) {
return Optional.ofNullable(this.expiry)
.map(e -> Instant.now().plus(buffer).isAfter(e))
.orElse(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ public BlobClient toBlobClient() throws IOException {
String fileStoreName = this.rootToFileStore(root.toString());

BlobContainerClient containerClient =
((AzureFileStore) this.parentFileSystem.getFileStore(fileStoreName)).getContainerClient();
((AzureFileStore) this.parentFileSystem.getFileStore()).getContainerClient();

String blobName = this.withoutRoot();
if (blobName.isEmpty()) {
Expand Down
5 changes: 3 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,11 @@ lazy val azureBlobNio = (project in file("azure-blob-nio"))
lazy val azureBlobFileSystem = (project in file("filesystems/blob"))
.withLibrarySettings("cromwell-azure-blobFileSystem", blobFileSystemDependencies)
.dependsOn(core)
.dependsOn(core % "test->test")
.dependsOn(common % "test->test")
.dependsOn(cloudSupport)
.dependsOn(azureBlobNio)
.dependsOn(core % "test->test")
.dependsOn(common % "test->test")
.dependsOn(azureBlobNio % "test->test")

lazy val awsS3FileSystem = (project in file("filesystems/s3"))
.withLibrarySettings("cromwell-aws-s3filesystem", s3FileSystemDependencies)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,9 @@ import java.util.UUID
// WSM config is needed for accessing WSM-managed blob containers created in Terra workspaces.
// If the identity executing Cromwell has native access to the blob container, this can be ignored.
final case class WorkspaceManagerConfig(url: WorkspaceManagerURL,
workspaceId: WorkspaceId,
containerResourceId: ContainerResourceId,
overrideWsmAuthToken: Option[String]) // dev-only

final case class BlobFileSystemConfig(endpointURL: EndpointURL,
blobContainerName: BlobContainerName,
subscriptionId: Option[SubscriptionId],
final case class BlobFileSystemConfig(subscriptionId: Option[SubscriptionId],
expiryBufferMinutes: Long,
workspaceManagerConfig: Option[WorkspaceManagerConfig])

Expand All @@ -26,8 +22,6 @@ object BlobFileSystemConfig {
final val defaultExpiryBufferMinutes = 10L

def apply(config: Config): BlobFileSystemConfig = {
val endpointURL = parseString(config, "endpoint").map(EndpointURL)
val blobContainer = parseString(config, "container").map(BlobContainerName)
val subscriptionId = parseUUIDOpt(config, "subscription").map(_.map(SubscriptionId))
val expiryBufferMinutes =
parseLongOpt(config, "expiry-buffer-minutes")
Expand All @@ -37,17 +31,15 @@ object BlobFileSystemConfig {
if (config.hasPath("workspace-manager")) {
val wsmConf = config.getConfig("workspace-manager")
val wsmURL = parseString(wsmConf, "url").map(WorkspaceManagerURL)
val workspaceId = parseUUID(wsmConf, "workspace-id").map(WorkspaceId)
val containerResourceId = parseUUID(wsmConf, "container-resource-id").map(ContainerResourceId)
val overrideWsmAuthToken = parseStringOpt(wsmConf, "b2cToken")

(wsmURL, workspaceId, containerResourceId, overrideWsmAuthToken)
(wsmURL, overrideWsmAuthToken)
.mapN(WorkspaceManagerConfig)
.map(Option(_))
}
else None.validNel

(endpointURL, blobContainer, subscriptionId, expiryBufferMinutes, wsmConfig)
(subscriptionId, expiryBufferMinutes, wsmConfig)
.mapN(BlobFileSystemConfig.apply)
.unsafe("Couldn't parse blob filesystem config")
}
Expand All @@ -58,9 +50,6 @@ object BlobFileSystemConfig {
private def parseStringOpt(config: Config, path: String) =
validate[Option[String]] { config.as[Option[String]](path) }

private def parseUUID(config: Config, path: String) =
validate[UUID] { UUID.fromString(config.as[String](path)) }

private def parseUUIDOpt(config: Config, path: String) =
validate[Option[UUID]] { config.as[Option[String]](path).map(UUID.fromString) }

Expand Down
Loading
Loading