Skip to content

Commit

Permalink
[apache#4176]feat(iceberg) support multiple catalogs in the IcebergCo…
Browse files Browse the repository at this point in the history
…nfig
  • Loading branch information
theoryxu committed Jul 26, 2024
1 parent 033768c commit 1811070
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 51 deletions.
7 changes: 7 additions & 0 deletions common/src/main/java/org/apache/gravitino/utils/MapUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ public static Map<String, String> getPrefixMap(Map<String, String> m, String pre
return Collections.unmodifiableMap(configs);
}

/**
* Returns a map with all keys that start with the given prefix and reserve prefix
*
* @param m The map to filter.
* @param prefix The prefix to filter by.
* @return A map with all keys that start with the given prefix and reserve prefix
*/
public static Map<String, String> filterPrefixMap(Map<String, String> m, String prefix) {
Map<String, String> configs = Maps.newHashMap();
m.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -192,7 +193,7 @@ public void serviceInit(Config gravitinoConfig) {
auxServiceName,
cl -> {
Map<String, String> auxConfig =
new HashMap<>(
Maps.newHashMap(
MapUtils.getPrefixMap(serviceConfigs, DOT.join(auxServiceName, "")));
auxConfig.putAll(
MapUtils.filterPrefixMap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
package org.apache.gravitino.iceberg.common;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Config;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
Expand All @@ -30,6 +33,7 @@
import org.apache.gravitino.config.ConfigEntry;
import org.apache.gravitino.server.web.JettyServerConfig;
import org.apache.gravitino.server.web.OverwriteDefaultConfig;
import org.apache.gravitino.utils.MapUtils;

public class IcebergConfig extends Config implements OverwriteDefaultConfig {
public static final String ICEBERG_CONFIG_PREFIX = "gravitino.iceberg-rest.";
Expand Down Expand Up @@ -142,6 +146,30 @@ public IcebergConfig() {
super(false);
}

public List<String> getCatalogs() {
Map<String, Boolean> catalogs = Maps.newHashMap();
for (String key : this.getAllConfig().keySet()) {
if (!key.startsWith("catalog.")) {
continue;
}
if (key.split("\\.").length < 3) {
throw new RuntimeException(String.format("%s format is illegal", key));
}
catalogs.put(key.split("\\.")[1], true);
}
return catalogs.keySet().stream().sorted().collect(Collectors.toList());
}

public IcebergConfig getCatalogConfig(String catalog) {
Map<String, String> base = Maps.newHashMap(this.getAllConfig());
Map<String, String> merge =
MapUtils.getPrefixMap(this.getAllConfig(), String.format("catalog.%s.", catalog));
for (String key : merge.keySet()) {
base.put(key, merge.get(key));
}
return new IcebergConfig(base);
}

@Override
public Map<String, String> getOverwriteDefaultConfig() {
return ImmutableMap.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
public class IcebergTableOpsManager implements AutoCloseable {
public static final Logger LOG = LoggerFactory.getLogger(IcebergTableOpsManager.class);

public static final String DEFAULT_CATALOG = "default_catalog";

private final Map<String, IcebergTableOps> icebergTableOpsMap;

private final IcebergConfig icebergConfig;
Expand All @@ -40,62 +42,70 @@ public IcebergTableOpsManager(IcebergConfig config, EntityStore entityStore) {
}

public IcebergTableOps getOps(String prefix) {
if (!icebergConfig.get(IcebergConfig.REST_PROXY)) {
LOG.debug("server's rest-proxy is false, return default iceberg catalog");
return icebergTableOpsMap.computeIfAbsent("default", k -> new IcebergTableOps(icebergConfig));
}

if (prefix == null || prefix.length() == 0) {
LOG.debug("prefix is empty, return default iceberg catalog");
return icebergTableOpsMap.computeIfAbsent("default", k -> new IcebergTableOps(icebergConfig));
return icebergTableOpsMap.computeIfAbsent(
DEFAULT_CATALOG,
k -> new IcebergTableOps(icebergConfig.getCatalogConfig(DEFAULT_CATALOG)));
}

String[] segments = prefix.split("/");
String metalake = segments[0];
String catalog = segments[1];
return icebergTableOpsMap.computeIfAbsent(
String.format("%s_%s", metalake, catalog),
k -> {
CatalogEntity entity;
try {
entity =
entityStore.get(
NameIdentifier.of(metalake, catalog),
Entity.EntityType.CATALOG,
CatalogEntity.class);
} catch (NoSuchEntityException e) {
throw new RuntimeException(String.format("%s.%s does not exist", metalake, catalog));
} catch (IOException ioe) {
LOG.error("Failed to get {}.{}", metalake, catalog, ioe);
throw new RuntimeException(ioe);
}

if (!"lakehouse-iceberg".equals(entity.getProvider())) {
String errorMsg = String.format("%s.%s is not iceberg catalog", metalake, catalog);
LOG.error(errorMsg);
throw new RuntimeException(errorMsg);
}

Map<String, String> properties = entity.getProperties();
if (!Boolean.parseBoolean(
properties.getOrDefault(IcebergConstants.GRAVITINO_REST_PROXY, "true"))) {
String errorMsg = String.format("%s.%s rest-proxy is false", metalake, catalog);
LOG.error(errorMsg);
throw new RuntimeException(errorMsg);
}

Map<String, String> catalogProperties = new HashMap<>(properties);
catalogProperties.merge(
IcebergConstants.CATALOG_BACKEND_NAME, catalog, (oldValue, newValue) -> oldValue);
catalogProperties.put(
IcebergConstants.ICEBERG_JDBC_PASSWORD,
properties.getOrDefault(IcebergConstants.GRAVITINO_JDBC_PASSWORD, ""));
catalogProperties.put(
IcebergConstants.ICEBERG_JDBC_USER,
properties.getOrDefault(IcebergConstants.GRAVITINO_JDBC_USER, ""));

return new IcebergTableOps(new IcebergConfig(catalogProperties));
});

if (icebergConfig.get(IcebergConfig.REST_PROXY)) {
return icebergTableOpsMap.computeIfAbsent(
String.format("%s_%s", metalake, catalog),
k -> {
CatalogEntity entity;
try {
entity =
entityStore.get(
NameIdentifier.of(metalake, catalog),
Entity.EntityType.CATALOG,
CatalogEntity.class);
} catch (NoSuchEntityException e) {
throw new RuntimeException(String.format("%s.%s does not exist", metalake, catalog));
} catch (IOException ioe) {
LOG.error("Failed to get {}.{}", metalake, catalog, ioe);
throw new RuntimeException(ioe);
}

if (!"lakehouse-iceberg".equals(entity.getProvider())) {
String errorMsg = String.format("%s.%s is not iceberg catalog", metalake, catalog);
LOG.error(errorMsg);
throw new RuntimeException(errorMsg);
}

Map<String, String> properties = entity.getProperties();
if (!Boolean.parseBoolean(
properties.getOrDefault(IcebergConstants.GRAVITINO_REST_PROXY, "true"))) {
String errorMsg = String.format("%s.%s rest-proxy is false", metalake, catalog);
LOG.error(errorMsg);
throw new RuntimeException(errorMsg);
}

Map<String, String> catalogProperties = new HashMap<>(properties);
catalogProperties.merge(
IcebergConstants.CATALOG_BACKEND_NAME, catalog, (oldValue, newValue) -> oldValue);
catalogProperties.put(
IcebergConstants.ICEBERG_JDBC_PASSWORD,
properties.getOrDefault(IcebergConstants.GRAVITINO_JDBC_PASSWORD, ""));
catalogProperties.put(
IcebergConstants.ICEBERG_JDBC_USER,
properties.getOrDefault(IcebergConstants.GRAVITINO_JDBC_USER, ""));

return new IcebergTableOps(new IcebergConfig(catalogProperties));
});
} else {
if (!icebergConfig.getCatalogs().contains(catalog)) {
String errorMsg = String.format("%s is not iceberg catalog", catalog);
LOG.error(errorMsg);
throw new RuntimeException(errorMsg);
}
return icebergTableOpsMap.computeIfAbsent(
catalog, k -> new IcebergTableOps(icebergConfig.getCatalogConfig(catalog)));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.gravitino.EntityStore;
Expand All @@ -44,6 +46,7 @@ public class IcebergRestTestUtil {
private static final String V_1 = "v1";
public static final String METALAKE = "ut_metalake";
public static final String CATALOG = "ut_catalog";

public static final String PREFIX = METALAKE + "/" + CATALOG;
public static final String CONFIG_PATH = V_1 + "/config";
public static final String NAMESPACE_PATH = V_1 + "/namespaces";
Expand Down Expand Up @@ -75,8 +78,10 @@ public static ResourceConfig getIcebergResourceConfig(Class c, boolean bindIcebe
}

if (bindIcebergTableOps) {
Map<String, String> catalogConf = Maps.newHashMap();
catalogConf.put(String.format("catalog.%s.xx", CATALOG), "xxx");
IcebergTableOpsManager icebergTableOpsManager =
new IcebergTableOpsManager(new IcebergConfig(), mockEntityStore());
new IcebergTableOpsManager(new IcebergConfig(catalogConf), mockEntityStore());

IcebergMetricsManager icebergMetricsManager = new IcebergMetricsManager(new IcebergConfig());
resourceConfig.register(
Expand Down

0 comments on commit 1811070

Please sign in to comment.