Skip to content

Commit

Permalink
[apache#4176]feat(iceberg) support multiple catalogs in Iceberg REST …
Browse files Browse the repository at this point in the history
…catalog server
  • Loading branch information
theoryxu committed Aug 12, 2024
1 parent 1b08479 commit 84db02e
Showing 1 changed file with 14 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.gravitino.iceberg.common.ops;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
Expand All @@ -43,38 +42,35 @@ public class ConfigBasedIcebergTableOpsProvider implements IcebergTableOpsProvid
public static final Logger LOG =
LoggerFactory.getLogger(ConfigBasedIcebergTableOpsProvider.class);

private Map<String, String> properties;

private List<String> catalogNames;
private Map<String, IcebergConfig> catalogConfigs;

@Override
public void initialize(Map<String, String> properties) {
this.catalogNames =
this.catalogConfigs =
properties.keySet().stream()
.map(this::getCatalogName)
.flatMap(op -> op.map(Stream::of).orElseGet(Stream::empty))
.distinct()
.collect(Collectors.toList());
this.properties = properties;
.collect(
Collectors.toMap(
catalogName -> catalogName,
catalogName ->
new IcebergConfig(
MapUtils.getPrefixMap(
properties, String.format("catalog.%s.", catalogName)))));
this.catalogConfigs.put(
IcebergConstants.GRAVITINO_DEFAULT_CATALOG, new IcebergConfig(properties));
}

@Override
public IcebergTableOps getIcebergTableOps(String catalogName) {
if (IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(catalogName)) {
return new IcebergTableOps(new IcebergConfig(properties));
}
if (!catalogNames.contains(catalogName)) {
IcebergConfig icebergConfig = this.catalogConfigs.get(catalogName);
if (icebergConfig == null) {
String errorMsg = String.format("%s can not match any catalog", catalogName);
LOG.warn(errorMsg);
throw new RuntimeException(errorMsg);
}
return new IcebergTableOps(getCatalogConfig(catalogName));
}

private IcebergConfig getCatalogConfig(String catalogName) {
Map<String, String> base =
MapUtils.getPrefixMap(this.properties, String.format("catalog.%s.", catalogName));
return new IcebergConfig(base);
return new IcebergTableOps(icebergConfig);
}

private Optional<String> getCatalogName(String catalogConfigKey) {
Expand Down

0 comments on commit 84db02e

Please sign in to comment.