From 2b9fb6b31041881db77054b11b1082d354f63733 Mon Sep 17 00:00:00 2001
From: cai can <94670132+caican00@users.noreply.github.com>
Date: Thu, 16 May 2024 12:02:22 +0800
Subject: [PATCH] [#3193] feat(spark-connector): Support Iceberg RestCatalog in
 spark-connector (#3194)

### What changes were proposed in this pull request?

Support Iceberg `RestCatalog` in spark-connector.
Add an IT for Iceberg `RestCatalog` with a HiveCatalog backend.
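For illustration, the new conversion path maps a Gravitino Iceberg catalog with a REST backend onto Spark catalog properties roughly as follows. This is a minimal sketch mirroring the new `testCatalogPropertiesWithRestBackend` case below; the URI/warehouse values are placeholders, and the direct `new IcebergPropertiesConverter()` instantiation is an assumption (the unit test simply holds an `icebergPropertiesConverter` instance):

```java
import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConstants;
import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConverter;
import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class RestBackendMappingSketch {
  public static void main(String[] args) {
    // Assumed instantiation; how the converter is obtained is not part of this diff.
    IcebergPropertiesConverter converter = new IcebergPropertiesConverter();

    // Gravitino catalog properties for an Iceberg catalog whose backend is "rest".
    Map<String, String> sparkProperties =
        converter.toSparkCatalogProperties(
            ImmutableMap.of(
                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
                IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST,
                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
                "http://127.0.0.1:9001/iceberg/",
                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
                "hdfs://127.0.0.1:9000/user/hive/warehouse"));

    // Spark receives type=rest plus the uri/warehouse entries, so SparkCatalog
    // builds an Iceberg RESTCatalog against the given endpoint.
    System.out.println(sparkProperties);
  }
}
```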
### Why are the changes needed?

Without this, spark-connector does not support Iceberg `RestCatalog`.

Fix: https://github.com/datastrato/gravitino/issues/3193

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New ITs.
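The new IT reuses the whole `SparkIcebergCatalogIT` suite against a REST-backed catalog: `SparkEnvIT` starts Gravitino's Iceberg REST service as an auxiliary service (with a Hive backend) and points the catalog URI at it. For context, the Spark side is wired roughly like this; a sketch only, since `initSparkEnv` is not part of this diff and the `GravitinoSparkConfig.GRAVITINO_URI`/`GRAVITINO_METALAKE` key constants are assumptions here:

```java
import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
import com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin;
import org.apache.spark.sql.SparkSession;

public class SparkEnvSketch {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .master("local[1]")
            .appName("iceberg-rest-it-sketch")
            // The plugin discovers Gravitino catalogs and registers them as Spark catalogs.
            .config("spark.plugins", GravitinoSparkPlugin.class.getName())
            // Assumed config keys: point the plugin at the Gravitino server and metalake.
            .config(GravitinoSparkConfig.GRAVITINO_URI, "http://127.0.0.1:8090")
            .config(GravitinoSparkConfig.GRAVITINO_METALAKE, "test")
            .enableHiveSupport()
            .getOrCreate();

    // A REST-backed Iceberg catalog created in Gravitino is then queryable
    // like any other Spark catalog.
    spark.sql("SHOW CATALOGS").show();
  }
}
```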
---
 integration-test/build.gradle.kts             |  2 +
 .../integration/test/spark/SparkEnvIT.java    | 63 ++++++++++++++++++-
 .../SparkIcebergCatalogHiveBackendIT.java     |  5 +-
 .../SparkIcebergCatalogRestBackendIT.java     | 31 +++++++++
 .../iceberg/IcebergPropertiesConstants.java   |  9 ++-
 .../iceberg/IcebergPropertiesConverter.java   | 24 +++++++
 .../TestIcebergPropertiesConverter.java       | 24 +++++++
 7 files changed, 154 insertions(+), 4 deletions(-)
 create mode 100644 integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogRestBackendIT.java

diff --git a/integration-test/build.gradle.kts b/integration-test/build.gradle.kts
index 65768e8f251..7f740c5032a 100644
--- a/integration-test/build.gradle.kts
+++ b/integration-test/build.gradle.kts
@@ -118,6 +118,8 @@ dependencies {
   }
   testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
   testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
+  testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
+  testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")
   testImplementation(libs.okhttp3.loginterceptor)
   testImplementation(libs.postgresql.driver)

diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkEnvIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkEnvIT.java
index 58ffd104e28..7094434cbbd 100644
--- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkEnvIT.java
+++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/SparkEnvIT.java
@@ -7,14 +7,19 @@
 
 import com.datastrato.gravitino.Catalog;
 import com.datastrato.gravitino.NameIdentifier;
+import com.datastrato.gravitino.auxiliary.AuxiliaryServiceManager;
 import com.datastrato.gravitino.client.GravitinoMetalake;
 import com.datastrato.gravitino.integration.test.container.ContainerSuite;
 import com.datastrato.gravitino.integration.test.container.HiveContainer;
+import com.datastrato.gravitino.integration.test.util.AbstractIT;
 import com.datastrato.gravitino.integration.test.util.spark.SparkUtilIT;
+import com.datastrato.gravitino.server.web.JettyServerConfig;
 import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
+import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConstants;
 import com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -30,9 +35,11 @@ public abstract class SparkEnvIT extends SparkUtilIT {
   private static final Logger LOG = LoggerFactory.getLogger(SparkEnvIT.class);
   private static final ContainerSuite containerSuite = ContainerSuite.getInstance();
 
+  protected static final String icebergRestServiceName = "iceberg-rest";
   protected String hiveMetastoreUri = "thrift://127.0.0.1:9083";
   protected String warehouse;
   protected FileSystem hdfs;
+  protected String icebergRestServiceUri;
 
   private final String metalakeName = "test";
   private SparkSession sparkSession;
@@ -51,8 +58,15 @@ protected SparkSession getSparkSession() {
   }
 
   @BeforeAll
-  void startUp() {
+  void startUp() throws Exception {
     initHiveEnv();
+    // Initialize hiveMetastoreUri and warehouse first so their values can be
+    // injected into the IcebergRestService configs.
+    if ("lakehouse-iceberg".equalsIgnoreCase(getProvider())) {
+      initIcebergRestServiceEnv();
+    }
+    // Start the Gravitino server.
+    AbstractIT.startIntegrationTest();
     initHdfsFileSystem();
     initGravitinoEnv();
     initMetalakeAndCatalogs();
@@ -64,7 +78,7 @@ void startUp() {
   }
 
   @AfterAll
-  void stop() {
+  void stop() throws IOException, InterruptedException {
     if (hdfs != null) {
       try {
         hdfs.close();
@@ -75,8 +89,19 @@ void stop() {
     if (sparkSession != null) {
       sparkSession.close();
     }
+    AbstractIT.stopIntegrationTest();
   }
 
+  // AbstractIT#startIntegrationTest() is static, so we can't change the value of
+  // ignoreIcebergRestService before JUnit invokes startIntegrationTest() automatically.
+  // Override both lifecycle methods as no-ops here to disable the automatic
+  // invocation; startUp()/stop() call them explicitly instead.
+  @BeforeAll
+  public static void startIntegrationTest() {}
+
+  @AfterAll
+  public static void stopIntegrationTest() {}
+
   private void initMetalakeAndCatalogs() {
     client.createMetalake(NameIdentifier.of(metalakeName), "", Collections.emptyMap());
     GravitinoMetalake metalake = client.loadMetalake(NameIdentifier.of(metalakeName));
@@ -93,6 +118,7 @@ private void initGravitinoEnv() {
     // Gravitino server is already started by AbstractIT, just construct gravitinoUrl
     int gravitinoPort = getGravitinoServerPort();
     gravitinoUri = String.format("http://127.0.0.1:%d", gravitinoPort);
+    icebergRestServiceUri = getIcebergRestServiceUri();
   }
 
   private void initHiveEnv() {
@@ -109,6 +135,30 @@ private void initHiveEnv() {
         HiveContainer.HDFS_DEFAULTFS_PORT);
   }
 
+  private void initIcebergRestServiceEnv() {
+    ignoreIcebergRestService = false;
+    Map<String, String> icebergRestServiceConfigs = new HashMap<>();
+    icebergRestServiceConfigs.put(
+        AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX
+            + icebergRestServiceName
+            + "."
+            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+        IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_HIVE);
+    icebergRestServiceConfigs.put(
+        AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX
+            + icebergRestServiceName
+            + "."
+            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
+        hiveMetastoreUri);
+    icebergRestServiceConfigs.put(
+        AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX
+            + icebergRestServiceName
+            + "."
+            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
+        warehouse);
+    AbstractIT.registerCustomConfigs(icebergRestServiceConfigs);
+  }
+
   private void initHdfsFileSystem() {
     Configuration conf = new Configuration();
     conf.set(
@@ -139,4 +189,13 @@ private void initSparkEnv() {
             .enableHiveSupport()
             .getOrCreate();
   }
+
+  private String getIcebergRestServiceUri() {
+    JettyServerConfig jettyServerConfig =
+        JettyServerConfig.fromConfig(
+            serverConfig,
+            AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX + icebergRestServiceName + ".");
+    return String.format(
+        "http://%s:%d/iceberg/", jettyServerConfig.getHost(), jettyServerConfig.getHttpPort());
+  }
 }
diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogHiveBackendIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogHiveBackendIT.java
index e7c3b1328e0..a3f149c128a 100644
--- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogHiveBackendIT.java
+++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogHiveBackendIT.java
@@ -10,6 +10,7 @@
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.TestInstance;
 
+/** This class uses Iceberg HiveCatalog as the backend catalog. */
 @Tag("gravitino-docker-it")
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class SparkIcebergCatalogHiveBackendIT extends SparkIcebergCatalogIT {
@@ -17,7 +18,9 @@ public class SparkIcebergCatalogHiveBackendIT extends SparkIcebergCatalogIT {
   @Override
   protected Map<String, String> getCatalogConfigs() {
     Map<String, String> catalogProperties = Maps.newHashMap();
-    catalogProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND, "hive");
+    catalogProperties.put(
+        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+        IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_HIVE);
     catalogProperties.put(
         IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, warehouse);
     catalogProperties.put(

diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogRestBackendIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogRestBackendIT.java
new file mode 100644
index 00000000000..078ee54bb8d
--- /dev/null
+++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/spark/iceberg/SparkIcebergCatalogRestBackendIT.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.integration.test.spark.iceberg;
+
+import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConstants;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInstance;
+
+/** This class uses Iceberg RESTCatalog for tests; the real backend catalog is HiveCatalog. */
+@Tag("gravitino-docker-it")
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class SparkIcebergCatalogRestBackendIT extends SparkIcebergCatalogIT {
+
+  @Override
+  protected Map<String, String> getCatalogConfigs() {
+    Map<String, String> catalogProperties = Maps.newHashMap();
+    catalogProperties.put(
+        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+        IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST);
+    catalogProperties.put(
+        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, icebergRestServiceUri);
+    catalogProperties.put(
+        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, warehouse);
+
+    return catalogProperties;
+  }
+}

diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java
index 3f8591dec28..0ccfc359e71 100644
--- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java
+++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java
@@ -38,11 +38,18 @@ public class IcebergPropertiesConstants {
   static final String ICEBERG_CATALOG_JDBC_PASSWORD =
       IcebergCatalogPropertiesMetadata.ICEBERG_JDBC_PASSWORD;
 
+  @VisibleForTesting
+  public static final String ICEBERG_CATALOG_BACKEND_HIVE = CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE;
+
   static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE = "hive";
-  static final String ICEBERG_CATALOG_BACKEND_HIVE = CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE;
 
   static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_JDBC = "jdbc";
   static final String ICEBERG_CATALOG_BACKEND_JDBC = "jdbc";
 
+  @VisibleForTesting
+  public static final String ICEBERG_CATALOG_BACKEND_REST = CatalogUtil.ICEBERG_CATALOG_TYPE_REST;
+
+  static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_REST = "rest";
+
   private IcebergPropertiesConstants() {}
 }
diff --git a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java
index 58c2a09c9d5..3810f91f299 100644
--- a/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java
+++ b/spark-connector/spark-connector/src/main/java/com/datastrato/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java
@@ -44,6 +44,9 @@ public Map<String, String> toSparkCatalogProperties(Map<String, String> properties) {
       case IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_JDBC:
         initJdbcProperties(properties, all);
         break;
+      case IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_REST:
+        initRestProperties(properties, all);
+        break;
       default:
         // SparkCatalog does not support Memory type catalog
         throw new IllegalArgumentException(
@@ -123,4 +126,25 @@ private void initJdbcProperties(
     icebergProperties.put(IcebergPropertiesConstants.ICEBERG_CATALOG_JDBC_USER, jdbcUser);
     icebergProperties.put(IcebergPropertiesConstants.ICEBERG_CATALOG_JDBC_PASSWORD, jdbcPassword);
   }
+
+  private void initRestProperties(
+      Map<String, String> gravitinoProperties, HashMap<String, String> icebergProperties) {
+    String restUri =
+        gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI);
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(restUri),
+        "Couldn't get "
+            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI
+            + " from Iceberg Catalog properties");
+    icebergProperties.put(
+        IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE,
+        IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST);
+    icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, restUri);
+    if (gravitinoProperties.containsKey(
+        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE)) {
+      icebergProperties.put(
+          IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
+          gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE));
+    }
+  }
 }

diff --git a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/iceberg/TestIcebergPropertiesConverter.java b/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/iceberg/TestIcebergPropertiesConverter.java
index 549fb503db5..a20fdf239fe 100644
--- a/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/iceberg/TestIcebergPropertiesConverter.java
+++ b/spark-connector/spark-connector/src/test/java/com/datastrato/gravitino/spark/connector/iceberg/TestIcebergPropertiesConverter.java
@@ -69,4 +69,28 @@ void testCatalogPropertiesWithJdbcBackend() {
             "passwd"),
         properties);
   }
+
+  @Test
+  void testCatalogPropertiesWithRestBackend() {
+    Map<String, String> properties =
+        icebergPropertiesConverter.toSparkCatalogProperties(
+            ImmutableMap.of(
+                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+                IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST,
+                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
+                "rest-uri",
+                IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
+                "rest-warehouse",
+                "key1",
+                "value1"));
+    Assertions.assertEquals(
+        ImmutableMap.of(
+            IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE,
+            IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST,
+            IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
+            "rest-uri",
+            IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
+            "rest-warehouse"),
+        properties);
+  }
 }