[#2620] feat(spark-connector): support hive table format properties #2605

Merged 11 commits on Mar 29, 2024
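For orientation, the sketch below is not part of this PR; it shows the kind of DDL the new tests exercise, assuming a SparkSession already configured with the Gravitino Spark connector and a Hive catalog named "hive" with a database "db" (hypothetical names). The connector is expected to translate the STORED AS clause into the Hive input/output-format and SerDe table properties asserted in the tests.

import org.apache.spark.sql.SparkSession;

public class HiveFormatExample {
  public static void main(String[] args) {
    // Assumes the Gravitino Spark connector is on the classpath and the "hive" catalog is registered.
    SparkSession spark = SparkSession.builder().appName("hive-format-example").getOrCreate();
    // STORED AS PARQUET should surface as the Parquet input/output format classes and
    // ParquetHiveSerDe in the table properties (see testHiveFormatWithStoredAs below).
    spark.sql("CREATE TABLE hive.db.parquet_demo (id INT, name STRING) STORED AS PARQUET");
    spark.sql("DESC EXTENDED hive.db.parquet_demo").show(false);
  }
}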
@@ -57,11 +57,11 @@ public class HiveTablePropertiesMetadata extends BasePropertiesMetadata {
@VisibleForTesting
public static final String ORC_SERDE_CLASS = "org.apache.hadoop.hive.ql.io.orc.OrcSerde";

private static final String PARQUET_INPUT_FORMAT_CLASS =
public static final String PARQUET_INPUT_FORMAT_CLASS =
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
private static final String PARQUET_OUTPUT_FORMAT_CLASS =
public static final String PARQUET_OUTPUT_FORMAT_CLASS =
"org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
private static final String PARQUET_SERDE_CLASS =
public static final String PARQUET_SERDE_CLASS =
"org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe";
private static final String COLUMNAR_SERDE_CLASS =
"org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe";
@@ -89,7 +89,10 @@ public enum TableType {
VIRTUAL_INDEX,
}

enum StorageFormat {
// In embedded test mode, HiveTablePropertiesMetadata is loaded by the Spark connector, which
// uses a different classloader from the Hive catalog. If StorageFormat were package-scoped,
// the Hive catalog classes in the same package couldn't access it, so it is made public.
public enum StorageFormat {
SEQUENCEFILE(
SEQUENCEFILE_INPUT_FORMAT_CLASS, SEQUENCEFILE_OUTPUT_FORMAT_CLASS, LAZY_SIMPLE_SERDE_CLASS),
TEXTFILE(TEXT_INPUT_FORMAT_CLASS, IGNORE_KEY_OUTPUT_FORMAT_CLASS, LAZY_SIMPLE_SERDE_CLASS),
@@ -6,6 +6,7 @@

import com.datastrato.gravitino.StringIdentifier;
import com.datastrato.gravitino.connector.BasePropertiesMetadata;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Map;
@@ -14,13 +15,15 @@ public abstract class JdbcTablePropertiesMetadata extends BasePropertiesMetadata

public static final String COMMENT_KEY = "comment";

protected Map<String, String> transformToJdbcProperties(Map<String, String> properties) {
@VisibleForTesting
public Map<String, String> transformToJdbcProperties(Map<String, String> properties) {
HashMap<String, String> resultProperties = Maps.newHashMap(properties);
resultProperties.remove(StringIdentifier.ID_KEY);
return resultProperties;
}

protected Map<String, String> convertFromJdbcProperties(Map<String, String> properties) {
@VisibleForTesting
public Map<String, String> convertFromJdbcProperties(Map<String, String> properties) {
return properties;
}
}
@@ -101,7 +101,7 @@ public Map<String, String> transformToJdbcProperties(Map<String, String> propert
}

@Override
protected Map<String, String> convertFromJdbcProperties(Map<String, String> properties) {
public Map<String, String> convertFromJdbcProperties(Map<String, String> properties) {
BidiMap<String, String> mysqlConfigToGravitino = GRAVITINO_CONFIG_TO_MYSQL.inverseBidiMap();
return Collections.unmodifiableMap(
new HashMap<String, String>() {
@@ -19,6 +19,7 @@
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.junit.jupiter.api.Assertions;
@@ -116,6 +117,7 @@ void testCreateAndLoadSchema() {
@Test
void testAlterSchema() {
String testDatabaseName = "t_alter";
dropDatabaseIfExists(testDatabaseName);
sql("CREATE DATABASE " + testDatabaseName);
Assertions.assertTrue(
StringUtils.isBlank(getDatabaseMetadata(testDatabaseName).get("Properties")));
@@ -174,6 +176,7 @@ void testCreateTableWithDatabase() {
createDatabaseIfNotExists(databaseName);
String tableIdentifier = String.join(".", databaseName, tableName);

dropTableIfExists(tableIdentifier);
createSimpleTable(tableIdentifier);
SparkTableInfo tableInfo = getTableInfo(tableIdentifier);
SparkTableInfoChecker checker =
@@ -187,6 +190,7 @@ void testCreateTableWithDatabase() {
createDatabaseIfNotExists(databaseName);

sql("USE " + databaseName);
dropTableIfExists(tableName);
createSimpleTable(tableName);
tableInfo = getTableInfo(tableName);
checker =
@@ -257,6 +261,8 @@ void testRenameTable() {
void testListTable() {
String table1 = "list1";
String table2 = "list2";
dropTableIfExists(table1);
dropTableIfExists(table2);
createSimpleTable(table1);
createSimpleTable(table2);
Set<String> tables = listTableNames();
@@ -268,6 +274,8 @@ void testListTable() {
String table3 = "list3";
String table4 = "list4";
createDatabaseIfNotExists(database);
dropTableIfExists(String.join(".", database, table3));
dropTableIfExists(String.join(".", database, table4));
createSimpleTable(String.join(".", database, table3));
createSimpleTable(String.join(".", database, table4));
tables = listTableNames(database);
@@ -590,6 +598,23 @@ protected void checkDirExists(Path dir) {
}
}

@Test
void testTableOptions() {
String tableName = "options_table";
dropTableIfExists(tableName);
String createTableSql = getCreateSimpleTableString(tableName);
createTableSql += " OPTIONS('a'='b')";
sql(createTableSql);
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withTableProperties(ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "b"));
checker.check(tableInfo);
checkTableReadWrite(tableInfo);
}

protected void checkTableReadWrite(SparkTableInfo table) {
String name = table.getTableIdentifier();
boolean isPartitionTable = table.isPartitionTable();
@@ -687,4 +712,9 @@ private String getPartitionExpression(SparkTableInfo table, String delimiter) {
.map(column -> column.getName() + "=" + typeConstant.get(column.getType()))
.collect(Collectors.joining(delimiter));
}

protected void checkParquetFile(SparkTableInfo tableInfo) {
String location = tableInfo.getTableLocation();
Assertions.assertDoesNotThrow(() -> getSparkSession().read().parquet(location).printSchema());
}
}
@@ -8,10 +8,13 @@
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfo.SparkColumnInfo;
import com.datastrato.gravitino.integration.test.util.spark.SparkTableInfoChecker;
import com.datastrato.gravitino.spark.connector.hive.HivePropertiesConstants;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.types.DataTypes;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
@@ -150,4 +153,172 @@ public void testInsertHiveFormatPartitionTableAsSelect() {
Assertions.assertTrue(tableData.size() == 1);
Assertions.assertEquals(expectedData, tableData.get(0));
}

@Test
void testHiveDefaultFormat() {
String tableName = "hive_default_format_table";
dropTableIfExists(tableName);
createSimpleTable(tableName);
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withTableProperties(
ImmutableMap.of(
HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT,
HivePropertiesConstants.TEXT_INPUT_FORMAT_CLASS,
HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT,
HivePropertiesConstants.IGNORE_KEY_OUTPUT_FORMAT_CLASS,
HivePropertiesConstants.SPARK_HIVE_SERDE_LIB,
HivePropertiesConstants.LAZY_SIMPLE_SERDE_CLASS));
checker.check(tableInfo);
checkTableReadWrite(tableInfo);
}

@Test
void testHiveFormatWithStoredAs() {
String tableName = "test_hive_format_stored_as_table";
dropTableIfExists(tableName);
String createTableSql = getCreateSimpleTableString(tableName);
createTableSql += "STORED AS PARQUET";
sql(createTableSql);
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withTableProperties(
ImmutableMap.of(
HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT,
HivePropertiesConstants.PARQUET_INPUT_FORMAT_CLASS,
HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT,
HivePropertiesConstants.PARQUET_OUTPUT_FORMAT_CLASS,
HivePropertiesConstants.SPARK_HIVE_SERDE_LIB,
HivePropertiesConstants.PARQUET_SERDE_CLASS));
checker.check(tableInfo);
checkTableReadWrite(tableInfo);
checkParquetFile(tableInfo);
}

@Test
void testHiveFormatWithUsing() {
String tableName = "test_hive_format_using_table";
dropTableIfExists(tableName);
String createTableSql = getCreateSimpleTableString(tableName);
createTableSql += "USING PARQUET";
sql(createTableSql);
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withTableProperties(
ImmutableMap.of(
HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT,
HivePropertiesConstants.PARQUET_INPUT_FORMAT_CLASS,
HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT,
HivePropertiesConstants.PARQUET_OUTPUT_FORMAT_CLASS,
HivePropertiesConstants.SPARK_HIVE_SERDE_LIB,
HivePropertiesConstants.PARQUET_SERDE_CLASS));
checker.check(tableInfo);
checkTableReadWrite(tableInfo);
checkParquetFile(tableInfo);
}

@Test
void testHivePropertiesWithSerdeRowFormat() {
String tableName = "test_hive_row_serde_table";
dropTableIfExists(tableName);
String createTableSql = getCreateSimpleTableString(tableName);
createTableSql =
String.format(
"%s ROW FORMAT SERDE '%s' WITH SERDEPROPERTIES ('serialization.format'='@', 'field.delim' = ',') STORED AS INPUTFORMAT '%s' OUTPUTFORMAT '%s'",
createTableSql,
HivePropertiesConstants.PARQUET_SERDE_CLASS,
HivePropertiesConstants.PARQUET_INPUT_FORMAT_CLASS,
HivePropertiesConstants.PARQUET_OUTPUT_FORMAT_CLASS);
sql(createTableSql);
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withTableProperties(
ImmutableMap.of(
TableCatalog.OPTION_PREFIX + "serialization.format",
"@",
TableCatalog.OPTION_PREFIX + "field.delim",
",",
HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT,
HivePropertiesConstants.PARQUET_INPUT_FORMAT_CLASS,
HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT,
HivePropertiesConstants.PARQUET_OUTPUT_FORMAT_CLASS,
HivePropertiesConstants.SPARK_HIVE_SERDE_LIB,
HivePropertiesConstants.PARQUET_SERDE_CLASS));
checker.check(tableInfo);
checkTableReadWrite(tableInfo);
checkParquetFile(tableInfo);
}

/*
| DELIMITED [ FIELDS TERMINATED BY fields_terminated_char [ ESCAPED BY escaped_char ] ]
[ COLLECTION ITEMS TERMINATED BY collection_items_terminated_char ]
[ MAP KEYS TERMINATED BY map_key_terminated_char ]
[ LINES TERMINATED BY row_terminated_char ]
[ NULL DEFINED AS null_char ]
*/
@Test
void testHivePropertiesWithDelimitedRowFormat() {
String tableName = "test_hive_row_format_table";
dropTableIfExists(tableName);
String createTableSql = getCreateSimpleTableString(tableName);
createTableSql +=
"ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY ';' "
+ "COLLECTION ITEMS TERMINATED BY '@' "
+ "MAP KEYS TERMINATED BY ':' "
+ "NULL DEFINED AS 'n' "
+ "STORED AS TEXTFILE";
sql(createTableSql);
SparkTableInfo tableInfo = getTableInfo(tableName);

SparkTableInfoChecker checker =
SparkTableInfoChecker.create()
.withName(tableName)
.withTableProperties(
ImmutableMap.of(
TableCatalog.OPTION_PREFIX + "field.delim",
",",
TableCatalog.OPTION_PREFIX + "escape.delim",
";",
TableCatalog.OPTION_PREFIX + "mapkey.delim",
":",
TableCatalog.OPTION_PREFIX + "serialization.format",
",",
TableCatalog.OPTION_PREFIX + "colelction.delim",
"@",
HivePropertiesConstants.SPARK_HIVE_INPUT_FORMAT,
HivePropertiesConstants.TEXT_INPUT_FORMAT_CLASS,
HivePropertiesConstants.SPARK_HIVE_OUTPUT_FORMAT,
HivePropertiesConstants.IGNORE_KEY_OUTPUT_FORMAT_CLASS,
HivePropertiesConstants.SPARK_HIVE_SERDE_LIB,
HivePropertiesConstants.LAZY_SIMPLE_SERDE_CLASS));
checker.check(tableInfo);
checkTableReadWrite(tableInfo);

// check that it's a text file and that field.delim takes effect
List<Object[]> rows =
rowsToJava(
getSparkSession()
.read()
.option("delimiter", ",")
.csv(tableInfo.getTableLocation())
.collectAsList());
Assertions.assertTrue(rows.size() == 1);
Object[] row = rows.get(0);
Assertions.assertEquals(3, row.length);
Assertions.assertEquals("2", row[0]);
Assertions.assertEquals("gravitino_it_test", (String) row[1]);
Assertions.assertEquals("2", row[2]);
}
}
@@ -17,6 +17,7 @@
import javax.ws.rs.NotSupportedException;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.expressions.BucketTransform;
import org.apache.spark.sql.connector.expressions.IdentityTransform;
import org.apache.spark.sql.connector.expressions.SortedBucketTransform;
@@ -43,6 +44,10 @@ public String getTableName() {
return tableName;
}

public String getTableLocation() {
return tableProperties.get(TableCatalog.PROP_LOCATION);
}

// Include database name and table name
public String getTableIdentifier() {
if (StringUtils.isNotBlank(database)) {
@@ -52,10 +57,6 @@ public String getTableIdentifier() {
}
}

public String getTableLocation() {
return tableProperties.get(ConnectorConstants.LOCATION);
}

public boolean isPartitionTable() {
return partitions.size() > 0;
}