diff --git a/docs/content/docs/deployment/filesystems/azure.md b/docs/content/docs/deployment/filesystems/azure.md index dcb20387ab0ca..b26c22ee381f4 100644 --- a/docs/content/docs/deployment/filesystems/azure.md +++ b/docs/content/docs/deployment/filesystems/azure.md @@ -30,13 +30,31 @@ under the License. [Azure Blob Storage](https://docs.microsoft.com/en-us/azure/storage/) is a Microsoft-managed service providing cloud storage for a variety of use cases. You can use Azure Blob Storage with Flink for **reading** and **writing data** as well in conjunction with the [streaming **state backends**]({{< ref "docs/ops/state/state_backends" >}}) +Flink supports accessing Azure Blob Storage using both [wasb://](https://hadoop.apache.org/docs/stable/hadoop-azure/index.html) or [abfs://](https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html). + +{{< hint info >}} +Azure recommends using abfs:// for accessing ADLS Gen2 storage accounts even though wasb:// works through backward compatibility. +{{< /hint >}} + +{{< hint warning >}} +abfs:// can be used for accessing the ADLS Gen2 storage accounts only. Please visit Azure documentation on how to identify ADLS Gen2 storage account. +{{< /hint >}} + + You can use Azure Blob Storage objects like regular files by specifying paths in the following format: ```plain +// WASB unencrypted access wasb://@$.blob.core.windows.net/ -// SSL encrypted access +// WASB SSL encrypted access wasbs://@$.blob.core.windows.net/ + +// ABFS unecrypted access +abfs://@$.dfs.core.windows.net/ + +// ABFS SSL encrypted access +abfss://@$.dfs.core.windows.net/ ``` See below for how to use Azure Blob Storage in a Flink job: @@ -63,9 +81,11 @@ cp ./opt/flink-azure-fs-hadoop-{{< version >}}.jar ./plugins/azure-fs-hadoop/ `flink-azure-fs-hadoop` registers default FileSystem wrappers for URIs with the *wasb://* and *wasbs://* (SSL encrypted access) scheme. -### Credentials Configuration +## Credentials Configuration + +### WASB -Hadoop's Azure Filesystem supports configuration of credentials via the Hadoop configuration as +Hadoop's WASB Azure Filesystem supports configuration of credentials via the Hadoop configuration as outlined in the [Hadoop Azure Blob Storage documentation](https://hadoop.apache.org/docs/current/hadoop-azure/index.html#Configuring_Credentials). For convenience Flink forwards all Flink configurations with a key prefix of `fs.azure` to the Hadoop configuration of the filesystem. Consequentially, the azure blob storage key can be configured @@ -83,4 +103,19 @@ environment variable `AZURE_STORAGE_KEY` by setting the following configuration fs.azure.account.keyprovider..blob.core.windows.net: org.apache.flink.fs.azurefs.EnvironmentVariableKeyProvider ``` +### ABFS + +Hadoop's ABFS Azure Filesystem supports several ways of configuring authentication. Please visit the [Hadoop ABFS documentation](https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html#Authentication) documentation on how to configure. + +{{< hint info >}} +Azure recommends using Azure managed identity to access the ADLS Gen2 storage accounts using abfs. Details on how to do this are beyond the scope of this documentation, please refer to the Azure documentation for more details. +{{< /hint >}} + +##### Accessing ABFS using storage Keys (Discouraged) +Azure blob storage key can be configured in `flink-conf.yaml` via: + +```yaml +fs.azure.account.key..dfs.core.windows.net: +``` + {{< top >}} diff --git a/docs/content/docs/deployment/filesystems/overview.md b/docs/content/docs/deployment/filesystems/overview.md index aec8b539c9d89..82ecaf9f28287 100644 --- a/docs/content/docs/deployment/filesystems/overview.md +++ b/docs/content/docs/deployment/filesystems/overview.md @@ -56,7 +56,7 @@ The Apache Flink project supports the following file systems: - **[Aliyun Object Storage Service]({{< ref "docs/deployment/filesystems/oss" >}})** is supported by `flink-oss-fs-hadoop` and registered under the *oss://* URI scheme. The implementation is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint. - - **[Azure Blob Storage]({{< ref "docs/deployment/filesystems/azure" >}})** is supported by `flink-azure-fs-hadoop` and registered under the *wasb(s)://* URI schemes. + - **[Azure Blob Storage]({{< ref "docs/deployment/filesystems/azure" >}})** is supported by `flink-azure-fs-hadoop` and registered under the *abfs(s)://* and *wasb(s)://* URI schemes. The implementation is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint. - **[Google Cloud Storage]({{< ref "docs/deployment/filesystems/gcs" >}})** is supported by `gcs-connector` and registered under the *gs://* URI scheme. diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index 895869539c8d5..b77c2f5671ef1 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -248,6 +248,8 @@ public enum WriteMode { ImmutableMultimap.builder() .put("wasb", "flink-fs-azure-hadoop") .put("wasbs", "flink-fs-azure-hadoop") + .put("abfs", "flink-fs-azure-hadoop") + .put("abfss", "flink-fs-azure-hadoop") .put("oss", "flink-oss-fs-hadoop") .put("s3", "flink-s3-fs-hadoop") .put("s3", "flink-s3-fs-presto") diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/ABFSAzureFSFactory.java b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/ABFSAzureFSFactory.java new file mode 100644 index 0000000000000..6e4d87562df93 --- /dev/null +++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/ABFSAzureFSFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.azurefs; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; + +/** Abfs azureFs implementation. */ +public class ABFSAzureFSFactory extends AbstractAzureFSFactory { + + @Override + public String getScheme() { + return "abfs"; + } + + @Override + FileSystem createAzureFS() { + return new AzureBlobFileSystem(); + } +} diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java index 5fbad53675f93..7a8eecc51745c 100644 --- a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java +++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.flink.runtime.util.HadoopConfigLoader; -import org.apache.hadoop.fs.azure.NativeAzureFileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,8 +35,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Abstract factory for AzureFS. Subclasses override to specify the correct scheme (wasb / wasbs). - * Based on Azure HDFS support in the hadoop-azure module. */ public abstract class AbstractAzureFSFactory implements FileSystemFactory { @@ -53,8 +52,6 @@ public abstract class AbstractAzureFSFactory implements FileSystemFactory { private final HadoopConfigLoader configLoader; - private Configuration flinkConfig; - public AbstractAzureFSFactory() { this.configLoader = new HadoopConfigLoader( @@ -68,25 +65,18 @@ public AbstractAzureFSFactory() { @Override public void configure(Configuration config) { - flinkConfig = config; configLoader.setFlinkConfig(config); } + abstract org.apache.hadoop.fs.FileSystem createAzureFS(); + @Override public FileSystem create(URI fsUri) throws IOException { checkNotNull(fsUri, "passed file system URI object should not be null"); - LOG.info("Trying to load and instantiate Azure File System"); - return new HadoopFileSystem(createInitializedAzureFS(fsUri, flinkConfig)); - } - - // uri is of the form: wasb(s)://yourcontainer@youraccount.blob.core.windows.net/testDir - private org.apache.hadoop.fs.FileSystem createInitializedAzureFS( - URI fsUri, Configuration flinkConfig) throws IOException { + LOG.info("Trying to load and instantiate Azure File System for {}", fsUri); org.apache.hadoop.conf.Configuration hadoopConfig = configLoader.getOrLoadHadoopConfig(); - - org.apache.hadoop.fs.FileSystem azureFS = new NativeAzureFileSystem(); - azureFS.initialize(fsUri, hadoopConfig); - - return azureFS; + org.apache.hadoop.fs.FileSystem fs = createAzureFS(); + fs.initialize(fsUri, hadoopConfig); + return new HadoopFileSystem(fs); } } diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java index 15205692022e2..1b9ebf1b9ae36 100644 --- a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java +++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java @@ -18,6 +18,9 @@ package org.apache.flink.fs.azurefs; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azure.NativeAzureFileSystem; + /** A factory for the Azure file system over HTTP. */ public class AzureFSFactory extends AbstractAzureFSFactory { @@ -25,4 +28,9 @@ public class AzureFSFactory extends AbstractAzureFSFactory { public String getScheme() { return "wasb"; } + + @Override + FileSystem createAzureFS() { + return new NativeAzureFileSystem(); + } } diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureABFSAzureFSFactory.java b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureABFSAzureFSFactory.java new file mode 100644 index 0000000000000..d813af7edf2a9 --- /dev/null +++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureABFSAzureFSFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.azurefs; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; + +/** Secure ABFS AzureFS implementation. */ +public class SecureABFSAzureFSFactory extends AbstractAzureFSFactory { + + @Override + public String getScheme() { + return "abfss"; + } + + @Override + FileSystem createAzureFS() { + return new AzureBlobFileSystem(); + } +} diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureFSFactory.java b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureFSFactory.java index bdd0a6dad1b7d..f8e41dd29eb5c 100644 --- a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureFSFactory.java +++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureFSFactory.java @@ -18,6 +18,9 @@ package org.apache.flink.fs.azurefs; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azure.NativeAzureFileSystem; + /** A factory for the Azure file system over HTTPs. */ public class SecureAzureFSFactory extends AbstractAzureFSFactory { @@ -25,4 +28,9 @@ public class SecureAzureFSFactory extends AbstractAzureFSFactory { public String getScheme() { return "wasbs"; } + + @Override + FileSystem createAzureFS() { + return new NativeAzureFileSystem(); + } } diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory index 4d6a19aa54e6c..462eb14eb7c33 100644 --- a/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory +++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory @@ -15,3 +15,5 @@ org.apache.flink.fs.azurefs.AzureFSFactory org.apache.flink.fs.azurefs.SecureAzureFSFactory +org.apache.flink.fs.azurefs.ABFSAzureFSFactory +org.apache.flink.fs.azurefs.SecureABFSAzureFSFactory diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/ABFSAzureFSFactoryTest.java b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/ABFSAzureFSFactoryTest.java new file mode 100644 index 0000000000000..a331bdc4389ea --- /dev/null +++ b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/ABFSAzureFSFactoryTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.azurefs; + +import org.apache.flink.configuration.Configuration; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.net.URI; +import java.util.Arrays; +import java.util.List; + +/** Tests for the ABFSAzureFSFactory. */ +@RunWith(Parameterized.class) +public class ABFSAzureFSFactoryTest { + @Parameterized.Parameter public String scheme; + + @Parameterized.Parameters(name = "Scheme = {0}") + public static List parameters() { + return Arrays.asList("abfs", "abfss"); + } + + @Rule public final ExpectedException exception = ExpectedException.none(); + + private AbstractAzureFSFactory getFactory(String scheme) { + return scheme.equals("abfs") ? new ABFSAzureFSFactory() : new SecureABFSAzureFSFactory(); + } + + @Test + public void testNullFsURI() throws Exception { + URI uri = null; + AbstractAzureFSFactory factory = getFactory(scheme); + + exception.expect(NullPointerException.class); + exception.expectMessage("passed file system URI object should not be null"); + + factory.create(uri); + } + + @Test + public void testCreateFsWithMissingAuthority() throws Exception { + String uriString = String.format("%s:///my/path", scheme); + final URI uri = URI.create(uriString); + + exception.expect(IllegalArgumentException.class); + exception.expectMessage(String.format("%s has invalid authority.", uriString)); + + AbstractAzureFSFactory factory = getFactory(scheme); + factory.configure(new Configuration()); + factory.create(uri); + } +} diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/NOTICE b/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/NOTICE index 05c1f5051985c..d0ba23fcf3aa1 100644 --- a/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/NOTICE +++ b/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/NOTICE @@ -25,13 +25,13 @@ This project bundles the following dependencies under the Apache Software Licens - com.fasterxml.jackson.core:jackson-databind:2.12.1 - com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.12.1 - com.fasterxml.woodstox:woodstox-core:5.3.0 -- com.google.guava:guava:26.0-jre +- com.google.guava:guava:21.0 - com.google.inject:guice:4.2.2 -- com.facebook.airlift:configuration:0.201 -- com.facebook.airlift:log:0.201 -- com.facebook.airlift:stats:0.201 -- io.airlift:units:1.3 -- io.airlift:slice:0.38 +- io.airlift:configuration:0.153 +- io.airlift:log:0.153 +- io.airlift:stats:0.153 +- io.airlift:slice:0.31 +- io.airlift:units:1.0 - joda-time:joda-time:2.5 - org.apache.commons:commons-configuration2:2.1.1 - org.apache.commons:commons-lang3:3.3.2