Commit: sparkit
FANNG1 committed Jan 18, 2024
1 parent b4f4ef6 commit 1abfa08
Showing 15 changed files with 979 additions and 5 deletions.
17 changes: 12 additions & 5 deletions gradle/libs.versions.toml
@@ -41,6 +41,8 @@ prometheus = "0.16.0"
jsqlparser = "4.2"
mysql = "8.0.23"
postgresql = "42.6.0"
kyuubi = "1.8.0"
scala = "2.12.18"

protobuf-plugin = "0.9.2"
spotless-plugin = '6.11.0'
@@ -106,12 +108,14 @@ trino-testing= { group = "io.trino", name = "trino-testing", version.ref = "trino" }
trino-memory= { group = "io.trino", name = "trino-memory", version.ref = "trino" }
trino-cli= { group = "io.trino", name = "trino-cli", version.ref = "trino" }
trino-client= { group = "io.trino", name = "trino-client", version.ref = "trino" }
iceberg-spark-runtime = { group = "org.apache.iceberg", name = "iceberg-spark-runtime-3.4_2.13", version.ref = "iceberg" }
spark-sql = { group = "org.apache.spark", name = "spark-sql_2.13", version.ref = "spark" }
scala-collection-compat = { group = "org.scala-lang.modules", name = "scala-collection-compat_2.13", version.ref = "scala-collection-compat" }
iceberg-spark-runtime = { group = "org.apache.iceberg", name = "iceberg-spark-runtime-3.4_2.12", version.ref = "iceberg" }
spark-sql = { group = "org.apache.spark", name = "spark-sql_2.12", version.ref = "spark" }
spark-core = { group = "org.apache.spark", name = "spark-core_2.12", version.ref = "spark" }
spark-catalyst = { group = "org.apache.spark", name = "spark-catalyst_2.12", version.ref = "spark" }
scala-collection-compat = { group = "org.scala-lang.modules", name = "scala-collection-compat_2.12", version.ref = "scala-collection-compat" }
sqlite-jdbc = { group = "org.xerial", name = "sqlite-jdbc", version.ref = "sqlite-jdbc" }
testng = { group = "org.testng", name = "testng", version.ref = "testng" }
spark-hive = { group = "org.apache.spark", name = "spark-hive_2.13", version.ref = "spark" }
spark-hive = { group = "org.apache.spark", name = "spark-hive_2.12", version.ref = "spark" }
commons-dbcp2 = { group = "org.apache.commons", name = "commons-dbcp2", version.ref = "commons-dbcp2" }
testcontainers = { group = "org.testcontainers", name = "testcontainers", version.ref = "testcontainers" }
testcontainers-mysql = { group = "org.testcontainers", name = "mysql", version.ref = "testcontainers" }
@@ -135,6 +139,8 @@ jsqlparser = { group = "com.github.jsqlparser", name = "jsqlparser", version.ref = "jsqlparser" }
mysql-driver = { group = "mysql", name = "mysql-connector-java", version.ref = "mysql" }
postgresql-driver = { group = "org.postgresql", name = "postgresql", version.ref = "postgresql" }
commons-cli = { group = "commons-cli", name = "commons-cli", version.ref = "commons-cli" }
kyuubi-spark-connector = { group = "org.apache.kyuubi", name = "kyuubi-spark-connector-hive_2.12", version.ref = "kyuubi" }
scala-library = { group = "org.scala-lang", name = "scala-library", version.ref = "scala" }

[bundles]
log4j = ["slf4j-api", "log4j-slf4j2-impl", "log4j-api", "log4j-core", "log4j-12-api"]
@@ -144,6 +150,7 @@ iceberg = ["iceberg-core", "iceberg-api"]
jwt = ["jwt-api", "jwt-impl", "jwt-gson"]
metrics = ["metrics-core", "metrics-jersey2", "metrics-jvm", "metrics-jmx", "metrics-servlets"]
prometheus = ["prometheus-servlet", "prometheus-dropwizard", "prometheus-client"]
spark = ["spark-catalyst"]

[plugins]
protobuf = { id = "com.google.protobuf", version.ref = "protobuf-plugin" }
@@ -155,4 +162,4 @@ shadow = { id = "com.github.johnrengelman.shadow", version.ref = "shadow-plugin" }
node = { id = "com.github.node-gradle.node", version.ref = "node-plugin" }
tasktree = {id = "com.dorongold.task-tree", version = "2.1.1"}
dependencyLicenseReport = {id = "com.github.jk1.dependency-license-report", version = "2.5"}
bom = {id = "org.cyclonedx.bom", version = "1.5.0"}
bom = {id = "org.cyclonedx.bom", version = "1.5.0"}
1 change: 1 addition & 0 deletions integration-test/build.gradle.kts
@@ -25,6 +25,7 @@ dependencies {
implementation(project(":catalogs:catalog-jdbc-common"))
implementation(project(":catalogs:catalog-jdbc-mysql"))
implementation(project(":catalogs:catalog-jdbc-postgresql"))
implementation(project(":spark-connector"))
implementation(libs.guava)
implementation(libs.bundles.log4j)
implementation(libs.bundles.jersey)
@@ -0,0 +1,195 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.integration.test.spark;

import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.METASTORE_URIS;

import com.datastrato.gravitino.Catalog;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.client.GravitinoMetaLake;
import com.datastrato.gravitino.integration.test.container.ContainerSuite;
import com.datastrato.gravitino.integration.test.container.HiveContainer;
import com.datastrato.gravitino.integration.test.util.AbstractIT;
import com.datastrato.gravitino.spark.GravitinoSparkConfig;
import com.datastrato.gravitino.spark.plugin.GravitinoSparkPlugin;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

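/**
 * Base class for Spark connector integration tests: starts a Hive metastore container, creates a
 * Gravitino metalake with a Hive catalog, and builds a local SparkSession wired through the
 * GravitinoSparkPlugin.
 */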
@TestInstance(Lifecycle.PER_CLASS)
public class SparkBaseIT extends AbstractIT {
private static final Logger LOG = LoggerFactory.getLogger(SparkBaseIT.class);
private static final ContainerSuite containerSuite = ContainerSuite.getInstance();
protected SparkSession sparkSession;
private String hiveMetastoreUri;
private String gravitinoUrl;
private String metalakeName = "test";
protected String hiveCatalogName = "hive";
private Catalog hiveCatalog;

@BeforeAll
public void startup() throws Exception {
initHiveEnv();
initGravitinoEnv();
initMetalakeCatalogs();
initSparkEnv();
}

@AfterAll
void stop() {
if (sparkSession != null) {
sparkSession.close();
}
}

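/** Creates the test metalake and a Hive catalog whose metastore URI points at the Hive container. */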
private void initMetalakeCatalogs() {
client.createMetalake(NameIdentifier.of(metalakeName), "comment", Collections.emptyMap());
GravitinoMetaLake metalake = client.loadMetalake(NameIdentifier.of(metalakeName));
Map<String, String> properties = Maps.newHashMap();
properties.put(METASTORE_URIS, hiveMetastoreUri);

metalake.createCatalog(
NameIdentifier.of(metalakeName, hiveCatalogName),
Catalog.Type.RELATIONAL,
"hive",
"comment",
properties);
hiveCatalog = metalake.loadCatalog(NameIdentifier.of(metalakeName, hiveCatalogName));
}

private void initGravitinoEnv() {
// Gravitino server is already started by AbstractIT
int gravitinoPort = getGravitinoServerPort();
gravitinoUrl = String.format("http://127.0.0.1:%d", gravitinoPort);
}

private void initHiveEnv() {
containerSuite.startHiveContainer();
hiveMetastoreUri =
String.format(
"thrift://%s:%d",
containerSuite.getHiveContainer().getContainerIpAddress(),
HiveContainer.HIVE_METASTORE_PORT);
}

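/** Builds a local SparkSession with the Gravitino Spark plugin pointed at the test Gravitino server and metalake. */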
private void initSparkEnv() {
sparkSession =
SparkSession.builder()
.master("local[1]")
.appName("Spark Hive connector integration test")
.config("spark.plugins", GravitinoSparkPlugin.class.getName())
.config(GravitinoSparkConfig.GRAVITINO_URL, gravitinoUrl)
.config(GravitinoSparkConfig.GRAVITINO_METALAKE, metalakeName)
.enableHiveSupport()
.getOrCreate();
}

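/** Formats the query with String.format, runs it, and returns the result rows as Java object arrays. */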
protected List<Object[]> sql(String query, Object... args) {
List<Row> rows = sparkSession.sql(String.format(query, args)).collectAsList();
if (rows.isEmpty()) {
return ImmutableList.of();
}
return rowsToJava(rows);
}

protected List<Object[]> rowsToJava(List<Row> rows) {
return rows.stream().map(this::toJava).collect(Collectors.toList());
}

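/** Converts a Spark Row into an Object[], recursing into nested Rows and mapping Scala Seq/Map values to Java collections. */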
private Object[] toJava(Row row) {
return IntStream.range(0, row.size())
.mapToObj(
pos -> {
if (row.isNullAt(pos)) {
return null;
}
Object value = row.get(pos);
if (value instanceof Row) {
return toJava((Row) value);
} else if (value instanceof scala.collection.Seq) {
return row.getList(pos);
} else if (value instanceof scala.collection.Map) {
return row.getJavaMap(pos);
}
return value;
})
.toArray(Object[]::new);
}

/** Checks that every entry of the child map is also present, with the same value, in the parent map. */
protected void checkMapContains(Map<String, String> child, Map<String, String> parent) {
child.forEach(
(k, v) -> {
Assertions.assertTrue(parent.containsKey(k));
Assertions.assertEquals(v, parent.get(k));
});
}

/** Mainly used for debugging. */
protected void printObjects(List<Object[]> objects) {
objects.stream()
.forEach(
row -> {
String oneRow =
Arrays.stream(row).map(o -> String.valueOf(o)).collect(Collectors.joining(","));
LOG.warn(oneRow);
});
}

protected Map<String, String> getTableInfo(String tableName) {
return convertToStringMap(sql("desc table extended " + tableName));
}

protected List<String> getTableColumns(String tableName) {
List<Object[]> objects = sql("desc table extended " + tableName);
List<String> columns = new ArrayList<>();
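// Collect the leading column-name rows of the "desc table extended" output; anyMatch is used only
// to short-circuit at the first row with a blank name, which ends the column section.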
objects.stream()
.anyMatch(
row -> {
String columnName = (String) row[0];
if (StringUtils.isNotBlank(columnName)) {
columns.add(columnName);
return false;
}
return true;
});
return columns;
}

protected Set<String> convertToStringSet(List<Object[]> objects, int index) {
return objects.stream().map(row -> String.valueOf(row[index])).collect(Collectors.toSet());
}

protected List<String> convertToStringList(List<Object[]> objects, int index) {
return objects.stream().map(row -> String.valueOf(row[index])).collect(Collectors.toList());
}

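/** Builds a map from the first two cells of each row (e.g. name/value pairs of "desc table extended"), keeping the first value for duplicate keys. */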
protected Map<String, String> convertToStringMap(List<Object[]> objects) {
return objects.stream()
.collect(
Collectors.toMap(
row -> String.valueOf(row[0]),
row -> String.valueOf(row[1]),
(oldValue, newValue) -> oldValue));
}
}
@@ -0,0 +1,133 @@
/*
* Copyright 2023 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.integration.test.spark;

import java.util.Set;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

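/** Spark SQL integration tests that run against catalogs loaded through the Gravitino Spark connector. */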
@Tag("gravitino-docker-it")
public class SparkIT extends SparkBaseIT {

@BeforeEach
void init() {
sparkSession.sql("use " + hiveCatalogName);
}

@Test
public void testLoadCatalogs() {
Set<String> catalogs = convertToStringSet(sql("show catalogs"), 0);
catalogs.forEach(k -> System.out.println(k));
Assertions.assertTrue(catalogs.contains(hiveCatalogName));
}

public void testFunction() {
sparkSession.sql("select current_date(), unix_timestamp();").show();
}

public void testView() {
sparkSession.sql("create database if not exists v");
sparkSession.sql("use f");
// sparkSession.sql("create GLOBAL TEMPORARY VIEW IF NOT EXISTS view_student as select * from
// student limit 2;");
// sparkSession.sql("create view view_student1 as select * from student limit 2;");
// sparkSession.sql("select * from view_student1").show();
}

public void testFederationQuery() {
sparkSession.sql("use hive");
sparkSession.sql("create database if not exists f");
sparkSession.sql("drop table if exists f.student");
sparkSession.sql("CREATE TABLE f.student (id INT, name STRING, age INT)").show();
sparkSession.sql("INSERT into f.student VALUES(0, 'aa', 10), (1,'bb', 12);").show();
sparkSession.sql("desc table EXTENDED f.student").show();

sparkSession.sql("create database if not exists hive1.f1");
sparkSession.sql("drop table if exists hive1.f1.scores");
sparkSession.sql("CREATE TABLE hive1.f1.scores (id INT, score INT)").show();
sparkSession.sql("desc table EXTENDED hive1.f1.scores").show();
sparkSession.sql("INSERT into hive1.f1.scores VALUES(0, 100), (1, 98)").show();

sparkSession
.sql(
"select f.student.id, name, age, score from hive.f.student JOIN hive1.f1.scores ON f.student.id = hive1.f1.scores.id")
.show();
}

public void testCreateDatabase() {
sparkSession.sql("create database if not exists db_create2");
sparkSession.sql("show databases").show();
}

public void testCreateDatasourceTable() {
sparkSession
.sql(
"CREATE TABLE student_parquet(id INT, name STRING, age INT) USING PARQUET"
+ " OPTIONS ('parquet.bloom.filter.enabled'='true', "
+ "'parquet.bloom.filter.enabled#age'='false');")
.show();
}

public void testCreateHiveTable() {
sparkSession.sql("use default");
sparkSession.sql("drop table if exists student");
sparkSession.sql("drop table if exists student1");
sparkSession.sql(
"CREATE TABLE default.student (id INT, name STRING, age INT)\n"
// + " USING CSV\n"
+ " PARTITIONED BY (age)\n"
+ " CLUSTERED BY (Id) SORTED BY (name) INTO 4 buckets ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n"
+ " STORED AS TEXTFILE TBLPROPERTIES ('foo'='bar')\n"
+ " LOCATION '/tmp/family/' \n"
+ " COMMENT 'this is a comment';");
sparkSession.sql("create table student1 as select * from default.student").show();
}

public void testHiveDML() {
sparkSession.sql("create database if not exists db");
sparkSession.sql("drop table if exists db.student");
sparkSession.sql("use db");
sparkSession.sql("CREATE TABLE student (id INT, name STRING, age INT)").show();
sparkSession.sql("desc db.student").show();
sparkSession.sql("INSERT into db.student VALUES(0, 'aa', 10), (1,'bb', 12);").show();
sparkSession.sql("drop table if exists db.student1");
sparkSession.sql("create table db.student1 as select * from db.student limit 1");
sparkSession.sql("INSERT into db.student1 select * from db.student limit 1");
sparkSession.sql("select * from db.student1;").show();
}

/*
@Test
public void testSpark() {
sparkSession.sql(
"CREATE TABLE if NOT EXISTS sales ( id INT, name STRING, age INT ) PARTITIONED BY (country STRING, state STRING)");
sparkSession.sql("desc table extended sales").show();
sparkSession.sql(
"INSERT INTO sales PARTITION (country='USA', state='CA') VALUES (1, 'John', 25);");
sparkSession
.sql("INSERT INTO sales PARTITION (country='Canada', state='ON') VALUES (2, 'Alice', 30);")
.explain("extended");
// sparkSession.sql("select * from sales where country = 'USA'").explain("extended");
// insert into select xx
sparkSession.sql(
"CREATE TABLE IF NOT EXISTS target_table (id INT, name STRING, age INT) PARTITIONED BY (p INT)");
sparkSession
.sql(
"INSERT INTO target_table PARTITION ( p = 1 ) SELECT id, name, age FROM sales WHERE country='USA' AND state='CA'")
.explain("extended");
sparkSession.sql("select * from target_table").show();
// create table as select
// sparkSession.sql("CREATE TABLE IF NOT EXISTS target_table2 as select * from
// sales").explain("formatted");
sparkSession
.sql("CREATE TABLE IF NOT EXISTS target_table2 as select * from sales")
.explain("extended");
}
*/
}
1 change: 1 addition & 0 deletions settings.gradle.kts
@@ -12,5 +12,6 @@ include("api", "common", "core", "meta", "server", "integration-test", "server-c
include("catalogs:bundled-catalog", "catalogs:catalog-hive", "catalogs:catalog-lakehouse-iceberg", "catalogs:catalog-jdbc-common", "catalogs:catalog-jdbc-mysql", "catalogs:catalog-jdbc-postgresql")
include("clients:client-java", "clients:client-java-runtime")
include("trino-connector")
include("spark-connector")
include("web")
include("docs")