Skip to content

Commit

Permalink
[apache#4176]feat(iceberg) support multiple catalogs by Provider
Browse files Browse the repository at this point in the history
  • Loading branch information
theoryxu committed Jul 31, 2024
1 parent 4f27630 commit 253a8e5
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 68 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.iceberg.common.ops;

import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.utils.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This provider use configs to support multiple catalogs.
*
* <p>For example, there are two different catalogs: jdbc_proxy, hive_proxy The config is like:
*
* <p>gravitino.iceberg-rest.catalog.jdbc_proxy.catalog-backend = jdbc
* gravitino.iceberg-rest.catalog.jdbc_proxy.uri = jdbc:mysql://{host}:{port}/{db} ...
* gravitino.iceberg-rest.catalog.hive_proxy.catalog-backend = hive
* gravitino.iceberg-rest.catalog.hive_proxy.uri = thrift://{host}:{port} ...
*/
public class ConfigIcebergTableOpsProvider implements IcebergTableOpsProvider {
public static final Logger LOG = LoggerFactory.getLogger(ConfigIcebergTableOpsProvider.class);

Expand All @@ -34,19 +61,6 @@ public IcebergTableOps getIcebergTableOps(String prefix) {
return new IcebergTableOps(getCatalogConfig(prefix));
}

@Override
public Optional<String> getPrefix(String warehouse) {
if (StringUtils.isBlank(warehouse)) {
return Optional.empty();
}
if (!getCatalogs().contains(warehouse)) {
String errorMsg = String.format("%s can not match any catalog", warehouse);
LOG.error(errorMsg);
throw new RuntimeException(errorMsg);
}
return Optional.of(warehouse);
}

private List<String> getCatalogs() {
Map<String, Boolean> catalogs = Maps.newHashMap();
for (String key : this.icebergConfig.getAllConfig().keySet()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.iceberg.common.ops;

import com.google.common.base.Splitter;
Expand All @@ -7,7 +25,6 @@
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import org.apache.gravitino.iceberg.common.IcebergConfig;
Expand All @@ -32,11 +49,6 @@ public IcebergTableOpsManager(IcebergConfig config) {
this.provider.initialize(config);
}

public IcebergTableOpsManager(IcebergTableOpsProvider provider) {
this.icebergTableOpsMap = Maps.newConcurrentMap();
this.provider = provider;
}

public IcebergTableOps getOps(String rawPrefix) {
String prefix = shelling(rawPrefix);
String cacheKey = prefix;
Expand All @@ -47,15 +59,6 @@ public IcebergTableOps getOps(String rawPrefix) {
return icebergTableOpsMap.computeIfAbsent(cacheKey, k -> provider.getIcebergTableOps(prefix));
}

public Optional<String> getPrefix(String rawPrefix, String warehouse) {
String prefix = shelling(rawPrefix);
if (!StringUtils.isBlank(prefix)) {
return Optional.of(prefix);
} else {
return provider.getPrefix(warehouse);
}
}

private IcebergTableOpsProvider createProvider(IcebergConfig config) {
try (IsolatedClassLoader isolatedClassLoader =
IsolatedClassLoader.buildClassLoader(getClassPaths(config))) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.iceberg.common.ops;

import java.util.Optional;
import org.apache.gravitino.iceberg.common.IcebergConfig;

/**
Expand All @@ -17,12 +34,4 @@ public interface IcebergTableOpsProvider {
* @return the instance of IcebergTableOps.
*/
IcebergTableOps getIcebergTableOps(String prefix);

/**
* Get a path prefix using by clients.
*
* @param warehouse the identifier for an iceberg catalog.
* @return a path prefix.
*/
Optional<String> getPrefix(String warehouse);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,14 @@

import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.gravitino.iceberg.common.ops.IcebergTableOpsManager;
import org.apache.gravitino.iceberg.service.IcebergRestUtils;
import org.apache.gravitino.metrics.MetricNames;
import org.apache.iceberg.rest.responses.ConfigResponse;
Expand All @@ -46,25 +41,12 @@ public class IcebergConfigOperations {
@Context
private HttpServletRequest httpRequest;

private IcebergTableOpsManager icebergTableOpsManager;

@Inject
public IcebergConfigOperations(IcebergTableOpsManager icebergTableOpsManager) {
this.icebergTableOpsManager = icebergTableOpsManager;
}

@GET
@Produces(MediaType.APPLICATION_JSON)
@Timed(name = "config." + MetricNames.HTTP_PROCESS_DURATION, absolute = true)
@ResponseMetered(name = "config", absolute = true)
public Response getConfig(
@PathParam("prefix") String prefix,
@DefaultValue("") @QueryParam("warehouse") String warehouse) {
ConfigResponse.Builder builder = ConfigResponse.builder();
icebergTableOpsManager
.getPrefix(prefix, warehouse)
.ifPresent(p -> builder.withOverride("prefix", p));
ConfigResponse response = builder.build();
public Response getConfig() {
ConfigResponse response = ConfigResponse.builder().build();
return IcebergRestUtils.ok(response);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.gravitino.Config;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.server.GravitinoIcebergRESTServer;
import org.apache.gravitino.integration.test.util.HttpUtils;
Expand Down Expand Up @@ -123,7 +124,12 @@ private void customizeConfigFile(String configTempFileName, String configFileNam
String.valueOf(RESTUtils.findAvailablePort(2000, 3000)));

configMap.putAll(customConfigs);

if (ITUtils.EMBEDDED_TEST_MODE.equals(System.getProperty(ITUtils.TEST_MODE))) {
configMap.put(
IcebergConfig.ICEBERG_CONFIG_PREFIX
+ IcebergConstants.ICEBERG_REST_SERVICE_CATALOG_PROVIDER_CLASSPATH,
"iceberg/iceberg-common/build/libs");
}
ITUtils.rewriteConfigFile(configTempFileName, configFileName, configMap);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.ConfigIcebergTableOpsProvider;
import org.apache.gravitino.iceberg.common.ops.IcebergTableOpsManager;
import org.apache.gravitino.iceberg.service.IcebergExceptionMapper;
import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider;
Expand Down Expand Up @@ -71,10 +71,11 @@ public static ResourceConfig getIcebergResourceConfig(Class c, boolean bindIcebe
if (bindIcebergTableOps) {
Map<String, String> catalogConf = Maps.newHashMap();
catalogConf.put(String.format("catalog.%s.xx", PREFIX), "xxx");
catalogConf.put(
IcebergConstants.ICEBERG_REST_SERVICE_CATALOG_PROVIDER_CLASSPATH,
"iceberg/iceberg-common/build/libs");
IcebergConfig icebergConfig = new IcebergConfig(catalogConf);
ConfigIcebergTableOpsProvider provider = new ConfigIcebergTableOpsProvider();
provider.initialize(icebergConfig);
IcebergTableOpsManager icebergTableOpsManager = new IcebergTableOpsManager(provider);
IcebergTableOpsManager icebergTableOpsManager = new IcebergTableOpsManager(icebergConfig);

IcebergMetricsManager icebergMetricsManager = new IcebergMetricsManager(new IcebergConfig());
resourceConfig.register(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,7 @@ public void testConfig(boolean withPrefix) {

ConfigResponse response = resp.readEntity(ConfigResponse.class);
Assertions.assertEquals(0, response.defaults().size());
if (withPrefix) {
Assertions.assertEquals(1, response.overrides().size());
} else {
Assertions.assertEquals(0, response.overrides().size());
}
Assertions.assertEquals(0, response.overrides().size());
}

@ParameterizedTest
Expand Down

0 comments on commit 253a8e5

Please sign in to comment.