From 15bac62e5383acc9322f2c671331cf4cb01ab1c4 Mon Sep 17 00:00:00 2001 From: mchades Date: Fri, 13 Sep 2024 23:07:23 +0800 Subject: [PATCH 1/3] implementation of HMSBackend for Hudi catalog --- .../catalog-lakehouse-hudi/build.gradle.kts | 81 +++++++++ .../lakehouse/hudi/HudiCatalogOperations.java | 5 +- .../hudi/HudiCatalogPropertiesMetadata.java | 33 +++- .../catalog/lakehouse/hudi/HudiColumn.java | 61 +++++++ .../catalog/lakehouse/hudi/HudiSchema.java | 5 + .../hudi/HudiSchemaPropertiesMetadata.java | 17 +- .../catalog/lakehouse/hudi/HudiTable.java | 5 + .../hudi/HudiTablePropertiesMetadata.java | 23 ++- .../hudi/backend/HudiCatalogBackend.java | 10 +- .../hudi/backend/hms/HudiHMSBackend.java | 3 +- .../hudi/backend/hms/HudiHMSBackendOps.java | 130 ++++++++++++++- .../hudi/backend/hms/HudiHMSSchema.java | 20 ++- .../hudi/backend/hms/HudiHMSTable.java | 18 +- .../lakehouse/hudi/utils/CatalogUtils.java | 18 +- .../lakehouse/hudi/TestHudiCatalog.java | 62 +++++++ .../hudi/TestHudiCatalogOperations.java | 127 ++++++++++++++ .../lakehouse/hudi/TestHudiSchema.java | 49 ++++++ .../catalog/lakehouse/hudi/TestHudiTable.java | 47 ++++++ .../hudi/backend/hms/TestHudiHMSBackend.java | 37 +++++ .../backend/hms/TestHudiHMSBackendOps.java | 126 ++++++++++++++ .../hudi/ops/InMemoryBackendOps.java | 157 ++++++++++++++++++ .../hudi/utils/TestCatalogUtils.java | 38 +++++ 22 files changed, 1040 insertions(+), 32 deletions(-) create mode 100644 catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiColumn.java create mode 100644 catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiCatalog.java create mode 100644 catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiCatalogOperations.java create mode 100644 catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiSchema.java create mode 100644 catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiTable.java create mode 100644 catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/TestHudiHMSBackend.java create mode 100644 catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/TestHudiHMSBackendOps.java create mode 100644 catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/ops/InMemoryBackendOps.java create mode 100644 catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/utils/TestCatalogUtils.java diff --git a/catalogs/catalog-lakehouse-hudi/build.gradle.kts b/catalogs/catalog-lakehouse-hudi/build.gradle.kts index 78ff2f8b12b..c38838c7e85 100644 --- a/catalogs/catalog-lakehouse-hudi/build.gradle.kts +++ b/catalogs/catalog-lakehouse-hudi/build.gradle.kts @@ -28,14 +28,95 @@ dependencies { implementation(project(":api")) { exclude(group = "*") } + implementation(project(":common")) { + exclude(group = "*") + } + implementation(project(":catalogs:hive-metastore-common")) implementation(project(":core")) { exclude(group = "*") } implementation(libs.guava) + implementation(libs.hive2.exec) { + artifact { + classifier = "core" + } + exclude("com.google.code.findbugs", "jsr305") + exclude("com.google.protobuf") + exclude("org.apache.avro") + exclude("org.apache.ant") + exclude("org.apache.calcite") + exclude("org.apache.calcite.avatica") + exclude("org.apache.curator") + exclude("org.apache.derby") + exclude("org.apache.hadoop", "hadoop-yarn-server-resourcemanager") + exclude("org.apache.hive", "hive-llap-tez") + exclude("org.apache.hive", "hive-vector-code-gen") + exclude("org.apache.ivy") + exclude("org.apache.logging.log4j") + exclude("org.apache.zookeeper") + exclude("org.codehaus.groovy", "groovy-all") + exclude("org.datanucleus", "datanucleus-core") + exclude("org.eclipse.jetty.aggregate", "jetty-all") + exclude("org.eclipse.jetty.orbit", "javax.servlet") + exclude("org.openjdk.jol") + exclude("org.pentaho") + exclude("org.slf4j") + } implementation(libs.hive2.metastore) { + exclude("ant") + exclude("co.cask.tephra") + exclude("com.github.joshelser") + exclude("com.google.code.findbugs", "jsr305") + exclude("com.google.code.findbugs", "sr305") + exclude("com.tdunning", "json") + exclude("com.zaxxer", "HikariCP") + exclude("io.dropwizard.metricss") + exclude("javax.transaction", "transaction-api") + exclude("org.apache.ant") + exclude("org.apache.avro") + exclude("org.apache.curator") + exclude("org.apache.derby") + exclude("org.apache.hadoop", "hadoop-yarn-server-resourcemanager") + exclude("org.apache.hbase") + exclude("org.apache.logging.log4j") + exclude("org.apache.parquet", "parquet-hadoop-bundle") + exclude("org.apache.zookeeper") + exclude("org.datanucleus") + exclude("org.eclipse.jetty.aggregate", "jetty-all") + exclude("org.eclipse.jetty.orbit", "javax.servlet") + exclude("org.openjdk.jol") + exclude("org.slf4j") + } + implementation(libs.hadoop2.common) { exclude("*") } implementation(libs.slf4j.api) implementation(libs.thrift) + + compileOnly(libs.lombok) + + annotationProcessor(libs.lombok) + + testImplementation(project(":catalogs:hive-metastore-common", "testArtifacts")) + + testImplementation(libs.bundles.log4j) + testImplementation(libs.commons.collections3) + testImplementation(libs.commons.configuration1) + testImplementation(libs.datanucleus.core) + testImplementation(libs.datanucleus.api.jdo) + testImplementation(libs.datanucleus.rdbms) + testImplementation(libs.datanucleus.jdo) + testImplementation(libs.derby) + testImplementation(libs.hadoop2.auth) { + exclude("*") + } + testImplementation(libs.hadoop2.mapreduce.client.core) { + exclude("*") + } + testImplementation(libs.htrace.core4) + testImplementation(libs.junit.jupiter.api) + testImplementation(libs.woodstox.core) + + testRuntimeOnly(libs.junit.jupiter.engine) } diff --git a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogOperations.java b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogOperations.java index d242cfeef54..c2b68d11d7e 100644 --- a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogOperations.java +++ b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogOperations.java @@ -18,6 +18,7 @@ */ package org.apache.gravitino.catalog.lakehouse.hudi; +import com.google.common.annotations.VisibleForTesting; import java.util.Map; import org.apache.gravitino.Catalog; import org.apache.gravitino.NameIdentifier; @@ -54,7 +55,7 @@ public class HudiCatalogOperations implements CatalogOperations, SupportsSchemas private static final Logger LOG = LoggerFactory.getLogger(HudiCatalogOperations.class); - private HudiCatalogBackendOps hudiCatalogBackendOps; + @VisibleForTesting HudiCatalogBackendOps hudiCatalogBackendOps; /** * Load the Hudi Catalog Backend and initialize the Hudi Catalog Operations. @@ -69,7 +70,7 @@ public void initialize( Map config, CatalogInfo info, HasPropertyMetadata propertiesMetadata) throws RuntimeException { HudiCatalogBackend hudiCatalogBackend = CatalogUtils.loadHudiCatalogBackend(config); - hudiCatalogBackendOps = hudiCatalogBackend.catalogOps(); + hudiCatalogBackendOps = hudiCatalogBackend.backendOps(); } /** diff --git a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogPropertiesMetadata.java b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogPropertiesMetadata.java index 8325dcc0d6b..1f5142a2865 100644 --- a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogPropertiesMetadata.java +++ b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogPropertiesMetadata.java @@ -18,14 +18,43 @@ */ package org.apache.gravitino.catalog.lakehouse.hudi; -import java.util.Collections; +import static org.apache.gravitino.connector.PropertyEntry.enumImmutablePropertyEntry; +import static org.apache.gravitino.connector.PropertyEntry.stringRequiredPropertyEntry; + +import com.google.common.collect.ImmutableMap; import java.util.Map; +import org.apache.gravitino.catalog.lakehouse.hudi.backend.BackendType; import org.apache.gravitino.connector.BaseCatalogPropertiesMetadata; import org.apache.gravitino.connector.PropertyEntry; +import org.apache.gravitino.hive.ClientPropertiesMetadata; public class HudiCatalogPropertiesMetadata extends BaseCatalogPropertiesMetadata { + public static final String CATALOG_BACKEND = "catalog-backend"; + public static final String URI = "uri"; + private static final ClientPropertiesMetadata CLIENT_PROPERTIES_METADATA = + new ClientPropertiesMetadata(); + + private static final Map> PROPERTIES_METADATA = + ImmutableMap.>builder() + .put( + CATALOG_BACKEND, + enumImmutablePropertyEntry( + CATALOG_BACKEND, + "Hudi catalog type choose properties", + true /* required */, + BackendType.class, + null /* defaultValue */, + false /* hidden */, + false /* reserved */)) + .put( + URI, + stringRequiredPropertyEntry( + URI, "Hudi catalog uri config", false /* immutable */, false /* hidden */)) + .putAll(CLIENT_PROPERTIES_METADATA.propertyEntries()) + .build(); + @Override protected Map> specificPropertyEntries() { - return Collections.emptyMap(); + return PROPERTIES_METADATA; } } diff --git a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiColumn.java b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiColumn.java new file mode 100644 index 00000000000..540561c4d31 --- /dev/null +++ b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiColumn.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.lakehouse.hudi; + +import lombok.EqualsAndHashCode; +import org.apache.gravitino.connector.BaseColumn; + +/** A class representing a column in a Hudi table. */ +@EqualsAndHashCode(callSuper = true) +public class HudiColumn extends BaseColumn { + /** + * Creates a new instance of {@link Builder}. + * + * @return The new instance. + */ + public static Builder builder() { + return new Builder(); + } + + private HudiColumn() {} + + /** A builder class for constructing HudiColumn instances. */ + public static class Builder extends BaseColumnBuilder { + + /** Creates a new instance of {@link Builder}. */ + private Builder() {} + + /** + * Internal method to build a HudiColumn instance using the provided values. + * + * @return A new HudiColumn instance with the configured values. + */ + @Override + protected HudiColumn internalBuild() { + HudiColumn hudiColumn = new HudiColumn(); + + hudiColumn.name = name; + hudiColumn.comment = comment; + hudiColumn.dataType = dataType; + hudiColumn.nullable = nullable; + hudiColumn.defaultValue = defaultValue == null ? DEFAULT_VALUE_NOT_SET : defaultValue; + return hudiColumn; + } + } +} diff --git a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiSchema.java b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiSchema.java index 391ab35799f..ebfddc9e211 100644 --- a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiSchema.java +++ b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiSchema.java @@ -81,5 +81,10 @@ protected HudiSchema internalBuild() { * @return the HudiSchema */ protected abstract HudiSchema buildFromSchema(T schema); + + @Override + public HudiSchema build() { + return internalBuild(); + } } } diff --git a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiSchemaPropertiesMetadata.java b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiSchemaPropertiesMetadata.java index c51fad43a52..74be8230544 100644 --- a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiSchemaPropertiesMetadata.java +++ b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiSchemaPropertiesMetadata.java @@ -18,14 +18,27 @@ */ package org.apache.gravitino.catalog.lakehouse.hudi; -import java.util.Collections; +import com.google.common.collect.ImmutableMap; import java.util.Map; import org.apache.gravitino.connector.BasePropertiesMetadata; import org.apache.gravitino.connector.PropertyEntry; public class HudiSchemaPropertiesMetadata extends BasePropertiesMetadata { + public static final String LOCATION = "location"; + private static final Map> PROPERTIES_METADATA = + ImmutableMap.>builder() + .put( + LOCATION, + PropertyEntry.stringOptionalPropertyEntry( + LOCATION, + "The directory for Hudi dataset storage", + false /* immutable */, + null /* default value */, + false /* hidden */)) + .build(); + @Override protected Map> specificPropertyEntries() { - return Collections.emptyMap(); + return PROPERTIES_METADATA; } } diff --git a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiTable.java b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiTable.java index 9f28a516a15..5deb1e67f33 100644 --- a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiTable.java +++ b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiTable.java @@ -68,5 +68,10 @@ protected HudiTable internalBuild() { * @return the HudiTable */ protected abstract HudiTable buildFromTable(T backendTable); + + @Override + public HudiTable build() { + return internalBuild(); + } } } diff --git a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiTablePropertiesMetadata.java b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiTablePropertiesMetadata.java index 3b87867c0f4..e459a8abd02 100644 --- a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiTablePropertiesMetadata.java +++ b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiTablePropertiesMetadata.java @@ -18,14 +18,33 @@ */ package org.apache.gravitino.catalog.lakehouse.hudi; -import java.util.Collections; +import static org.apache.gravitino.connector.PropertyEntry.stringImmutablePropertyEntry; +import static org.apache.gravitino.connector.PropertyEntry.stringReservedPropertyEntry; + +import com.google.common.collect.ImmutableMap; import java.util.Map; import org.apache.gravitino.connector.BasePropertiesMetadata; import org.apache.gravitino.connector.PropertyEntry; public class HudiTablePropertiesMetadata extends BasePropertiesMetadata { + public static final String COMMENT = "comment"; + public static final String LOCATION = "location"; + private static final Map> PROPERTIES_METADATA = + ImmutableMap.>builder() + .put(COMMENT, stringReservedPropertyEntry(COMMENT, "table comment", true /* hidden */)) + .put( + LOCATION, + stringImmutablePropertyEntry( + LOCATION, + "The location for Hudi table", + false /* required */, + null /* default value */, + false /* hidden */, + false /* reserved */)) + .build(); + @Override protected Map> specificPropertyEntries() { - return Collections.emptyMap(); + return PROPERTIES_METADATA; } } diff --git a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/HudiCatalogBackend.java b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/HudiCatalogBackend.java index e257e60191b..eb9fdf9c49a 100644 --- a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/HudiCatalogBackend.java +++ b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/HudiCatalogBackend.java @@ -26,20 +26,20 @@ public abstract class HudiCatalogBackend { private final BackendType backendType; - private final HudiCatalogBackendOps catalogOps; + private final HudiCatalogBackendOps backendOps; public abstract void initialize(Map properties); - protected HudiCatalogBackend(BackendType backendType, HudiCatalogBackendOps catalogOps) { + protected HudiCatalogBackend(BackendType backendType, HudiCatalogBackendOps backendOps) { this.backendType = backendType; - this.catalogOps = catalogOps; + this.backendOps = backendOps; } public BackendType type() { return backendType; } - public HudiCatalogBackendOps catalogOps() { - return catalogOps; + public HudiCatalogBackendOps backendOps() { + return backendOps; } } diff --git a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackend.java b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackend.java index dfc6228f24f..1726d22d712 100644 --- a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackend.java +++ b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackend.java @@ -37,7 +37,6 @@ private HudiHMSBackend(BackendType backendType, HudiCatalogBackendOps catalogOps @Override public void initialize(Map properties) { - // todo: initialize the catalogOps - catalogOps().initialize(properties); + backendOps().initialize(properties); } } diff --git a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java index 3f21148b111..869b4fcd875 100644 --- a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java +++ b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java @@ -18,10 +18,18 @@ */ package org.apache.gravitino.catalog.lakehouse.hudi.backend.hms; +import static org.apache.gravitino.catalog.lakehouse.hudi.HudiCatalogPropertiesMetadata.URI; +import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import java.util.Map; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; import org.apache.gravitino.SchemaChange; +import org.apache.gravitino.catalog.lakehouse.hudi.HudiSchema; +import org.apache.gravitino.catalog.lakehouse.hudi.HudiTable; import org.apache.gravitino.catalog.lakehouse.hudi.ops.HudiCatalogBackendOps; import org.apache.gravitino.exceptions.NoSuchCatalogException; import org.apache.gravitino.exceptions.NoSuchSchemaException; @@ -29,28 +37,72 @@ import org.apache.gravitino.exceptions.NonEmptySchemaException; import org.apache.gravitino.exceptions.SchemaAlreadyExistsException; import org.apache.gravitino.exceptions.TableAlreadyExistsException; +import org.apache.gravitino.hive.CachedClientPool; import org.apache.gravitino.rel.Column; import org.apache.gravitino.rel.TableChange; import org.apache.gravitino.rel.expressions.distributions.Distribution; import org.apache.gravitino.rel.expressions.sorts.SortOrder; import org.apache.gravitino.rel.expressions.transforms.Transform; import org.apache.gravitino.rel.indexes.Index; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.UnknownDBException; +import org.apache.thrift.TException; public class HudiHMSBackendOps implements HudiCatalogBackendOps { + // Mapping from Gravitino config to Hive config + private static final Map CONFIG_CONVERTER = + ImmutableMap.of(URI, HiveConf.ConfVars.METASTOREURIS.varname); + @VisibleForTesting CachedClientPool clientPool; + @Override public void initialize(Map properties) { - // todo: initialize the catalogOps + this.clientPool = new CachedClientPool(buildHiveConf(properties), properties); } @Override - public HudiHMSSchema loadSchema(NameIdentifier schemaIdent) throws NoSuchSchemaException { - throw new UnsupportedOperationException("Not implemented yet"); + public HudiSchema loadSchema(NameIdentifier schemaIdent) throws NoSuchSchemaException { + try { + Database database = clientPool.run(client -> client.getDatabase(schemaIdent.name())); + return HudiHMSSchema.builder().withBackendSchema(database).build(); + + } catch (NoSuchObjectException | UnknownDBException e) { + throw new NoSuchSchemaException( + e, "Hudi schema (database) does not exist: %s in Hive Metastore", schemaIdent.name()); + + } catch (TException e) { + throw new RuntimeException( + "Failed to load Hudi schema (database) " + schemaIdent.name() + " from Hive Metastore", + e); + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } @Override public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException { - throw new UnsupportedOperationException("Not implemented yet"); + try { + return clientPool.run( + c -> + c.getAllDatabases().stream() + .map(db -> NameIdentifier.of(namespace, db)) + .toArray(NameIdentifier[]::new)); + + } catch (TException e) { + throw new RuntimeException( + "Failed to list all schemas (database) under namespace : " + + namespace + + " in Hive Metastore", + e); + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } @Override @@ -73,12 +125,51 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty @Override public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException { - throw new UnsupportedOperationException("Not implemented yet"); + NameIdentifier schemaIdent = NameIdentifier.of(namespace.levels()); + if (!schemaExists(schemaIdent)) { + throw new NoSuchSchemaException("Schema (database) does not exist %s", namespace); + } + + try { + return clientPool.run( + c -> + c.getTables(schemaIdent.name(), "*").stream() + .map(table -> NameIdentifier.of(namespace, table)) + .toArray(NameIdentifier[]::new)); + + } catch (UnknownDBException e) { + throw new NoSuchSchemaException( + "Schema (database) does not exist %s in Hive Metastore", namespace); + + } catch (TException e) { + throw new RuntimeException( + "Failed to list all tables under the namespace : " + namespace + " in Hive Metastore", e); + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } @Override - public HudiHMSTable loadTable(NameIdentifier ident) throws NoSuchTableException { - throw new UnsupportedOperationException("Not implemented yet"); + public HudiTable loadTable(NameIdentifier tableIdent) throws NoSuchTableException { + NameIdentifier schemaIdent = NameIdentifier.of(tableIdent.namespace().levels()); + + try { + Table table = + clientPool.run(client -> client.getTable(schemaIdent.name(), tableIdent.name())); + return HudiHMSTable.builder().withBackendTable(table).build(); + + } catch (NoSuchObjectException e) { + throw new NoSuchTableException( + e, "Hudi table does not exist: %s in Hive Metastore", tableIdent.name()); + + } catch (TException e) { + throw new RuntimeException( + "Failed to load Hudi table " + tableIdent.name() + " from Hive metastore", e); + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } @Override @@ -108,6 +199,29 @@ public boolean dropTable(NameIdentifier ident) { @Override public void close() { - // todo: close the HMS connection + if (clientPool != null) { + clientPool.close(); + clientPool = null; + } + } + + private HiveConf buildHiveConf(Map properties) { + Configuration hadoopConf = new Configuration(); + + Map byPassConfigs = Maps.newHashMap(); + Map convertedConfigs = Maps.newHashMap(); + properties.forEach( + (key, value) -> { + if (key.startsWith(CATALOG_BYPASS_PREFIX)) { + byPassConfigs.put(key.substring(CATALOG_BYPASS_PREFIX.length()), value); + } else if (CONFIG_CONVERTER.containsKey(key)) { + convertedConfigs.put(CONFIG_CONVERTER.get(key), value); + } + }); + byPassConfigs.forEach(hadoopConf::set); + // Gravitino conf has higher priority than bypass conf + convertedConfigs.forEach(hadoopConf::set); + + return new HiveConf(hadoopConf, HudiHMSBackendOps.class); } } diff --git a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSSchema.java b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSSchema.java index 636d76ba2c2..734261f5e6c 100644 --- a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSSchema.java +++ b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSSchema.java @@ -18,7 +18,12 @@ */ package org.apache.gravitino.catalog.lakehouse.hudi.backend.hms; +import static org.apache.gravitino.catalog.lakehouse.hudi.HudiSchemaPropertiesMetadata.LOCATION; + +import com.google.common.collect.Maps; +import java.util.Optional; import org.apache.gravitino.catalog.lakehouse.hudi.HudiSchema; +import org.apache.gravitino.meta.AuditInfo; import org.apache.hadoop.hive.metastore.api.Database; public class HudiHMSSchema extends HudiSchema { @@ -39,7 +44,7 @@ public Database fromHudiSchema() { public static class Builder extends HudiSchema.Builder { @Override - protected HudiSchema simpleBuild() { + protected HudiHMSSchema simpleBuild() { HudiHMSSchema schema = new HudiHMSSchema(); schema.name = name; schema.comment = comment; @@ -49,8 +54,17 @@ protected HudiSchema simpleBuild() { } @Override - protected HudiSchema buildFromSchema(Database schema) { - // todo: convert HMS database to HudiSchema + protected HudiHMSSchema buildFromSchema(Database database) { + name = database.getName(); + comment = database.getDescription(); + + properties = Maps.newHashMap(database.getParameters()); + properties.put(LOCATION, database.getLocationUri()); + + AuditInfo.Builder auditInfoBuilder = AuditInfo.builder(); + Optional.ofNullable(database.getOwnerName()).ifPresent(auditInfoBuilder::withCreator); + auditInfo = auditInfoBuilder.build(); + return simpleBuild(); } } diff --git a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSTable.java b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSTable.java index 4bd65b54dad..50223c168ea 100644 --- a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSTable.java +++ b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSTable.java @@ -18,7 +18,12 @@ */ package org.apache.gravitino.catalog.lakehouse.hudi.backend.hms; +import static org.apache.gravitino.catalog.lakehouse.hudi.HudiTablePropertiesMetadata.COMMENT; +import static org.apache.gravitino.catalog.lakehouse.hudi.HudiTablePropertiesMetadata.LOCATION; + +import org.apache.gravitino.catalog.lakehouse.hudi.HudiColumn; import org.apache.gravitino.catalog.lakehouse.hudi.HudiTable; +import org.apache.gravitino.hive.converter.HiveTableConverter; import org.apache.hadoop.hive.metastore.api.Table; public class HudiHMSTable extends HudiTable { @@ -44,6 +49,7 @@ protected HudiHMSTable simpleBuild() { table.columns = columns; table.indexes = indexes; table.partitioning = partitioning; + table.sortOrders = sortOrders; table.distribution = distribution; table.properties = properties; table.auditInfo = auditInfo; @@ -51,8 +57,16 @@ protected HudiHMSTable simpleBuild() { } @Override - protected HudiTable buildFromTable(Table backendTable) { - // todo: convert HMS table to HudiTable + protected HudiHMSTable buildFromTable(Table hmsTable) { + name = hmsTable.getTableName(); + comment = hmsTable.getParameters().get(COMMENT); + columns = HiveTableConverter.getColumns(hmsTable, HudiColumn.builder()); + partitioning = HiveTableConverter.getPartitioning(hmsTable); + sortOrders = HiveTableConverter.getSortOrders(hmsTable); + distribution = HiveTableConverter.getDistribution(hmsTable); + auditInfo = HiveTableConverter.getAuditInfo(hmsTable); + properties = hmsTable.getParameters(); + properties.put(LOCATION, hmsTable.getSd().getLocation()); return simpleBuild(); } } diff --git a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/utils/CatalogUtils.java b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/utils/CatalogUtils.java index 853d71159d1..c629ee55a83 100644 --- a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/utils/CatalogUtils.java +++ b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/utils/CatalogUtils.java @@ -18,7 +18,11 @@ */ package org.apache.gravitino.catalog.lakehouse.hudi.utils; +import static org.apache.gravitino.catalog.lakehouse.hudi.HudiCatalogPropertiesMetadata.CATALOG_BACKEND; + +import java.util.Locale; import java.util.Map; +import org.apache.gravitino.catalog.lakehouse.hudi.backend.BackendType; import org.apache.gravitino.catalog.lakehouse.hudi.backend.HudiCatalogBackend; import org.apache.gravitino.catalog.lakehouse.hudi.backend.hms.HudiHMSBackend; @@ -26,9 +30,15 @@ public class CatalogUtils { private CatalogUtils() {} public static HudiCatalogBackend loadHudiCatalogBackend(Map properties) { - // todo: load and initialize the backend based on the properties - HudiCatalogBackend hudiHMSBackend = new HudiHMSBackend(); - hudiHMSBackend.initialize(properties); - return hudiHMSBackend; + BackendType backendType = + BackendType.valueOf(properties.get(CATALOG_BACKEND).toUpperCase(Locale.ROOT)); + switch (backendType) { + case HMS: + HudiCatalogBackend hudiHMSBackend = new HudiHMSBackend(); + hudiHMSBackend.initialize(properties); + return hudiHMSBackend; + default: + throw new UnsupportedOperationException("Unsupported backend type: " + backendType); + } } } diff --git a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiCatalog.java b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiCatalog.java new file mode 100644 index 00000000000..d49172fbc01 --- /dev/null +++ b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiCatalog.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.lakehouse.hudi; + +import static org.apache.gravitino.catalog.lakehouse.hudi.HudiCatalogPropertiesMetadata.CATALOG_BACKEND; + +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.time.Instant; +import java.util.Map; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.connector.CatalogOperations; +import org.apache.gravitino.meta.AuditInfo; +import org.apache.gravitino.meta.CatalogEntity; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestHudiCatalog { + @Test + public void testOps() throws IOException { + try (HudiCatalog catalog = new HudiCatalog()) { + IllegalArgumentException exception = + Assertions.assertThrows(IllegalArgumentException.class, catalog::ops); + Assertions.assertEquals( + "entity and conf must be set before calling ops()", exception.getMessage()); + } + + AuditInfo auditInfo = + AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build(); + CatalogEntity entity = + CatalogEntity.builder() + .withId(1L) + .withName("catalog") + .withNamespace(Namespace.of("metalake")) + .withType(HudiCatalog.Type.RELATIONAL) + .withProvider("lakehouse-hudi") + .withAuditInfo(auditInfo) + .build(); + + Map conf = ImmutableMap.of(CATALOG_BACKEND, "hms"); + try (HudiCatalog catalog = new HudiCatalog().withCatalogConf(conf).withCatalogEntity(entity)) { + CatalogOperations ops = catalog.ops(); + Assertions.assertInstanceOf(HudiCatalogOperations.class, ops); + } + } +} diff --git a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiCatalogOperations.java b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiCatalogOperations.java new file mode 100644 index 00000000000..16595da6aab --- /dev/null +++ b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiCatalogOperations.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.lakehouse.hudi; + +import static org.apache.gravitino.catalog.lakehouse.hudi.HudiCatalogPropertiesMetadata.CATALOG_BACKEND; + +import com.google.common.collect.ImmutableMap; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.catalog.lakehouse.hudi.backend.hms.HudiHMSBackendOps; +import org.apache.gravitino.catalog.lakehouse.hudi.ops.InMemoryBackendOps; +import org.apache.gravitino.connector.HasPropertyMetadata; +import org.apache.gravitino.connector.PropertiesMetadata; +import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.exceptions.NoSuchTableException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestHudiCatalogOperations { + + private static final HasPropertyMetadata HUDI_PROPERTIES_METADATA = + new HasPropertyMetadata() { + @Override + public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException { + return HudiCatalog.TABLE_PROPERTIES_METADATA; + } + + @Override + public PropertiesMetadata catalogPropertiesMetadata() throws UnsupportedOperationException { + return HudiCatalog.CATALOG_PROPERTIES_METADATA; + } + + @Override + public PropertiesMetadata schemaPropertiesMetadata() throws UnsupportedOperationException { + return HudiCatalog.SCHEMA_PROPERTIES_METADATA; + } + + @Override + public PropertiesMetadata filesetPropertiesMetadata() throws UnsupportedOperationException { + throw new UnsupportedOperationException(); + } + + @Override + public PropertiesMetadata topicPropertiesMetadata() throws UnsupportedOperationException { + throw new UnsupportedOperationException(); + } + }; + + @Test + public void testInitialize() { + HudiCatalogOperations ops = new HudiCatalogOperations(); + ops.initialize(ImmutableMap.of(CATALOG_BACKEND, "hms"), null, HUDI_PROPERTIES_METADATA); + Assertions.assertInstanceOf(HudiHMSBackendOps.class, ops.hudiCatalogBackendOps); + ops.close(); + } + + @Test + public void testTestConnection() throws Exception { + try (HudiCatalogOperations ops = new HudiCatalogOperations(); + InMemoryBackendOps inMemoryBackendOps = new InMemoryBackendOps()) { + ops.hudiCatalogBackendOps = inMemoryBackendOps; + + Assertions.assertDoesNotThrow(() -> ops.testConnection(null, null, null, null, null)); + } + } + + @Test + public void testListSchemas() throws Exception { + try (HudiCatalogOperations ops = new HudiCatalogOperations(); + InMemoryBackendOps inMemoryBackendOps = new InMemoryBackendOps()) { + ops.hudiCatalogBackendOps = inMemoryBackendOps; + + NameIdentifier[] schemas = ops.listSchemas(null); + Assertions.assertEquals(0, schemas.length); + } + } + + @Test + public void testLoadSchema() throws Exception { + try (HudiCatalogOperations ops = new HudiCatalogOperations(); + InMemoryBackendOps inMemoryBackendOps = new InMemoryBackendOps()) { + ops.hudiCatalogBackendOps = inMemoryBackendOps; + + Assertions.assertThrows( + NoSuchSchemaException.class, + () -> ops.loadSchema(NameIdentifier.of("metalake", "catalog", "schema"))); + } + } + + @Test + public void testListTables() throws Exception { + try (HudiCatalogOperations ops = new HudiCatalogOperations(); + InMemoryBackendOps inMemoryBackendOps = new InMemoryBackendOps()) { + ops.hudiCatalogBackendOps = inMemoryBackendOps; + + NameIdentifier[] tables = ops.listTables(null); + Assertions.assertEquals(0, tables.length); + } + } + + @Test + public void testLoadTable() throws Exception { + try (HudiCatalogOperations ops = new HudiCatalogOperations(); + InMemoryBackendOps inMemoryBackendOps = new InMemoryBackendOps()) { + ops.hudiCatalogBackendOps = inMemoryBackendOps; + + Assertions.assertThrows( + NoSuchTableException.class, + () -> ops.loadTable(NameIdentifier.of("metalake", "catalog", "table"))); + } + } +} diff --git a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiSchema.java b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiSchema.java new file mode 100644 index 00000000000..95d5dbdfa13 --- /dev/null +++ b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiSchema.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.lakehouse.hudi; + +public class TestHudiSchema extends HudiSchema { + + public static Builder builder() { + return new Builder(); + } + + @Override + public TestHudiSchema fromHudiSchema() { + return this; + } + + public static class Builder extends HudiSchema.Builder { + + @Override + protected TestHudiSchema simpleBuild() { + TestHudiSchema schema = new TestHudiSchema(); + schema.name = name; + schema.comment = comment; + schema.properties = properties; + schema.auditInfo = auditInfo; + return schema; + } + + @Override + protected HudiSchema buildFromSchema(TestHudiSchema schema) { + return simpleBuild(); + } + } +} diff --git a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiTable.java b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiTable.java new file mode 100644 index 00000000000..9914bff9dac --- /dev/null +++ b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiTable.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.lakehouse.hudi; + +public class TestHudiTable extends HudiTable { + public static Builder builder() { + return new Builder(); + } + + @Override + public TestHudiTable fromHudiTable() { + return this; + } + + public static class Builder extends HudiTable.Builder { + @Override + protected TestHudiTable simpleBuild() { + TestHudiTable table = new TestHudiTable(); + table.name = name; + table.comment = comment; + table.properties = properties; + table.auditInfo = auditInfo; + return table; + } + + @Override + protected HudiTable buildFromTable(TestHudiTable table) { + return simpleBuild(); + } + } +} diff --git a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/TestHudiHMSBackend.java b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/TestHudiHMSBackend.java new file mode 100644 index 00000000000..a921fd8c287 --- /dev/null +++ b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/TestHudiHMSBackend.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.lakehouse.hudi.backend.hms; + +import static org.apache.gravitino.catalog.lakehouse.hudi.backend.BackendType.HMS; + +import com.google.common.collect.ImmutableMap; +import org.apache.gravitino.catalog.lakehouse.hudi.backend.HudiCatalogBackend; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestHudiHMSBackend { + @Test + public void testInitialize() { + HudiCatalogBackend backend = new HudiHMSBackend(); + backend.initialize(ImmutableMap.of()); + + Assertions.assertEquals(HMS, backend.type()); + Assertions.assertInstanceOf(HudiHMSBackendOps.class, backend.backendOps()); + } +} diff --git a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/TestHudiHMSBackendOps.java b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/TestHudiHMSBackendOps.java new file mode 100644 index 00000000000..28b954e9f7f --- /dev/null +++ b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/TestHudiHMSBackendOps.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.lakehouse.hudi.backend.hms; + +import static org.apache.gravitino.catalog.lakehouse.hudi.HudiCatalogPropertiesMetadata.URI; +import static org.apache.gravitino.catalog.lakehouse.hudi.HudiSchemaPropertiesMetadata.LOCATION; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.util.Arrays; +import java.util.Map; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.catalog.lakehouse.hudi.HudiSchema; +import org.apache.gravitino.catalog.lakehouse.hudi.HudiTable; +import org.apache.gravitino.hive.hms.MiniHiveMetastoreService; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.types.Types; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class TestHudiHMSBackendOps extends MiniHiveMetastoreService { + + private static final HudiHMSBackendOps ops = new HudiHMSBackendOps(); + private static final String METALAKE_NAME = "metalake"; + private static final String CATALOG_NAME = "catalog"; + private static final String TABLE_NAME = "hudi_table"; + + @BeforeAll + public static void prepare() throws TException { + Map props = Maps.newHashMap(); + props.put(URI, hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname)); + ops.initialize(props); + + Table table = new Table(); + table.setDbName(DB_NAME); + table.setTableName(TABLE_NAME); + StorageDescriptor strgDesc = new StorageDescriptor(); + strgDesc.setCols(Lists.newArrayList(new FieldSchema("col1", "string", "description"))); + strgDesc.setSerdeInfo(new SerDeInfo()); + table.setSd(strgDesc); + metastoreClient.createTable(table); + } + + @AfterAll + public static void cleanup() throws TException { + ops.close(); + } + + @Test + public void testInitialize() { + try (HudiHMSBackendOps ops = new HudiHMSBackendOps()) { + ops.initialize(ImmutableMap.of()); + Assertions.assertNotNull(ops.clientPool); + } + } + + @Test + public void testLoadSchema() { + HudiSchema hudiSchema = ops.loadSchema(NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, DB_NAME)); + + Assertions.assertEquals(DB_NAME, hudiSchema.name()); + Assertions.assertEquals("description", hudiSchema.comment()); + Assertions.assertNotNull(hudiSchema.properties().get(LOCATION)); + } + + @Test + public void testListSchemas() { + Namespace namespace = Namespace.of(METALAKE_NAME, CATALOG_NAME); + NameIdentifier[] schemas = ops.listSchemas(namespace); + + Assertions.assertTrue(schemas.length > 0); + Assertions.assertTrue(Arrays.stream(schemas).anyMatch(schema -> schema.name().equals(DB_NAME))); + } + + @Test + public void testListTables() { + Namespace namespace = Namespace.of(METALAKE_NAME, CATALOG_NAME, DB_NAME); + NameIdentifier[] tables = ops.listTables(namespace); + + Assertions.assertTrue(Arrays.stream(tables).anyMatch(table -> table.name().equals(TABLE_NAME))); + } + + @Test + public void testLoadTable() { + Namespace namespace = Namespace.of(METALAKE_NAME, CATALOG_NAME, DB_NAME); + HudiTable hudiTable = ops.loadTable(NameIdentifier.of(namespace, TABLE_NAME)); + + Assertions.assertEquals(TABLE_NAME, hudiTable.name()); + Assertions.assertNull(hudiTable.comment()); + Assertions.assertNotNull(hudiTable.properties().get(LOCATION)); + + Column[] columns = hudiTable.columns(); + Assertions.assertEquals(1, columns.length); + + Column column = columns[0]; + Assertions.assertEquals("col1", column.name()); + Assertions.assertEquals(Types.StringType.get(), column.dataType()); + Assertions.assertEquals("description", column.comment()); + } +} diff --git a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/ops/InMemoryBackendOps.java b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/ops/InMemoryBackendOps.java new file mode 100644 index 00000000000..eeb0ac295e4 --- /dev/null +++ b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/ops/InMemoryBackendOps.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.lakehouse.hudi.ops; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.SchemaChange; +import org.apache.gravitino.catalog.lakehouse.hudi.HudiSchema; +import org.apache.gravitino.catalog.lakehouse.hudi.HudiTable; +import org.apache.gravitino.catalog.lakehouse.hudi.TestHudiSchema; +import org.apache.gravitino.catalog.lakehouse.hudi.TestHudiTable; +import org.apache.gravitino.exceptions.NoSuchCatalogException; +import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.exceptions.NoSuchTableException; +import org.apache.gravitino.exceptions.NonEmptySchemaException; +import org.apache.gravitino.exceptions.SchemaAlreadyExistsException; +import org.apache.gravitino.exceptions.TableAlreadyExistsException; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.Table; +import org.apache.gravitino.rel.TableChange; +import org.apache.gravitino.rel.expressions.distributions.Distribution; +import org.apache.gravitino.rel.expressions.sorts.SortOrder; +import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.indexes.Index; + +public class InMemoryBackendOps implements HudiCatalogBackendOps { + private final ConcurrentMap schemas; + private final ConcurrentMap tables; + + public InMemoryBackendOps() { + this.schemas = new ConcurrentHashMap<>(); + this.tables = new ConcurrentHashMap<>(); + } + + @Override + public void initialize(Map properties) { + // Do nothing + } + + @Override + public HudiSchema loadSchema(NameIdentifier schemaIdent) throws NoSuchSchemaException { + TestHudiSchema schema = schemas.get(schemaIdent); + if (schema == null) { + throw new NoSuchSchemaException("Schema %s does not exist", schemaIdent); + } + return schema; + } + + @Override + public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException { + return schemas.keySet().toArray(new NameIdentifier[0]); + } + + @Override + public HudiSchema createSchema( + NameIdentifier ident, String comment, Map properties) + throws NoSuchCatalogException, SchemaAlreadyExistsException { + if (schemas.containsKey(ident)) { + throw new SchemaAlreadyExistsException("Schema %s already exists", ident); + } + + HudiSchema schema = + TestHudiSchema.builder() + .withName(ident.name()) + .withComment(comment) + .withProperties(properties) + .build(); + schemas.put(ident, schema.fromHudiSchema()); + return schema; + } + + @Override + public HudiSchema alterSchema(NameIdentifier ident, SchemaChange... changes) + throws NoSuchSchemaException { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException { + return tables.keySet().toArray(new NameIdentifier[0]); + } + + @Override + public Table loadTable(NameIdentifier ident) throws NoSuchTableException { + TestHudiTable table = tables.get(ident); + if (table == null) { + throw new NoSuchTableException("Table %s does not exist", ident); + } + return table; + } + + @Override + public Table createTable( + NameIdentifier ident, + Column[] columns, + String comment, + Map properties, + Transform[] partitions, + Distribution distribution, + SortOrder[] sortOrders, + Index[] indexes) + throws NoSuchSchemaException, TableAlreadyExistsException { + if (tables.containsKey(ident)) { + throw new TableAlreadyExistsException("Table %s already exists", ident); + } + + HudiTable table = + TestHudiTable.builder() + .withName(ident.name()) + .withComment(comment) + .withProperties(properties) + .build(); + tables.put(ident, table.fromHudiTable()); + return table; + } + + @Override + public Table alterTable(NameIdentifier ident, TableChange... changes) + throws NoSuchTableException, IllegalArgumentException { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public boolean dropTable(NameIdentifier ident) { + throw new UnsupportedOperationException("Not implemented yet"); + } + + @Override + public void close() throws Exception { + schemas.clear(); + tables.clear(); + } +} diff --git a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/utils/TestCatalogUtils.java b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/utils/TestCatalogUtils.java new file mode 100644 index 00000000000..5f306d7bf90 --- /dev/null +++ b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/utils/TestCatalogUtils.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.lakehouse.hudi.utils; + +import static org.apache.gravitino.catalog.lakehouse.hudi.HudiCatalogPropertiesMetadata.CATALOG_BACKEND; + +import com.google.common.collect.ImmutableMap; +import org.apache.gravitino.catalog.lakehouse.hudi.backend.HudiCatalogBackend; +import org.apache.gravitino.catalog.lakehouse.hudi.backend.hms.HudiHMSBackend; +import org.apache.gravitino.catalog.lakehouse.hudi.backend.hms.HudiHMSBackendOps; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestCatalogUtils { + @Test + public void testLoadHudiCatalogBackend() { + HudiCatalogBackend catalogBackend = + CatalogUtils.loadHudiCatalogBackend(ImmutableMap.of(CATALOG_BACKEND, "hms")); + Assertions.assertInstanceOf(HudiHMSBackend.class, catalogBackend); + Assertions.assertInstanceOf(HudiHMSBackendOps.class, catalogBackend.backendOps()); + } +} From 46cb034ef3f3b74bf49db1d821e85ad87104fd42 Mon Sep 17 00:00:00 2001 From: mchades Date: Thu, 26 Sep 2024 17:53:32 +0800 Subject: [PATCH 2/3] address comments --- .../hudi/HudiTablePropertiesMetadata.java | 13 +++++++++++++ .../hudi/backend/hms/HudiHMSBackendOps.java | 15 +++++++++++---- .../lakehouse/hudi/backend/hms/HudiHMSTable.java | 12 ++++++++++++ .../hudi/backend/hms/TestHudiHMSBackendOps.java | 3 ++- .../hive/converter/HiveTableConverter.java | 2 +- 5 files changed, 39 insertions(+), 6 deletions(-) diff --git a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiTablePropertiesMetadata.java b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiTablePropertiesMetadata.java index e459a8abd02..3a1c72ae536 100644 --- a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiTablePropertiesMetadata.java +++ b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiTablePropertiesMetadata.java @@ -29,6 +29,9 @@ public class HudiTablePropertiesMetadata extends BasePropertiesMetadata { public static final String COMMENT = "comment"; public static final String LOCATION = "location"; + public static final String INPUT_FORMAT = "input-format"; + public static final String OUTPUT_FORMAT = "output-format"; + private static final Map> PROPERTIES_METADATA = ImmutableMap.>builder() .put(COMMENT, stringReservedPropertyEntry(COMMENT, "table comment", true /* hidden */)) @@ -41,6 +44,16 @@ public class HudiTablePropertiesMetadata extends BasePropertiesMetadata { null /* default value */, false /* hidden */, false /* reserved */)) + .put( + INPUT_FORMAT, + stringReservedPropertyEntry( + INPUT_FORMAT, + "Hudi table input format used to distinguish the table type", + false /* hidden */)) + .put( + OUTPUT_FORMAT, + stringReservedPropertyEntry( + OUTPUT_FORMAT, "Hudi table output format", false /* hidden */)) .build(); @Override diff --git a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java index 869b4fcd875..3cfab1933e7 100644 --- a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java +++ b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java @@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import java.util.List; import java.util.Map; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; @@ -132,10 +133,16 @@ public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaExcep try { return clientPool.run( - c -> - c.getTables(schemaIdent.name(), "*").stream() - .map(table -> NameIdentifier.of(namespace, table)) - .toArray(NameIdentifier[]::new)); + c -> { + List allTables = c.getAllTables(schemaIdent.name()); + return c.getTableObjectsByName(schemaIdent.name(), allTables).stream() + .filter( + t -> + t.getSd().getInputFormat() != null + && t.getSd().getInputFormat().startsWith("org.apache.hudi")) + .map(t -> NameIdentifier.of(namespace, t.getTableName())) + .toArray(NameIdentifier[]::new); + }); } catch (UnknownDBException e) { throw new NoSuchSchemaException( diff --git a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSTable.java b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSTable.java index 50223c168ea..2837ede51b2 100644 --- a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSTable.java +++ b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSTable.java @@ -19,7 +19,9 @@ package org.apache.gravitino.catalog.lakehouse.hudi.backend.hms; import static org.apache.gravitino.catalog.lakehouse.hudi.HudiTablePropertiesMetadata.COMMENT; +import static org.apache.gravitino.catalog.lakehouse.hudi.HudiTablePropertiesMetadata.INPUT_FORMAT; import static org.apache.gravitino.catalog.lakehouse.hudi.HudiTablePropertiesMetadata.LOCATION; +import static org.apache.gravitino.catalog.lakehouse.hudi.HudiTablePropertiesMetadata.OUTPUT_FORMAT; import org.apache.gravitino.catalog.lakehouse.hudi.HudiColumn; import org.apache.gravitino.catalog.lakehouse.hudi.HudiTable; @@ -62,11 +64,21 @@ protected HudiHMSTable buildFromTable(Table hmsTable) { comment = hmsTable.getParameters().get(COMMENT); columns = HiveTableConverter.getColumns(hmsTable, HudiColumn.builder()); partitioning = HiveTableConverter.getPartitioning(hmsTable); + + // should be always SortOrders.NONE since Hudi using clustering to sort data (see + // https://hudi.apache.org/docs/next/clustering/) + // but is run as a background table service sortOrders = HiveTableConverter.getSortOrders(hmsTable); + + // should be always Distributions.NONE since Hudi doesn't support distribution distribution = HiveTableConverter.getDistribution(hmsTable); auditInfo = HiveTableConverter.getAuditInfo(hmsTable); + properties = hmsTable.getParameters(); properties.put(LOCATION, hmsTable.getSd().getLocation()); + properties.put(INPUT_FORMAT, hmsTable.getSd().getInputFormat()); + properties.put(OUTPUT_FORMAT, hmsTable.getSd().getOutputFormat()); + return simpleBuild(); } } diff --git a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/TestHudiHMSBackendOps.java b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/TestHudiHMSBackendOps.java index 28b954e9f7f..812f6f9ace4 100644 --- a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/TestHudiHMSBackendOps.java +++ b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/TestHudiHMSBackendOps.java @@ -103,7 +103,8 @@ public void testListTables() { Namespace namespace = Namespace.of(METALAKE_NAME, CATALOG_NAME, DB_NAME); NameIdentifier[] tables = ops.listTables(namespace); - Assertions.assertTrue(Arrays.stream(tables).anyMatch(table -> table.name().equals(TABLE_NAME))); + // all hive tables are filtered out + Assertions.assertEquals(0, tables.length); } @Test diff --git a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveTableConverter.java b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveTableConverter.java index 757d8df282d..03cb233d4f8 100644 --- a/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveTableConverter.java +++ b/catalogs/hive-metastore-common/src/main/java/org/apache/gravitino/hive/converter/HiveTableConverter.java @@ -63,7 +63,7 @@ public static Distribution getDistribution(Table table) { } public static SortOrder[] getSortOrders(Table table) { - SortOrder[] sortOrders = new SortOrder[0]; + SortOrder[] sortOrders = SortOrders.NONE; StorageDescriptor sd = table.getSd(); if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) { sortOrders = From 84f176d80cfb021b389c99571508948dc87d3aab Mon Sep 17 00:00:00 2001 From: mchades Date: Thu, 10 Oct 2024 23:48:25 +0800 Subject: [PATCH 3/3] filter table --- .../catalog-lakehouse-hudi/build.gradle.kts | 18 +++++ .../hudi/backend/hms/HudiHMSBackendOps.java | 21 ++++- .../hudi/backend/hms/HudiHMSTable.java | 4 +- .../backend/hms/TestHudiHMSBackendOps.java | 81 ++++++++++++++++--- 4 files changed, 106 insertions(+), 18 deletions(-) diff --git a/catalogs/catalog-lakehouse-hudi/build.gradle.kts b/catalogs/catalog-lakehouse-hudi/build.gradle.kts index c38838c7e85..eef90f02957 100644 --- a/catalogs/catalog-lakehouse-hudi/build.gradle.kts +++ b/catalogs/catalog-lakehouse-hudi/build.gradle.kts @@ -24,6 +24,10 @@ plugins { id("idea") } +val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString() +val fullSparkVersion: String = libs.versions.spark34.get() +val sparkVersion = fullSparkVersion.split(".").take(2).joinToString(".") + dependencies { implementation(project(":api")) { exclude(group = "*") @@ -117,6 +121,20 @@ dependencies { testImplementation(libs.htrace.core4) testImplementation(libs.junit.jupiter.api) testImplementation(libs.woodstox.core) + testImplementation("org.apache.spark:spark-hive_$scalaVersion:$fullSparkVersion") { + exclude("org.apache.hadoop") + exclude("io.dropwizard.metrics") + exclude("com.fasterxml.jackson.core") + exclude("com.fasterxml.jackson.module", "jackson-module-scala_2.12") + } + testImplementation("org.apache.spark:spark-sql_$scalaVersion:$fullSparkVersion") { + exclude("org.apache.avro") + exclude("org.apache.hadoop") + exclude("org.apache.zookeeper") + exclude("io.dropwizard.metrics") + exclude("org.rocksdb") + } + testRuntimeOnly("org.apache.hudi:hudi-spark$sparkVersion-bundle_$scalaVersion:0.15.0") testRuntimeOnly(libs.junit.jupiter.engine) } diff --git a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java index 3cfab1933e7..9b03518a602 100644 --- a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java +++ b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSBackendOps.java @@ -58,6 +58,9 @@ public class HudiHMSBackendOps implements HudiCatalogBackendOps { // Mapping from Gravitino config to Hive config private static final Map CONFIG_CONVERTER = ImmutableMap.of(URI, HiveConf.ConfVars.METASTOREURIS.varname); + + private static final String HUDI_PACKAGE_PREFIX = "org.apache.hudi"; + @VisibleForTesting CachedClientPool clientPool; @Override @@ -136,10 +139,7 @@ public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaExcep c -> { List allTables = c.getAllTables(schemaIdent.name()); return c.getTableObjectsByName(schemaIdent.name(), allTables).stream() - .filter( - t -> - t.getSd().getInputFormat() != null - && t.getSd().getInputFormat().startsWith("org.apache.hudi")) + .filter(this::checkHudiTable) .map(t -> NameIdentifier.of(namespace, t.getTableName())) .toArray(NameIdentifier[]::new); }); @@ -164,6 +164,10 @@ public HudiTable loadTable(NameIdentifier tableIdent) throws NoSuchTableExceptio try { Table table = clientPool.run(client -> client.getTable(schemaIdent.name(), tableIdent.name())); + if (!checkHudiTable(table)) { + throw new NoSuchTableException( + "Table %s is not a Hudi table in Hive Metastore", tableIdent.name()); + } return HudiHMSTable.builder().withBackendTable(table).build(); } catch (NoSuchObjectException e) { @@ -212,6 +216,15 @@ public void close() { } } + private boolean checkHudiTable(Table table) { + // here uses the input format to filter out non-Hudi tables, the COW table + // uses `org.apache.hudi.hadoop.HoodieParquetInputFormat` and MOR table + // uses `org.apache.hudi.hadoop.HoodieParquetRealtimeInputFormat`, to + // simplify the logic, we just check the prefix of the input format + return table.getSd().getInputFormat() != null + && table.getSd().getInputFormat().startsWith(HUDI_PACKAGE_PREFIX); + } + private HiveConf buildHiveConf(Map properties) { Configuration hadoopConf = new Configuration(); diff --git a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSTable.java b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSTable.java index 2837ede51b2..e7c10ea1c22 100644 --- a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSTable.java +++ b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/HudiHMSTable.java @@ -65,12 +65,12 @@ protected HudiHMSTable buildFromTable(Table hmsTable) { columns = HiveTableConverter.getColumns(hmsTable, HudiColumn.builder()); partitioning = HiveTableConverter.getPartitioning(hmsTable); - // should be always SortOrders.NONE since Hudi using clustering to sort data (see + // Should always be SortOrders.NONE since Hudi using clustering to sort data (see // https://hudi.apache.org/docs/next/clustering/) // but is run as a background table service sortOrders = HiveTableConverter.getSortOrders(hmsTable); - // should be always Distributions.NONE since Hudi doesn't support distribution + // Should always be Distributions.NONE since Hudi doesn't support distribution distribution = HiveTableConverter.getDistribution(hmsTable); auditInfo = HiveTableConverter.getAuditInfo(hmsTable); diff --git a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/TestHudiHMSBackendOps.java b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/TestHudiHMSBackendOps.java index 812f6f9ace4..d1e531e8481 100644 --- a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/TestHudiHMSBackendOps.java +++ b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/backend/hms/TestHudiHMSBackendOps.java @@ -28,8 +28,10 @@ import java.util.Map; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.Namespace; +import org.apache.gravitino.catalog.lakehouse.hudi.HudiColumn; import org.apache.gravitino.catalog.lakehouse.hudi.HudiSchema; import org.apache.gravitino.catalog.lakehouse.hudi.HudiTable; +import org.apache.gravitino.exceptions.NoSuchTableException; import org.apache.gravitino.hive.hms.MiniHiveMetastoreService; import org.apache.gravitino.rel.Column; import org.apache.gravitino.rel.types.Types; @@ -38,6 +40,7 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.spark.sql.SparkSession; import org.apache.thrift.TException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; @@ -49,7 +52,8 @@ public class TestHudiHMSBackendOps extends MiniHiveMetastoreService { private static final HudiHMSBackendOps ops = new HudiHMSBackendOps(); private static final String METALAKE_NAME = "metalake"; private static final String CATALOG_NAME = "catalog"; - private static final String TABLE_NAME = "hudi_table"; + private static final String HIVE_TABLE_NAME = "hive_table"; + private static final String HUDI_TABLE_NAME = "hudi_table"; @BeforeAll public static void prepare() throws TException { @@ -57,14 +61,35 @@ public static void prepare() throws TException { props.put(URI, hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname)); ops.initialize(props); + // create a hive table Table table = new Table(); table.setDbName(DB_NAME); - table.setTableName(TABLE_NAME); + table.setTableName(HIVE_TABLE_NAME); StorageDescriptor strgDesc = new StorageDescriptor(); strgDesc.setCols(Lists.newArrayList(new FieldSchema("col1", "string", "description"))); strgDesc.setSerdeInfo(new SerDeInfo()); table.setSd(strgDesc); metastoreClient.createTable(table); + + // use Spark to create a hudi table + SparkSession sparkSession = + SparkSession.builder() + .master("local[1]") + .appName("Hudi Catalog integration test") + .config("hive.metastore.uris", hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname)) + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") + .config( + "spark.sql.catalog.spark_catalog", + "org.apache.spark.sql.hudi.catalog.HoodieCatalog") + .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar") + .config("dfs.replication", "1") + .enableHiveSupport() + .getOrCreate(); + + // create a hudi table + sparkSession.sql( + String.format("CREATE TABLE %s.%s (ts BIGINT) USING HUDI", DB_NAME, HUDI_TABLE_NAME)); } @AfterAll @@ -104,24 +129,56 @@ public void testListTables() { NameIdentifier[] tables = ops.listTables(namespace); // all hive tables are filtered out - Assertions.assertEquals(0, tables.length); + Assertions.assertEquals(1, tables.length); + Assertions.assertEquals(HUDI_TABLE_NAME, tables[0].name()); } @Test public void testLoadTable() { Namespace namespace = Namespace.of(METALAKE_NAME, CATALOG_NAME, DB_NAME); - HudiTable hudiTable = ops.loadTable(NameIdentifier.of(namespace, TABLE_NAME)); - - Assertions.assertEquals(TABLE_NAME, hudiTable.name()); + Exception exception = + Assertions.assertThrows( + NoSuchTableException.class, + () -> ops.loadTable(NameIdentifier.of(namespace, HIVE_TABLE_NAME))); + Assertions.assertEquals( + "Table hive_table is not a Hudi table in Hive Metastore", exception.getMessage()); + + HudiTable hudiTable = ops.loadTable(NameIdentifier.of(namespace, HUDI_TABLE_NAME)); + Assertions.assertEquals(HUDI_TABLE_NAME, hudiTable.name()); Assertions.assertNull(hudiTable.comment()); Assertions.assertNotNull(hudiTable.properties().get(LOCATION)); Column[] columns = hudiTable.columns(); - Assertions.assertEquals(1, columns.length); - - Column column = columns[0]; - Assertions.assertEquals("col1", column.name()); - Assertions.assertEquals(Types.StringType.get(), column.dataType()); - Assertions.assertEquals("description", column.comment()); + Assertions.assertEquals(6, columns.length); + + Assertions.assertEquals( + HudiColumn.builder() + .withName("_hoodie_commit_time") + .withType(Types.StringType.get()) + .build(), + columns[0]); + Assertions.assertEquals( + HudiColumn.builder() + .withName("_hoodie_commit_seqno") + .withType(Types.StringType.get()) + .build(), + columns[1]); + Assertions.assertEquals( + HudiColumn.builder() + .withName("_hoodie_record_key") + .withType(Types.StringType.get()) + .build(), + columns[2]); + Assertions.assertEquals( + HudiColumn.builder() + .withName("_hoodie_partition_path") + .withType(Types.StringType.get()) + .build(), + columns[3]); + Assertions.assertEquals( + HudiColumn.builder().withName("_hoodie_file_name").withType(Types.StringType.get()).build(), + columns[4]); + Assertions.assertEquals( + HudiColumn.builder().withName("ts").withType(Types.LongType.get()).build(), columns[5]); } }