Skip to content

Commit

Permalink
xxx
Browse files Browse the repository at this point in the history
  • Loading branch information
FANNG1 committed Jan 17, 2024
1 parent 5502e9b commit e8dc07a
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

@Tag("gravitino-docker-it")
public class SparkIT {
private static String HIVE_METASTORE_URIS = "thrift://127.0.0.1:9083";
// private static final ContainerSuite containerSuite = ContainerSuite.getInstance();
private static SparkSession sparkSession;

@BeforeAll
Expand All @@ -39,29 +37,70 @@ void init() {
sparkSession.sql("use hive");
}

// hive and hive1 are two catalogs with different metastore uri
@Test
public void testLoadCatalogs() {
Assertions.assertDoesNotThrow(
() -> {
sparkSession.sql("use hive");
sparkSession.sql("use hive1");
sparkSession.sql("show catalogs").show();
});
}

@Test
public void testFederatinoQuery() {
sparkSession.sql("use hive");
sparkSession.sql("create database if not exists f");
sparkSession.sql("drop table if exists f.student");
sparkSession.sql("CREATE TABLE f.student (id INT, name STRING, age INT)").show();
sparkSession.sql("INSERT into f.student VALUES(0, 'aa', 10), (1,'bb', 12);").show();

sparkSession.sql("create database if not exists hive1.f1");
sparkSession.sql("drop table if exists hive1.f1.scores");
sparkSession.sql("CREATE TABLE hive1.f1.scores (id INT, score INT)").show();
sparkSession.sql("INSERT into hive1.f1.scores VALUES(0, 100), (1, 98)").show();

sparkSession
.sql(
"select f.student.id, name, age, score from hive.f.student JOIN hive1.f1.scores ON f.student.id = hive1.f1.scores.id")
.show();
}

@Test
public void testTestCreateDatabase() {
sparkSession.sql("create database if not exists db_create2");
sparkSession.sql("show databases").show();
}

@Test
public void testHiveTable() {
public void testCreateHiveTable() {
sparkSession.sql("use default");
sparkSession.sql("drop table if exists student");
sparkSession.sql("drop table if exists student1");
sparkSession.sql(
"CREATE TABLE default.student (id INT, name STRING, age INT)\n"
// + " USING CSV\n"
+ " PARTITIONED BY (age)\n"
+ " CLUSTERED BY (Id) SORTED BY (name) INTO 4 buckets ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n"
+ " STORED AS TEXTFILE TBLPROPERTIES ('foo'='bar')\n"
+ " LOCATION '/tmp/family/' \n"
+ " COMMENT 'this is a comment';");
sparkSession.sql("create table student1 as select * from default.student").show();
}

@Test
public void testHiveDML() {
sparkSession.sql("create database if not exists db");
sparkSession.sql("drop table if exists db.student");
sparkSession.sql("CREATE TABLE db.student (id INT, name STRING, age INT)").show();
sparkSession.sql("use db");
sparkSession.sql("CREATE TABLE student (id INT, name STRING, age INT)").show();
sparkSession.sql("desc db.student").show();
sparkSession.sql("INSERT into db.student VALUES(0, 'aa', 10), (1,'bb', 12);").show();
sparkSession.sql("select * from db.student;").show();
sparkSession.sql("drop table if exists db.student1");
sparkSession.sql("create table db.student1 as select * from db.student limit 1");
sparkSession.sql("INSERT into db.student1 select * from db.student limit 1");
sparkSession.sql("select * from db.student1;").show();
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.datastrato.gravitino.spark.TypeConverter;
import com.google.common.base.Preconditions;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
Expand Down Expand Up @@ -62,16 +63,23 @@ public Table createTable(
Identifier ident, Column[] columns, Transform[] partitions, Map<String, String> properties)
throws TableAlreadyExistsException, NoSuchNamespaceException {
NameIdentifier nameIdentifier =
NameIdentifier.of(metalakeName, catalogName, ident.namespace()[0], ident.name());
NameIdentifier.of(metalakeName, catalogName, getDatabase(ident), ident.name());
ColumnDTO[] gravitinoColumns =
Arrays.stream(columns)
.map(column -> createGravitinoColumn(column))
.toArray(ColumnDTO[]::new);

Map<String, String> gravitinoProperties = new HashMap<>();
gravitinoProperties.putAll(properties);
String comment = gravitinoProperties.remove("comment");
if (comment == null) {
comment = "";
}

com.datastrato.gravitino.rel.Table table =
gravitinoCatalog
.asTableCatalog()
.createTable(nameIdentifier, gravitinoColumns, "", properties);
.createTable(nameIdentifier, gravitinoColumns, comment, gravitinoProperties);

return createGravitinoTable(ident, table);
}
Expand All @@ -83,7 +91,7 @@ public Table loadTable(Identifier ident) throws NoSuchTableException {
gravitinoCatalog
.asTableCatalog()
.loadTable(
NameIdentifier.of(metalakeName, catalogName, ident.namespace()[0], ident.name()));
NameIdentifier.of(metalakeName, catalogName, getDatabase(ident), ident.name()));
return createGravitinoTable(ident, table);
} catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) {
throw new NoSuchTableException(ident);
Expand All @@ -107,8 +115,7 @@ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchT
public boolean dropTable(Identifier ident) {
return gravitinoCatalog
.asTableCatalog()
.dropTable(
NameIdentifier.of(metalakeName, catalogName, ident.namespace()[0], ident.name()));
.dropTable(NameIdentifier.of(metalakeName, catalogName, getDatabase(ident), ident.name()));
}

@Override
Expand Down Expand Up @@ -176,6 +183,14 @@ public boolean dropNamespace(String[] namespace, boolean cascade)
return false;
}

private String getDatabase(Identifier ident) {
String database = "default";
if (ident.namespace().length > 0) {
database = ident.namespace()[0];
}
return database;
}

abstract Table createGravitinoTable(
Identifier identifier, com.datastrato.gravitino.rel.Table gravitinoTable);

Expand Down

0 comments on commit e8dc07a

Please sign in to comment.