Skip to content

Commit

Permalink
[apache#3379] feat(catalog-hadoop): Add S3 support for Fileset Hadoop…
Browse files Browse the repository at this point in the history
… catalog (apache#4232)

### What changes were proposed in this pull request?

Add S3 support for the Fileset Hadoop catalog. In practice this only adds the
hadoop-aws dependency; most of the work is in the integration tests.

### Why are the changes needed?

Fix: apache#3379 

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

IT.

---------

Co-authored-by: zhanghan18 <[email protected]>
Co-authored-by: yuqi <[email protected]>
  • Loading branch information
3 people authored and mplmoknijb committed Nov 6, 2024
1 parent 7e4b387 commit bffdf2f
Show file tree
Hide file tree
Showing 16 changed files with 553 additions and 48 deletions.
1 change: 1 addition & 0 deletions LICENSE.bin
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@
Apache Hadoop
Apache Hadoop Aliyun connector
Apache Hadoop GCS connector
Apache Hadoop AWS connector
Apache Hadoop Annotations
Apache Hadoop Auth
Apache Hadoop Client Aggregator
Expand Down
7 changes: 5 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,9 @@ tasks {
if (!it.name.startsWith("catalog") &&
!it.name.startsWith("authorization") &&
!it.name.startsWith("client") && !it.name.startsWith("filesystem") && !it.name.startsWith("spark") && !it.name.startsWith("iceberg") && it.name != "trino-connector" &&
it.name != "integration-test" && it.name != "hive-metastore-common" && !it.name.startsWith("flink") && it.name != "gcp-bundle" && it.name != "aliyun-bundle"
it.name != "integration-test" && it.name != "bundled-catalog" && !it.name.startsWith("flink") &&
it.name != "integration-test" && it.name != "hive-metastore-common" && !it.name.startsWith("flink") &&
it.name != "gcp-bundle" && it.name != "aliyun-bundle" && it.name != "aws-bundle"
) {
from(it.configurations.runtimeClasspath)
into("distribution/package/libs")
Expand All @@ -765,8 +767,9 @@ tasks {
!it.name.startsWith("integration-test") &&
!it.name.startsWith("flink") &&
!it.name.startsWith("trino-connector") &&
it.name != "bundled-catalog" &&
it.name != "hive-metastore-common" && it.name != "gcp-bundle" &&
it.name != "aliyun-bundle"
it.name != "aliyun-bundle" && it.name != "aws-bundle"
) {
dependsOn("${it.name}:build")
from("${it.name}/build/libs")
Expand Down
46 changes: 46 additions & 0 deletions bundles/aws-bundle/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
// Builds the aws-bundle: a shaded jar that packages hadoop-aws (the S3A
// FileSystem) so the Fileset Hadoop catalog can load S3 support at runtime.
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar

plugins {
  `maven-publish`
  id("java")
  alias(libs.plugins.shadow)
}

dependencies {
  // compileOnly: the catalog and hadoop-common are provided by the runtime
  // environment, so they must not be shaded into the bundle jar.
  compileOnly(project(":catalogs:catalog-hadoop"))
  compileOnly(libs.hadoop3.common)
  // hadoop-aws and its transitive runtime dependencies are what this bundle ships.
  implementation(libs.hadoop3.aws)
}

tasks.withType(ShadowJar::class.java) {
  isZip64 = true
  configurations = listOf(project.configurations.runtimeClasspath.get())
  // Empty classifier makes the shaded jar the primary published artifact.
  archiveClassifier.set("")
}

tasks.jar {
  // The plain (unshaded) jar is built but demoted to the "empty" classifier.
  dependsOn(tasks.named("shadowJar"))
  archiveClassifier.set("empty")
}

tasks.compileJava {
  dependsOn(":catalogs:catalog-hadoop:runtimeJars")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.s3.fs;

import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

/**
 * {@link FileSystemProvider} implementation backed by Hadoop's S3A connector.
 *
 * <p>Catalog properties prefixed with {@code gravitino.bypass.} are forwarded to the Hadoop
 * {@link Configuration} with the prefix stripped; all other keys are passed through unchanged.
 */
public class S3FileSystemProvider implements FileSystemProvider {

  /** Prefix marking catalog properties that should be handed straight to Hadoop. */
  private static final String GRAVITINO_BYPASS_PREFIX = "gravitino.bypass.";

  @Override
  public FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException {
    Configuration configuration = new Configuration();
    config.forEach(
        (k, v) -> {
          // Strip the bypass prefix only when it actually leads the key. The previous
          // String#replace removed the substring anywhere inside the key, which could
          // corrupt a key that merely contained "gravitino.bypass." in its middle.
          String key =
              k.startsWith(GRAVITINO_BYPASS_PREFIX)
                  ? k.substring(GRAVITINO_BYPASS_PREFIX.length())
                  : k;
          configuration.set(key, v);
        });

    // FileSystem.newInstance is a static factory that selects the implementation from the
    // URI scheme; invoking it on the base class (instead of S3AFileSystem) makes that clear
    // and avoids the misleading static-call-through-subclass idiom.
    return FileSystem.newInstance(path.toUri(), configuration);
  }

  @Override
  public String scheme() {
    return "s3a";
  }

  @Override
  public String name() {
    return "s3";
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

org.apache.gravitino.s3.fs.S3FileSystemProvider
6 changes: 6 additions & 0 deletions catalogs/catalog-hadoop/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ dependencies {
testImplementation(project(":integration-test-common", "testArtifacts"))
testImplementation(project(":server"))
testImplementation(project(":server-common"))
testImplementation(project(":bundles:aws-bundle"))
testImplementation(project(":bundles:gcp-bundle"))
testImplementation(project(":bundles:aliyun-bundle"))

Expand Down Expand Up @@ -161,6 +162,11 @@ tasks.test {
} else {
dependsOn(tasks.jar)
}

// The tests exercise the shaded cloud bundles, so build their jars first.
dependsOn(":bundles:aws-bundle:jar")
dependsOn(":bundles:aliyun-bundle:jar")
dependsOn(":bundles:gcp-bundle:jar")
}

tasks.getByName("generateMetadataFileForMavenJavaPublication") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,24 @@ public class HadoopCatalogIT extends BaseIT {
private static final Logger LOG = LoggerFactory.getLogger(HadoopCatalogIT.class);
protected static final ContainerSuite containerSuite = ContainerSuite.getInstance();

public String metalakeName = GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake");
public String catalogName = GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog");
public static final String SCHEMA_PREFIX = "CatalogFilesetIT_schema";
public String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
protected String metalakeName = GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake");
protected String catalogName = GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog");
public final String SCHEMA_PREFIX = "CatalogFilesetIT_schema";
protected String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
protected static final String provider = "hadoop";
protected static GravitinoMetalake metalake;
protected static Catalog catalog;
protected static FileSystem fileSystem;
protected static String defaultBaseLocation;
protected GravitinoMetalake metalake;
protected Catalog catalog;
protected FileSystem fileSystem;
protected String defaultBaseLocation;

  /**
   * Starts the storage container(s) this test needs. The base implementation starts the Hive
   * container; subclasses override this to start a different backend (e.g. LocalStack for S3).
   */
  protected void startNecessaryContainer() {
    containerSuite.startHiveContainer();
  }

@BeforeAll
public void setup() throws IOException {
containerSuite.startHiveContainer();
startNecessaryContainer();

Configuration conf = new Configuration();
conf.set("fs.defaultFS", defaultBaseLocation());
fileSystem = FileSystem.get(conf);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.catalog.hadoop.integration.test;

import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.integration.test.container.GravitinoLocalStackContainer;
import org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.shaded.org.awaitility.Awaitility;

@Tag("gravitino-docker-test")
/**
 * Integration test for the Fileset Hadoop catalog backed by S3 (via LocalStack). Reuses the
 * generic test cases from {@link HadoopCatalogIT} but swaps HDFS for an S3A filesystem.
 */
public class HadoopS3CatalogIT extends HadoopCatalogIT {
  // Fix: the logger was previously created with HadoopOSSCatalogIT.class, so log lines from
  // this test were attributed to the wrong class.
  private static final Logger LOG = LoggerFactory.getLogger(HadoopS3CatalogIT.class);

  private final String bucketName = "s3-bucket-" + UUID.randomUUID().toString().replace("-", "");
  private String accessKey;
  private String secretKey;
  private String s3Endpoint;

  private GravitinoLocalStackContainer gravitinoLocalStackContainer;

  // Intentionally a no-op: the base framework startup is deferred until setup() so the
  // LocalStack container and bundle jars are ready first.
  @VisibleForTesting
  public void startIntegrationTest() throws Exception {}

  @Override
  protected void startNecessaryContainer() {
    containerSuite.startLocalStackContainer();
    gravitinoLocalStackContainer = containerSuite.getLocalStackContainer();

    // Poll until LocalStack's IAM endpoint responds; the container takes a while to boot.
    Awaitility.await()
        .atMost(60, TimeUnit.SECONDS)
        .pollInterval(1, TimeUnit.SECONDS)
        .until(
            () -> {
              try {
                Container.ExecResult result =
                    gravitinoLocalStackContainer.executeInContainer(
                        "awslocal", "iam", "create-user", "--user-name", "anonymous");
                return result.getExitCode() == 0;
              } catch (Exception e) {
                LOG.info("LocalStack is not ready yet", e);
                return false;
              }
            });

    gravitinoLocalStackContainer.executeInContainer("awslocal", "s3", "mb", "s3://" + bucketName);

    Container.ExecResult result =
        gravitinoLocalStackContainer.executeInContainer(
            "awslocal", "iam", "create-access-key", "--user-name", "anonymous");

    // Fix: the ACL must be applied to the bucket created above, not a hard-coded
    // "my-test-bucket" that does not exist.
    gravitinoLocalStackContainer.executeInContainer(
        "awslocal",
        "s3api",
        "put-bucket-acl",
        "--bucket",
        bucketName,
        "--acl",
        "public-read-write");

    // Extract the access/secret key from awslocal's JSON-ish stdout by line position.
    // NOTE(review): this parsing is fragile — it assumes a fixed output layout and fixed
    // key lengths (20/40 chars); confirm against the awslocal CLI output format.
    String[] lines = result.getStdout().split("\n");
    accessKey = lines[3].split(":")[1].trim().substring(1, 21);
    secretKey = lines[5].split(":")[1].trim().substring(1, 41);

    // LocalStack test-only credentials; safe to log here, but never log real AWS keys.
    LOG.info("Access key: {}", accessKey);
    LOG.info("Secret key: {}", secretKey);

    s3Endpoint =
        String.format("http://%s:%d", gravitinoLocalStackContainer.getContainerIpAddress(), 4566);
  }

  @BeforeAll
  public void setup() throws IOException {
    copyBundleJarsToHadoop("aws-bundle");

    try {
      super.startIntegrationTest();
    } catch (Exception e) {
      throw new RuntimeException("Failed to start integration test", e);
    }

    startNecessaryContainer();

    metalakeName = GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake");
    catalogName = GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog");
    // Fix: schemaName was assigned twice with the same value; one assignment suffices.
    schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);

    Configuration conf = new Configuration();
    conf.set("fs.s3a.access.key", accessKey);
    conf.set("fs.s3a.secret.key", secretKey);
    conf.set("fs.s3a.endpoint", s3Endpoint);
    conf.set(
        "fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
    fileSystem = FileSystem.get(URI.create(String.format("s3a://%s", bucketName)), conf);

    createMetalake();
    createCatalog();
    createSchema();
  }

  @AfterAll
  public void stop() throws IOException {
    Catalog catalog = metalake.loadCatalog(catalogName);
    catalog.asSchemas().dropSchema(schemaName, true);
    metalake.dropCatalog(catalogName, true);
    client.dropMetalake(metalakeName);

    try {
      closer.close();
    } catch (Exception e) {
      LOG.error("Failed to close CloseableGroup", e);
    }
  }

  /** Lazily creates (once) and returns the s3a:// base directory used by the tests. */
  protected String defaultBaseLocation() {
    if (defaultBaseLocation == null) {
      try {
        Path bucket =
            new Path(
                String.format(
                    "s3a://%s/%s", bucketName, GravitinoITUtils.genRandomName("CatalogFilesetIT")));
        if (!fileSystem.exists(bucket)) {
          fileSystem.mkdirs(bucket);
        }

        defaultBaseLocation = bucket.toString();
      } catch (IOException e) {
        throw new RuntimeException("Failed to create default base location", e);
      }
    }

    return defaultBaseLocation;
  }

  /** Creates a fileset catalog whose Hadoop config bypasses straight to the S3A connector. */
  protected void createCatalog() {
    Map<String, String> map = Maps.newHashMap();
    map.put("gravitino.bypass.fs.s3a.access.key", accessKey);
    map.put("gravitino.bypass.fs.s3a.secret.key", secretKey);
    map.put("gravitino.bypass.fs.s3a.endpoint", s3Endpoint);
    map.put(
        "gravitino.bypass.fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
    // Select the S3 filesystem provider contributed by the aws-bundle.
    map.put(FILESYSTEM_PROVIDERS, "s3");

    metalake.createCatalog(catalogName, Catalog.Type.FILESET, provider, "comment", map);

    catalog = metalake.loadCatalog(catalogName);
  }

  protected String generateLocation(String filesetName) {
    return String.format("%s/%s", defaultBaseLocation, filesetName);
  }
}
Loading

0 comments on commit bffdf2f

Please sign in to comment.