Skip to content

Commit

Permalink
[apache#4718] fix(iceberg): use unified logic to transform catalog ba…
Browse files Browse the repository at this point in the history
…ckend name to handle the renaming of catalog (apache#4986)

### What changes were proposed in this pull request?

1. Spark,Trino, Iceberg catalog and Iceberg REST server use
`getCatalogBackendName` to get catalog backend name.
2. change the default backend name to catalog backend, like `jdbc`, it
will not change after rename.

### Why are the changes needed?

Fix: apache#4718 

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
1. create a jdbc catalog with catalog backend name, check whether can
see the schema after rename
1. create a jdbc catalog without catalog backend name, check whether can
see the schema after rename

---------

Co-authored-by: FANNG <[email protected]>
  • Loading branch information
github-actions[bot] and FANNG1 authored Sep 23, 2024
1 parent 720a67a commit a639390
Show file tree
Hide file tree
Showing 11 changed files with 109 additions and 38 deletions.
5 changes: 5 additions & 0 deletions catalogs/catalog-common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,9 @@ plugins {
dependencies {
implementation(libs.slf4j.api)
implementation(libs.guava)

testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)

testRuntimeOnly(libs.junit.jupiter.engine)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

public class IcebergPropertiesUtils {

Expand Down Expand Up @@ -66,4 +68,22 @@ public static Map<String, String> toIcebergCatalogProperties(
});
return icebergProperties;
}

/**
* Get catalog backend name from Gravitino catalog properties.
*
* @param catalogProperties a map of Gravitino catalog properties.
* @return catalog backend name.
*/
public static String getCatalogBackendName(Map<String, String> catalogProperties) {
String backendName = catalogProperties.get(IcebergConstants.CATALOG_BACKEND_NAME);
if (backendName != null) {
return backendName;
}

String catalogBackend = catalogProperties.get(IcebergConstants.CATALOG_BACKEND);
return Optional.ofNullable(catalogBackend)
.map(s -> s.toLowerCase(Locale.ROOT))
.orElse("memory");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.lakehouse.iceberg;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class TestIcebergPropertiesUtils {

@Test
void testGetCatalogBackendName() {
Map<String, String> catalogProperties =
ImmutableMap.of(
IcebergConstants.CATALOG_BACKEND_NAME, "a", IcebergConstants.CATALOG_BACKEND, "jdbc");
String backendName = IcebergPropertiesUtils.getCatalogBackendName(catalogProperties);
Assertions.assertEquals("a", backendName);

catalogProperties = ImmutableMap.of(IcebergConstants.CATALOG_BACKEND, "jdbc");
backendName = IcebergPropertiesUtils.getCatalogBackendName(catalogProperties);
Assertions.assertEquals("jdbc", backendName);

catalogProperties = ImmutableMap.of(IcebergConstants.CATALOG_BACKEND, "JDBC");
backendName = IcebergPropertiesUtils.getCatalogBackendName(catalogProperties);
Assertions.assertEquals("jdbc", backendName);

catalogProperties = ImmutableMap.of(IcebergConstants.CATALOG_BACKEND, "hive");
backendName = IcebergPropertiesUtils.getCatalogBackendName(catalogProperties);
Assertions.assertEquals("hive", backendName);

catalogProperties = ImmutableMap.of();
backendName = IcebergPropertiesUtils.getCatalogBackendName(catalogProperties);
Assertions.assertEquals("memory", backendName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,6 @@ public void initialize(
Map<String, String> resultConf = Maps.newHashMap(prefixMap);
resultConf.putAll(gravitinoConfig);
resultConf.put("catalog_uuid", info.id().toString());
if (!resultConf.containsKey(IcebergCatalogPropertiesMetadata.CATALOG_BACKEND_NAME)) {
resultConf.put(IcebergCatalogPropertiesMetadata.CATALOG_BACKEND_NAME, info.name());
}
IcebergConfig icebergConfig = new IcebergConfig(resultConf);

this.icebergTableOps = new IcebergTableOps(icebergConfig);
Expand Down
18 changes: 6 additions & 12 deletions docs/lakehouse-iceberg-catalog.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,12 @@ Builds with Apache Iceberg `1.5.2`. The Apache Iceberg table format version is `

### Catalog properties

| Property name | Description | Default value | Required | Since Version |
|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------|-------------------------------------------------------------|---------------|
| `catalog-backend` | Catalog backend of Gravitino Iceberg catalog. Supports `hive` or `jdbc` or `rest`. | (none) | Yes | 0.2.0 |
| `uri` | The URI configuration of the Iceberg catalog. `thrift://127.0.0.1:9083` or `jdbc:postgresql://127.0.0.1:5432/db_name` or `jdbc:mysql://127.0.0.1:3306/metastore_db` or `http://127.0.0.1:9001`. | (none) | Yes | 0.2.0 |
| `warehouse` | Warehouse directory of catalog. `file:///user/hive/warehouse-hive/` for local fs or `hdfs://namespace/hdfs/path` for HDFS. | (none) | Yes | 0.2.0 |
| `catalog-backend-name` | The catalog name passed to underlying Iceberg catalog backend. Catalog name in JDBC backend is used to isolate namespace and tables. | Gravitino catalog name | No | 0.5.2 |
| `authentication.type` | The type of authentication for Iceberg catalog backend, currently Gravitino only supports `Kerberos`, `simple`. | `simple` | No | 0.6.0 |
| `authentication.impersonation-enable` | Whether to enable impersonation for the Iceberg catalog | `false` | No | 0.6.0 |
| `authentication.kerberos.principal` | The principal of the Kerberos authentication | (none) | required if the value of `authentication.type` is Kerberos. | 0.6.0 |
| `authentication.kerberos.keytab-uri` | The URI of The keytab for the Kerberos authentication. | (none) | required if the value of `authentication.type` is Kerberos. | 0.6.0 |
| `authentication.kerberos.check-interval-sec` | The check interval of Kerberos credential for Iceberg catalog. | 60 | No | 0.6.0 |
| `authentication.kerberos.keytab-fetch-timeout-sec` | The fetch timeout of retrieving Kerberos keytab from `authentication.kerberos.keytab-uri`. | 60 | No | 0.6.0 |
| Property name | Description | Default value | Required | Since Version |
|----------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------|-------------------------------------------------------------|---------------|
| `catalog-backend` | Catalog backend of Gravitino Iceberg catalog. Supports `hive` or `jdbc` or `rest`. | (none) | Yes | 0.2.0 |
| `uri` | The URI configuration of the Iceberg catalog. `thrift://127.0.0.1:9083` or `jdbc:postgresql://127.0.0.1:5432/db_name` or `jdbc:mysql://127.0.0.1:3306/metastore_db` or `http://127.0.0.1:9001`. | (none) | Yes | 0.2.0 |
| `warehouse` | Warehouse directory of catalog. `file:///user/hive/warehouse-hive/` for local fs or `hdfs://namespace/hdfs/path` for HDFS. | (none) | Yes | 0.2.0 |
| `catalog-backend-name` | The catalog name passed to underlying Iceberg catalog backend. Catalog name in JDBC backend is used to isolate namespace and tables. | The property value of `catalog-backend`, like `jdbc` for JDBC catalog backend. | No | 0.5.2 |


Any properties not defined by Gravitino with `gravitino.bypass.` prefix will pass to Iceberg catalog properties and HDFS configuration. For example, if specify `gravitino.bypass.list-all-tables`, `list-all-tables` will pass to Iceberg catalog properties.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Config;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
Expand Down Expand Up @@ -162,8 +161,8 @@ public String getJdbcDriver() {
return get(JDBC_DRIVER);
}

public String getCatalogBackendName(String defaultCatalogBackendName) {
return Optional.ofNullable(get(CATALOG_BACKEND_NAME)).orElse(defaultCatalogBackendName);
public String getCatalogBackendName() {
return IcebergPropertiesUtils.getCatalogBackendName(getAllConfig());
}

public IcebergConfig(Map<String, String> properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class IcebergCatalogUtil {
private static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogUtil.class);

private static InMemoryCatalog loadMemoryCatalog(IcebergConfig icebergConfig) {
String icebergCatalogName = icebergConfig.getCatalogBackendName("memory");
String icebergCatalogName = icebergConfig.getCatalogBackendName();
InMemoryCatalog memoryCatalog = new InMemoryCatalog();
Map<String, String> resultProperties = icebergConfig.getIcebergCatalogProperties();
resultProperties.put(CatalogProperties.WAREHOUSE_LOCATION, "/tmp");
Expand All @@ -63,7 +63,7 @@ private static InMemoryCatalog loadMemoryCatalog(IcebergConfig icebergConfig) {
private static HiveCatalog loadHiveCatalog(IcebergConfig icebergConfig) {
ClosableHiveCatalog hiveCatalog = new ClosableHiveCatalog();
HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
String icebergCatalogName = icebergConfig.getCatalogBackendName("hive");
String icebergCatalogName = icebergConfig.getCatalogBackendName();

Map<String, String> properties = icebergConfig.getIcebergCatalogProperties();
properties.forEach(hdfsConfiguration::set);
Expand Down Expand Up @@ -110,7 +110,7 @@ private static KerberosClient initKerberosAndReturnClient(

private static JdbcCatalog loadJdbcCatalog(IcebergConfig icebergConfig) {
String driverClassName = icebergConfig.getJdbcDriver();
String icebergCatalogName = icebergConfig.getCatalogBackendName("jdbc");
String icebergCatalogName = icebergConfig.getCatalogBackendName();

Map<String, String> properties = icebergConfig.getIcebergCatalogProperties();
Preconditions.checkNotNull(
Expand All @@ -136,7 +136,7 @@ private static JdbcCatalog loadJdbcCatalog(IcebergConfig icebergConfig) {
}

private static Catalog loadRestCatalog(IcebergConfig icebergConfig) {
String icebergCatalogName = icebergConfig.getCatalogBackendName("rest");
String icebergCatalogName = icebergConfig.getCatalogBackendName();
RESTCatalog restCatalog = new RESTCatalog();
HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
Map<String, String> properties = icebergConfig.getIcebergCatalogProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.Optional;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.spark.connector.PropertiesConverter;
import org.apache.gravitino.spark.connector.SparkTransformConverter;
Expand Down Expand Up @@ -58,10 +58,7 @@ public class GravitinoIcebergCatalog extends BaseCatalog
@Override
protected TableCatalog createAndInitSparkCatalog(
String name, CaseInsensitiveStringMap options, Map<String, String> properties) {
String catalogBackendName =
Optional.ofNullable(
properties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_NAME))
.orElse(name);
String catalogBackendName = IcebergPropertiesUtils.getCatalogBackendName(properties);
Map<String, String> all =
getPropertiesConverter().toSparkCatalogProperties(options, properties);
TableCatalog icebergCatalog = new SparkCatalog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Set;
import org.apache.commons.collections4.bidimap.TreeBidiMap;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
import org.apache.gravitino.catalog.property.PropertyConverter;
import org.apache.gravitino.trino.connector.GravitinoErrorCode;

Expand Down Expand Up @@ -345,17 +346,22 @@ private Map<String, String> buildJDBCBackendProperties(Map<String, String> prope

Map<String, String> jdbcProperties = new HashMap<>();
jdbcProperties.put("iceberg.catalog.type", "jdbc");
jdbcProperties.put("iceberg.jdbc-catalog.driver-class", properties.get("jdbc-driver"));
jdbcProperties.put("iceberg.jdbc-catalog.connection-url", properties.get("uri"));
jdbcProperties.put("iceberg.jdbc-catalog.connection-user", properties.get("jdbc-user"));
jdbcProperties.put("iceberg.jdbc-catalog.connection-password", properties.get("jdbc-password"));
jdbcProperties.put("iceberg.jdbc-catalog.default-warehouse-dir", properties.get("warehouse"));
jdbcProperties.put(
"iceberg.jdbc-catalog.driver-class",
properties.get(IcebergConstants.GRAVITINO_JDBC_DRIVER));
jdbcProperties.put("iceberg.jdbc-catalog.connection-url", properties.get(IcebergConstants.URI));
jdbcProperties.put(
"iceberg.jdbc-catalog.connection-user",
properties.get(IcebergConstants.GRAVITINO_JDBC_USER));
jdbcProperties.put(
"iceberg.jdbc-catalog.connection-password",
properties.get(IcebergConstants.GRAVITINO_JDBC_PASSWORD));
jdbcProperties.put(
"iceberg.jdbc-catalog.default-warehouse-dir", properties.get(IcebergConstants.WAREHOUSE));

// TODO (yuhui) Optimize the code for retrieve the catalogname
String catalogName = properties.get("catalog-name");
jdbcProperties.put(
"iceberg.jdbc-catalog.catalog-name",
properties.getOrDefault(IcebergConstants.CATALOG_BACKEND_NAME, catalogName));
IcebergPropertiesUtils.getCatalogBackendName(properties));

return jdbcProperties;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public IcebergConnectorAdapter() {
@Override
public Map<String, String> buildInternalConnectorConfig(GravitinoCatalog catalog)
throws Exception {
catalog.getProperties().put("catalog-name", catalog.getName());
return catalogConverter.gravitinoToEngineProperties(catalog.getProperties());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.fasterxml.jackson.databind.json.JsonMapper;
import io.trino.spi.TrinoException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
Expand Down Expand Up @@ -60,7 +59,7 @@ public GravitinoCatalog(String metalake, Catalog catalog) {
this.metalake = metalake;
this.provider = catalog.provider();
this.name = catalog.name();
this.properties = new HashMap<>(catalog.properties());
this.properties = catalog.properties();
Instant time =
catalog.auditInfo().lastModifiedTime() == null
? catalog.auditInfo().createTime()
Expand Down

0 comments on commit a639390

Please sign in to comment.