[#1483] POC of spark-connector to query simple hive table #1521

Closed · wants to merge 1 commit
17 changes: 12 additions & 5 deletions gradle/libs.versions.toml
@@ -41,6 +41,8 @@ prometheus = "0.16.0"
 jsqlparser = "4.2"
 mysql = "8.0.23"
 postgresql = "42.6.0"
+kyuubi = "1.8.0"
+scala = "2.12.18"

 protobuf-plugin = "0.9.2"
 spotless-plugin = '6.11.0'
@@ -106,12 +108,14 @@ trino-testing= { group = "io.trino", name = "trino-testing", version.ref = "trino" }
 trino-memory= { group = "io.trino", name = "trino-memory", version.ref = "trino" }
 trino-cli= { group = "io.trino", name = "trino-cli", version.ref = "trino" }
 trino-client= { group = "io.trino", name = "trino-client", version.ref = "trino" }
-iceberg-spark-runtime = { group = "org.apache.iceberg", name = "iceberg-spark-runtime-3.4_2.13", version.ref = "iceberg" }
-spark-sql = { group = "org.apache.spark", name = "spark-sql_2.13", version.ref = "spark" }
-scala-collection-compat = { group = "org.scala-lang.modules", name = "scala-collection-compat_2.13", version.ref = "scala-collection-compat" }
+iceberg-spark-runtime = { group = "org.apache.iceberg", name = "iceberg-spark-runtime-3.4_2.12", version.ref = "iceberg" }
+spark-sql = { group = "org.apache.spark", name = "spark-sql_2.12", version.ref = "spark" }
+spark-core = { group = "org.apache.spark", name = "spark-core_2.12", version.ref = "spark" }
+spark-catalyst = { group = "org.apache.spark", name = "spark-catalyst_2.12", version.ref = "spark" }
+scala-collection-compat = { group = "org.scala-lang.modules", name = "scala-collection-compat_2.12", version.ref = "scala-collection-compat" }
 sqlite-jdbc = { group = "org.xerial", name = "sqlite-jdbc", version.ref = "sqlite-jdbc" }
 testng = { group = "org.testng", name = "testng", version.ref = "testng" }
-spark-hive = { group = "org.apache.spark", name = "spark-hive_2.13", version.ref = "spark" }
+spark-hive = { group = "org.apache.spark", name = "spark-hive_2.12", version.ref = "spark" }
 commons-dbcp2 = { group = "org.apache.commons", name = "commons-dbcp2", version.ref = "commons-dbcp2" }
 testcontainers = { group = "org.testcontainers", name = "testcontainers", version.ref = "testcontainers" }
 testcontainers-mysql = { group = "org.testcontainers", name = "mysql", version.ref = "testcontainers" }
@@ -135,6 +139,8 @@ jsqlparser = { group = "com.github.jsqlparser", name = "jsqlparser", version.ref = "jsqlparser" }
 mysql-driver = { group = "mysql", name = "mysql-connector-java", version.ref = "mysql" }
 postgresql-driver = { group = "org.postgresql", name = "postgresql", version.ref = "postgresql" }
 commons-cli = { group = "commons-cli", name = "commons-cli", version.ref = "commons-cli" }
+kyuubi-spark-connector = { group = "org.apache.kyuubi", name = "kyuubi-spark-connector-hive_2.12", version.ref = "kyuubi" }
+scala-library = { group = "org.scala-lang", name = "scala-library", version.ref = "scala" }

 [bundles]
 log4j = ["slf4j-api", "log4j-slf4j2-impl", "log4j-api", "log4j-core", "log4j-12-api"]
@@ -144,6 +150,7 @@ iceberg = ["iceberg-core", "iceberg-api"]
 jwt = ["jwt-api", "jwt-impl", "jwt-gson"]
 metrics = ["metrics-core", "metrics-jersey2", "metrics-jvm", "metrics-jmx", "metrics-servlets"]
 prometheus = ["prometheus-servlet", "prometheus-dropwizard", "prometheus-client"]
+spark = ["spark-catalyst"]

 [plugins]
 protobuf = { id = "com.google.protobuf", version.ref = "protobuf-plugin" }
@@ -155,4 +162,4 @@ shadow = { id = "com.github.johnrengelman.shadow", version.ref = "shadow-plugin" }
 node = { id = "com.github.node-gradle.node", version.ref = "node-plugin" }
 tasktree = {id = "com.dorongold.task-tree", version = "2.1.1"}
 dependencyLicenseReport = {id = "com.github.jk1.dependency-license-report", version = "2.5"}
-bom = {id = "org.cyclonedx.bom", version = "1.5.0"}
+bom = {id = "org.cyclonedx.bom", version = "1.5.0"}
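A note on the version-catalog changes above: every Spark-facing artifact moves from the Scala 2.13 to the Scala 2.12 binary (the `_2.12` suffixes), and `scala = "2.12.18"` plus `kyuubi = "1.8.0"` are introduced, because `kyuubi-spark-connector-hive` is published for Scala 2.12; mixing 2.12 and 2.13 artifacts on one classpath typically fails at runtime with `NoSuchMethodError`s from the Scala standard library. As a sketch only (the new module's build script is not part of this diff), a `spark-connector/build.gradle.kts` consuming these catalog entries might look like:

// Hypothetical sketch, assuming the repo's usual Gradle conventions;
// the actual spark-connector/build.gradle.kts is not shown in this diff.
plugins {
    `java-library`
}

dependencies {
    // Must stay on the same Scala binary version (2.12) as the Kyuubi connector.
    implementation(libs.kyuubi.spark.connector) // kyuubi-spark-connector-hive_2.12
    implementation(libs.scala.library)
    compileOnly(libs.spark.sql)      // provided by the Spark runtime
    compileOnly(libs.bundles.spark)  // the new "spark" bundle, currently just spark-catalyst
}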
1 change: 1 addition & 0 deletions integration-test/build.gradle.kts
@@ -25,6 +25,7 @@ dependencies {
 implementation(project(":catalogs:catalog-jdbc-common"))
 implementation(project(":catalogs:catalog-jdbc-mysql"))
 implementation(project(":catalogs:catalog-jdbc-postgresql"))
+implementation(project(":spark-connector"))
 implementation(libs.guava)
 implementation(libs.bundles.log4j)
 implementation(libs.bundles.jersey)
195 changes: 195 additions & 0 deletions integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkBaseIT.java (new file)
@@ -0,0 +1,195 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.integration.test.spark;

import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.METASTORE_URIS;

import com.datastrato.gravitino.Catalog;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.client.GravitinoMetaLake;
import com.datastrato.gravitino.integration.test.container.ContainerSuite;
import com.datastrato.gravitino.integration.test.container.HiveContainer;
import com.datastrato.gravitino.integration.test.util.AbstractIT;
import com.datastrato.gravitino.spark.GravitinoSparkConfig;
import com.datastrato.gravitino.spark.plugin.GravitinoSparkPlugin;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@TestInstance(Lifecycle.PER_CLASS)
public class SparkBaseIT extends AbstractIT {
private static final Logger LOG = LoggerFactory.getLogger(SparkBaseIT.class);
private static final ContainerSuite containerSuite = ContainerSuite.getInstance();
protected SparkSession sparkSession;
private String hiveMetastoreUri;
private String gravitinoUrl;
private String metalakeName = "test";
protected String hiveCatalogName = "hive";
private Catalog hiveCatalog;

@BeforeAll
public void startup() throws Exception {
initHiveEnv();
initGravitinoEnv();
initMetalakeCatalogs();
initSparkEnv();
}

@AfterAll
void stop() {
if (sparkSession != null) {
sparkSession.close();
}
}

private void initMetalakeCatalogs() {
client.createMetalake(NameIdentifier.of(metalakeName), "comment", Collections.emptyMap());
GravitinoMetaLake metalake = client.loadMetalake(NameIdentifier.of(metalakeName));
Map<String, String> properties = Maps.newHashMap();
properties.put(METASTORE_URIS, hiveMetastoreUri);

metalake.createCatalog(
NameIdentifier.of(metalakeName, hiveCatalogName),
Catalog.Type.RELATIONAL,
"hive",
"comment",
properties);
hiveCatalog = metalake.loadCatalog(NameIdentifier.of(metalakeName, hiveCatalogName));
}

private void initGravitinoEnv() {
// Gravitino server is already started by AbstractIT
int gravitinoPort = getGravitinoServerPort();
gravitinoUrl = String.format("http://127.0.0.1:%d", gravitinoPort);
}

private void initHiveEnv() {
containerSuite.startHiveContainer();
hiveMetastoreUri =
String.format(
"thrift://%s:%d",
containerSuite.getHiveContainer().getContainerIpAddress(),
HiveContainer.HIVE_METASTORE_PORT);
}

private void initSparkEnv() {
sparkSession =
SparkSession.builder()
.master("local[1]")
.appName("Spark Hive connector integration test")
.config("spark.plugins", GravitinoSparkPlugin.class.getName())
.config(GravitinoSparkConfig.GRAVITINO_URL, gravitinoUrl)
.config(GravitinoSparkConfig.GRAVITINO_METALAKE, metalakeName)
.enableHiveSupport()
.getOrCreate();
}

protected List<Object[]> sql(String query, Object... args) {
List<Row> rows = sparkSession.sql(String.format(query, args)).collectAsList();
if (rows.isEmpty()) {
return ImmutableList.of();
}
return rowsToJava(rows);
}

protected List<Object[]> rowsToJava(List<Row> rows) {
return rows.stream().map(this::toJava).collect(Collectors.toList());
}

private Object[] toJava(Row row) {
return IntStream.range(0, row.size())
.mapToObj(
pos -> {
if (row.isNullAt(pos)) {
return null;
}
Object value = row.get(pos);
if (value instanceof Row) {
return toJava((Row) value);
} else if (value instanceof scala.collection.Seq) {
return row.getList(pos);
} else if (value instanceof scala.collection.Map) {
return row.getJavaMap(pos);
}
return value;
})
.toArray(Object[]::new);
}

/** Check that every entry in the child map is present in the parent map with the same value. */
protected void checkMapContains(Map<String, String> child, Map<String, String> parent) {
child.forEach(
(k, v) -> {
Assertions.assertTrue(parent.containsKey(k));
Assertions.assertEquals(v, parent.get(k));
});
}

/** Mainly used for debugging. */
protected void printObjects(List<Object[]> objects) {
  objects.forEach(
      row ->
          LOG.warn(Arrays.stream(row).map(String::valueOf).collect(Collectors.joining(","))));
}

protected Map<String, String> getTableInfo(String tableName) {
return convertToStringMap(sql("desc table extended " + tableName));
}

protected List<String> getTableColumns(String tableName) {
  List<Object[]> objects = sql("desc table extended " + tableName);
  List<String> columns = new ArrayList<>();
  // `desc table extended` lists the column rows first, then a blank row, then the
  // detailed table information; stop collecting at the first blank row.
  for (Object[] row : objects) {
    String columnName = (String) row[0];
    if (StringUtils.isBlank(columnName)) {
      break;
    }
    columns.add(columnName);
  }
  return columns;
}

protected Set<String> convertToStringSet(List<Object[]> objects, int index) {
return objects.stream().map(row -> String.valueOf(row[index])).collect(Collectors.toSet());
}

protected List<String> convertToStringList(List<Object[]> objects, int index) {
return objects.stream().map(row -> String.valueOf(row[index])).collect(Collectors.toList());
}

protected Map<String, String> convertToStringMap(List<Object[]> objects) {
return objects.stream()
.collect(
Collectors.toMap(
row -> String.valueOf(row[0]),
row -> String.valueOf(row[1]),
(oldValue, newValue) -> oldValue));
}
}
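For illustration only (this subclass is hypothetical, not part of the PR), a suite built on the helpers above could look like the following; the `Database`/`Table` keys assume the two-column row layout that Spark's `desc table extended` emits:

package com.datastrato.gravitino.integration.test.spark;

/** Hypothetical usage sketch of the SparkBaseIT helpers; not part of this PR. */
public class SparkHelperExampleIT extends SparkBaseIT {

  @org.junit.jupiter.api.Test
  void testDescribeTable() {
    sql("CREATE DATABASE IF NOT EXISTS db");
    sql("CREATE TABLE IF NOT EXISTS db.example (id INT, name STRING)");

    // getTableInfo flattens the two-column `desc table extended` output into a map.
    java.util.Map<String, String> info = getTableInfo("db.example");

    // checkMapContains asserts that every expected entry appears in the actual map.
    checkMapContains(
        com.google.common.collect.ImmutableMap.of("Database", "db", "Table", "example"), info);

    // getTableColumns collects everything before the first blank row, i.e. the column names.
    org.junit.jupiter.api.Assertions.assertEquals(
        java.util.Arrays.asList("id", "name"), getTableColumns("db.example"));
  }
}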
133 changes: 133 additions & 0 deletions integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkIT.java (new file)
@@ -0,0 +1,133 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.integration.test.spark;

import java.util.Set;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag("gravitino-docker-it")
public class SparkIT extends SparkBaseIT {

@BeforeEach
void init() {
sparkSession.sql("use " + hiveCatalogName);
}

@Test
public void testLoadCatalogs() {
Set<String> catalogs = convertToStringSet(sql("show catalogs"), 0);
catalogs.forEach(k -> System.out.println(k));
Assertions.assertTrue(catalogs.contains(hiveCatalogName));
}

public void testFunction() {
sparkSession.sql("select current_date(), unix_timestamp();").show();
}

public void testView() {
sparkSession.sql("create database if not exists v");
sparkSession.sql("use f");
// sparkSession.sql("create GLOBAL TEMPORARY VIEW IF NOT EXISTS view_student as select * from
// student limit 2;");
// sparkSession.sql("create view view_student1 as select * from student limit 2;");
// sparkSession.sql("select * from view_student1").show();
}

public void testFederationQuery() {
sparkSession.sql("use hive");
sparkSession.sql("create database if not exists f");
sparkSession.sql("drop table if exists f.student");
sparkSession.sql("CREATE TABLE f.student (id INT, name STRING, age INT)").show();
sparkSession.sql("INSERT into f.student VALUES(0, 'aa', 10), (1,'bb', 12);").show();
sparkSession.sql("desc table EXTENDED f.student").show();

sparkSession.sql("create database if not exists hive1.f1");
sparkSession.sql("drop table if exists hive1.f1.scores");
sparkSession.sql("CREATE TABLE hive1.f1.scores (id INT, score INT)").show();
sparkSession.sql("desc table EXTENDED hive1.f1.scores").show();
sparkSession.sql("INSERT into hive1.f1.scores VALUES(0, 100), (1, 98)").show();

sparkSession
.sql(
"select f.student.id, name, age, score from hive.f.student JOIN hive1.f1.scores ON f.student.id = hive1.f1.scores.id")
.show();
}

public void testCreateDatabase() {
sparkSession.sql("create database if not exists db_create2");
sparkSession.sql("show databases").show();
}

public void testCreateDatasourceTable() {
sparkSession
.sql(
"CREATE TABLE student_parquet(id INT, name STRING, age INT) USING PARQUET"
+ " OPTIONS ('parquet.bloom.filter.enabled'='true', "
+ "'parquet.bloom.filter.enabled#age'='false');")
.show();
}

public void testCreateHiveTable() {
sparkSession.sql("use default");
sparkSession.sql("drop table if exists student");
sparkSession.sql("drop table if exists student1");
sparkSession.sql(
"CREATE TABLE default.student (id INT, name STRING, age INT)\n"
// + " USING CSV\n"
+ " PARTITIONED BY (age)\n"
+ " CLUSTERED BY (Id) SORTED BY (name) INTO 4 buckets ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n"
+ " STORED AS TEXTFILE TBLPROPERTIES ('foo'='bar')\n"
+ " LOCATION '/tmp/family/' \n"
+ " COMMENT 'this is a comment';");
sparkSession.sql("create table student1 as select * from default.student").show();
}

public void testHiveDML() {
sparkSession.sql("create database if not exists db");
sparkSession.sql("drop table if exists db.student");
sparkSession.sql("use db");
sparkSession.sql("CREATE TABLE student (id INT, name STRING, age INT)").show();
sparkSession.sql("desc db.student").show();
sparkSession.sql("INSERT into db.student VALUES(0, 'aa', 10), (1,'bb', 12);").show();
sparkSession.sql("drop table if exists db.student1");
sparkSession.sql("create table db.student1 as select * from db.student limit 1");
sparkSession.sql("INSERT into db.student1 select * from db.student limit 1");
sparkSession.sql("select * from db.student1;").show();
}

/*
@Test
public void testSpark() {
sparkSession.sql(
"CREATE TABLE if NOT EXISTS sales ( id INT, name STRING, age INT ) PARTITIONED BY (country STRING, state STRING)");
sparkSession.sql("desc table extended sales").show();
sparkSession.sql(
"INSERT INTO sales PARTITION (country='USA', state='CA') VALUES (1, 'John', 25);");
sparkSession
.sql("INSERT INTO sales PARTITION (country='Canada', state='ON') VALUES (2, 'Alice', 30);")
.explain("extended");
// sparkSession.sql("select * from sales where country = 'USA'").explain("extended");

// insert into select xx
sparkSession.sql(
"CREATE TABLE IF NOT EXISTS target_table (id INT, name STRING, age INT) PARTITIONED BY (p INT)");
sparkSession
.sql(
"INSERT INTO target_table PARTITION ( p = 1 ) SELECT id, name, age FROM sales WHERE country='USA' AND state='CA'")
.explain("extended");
sparkSession.sql("select * from target_table").show();

// create table as select
// sparkSession.sql("CREATE TABLE IF NOT EXISTS target_table2 as select * from
// sales").explain("formatted");
sparkSession
.sql("CREATE TABLE IF NOT EXISTS target_table2 as select * from sales")
.explain("extended");
}
*/
}
1 change: 1 addition & 0 deletions settings.gradle.kts
@@ -12,5 +12,6 @@ include("api", "common", "core", "meta", "server", "integration-test", "server-common")
 include("catalogs:bundled-catalog", "catalogs:catalog-hive", "catalogs:catalog-lakehouse-iceberg", "catalogs:catalog-jdbc-common", "catalogs:catalog-jdbc-mysql", "catalogs:catalog-jdbc-postgresql")
 include("clients:client-java", "clients:client-java-runtime")
 include("trino-connector")
+include("spark-connector")
 include("web")
 include("docs")