[#3379] feat(catalog-hadoop): Add S3 support for Fileset Hadoop catalog #4232

Merged: 33 commits (Oct 21, 2024)
Commits
4aa4dc5 feat: skeleton for fileset on s3 (Jul 10, 2024)
52fac93 add some IT for fileset on s3 (Jul 12, 2024)
df38821 resolve conflict (Jul 20, 2024)
5d18a17 add some IT (Jul 21, 2024)
0e1c86b rename (Jul 21, 2024)
0604533 code enhance (Jul 22, 2024)
28d1bf7 Merge branch 'main' into fileset-s3 (Jul 22, 2024)
7ad945b use hadoop-aws version 3.3.6 in test (Jul 23, 2024)
e9ecd09 Merge branch 'main' into fileset-s3 (Jul 25, 2024)
ad028ee upgrade to hadoop 3.3.6 (Jul 31, 2024)
8fea51c refactor test code (Jul 31, 2024)
ae08425 Merge branch 'main' into fileset-s3 (Aug 1, 2024)
18a7bc2 Merge branch 'main' into fileset-s3 (Aug 5, 2024)
a2bbf0c config (Aug 7, 2024)
31f0e43 only support catalog-level auth for s3 (Aug 8, 2024)
7ac1500 Merge branch 'main' into fileset-s3 (Aug 8, 2024)
5b6649c resolve conflict (Aug 8, 2024)
70ceb68 add region config (Aug 8, 2024)
489ea69 Merge branch 'main' into fileset-s3 (Aug 14, 2024)
2d3dd13 Merge branch 'main' into fileset-s3 (Sep 9, 2024)
c340a46 refactor (Sep 10, 2024)
278cbbd code style (Sep 10, 2024)
9cc926b fix version id (Sep 10, 2024)
4be1f34 Merge branch 'main' of github.com:datastrato/graviton into fileset-s3 (yuqi1129, Oct 19, 2024)
02f498a fix (yuqi1129, Oct 19, 2024)
83c3fa9 fix (yuqi1129, Oct 19, 2024)
915f751 fix test error. (yuqi1129, Oct 19, 2024)
073d45c fix test error. (yuqi1129, Oct 19, 2024)
8c6f39e Merge branch 'main' into fileset-s3 (yuqi1129, Oct 19, 2024)
b44bd6b fix (yuqi1129, Oct 19, 2024)
8b12ebb fix (yuqi1129, Oct 19, 2024)
d99a153 Remove unused code. (yuqi1129, Oct 21, 2024)
7dbb936 Optimize code. (yuqi1129, Oct 21, 2024)
1 change: 1 addition & 0 deletions LICENSE.bin
@@ -284,6 +284,7 @@
 Apache Hadoop
 Apache Hadoop Aliyun connector
 Apache Hadoop GCS connector
+Apache Hadoop AWS connector
 Apache Hadoop Annotations
 Apache Hadoop Auth
 Apache Hadoop Client Aggregator
7 changes: 5 additions & 2 deletions build.gradle.kts
@@ -745,7 +745,9 @@ tasks {
       if (!it.name.startsWith("catalog") &&
         !it.name.startsWith("authorization") &&
         !it.name.startsWith("client") && !it.name.startsWith("filesystem") && !it.name.startsWith("spark") && !it.name.startsWith("iceberg") && it.name != "trino-connector" &&
-        it.name != "integration-test" && it.name != "hive-metastore-common" && !it.name.startsWith("flink") && it.name != "gcp-bundle" && it.name != "aliyun-bundle"
+        it.name != "integration-test" && it.name != "bundled-catalog" && !it.name.startsWith("flink") &&
+        it.name != "integration-test" && it.name != "hive-metastore-common" && !it.name.startsWith("flink") &&
+        it.name != "gcp-bundle" && it.name != "aliyun-bundle" && it.name != "aws-bundle"
       ) {
         from(it.configurations.runtimeClasspath)
         into("distribution/package/libs")
@@ -764,8 +766,9 @@ tasks {
           !it.name.startsWith("integration-test") &&
           !it.name.startsWith("flink") &&
           !it.name.startsWith("trino-connector") &&
+          it.name != "bundled-catalog" &&
           it.name != "hive-metastore-common" && it.name != "gcp-bundle" &&
-          it.name != "aliyun-bundle"
+          it.name != "aliyun-bundle" && it.name != "aws-bundle"
         ) {
           dependsOn("${it.name}:build")
           from("${it.name}/build/libs")
46 changes: 46 additions & 0 deletions bundles/aws-bundle/build.gradle.kts
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar

plugins {
  `maven-publish`
  id("java")
  alias(libs.plugins.shadow)
}

dependencies {
  compileOnly(project(":catalogs:catalog-hadoop"))
  compileOnly(libs.hadoop3.common)
  implementation(libs.hadoop3.aws)
}

tasks.withType(ShadowJar::class.java) {
  isZip64 = true
  configurations = listOf(project.configurations.runtimeClasspath.get())
  archiveClassifier.set("")
}

tasks.jar {
  dependsOn(tasks.named("shadowJar"))
  archiveClassifier.set("empty")
}

tasks.compileJava {
  dependsOn(":catalogs:catalog-hadoop:runtimeJars")
}
51 changes: 51 additions & 0 deletions bundles/aws-bundle/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.s3.fs;

import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

public class S3FileSystemProvider implements FileSystemProvider {

  @Override
  public FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException {
    Configuration configuration = new Configuration();
    // Catalog properties arrive with a "gravitino.bypass." prefix; stripping it
    // leaves plain Hadoop S3A keys such as fs.s3a.access.key.
    config.forEach(
        (k, v) -> {
          configuration.set(k.replace("gravitino.bypass.", ""), v);
        });

    return S3AFileSystem.newInstance(path.toUri(), configuration);
  }

  @Override
  public String scheme() {
    return "s3a";
  }

  @Override
  public String name() {
    return "s3";
  }
}
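A short usage sketch of the provider above, assuming a LocalStack-style endpoint; the bucket name and credential values are placeholders, not part of this PR:

import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.s3.fs.S3FileSystemProvider;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3ProviderExample {
  public static void main(String[] args) throws IOException {
    // Keys carry the gravitino.bypass. prefix; the provider strips it,
    // leaving plain Hadoop S3A configuration entries.
    Map<String, String> config =
        ImmutableMap.of(
            "gravitino.bypass.fs.s3a.access.key", "<access-key>",
            "gravitino.bypass.fs.s3a.secret.key", "<secret-key>",
            "gravitino.bypass.fs.s3a.endpoint", "http://localhost:4566");

    FileSystem fs =
        new S3FileSystemProvider().getFileSystem(new Path("s3a://my-bucket/"), config);
    System.out.println(fs.getUri());
  }
}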
20 changes: 20 additions & 0 deletions bundles/aws-bundle/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider
@@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

org.apache.gravitino.s3.fs.S3FileSystemProvider
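The file above registers the provider with Java's ServiceLoader. A minimal sketch of how a ServiceLoader scan can resolve the "s3" provider by the name() it advertises; the helper class here is hypothetical, not the catalog's actual loading code:

import java.util.ServiceLoader;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;

public class FileSystemProviderLookup {
  // Scans the classpath for registered FileSystemProvider implementations
  // and returns the one whose name() matches, e.g. "s3".
  static FileSystemProvider lookup(String name) {
    for (FileSystemProvider provider : ServiceLoader.load(FileSystemProvider.class)) {
      if (provider.name().equals(name)) {
        return provider;
      }
    }
    throw new IllegalArgumentException("No FileSystemProvider registered with name: " + name);
  }
}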
6 changes: 6 additions & 0 deletions catalogs/catalog-hadoop/build.gradle.kts
@@ -73,6 +73,7 @@ dependencies {
testImplementation(project(":integration-test-common", "testArtifacts"))
testImplementation(project(":server"))
testImplementation(project(":server-common"))
testImplementation(project(":bundles:aws-bundle"))
testImplementation(project(":bundles:gcp-bundle"))
testImplementation(project(":bundles:aliyun-bundle"))

@@ -161,6 +162,11 @@ tasks.test {
   } else {
     dependsOn(tasks.jar)
   }
+
+  // this task depends on :bundles:aws-bundle:jar
+  dependsOn(":bundles:aws-bundle:jar")
+  dependsOn(":bundles:aliyun-bundle:jar")
+  dependsOn(":bundles:gcp-bundle:jar")
 }

 tasks.getByName("generateMetadataFileForMavenJavaPublication") {
catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java
@@ -61,19 +61,24 @@ public class HadoopCatalogIT extends BaseIT {
   private static final Logger LOG = LoggerFactory.getLogger(HadoopCatalogIT.class);
   protected static final ContainerSuite containerSuite = ContainerSuite.getInstance();

-  public String metalakeName = GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake");
-  public String catalogName = GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog");
-  public static final String SCHEMA_PREFIX = "CatalogFilesetIT_schema";
-  public String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
+  protected String metalakeName = GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake");
+  protected String catalogName = GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog");
+  public final String SCHEMA_PREFIX = "CatalogFilesetIT_schema";
+  protected String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
   protected static final String provider = "hadoop";
-  protected static GravitinoMetalake metalake;
-  protected static Catalog catalog;
-  protected static FileSystem fileSystem;
-  protected static String defaultBaseLocation;
+  protected GravitinoMetalake metalake;
+  protected Catalog catalog;
+  protected FileSystem fileSystem;
+  protected String defaultBaseLocation;
+
+  protected void startNecessaryContainer() {
+    containerSuite.startHiveContainer();
+  }

   @BeforeAll
   public void setup() throws IOException {
-    containerSuite.startHiveContainer();
+    startNecessaryContainer();

     Configuration conf = new Configuration();
     conf.set("fs.defaultFS", defaultBaseLocation());
     fileSystem = FileSystem.get(conf);
189 changes: 189 additions & 0 deletions catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopS3CatalogIT.java
@@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.catalog.hadoop.integration.test;

import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.integration.test.container.GravitinoLocalStackContainer;
import org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.shaded.org.awaitility.Awaitility;

@Tag("gravitino-docker-test")
public class HadoopS3CatalogIT extends HadoopCatalogIT {
private static final Logger LOG = LoggerFactory.getLogger(HadoopOSSCatalogIT.class);
private String bucketName = "s3-bucket-" + UUID.randomUUID().toString().replace("-", "");
private String accessKey;
private String secretKey;
private String s3Endpoint;

private GravitinoLocalStackContainer gravitinoLocalStackContainer;

@VisibleForTesting
public void startIntegrationTest() throws Exception {}

  @Override
  protected void startNecessaryContainer() {
    containerSuite.startLocalStackContainer();
    gravitinoLocalStackContainer = containerSuite.getLocalStackContainer();

    Awaitility.await()
        .atMost(60, TimeUnit.SECONDS)
        .pollInterval(1, TimeUnit.SECONDS)
        .until(
            () -> {
              try {
                Container.ExecResult result =
                    gravitinoLocalStackContainer.executeInContainer(
                        "awslocal", "iam", "create-user", "--user-name", "anonymous");
                return result.getExitCode() == 0;
              } catch (Exception e) {
                LOG.info("LocalStack is not ready yet: ", e);
                return false;
              }
            });

    gravitinoLocalStackContainer.executeInContainer("awslocal", "s3", "mb", "s3://" + bucketName);

    Container.ExecResult result =
        gravitinoLocalStackContainer.executeInContainer(
            "awslocal", "iam", "create-access-key", "--user-name", "anonymous");

    gravitinoLocalStackContainer.executeInContainer(
        "awslocal",
        "s3api",
        "put-bucket-acl",
        "--bucket",
        bucketName,
        "--acl",
        "public-read-write");

    // Get access key and secret key from the create-access-key output; this
    // parse relies on the fixed layout of awslocal's JSON response.
    String[] lines = result.getStdout().split("\n");
    accessKey = lines[3].split(":")[1].trim().substring(1, 21);
    secretKey = lines[5].split(":")[1].trim().substring(1, 41);

    LOG.info("Access key: " + accessKey);
    LOG.info("Secret key: " + secretKey);

    s3Endpoint =
        String.format("http://%s:%d", gravitinoLocalStackContainer.getContainerIpAddress(), 4566);
  }

  @BeforeAll
  public void setup() throws IOException {
    copyBundleJarsToHadoop("aws-bundle");

    try {
      super.startIntegrationTest();
    } catch (Exception e) {
      throw new RuntimeException("Failed to start integration test", e);
    }

    startNecessaryContainer();

    metalakeName = GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake");
    catalogName = GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog");
    schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);

    Configuration conf = new Configuration();
    conf.set("fs.s3a.access.key", accessKey);
    conf.set("fs.s3a.secret.key", secretKey);
    conf.set("fs.s3a.endpoint", s3Endpoint);
    conf.set(
        "fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
    fileSystem = FileSystem.get(URI.create(String.format("s3a://%s", bucketName)), conf);

    createMetalake();
    createCatalog();
    createSchema();
  }

  @AfterAll
  public void stop() throws IOException {
    Catalog catalog = metalake.loadCatalog(catalogName);
    catalog.asSchemas().dropSchema(schemaName, true);
    metalake.dropCatalog(catalogName, true);
    client.dropMetalake(metalakeName);

    try {
      closer.close();
    } catch (Exception e) {
      LOG.error("Failed to close CloseableGroup", e);
    }
  }

  protected String defaultBaseLocation() {
    if (defaultBaseLocation == null) {
      try {
        Path bucket =
            new Path(
                String.format(
                    "s3a://%s/%s", bucketName, GravitinoITUtils.genRandomName("CatalogFilesetIT")));
        if (!fileSystem.exists(bucket)) {
          fileSystem.mkdirs(bucket);
        }

        defaultBaseLocation = bucket.toString();
      } catch (IOException e) {
        throw new RuntimeException("Failed to create default base location", e);
      }
    }

    return defaultBaseLocation;
  }

  protected void createCatalog() {
    Map<String, String> map = Maps.newHashMap();
    map.put("gravitino.bypass.fs.s3a.access.key", accessKey);
    map.put("gravitino.bypass.fs.s3a.secret.key", secretKey);
    map.put("gravitino.bypass.fs.s3a.endpoint", s3Endpoint);
    map.put(
        "gravitino.bypass.fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
    map.put(FILESYSTEM_PROVIDERS, "s3");

    metalake.createCatalog(catalogName, Catalog.Type.FILESET, provider, "comment", map);

    catalog = metalake.loadCatalog(catalogName);
  }

  protected String generateLocation(String filesetName) {
    return String.format("%s/%s", defaultBaseLocation, filesetName);
  }
}
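For reference, a hedged client-side sketch of creating an S3-backed fileset catalog against a running Gravitino server, mirroring the createCatalog call in the test above; the endpoint, metalake name, credential placeholders, and the literal "filesystem-providers" key are assumptions, not part of this PR:

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.client.GravitinoClient;

public class CreateS3FilesetCatalog {
  public static void main(String[] args) {
    // Connect to a metalake that already exists on the server.
    GravitinoClient client =
        GravitinoClient.builder("http://localhost:8090").withMetalake("my_metalake").build();

    // gravitino.bypass.* keys are passed through to the Hadoop S3A filesystem.
    Map<String, String> properties =
        ImmutableMap.of(
            "filesystem-providers", "s3",
            "gravitino.bypass.fs.s3a.access.key", "<access-key>",
            "gravitino.bypass.fs.s3a.secret.key", "<secret-key>",
            "gravitino.bypass.fs.s3a.endpoint", "<s3-endpoint>",
            "gravitino.bypass.fs.s3a.aws.credentials.provider",
                "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");

    Catalog catalog =
        client.createCatalog("s3_catalog", Catalog.Type.FILESET, "hadoop", "comment", properties);
    System.out.println("Created catalog: " + catalog.name());
  }
}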