[#4981] Improvement(test): Add integration tests for Hive catalog with S3 location. #4982

Merged: 6 commits, Sep 24, 2024

3 changes: 2 additions & 1 deletion build.gradle.kts
@@ -168,12 +168,13 @@ allprojects {
param.environment("PROJECT_VERSION", project.version)

// Gravitino CI Docker image
param.environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "apache/gravitino-ci:hive-0.1.13")
param.environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "apache/gravitino-ci:hive-0.1.14")
param.environment("GRAVITINO_CI_KERBEROS_HIVE_DOCKER_IMAGE", "apache/gravitino-ci:kerberos-hive-0.1.5")
param.environment("GRAVITINO_CI_DORIS_DOCKER_IMAGE", "apache/gravitino-ci:doris-0.1.5")
param.environment("GRAVITINO_CI_TRINO_DOCKER_IMAGE", "apache/gravitino-ci:trino-0.1.6")
param.environment("GRAVITINO_CI_RANGER_DOCKER_IMAGE", "apache/gravitino-ci:ranger-0.1.1")
param.environment("GRAVITINO_CI_KAFKA_DOCKER_IMAGE", "apache/kafka:3.7.0")
param.environment("GRAVITINO_CI_LOCALSTACK_DOCKER_IMAGE", "localstack/localstack:latest")

val dockerRunning = project.rootProject.extra["dockerRunning"] as? Boolean ?: false
val macDockerConnector = project.rootProject.extra["macDockerConnector"] as? Boolean ?: false
2 changes: 2 additions & 0 deletions catalogs/catalog-hive/build.gradle.kts
@@ -127,6 +127,8 @@ dependencies {
testImplementation(libs.slf4j.api)
testImplementation(libs.testcontainers)
testImplementation(libs.testcontainers.mysql)
testImplementation(libs.testcontainers.localstack)
testImplementation(libs.hadoop2.s3)

testRuntimeOnly(libs.junit.jupiter.engine)
}

Large diffs are not rendered by default.

171 changes: 171 additions & 0 deletions CatalogHiveS3IT.java
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.catalog.hive.integration.test;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.gravitino.integration.test.container.GravitinoLocalStackContainer;
import org.apache.gravitino.integration.test.container.HiveContainer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import org.testcontainers.shaded.org.awaitility.Awaitility;

public class CatalogHiveS3IT extends CatalogHiveIT {

private static final Logger LOGGER = LoggerFactory.getLogger(CatalogHiveS3IT.class);

private static final String S3_BUCKET_NAME = "my-test-bucket";
private GravitinoLocalStackContainer gravitinoLocalStackContainer;

private static final String S3_ACCESS_KEY = "S3_ACCESS_KEY";
private static final String S3_SECRET_KEY = "S3_SECRET_KEY";
private static final String S3_ENDPOINT = "S3_ENDPOINT";

private String s3Endpoint;
private String accessKey;
private String secretKey;

@Override
protected void startNecessaryContainer() {
containerSuite.startLocalStackContainer();
gravitinoLocalStackContainer = containerSuite.getLocalStackContainer();

Awaitility.await()
.atMost(60, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.SECONDS)
.until(
() -> {
try {
Container.ExecResult result =
gravitinoLocalStackContainer.executeInContainer(
"awslocal", "iam", "create-user", "--user-name", "anonymous");
return result.getExitCode() == 0;
} catch (Exception e) {
LOGGER.info("LocalStack is not ready yet for: ", e);
return false;
}
});

gravitinoLocalStackContainer.executeInContainer(
"awslocal", "s3", "mb", "s3://" + S3_BUCKET_NAME);

Container.ExecResult result =
gravitinoLocalStackContainer.executeInContainer(
"awslocal", "iam", "create-access-key", "--user-name", "anonymous");

// Get access key and secret key from result
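// Note: this parsing assumes awslocal prints its default pretty-printed JSON, with
// "AccessKeyId" on the fourth line and "SecretAccessKey" on the sixth, and that the key
// values are 20 and 40 characters long, respectively.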
String[] lines = result.getStdout().split("\n");
accessKey = lines[3].split(":")[1].trim().substring(1, 21);
secretKey = lines[5].split(":")[1].trim().substring(1, 41);

LOGGER.info("Access key: " + accessKey);
LOGGER.info("Secret key: " + secretKey);

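// LocalStack exposes every AWS service API on a single edge port, 4566 by default.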
s3Endpoint =
String.format("http://%s:%d", gravitinoLocalStackContainer.getContainerIpAddress(), 4566);

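// Open up the bucket so the Hive container can read and write it without per-user credentials.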
gravitinoLocalStackContainer.executeInContainer(
"awslocal",
"s3api",
"put-bucket-acl",
"--bucket",
"my-test-bucket",
"--acl",
"public-read-write");

Map<String, String> hiveContainerEnv =
ImmutableMap.of(
S3_ACCESS_KEY,
accessKey,
S3_SECRET_KEY,
secretKey,
S3_ENDPOINT,
s3Endpoint,
HiveContainer.HIVE_RUNTIME_VERSION,
HiveContainer.HIVE3);

containerSuite.startHiveContainerWithS3(hiveContainerEnv);

HIVE_METASTORE_URIS =
String.format(
"thrift://%s:%d",
containerSuite.getHiveContainerWithS3().getContainerIpAddress(),
HiveContainer.HIVE_METASTORE_PORT);
}

@Override
protected void initFileSystem() throws IOException {
// Use S3a file system
Configuration conf = new Configuration();
conf.set("fs.s3a.access.key", accessKey);
conf.set("fs.s3a.secret.key", secretKey);
conf.set("fs.s3a.endpoint", getS3Endpoint);
conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
conf.set(
"fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
conf.set("fs.s3a.path.style.access", "true");
conf.set("fs.s3a.connection.ssl.enabled", "false");
fileSystem = FileSystem.get(URI.create("s3a://" + S3_BUCKET_NAME), conf);
}

@Override
protected void initSparkSession() {
sparkSession =
SparkSession.builder()
.master("local[1]")
.appName("Hive Catalog integration test")
.config("hive.metastore.uris", HIVE_METASTORE_URIS)
.config(
"spark.sql.warehouse.dir",
String.format(
"hdfs://%s:%d/user/hive/warehouse",
containerSuite.getHiveContainerWithS3().getContainerIpAddress(),
HiveContainer.HDFS_DEFAULTFS_PORT))
.config("spark.hadoop.fs.s3a.access.key", accessKey)
.config("spark.hadoop.fs.s3a.secret.key", secretKey)
.config("spark.hadoop.fs.s3a.endpoint", getS3Endpoint)
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.config("spark.hadoop.fs.s3a.path.style.access", "true")
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
.config(
"spark.hadoop.fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
.config("spark.sql.storeAssignmentPolicy", "LEGACY")
.config("mapreduce.input.fileinputformat.input.dir.recursive", "true")
.enableHiveSupport()
.getOrCreate();
}

@Override
protected Map<String, String> createSchemaProperties() {
Map<String, String> properties = new HashMap<>();
properties.put("key1", "val1");
properties.put("key2", "val2");
properties.put("location", "s3a://" + S3_BUCKET_NAME + "/test-" + System.currentTimeMillis());
return properties;
}
}
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
@@ -146,6 +146,7 @@ hadoop2-hdfs = { group = "org.apache.hadoop", name = "hadoop-hdfs", version.ref = "hadoop2" }
hadoop2-hdfs-client = { group = "org.apache.hadoop", name = "hadoop-hdfs-client", version.ref = "hadoop2" }
hadoop2-common = { group = "org.apache.hadoop", name = "hadoop-common", version.ref = "hadoop2"}
hadoop2-mapreduce-client-core = { group = "org.apache.hadoop", name = "hadoop-mapreduce-client-core", version.ref = "hadoop2"}
hadoop2-s3 = { group = "org.apache.hadoop", name = "hadoop-aws", version.ref = "hadoop2"}
hadoop3-hdfs = { group = "org.apache.hadoop", name = "hadoop-hdfs", version.ref = "hadoop3" }
hadoop3-common = { group = "org.apache.hadoop", name = "hadoop-common", version.ref = "hadoop3"}
hadoop3-client = { group = "org.apache.hadoop", name = "hadoop-client", version.ref = "hadoop3"}
@@ -183,6 +184,7 @@ testcontainers = { group = "org.testcontainers", name = "testcontainers", version.ref = "testcontainers" }
testcontainers-mysql = { group = "org.testcontainers", name = "mysql", version.ref = "testcontainers" }
testcontainers-postgresql = { group = "org.testcontainers", name = "postgresql", version.ref = "testcontainers" }
testcontainers-junit-jupiter = { group = "org.testcontainers", name = "junit-jupiter", version.ref = "testcontainers" }
testcontainers-localstack = { group = "org.testcontainers", name = "localstack", version.ref = "testcontainers" }
trino-jdbc = { group = "io.trino", name = "trino-jdbc", version.ref = "trino" }
jwt-api = { group = "io.jsonwebtoken", name = "jjwt-api", version.ref = "jwt"}
jwt-impl = { group = "io.jsonwebtoken", name = "jjwt-impl", version.ref = "jwt"}
ContainerSuite.java
@@ -71,6 +71,16 @@ public class ContainerSuite implements Closeable {
private static volatile Map<PGImageName, PostgreSQLContainer> pgContainerMap =
new EnumMap<>(PGImageName.class);

private static volatile GravitinoLocalStackContainer gravitinoLocalStackContainer;

/**
 * We cannot share the plain Hive container with the S3 one, as the Hive container with S3
 * differs in two ways: 1. It carries the S3 configuration and the corresponding environment
 * variables. 2. It is a Hive3 container, while the plain Hive container is Hive2; the Hive2
 * container has trouble accessing S3.
 */
private static volatile HiveContainer hiveContainerWithS3;
Contributor Author:
We cannot share the container even if we unify the Hive version to Hive3, for the following reasons:

1. To share the container, we would need to ensure the S3 environment is ready before the Hive container starts.
2. Because of (1), we would need to start LocalStack (the S3 simulator) first, but we cannot guarantee that CatalogHiveS3IT is the first test to execute.


protected static final CloseableGroup closer = CloseableGroup.create();

private static void initIfNecessary() {
@@ -112,7 +122,11 @@ public Network getNetwork() {
return network;
}

public void startHiveContainer() {
public void startHiveContainer(Map<String, String> env) {
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
builder.putAll(env);
builder.put("HADOOP_USER_NAME", "anonymous");

if (hiveContainer == null) {
synchronized (ContainerSuite.class) {
if (hiveContainer == null) {
@@ -121,10 +135,7 @@ public void startHiveContainer() {
HiveContainer.Builder hiveBuilder =
HiveContainer.builder()
.withHostName("gravitino-ci-hive")
.withEnvVars(
ImmutableMap.<String, String>builder()
.put("HADOOP_USER_NAME", "anonymous")
.build())
.withEnvVars(builder.build())
.withNetwork(network);
HiveContainer container = closer.register(hiveBuilder.build());
container.start();
@@ -134,6 +145,33 @@
}
}

public void startHiveContainerWithS3(Map<String, String> env) {
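// Mirrors startHiveContainer(Map) but tracks a separate instance; see the comment on
// hiveContainerWithS3 for why the two containers cannot be shared.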
ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
builder.putAll(env);
builder.put("HADOOP_USER_NAME", "anonymous");

if (hiveContainerWithS3 == null) {
synchronized (ContainerSuite.class) {
if (hiveContainerWithS3 == null) {
initIfNecessary();
// Start Hive container
HiveContainer.Builder hiveBuilder =
HiveContainer.builder()
.withHostName("gravitino-ci-hive")
.withEnvVars(builder.build())
.withNetwork(network);
HiveContainer container = closer.register(hiveBuilder.build());
container.start();
hiveContainerWithS3 = container;
}
}
}
}

public void startHiveContainer() {
startHiveContainer(ImmutableMap.of());
}

/**
* To start and enable Ranger plugin in Hive container, <br>
* you can specify environment variables: <br>
@@ -361,6 +399,33 @@ public void startKafkaContainer() {
}
}

public void startLocalStackContainer() {
if (gravitinoLocalStackContainer == null) {
synchronized (ContainerSuite.class) {
if (gravitinoLocalStackContainer == null) {
GravitinoLocalStackContainer.Builder builder =
GravitinoLocalStackContainer.builder().withNetwork(network);
GravitinoLocalStackContainer container = closer.register(builder.build());
try {
container.start();
} catch (Exception e) {
LOG.error("Failed to start LocalStack container", e);
throw new RuntimeException("Failed to start LocalStack container", e);
}
gravitinoLocalStackContainer = container;
}
}
}
}

public GravitinoLocalStackContainer getLocalStackContainer() {
return gravitinoLocalStackContainer;
}

public HiveContainer getHiveContainerWithS3() {
return hiveContainerWithS3;
}

public KafkaContainer getKafkaContainer() {
return kafkaContainer;
}
69 changes: 69 additions & 0 deletions GravitinoLocalStackContainer.java
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.integration.test.container;

import com.google.common.collect.ImmutableSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.testcontainers.containers.Network;

public class GravitinoLocalStackContainer extends BaseContainer {

public static final String DEFAULT_IMAGE = System.getenv("GRAVITINO_CI_LOCALSTACK_DOCKER_IMAGE");
public static final String HOST_NAME = "gravitino-ci-localstack";
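// LocalStack's edge port; all AWS service APIs are multiplexed onto it.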
public static final int PORT = 4566;

public GravitinoLocalStackContainer(
String image,
String hostName,
Set<Integer> ports,
Map<String, String> extraHosts,
Map<String, String> filesToMount,
Map<String, String> envVars,
Optional<Network> network) {
super(image, hostName, ports, extraHosts, filesToMount, envVars, network);
}

public static Builder builder() {
return new Builder();
}

@Override
protected boolean checkContainerStatus(int retryLimit) {
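// No health probe here; readiness is checked by callers instead (e.g. the Awaitility loop
// in CatalogHiveS3IT that polls awslocal until it succeeds).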
return true;
}

public static class Builder
extends BaseContainer.Builder<
GravitinoLocalStackContainer.Builder, GravitinoLocalStackContainer> {
public Builder() {
super();
this.image = DEFAULT_IMAGE;
this.hostName = HOST_NAME;
this.exposePorts = ImmutableSet.of(PORT);
}

@Override
public GravitinoLocalStackContainer build() {
return new GravitinoLocalStackContainer(
image, hostName, exposePorts, extraHosts, filesToMount, envVars, network);
}
}
}