filter table
mchades committed Oct 10, 2024
1 parent 46cb034 commit 84f176d
Showing 4 changed files with 106 additions and 18 deletions.
18 changes: 18 additions & 0 deletions catalogs/catalog-lakehouse-hudi/build.gradle.kts
@@ -24,6 +24,10 @@ plugins {
id("idea")
}

val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString()
val fullSparkVersion: String = libs.versions.spark34.get()
val sparkVersion = fullSparkVersion.split(".").take(2).joinToString(".")

dependencies {
implementation(project(":api")) {
exclude(group = "*")
@@ -117,6 +121,20 @@ dependencies {
testImplementation(libs.htrace.core4)
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.woodstox.core)
testImplementation("org.apache.spark:spark-hive_$scalaVersion:$fullSparkVersion") {
exclude("org.apache.hadoop")
exclude("io.dropwizard.metrics")
exclude("com.fasterxml.jackson.core")
exclude("com.fasterxml.jackson.module", "jackson-module-scala_2.12")
}
testImplementation("org.apache.spark:spark-sql_$scalaVersion:$fullSparkVersion") {
exclude("org.apache.avro")
exclude("org.apache.hadoop")
exclude("org.apache.zookeeper")
exclude("io.dropwizard.metrics")
exclude("org.rocksdb")
}

testRuntimeOnly("org.apache.hudi:hudi-spark$sparkVersion-bundle_$scalaVersion:0.15.0")
testRuntimeOnly(libs.junit.jupiter.engine)
}
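The Spark test dependencies and the Hudi Spark bundle above give the test suite a local Spark runtime that can create a real Hudi table against the mini Hive metastore. As a minimal sketch, assuming libs.versions.spark34 resolves to something like 3.4.3 and the default Scala version is 2.12, the version wiring resolves to the following bundle coordinate:

import java.util.Arrays;

public class HudiBundleCoordinateSketch {
  public static void main(String[] args) {
    String fullSparkVersion = "3.4.3"; // assumed value of libs.versions.spark34
    String scalaVersion = "2.12";      // assumed defaultScalaVersion
    // Mirrors the build script: keep only the major.minor part of the Spark version.
    String sparkVersion = String.join(".", Arrays.copyOf(fullSparkVersion.split("\\."), 2));
    String bundle =
        "org.apache.hudi:hudi-spark" + sparkVersion + "-bundle_" + scalaVersion + ":0.15.0";
    System.out.println(bundle); // org.apache.hudi:hudi-spark3.4-bundle_2.12:0.15.0
  }
}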
HudiHMSBackendOps.java
@@ -58,6 +58,9 @@ public class HudiHMSBackendOps implements HudiCatalogBackendOps {
// Mapping from Gravitino config to Hive config
private static final Map<String, String> CONFIG_CONVERTER =
ImmutableMap.of(URI, HiveConf.ConfVars.METASTOREURIS.varname);

private static final String HUDI_PACKAGE_PREFIX = "org.apache.hudi";

@VisibleForTesting CachedClientPool clientPool;

@Override
@@ -136,10 +139,7 @@ public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException {
c -> {
List<String> allTables = c.getAllTables(schemaIdent.name());
return c.getTableObjectsByName(schemaIdent.name(), allTables).stream()
.filter(
t ->
t.getSd().getInputFormat() != null
&& t.getSd().getInputFormat().startsWith("org.apache.hudi"))
.filter(this::checkHudiTable)
.map(t -> NameIdentifier.of(namespace, t.getTableName()))
.toArray(NameIdentifier[]::new);
});
@@ -164,6 +164,10 @@ public HudiTable loadTable(NameIdentifier tableIdent) throws NoSuchTableException {
try {
Table table =
clientPool.run(client -> client.getTable(schemaIdent.name(), tableIdent.name()));
if (!checkHudiTable(table)) {
throw new NoSuchTableException(
"Table %s is not a Hudi table in Hive Metastore", tableIdent.name());
}
return HudiHMSTable.builder().withBackendTable(table).build();

} catch (NoSuchObjectException e) {
@@ -212,6 +216,15 @@ public void close() {
}
}

private boolean checkHudiTable(Table table) {
// Use the input format to filter out non-Hudi tables: a COW table
// uses `org.apache.hudi.hadoop.HoodieParquetInputFormat` and a MOR table
// uses `org.apache.hudi.hadoop.HoodieParquetRealtimeInputFormat`; to
// simplify the logic, we just check the package prefix of the input format
return table.getSd().getInputFormat() != null
&& table.getSd().getInputFormat().startsWith(HUDI_PACKAGE_PREFIX);
}

private HiveConf buildHiveConf(Map<String, String> properties) {
Configuration hadoopConf = new Configuration();

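The prefix check above can be exercised on its own. A minimal, self-contained sketch (not part of the commit) of how the two Hudi input formats (COW and MOR) and a plain Hive Parquet table are classified:

import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;

public class HudiInputFormatCheckSketch {
  private static final String HUDI_PACKAGE_PREFIX = "org.apache.hudi";

  // Same rule as checkHudiTable: a table is considered a Hudi table when its
  // input format class lives in the org.apache.hudi package.
  static boolean isHudiTable(Table table) {
    return table.getSd().getInputFormat() != null
        && table.getSd().getInputFormat().startsWith(HUDI_PACKAGE_PREFIX);
  }

  static Table tableWithInputFormat(String inputFormat) {
    StorageDescriptor sd = new StorageDescriptor();
    sd.setInputFormat(inputFormat);
    Table table = new Table();
    table.setSd(sd);
    return table;
  }

  public static void main(String[] args) {
    // COW table -> true
    System.out.println(
        isHudiTable(tableWithInputFormat("org.apache.hudi.hadoop.HoodieParquetInputFormat")));
    // MOR table -> true
    System.out.println(
        isHudiTable(
            tableWithInputFormat("org.apache.hudi.hadoop.HoodieParquetRealtimeInputFormat")));
    // Plain Hive Parquet table -> false
    System.out.println(
        isHudiTable(
            tableWithInputFormat(
                "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat")));
  }
}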
HudiHMSTable.java
@@ -65,12 +65,12 @@ protected HudiHMSTable buildFromTable(Table hmsTable) {
columns = HiveTableConverter.getColumns(hmsTable, HudiColumn.builder());
partitioning = HiveTableConverter.getPartitioning(hmsTable);

// should be always SortOrders.NONE since Hudi using clustering to sort data (see
// Should always be SortOrders.NONE since Hudi uses clustering to sort data (see
// https://hudi.apache.org/docs/next/clustering/)
// but clustering runs as a background table service
sortOrders = HiveTableConverter.getSortOrders(hmsTable);

// should be always Distributions.NONE since Hudi doesn't support distribution
// Should always be Distributions.NONE since Hudi doesn't support distribution
distribution = HiveTableConverter.getDistribution(hmsTable);
auditInfo = HiveTableConverter.getAuditInfo(hmsTable);

TestHudiHMSBackendOps.java
@@ -28,8 +28,10 @@
import java.util.Map;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.catalog.lakehouse.hudi.HudiColumn;
import org.apache.gravitino.catalog.lakehouse.hudi.HudiSchema;
import org.apache.gravitino.catalog.lakehouse.hudi.HudiTable;
import org.apache.gravitino.exceptions.NoSuchTableException;
import org.apache.gravitino.hive.hms.MiniHiveMetastoreService;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.types.Types;
@@ -38,6 +40,7 @@
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.spark.sql.SparkSession;
import org.apache.thrift.TException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
@@ -49,22 +52,44 @@ public class TestHudiHMSBackendOps extends MiniHiveMetastoreService {
private static final HudiHMSBackendOps ops = new HudiHMSBackendOps();
private static final String METALAKE_NAME = "metalake";
private static final String CATALOG_NAME = "catalog";
private static final String TABLE_NAME = "hudi_table";
private static final String HIVE_TABLE_NAME = "hive_table";
private static final String HUDI_TABLE_NAME = "hudi_table";

@BeforeAll
public static void prepare() throws TException {
Map<String, String> props = Maps.newHashMap();
props.put(URI, hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
ops.initialize(props);

// create a hive table
Table table = new Table();
table.setDbName(DB_NAME);
table.setTableName(TABLE_NAME);
table.setTableName(HIVE_TABLE_NAME);
StorageDescriptor strgDesc = new StorageDescriptor();
strgDesc.setCols(Lists.newArrayList(new FieldSchema("col1", "string", "description")));
strgDesc.setSerdeInfo(new SerDeInfo());
table.setSd(strgDesc);
metastoreClient.createTable(table);

// use Spark to create a hudi table
SparkSession sparkSession =
SparkSession.builder()
.master("local[1]")
.appName("Hudi Catalog integration test")
.config("hive.metastore.uris", hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname))
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
.config(
"spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.hudi.catalog.HoodieCatalog")
.config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
.config("dfs.replication", "1")
.enableHiveSupport()
.getOrCreate();

// create a hudi table
sparkSession.sql(
String.format("CREATE TABLE %s.%s (ts BIGINT) USING HUDI", DB_NAME, HUDI_TABLE_NAME));
}

@AfterAll
@@ -104,24 +129,56 @@ public void testListTables() {
NameIdentifier[] tables = ops.listTables(namespace);

// the plain Hive table is filtered out; only the Hudi table is listed
Assertions.assertEquals(0, tables.length);
Assertions.assertEquals(1, tables.length);
Assertions.assertEquals(HUDI_TABLE_NAME, tables[0].name());
}

@Test
public void testLoadTable() {
Namespace namespace = Namespace.of(METALAKE_NAME, CATALOG_NAME, DB_NAME);
HudiTable hudiTable = ops.loadTable(NameIdentifier.of(namespace, TABLE_NAME));

Assertions.assertEquals(TABLE_NAME, hudiTable.name());
Exception exception =
Assertions.assertThrows(
NoSuchTableException.class,
() -> ops.loadTable(NameIdentifier.of(namespace, HIVE_TABLE_NAME)));
Assertions.assertEquals(
"Table hive_table is not a Hudi table in Hive Metastore", exception.getMessage());

HudiTable hudiTable = ops.loadTable(NameIdentifier.of(namespace, HUDI_TABLE_NAME));
Assertions.assertEquals(HUDI_TABLE_NAME, hudiTable.name());
Assertions.assertNull(hudiTable.comment());
Assertions.assertNotNull(hudiTable.properties().get(LOCATION));

Column[] columns = hudiTable.columns();
Assertions.assertEquals(1, columns.length);

Column column = columns[0];
Assertions.assertEquals("col1", column.name());
Assertions.assertEquals(Types.StringType.get(), column.dataType());
Assertions.assertEquals("description", column.comment());
Assertions.assertEquals(6, columns.length);

Assertions.assertEquals(
HudiColumn.builder()
.withName("_hoodie_commit_time")
.withType(Types.StringType.get())
.build(),
columns[0]);
Assertions.assertEquals(
HudiColumn.builder()
.withName("_hoodie_commit_seqno")
.withType(Types.StringType.get())
.build(),
columns[1]);
Assertions.assertEquals(
HudiColumn.builder()
.withName("_hoodie_record_key")
.withType(Types.StringType.get())
.build(),
columns[2]);
Assertions.assertEquals(
HudiColumn.builder()
.withName("_hoodie_partition_path")
.withType(Types.StringType.get())
.build(),
columns[3]);
Assertions.assertEquals(
HudiColumn.builder().withName("_hoodie_file_name").withType(Types.StringType.get()).build(),
columns[4]);
Assertions.assertEquals(
HudiColumn.builder().withName("ts").withType(Types.LongType.get()).build(), columns[5]);
}
}
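The six expected columns come from Hudi itself: every Hudi table carries five `_hoodie_*` metadata columns ahead of the user-declared schema, so a table created with only `ts BIGINT` surfaces six columns. A small sketch of a follow-up check, assuming a SparkSession configured like the one in prepare():

import org.apache.spark.sql.SparkSession;

public class HudiMetadataColumnsSketch {
  // Prints the columns Spark reports for the Hudi table created in prepare().
  // Expected, matching the assertions above: _hoodie_commit_time, _hoodie_commit_seqno,
  // _hoodie_record_key, _hoodie_partition_path, _hoodie_file_name, ts
  static void printColumns(SparkSession spark, String dbName, String tableName) {
    String[] columns = spark.table(dbName + "." + tableName).columns();
    System.out.println(String.join(", ", columns));
  }
}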
