Skip to content

Commit

Permalink
fix comment
Browse files Browse the repository at this point in the history
  • Loading branch information
FANNG1 committed Oct 17, 2024
1 parent 3e35c8e commit b72294a
Show file tree
Hide file tree
Showing 17 changed files with 185 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.apache.gravitino.credential;

public class CredentialConstants {
public static final String CREDENTIAL_TYPE = "credential-type";
public static final String CREDENTIAL_PROVIDER_TYPE = "credential-provider-type";

private CredentialConstants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ public void initialize(

this.icebergCatalogWrapper =
new IcebergCatalogWrapper(
icebergConfig,
false /*Gravitino server will support credential vending in separate interface*/);
icebergConfig);
this.icebergCatalogWrapperHelper =
new IcebergCatalogWrapperHelper(icebergCatalogWrapper.getCatalog());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class TestIcebergTableUpdate {
@BeforeEach
public void init() {
icebergCatalogWrapper =
new IcebergCatalogWrapper(new IcebergConfig(Collections.emptyMap()), false);
new IcebergCatalogWrapper(new IcebergConfig(Collections.emptyMap()));
icebergCatalogWrapperHelper =
new IcebergCatalogWrapperHelper(icebergCatalogWrapper.getCatalog());
createNamespace(TEST_NAMESPACE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
/**
* Helper class to generate specific credential properties for different table format and engine.
*/
public class CredentialUtils {
public class CredentialPropertyUtils {
/**
* Transforms a specific credential into a map of Iceberg properties.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,22 @@

package org.apache.gravitino.credential;

import org.apache.gravitino.NameIdentifier;
import com.google.common.collect.ImmutableSet;
import java.util.HashMap;
import java.util.Map;
import org.apache.gravitino.utils.PrincipalUtils;

public class CredentialProviderManager {

void registerCredentialProvider(NameIdentifier nameIdentifier, CredentialProvider credentialProvider) {
public void registerCredentialProvider(String catalogName, CredentialProvider credentialProvider) {

}

void unregisterCredentialProvider(NameIdentifier nameIdentifier) {
public void unregisterCredentialProvider(String catalogName) {

}

CredentialProvider getCredentialProvider(NameIdentifier nameIdentifier, String credentialType) {
public CredentialProvider getCredentialProvider(String catalogName) {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.credential;

import com.google.common.collect.ImmutableSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.utils.PrincipalUtils;

public class CredentialUtils {
public static Credential vendCredential(CredentialProvider credentialProvider, String path) {
PathBasedCredentialContext pathBasedCredentialContext =
new PathBasedCredentialContext(
PrincipalUtils.getCurrentUserName(), ImmutableSet.of(path), ImmutableSet.of());
return credentialProvider.getCredential(pathBasedCredentialContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,9 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig {
.toSequence()
.createWithDefault(Collections.emptyList());

public static final ConfigEntry<String> CREDENTIAL_TYPE =
new ConfigBuilder(CredentialConstants.CREDENTIAL_TYPE)
.doc("The credential type for Iceberg")
public static final ConfigEntry<String> CREDENTIAL_PROVIDER_TYPE =
new ConfigBuilder(CredentialConstants.CREDENTIAL_PROVIDER_TYPE)
.doc("The credential provider type for Iceberg")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package org.apache.gravitino.iceberg.common.ops;

import java.util.Map;
import java.util.Optional;
import org.apache.gravitino.iceberg.common.IcebergConfig;

/**
* IcebergCatalogWrapperProvider is an interface defining how Iceberg REST catalog server gets
* Iceberg catalogs.
* {@code IcebergCatalogConfigProvider} is an interface defining how Iceberg REST catalog server gets
* Iceberg catalog configurations.
*/
public interface IcebergCatalogWrapperProvider {
public interface IcebergCatalogConfigProvider {

/**
* @param properties The parameters for creating Provider which from configurations whose prefix
Expand All @@ -33,8 +35,8 @@ public interface IcebergCatalogWrapperProvider {
void initialize(Map<String, String> properties);

/**
* @param catalogName a param send by clients.
* @return the instance of IcebergCatalogWrapper.
* @param catalogName Iceberg catalog name.
* @return the configuration of Iceberg catalog.
*/
IcebergCatalogWrapper getIcebergTableOps(String catalogName);
Optional<IcebergConfig> getIcebergCatalogConfig(String catalogName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.collect.ImmutableSet;
import java.sql.Driver;
import java.sql.DriverManager;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
Expand All @@ -31,20 +30,13 @@
import java.util.function.Supplier;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.credential.Credential;
import org.apache.gravitino.credential.CredentialConstants;
import org.apache.gravitino.credential.CredentialProvider;
import org.apache.gravitino.credential.CredentialProviderFactory;
import org.apache.gravitino.credential.CredentialUtils;
import org.apache.gravitino.credential.PathBasedCredentialContext;
import org.apache.gravitino.iceberg.common.IcebergCatalogBackend;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.utils.IcebergCatalogUtil;
import org.apache.gravitino.utils.IsolatedClassLoader;
import org.apache.gravitino.utils.MapUtils;
import org.apache.gravitino.utils.PrincipalUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.iceberg.Transaction;
Expand Down Expand Up @@ -78,7 +70,6 @@ public class IcebergCatalogWrapper implements AutoCloseable {
@Getter protected Catalog catalog;
private SupportsNamespaces asNamespaceCatalog;
private final IcebergCatalogBackend catalogBackend;
private Optional<CredentialProvider> credentialProvider;
private String catalogUri = null;
private Map<String, String> catalogConfigToClients;
private Map<String, String> catalogPropertiesMap;
Expand All @@ -93,7 +84,7 @@ public class IcebergCatalogWrapper implements AutoCloseable {
IcebergConstants.ICEBERG_OSS_ACCESS_KEY_ID,
IcebergConstants.ICEBERG_OSS_ACCESS_KEY_SECRET);

public IcebergCatalogWrapper(IcebergConfig icebergConfig, boolean supportsCredentialVending) {
public IcebergCatalogWrapper(IcebergConfig icebergConfig) {
this.catalogBackend =
IcebergCatalogBackend.valueOf(
icebergConfig.get(IcebergConfig.CATALOG_BACKEND).toUpperCase(Locale.ROOT));
Expand All @@ -113,14 +104,7 @@ public IcebergCatalogWrapper(IcebergConfig icebergConfig, boolean supportsCreden

this.catalogPropertiesMap = icebergConfig.getIcebergCatalogProperties();

String credentialType = icebergConfig.get(IcebergConfig.CREDENTIAL_TYPE);
if (StringUtils.isBlank(credentialType) || !supportsCredentialVending) {
this.credentialProvider = Optional.empty();
} else {
this.credentialProvider =
Optional.of(
CredentialProviderFactory.create(credentialType, icebergConfig.getAllConfig()));
}

}

private void validateNamespace(Optional<Namespace> namespace) {
Expand Down Expand Up @@ -175,7 +159,7 @@ public LoadTableResponse registerTable(Namespace namespace, RegisterTableRequest
/**
* Reload hadoop configuration, this is useful when the hadoop configuration UserGroupInformation
* is shared by multiple threads. UserGroupInformation#authenticationMethod was first initialized
* in KerberosClient, however, when switching to iceberg-rest thead,
* in KerberosClient, however, when switching to iceberg-rest thread,
* UserGroupInformation#authenticationMethod will be reset to the default value; we need to
* reinitialize it again.
*/
Expand All @@ -197,18 +181,6 @@ public LoadTableResponse createTable(Namespace namespace, CreateTableRequest req
this::getCatalogConfigToClient);
}

public LoadTableResponse createTableWithCredentialVending(
Namespace namespace, CreateTableRequest request) {
request.validate();
if (request.stageCreate()) {
return injectTableConfig(
() -> CatalogHandlers.stageTableCreate(catalog, namespace, request),
this::vendCredentials);
}
return injectTableConfig(
() -> CatalogHandlers.createTable(catalog, namespace, request), this::vendCredentials);
}

public void dropTable(TableIdentifier tableIdentifier) {
CatalogHandlers.dropTable(catalog, tableIdentifier);
}
Expand All @@ -222,11 +194,6 @@ public LoadTableResponse loadTable(TableIdentifier tableIdentifier) {
() -> CatalogHandlers.loadTable(catalog, tableIdentifier), this::getCatalogConfigToClient);
}

public LoadTableResponse loadTableWithCredentialVending(TableIdentifier tableIdentifier) {
return injectTableConfig(
() -> CatalogHandlers.loadTable(catalog, tableIdentifier), this::vendCredentials);
}

public boolean tableExists(TableIdentifier tableIdentifier) {
return catalog.tableExists(tableIdentifier);
}
Expand Down Expand Up @@ -287,14 +254,6 @@ public void close() throws Exception {
// JdbcCatalog and WrappedHiveCatalog need close.
((AutoCloseable) catalog).close();
}
credentialProvider.ifPresent(
provider -> {
try {
provider.close();
} catch (Exception e) {
LOG.warn("Close credential provider failed,", e);
}
});

// Because each catalog in Gravitino has its own classloader, after a catalog is no longer used
// for a long time or dropped, the instance of classloader needs to be released. In order to
Expand All @@ -316,7 +275,7 @@ public void close() throws Exception {
private void closeMySQLCatalogResource() {
try {
// Close thread AbandonedConnectionCleanupThread if we are using `com.mysql.cj.jdbc.Driver`,
// for driver `com.mysql.jdbc.Driver` (deprecated), the daemon thead maybe not this one.
// for driver `com.mysql.jdbc.Driver` (deprecated), the daemon thread maybe not this one.
Class.forName("com.mysql.cj.jdbc.AbandonedConnectionCleanupThread")
.getMethod("uncheckedShutdown")
.invoke(null);
Expand Down Expand Up @@ -361,25 +320,6 @@ private Map<String, String> getCatalogConfigToClient(String location) {
return catalogConfigToClients;
}

private Map<String, String> vendCredentials(String location) {
// ifPresentOrElse is not supported in Java8
if (!credentialProvider.isPresent()) {
throw new IllegalArgumentException(
"Credential vending is not enabled, please set "
+ CredentialConstants.CREDENTIAL_TYPE
+ " to proper values");
}

Map<String, String> configs = new HashMap<>(catalogConfigToClients);
// todo(fanng): check user privilege.
PathBasedCredentialContext pathBasedCredentialContext =
new PathBasedCredentialContext(
PrincipalUtils.getCurrentUserName(), ImmutableSet.of(location), ImmutableSet.of());
Credential credential = credentialProvider.get().getCredential(pathBasedCredentialContext);
configs.putAll(CredentialUtils.toIcebergProperties(credential));
return configs;
}

@Getter
@Setter
public static final class IcebergTableChange {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
import java.util.stream.Collectors;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapperProvider;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogConfigProvider;
import org.apache.gravitino.utils.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,11 +39,11 @@
* gravitino.iceberg-rest.catalog.hive_proxy.catalog-backend = hive
* gravitino.iceberg-rest.catalog.hive_proxy.uri = thrift://{host}:{port} ...
*/
public class ConfigBasedIcebergCatalogWrapperProvider implements IcebergCatalogWrapperProvider {
public class ConfigBasedIcebergCatalogConfigProvider implements IcebergCatalogConfigProvider {
public static final Logger LOG =
LoggerFactory.getLogger(ConfigBasedIcebergCatalogWrapperProvider.class);
LoggerFactory.getLogger(ConfigBasedIcebergCatalogConfigProvider.class);

public static final String CONFIG_BASE_ICEBERG_TABLE_OPS_PROVIDER_NAME = "config-based-provider";
public static final String CONFIG_BASE_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME = "config-based-provider";

@VisibleForTesting Map<String, IcebergConfig> catalogConfigs;

Expand All @@ -68,14 +67,8 @@ public void initialize(Map<String, String> properties) {
}

@Override
public IcebergCatalogWrapper getIcebergTableOps(String catalogName) {
IcebergConfig icebergConfig = this.catalogConfigs.get(catalogName);
if (icebergConfig == null) {
String errorMsg = String.format("%s can not match any catalog", catalogName);
LOG.warn(errorMsg);
throw new RuntimeException(errorMsg);
}
return new IcebergCatalogWrapper(icebergConfig, true);
public IcebergConfig getIcebergCatalogConfig(String catalogName) {
return this.catalogConfigs.get(catalogName);
}

private Optional<String> getCatalogName(String catalogConfigKey) {
Expand Down
Loading

0 comments on commit b72294a

Please sign in to comment.