From cc277566bcf8251786fc5e18424b9a0ca5354849 Mon Sep 17 00:00:00 2001 From: mchades Date: Sat, 12 Oct 2024 18:58:17 +0800 Subject: [PATCH] [#4953] feat(hudi-catalog): add Hudi catalog IT and enable module (#4965) ### What changes were proposed in this pull request? add Hudi catalog IT and enable module ### Why are the changes needed? Fix: #4953 ### Does this PR introduce _any_ user-facing change? yes. The Hudi catalog is available now ### How was this patch tested? ITs added --- build.gradle.kts | 1 + catalogs/catalog-hive/build.gradle.kts | 2 +- catalogs/catalog-kafka/build.gradle.kts | 9 - .../catalog-lakehouse-hudi/build.gradle.kts | 136 +++-- .../lakehouse/hudi/HudiCatalogOperations.java | 3 +- .../src/main/resources/hive-site.xml.template | 21 + .../src/main/resources/lakehouse-hudi.conf | 22 + .../hudi/TestHudiCatalogOperations.java | 4 +- .../integration/test/HudiCatalogHMSIT.java | 503 ++++++++++++++++++ .../src/test/resources/log4j2.properties | 73 +++ .../hive-metastore-common/build.gradle.kts | 2 +- .../gravitino/dto/util/DTOConverters.java | 4 +- flink-connector/flink/build.gradle.kts | 2 +- gradle/libs.versions.toml | 1 + .../src/test/resources/log4j2.properties | 73 +++ .../gravitino/server/GravitinoServer.java | 2 + spark-connector/spark-common/build.gradle.kts | 2 +- spark-connector/v3.3/spark/build.gradle.kts | 2 +- spark-connector/v3.4/spark/build.gradle.kts | 2 +- spark-connector/v3.5/spark/build.gradle.kts | 2 +- 20 files changed, 813 insertions(+), 53 deletions(-) create mode 100644 catalogs/catalog-lakehouse-hudi/src/main/resources/hive-site.xml.template create mode 100644 catalogs/catalog-lakehouse-hudi/src/main/resources/lakehouse-hudi.conf create mode 100644 catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/integration/test/HudiCatalogHMSIT.java create mode 100644 catalogs/catalog-lakehouse-hudi/src/test/resources/log4j2.properties create mode 100644 integration-test-common/src/test/resources/log4j2.properties diff --git a/build.gradle.kts b/build.gradle.kts index 75d0d8759e3..6db5f00cca4 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -780,6 +780,7 @@ tasks { ":catalogs:catalog-hive:copyLibAndConfig", ":catalogs:catalog-lakehouse-iceberg:copyLibAndConfig", ":catalogs:catalog-lakehouse-paimon:copyLibAndConfig", + "catalogs:catalog-lakehouse-hudi:copyLibAndConfig", ":catalogs:catalog-jdbc-doris:copyLibAndConfig", ":catalogs:catalog-jdbc-mysql:copyLibAndConfig", ":catalogs:catalog-jdbc-postgresql:copyLibAndConfig", diff --git a/catalogs/catalog-hive/build.gradle.kts b/catalogs/catalog-hive/build.gradle.kts index 2afb48f9a2d..aca8959df13 100644 --- a/catalogs/catalog-hive/build.gradle.kts +++ b/catalogs/catalog-hive/build.gradle.kts @@ -58,7 +58,7 @@ dependencies { exclude("com.google.code.findbugs", "sr305") exclude("com.tdunning", "json") exclude("com.zaxxer", "HikariCP") - exclude("io.dropwizard.metricss") + exclude("io.dropwizard.metrics") exclude("javax.transaction", "transaction-api") exclude("org.apache.ant") exclude("org.apache.avro") diff --git a/catalogs/catalog-kafka/build.gradle.kts b/catalogs/catalog-kafka/build.gradle.kts index fe8e6086f46..6a8c104c726 100644 --- a/catalogs/catalog-kafka/build.gradle.kts +++ b/catalogs/catalog-kafka/build.gradle.kts @@ -97,15 +97,6 @@ tasks.getByName("generateMetadataFileForMavenJavaPublication") { } tasks.test { - doFirst { - val testMode = project.properties["testMode"] as? String ?: "embedded" - if (testMode == "deploy") { - environment("GRAVITINO_HOME", project.rootDir.path + "/distribution/package") - } else if (testMode == "embedded") { - environment("GRAVITINO_HOME", project.rootDir.path) - } - } - val skipITs = project.hasProperty("skipITs") if (skipITs) { // Exclude integration tests diff --git a/catalogs/catalog-lakehouse-hudi/build.gradle.kts b/catalogs/catalog-lakehouse-hudi/build.gradle.kts index eef90f02957..814965ec038 100644 --- a/catalogs/catalog-lakehouse-hudi/build.gradle.kts +++ b/catalogs/catalog-lakehouse-hudi/build.gradle.kts @@ -27,55 +27,38 @@ plugins { val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString() val fullSparkVersion: String = libs.versions.spark34.get() val sparkVersion = fullSparkVersion.split(".").take(2).joinToString(".") +val hudiVersion = libs.versions.hudi.get() dependencies { implementation(project(":api")) { - exclude(group = "*") + exclude("*") } implementation(project(":common")) { - exclude(group = "*") + exclude("*") } implementation(project(":catalogs:hive-metastore-common")) implementation(project(":core")) { - exclude(group = "*") + exclude("*") } + implementation(libs.commons.collections3) + implementation(libs.commons.configuration1) + implementation(libs.htrace.core4) implementation(libs.guava) - implementation(libs.hive2.exec) { - artifact { - classifier = "core" - } - exclude("com.google.code.findbugs", "jsr305") - exclude("com.google.protobuf") - exclude("org.apache.avro") - exclude("org.apache.ant") - exclude("org.apache.calcite") - exclude("org.apache.calcite.avatica") - exclude("org.apache.curator") - exclude("org.apache.derby") - exclude("org.apache.hadoop", "hadoop-yarn-server-resourcemanager") - exclude("org.apache.hive", "hive-llap-tez") - exclude("org.apache.hive", "hive-vector-code-gen") - exclude("org.apache.ivy") - exclude("org.apache.logging.log4j") - exclude("org.apache.zookeeper") - exclude("org.codehaus.groovy", "groovy-all") - exclude("org.datanucleus", "datanucleus-core") - exclude("org.eclipse.jetty.aggregate", "jetty-all") - exclude("org.eclipse.jetty.orbit", "javax.servlet") - exclude("org.openjdk.jol") - exclude("org.pentaho") - exclude("org.slf4j") + implementation(libs.hadoop2.auth) { + exclude("*") } + implementation(libs.woodstox.core) implementation(libs.hive2.metastore) { exclude("ant") exclude("co.cask.tephra") + exclude("com.fasterxml.jackson.core", "jackson-core") exclude("com.github.joshelser") exclude("com.google.code.findbugs", "jsr305") exclude("com.google.code.findbugs", "sr305") exclude("com.tdunning", "json") exclude("com.zaxxer", "HikariCP") - exclude("io.dropwizard.metricss") + exclude("io.dropwizard.metrics") exclude("javax.transaction", "transaction-api") exclude("org.apache.ant") exclude("org.apache.avro") @@ -95,16 +78,29 @@ dependencies { implementation(libs.hadoop2.common) { exclude("*") } + implementation(libs.hadoop2.mapreduce.client.core) { + exclude("*") + } implementation(libs.slf4j.api) - implementation(libs.thrift) compileOnly(libs.lombok) annotationProcessor(libs.lombok) testImplementation(project(":catalogs:hive-metastore-common", "testArtifacts")) + testImplementation(project(":clients:client-java")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":integration-test-common", "testArtifacts")) + testImplementation(project(":server")) { + exclude("org.apache.logging.log4j") + } + testImplementation(project(":server-common")) { + exclude("org.apache.logging.log4j") + } - testImplementation(libs.bundles.log4j) + testImplementation(libs.bundles.jetty) + testImplementation(libs.bundles.jersey) testImplementation(libs.commons.collections3) testImplementation(libs.commons.configuration1) testImplementation(libs.datanucleus.core) @@ -115,12 +111,29 @@ dependencies { testImplementation(libs.hadoop2.auth) { exclude("*") } + testImplementation(libs.hadoop2.hdfs) testImplementation(libs.hadoop2.mapreduce.client.core) { exclude("*") } testImplementation(libs.htrace.core4) testImplementation(libs.junit.jupiter.api) - testImplementation(libs.woodstox.core) + testImplementation(libs.mysql.driver) + testImplementation(libs.postgresql.driver) + testImplementation(libs.prometheus.dropwizard) + testImplementation("org.apache.spark:spark-hive_$scalaVersion:$fullSparkVersion") { + exclude("org.apache.hadoop") + exclude("io.dropwizard.metrics") + exclude("com.fasterxml.jackson.core") + exclude("com.fasterxml.jackson.module", "jackson-module-scala_2.12") + } + testImplementation("org.apache.spark:spark-sql_$scalaVersion:$fullSparkVersion") { + exclude("org.apache.avro") + exclude("org.apache.hadoop") + exclude("org.apache.zookeeper") + exclude("io.dropwizard.metrics") + exclude("org.rocksdb") + } + testImplementation(libs.testcontainers) testImplementation("org.apache.spark:spark-hive_$scalaVersion:$fullSparkVersion") { exclude("org.apache.hadoop") exclude("io.dropwizard.metrics") @@ -135,6 +148,63 @@ dependencies { exclude("org.rocksdb") } - testRuntimeOnly("org.apache.hudi:hudi-spark$sparkVersion-bundle_$scalaVersion:0.15.0") + testRuntimeOnly("org.apache.hudi:hudi-spark$sparkVersion-bundle_$scalaVersion:$hudiVersion") testRuntimeOnly(libs.junit.jupiter.engine) } + +tasks { + val runtimeJars by registering(Copy::class) { + from(configurations.runtimeClasspath) + into("build/libs") + } + + val copyCatalogLibs by registering(Copy::class) { + dependsOn("jar", "runtimeJars") + from("build/libs") { + exclude("guava-*.jar") + exclude("log4j-*.jar") + exclude("slf4j-*.jar") + } + into("$rootDir/distribution/package/catalogs/lakehouse-hudi/libs") + } + + val copyCatalogConfig by registering(Copy::class) { + from("src/main/resources") + into("$rootDir/distribution/package/catalogs/lakehouse-hudi/conf") + + include("lakehouse-hudi.conf") + include("hive-site.xml.template") + + rename { original -> + if (original.endsWith(".template")) { + original.replace(".template", "") + } else { + original + } + } + + exclude { details -> + details.file.isDirectory() + } + + fileMode = 0b111101101 + } + + register("copyLibAndConfig", Copy::class) { + dependsOn(copyCatalogLibs, copyCatalogConfig) + } +} + +tasks.test { + val skipITs = project.hasProperty("skipITs") + if (skipITs) { + // Exclude integration tests + exclude("**/integration/test/**") + } else { + dependsOn(tasks.jar) + } +} + +tasks.getByName("generateMetadataFileForMavenJavaPublication") { + dependsOn("runtimeJars") +} diff --git a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogOperations.java b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogOperations.java index c2b68d11d7e..f73927233a6 100644 --- a/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogOperations.java +++ b/catalogs/catalog-lakehouse-hudi/src/main/java/org/apache/gravitino/catalog/lakehouse/hudi/HudiCatalogOperations.java @@ -92,7 +92,8 @@ public void testConnection( Map properties) throws Exception { try { - hudiCatalogBackendOps.listSchemas(null); + hudiCatalogBackendOps.listSchemas( + Namespace.of(catalogIdent.namespace().level(0), catalogIdent.name())); } catch (Exception e) { throw new ConnectionFailedException( e, "Failed to run listSchemas on Hudi catalog: %s", e.getMessage()); diff --git a/catalogs/catalog-lakehouse-hudi/src/main/resources/hive-site.xml.template b/catalogs/catalog-lakehouse-hudi/src/main/resources/hive-site.xml.template new file mode 100644 index 00000000000..efa7db5adb6 --- /dev/null +++ b/catalogs/catalog-lakehouse-hudi/src/main/resources/hive-site.xml.template @@ -0,0 +1,21 @@ + + + + diff --git a/catalogs/catalog-lakehouse-hudi/src/main/resources/lakehouse-hudi.conf b/catalogs/catalog-lakehouse-hudi/src/main/resources/lakehouse-hudi.conf new file mode 100644 index 00000000000..ebab7ce76d6 --- /dev/null +++ b/catalogs/catalog-lakehouse-hudi/src/main/resources/lakehouse-hudi.conf @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This file holds common configurations for Lakehouse-hudi catalog. The format of the key is +# 'gravitino.bypass.{hudi-inner-config-key}' and `hudi-inner-config-key` is the +# real key that pass to Lakehouse-hudi catalog. diff --git a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiCatalogOperations.java b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiCatalogOperations.java index 16595da6aab..01e6166476b 100644 --- a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiCatalogOperations.java +++ b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/TestHudiCatalogOperations.java @@ -75,7 +75,9 @@ public void testTestConnection() throws Exception { InMemoryBackendOps inMemoryBackendOps = new InMemoryBackendOps()) { ops.hudiCatalogBackendOps = inMemoryBackendOps; - Assertions.assertDoesNotThrow(() -> ops.testConnection(null, null, null, null, null)); + Assertions.assertDoesNotThrow( + () -> + ops.testConnection(NameIdentifier.of("metalake", "catalog"), null, null, null, null)); } } diff --git a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/integration/test/HudiCatalogHMSIT.java b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/integration/test/HudiCatalogHMSIT.java new file mode 100644 index 00000000000..034670fcfed --- /dev/null +++ b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/integration/test/HudiCatalogHMSIT.java @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.lakehouse.hudi.integration.test; + +import static org.apache.gravitino.catalog.lakehouse.hudi.HudiCatalogPropertiesMetadata.CATALOG_BACKEND; +import static org.apache.gravitino.catalog.lakehouse.hudi.HudiCatalogPropertiesMetadata.URI; +import static org.apache.gravitino.catalog.lakehouse.hudi.HudiSchemaPropertiesMetadata.LOCATION; + +import com.google.common.collect.ImmutableMap; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.Schema; +import org.apache.gravitino.SchemaChange; +import org.apache.gravitino.SupportsSchemas; +import org.apache.gravitino.client.GravitinoMetalake; +import org.apache.gravitino.dto.rel.ColumnDTO; +import org.apache.gravitino.integration.test.container.ContainerSuite; +import org.apache.gravitino.integration.test.container.HiveContainer; +import org.apache.gravitino.integration.test.util.AbstractIT; +import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.Table; +import org.apache.gravitino.rel.TableCatalog; +import org.apache.gravitino.rel.TableChange; +import org.apache.gravitino.rel.expressions.distributions.Distributions; +import org.apache.gravitino.rel.expressions.sorts.SortOrders; +import org.apache.gravitino.rel.expressions.transforms.Transform; +import org.apache.gravitino.rel.expressions.transforms.Transforms; +import org.apache.gravitino.rel.indexes.Indexes; +import org.apache.gravitino.rel.types.Types; +import org.apache.gravitino.utils.RandomNameUtils; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +@Tag("gravitino-docker-test") +public class HudiCatalogHMSIT extends AbstractIT { + private static final ContainerSuite containerSuite = ContainerSuite.getInstance(); + + private static String hmsURI; + private static SparkSession sparkSession; + private static GravitinoMetalake metalake; + private static Catalog catalog; + private static final String METALAKE_NAME = RandomNameUtils.genRandomName("hudi_metalake"); + private static final String CATALOG_NAME = RandomNameUtils.genRandomName("hudi_catalog"); + private static final String DB_NAME = RandomNameUtils.genRandomName("hudi_schema"); + private static final String DB_LOCATION = "/user/hive/warehouse-catalog-hudi/" + DB_NAME; + private static final String DATA_TABLE_NAME = RandomNameUtils.genRandomName("hudi_data_table"); + private static final String COW_TABLE = RandomNameUtils.genRandomName("hudi_cow_table"); + private static final String MOR_TABLE = RandomNameUtils.genRandomName("hudi_mor_table"); + + @BeforeAll + public static void prepare() { + containerSuite.startHiveContainer(); + hmsURI = + String.format( + "thrift://%s:%d", + containerSuite.getHiveContainer().getContainerIpAddress(), + HiveContainer.HIVE_METASTORE_PORT); + + createHudiTables(); + + metalake = + client.createMetalake(METALAKE_NAME, "metalake for hudi catalog IT", ImmutableMap.of()); + catalog = + metalake.createCatalog( + CATALOG_NAME, + Catalog.Type.RELATIONAL, + "lakehouse-hudi", + "hudi catalog for hms", + ImmutableMap.of(CATALOG_BACKEND, "hms", URI, hmsURI)); + } + + @Test + public void testCatalog() { + String catalogName = RandomNameUtils.genRandomName("hudi_catalog"); + String comment = "hudi catalog for hms"; + // test create exception + Exception exception = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + metalake.createCatalog( + catalogName, + Catalog.Type.RELATIONAL, + "lakehouse-hudi", + comment, + ImmutableMap.of())); + Assertions.assertTrue( + exception + .getMessage() + .contains("Properties are required and must be set: [catalog-backend, uri]"), + "Unexpected exception message: " + exception.getMessage()); + + // test testConnection exception + exception = + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + metalake.testConnection( + catalogName, + Catalog.Type.RELATIONAL, + "lakehouse-hudi", + comment, + ImmutableMap.of())); + Assertions.assertTrue( + exception + .getMessage() + .contains("Properties are required and must be set: [catalog-backend, uri]"), + "Unexpected exception message: " + exception.getMessage()); + + // test testConnection + ImmutableMap properties = ImmutableMap.of(CATALOG_BACKEND, "hms", URI, hmsURI); + Assertions.assertDoesNotThrow( + () -> + metalake.testConnection( + catalogName, Catalog.Type.RELATIONAL, "lakehouse-hudi", comment, properties)); + + // test create and load + metalake.createCatalog( + catalogName, Catalog.Type.RELATIONAL, "lakehouse-hudi", comment, properties); + Catalog catalog = metalake.loadCatalog(catalogName); + Assertions.assertEquals(catalogName, catalog.name()); + Assertions.assertEquals(Catalog.Type.RELATIONAL, catalog.type()); + Assertions.assertEquals("lakehouse-hudi", catalog.provider()); + Assertions.assertEquals(comment, catalog.comment()); + Assertions.assertEquals(properties, catalog.properties()); + + // test list + String[] catalogs = metalake.listCatalogs(); + Assertions.assertTrue(Arrays.asList(catalogs).contains(catalogName)); + } + + @Test + public void testSchema() { + SupportsSchemas schemaOps = catalog.asSchemas(); + String schemaName = RandomNameUtils.genRandomName("hudi_schema"); + // test create + Exception exception = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> schemaOps.createSchema(schemaName, null, ImmutableMap.of())); + Assertions.assertTrue( + exception.getMessage().contains("Not implemented yet"), + "Unexpected exception message: " + exception.getMessage()); + + // test alter + exception = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> schemaOps.alterSchema(schemaName, SchemaChange.removeProperty("test"))); + Assertions.assertTrue( + exception.getMessage().contains("Not implemented yet"), + "Unexpected exception message: " + exception.getMessage()); + + // test list + String[] schemas = schemaOps.listSchemas(); + Assertions.assertTrue(Arrays.asList(schemas).contains(DB_NAME)); + + // test load + Schema schema = schemaOps.loadSchema(DB_NAME); + Assertions.assertEquals(DB_NAME, schema.name()); + Assertions.assertEquals("", schema.comment()); + Assertions.assertTrue(schema.properties().get(LOCATION).endsWith(DB_NAME)); + } + + @Test + public void testTable() { + TableCatalog tableOps = catalog.asTableCatalog(); + String tableName = RandomNameUtils.genRandomName("hudi_table"); + NameIdentifier tableIdent = NameIdentifier.of(DB_NAME, tableName); + + // test create + Exception exception = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> + tableOps.createTable( + tableIdent, + new Column[] {Column.of("col1", Types.StringType.get())}, + null, + null)); + Assertions.assertTrue( + exception.getMessage().contains("Not implemented yet"), + "Unexpected exception message: " + exception.getMessage()); + + // test alter + exception = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> tableOps.alterTable(tableIdent, TableChange.updateComment("new comment"))); + Assertions.assertTrue( + exception.getMessage().contains("Not implemented yet"), + "Unexpected exception message: " + exception.getMessage()); + + // test list + NameIdentifier[] tables = tableOps.listTables(Namespace.of(DB_NAME)); + List tableNames = + Arrays.stream(tables).map(NameIdentifier::name).collect(Collectors.toList()); + + Assertions.assertTrue(tableNames.contains(DATA_TABLE_NAME)); + + Assertions.assertTrue(tableNames.contains(COW_TABLE)); + Assertions.assertFalse(tableNames.contains(COW_TABLE + "_rt")); + Assertions.assertFalse(tableNames.contains(COW_TABLE + "_ro")); + + Assertions.assertTrue(tableNames.contains(MOR_TABLE)); + Assertions.assertTrue(tableNames.contains(MOR_TABLE + "_rt")); + Assertions.assertTrue(tableNames.contains(MOR_TABLE + "_ro")); + + // test load + Table table = tableOps.loadTable(NameIdentifier.of(DB_NAME, COW_TABLE)); + Assertions.assertEquals(COW_TABLE, table.name()); + assertTable(table); + + table = tableOps.loadTable(NameIdentifier.of(DB_NAME, MOR_TABLE)); + Assertions.assertEquals(MOR_TABLE, table.name()); + assertTable(table); + + table = tableOps.loadTable(NameIdentifier.of(DB_NAME, MOR_TABLE + "_rt")); + Assertions.assertEquals(MOR_TABLE + "_rt", table.name()); + assertTable(table); + + table = tableOps.loadTable(NameIdentifier.of(DB_NAME, MOR_TABLE + "_ro")); + Assertions.assertEquals(MOR_TABLE + "_ro", table.name()); + assertTable(table); + } + + private void assertTable(Table table) { + Assertions.assertNull(table.comment()); + assertColumns(table); + assertProperties(table); + assertPartitioning(table.partitioning()); + Assertions.assertEquals(Distributions.NONE, table.distribution()); + Assertions.assertEquals(SortOrders.NONE, table.sortOrder()); + Assertions.assertEquals(Indexes.EMPTY_INDEXES, table.index()); + } + + private void assertPartitioning(Transform[] partitioning) { + Assertions.assertEquals(1, partitioning.length); + Assertions.assertEquals(Transforms.identity("city"), partitioning[0]); + } + + private void assertProperties(Table table) { + Map properties = table.properties(); + Assertions.assertTrue(properties.containsKey("last_commit_time_sync")); + Assertions.assertTrue(properties.containsKey("last_commit_completion_time_sync")); + Assertions.assertTrue(properties.containsKey("transient_lastDdlTime")); + Assertions.assertTrue(properties.containsKey("spark.sql.sources.schema.numParts")); + Assertions.assertTrue(properties.containsKey("spark.sql.sources.schema.part.0")); + Assertions.assertTrue(properties.containsKey("spark.sql.sources.schema.partCol.0")); + Assertions.assertTrue(properties.containsKey("spark.sql.sources.schema.numPartCols")); + Assertions.assertTrue(properties.containsKey("spark.sql.sources.provider")); + Assertions.assertTrue(properties.containsKey("spark.sql.create.version")); + + if (table.name().endsWith("_rt") || table.name().endsWith("_ro")) { + Assertions.assertEquals("TRUE", properties.get("EXTERNAL")); + } else { + Assertions.assertTrue(properties.containsKey("type")); + Assertions.assertTrue(properties.containsKey("provider")); + } + } + + private void assertColumns(Table table) { + Column[] columns = table.columns(); + Assertions.assertEquals(11, columns.length); + if (table.name().endsWith("_rt") || table.name().endsWith("_ro")) { + Assertions.assertEquals( + ColumnDTO.builder() + .withName("_hoodie_commit_time") + .withDataType(Types.StringType.get()) + .withComment("") + .build(), + columns[0]); + Assertions.assertEquals( + ColumnDTO.builder() + .withName("_hoodie_commit_seqno") + .withDataType(Types.StringType.get()) + .withComment("") + .build(), + columns[1]); + Assertions.assertEquals( + ColumnDTO.builder() + .withName("_hoodie_record_key") + .withDataType(Types.StringType.get()) + .withComment("") + .build(), + columns[2]); + Assertions.assertEquals( + ColumnDTO.builder() + .withName("_hoodie_partition_path") + .withDataType(Types.StringType.get()) + .withComment("") + .build(), + columns[3]); + Assertions.assertEquals( + ColumnDTO.builder() + .withName("_hoodie_file_name") + .withDataType(Types.StringType.get()) + .withComment("") + .build(), + columns[4]); + Assertions.assertEquals( + ColumnDTO.builder() + .withName("ts") + .withDataType(Types.LongType.get()) + .withComment("") + .build(), + columns[5]); + Assertions.assertEquals( + ColumnDTO.builder() + .withName("uuid") + .withDataType(Types.StringType.get()) + .withComment("") + .build(), + columns[6]); + Assertions.assertEquals( + ColumnDTO.builder() + .withName("rider") + .withDataType(Types.StringType.get()) + .withComment("") + .build(), + columns[7]); + Assertions.assertEquals( + ColumnDTO.builder() + .withName("driver") + .withDataType(Types.StringType.get()) + .withComment("") + .build(), + columns[8]); + Assertions.assertEquals( + ColumnDTO.builder() + .withName("fare") + .withDataType(Types.DoubleType.get()) + .withComment("") + .build(), + columns[9]); + Assertions.assertEquals( + ColumnDTO.builder() + .withName("city") + .withDataType(Types.StringType.get()) + .withComment("") + .build(), + columns[10]); + } else { + Assertions.assertEquals( + ColumnDTO.builder() + .withName("_hoodie_commit_time") + .withDataType(Types.StringType.get()) + .build(), + columns[0]); + Assertions.assertEquals( + ColumnDTO.builder() + .withName("_hoodie_commit_seqno") + .withDataType(Types.StringType.get()) + .build(), + columns[1]); + Assertions.assertEquals( + ColumnDTO.builder() + .withName("_hoodie_record_key") + .withDataType(Types.StringType.get()) + .build(), + columns[2]); + Assertions.assertEquals( + ColumnDTO.builder() + .withName("_hoodie_partition_path") + .withDataType(Types.StringType.get()) + .build(), + columns[3]); + Assertions.assertEquals( + ColumnDTO.builder() + .withName("_hoodie_file_name") + .withDataType(Types.StringType.get()) + .build(), + columns[4]); + Assertions.assertEquals( + ColumnDTO.builder().withName("ts").withDataType(Types.LongType.get()).build(), + columns[5]); + Assertions.assertEquals( + ColumnDTO.builder().withName("uuid").withDataType(Types.StringType.get()).build(), + columns[6]); + Assertions.assertEquals( + ColumnDTO.builder().withName("rider").withDataType(Types.StringType.get()).build(), + columns[7]); + Assertions.assertEquals( + ColumnDTO.builder().withName("driver").withDataType(Types.StringType.get()).build(), + columns[8]); + Assertions.assertEquals( + ColumnDTO.builder().withName("fare").withDataType(Types.DoubleType.get()).build(), + columns[9]); + Assertions.assertEquals( + ColumnDTO.builder().withName("city").withDataType(Types.StringType.get()).build(), + columns[10]); + } + } + + private static void createHudiTables() { + sparkSession = + SparkSession.builder() + .master("local[1]") + .appName("Hudi Catalog integration test") + .config("hive.metastore.uris", hmsURI) + .config( + "spark.sql.warehouse.dir", + String.format( + "hdfs://%s:%d/user/hive/warehouse-catalog-hudi", + containerSuite.getHiveContainer().getContainerIpAddress(), + HiveContainer.HDFS_DEFAULTFS_PORT)) + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") + .config( + "spark.sql.catalog.spark_catalog", + "org.apache.spark.sql.hudi.catalog.HoodieCatalog") + .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar") + .config("dfs.replication", "1") + .enableHiveSupport() + .getOrCreate(); + + sparkSession.sql( + String.format("CREATE DATABASE IF NOT EXISTS %s LOCATION '%s'", DB_NAME, DB_LOCATION)); + + sparkSession.sql( + String.format( + "CREATE TABLE %s.%s (\n" + + " ts BIGINT,\n" + + " uuid STRING,\n" + + " rider STRING,\n" + + " driver STRING,\n" + + " fare DOUBLE,\n" + + " city STRING\n" + + ") USING HUDI TBLPROPERTIES (type = 'cow') \n" + + "PARTITIONED BY (city)", + DB_NAME, COW_TABLE)); + sparkSession.sql( + String.format( + "INSERT INTO %s.%s\n" + + "VALUES\n" + + "(1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai')", + DB_NAME, COW_TABLE)); + + sparkSession.sql( + String.format( + "CREATE TABLE %s.%s (\n" + + " ts BIGINT,\n" + + " uuid STRING,\n" + + " rider STRING,\n" + + " driver STRING,\n" + + " fare DOUBLE,\n" + + " city STRING\n" + + ") USING HUDI TBLPROPERTIES (type = 'mor') \n" + + "PARTITIONED BY (city)", + DB_NAME, MOR_TABLE)); + sparkSession.sql( + String.format( + "INSERT INTO %s.%s\n" + + "VALUES\n" + + "(1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai')", + DB_NAME, MOR_TABLE)); + + sparkSession.sql( + String.format( + "CREATE TABLE %s.%s (\n" + + " ts BIGINT,\n" + + " uuid STRING,\n" + + " rider STRING,\n" + + " driver STRING,\n" + + " fare DOUBLE,\n" + + " city STRING\n" + + ") USING HUDI\n" + + "PARTITIONED BY (city)", + DB_NAME, DATA_TABLE_NAME)); + + sparkSession.sql( + String.format( + "INSERT INTO %s.%s\n" + + "VALUES\n" + + "(1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),\n" + + "(1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai')", + DB_NAME, DATA_TABLE_NAME)); + } +} diff --git a/catalogs/catalog-lakehouse-hudi/src/test/resources/log4j2.properties b/catalogs/catalog-lakehouse-hudi/src/test/resources/log4j2.properties new file mode 100644 index 00000000000..062b0a5c77c --- /dev/null +++ b/catalogs/catalog-lakehouse-hudi/src/test/resources/log4j2.properties @@ -0,0 +1,73 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Set to debug or trace if log4j initialization is failing +status = info + +# Name of the configuration +name = ConsoleLogConfig + +# Console appender configuration +appender.console.type = Console +appender.console.name = consoleLogger +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}:%L - %m%n + +# Log files location +property.logPath = ${sys:gravitino.log.path:-build/catalog-lakehouse-hudi-integration-test.log} + +# File appender configuration +appender.file.type = File +appender.file.name = fileLogger +appender.file.fileName = ${logPath} +appender.file.layout.type = PatternLayout +appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5p %c - %m%n + +# Root logger level +rootLogger.level = info + +# Root logger referring to console and file appenders +rootLogger.appenderRef.stdout.ref = consoleLogger +rootLogger.appenderRef.file.ref = fileLogger + +# File appender configuration for testcontainers +appender.testcontainersFile.type = File +appender.testcontainersFile.name = testcontainersLogger +appender.testcontainersFile.fileName = build/testcontainers.log +appender.testcontainersFile.layout.type = PatternLayout +appender.testcontainersFile.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5p %c - %m%n + +# Logger for testcontainers +logger.testcontainers.name = org.testcontainers +logger.testcontainers.level = debug +logger.testcontainers.additivity = false +logger.testcontainers.appenderRef.file.ref = testcontainersLogger + +logger.tc.name = tc +logger.tc.level = debug +logger.tc.additivity = false +logger.tc.appenderRef.file.ref = testcontainersLogger + +logger.docker.name = com.github.dockerjava +logger.docker.level = warn +logger.docker.additivity = false +logger.docker.appenderRef.file.ref = testcontainersLogger + +logger.http.name = com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.wire +logger.http.level = off \ No newline at end of file diff --git a/catalogs/hive-metastore-common/build.gradle.kts b/catalogs/hive-metastore-common/build.gradle.kts index 0f023acb377..539c8291dd1 100644 --- a/catalogs/hive-metastore-common/build.gradle.kts +++ b/catalogs/hive-metastore-common/build.gradle.kts @@ -45,7 +45,7 @@ dependencies { exclude("com.google.code.findbugs", "sr305") exclude("com.tdunning", "json") exclude("com.zaxxer", "HikariCP") - exclude("io.dropwizard.metricss") + exclude("io.dropwizard.metrics") exclude("javax.transaction", "transaction-api") exclude("org.apache.ant") exclude("org.apache.avro") diff --git a/common/src/main/java/org/apache/gravitino/dto/util/DTOConverters.java b/common/src/main/java/org/apache/gravitino/dto/util/DTOConverters.java index adc1f5f03e0..d12b141ff3a 100644 --- a/common/src/main/java/org/apache/gravitino/dto/util/DTOConverters.java +++ b/common/src/main/java/org/apache/gravitino/dto/util/DTOConverters.java @@ -849,7 +849,7 @@ public static SortOrder fromDTO(SortOrderDTO sortOrderDTO) { */ public static SortOrder[] fromDTOs(SortOrderDTO[] sortOrderDTO) { if (ArrayUtils.isEmpty(sortOrderDTO)) { - return new SortOrder[0]; + return SortOrders.NONE; } return Arrays.stream(sortOrderDTO).map(DTOConverters::fromDTO).toArray(SortOrder[]::new); @@ -863,7 +863,7 @@ public static SortOrder[] fromDTOs(SortOrderDTO[] sortOrderDTO) { */ public static Transform[] fromDTOs(Partitioning[] partitioning) { if (ArrayUtils.isEmpty(partitioning)) { - return new Transform[0]; + return Transforms.EMPTY_TRANSFORM; } return Arrays.stream(partitioning).map(DTOConverters::fromDTO).toArray(Transform[]::new); } diff --git a/flink-connector/flink/build.gradle.kts b/flink-connector/flink/build.gradle.kts index c4e75200ec6..0cc8148765a 100644 --- a/flink-connector/flink/build.gradle.kts +++ b/flink-connector/flink/build.gradle.kts @@ -146,7 +146,7 @@ dependencies { exclude("com.google.code.findbugs", "sr305") exclude("com.tdunning", "json") exclude("com.zaxxer", "HikariCP") - exclude("io.dropwizard.metricss") + exclude("io.dropwizard.metrics") exclude("javax.transaction", "transaction-api") exclude("org.apache.avro") exclude("org.apache.curator") diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 44a4737d210..09cfbea2ce8 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -102,6 +102,7 @@ datanucleus-core = "4.1.17" datanucleus-api-jdo = "4.2.4" datanucleus-rdbms = "4.1.19" datanucleus-jdo = "3.2.0-m3" +hudi = "0.15.0" [libraries] protobuf-java = { group = "com.google.protobuf", name = "protobuf-java", version.ref = "protoc" } diff --git a/integration-test-common/src/test/resources/log4j2.properties b/integration-test-common/src/test/resources/log4j2.properties new file mode 100644 index 00000000000..b22faa0a547 --- /dev/null +++ b/integration-test-common/src/test/resources/log4j2.properties @@ -0,0 +1,73 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Set to debug or trace if log4j initialization is failing +status = info + +# Name of the configuration +name = ConsoleLogConfig + +# Console appender configuration +appender.console.type = Console +appender.console.name = consoleLogger +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}:%L - %m%n + +# Log files location +property.logPath = ${sys:gravitino.log.path:-build/integration-test-common.log} + +# File appender configuration +appender.file.type = File +appender.file.name = fileLogger +appender.file.fileName = ${logPath} +appender.file.layout.type = PatternLayout +appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5p %c - %m%n + +# Root logger level +rootLogger.level = info + +# Root logger referring to console and file appenders +rootLogger.appenderRef.stdout.ref = consoleLogger +rootLogger.appenderRef.file.ref = fileLogger + +# File appender configuration for testcontainers +appender.testcontainersFile.type = File +appender.testcontainersFile.name = testcontainersLogger +appender.testcontainersFile.fileName = build/testcontainers.log +appender.testcontainersFile.layout.type = PatternLayout +appender.testcontainersFile.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5p %c - %m%n + +# Logger for testcontainers +logger.testcontainers.name = org.testcontainers +logger.testcontainers.level = debug +logger.testcontainers.additivity = false +logger.testcontainers.appenderRef.file.ref = testcontainersLogger + +logger.tc.name = tc +logger.tc.level = debug +logger.tc.additivity = false +logger.tc.appenderRef.file.ref = testcontainersLogger + +logger.docker.name = com.github.dockerjava +logger.docker.level = warn +logger.docker.additivity = false +logger.docker.appenderRef.file.ref = testcontainersLogger + +logger.http.name = com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.wire +logger.http.level = off \ No newline at end of file diff --git a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java index 3232c293bcd..e383c65b7a4 100644 --- a/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java +++ b/server/src/main/java/org/apache/gravitino/server/GravitinoServer.java @@ -48,6 +48,7 @@ import org.apache.gravitino.server.web.ui.WebUIFilter; import org.apache.gravitino.tag.TagManager; import org.glassfish.hk2.utilities.binding.AbstractBinder; +import org.glassfish.jersey.CommonProperties; import org.glassfish.jersey.jackson.JacksonFeature; import org.glassfish.jersey.server.ResourceConfig; import org.glassfish.jersey.servlet.ServletContainer; @@ -119,6 +120,7 @@ protected void configure() { register(JsonParseExceptionMapper.class); register(JsonMappingExceptionMapper.class); register(ObjectMapperProvider.class).register(JacksonFeature.class); + property(CommonProperties.JSON_JACKSON_DISABLED_MODULES, "DefaultScalaModule"); if (!enableAuthorization) { register(AccessControlNotAllowedFilter.class); diff --git a/spark-connector/spark-common/build.gradle.kts b/spark-connector/spark-common/build.gradle.kts index 2d875204628..7f3c66aa6e6 100644 --- a/spark-connector/spark-common/build.gradle.kts +++ b/spark-connector/spark-common/build.gradle.kts @@ -91,7 +91,7 @@ dependencies { exclude("com.google.code.findbugs", "sr305") exclude("com.tdunning", "json") exclude("com.zaxxer", "HikariCP") - exclude("io.dropwizard.metricss") + exclude("io.dropwizard.metrics") exclude("javax.transaction", "transaction-api") exclude("org.apache.avro") exclude("org.apache.curator") diff --git a/spark-connector/v3.3/spark/build.gradle.kts b/spark-connector/v3.3/spark/build.gradle.kts index 2a8ed3f97ce..c4c417d62ef 100644 --- a/spark-connector/v3.3/spark/build.gradle.kts +++ b/spark-connector/v3.3/spark/build.gradle.kts @@ -98,7 +98,7 @@ dependencies { exclude("com.tdunning", "json") exclude("com.zaxxer", "HikariCP") exclude("com.sun.jersey") - exclude("io.dropwizard.metricss") + exclude("io.dropwizard.metrics") exclude("javax.transaction", "transaction-api") exclude("org.apache.avro") exclude("org.apache.curator") diff --git a/spark-connector/v3.4/spark/build.gradle.kts b/spark-connector/v3.4/spark/build.gradle.kts index bf51bc8690c..f3308fca34b 100644 --- a/spark-connector/v3.4/spark/build.gradle.kts +++ b/spark-connector/v3.4/spark/build.gradle.kts @@ -98,7 +98,7 @@ dependencies { exclude("com.tdunning", "json") exclude("com.zaxxer", "HikariCP") exclude("com.sun.jersey") - exclude("io.dropwizard.metricss") + exclude("io.dropwizard.metrics") exclude("javax.transaction", "transaction-api") exclude("org.apache.avro") exclude("org.apache.curator") diff --git a/spark-connector/v3.5/spark/build.gradle.kts b/spark-connector/v3.5/spark/build.gradle.kts index 170a4cc63f5..7b8cc8447b7 100644 --- a/spark-connector/v3.5/spark/build.gradle.kts +++ b/spark-connector/v3.5/spark/build.gradle.kts @@ -100,7 +100,7 @@ dependencies { exclude("com.tdunning", "json") exclude("com.zaxxer", "HikariCP") exclude("com.sun.jersey") - exclude("io.dropwizard.metricss") + exclude("io.dropwizard.metrics") exclude("javax.transaction", "transaction-api") exclude("org.apache.avro") exclude("org.apache.curator")