Skip to content

Commit

Permalink
Adding ABFS support to access ADLS Gen2 storage accounts
Browse files Browse the repository at this point in the history
  • Loading branch information
srinipunuru committed Aug 5, 2021
1 parent b7352bf commit c84c830
Show file tree
Hide file tree
Showing 11 changed files with 217 additions and 28 deletions.
41 changes: 38 additions & 3 deletions docs/content/docs/deployment/filesystems/azure.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,31 @@ under the License.
[Azure Blob Storage](https://docs.microsoft.com/en-us/azure/storage/) is a Microsoft-managed service providing cloud storage for a variety of use cases.
You can use Azure Blob Storage with Flink for **reading** and **writing data** as well in conjunction with the [streaming **state backends**]({{< ref "docs/ops/state/state_backends" >}})

Flink supports accessing Azure Blob Storage using both [wasb://](https://hadoop.apache.org/docs/stable/hadoop-azure/index.html) or [abfs://](https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html).

{{< hint info >}}
Azure recommends using abfs:// for accessing ADLS Gen2 storage accounts even though wasb:// works through backward compatibility.
{{< /hint >}}

{{< hint warning >}}
abfs:// can be used for accessing the ADLS Gen2 storage accounts only. Please visit Azure documentation on how to identify ADLS Gen2 storage account.
{{< /hint >}}


You can use Azure Blob Storage objects like regular files by specifying paths in the following format:

```plain
// WASB unencrypted access
wasb://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>
// SSL encrypted access
// WASB SSL encrypted access
wasbs://<your-container>@$<your-azure-account>.blob.core.windows.net/<object-path>
// ABFS unecrypted access
abfs://<your-container>@$<your-azure-account>.dfs.core.windows.net/<object-path>
// ABFS SSL encrypted access
abfss://<your-container>@$<your-azure-account>.dfs.core.windows.net/<object-path>
```

See below for how to use Azure Blob Storage in a Flink job:
Expand All @@ -63,9 +81,11 @@ cp ./opt/flink-azure-fs-hadoop-{{< version >}}.jar ./plugins/azure-fs-hadoop/

`flink-azure-fs-hadoop` registers default FileSystem wrappers for URIs with the *wasb://* and *wasbs://* (SSL encrypted access) scheme.

### Credentials Configuration
## Credentials Configuration

### WASB

Hadoop's Azure Filesystem supports configuration of credentials via the Hadoop configuration as
Hadoop's WASB Azure Filesystem supports configuration of credentials via the Hadoop configuration as
outlined in the [Hadoop Azure Blob Storage documentation](https://hadoop.apache.org/docs/current/hadoop-azure/index.html#Configuring_Credentials).
For convenience Flink forwards all Flink configurations with a key prefix of `fs.azure` to the
Hadoop configuration of the filesystem. Consequentially, the azure blob storage key can be configured
Expand All @@ -83,4 +103,19 @@ environment variable `AZURE_STORAGE_KEY` by setting the following configuration
fs.azure.account.keyprovider.<account_name>.blob.core.windows.net: org.apache.flink.fs.azurefs.EnvironmentVariableKeyProvider
```

### ABFS

Hadoop's ABFS Azure Filesystem supports several ways of configuring authentication. Please visit the [Hadoop ABFS documentation](https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html#Authentication) documentation on how to configure.

{{< hint info >}}
Azure recommends using Azure managed identity to access the ADLS Gen2 storage accounts using abfs. Details on how to do this are beyond the scope of this documentation, please refer to the Azure documentation for more details.
{{< /hint >}}

##### Accessing ABFS using storage Keys (Discouraged)
Azure blob storage key can be configured in `flink-conf.yaml` via:

```yaml
fs.azure.account.key.<account_name>.dfs.core.windows.net: <azure_storage_key>
```

{{< top >}}
2 changes: 1 addition & 1 deletion docs/content/docs/deployment/filesystems/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ The Apache Flink project supports the following file systems:
- **[Aliyun Object Storage Service]({{< ref "docs/deployment/filesystems/oss" >}})** is supported by `flink-oss-fs-hadoop` and registered under the *oss://* URI scheme.
The implementation is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint.

- **[Azure Blob Storage]({{< ref "docs/deployment/filesystems/azure" >}})** is supported by `flink-azure-fs-hadoop` and registered under the *wasb(s)://* URI schemes.
- **[Azure Blob Storage]({{< ref "docs/deployment/filesystems/azure" >}})** is supported by `flink-azure-fs-hadoop` and registered under the *abfs(s)://* and *wasb(s)://* URI schemes.
The implementation is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint.

- **[Google Cloud Storage]({{< ref "docs/deployment/filesystems/gcs" >}})** is supported by `gcs-connector` and registered under the *gs://* URI scheme.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ public enum WriteMode {
ImmutableMultimap.<String, String>builder()
.put("wasb", "flink-fs-azure-hadoop")
.put("wasbs", "flink-fs-azure-hadoop")
.put("abfs", "flink-fs-azure-hadoop")
.put("abfss", "flink-fs-azure-hadoop")
.put("oss", "flink-oss-fs-hadoop")
.put("s3", "flink-s3-fs-hadoop")
.put("s3", "flink-s3-fs-presto")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.azurefs;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;

/** Abfs azureFs implementation. */
public class ABFSAzureFSFactory extends AbstractAzureFSFactory {

@Override
public String getScheme() {
return "abfs";
}

@Override
FileSystem createAzureFS() {
return new AzureBlobFileSystem();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.util.HadoopConfigLoader;

import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,8 +35,8 @@
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Abstract factory for AzureFS. Subclasses override to specify the correct scheme (wasb / wasbs).
* Based on Azure HDFS support in the <a
* Abstract factory for AzureFS. Subclasses override to specify the correct scheme (wasb / wasbs /
* abfs/ abfss). Based on Azure HDFS support in the <a
* href="https://hadoop.apache.org/docs/current/hadoop-azure/index.html">hadoop-azure</a> module.
*/
public abstract class AbstractAzureFSFactory implements FileSystemFactory {
Expand All @@ -53,8 +52,6 @@ public abstract class AbstractAzureFSFactory implements FileSystemFactory {

private final HadoopConfigLoader configLoader;

private Configuration flinkConfig;

public AbstractAzureFSFactory() {
this.configLoader =
new HadoopConfigLoader(
Expand All @@ -68,25 +65,18 @@ public AbstractAzureFSFactory() {

@Override
public void configure(Configuration config) {
flinkConfig = config;
configLoader.setFlinkConfig(config);
}

abstract org.apache.hadoop.fs.FileSystem createAzureFS();

@Override
public FileSystem create(URI fsUri) throws IOException {
checkNotNull(fsUri, "passed file system URI object should not be null");
LOG.info("Trying to load and instantiate Azure File System");
return new HadoopFileSystem(createInitializedAzureFS(fsUri, flinkConfig));
}

// uri is of the form: wasb(s)://[email protected]/testDir
private org.apache.hadoop.fs.FileSystem createInitializedAzureFS(
URI fsUri, Configuration flinkConfig) throws IOException {
LOG.info("Trying to load and instantiate Azure File System for {}", fsUri);
org.apache.hadoop.conf.Configuration hadoopConfig = configLoader.getOrLoadHadoopConfig();

org.apache.hadoop.fs.FileSystem azureFS = new NativeAzureFileSystem();
azureFS.initialize(fsUri, hadoopConfig);

return azureFS;
org.apache.hadoop.fs.FileSystem fs = createAzureFS();
fs.initialize(fsUri, hadoopConfig);
return new HadoopFileSystem(fs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,19 @@

package org.apache.flink.fs.azurefs;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;

/** A factory for the Azure file system over HTTP. */
public class AzureFSFactory extends AbstractAzureFSFactory {

@Override
public String getScheme() {
return "wasb";
}

@Override
FileSystem createAzureFS() {
return new NativeAzureFileSystem();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.azurefs;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;

/** Secure ABFS AzureFS implementation. */
public class SecureABFSAzureFSFactory extends AbstractAzureFSFactory {

@Override
public String getScheme() {
return "abfss";
}

@Override
FileSystem createAzureFS() {
return new AzureBlobFileSystem();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,19 @@

package org.apache.flink.fs.azurefs;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;

/** A factory for the Azure file system over HTTPs. */
public class SecureAzureFSFactory extends AbstractAzureFSFactory {

@Override
public String getScheme() {
return "wasbs";
}

@Override
FileSystem createAzureFS() {
return new NativeAzureFileSystem();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@

org.apache.flink.fs.azurefs.AzureFSFactory
org.apache.flink.fs.azurefs.SecureAzureFSFactory
org.apache.flink.fs.azurefs.ABFSAzureFSFactory
org.apache.flink.fs.azurefs.SecureABFSAzureFSFactory
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.azurefs;

import org.apache.flink.configuration.Configuration;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.net.URI;
import java.util.Arrays;
import java.util.List;

/** Tests for the ABFSAzureFSFactory. */
@RunWith(Parameterized.class)
public class ABFSAzureFSFactoryTest {
@Parameterized.Parameter public String scheme;

@Parameterized.Parameters(name = "Scheme = {0}")
public static List<String> parameters() {
return Arrays.asList("abfs", "abfss");
}

@Rule public final ExpectedException exception = ExpectedException.none();

private AbstractAzureFSFactory getFactory(String scheme) {
return scheme.equals("abfs") ? new ABFSAzureFSFactory() : new SecureABFSAzureFSFactory();
}

@Test
public void testNullFsURI() throws Exception {
URI uri = null;
AbstractAzureFSFactory factory = getFactory(scheme);

exception.expect(NullPointerException.class);
exception.expectMessage("passed file system URI object should not be null");

factory.create(uri);
}

@Test
public void testCreateFsWithMissingAuthority() throws Exception {
String uriString = String.format("%s:///my/path", scheme);
final URI uri = URI.create(uriString);

exception.expect(IllegalArgumentException.class);
exception.expectMessage(String.format("%s has invalid authority.", uriString));

AbstractAzureFSFactory factory = getFactory(scheme);
factory.configure(new Configuration());
factory.create(uri);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ This project bundles the following dependencies under the Apache Software Licens
- com.fasterxml.jackson.core:jackson-databind:2.12.1
- com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.12.1
- com.fasterxml.woodstox:woodstox-core:5.3.0
- com.google.guava:guava:26.0-jre
- com.google.guava:guava:21.0
- com.google.inject:guice:4.2.2
- com.facebook.airlift:configuration:0.201
- com.facebook.airlift:log:0.201
- com.facebook.airlift:stats:0.201
- io.airlift:units:1.3
- io.airlift:slice:0.38
- io.airlift:configuration:0.153
- io.airlift:log:0.153
- io.airlift:stats:0.153
- io.airlift:slice:0.31
- io.airlift:units:1.0
- joda-time:joda-time:2.5
- org.apache.commons:commons-configuration2:2.1.1
- org.apache.commons:commons-lang3:3.3.2
Expand Down

0 comments on commit c84c830

Please sign in to comment.