hive properties
FANNG1 committed Mar 20, 2024
1 parent b53ff6b commit bc0e4cb
Showing 11 changed files with 382 additions and 8 deletions.
@@ -442,6 +442,23 @@ void testComplexType() {
checkTableReadWrite(tableInfo);
}

@Test
void testTableOptions() {
String tableName = "simple_table";
dropTableIfExists(tableName);
String createTableSql = getCreateSimpleTableString(tableName);
createTableSql += " OPTIONS('a'='b')";
sql(createTableSql);
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withTableProperties(ImmutableMap.of("a", "b"));
checker.check(tableInfo);
checkTableReadWrite(tableInfo);
}

private void checkTableColumns(
String tableName, List<SparkColumnInfo> columnInfos, SparkTableInfo tableInfo) {
SparkTableInfoChecker.create()
@@ -451,7 +468,7 @@ private void checkTableColumns(
.check(tableInfo);
}

private void checkTableReadWrite(SparkTableInfo table) {
protected void checkTableReadWrite(SparkTableInfo table) {
String name = table.getTableIdentifier();
String insertValues =
table.getColumns().stream()
@@ -514,7 +531,7 @@ private void checkTableReadWrite(SparkTableInfo table) {
Assertions.assertEquals(checkValues, queryResult.get(0));
}

private String getCreateSimpleTableString(String tableName) {
protected String getCreateSimpleTableString(String tableName) {
return String.format(
"CREATE TABLE %s (id INT COMMENT 'id comment', name STRING COMMENT '', age INT)",
tableName);
@@ -529,8 +546,13 @@ private List<SparkColumnInfo> getSimpleTableColumn() {

// Helper method to create a simple table; use the corresponding
// getSimpleTableColumn to verify the table columns.
private void createSimpleTable(String identifier) {
protected void createSimpleTable(String identifier) {
String createTableSql = getCreateSimpleTableString(identifier);
sql(createTableSql);
}

protected void checkParquetFile(SparkTableInfo tableInfo) {
String location = tableInfo.getTableLocation();
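// Reading the table location with Spark's Parquet reader succeeds only if the underlying files are actually Parquet.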
Assertions.assertDoesNotThrow(() -> getSparkSession().read().parquet(location).printSchema());
}
}
@@ -31,8 +31,8 @@ public abstract class SparkEnvIT extends SparkUtilIT {
private final String metalakeName = "test";

private SparkSession sparkSession;
private String hiveMetastoreUri;
private String gravitinoUri;
private String hiveMetastoreUri = "thrift://127.0.0.1:9083";
private String gravitinoUri = "http://127.0.0.1:8090";

protected abstract String getCatalogName();

@@ -5,7 +5,14 @@
package com.datastrato.gravitino.integration.test.spark.hive;

import com.datastrato.gravitino.integration.test.spark.SparkCommonIT;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfoChecker;
import com.datastrato.gravitino.spark.connector.hive.HivePropertyConstants;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

@Tag("gravitino-docker-it")
@@ -21,4 +28,165 @@ protected String getCatalogName() {
protected String getProvider() {
return "hive";
}

@Test
void testHiveDefaultFormat() {
String tableName = "hive_default_format_table";
dropTableIfExists(tableName);
createSimpleTable(tableName);
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withTableProperties(
ImmutableMap.of(
HivePropertyConstants.SPARK_HIVE_INPUT_FORMAT,
HivePropertyConstants.TEXT_INPUT_FORMAT_CLASS,
HivePropertyConstants.SPARK_HIVE_OUTPUT_FORMAT,
HivePropertyConstants.IGNORE_KEY_OUTPUT_FORMAT_CLASS,
HivePropertyConstants.SPARK_HIVE_SERDE_LIB,
HivePropertyConstants.LAZY_SIMPLE_SERDE_CLASS));
checker.check(tableInfo);
checkTableReadWrite(tableInfo);
}

@Test
void testHiveFormatWithStoredAs() {
// STORED AS supports formats such as TEXTFILE, ORC, and PARQUET.
String tableName = "test_table_property_stored_as_table";
dropTableIfExists(tableName);
String createTableSql = getCreateSimpleTableString(tableName);
createTableSql += "STORED AS PARQUET";
sql(createTableSql);
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withTableProperties(
ImmutableMap.of(
HivePropertyConstants.SPARK_HIVE_INPUT_FORMAT,
HivePropertyConstants.PARQUET_INPUT_FORMAT_CLASS,
HivePropertyConstants.SPARK_HIVE_OUTPUT_FORMAT,
HivePropertyConstants.PARQUET_OUTPUT_FORMAT_CLASS,
HivePropertyConstants.SPARK_HIVE_SERDE_LIB,
HivePropertyConstants.PARQUET_SERDE_CLASS));
checker.check(tableInfo);
checkTableReadWrite(tableInfo);
checkParquetFile(tableInfo);
}

@Test
void testHiveFormatWithUsing() {
String tableName = "test_hive_format_using_table";
dropTableIfExists(tableName);
String createTableSql = getCreateSimpleTableString(tableName);
// USING supports sources such as CSV, TXT, ORC, JDBC, and PARQUET.
createTableSql += "USING PARQUET";
sql(createTableSql);
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withTableProperties(
ImmutableMap.of(
HivePropertyConstants.SPARK_HIVE_INPUT_FORMAT,
HivePropertyConstants.PARQUET_INPUT_FORMAT_CLASS,
HivePropertyConstants.SPARK_HIVE_OUTPUT_FORMAT,
HivePropertyConstants.PARQUET_OUTPUT_FORMAT_CLASS,
HivePropertyConstants.SPARK_HIVE_SERDE_LIB,
HivePropertyConstants.PARQUET_SERDE_CLASS));
checker.check(tableInfo);
checkTableReadWrite(tableInfo);
checkParquetFile(tableInfo);
}

@Test
void testHivePropertiesWithSerdeRowFormat() {
String tableName = "test_hive_row_serde_table";
dropTableIfExists(tableName);
String createTableSql = getCreateSimpleTableString(tableName);
createTableSql =
String.format(
"%s ROW FORMAT SERDE '%s' WITH SERDEPROPERTIES ('serialization.format'='@', 'field.delim' = ',') STORED AS INPUTFORMAT '%s' OUTPUTFORMAT '%s'",
createTableSql,
HivePropertyConstants.PARQUET_SERDE_CLASS,
HivePropertyConstants.PARQUET_INPUT_FORMAT_CLASS,
HivePropertyConstants.PARQUET_OUTPUT_FORMAT_CLASS);
sql(createTableSql);
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withTableProperties(
ImmutableMap.of(
"serialization.format",
"@",
"field.delim",
",",
HivePropertyConstants.SPARK_HIVE_INPUT_FORMAT,
HivePropertyConstants.PARQUET_INPUT_FORMAT_CLASS,
HivePropertyConstants.SPARK_HIVE_OUTPUT_FORMAT,
HivePropertyConstants.PARQUET_OUTPUT_FORMAT_CLASS,
HivePropertyConstants.SPARK_HIVE_SERDE_LIB,
HivePropertyConstants.PARQUET_SERDE_CLASS));
checker.check(tableInfo);
checkTableReadWrite(tableInfo);
checkParquetFile(tableInfo);
}

/*
| DELIMITED [ FIELDS TERMINATED BY fields_terminated_char [ ESCAPED BY escaped_char ] ]
[ COLLECTION ITEMS TERMINATED BY collection_items_terminated_char ]
[ MAP KEYS TERMINATED BY map_key_terminated_char ]
[ LINES TERMINATED BY row_terminated_char ]
[ NULL DEFINED AS null_char ]
*/
@Test
void testHivePropertiesWithDelimitedRowFormat() {
String tableName = "test_hive_row_format_table";
dropTableIfExists(tableName);
String createTableSql = getCreateSimpleTableString(tableName);
createTableSql +=
"ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY ';' "
+ "COLLECTION ITEMS TERMINATED BY '@' "
+ "MAP KEYS TERMINATED BY ':' "
+ "LINES TERMINATED BY '\\n' "
+ "NULL DEFINED AS 'n' "
+ "STORED AS TEXTFILE";
System.out.println(createTableSql);
sql(createTableSql);
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withTableProperties(
ImmutableMap.of(
"field.delim", ",",
"escape.delim", ";",
"mapkey.delim", ":",
"serialization.format", ",",
"colelction.delim", "@"));
checker.check(tableInfo);
checkTableReadWrite(tableInfo);

// check that field.delim takes effect
List<Object[]> rows =
rowsToJava(
getSparkSession()
.read()
.option("delimiter", ",")
.csv(tableInfo.getTableLocation())
.collectAsList());
Assertions.assertEquals(1, rows.size());
Object[] row = rows.get(0);
Assertions.assertEquals(3, row.length);
Assertions.assertEquals("2", row[0]);
Assertions.assertEquals("gravitino_it_test", (String) row[1]);
Assertions.assertEquals("2", row[2]);
}
}
@@ -6,6 +6,7 @@
package com.datastrato.gravitino.integration.test.util.spark;

import com.datastrato.gravitino.spark.connector.ConnectorConstants;
import com.datastrato.gravitino.spark.connector.hive.HivePropertyConstants;
import com.datastrato.gravitino.spark.connector.table.SparkBaseTable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -33,6 +34,10 @@ public String getTableName() {
return tableName;
}

public String getTableLocation() {
return tableProperties.get(HivePropertyConstants.GRAVITINO_HIVE_LOCATION);
}

// Include database name and table name
public String getTableIdentifier() {
if (StringUtils.isNotBlank(database)) {
@@ -8,6 +8,7 @@
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo.SparkColumnInfo;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Assertions;

/**
@@ -28,6 +29,7 @@ private enum CheckField {
NAME,
COLUMN,
COMMENT,
TABLE_PROPERTY,
}

public SparkTableInfoChecker withName(String name) {
@@ -48,6 +50,12 @@ public SparkTableInfoChecker withComment(String comment) {
return this;
}

public SparkTableInfoChecker withTableProperties(Map<String, String> properties) {
this.expectedTableInfo.setTableProperties(properties);
this.checkFields.add(CheckField.TABLE_PROPERTY);
return this;
}

public void check(SparkTableInfo realTableInfo) {
checkFields.stream()
.forEach(
@@ -65,6 +73,17 @@ public void check(SparkTableInfo realTableInfo) {
Assertions.assertEquals(
expectedTableInfo.getComment(), realTableInfo.getComment());
break;
case TABLE_PROPERTY:
Map<String, String> realTableProperties = realTableInfo.getTableProperties();
expectedTableInfo
.getTableProperties()
.forEach(
(k, v) -> {
Assertions.assertTrue(
realTableProperties.containsKey(k), k + " does not exist");
Assertions.assertEquals(v, realTableProperties.get(k));
});
break;
default:
Assertions.fail(checkField + " not checked");
break;
@@ -110,7 +110,7 @@ protected boolean tableExists(String tableName) {
}
}

private List<Object[]> rowsToJava(List<Row> rows) {
protected List<Object[]> rowsToJava(List<Row> rows) {
return rows.stream().map(this::toJava).collect(Collectors.toList());
}

@@ -5,11 +5,29 @@

package com.datastrato.gravitino.spark.connector;

import com.datastrato.gravitino.spark.connector.hive.HivePropertyConstants;
import java.util.Map;
import java.util.stream.Collectors;

/** Transform table properties between Gravitino and Spark. */
public interface PropertiesConverter {
Map<String, String> toGravitinoTableProperties(Map<String, String> properties);

Map<String, String> toSparkTableProperties(Map<String, String> properties);

static Map<String, String> transformOptionProperties(Map<String, String> properties) {
return properties.entrySet().stream()
.collect(
Collectors.toMap(
entry -> {
String key = entry.getKey();
if (key.startsWith(HivePropertyConstants.SPARK_OPTION_PREFIX)) {
return key.substring(HivePropertyConstants.SPARK_OPTION_PREFIX.length());
} else {
return key;
}
},
entry -> entry.getValue(),
(existingValue, newValue) -> existingValue));
}
}
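
For illustration only (not part of this commit), a minimal sketch of what transformOptionProperties does, assuming SPARK_OPTION_PREFIX is the "option." prefix that Spark prepends to OPTIONS entries:

import com.google.common.collect.ImmutableMap;
import java.util.Map;

Map<String, String> raw = ImmutableMap.of("option.a", "b", "format", "parquet");
Map<String, String> stripped = PropertiesConverter.transformOptionProperties(raw);
// stripped => {a=b, format=parquet}: the "option." prefix is removed, other keys pass through unchanged.
// If stripping a prefix produces a key that already exists, the first value encountered is kept.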
