Skip to content

Commit

Permalink
[#2736] feat(spark-connector) support hive external table (#2739)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Support the Hive external table format in the Spark connector, for example:
`CREATE EXTERNAL TABLE family (id INT, name STRING)`

### Why are the changes needed?


Fix: #2736 

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests (UT) and integration tests (IT).

---------

Co-authored-by: Charlie Cheng <[email protected]>
  • Loading branch information
charliecheng630 and Charlie Cheng authored Apr 3, 2024
1 parent f7f12d8 commit c21b022
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
Expand Down Expand Up @@ -650,6 +651,21 @@ protected void checkDirExists(Path dir) {
}
}

/**
 * Asserts that {@code dir} directly contains at least one regular file.
 *
 * <p>Only the entries returned by {@code hdfs.listStatus(dir)} are inspected; sub-directories are
 * not descended into.
 *
 * @param dir directory to inspect
 * @throws RuntimeException wrapping the {@link IOException} raised while listing {@code dir}
 */
protected void checkDataFileExists(Path dir) {
  // A primitive flag is enough here; boxing it buys nothing.
  boolean dataFileFound = false;
  try {
    for (FileStatus fileStatus : hdfs.listStatus(dir)) {
      if (fileStatus.isFile()) {
        dataFileFound = true;
        break;
      }
    }
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
  // Name the directory in the failure so a red test points straight at the missing data.
  Assertions.assertTrue(dataFileFound, "Expected at least one data file under " + dir);
}

@Test
void testTableOptions() {
String tableName = "options_table";
Expand Down Expand Up @@ -726,9 +742,17 @@ protected String getExpectedTableData(SparkTableInfo table) {
}

protected String getCreateSimpleTableString(String tableName) {
return getCreateSimpleTableString(tableName, false);
}

protected String getCreateSimpleTableString(String tableName, boolean isExternal) {
String external = "";
if (isExternal) {
external = "EXTERNAL";
}
return String.format(
"CREATE TABLE %s (id INT COMMENT 'id comment', name STRING COMMENT '', age INT)",
tableName);
"CREATE %s TABLE %s (id INT COMMENT 'id comment', name STRING COMMENT '', age INT)",
external, tableName);
}

protected List<SparkColumnInfo> getSimpleTableColumn() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,27 @@ void testHiveFormatWithStoredAs() {
checkParquetFile(tableInfo);
}

/**
 * Creates a table with the EXTERNAL variant of the simple CREATE statement, checks that it is
 * reported with the Spark {@code external} property, exercises read/write, then drops it and
 * checks that a data file is still present at the table location.
 */
@Test
void testHiveFormatWithExternalTable() {
  final String tableName = "test_hive_format_with_external_table";
  dropTableIfExists(tableName);

  sql(getCreateSimpleTableString(tableName, true));
  SparkTableInfo externalTable = getTableInfo(tableName);

  // The table must surface the Spark-side "external" flag.
  SparkTableInfoChecker.create()
      .withName(tableName)
      .withTableProperties(ImmutableMap.of(HivePropertiesConstants.SPARK_HIVE_EXTERNAL, "true"))
      .check(externalTable);
  checkTableReadWrite(externalTable);

  // After the drop, the location captured in externalTable is inspected for remaining files.
  dropTableIfExists(tableName);
  checkDataFileExists(new Path(externalTable.getTableLocation()));
}

@Test
void testHiveFormatWithUsing() {
String tableName = "test_hive_format_using_table";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package com.datastrato.gravitino.spark.connector.hive;

import static com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata.TableType.EXTERNAL_TABLE;

import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata;
import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata.StorageFormat;
import com.google.common.annotations.VisibleForTesting;
Expand All @@ -27,11 +29,14 @@ public class HivePropertiesConstants {
// Gravitino-side storage format names, taken from the StorageFormat enum's string form.
public static final String GRAVITINO_HIVE_FORMAT_AVRO = StorageFormat.AVRO.toString();
public static final String GRAVITINO_HIVE_FORMAT_JSON = StorageFormat.JSON.toString();
public static final String GRAVITINO_HIVE_FORMAT_CSV = StorageFormat.CSV.toString();
// Name of the EXTERNAL_TABLE constant of HiveTablePropertiesMetadata.TableType.
public static final String GRAVITINO_HIVE_EXTERNAL_TABLE = EXTERNAL_TABLE.name();
// Gravitino table property key; the properties converter stores the table type under it.
public static final String GRAVITINO_HIVE_TABLE_TYPE = "table-type";

// Spark-side property keys.
public static final String SPARK_HIVE_STORED_AS = "hive.stored-as";
public static final String SPARK_HIVE_INPUT_FORMAT = "input-format";
public static final String SPARK_HIVE_OUTPUT_FORMAT = "output-format";
public static final String SPARK_HIVE_SERDE_LIB = "serde-lib";
// The properties converter sets this key to "true" when the Gravitino table type is external.
public static final String SPARK_HIVE_EXTERNAL = "external";

@VisibleForTesting
public static final String TEXT_INPUT_FORMAT_CLASS =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package com.datastrato.gravitino.spark.connector.hive;

import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata;
import com.datastrato.gravitino.spark.connector.PropertiesConverter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -60,6 +61,10 @@ public Map<String, String> toGravitinoTableProperties(Map<String, String> proper
String provider = gravitinoTableProperties.get(TableCatalog.PROP_PROVIDER);
String storeAs = gravitinoTableProperties.get(HivePropertiesConstants.SPARK_HIVE_STORED_AS);
String fileFormat = Optional.ofNullable(storeAs).orElse(provider);
String isExternal =
Optional.ofNullable(gravitinoTableProperties.get(TableCatalog.PROP_EXTERNAL))
.orElse("false");

if (fileFormat != null) {
String gravitinoFormat = fileFormatMap.get(fileFormat.toLowerCase(Locale.ROOT));
if (gravitinoFormat != null) {
Expand All @@ -70,6 +75,11 @@ public Map<String, String> toGravitinoTableProperties(Map<String, String> proper
}
}

if (isExternal.equalsIgnoreCase("true")) {
gravitinoTableProperties.put(
HivePropertiesConstants.GRAVITINO_HIVE_TABLE_TYPE,
HiveTablePropertiesMetadata.TableType.EXTERNAL_TABLE.name());
}
sparkToGravitinoPropertyMap.forEach(
(sparkProperty, gravitinoProperty) -> {
if (gravitinoTableProperties.containsKey(sparkProperty)) {
Expand All @@ -83,7 +93,15 @@ public Map<String, String> toGravitinoTableProperties(Map<String, String> proper

/**
 * Converts Gravitino table properties into the properties exposed to Spark.
 *
 * <p>Starts from the result of {@code toOptionProperties(properties)}. If that result carries
 * the Gravitino {@code table-type} property with the external-table value (compared
 * case-insensitively), the entry is replaced by the Spark {@code external=true} property. Any
 * other table type is left untouched.
 *
 * @param properties Gravitino table properties
 * @return the Spark table properties
 */
@Override
public Map<String, String> toSparkTableProperties(Map<String, String> properties) {
  // NOTE(review): relies on toOptionProperties returning a mutable map — confirm.
  Map<String, String> sparkTableProperties = toOptionProperties(properties);
  String hiveTableType =
      sparkTableProperties.get(HivePropertiesConstants.GRAVITINO_HIVE_TABLE_TYPE);
  // Calling equalsIgnoreCase on the constant keeps this null-safe when the property is absent.
  if (HivePropertiesConstants.GRAVITINO_HIVE_EXTERNAL_TABLE.equalsIgnoreCase(hiveTableType)) {
    sparkTableProperties.remove(HivePropertiesConstants.GRAVITINO_HIVE_TABLE_TYPE);
    sparkTableProperties.put(HivePropertiesConstants.SPARK_HIVE_EXTERNAL, "true");
  }

  return sparkTableProperties;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,24 @@ void testTableFormat() {
ImmutableMap.of(TableCatalog.OPTION_PREFIX + "a", "a", "b", "b"), hiveProperties);
}

/**
 * Checks both directions of the external-table mapping: Spark {@code external=true} becomes the
 * Gravitino {@code table-type} property with the external-table value, and that Gravitino
 * property is turned back into exactly Spark {@code external=true}.
 */
@Test
void testExternalTable() {
  Map<String, String> hiveProperties =
      hivePropertiesConverter.toGravitinoTableProperties(
          ImmutableMap.of(HivePropertiesConstants.SPARK_HIVE_EXTERNAL, "true"));
  // assertEquals takes (expected, actual); keeping that order makes failure output read right.
  Assertions.assertEquals(
      HivePropertiesConstants.GRAVITINO_HIVE_EXTERNAL_TABLE,
      hiveProperties.get(HivePropertiesConstants.GRAVITINO_HIVE_TABLE_TYPE));

  hiveProperties =
      hivePropertiesConverter.toSparkTableProperties(
          ImmutableMap.of(
              HivePropertiesConstants.GRAVITINO_HIVE_TABLE_TYPE,
              HivePropertiesConstants.GRAVITINO_HIVE_EXTERNAL_TABLE));
  Assertions.assertEquals(
      ImmutableMap.of(HivePropertiesConstants.SPARK_HIVE_EXTERNAL, "true"), hiveProperties);
}

@Test
void testOptionProperties() {
Map<String, String> properties =
Expand Down

0 comments on commit c21b022

Please sign in to comment.