Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-18562] Support for Hadoop ABFS for Azure Datalake Gen2 accounts #16559

Merged
merged 2 commits into from
Aug 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 40 additions & 3 deletions docs/content/docs/deployment/filesystems/azure.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,31 @@ under the License.
[Azure Blob Storage](https://docs.microsoft.com/en-us/azure/storage/) is a Microsoft-managed service providing cloud storage for a variety of use cases.
You can use Azure Blob Storage with Flink for **reading** and **writing data** as well in conjunction with the [streaming **state backends**]({{< ref "docs/ops/state/state_backends" >}})

Flink supports accessing Azure Blob Storage using both [wasb://](https://hadoop.apache.org/docs/stable/hadoop-azure/index.html) or [abfs://](https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html).

{{< hint info >}}
Azure recommends using abfs:// for accessing ADLS Gen2 storage accounts even though wasb:// works through backward compatibility.
{{< /hint >}}

{{< hint warning >}}
abfs:// can be used for accessing the ADLS Gen2 storage accounts only. Please visit Azure documentation on how to identify ADLS Gen2 storage account.
{{< /hint >}}


You can use Azure Blob Storage objects like regular files by specifying paths in the following format:

```plain
// WASB unencrypted access
wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>

// SSL encrypted access
// WASB SSL encrypted access
wasbs://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>

// ABFS unecrypted access
abfs://<your-container>@$<your-azure-account>.dfs.core.windows.net/<object-path>

// ABFS SSL encrypted access
abfss://<your-container>@$<your-azure-account>.dfs.core.windows.net/<object-path>
```

See below for how to use Azure Blob Storage in a Flink job:
Expand All @@ -63,9 +81,11 @@ cp ./opt/flink-azure-fs-hadoop-{{< version >}}.jar ./plugins/azure-fs-hadoop/

`flink-azure-fs-hadoop` registers default FileSystem wrappers for URIs with the *wasb://* and *wasbs://* (SSL encrypted access) scheme.

### Credentials Configuration
## Credentials Configuration

Hadoop's Azure Filesystem supports configuration of credentials via the Hadoop configuration as
### WASB

Hadoop's WASB Azure Filesystem supports configuration of credentials via the Hadoop configuration as
outlined in the [Hadoop Azure Blob Storage documentation](https://hadoop.apache.org/docs/current/hadoop-azure/index.html#Configuring_Credentials).
For convenience Flink forwards all Flink configurations with a key prefix of `fs.azure` to the
Hadoop configuration of the filesystem. Consequentially, the azure blob storage key can be configured
Expand All @@ -83,4 +103,21 @@ environment variable `AZURE_STORAGE_KEY` by setting the following configuration
fs.azure.account.keyprovider.<account_name>.blob.core.windows.net: org.apache.flink.fs.azurefs.EnvironmentVariableKeyProvider
```

### ABFS

Hadoop's ABFS Azure Filesystem supports several ways of configuring authentication. Please visit the [Hadoop ABFS documentation](https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html#Authentication) documentation on how to configure.

{{< hint info >}}
Azure recommends using Azure managed identity to access the ADLS Gen2 storage accounts using abfs. Please refer to [Azure managed identities documentation](https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/) for more details.

Please visit the [page](https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/services-support-managed-identities#azure-services-that-support-managed-identities-for-azure-resources) for the list of services that support Managed Identities. Flink clusters deployed in those Azure services can take advantage of Managed Identities.
{{< /hint >}}

##### Accessing ABFS using storage Keys (Discouraged)
Azure blob storage key can be configured in `flink-conf.yaml` via:

```yaml
fs.azure.account.key.<account_name>.dfs.core.windows.net: <azure_storage_key>
```

{{< top >}}
5 changes: 4 additions & 1 deletion docs/content/docs/deployment/filesystems/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,12 @@ The Apache Flink project supports the following file systems:
- **[Aliyun Object Storage Service]({{< ref "docs/deployment/filesystems/oss" >}})** is supported by `flink-oss-fs-hadoop` and registered under the *oss://* URI scheme.
The implementation is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint.

- **[Azure Data Lake Store Gen2]({{< ref "docs/deployment/filesystems/azure" >}})** is supported by `flink-azure-fs-hadoop` and registered under the *abfs(s)://* URI schemes.
The implementation is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint.

- **[Azure Blob Storage]({{< ref "docs/deployment/filesystems/azure" >}})** is supported by `flink-azure-fs-hadoop` and registered under the *wasb(s)://* URI schemes.
The implementation is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint.

- **[Google Cloud Storage]({{< ref "docs/deployment/filesystems/gcs" >}})** is supported by `gcs-connector` and registered under the *gs://* URI scheme.
The implementation is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ public enum WriteMode {
ImmutableMultimap.<String, String>builder()
.put("wasb", "flink-fs-azure-hadoop")
.put("wasbs", "flink-fs-azure-hadoop")
.put("abfs", "flink-fs-azure-hadoop")
.put("abfss", "flink-fs-azure-hadoop")
AHeise marked this conversation as resolved.
Show resolved Hide resolved
.put("oss", "flink-oss-fs-hadoop")
.put("s3", "flink-s3-fs-hadoop")
.put("s3", "flink-s3-fs-presto")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.util.HadoopConfigLoader;

import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,12 +35,12 @@
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Abstract factory for AzureFS. Subclasses override to specify the correct scheme (wasb / wasbs).
* Based on Azure HDFS support in the <a
* Abstract factory for AzureFS. Subclasses override to specify the correct scheme (wasb / wasbs /
* abfs/ abfss). Based on Azure HDFS support in the <a
* href="https://hadoop.apache.org/docs/current/hadoop-azure/index.html">hadoop-azure</a> module.
*/
public abstract class AbstractAzureFSFactory implements FileSystemFactory {
private static final Logger LOG = LoggerFactory.getLogger(AzureFSFactory.class);
private static final Logger LOG = LoggerFactory.getLogger(AzureBlobStorageFSFactory.class);

private static final String[] FLINK_CONFIG_PREFIXES = {"fs.azure.", "azure."};
private static final String HADOOP_CONFIG_PREFIX = "fs.azure.";
Expand All @@ -53,8 +52,6 @@ public abstract class AbstractAzureFSFactory implements FileSystemFactory {

private final HadoopConfigLoader configLoader;

private Configuration flinkConfig;

public AbstractAzureFSFactory() {
this.configLoader =
new HadoopConfigLoader(
Expand All @@ -68,25 +65,18 @@ public AbstractAzureFSFactory() {

@Override
public void configure(Configuration config) {
flinkConfig = config;
configLoader.setFlinkConfig(config);
}

abstract org.apache.hadoop.fs.FileSystem createAzureFS();

@Override
public FileSystem create(URI fsUri) throws IOException {
checkNotNull(fsUri, "passed file system URI object should not be null");
LOG.info("Trying to load and instantiate Azure File System");
return new HadoopFileSystem(createInitializedAzureFS(fsUri, flinkConfig));
}

// uri is of the form: wasb(s)://[email protected]/testDir
private org.apache.hadoop.fs.FileSystem createInitializedAzureFS(
URI fsUri, Configuration flinkConfig) throws IOException {
LOG.info("Trying to load and instantiate Azure File System for {}", fsUri);
org.apache.hadoop.conf.Configuration hadoopConfig = configLoader.getOrLoadHadoopConfig();

org.apache.hadoop.fs.FileSystem azureFS = new NativeAzureFileSystem();
azureFS.initialize(fsUri, hadoopConfig);

return azureFS;
org.apache.hadoop.fs.FileSystem fs = createAzureFS();
fs.initialize(fsUri, hadoopConfig);
return new HadoopFileSystem(fs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,19 @@

package org.apache.flink.fs.azurefs;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;

/** A factory for the Azure file system over HTTP. */
public class AzureFSFactory extends AbstractAzureFSFactory {
public class AzureBlobStorageFSFactory extends AbstractAzureFSFactory {

@Override
public String getScheme() {
return "wasb";
}

@Override
FileSystem createAzureFS() {
return new NativeAzureFileSystem();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.azurefs;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;

/** Abfs azureFs implementation. */
public class AzureDataLakeStoreGen2FSFactory extends AbstractAzureFSFactory {

@Override
public String getScheme() {
return "abfs";
}

@Override
FileSystem createAzureFS() {
return new AzureBlobFileSystem();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,19 @@

package org.apache.flink.fs.azurefs;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;

/** A factory for the Azure file system over HTTPs. */
public class SecureAzureFSFactory extends AbstractAzureFSFactory {
public class SecureAzureBlobStorageFSFactory extends AbstractAzureFSFactory {

@Override
public String getScheme() {
return "wasbs";
}

@Override
FileSystem createAzureFS() {
return new NativeAzureFileSystem();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.azurefs;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;

/** Secure ABFS AzureFS implementation. */
public class SecureAzureDataLakeStoreGen2FSFactory extends AbstractAzureFSFactory {

@Override
public String getScheme() {
return "abfss";
}

@Override
FileSystem createAzureFS() {
return new AzureBlobFileSystem();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,20 @@ This project bundles the following dependencies under the Apache Software Licens
- com.fasterxml.jackson.core:jackson-annotations:2.12.1
- com.fasterxml.jackson.core:jackson-core:2.12.1
- com.fasterxml.jackson.core:jackson-databind:2.12.1
- com.google.guava:guava:11.0.2
- com.microsoft.azure:azure-keyvault-core:0.8.0
- com.microsoft.azure:azure-storage:5.4.0
- com.google.guava:guava:20.0
- commons-codec:commons-codec:1.13
- commons-logging:commons-logging:1.1.3
- org.apache.hadoop:hadoop-azure:3.1.0
- org.apache.hadoop:hadoop-azure:3.3.1
- org.apache.hadoop.thirdparty:hadoop-shaded-guava:1.1.1
- org.apache.httpcomponents:httpclient:4.5.13
- org.apache.httpcomponents:httpcore:4.4.14
- org.codehaus.jackson:jackson-mapper-asl:1.9.13
- org.codehaus.jackson:jackson-core-asl:1.9.13
- org.eclipse.jetty:jetty-util:9.3.24.v20180605
- org.eclipse.jetty:jetty-util-ajax:9.3.24.v20180605
- org.wildfly.openssl:wildfly-openssl:1.0.7.Final

This project bundles the following dependencies under the MIT (https://opensource.org/licenses/MIT)

- com.microsoft.azure:azure-keyvault-core:1.0.0
- com.microsoft.azure:azure-storage:7.0.1
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.flink.fs.azurefs.AzureFSFactory
org.apache.flink.fs.azurefs.SecureAzureFSFactory
org.apache.flink.fs.azurefs.AzureBlobStorageFSFactory
org.apache.flink.fs.azurefs.SecureAzureBlobStorageFSFactory
org.apache.flink.fs.azurefs.AzureDataLakeStoreGen2FSFactory
org.apache.flink.fs.azurefs.SecureAzureDataLakeStoreGen2FSFactory
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

/** Tests for the AzureFSFactory. */
@RunWith(Parameterized.class)
public class AzureFSFactoryTest extends TestLogger {
public class AzureBlobStorageFSFactoryTest extends TestLogger {

@Parameterized.Parameter public String scheme;

Expand All @@ -46,7 +46,9 @@ public static List<String> parameters() {
@Rule public final ExpectedException exception = ExpectedException.none();

private AbstractAzureFSFactory getFactory(String scheme) {
return scheme.equals("wasb") ? new AzureFSFactory() : new SecureAzureFSFactory();
return scheme.equals("wasb")
? new AzureBlobStorageFSFactory()
: new SecureAzureBlobStorageFSFactory();
}

@Test
Expand Down
Loading