[#2620] feat(spark-connector): support hive table format properties (#2605)

### What changes were proposed in this pull request?
Support Hive table format properties, for example:
```sql
CREATE TABLE xxx STORED AS PARQUET 
CREATE TABLE xxx USING PARQUET 
CREATE TABLE xxx ROW FORMAT SERDE xx STORED AS INPUTFORMAT xx OUTPUTFORMAT xx 
CREATE TABLE xxx ROW FORMAT DELIMITED FIELDS TERMINATED BY xx
```
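These clauses end up as Hive storage-descriptor properties (input format, output format, SerDe) on the created table. The sketch below illustrates the Parquet case only: the class names are the standard Hive Parquet classes that also appear in this diff, while the property keys (`input-format`, `output-format`, `serde-lib`) and the class `StorageFormatSketch` are illustrative placeholders rather than the connector's actual API — the real keys live in `HivePropertiesConstants`.
```java
import java.util.Map;

// Illustrative sketch only: shows the kind of mapping "STORED AS PARQUET" resolves to.
// The property keys and this class are hypothetical; the Hive class names are real.
public final class StorageFormatSketch {

  // Map a storage-format keyword to Hive input/output/serde classes.
  static Map<String, String> storedAs(String format) {
    if ("PARQUET".equalsIgnoreCase(format)) {
      return Map.of(
          "input-format", "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
          "output-format", "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
          "serde-lib", "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe");
    }
    throw new IllegalArgumentException("format not covered by this sketch: " + format);
  }

  public static void main(String[] args) {
    // CREATE TABLE t (...) STORED AS PARQUET  ->  three storage properties
    storedAs("PARQUET").forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
```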

### Why are the changes needed?
Fix: #2620

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
UT and IT
FANNG1 authored Mar 29, 2024
1 parent d89c833 commit c6f08c6
Showing 12 changed files with 508 additions and 16 deletions.
@@ -57,11 +57,11 @@ public class HiveTablePropertiesMetadata extends BasePropertiesMetadata {
@VisibleForTesting
public static final String ORC_SERDE_CLASS = "org.apache.hadoop.hive.ql.io.orc.OrcSerde";

private static final String PARQUET_INPUT_FORMAT_CLASS =
public static final String PARQUET_INPUT_FORMAT_CLASS =
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
private static final String PARQUET_OUTPUT_FORMAT_CLASS =
public static final String PARQUET_OUTPUT_FORMAT_CLASS =
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
private static final String PARQUET_SERDE_CLASS =
public static final String PARQUET_SERDE_CLASS =
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe";
private static final String COLUMNAR_SERDE_CLASS =
"org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe";
@@ -89,7 +89,10 @@ public enum TableType {
VIRTUAL_INDEX,
}

enum StorageFormat {
// In embedded test mode, HiveTablePropertiesMetadata is loaded by the Spark connector, which
// uses a different classloader than the Hive catalog. If StorageFormat were package-scoped, it
// couldn't be accessed by Hive catalog classes in the same package, so it is made public.
public enum StorageFormat {
SEQUENCEFILE(
SEQUENCEFILE_INPUT_FORMAT_CLASS, SEQUENCEFILE_OUTPUT_FORMAT_CLASS, LAZY_SIMPLE_SERDE_CLASS),
TEXTFILE(TEXT_INPUT_FORMAT_CLASS, IGNORE_KEY_OUTPUT_FORMAT_CLASS, LAZY_SIMPLE_SERDE_CLASS),
@@ -6,6 +6,7 @@

import com.datastrato.gravitino.StringIdentifier;
import com.datastrato.gravitino.connector.BasePropertiesMetadata;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Map;
@@ -14,13 +15,15 @@ public abstract class JdbcTablePropertiesMetadata extends BasePropertiesMetadata

public static final String COMMENT_KEY = "comment";

protected Map<String, String> transformToJdbcProperties(Map<String, String> properties) {
@VisibleForTesting
public Map<String, String> transformToJdbcProperties(Map<String, String> properties) {
HashMap<String, String> resultProperties = Maps.newHashMap(properties);
resultProperties.remove(StringIdentifier.ID_KEY);
return resultProperties;
}

protected Map<String, String> convertFromJdbcProperties(Map<String, String> properties) {
@VisibleForTesting
public Map<String, String> convertFromJdbcProperties(Map<String, String> properties) {
return properties;
}
}
@@ -105,7 +105,7 @@ public Map<String, String> transformToJdbcProperties(Map<String, String> properties) {
}

@Override
protected Map<String, String> convertFromJdbcProperties(Map<String, String> properties) {
public Map<String, String> convertFromJdbcProperties(Map<String, String> properties) {
BidiMap<String, String> mysqlConfigToGravitino = GRAVITINO_CONFIG_TO_MYSQL.inverseBidiMap();
return Collections.unmodifiableMap(
new HashMap<String, String>() {
@@ -19,6 +19,7 @@
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.junit.jupiter.api.Assertions;
@@ -116,6 +117,7 @@ void testCreateAndLoadSchema() {
@Test
void testAlterSchema() {
String testDatabaseName = "t_alter";
dropDatabaseIfExists(testDatabaseName);
sql("CREATE DATABASE " + testDatabaseName);
Assertions.assertTrue(
StringUtils.isBlank(getDatabaseMetadata(testDatabaseName).get("Properties")));
@@ -174,6 +176,7 @@ void testCreateTableWithDatabase() {
createDatabaseIfNotExists(databaseName);
String tableIdentifier = String.join(".", databaseName, tableName);

dropTableIfExists(tableIdentifier);
createSimpleTable(tableIdentifier);
SparkTableInfo tableInfo = getTableInfo(tableIdentifier);
SparkTableInfoChecker checker =
@@ -187,6 +190,7 @@ void testCreateTableWithDatabase() {
createDatabaseIfNotExists(databaseName);

sql("USE " + databaseName);
dropTableIfExists(tableName);
createSimpleTable(tableName);
tableInfo = getTableInfo(tableName);
checker =
@@ -257,6 +261,8 @@ void testRenameTable() {
void testListTable() {
String table1 = "list1";
String table2 = "list2";
dropTableIfExists(table1);
dropTableIfExists(table2);
createSimpleTable(table1);
createSimpleTable(table2);
Set<String> tables = listTableNames();
@@ -268,6 +274,8 @@ void testListTable() {
String table3 = "list3";
String table4 = "list4";
createDatabaseIfNotExists(database);
dropTableIfExists(String.join(".", database, table3));
dropTableIfExists(String.join(".", database, table4));
createSimpleTable(String.join(".", database, table3));
createSimpleTable(String.join(".", database, table4));
tables = listTableNames(database);
@@ -590,6 +598,23 @@ protected void checkDirExists(Path dir) {
}
}

@Test
void testTableOptions() {
String tableName = "options_table";
dropTableIfExists(tableName);
String createTableSql = getCreateSimpleTableString(tableName);
createTableSql += " OPTIONS('a'='b')";
sql(createTableSql);
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withTableProperties(ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "b"));
checker.check(tableInfo);
checkTableReadWrite(tableInfo);
}

protected void checkTableReadWrite(SparkTableInfo table) {
String name = table.getTableIdentifier();
boolean isPartitionTable = table.isPartitionTable();
@@ -687,4 +712,9 @@ private String getPartitionExpression(SparkTableInfo table, String delimiter) {
.map(column -> column.getName() + "=" + typeConstant.get(column.getType()))
.collect(Collectors.joining(delimiter));
}

protected void checkParquetFile(SparkTableInfo tableInfo) {
String location = tableInfo.getTableLocation();
Assertions.assertDoesNotThrow(() -> getSparkSession().read().parquet(location).printSchema());
}
}
@@ -8,10 +8,13 @@
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo.SparkColumnInfo;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfoChecker;
import com.datastrato.gravitino.spark.connector.hive.HivePropertiesConstants;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.types.DataTypes;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
@@ -150,4 +153,172 @@ public void testInsertHiveFormatPartitionTableAsSelect() {
Assertions.assertTrue(tableData.size() == 1);
Assertions.assertEquals(expectedData, tableData.get(0));
}

@Test
void testHiveDefaultFormat() {
String tableName = "hive_default_format_table";
dropTableIfExists(tableName);
createSimpleTable(tableName);
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withTableProperties(
ImmutableMap.of(
HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT,
HivePropertiesConstants.TEXT_INPUT_FORMAT_CLASS,
HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT,
HivePropertiesConstants.IGNORE_KEY_OUTPUT_FORMAT_CLASS,
HivePropertiesConstants.SPARK_HIVE_SERDE_LIB,
HivePropertiesConstants.LAZY_SIMPLE_SERDE_CLASS));
checker.check(tableInfo);
checkTableReadWrite(tableInfo);
}

@Test
void testHiveFormatWithStoredAs() {
String tableName = "test_hive_format_stored_as_table";
dropTableIfExists(tableName);
String createTableSql = getCreateSimpleTableString(tableName);
createTableSql += "STORED AS PARQUET";
sql(createTableSql);
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withTableProperties(
ImmutableMap.of(
HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT,
HivePropertiesConstants.PARQUET_INPUT_FORMAT_CLASS,
HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT,
HivePropertiesConstants.PARQUET_OUTPUT_FORMAT_CLASS,
HivePropertiesConstants.SPARK_HIVE_SERDE_LIB,
HivePropertiesConstants.PARQUET_SERDE_CLASS));
checker.check(tableInfo);
checkTableReadWrite(tableInfo);
checkParquetFile(tableInfo);
}

@Test
void testHiveFormatWithUsing() {
String tableName = "test_hive_format_using_table";
dropTableIfExists(tableName);
String createTableSql = getCreateSimpleTableString(tableName);
createTableSql += "USING PARQUET";
sql(createTableSql);
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withTableProperties(
ImmutableMap.of(
HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT,
HivePropertiesConstants.PARQUET_INPUT_FORMAT_CLASS,
HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT,
HivePropertiesConstants.PARQUET_OUTPUT_FORMAT_CLASS,
HivePropertiesConstants.SPARK_HIVE_SERDE_LIB,
HivePropertiesConstants.PARQUET_SERDE_CLASS));
checker.check(tableInfo);
checkTableReadWrite(tableInfo);
checkParquetFile(tableInfo);
}

@Test
void testHivePropertiesWithSerdeRowFormat() {
String tableName = "test_hive_row_serde_table";
dropTableIfExists(tableName);
String createTableSql = getCreateSimpleTableString(tableName);
createTableSql =
String.format(
"%s ROW FORMAT SERDE '%s' WITH SERDEPROPERTIES ('serialization.format'='@', 'field.delim' = ',') STORED AS INPUTFORMAT '%s' OUTPUTFORMAT '%s'",
createTableSql,
HivePropertiesConstants.PARQUET_SERDE_CLASS,
HivePropertiesConstants.PARQUET_INPUT_FORMAT_CLASS,
HivePropertiesConstants.PARQUET_OUTPUT_FORMAT_CLASS);
sql(createTableSql);
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withTableProperties(
ImmutableMap.of(
TableCatalog.OPTION_PREFIX + "serialization.format",
"@",
TableCatalog.OPTION_PREFIX + "field.delim",
",",
HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT,
HivePropertiesConstants.PARQUET_INPUT_FORMAT_CLASS,
HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT,
HivePropertiesConstants.PARQUET_OUTPUT_FORMAT_CLASS,
HivePropertiesConstants.SPARK_HIVE_SERDE_LIB,
HivePropertiesConstants.PARQUET_SERDE_CLASS));
checker.check(tableInfo);
checkTableReadWrite(tableInfo);
checkParquetFile(tableInfo);
}

/*
| DELIMITED [ FIELDS TERMINATED BY fields_terminated_char [ ESCAPED BY escaped_char ] ]
[ COLLECTION ITEMS TERMINATED BY collection_items_terminated_char ]
[ MAP KEYS TERMINATED BY map_key_terminated_char ]
[ LINES TERMINATED BY row_terminated_char ]
[ NULL DEFINED AS null_char ]
*/
@Test
void testHivePropertiesWithDelimitedRowFormat() {
String tableName = "test_hive_row_format_table";
dropTableIfExists(tableName);
String createTableSql = getCreateSimpleTableString(tableName);
createTableSql +=
"ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY ';' "
+ "COLLECTION ITEMS TERMINATED BY '@' "
+ "MAP KEYS TERMINATED BY ':' "
+ "NULL DEFINED AS 'n' "
+ "STORED AS TEXTFILE";
sql(createTableSql);
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withTableProperties(
ImmutableMap.of(
TableCatalog.OPTION_PREFIX + "field.delim",
",",
TableCatalog.OPTION_PREFIX + "escape.delim",
";",
TableCatalog.OPTION_PREFIX + "mapkey.delim",
":",
TableCatalog.OPTION_PREFIX + "serialization.format",
",",
TableCatalog.OPTION_PREFIX + "colelction.delim",
"@",
HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT,
HivePropertiesConstants.TEXT_INPUT_FORMAT_CLASS,
HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT,
HivePropertiesConstants.IGNORE_KEY_OUTPUT_FORMAT_CLASS,
HivePropertiesConstants.SPARK_HIVE_SERDE_LIB,
HivePropertiesConstants.LAZY_SIMPLE_SERDE_CLASS));
checker.check(tableInfo);
checkTableReadWrite(tableInfo);

// check it's a text file and that field.delim takes effect
List<Object[]> rows =
rowsToJava(
getSparkSession()
.read()
.option("delimiter", ",")
.csv(tableInfo.getTableLocation())
.collectAsList());
Assertions.assertTrue(rows.size() == 1);
Object[] row = rows.get(0);
Assertions.assertEquals(3, row.length);
Assertions.assertEquals("2", row[0]);
Assertions.assertEquals("gravitino_it_test", (String) row[1]);
Assertions.assertEquals("2", row[2]);
}
}
@@ -17,6 +17,7 @@
import javax.ws.rs.NotSupportedException;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.expressions.BucketTransform;
import org.apache.spark.sql.connector.expressions.IdentityTransform;
import org.apache.spark.sql.connector.expressions.SortedBucketTransform;
@@ -43,6 +44,10 @@ public String getTableName() {
return tableName;
}

public String getTableLocation() {
return tableProperties.get(TableCatalog.PROP_LOCATION);
}

// Include database name and table name
public String getTableIdentifier() {
if (StringUtils.isNotBlank(database)) {
@@ -52,10 +57,6 @@ public String getTableIdentifier() {
}
}

public String getTableLocation() {
return tableProperties.get(ConnectorConstants.LOCATION);
}

public boolean isPartitionTable() {
return partitions.size() > 0;
}