Adding ABFS support to access ADLS Gen2 storage accounts
srinipunuru committed Jul 21, 2021
1 parent f9286a5 commit e52f46a
Showing 8 changed files with 156 additions and 12 deletions.
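For orientation, a minimal usage sketch (not part of this commit; the container and account names are placeholders, and it assumes the flink-fs-azure-hadoop jar is available as a Flink plugin with credentials configured): once the abfs/abfss schemes resolve, an ADLS Gen2 path can be used like any other Flink path.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AbfsSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Writing to an ADLS Gen2 container; resolving the abfs scheme is what this commit adds.
        env.fromElements("hello", "adls gen2")
                .writeAsText("abfs://mycontainer@myaccount.dfs.core.windows.net/output");
        env.execute("abfs-sink-sketch");
    }
}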
@@ -248,6 +248,8 @@ public enum WriteMode {
ImmutableMultimap.<String, String>builder()
.put("wasb", "flink-fs-azure-hadoop")
.put("wasbs", "flink-fs-azure-hadoop")
.put("abfs", "flink-fs-azure-hadoop")
.put("abfss", "flink-fs-azure-hadoop")
.put("oss", "flink-oss-fs-hadoop")
.put("s3", "flink-s3-fs-hadoop")
.put("s3", "flink-s3-fs-presto")
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.azurefs;

/** AzureFS factory for the abfs scheme (ADLS Gen2). */
public class ABFSAzureFSFactory extends AbstractAzureFSFactory {

@Override
public String getScheme() {
return "abfs";
}
}
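A rough illustration of how this factory would be used (assumed usage, not part of the commit; it presumes account credentials are already present in the loaded Hadoop configuration):

static org.apache.flink.core.fs.FileSystem openAbfs() throws java.io.IOException {
    AbstractAzureFSFactory factory = new ABFSAzureFSFactory();
    factory.configure(new org.apache.flink.configuration.Configuration());
    // create() wraps Hadoop's AzureBlobFileSystem in Flink's HadoopFileSystem (see the factory changes below).
    return factory.create(java.net.URI.create("abfs://mycontainer@myaccount.dfs.core.windows.net/dir"));
}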
@@ -25,6 +25,7 @@
import org.apache.flink.runtime.util.HadoopConfigLoader;

import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -36,7 +37,7 @@
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Abstract factory for AzureFS. Subclasses override to specify the correct scheme (wasb / wasbs).
* Abstract factory for AzureFS. Subclasses override to specify the correct scheme (wasb / wasbs / abfs / abfss).
* Based on Azure HDFS support in the <a
* href="https://hadoop.apache.org/docs/current/hadoop-azure/index.html">hadoop-azure</a> module.
*/
@@ -75,18 +76,27 @@ public void configure(Configuration config) {
@Override
public FileSystem create(URI fsUri) throws IOException {
checkNotNull(fsUri, "passed file system URI object should not be null");
LOG.info("Trying to load and instantiate Azure File System");
LOG.info("Trying to load and instantiate Azure File System for {}", fsUri);
return new HadoopFileSystem(createInitializedAzureFS(fsUri, flinkConfig));
}

// uri is of the form: wasb(s)://<container>@<account>.blob.core.windows.net/testDir
// uri is of the form: wasb(s)://<container>@<account>.blob.core.windows.net/testDir, or
// abfs(s)://<container>@<account>.dfs.core.windows.net/testDir
private org.apache.hadoop.fs.FileSystem createInitializedAzureFS(
URI fsUri, Configuration flinkConfig) throws IOException {
org.apache.hadoop.conf.Configuration hadoopConfig = configLoader.getOrLoadHadoopConfig();

org.apache.hadoop.fs.FileSystem azureFS = new NativeAzureFileSystem();
azureFS.initialize(fsUri, hadoopConfig);

return azureFS;
String scheme = fsUri.getScheme();

if (scheme.startsWith("wasb")) {
LOG.info("Trying to initialize hadoop filesystem for {}.", scheme);
org.apache.hadoop.fs.FileSystem azureFS = new NativeAzureFileSystem();
azureFS.initialize(fsUri, hadoopConfig);
return azureFS;
} else {
LOG.info("Trying to initialize hadoop filesystem for {}.", scheme);
org.apache.hadoop.fs.FileSystem azureFS = new AzureBlobFileSystem();
azureFS.initialize(fsUri, hadoopConfig);
return azureFS;
}
}
}
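Outside this diff, ABFS still needs credentials from the Hadoop configuration returned by configLoader.getOrLoadHadoopConfig(). A hedged sketch of the shared-key setup the new branch relies on (account/container names and the key value are placeholders; the property name follows the hadoop-azure ABFS documentation, and other auth modes such as OAuth exist):

static org.apache.hadoop.fs.FileSystem initAbfsDirectly() throws java.io.IOException {
    org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
    // Shared-key authentication for the (placeholder) ADLS Gen2 account "myaccount".
    hadoopConfig.set("fs.azure.account.key.myaccount.dfs.core.windows.net", "<storage-account-key>");
    org.apache.hadoop.fs.FileSystem abfs = new org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem();
    abfs.initialize(java.net.URI.create("abfs://mycontainer@myaccount.dfs.core.windows.net/"), hadoopConfig);
    return abfs;
}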
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.azurefs;

/** AzureFS factory for the abfss scheme (secure ABFS, ADLS Gen2 over TLS). */
public class SecureABFSAzureFSFactory extends AbstractAzureFSFactory {

@Override
public String getScheme() {
return "abfss";
}
}
@@ -15,3 +15,5 @@

org.apache.flink.fs.azurefs.AzureFSFactory
org.apache.flink.fs.azurefs.SecureAzureFSFactory
org.apache.flink.fs.azurefs.ABFSAzureFSFactory
org.apache.flink.fs.azurefs.SecureABFSAzureFSFactory
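These entries are standard META-INF/services descriptors, so the two new factories become discoverable through the JDK ServiceLoader. A simplified sketch of that discovery step (Flink's actual plugin loading is more involved):

for (org.apache.flink.core.fs.FileSystemFactory factory :
        java.util.ServiceLoader.load(org.apache.flink.core.fs.FileSystemFactory.class)) {
    // With this commit, the reported schemes include "abfs" and "abfss".
    System.out.println(factory.getScheme());
}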
@@ -0,0 +1,54 @@
package org.apache.flink.fs.azurefs;

import org.apache.flink.configuration.Configuration;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.net.URI;
import java.util.Arrays;
import java.util.List;

/** Tests for the ABFSAzureFSFactory. */
@RunWith(Parameterized.class)
public class ABFSAzureFSFactoryTest {
@Parameterized.Parameter public String scheme;

@Parameterized.Parameters(name = "Scheme = {0}")
public static List<String> parameters() {
return Arrays.asList("abfs", "abfss");
}

@Rule public final ExpectedException exception = ExpectedException.none();

private AbstractAzureFSFactory getFactory(String scheme) {
return scheme.equals("abfs") ? new ABFSAzureFSFactory() : new SecureABFSAzureFSFactory();
}

@Test
public void testNullFsURI() throws Exception {
URI uri = null;
AbstractAzureFSFactory factory = getFactory(scheme);

exception.expect(NullPointerException.class);
exception.expectMessage("passed file system URI object should not be null");

factory.create(uri);
}

@Test
public void testCreateFsWithMissingAuthority() throws Exception {
String uriString = String.format("%s:///my/path", scheme);
final URI uri = URI.create(uriString);

exception.expect(IllegalArgumentException.class);
exception.expectMessage(String.format("%s has invalid authority.", uriString));

AbstractAzureFSFactory factory = getFactory(scheme);
factory.configure(new Configuration());
factory.create(uri);
}
}
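One further check that would fit the same parameterized setup (illustrative only, not part of this commit):

@Test
public void testGetScheme() {
    AbstractAzureFSFactory factory = getFactory(scheme);
    // Each factory must report the scheme it is registered for in the services file.
    org.junit.Assert.assertEquals(scheme, factory.getScheme());
}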
@@ -30,10 +30,13 @@
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
import org.apache.hadoop.fs.s3a.statistics.impl.BondedS3AStatisticsContext;

import java.io.File;
import java.io.FileOutputStream;
@@ -52,8 +55,22 @@ public class HadoopS3AccessHelper implements S3AccessHelper {
private final InternalWriteOperationHelper s3accessHelper;

public HadoopS3AccessHelper(S3AFileSystem s3a, Configuration conf) {
FileSystem.Statistics statistics = new FileSystem.Statistics(s3a.getScheme());
BondedS3AStatisticsContext statisticsContext =
new BondedS3AStatisticsContext(
new BondedS3AStatisticsContext.S3AFSStatisticsSource() {
public S3AInstrumentation getInstrumentation() {
return s3a.getInstrumentation();
}

public FileSystem.Statistics getInstanceStatistics() {
return statistics;
}
});

this.s3accessHelper =
new InternalWriteOperationHelper(checkNotNull(s3a), checkNotNull(conf));
new InternalWriteOperationHelper(
checkNotNull(s3a), checkNotNull(conf), statisticsContext);
this.s3a = s3a;
}

@@ -144,8 +161,11 @@ public ObjectMetadata getObjectMetadata(String key) throws IOException {
*/
private static final class InternalWriteOperationHelper extends WriteOperationHelper {

InternalWriteOperationHelper(S3AFileSystem owner, Configuration conf) {
super(owner, conf);
InternalWriteOperationHelper(
S3AFileSystem owner,
Configuration conf,
BondedS3AStatisticsContext statisticsContext) {
super(owner, conf, statisticsContext);
}
}
}
2 changes: 1 addition & 1 deletion flink-filesystems/pom.xml
@@ -35,7 +35,7 @@ under the License.
<packaging>pom</packaging>

<properties>
<fs.hadoopshaded.version>3.1.0</fs.hadoopshaded.version>
<fs.hadoopshaded.version>3.3.1</fs.hadoopshaded.version>
</properties>

<modules>
