Skip to content

Commit

Permalink
WX-1173 Reopen filesystem for blob storage outside workspace (#7178)
Browse files Browse the repository at this point in the history
Co-authored-by: Janet Gainer-Dewar <[email protected]>
Co-authored-by: Tom Wiseman <[email protected]>
Co-authored-by: Adam Nichols <[email protected]>
  • Loading branch information
4 people authored Aug 22, 2023
1 parent 3affdc3 commit bdc1ab3
Show file tree
Hide file tree
Showing 14 changed files with 399 additions and 303 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@

package com.azure.storage.blob.nio;

import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobListDetails;
import com.azure.storage.blob.models.ListBlobsOptions;

import java.io.IOException;
import java.nio.file.DirectoryIteratorException;
import java.nio.file.DirectoryStream;
Expand All @@ -18,6 +12,12 @@
import java.util.NoSuchElementException;
import java.util.Set;

import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobListDetails;
import com.azure.storage.blob.models.ListBlobsOptions;

/**
* A type for iterating over the contents of a directory.
*
Expand Down Expand Up @@ -88,7 +88,7 @@ private static class AzureDirectoryIterator implements Iterator<Path> {
if (path.isRoot()) {
String containerName = path.toString().substring(0, path.toString().length() - 1);
AzureFileSystem afs = ((AzureFileSystem) path.getFileSystem());
containerClient = ((AzureFileStore) afs.getFileStore(containerName)).getContainerClient();
containerClient = ((AzureFileStore) afs.getFileStore()).getContainerClient();
} else {
AzureResource azureResource = new AzureResource(path);
listOptions.setPrefix(azureResource.getBlobClient().getBlobName() + AzureFileSystem.PATH_SEPARATOR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,6 @@

package com.azure.storage.blob.nio;

import com.azure.core.credential.AzureSasCredential;
import com.azure.core.http.HttpClient;
import com.azure.core.http.policy.HttpLogDetailLevel;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.implementation.util.BlobUserAgentModificationPolicy;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.common.policy.RetryPolicyType;

import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.FileSystem;
Expand All @@ -27,14 +14,31 @@
import java.nio.file.attribute.FileAttributeView;
import java.nio.file.attribute.UserPrincipalLookupService;
import java.nio.file.spi.FileSystemProvider;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;

import com.azure.core.credential.AzureSasCredential;
import com.azure.core.http.HttpClient;
import com.azure.core.http.policy.HttpLogDetailLevel;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.implementation.util.BlobUserAgentModificationPolicy;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.common.policy.RetryPolicyType;

/**
* Implement's Java's {@link FileSystem} interface for Azure Blob Storage.
Expand Down Expand Up @@ -67,6 +71,11 @@ public final class AzureFileSystem extends FileSystem {
*/
public static final String AZURE_STORAGE_SAS_TOKEN_CREDENTIAL = "AzureStorageSasTokenCredential";

/**
* Expected type: String
*/
public static final String AZURE_STORAGE_PUBLIC_ACCESS_CREDENTIAL = "AzureStoragePublicAccessCredential";

/**
* Expected type: com.azure.core.http.policy.HttpLogLevelDetail
*/
Expand Down Expand Up @@ -159,9 +168,9 @@ public final class AzureFileSystem extends FileSystem {
private final Long putBlobThreshold;
private final Integer maxConcurrencyPerRequest;
private final Integer downloadResumeRetries;
private final Map<String, FileStore> fileStores;
private FileStore defaultFileStore;
private boolean closed;
private Instant expiry;

AzureFileSystem(AzureFileSystemProvider parentFileSystemProvider, String endpoint, Map<String, ?> config)
throws IOException {
Expand All @@ -181,7 +190,7 @@ public final class AzureFileSystem extends FileSystem {
this.downloadResumeRetries = (Integer) config.get(AZURE_STORAGE_DOWNLOAD_RESUME_RETRIES);

// Initialize and ensure access to FileStores.
this.fileStores = this.initializeFileStores(config);
this.defaultFileStore = this.initializeFileStore(config);
} catch (RuntimeException e) {
throw LoggingUtility.logError(LOGGER, new IllegalArgumentException("There was an error parsing the "
+ "configurations map. Please ensure all fields are set to a legal value of the correct type.", e));
Expand Down Expand Up @@ -221,7 +230,7 @@ public FileSystemProvider provider() {
@Override
public void close() throws IOException {
this.closed = true;
this.parentFileSystemProvider.closeFileSystem(this.getFileSystemUrl());
this.parentFileSystemProvider.closeFileSystem(this.getFileSystemUrl() + "/" + defaultFileStore.name());
}

/**
Expand Down Expand Up @@ -282,9 +291,7 @@ public Iterable<Path> getRootDirectories() {
If the file system was set to use all containers in the account, the account will be re-queried and the
list may grow or shrink if containers were added or deleted.
*/
return fileStores.keySet().stream()
.map(name -> this.getPath(name + AzurePath.ROOT_DIR_SUFFIX))
.collect(Collectors.toList());
return Arrays.asList(this.getPath(defaultFileStore.name() + AzurePath.ROOT_DIR_SUFFIX));
}

/**
Expand All @@ -304,7 +311,7 @@ public Iterable<FileStore> getFileStores() {
If the file system was set to use all containers in the account, the account will be re-queried and the
list may grow or shrink if containers were added or deleted.
*/
return this.fileStores.values();
return Arrays.asList(defaultFileStore);
}

/**
Expand Down Expand Up @@ -397,6 +404,12 @@ private BlobServiceClient buildBlobServiceClient(String endpoint, Map<String, ?>
builder.credential((StorageSharedKeyCredential) config.get(AZURE_STORAGE_SHARED_KEY_CREDENTIAL));
} else if (config.containsKey(AZURE_STORAGE_SAS_TOKEN_CREDENTIAL)) {
builder.credential((AzureSasCredential) config.get(AZURE_STORAGE_SAS_TOKEN_CREDENTIAL));
this.setExpiryFromSAS((AzureSasCredential) config.get(AZURE_STORAGE_SAS_TOKEN_CREDENTIAL));
} else if (config.containsKey(AZURE_STORAGE_PUBLIC_ACCESS_CREDENTIAL)) {
// The Blob Service Client Builder requires at least one kind of authentication to make requests
// For public files however, this is unnecessary. This key-value pair is to denote the case
// explicitly when we supply a placeholder SAS credential to bypass this requirement.
builder.credential((AzureSasCredential) config.get(AZURE_STORAGE_PUBLIC_ACCESS_CREDENTIAL));
} else {
throw LoggingUtility.logError(LOGGER, new IllegalArgumentException(String.format("No credentials were "
+ "provided. Please specify one of the following when constructing an AzureFileSystem: %s, %s.",
Expand Down Expand Up @@ -430,23 +443,17 @@ private BlobServiceClient buildBlobServiceClient(String endpoint, Map<String, ?>
return builder.buildClient();
}

private Map<String, FileStore> initializeFileStores(Map<String, ?> config) throws IOException {
String fileStoreNames = (String) config.get(AZURE_STORAGE_FILE_STORES);
if (CoreUtils.isNullOrEmpty(fileStoreNames)) {
private FileStore initializeFileStore(Map<String, ?> config) throws IOException {
String fileStoreName = (String) config.get(AZURE_STORAGE_FILE_STORES);
if (CoreUtils.isNullOrEmpty(fileStoreName)) {
throw LoggingUtility.logError(LOGGER, new IllegalArgumentException("The list of FileStores cannot be "
+ "null."));
}

Boolean skipConnectionCheck = (Boolean) config.get(AZURE_STORAGE_SKIP_INITIAL_CONTAINER_CHECK);
Map<String, FileStore> fileStores = new HashMap<>();
for (String fileStoreName : fileStoreNames.split(",")) {
FileStore fs = new AzureFileStore(this, fileStoreName, skipConnectionCheck);
if (this.defaultFileStore == null) {
this.defaultFileStore = fs;
}
fileStores.put(fileStoreName, fs);
}
return fileStores;
this.defaultFileStore = new AzureFileStore(this, fileStoreName, skipConnectionCheck);
return this.defaultFileStore;
}

@Override
Expand All @@ -470,12 +477,11 @@ Path getDefaultDirectory() {
return this.getPath(this.defaultFileStore.name() + AzurePath.ROOT_DIR_SUFFIX);
}

FileStore getFileStore(String name) throws IOException {
FileStore store = this.fileStores.get(name);
if (store == null) {
throw LoggingUtility.logError(LOGGER, new IOException("Invalid file store: " + name));
FileStore getFileStore() throws IOException {
if (this.defaultFileStore == null) {
throw LoggingUtility.logError(LOGGER, new IOException("FileStore not initialized"));
}
return store;
return defaultFileStore;
}

Long getBlockSize() {
Expand All @@ -489,4 +495,24 @@ Long getPutBlobThreshold() {
Integer getMaxConcurrencyPerRequest() {
return this.maxConcurrencyPerRequest;
}

public Optional<Instant> getExpiry() {
return Optional.ofNullable(expiry);
}

private void setExpiryFromSAS(AzureSasCredential token) {
List<String> strings = Arrays.asList(token.getSignature().split("&"));
Optional<String> expiryString = strings.stream()
.filter(s -> s.startsWith("se"))
.findFirst()
.map(s -> s.replaceFirst("se=",""))
.map(s -> s.replace("%3A", ":"));
this.expiry = expiryString.map(es -> Instant.parse(es)).orElse(null);
}

public boolean isExpired(Duration buffer) {
return Optional.ofNullable(this.expiry)
.map(e -> Instant.now().plus(buffer).isAfter(e))
.orElse(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ public BlobClient toBlobClient() throws IOException {
String fileStoreName = this.rootToFileStore(root.toString());

BlobContainerClient containerClient =
((AzureFileStore) this.parentFileSystem.getFileStore(fileStoreName)).getContainerClient();
((AzureFileStore) this.parentFileSystem.getFileStore()).getContainerClient();

String blobName = this.withoutRoot();
if (blobName.isEmpty()) {
Expand Down
5 changes: 3 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,11 @@ lazy val azureBlobNio = (project in file("azure-blob-nio"))
lazy val azureBlobFileSystem = (project in file("filesystems/blob"))
.withLibrarySettings("cromwell-azure-blobFileSystem", blobFileSystemDependencies)
.dependsOn(core)
.dependsOn(core % "test->test")
.dependsOn(common % "test->test")
.dependsOn(cloudSupport)
.dependsOn(azureBlobNio)
.dependsOn(core % "test->test")
.dependsOn(common % "test->test")
.dependsOn(azureBlobNio % "test->test")

lazy val awsS3FileSystem = (project in file("filesystems/s3"))
.withLibrarySettings("cromwell-aws-s3filesystem", s3FileSystemDependencies)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,9 @@ import java.util.UUID
// WSM config is needed for accessing WSM-managed blob containers created in Terra workspaces.
// If the identity executing Cromwell has native access to the blob container, this can be ignored.
final case class WorkspaceManagerConfig(url: WorkspaceManagerURL,
workspaceId: WorkspaceId,
containerResourceId: ContainerResourceId,
overrideWsmAuthToken: Option[String]) // dev-only

final case class BlobFileSystemConfig(endpointURL: EndpointURL,
blobContainerName: BlobContainerName,
subscriptionId: Option[SubscriptionId],
final case class BlobFileSystemConfig(subscriptionId: Option[SubscriptionId],
expiryBufferMinutes: Long,
workspaceManagerConfig: Option[WorkspaceManagerConfig])

Expand All @@ -26,8 +22,6 @@ object BlobFileSystemConfig {
final val defaultExpiryBufferMinutes = 10L

def apply(config: Config): BlobFileSystemConfig = {
val endpointURL = parseString(config, "endpoint").map(EndpointURL)
val blobContainer = parseString(config, "container").map(BlobContainerName)
val subscriptionId = parseUUIDOpt(config, "subscription").map(_.map(SubscriptionId))
val expiryBufferMinutes =
parseLongOpt(config, "expiry-buffer-minutes")
Expand All @@ -37,17 +31,15 @@ object BlobFileSystemConfig {
if (config.hasPath("workspace-manager")) {
val wsmConf = config.getConfig("workspace-manager")
val wsmURL = parseString(wsmConf, "url").map(WorkspaceManagerURL)
val workspaceId = parseUUID(wsmConf, "workspace-id").map(WorkspaceId)
val containerResourceId = parseUUID(wsmConf, "container-resource-id").map(ContainerResourceId)
val overrideWsmAuthToken = parseStringOpt(wsmConf, "b2cToken")

(wsmURL, workspaceId, containerResourceId, overrideWsmAuthToken)
(wsmURL, overrideWsmAuthToken)
.mapN(WorkspaceManagerConfig)
.map(Option(_))
}
else None.validNel

(endpointURL, blobContainer, subscriptionId, expiryBufferMinutes, wsmConfig)
(subscriptionId, expiryBufferMinutes, wsmConfig)
.mapN(BlobFileSystemConfig.apply)
.unsafe("Couldn't parse blob filesystem config")
}
Expand All @@ -58,9 +50,6 @@ object BlobFileSystemConfig {
private def parseStringOpt(config: Config, path: String) =
validate[Option[String]] { config.as[Option[String]](path) }

private def parseUUID(config: Config, path: String) =
validate[UUID] { UUID.fromString(config.as[String](path)) }

private def parseUUIDOpt(config: Config, path: String) =
validate[Option[UUID]] { config.as[Option[String]](path).map(UUID.fromString) }

Expand Down
Loading

0 comments on commit bdc1ab3

Please sign in to comment.