Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into test-sample
Browse files Browse the repository at this point in the history
  • Loading branch information
jun-ma committed Sep 26, 2024
2 parents 53ac6be + eaffd52 commit 489e73c
Show file tree
Hide file tree
Showing 103 changed files with 2,472 additions and 323 deletions.
3 changes: 2 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,13 @@ allprojects {
param.environment("PROJECT_VERSION", project.version)

// Gravitino CI Docker image
param.environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "apache/gravitino-ci:hive-0.1.13")
param.environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "apache/gravitino-ci:hive-0.1.14")
param.environment("GRAVITINO_CI_KERBEROS_HIVE_DOCKER_IMAGE", "apache/gravitino-ci:kerberos-hive-0.1.5")
param.environment("GRAVITINO_CI_DORIS_DOCKER_IMAGE", "apache/gravitino-ci:doris-0.1.5")
param.environment("GRAVITINO_CI_TRINO_DOCKER_IMAGE", "apache/gravitino-ci:trino-0.1.6")
param.environment("GRAVITINO_CI_RANGER_DOCKER_IMAGE", "apache/gravitino-ci:ranger-0.1.1")
param.environment("GRAVITINO_CI_KAFKA_DOCKER_IMAGE", "apache/kafka:3.7.0")
param.environment("GRAVITINO_CI_LOCALSTACK_DOCKER_IMAGE", "localstack/localstack:latest")

val dockerRunning = project.rootProject.extra["dockerRunning"] as? Boolean ?: false
val macDockerConnector = project.rootProject.extra["macDockerConnector"] as? Boolean ?: false
Expand Down
2 changes: 2 additions & 0 deletions catalogs/catalog-hive/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ dependencies {
testImplementation(libs.slf4j.api)
testImplementation(libs.testcontainers)
testImplementation(libs.testcontainers.mysql)
testImplementation(libs.testcontainers.localstack)
testImplementation(libs.hadoop2.s3)

testRuntimeOnly(libs.junit.jupiter.engine)
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.catalog.hive.integration.test;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.gravitino.integration.test.container.GravitinoLocalStackContainer;
import org.apache.gravitino.integration.test.container.HiveContainer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import org.testcontainers.shaded.org.awaitility.Awaitility;

public class CatalogHiveS3IT extends CatalogHiveIT {

private static final Logger LOGGER = LoggerFactory.getLogger(CatalogHiveS3IT.class);

private static final String S3_BUCKET_NAME = "my-test-bucket";
private GravitinoLocalStackContainer gravitinoLocalStackContainer;

private static final String S3_ACCESS_KEY = "S3_ACCESS_KEY";
private static final String S3_SECRET_KEY = "S3_SECRET_KEY";
private static final String S3_ENDPOINT = "S3_ENDPOINT";

private String getS3Endpoint;
private String accessKey;
private String secretKey;

@Override
protected void startNecessaryContainer() {
containerSuite.startLocalStackContainer();
gravitinoLocalStackContainer = containerSuite.getLocalStackContainer();

Awaitility.await()
.atMost(60, TimeUnit.SECONDS)
.pollInterval(1, TimeUnit.SECONDS)
.until(
() -> {
try {
Container.ExecResult result =
gravitinoLocalStackContainer.executeInContainer(
"awslocal", "iam", "create-user", "--user-name", "anonymous");
return result.getExitCode() == 0;
} catch (Exception e) {
LOGGER.info("LocalStack is not ready yet for: ", e);
return false;
}
});

gravitinoLocalStackContainer.executeInContainer(
"awslocal", "s3", "mb", "s3://" + S3_BUCKET_NAME);

Container.ExecResult result =
gravitinoLocalStackContainer.executeInContainer(
"awslocal", "iam", "create-access-key", "--user-name", "anonymous");

// Get access key and secret key from result
String[] lines = result.getStdout().split("\n");
accessKey = lines[3].split(":")[1].trim().substring(1, 21);
secretKey = lines[5].split(":")[1].trim().substring(1, 41);

LOGGER.info("Access key: " + accessKey);
LOGGER.info("Secret key: " + secretKey);

getS3Endpoint =
String.format("http://%s:%d", gravitinoLocalStackContainer.getContainerIpAddress(), 4566);

gravitinoLocalStackContainer.executeInContainer(
"awslocal",
"s3api",
"put-bucket-acl",
"--bucket",
"my-test-bucket",
"--acl",
"public-read-write");

Map<String, String> hiveContainerEnv =
ImmutableMap.of(
S3_ACCESS_KEY,
accessKey,
S3_SECRET_KEY,
secretKey,
S3_ENDPOINT,
getS3Endpoint,
HiveContainer.HIVE_RUNTIME_VERSION,
HiveContainer.HIVE3);

containerSuite.startHiveContainerWithS3(hiveContainerEnv);

HIVE_METASTORE_URIS =
String.format(
"thrift://%s:%d",
containerSuite.getHiveContainerWithS3().getContainerIpAddress(),
HiveContainer.HIVE_METASTORE_PORT);
}

@Override
protected void initFileSystem() throws IOException {
// Use S3a file system
Configuration conf = new Configuration();
conf.set("fs.s3a.access.key", accessKey);
conf.set("fs.s3a.secret.key", secretKey);
conf.set("fs.s3a.endpoint", getS3Endpoint);
conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
conf.set(
"fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
conf.set("fs.s3a.path.style.access", "true");
conf.set("fs.s3a.connection.ssl.enabled", "false");
fileSystem = FileSystem.get(URI.create("s3a://" + S3_BUCKET_NAME), conf);
}

@Override
protected void initSparkSession() {
sparkSession =
SparkSession.builder()
.master("local[1]")
.appName("Hive Catalog integration test")
.config("hive.metastore.uris", HIVE_METASTORE_URIS)
.config(
"spark.sql.warehouse.dir",
String.format(
"hdfs://%s:%d/user/hive/warehouse",
containerSuite.getHiveContainerWithS3().getContainerIpAddress(),
HiveContainer.HDFS_DEFAULTFS_PORT))
.config("spark.hadoop.fs.s3a.access.key", accessKey)
.config("spark.hadoop.fs.s3a.secret.key", secretKey)
.config("spark.hadoop.fs.s3a.endpoint", getS3Endpoint)
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.config("spark.hadoop.fs.s3a.path.style.access", "true")
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
.config(
"spark.hadoop.fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
.config("spark.sql.storeAssignmentPolicy", "LEGACY")
.config("mapreduce.input.fileinputformat.input.dir.recursive", "true")
.enableHiveSupport()
.getOrCreate();
}

@Override
protected Map<String, String> createSchemaProperties() {
Map<String, String> properties = new HashMap<>();
properties.put("key1", "val1");
properties.put("key2", "val2");
properties.put("location", "s3a://" + S3_BUCKET_NAME + "/test-" + System.currentTimeMillis());
return properties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,26 @@ public User getUser(String user) throws NoSuchUserException, NoSuchMetalakeExcep
return getMetalake().getUser(user);
}

/**
* Lists the users.
*
* @return The User list.
* @throws NoSuchMetalakeException If the Metalake with the given name does not exist.
*/
public User[] listUsers() {
return getMetalake().listUsers();
}

/**
* Lists the usernames.
*
* @return The username list.
* @throws NoSuchMetalakeException If the Metalake with the given name does not exist.
*/
public String[] listUserNames() {
return getMetalake().listUserNames();
}

/**
* Adds a new Group.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.gravitino.dto.responses.SetResponse;
import org.apache.gravitino.dto.responses.TagListResponse;
import org.apache.gravitino.dto.responses.TagResponse;
import org.apache.gravitino.dto.responses.UserListResponse;
import org.apache.gravitino.dto.responses.UserResponse;
import org.apache.gravitino.exceptions.CatalogAlreadyExistsException;
import org.apache.gravitino.exceptions.GroupAlreadyExistsException;
Expand Down Expand Up @@ -515,6 +516,46 @@ public User getUser(String user) throws NoSuchUserException, NoSuchMetalakeExcep
return resp.getUser();
}

/**
* Lists the users.
*
* @return The User list.
* @throws NoSuchMetalakeException If the Metalake with the given name does not exist.
*/
public User[] listUsers() throws NoSuchMetalakeException {
Map<String, String> params = new HashMap<>();
params.put("details", "true");

UserListResponse resp =
restClient.get(
String.format(API_METALAKES_USERS_PATH, name(), BLANK_PLACE_HOLDER),
params,
UserListResponse.class,
Collections.emptyMap(),
ErrorHandlers.userErrorHandler());
resp.validate();

return resp.getUsers();
}

/**
* Lists the usernames.
*
* @return The username list.
* @throws NoSuchMetalakeException If the Metalake with the given name does not exist.
*/
public String[] listUserNames() throws NoSuchMetalakeException {
NameListResponse resp =
restClient.get(
String.format(API_METALAKES_USERS_PATH, name(), BLANK_PLACE_HOLDER),
NameListResponse.class,
Collections.emptyMap(),
ErrorHandlers.userErrorHandler());
resp.validate();

return resp.getNames();
}

/**
* Adds a new Group.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import static org.apache.hc.core5.http.HttpStatus.SC_SERVER_ERROR;

import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import org.apache.gravitino.authorization.Group;
import org.apache.gravitino.authorization.User;
import org.apache.gravitino.dto.AuditDTO;
Expand All @@ -35,7 +37,9 @@
import org.apache.gravitino.dto.responses.ErrorResponse;
import org.apache.gravitino.dto.responses.GroupResponse;
import org.apache.gravitino.dto.responses.MetalakeResponse;
import org.apache.gravitino.dto.responses.NameListResponse;
import org.apache.gravitino.dto.responses.RemoveResponse;
import org.apache.gravitino.dto.responses.UserListResponse;
import org.apache.gravitino.dto.responses.UserResponse;
import org.apache.gravitino.exceptions.GroupAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchGroupException;
Expand Down Expand Up @@ -175,6 +179,56 @@ public void testRemoveUsers() throws Exception {
Assertions.assertThrows(RuntimeException.class, () -> gravitinoClient.removeUser(username));
}

@Test
public void testListUserNames() throws Exception {
String userPath = withSlash(String.format(API_METALAKES_USERS_PATH, metalakeName, ""));

NameListResponse listResponse = new NameListResponse(new String[] {"user1", "user2"});
buildMockResource(Method.GET, userPath, null, listResponse, SC_OK);

Assertions.assertArrayEquals(new String[] {"user1", "user2"}, gravitinoClient.listUserNames());

ErrorResponse errRespNoMetalake =
ErrorResponse.notFound(NoSuchMetalakeException.class.getSimpleName(), "metalake not found");
buildMockResource(Method.GET, userPath, null, errRespNoMetalake, SC_NOT_FOUND);
Exception ex =
Assertions.assertThrows(
NoSuchMetalakeException.class, () -> gravitinoClient.listUserNames());
Assertions.assertEquals("metalake not found", ex.getMessage());

// Test RuntimeException
ErrorResponse errResp = ErrorResponse.internalError("internal error");
buildMockResource(Method.GET, userPath, null, errResp, SC_SERVER_ERROR);
Assertions.assertThrows(RuntimeException.class, () -> gravitinoClient.listUserNames());
}

@Test
public void testListUsers() throws Exception {
String userPath = withSlash(String.format(API_METALAKES_USERS_PATH, metalakeName, ""));
UserDTO user1 = mockUserDTO("user1");
UserDTO user2 = mockUserDTO("user2");
Map<String, String> params = Collections.singletonMap("details", "true");
UserListResponse listResponse = new UserListResponse(new UserDTO[] {user1, user2});
buildMockResource(Method.GET, userPath, params, null, listResponse, SC_OK);

User[] users = gravitinoClient.listUsers();
Assertions.assertEquals(2, users.length);
assertUser(user1, users[0]);
assertUser(user2, users[1]);

ErrorResponse errRespNoMetalake =
ErrorResponse.notFound(NoSuchMetalakeException.class.getSimpleName(), "metalake not found");
buildMockResource(Method.GET, userPath, params, null, errRespNoMetalake, SC_NOT_FOUND);
Exception ex =
Assertions.assertThrows(NoSuchMetalakeException.class, () -> gravitinoClient.listUsers());
Assertions.assertEquals("metalake not found", ex.getMessage());

// Test RuntimeException
ErrorResponse errResp = ErrorResponse.internalError("internal error");
buildMockResource(Method.GET, userPath, params, null, errResp, SC_SERVER_ERROR);
Assertions.assertThrows(RuntimeException.class, () -> gravitinoClient.listUsers());
}

@Test
public void testAddGroups() throws Exception {
String groupName = "group";
Expand Down
Loading

0 comments on commit 489e73c

Please sign in to comment.