Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[4958] feat(iceberg): support event listener for Iceberg REST server #5002

Merged
merged 10 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bundles/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@

tasks.all {
enabled = false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,6 @@ public class IcebergConstants {

public static final String GRAVITINO_METALAKE = "gravitino-metalake";

public static final String GRAVITINO_DEFAULT_CATALOG = "__gravitino_default_catalog";
public static final String ICEBERG_REST_DEFAULT_METALAKE = "gravitino";
public static final String ICEBERG_REST_DEFAULT_CATALOG = "default_catalog";
}
35 changes: 24 additions & 11 deletions core/src/main/java/org/apache/gravitino/GravitinoEnv.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public class GravitinoEnv {
private static final Logger LOG = LoggerFactory.getLogger(GravitinoEnv.class);

private Config config;
// Iceberg REST server use base components while Gravitino Server use full components.
private boolean manageFullComponents = true;

private EntityStore entityStore;

Expand Down Expand Up @@ -130,21 +132,30 @@ public static GravitinoEnv getInstance() {
}

/**
* Initialize the Gravitino environment.
* Initialize base components, used for Iceberg REST server.
*
* @param config The configuration object to initialize the environment.
* @param isGravitinoServer A boolean flag indicating whether the initialization is for the
* Gravitino server. If true, server-specific components will be initialized in addition to
* the base components.
*/
public void initialize(Config config, boolean isGravitinoServer) {
LOG.info("Initializing Gravitino Environment...");
public void initializeBaseComponents(Config config) {
LOG.info("Initializing Gravitino base environment...");
this.config = config;
this.manageFullComponents = false;
initBaseComponents();
if (isGravitinoServer) {
initGravitinoServerComponents();
}
LOG.info("Gravitino Environment is initialized.");
LOG.info("Gravitino base environment is initialized.");
}

/**
* Initialize all components, used for Gravitino server.
*
* @param config The configuration object to initialize the environment.
*/
public void initializeFullComponents(Config config) {
LOG.info("Initializing Gravitino full environment...");
this.config = config;
this.manageFullComponents = true;
initBaseComponents();
initGravitinoServerComponents();
LOG.info("Gravitino full environment is initialized.");
}

/**
Expand Down Expand Up @@ -308,9 +319,11 @@ public FutureGrantManager futureGrantManager() {
}

public void start() {
auxServiceManager.serviceStart();
metricsSystem.start();
eventListenerManager.start();
if (manageFullComponents) {
auxServiceManager.serviceStart();
}
}

/** Shutdown the Gravitino environment. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig {
public static final ConfigEntry<String> CATALOG_BACKEND_IMPL =
new ConfigBuilder(IcebergConstants.CATALOG_BACKEND_IMPL)
.doc(
"The fully-qualified class name of a custom catalog implementation, only worked if `catalog-backend` is `custom`")
"The fully-qualified class name of a custom catalog implementation, "
+ "only worked if `catalog-backend` is `custom`")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.create();
Expand Down Expand Up @@ -175,7 +176,8 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig {
public static final ConfigEntry<Integer> ICEBERG_METRICS_STORE_RETAIN_DAYS =
new ConfigBuilder(IcebergConstants.ICEBERG_METRICS_STORE_RETAIN_DAYS)
.doc(
"The retain days of Iceberg metrics, the value not greater than 0 means retain forever")
"The retain days of Iceberg metrics, the value not greater than 0 means "
+ "retain forever")
.version(ConfigConstants.VERSION_0_4_0)
.intConf()
.createWithDefault(-1);
Expand Down Expand Up @@ -205,23 +207,27 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig {
public static final ConfigEntry<String> ICEBERG_REST_CATALOG_CONFIG_PROVIDER =
new ConfigBuilder(IcebergConstants.ICEBERG_REST_CATALOG_CONFIG_PROVIDER)
.doc(
"Catalog provider class name, you can develop a class that implements `IcebergCatalogConfigProvider` and add the corresponding jar file to the Iceberg REST service classpath directory.")
"Catalog provider class name, you can develop a class that implements "
+ "`IcebergConfigProvider` and add the corresponding jar file to the Iceberg "
+ "REST service classpath directory.")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.createWithDefault(IcebergConstants.STATIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME);

public static final ConfigEntry<String> GRAVITINO_URI =
new ConfigBuilder(IcebergConstants.GRAVITINO_URI)
.doc(
"The uri of Gravitino server address, only worked if `catalog-provider` is `gravitino-based-provider`.")
"The uri of Gravitino server address, only worked if `catalog-provider` is "
+ "`gravitino-based-provider`.")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.create();

public static final ConfigEntry<String> GRAVITINO_METALAKE =
new ConfigBuilder(IcebergConstants.GRAVITINO_METALAKE)
.doc(
"The metalake name that `gravitino-based-provider` used to request to Gravitino, only worked if `catalog-provider` is `gravitino-based-provider`.")
"The metalake name that `gravitino-based-provider` used to request to Gravitino, "
+ "only worked if `catalog-provider` is `gravitino-based-provider`.")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.create();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@
import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
import org.apache.gravitino.iceberg.service.IcebergExceptionMapper;
import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider;
import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableEventDispatcher;
import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher;
import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationExecutor;
import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager;
import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
import org.apache.gravitino.iceberg.service.provider.IcebergConfigProviderFactory;
import org.apache.gravitino.listener.EventBus;
import org.apache.gravitino.metrics.MetricsSystem;
import org.apache.gravitino.metrics.source.MetricsSource;
import org.apache.gravitino.server.web.HttpServerMetricsSource;
Expand All @@ -54,6 +60,7 @@ public class RESTService implements GravitinoAuxiliaryService {

private IcebergCatalogWrapperManager icebergCatalogWrapperManager;
private IcebergMetricsManager icebergMetricsManager;
private IcebergConfigProvider configProvider;

private void initServer(IcebergConfig icebergConfig) {
JettyServerConfig serverConfig = JettyServerConfig.fromConfig(icebergConfig);
Expand All @@ -70,14 +77,27 @@ private void initServer(IcebergConfig icebergConfig) {
new HttpServerMetricsSource(MetricsSource.ICEBERG_REST_SERVER_METRIC_NAME, config, server);
metricsSystem.register(httpServerMetricsSource);

icebergCatalogWrapperManager = new IcebergCatalogWrapperManager(icebergConfig.getAllConfig());
icebergMetricsManager = new IcebergMetricsManager(icebergConfig);
Map<String, String> configProperties = icebergConfig.getAllConfig();
this.configProvider = IcebergConfigProviderFactory.create(configProperties);
configProvider.initialize(configProperties);
String metalakeName = configProvider.getMetalakeName();

EventBus eventBus = GravitinoEnv.getInstance().eventBus();
this.icebergCatalogWrapperManager =
new IcebergCatalogWrapperManager(configProperties, configProvider);
this.icebergMetricsManager = new IcebergMetricsManager(icebergConfig);
IcebergTableOperationExecutor icebergTableOperationExecutor =
new IcebergTableOperationExecutor(icebergCatalogWrapperManager);
IcebergTableEventDispatcher icebergTableEventDispatcher =
new IcebergTableEventDispatcher(icebergTableOperationExecutor, eventBus, metalakeName);

config.register(
new AbstractBinder() {
@Override
protected void configure() {
bind(icebergCatalogWrapperManager).to(IcebergCatalogWrapperManager.class).ranked(1);
bind(icebergMetricsManager).to(IcebergMetricsManager.class).ranked(1);
bind(icebergTableEventDispatcher).to(IcebergTableOperationDispatcher.class).ranked(1);
}
});

Expand Down Expand Up @@ -118,6 +138,9 @@ public void serviceStop() throws Exception {
server.stop();
LOG.info("Iceberg REST service stopped");
}
if (configProvider != null) {
configProvider.close();
}
if (icebergCatalogWrapperManager != null) {
icebergCatalogWrapperManager.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@ public GravitinoIcebergRESTServer(Config config) {
}

private void initialize() {
gravitinoEnv.initialize(serverConfig, false);
gravitinoEnv.initializeBaseComponents(serverConfig);
icebergRESTService.serviceInit(
serverConfig.getConfigsWithPrefix(IcebergConfig.ICEBERG_CONFIG_PREFIX));
ServerAuthenticator.getInstance().initialize(serverConfig);
}

private void start() {
gravitinoEnv.start();
icebergRESTService.serviceStart();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,46 +22,34 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.credential.CredentialProvider;
import org.apache.gravitino.credential.CredentialProviderFactory;
import org.apache.gravitino.credential.CredentialProviderManager;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogConfigProvider;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
import org.apache.gravitino.iceberg.provider.DynamicIcebergCatalogConfigProvider;
import org.apache.gravitino.iceberg.provider.StaticIcebergCatalogConfigProvider;
import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergCatalogWrapperManager implements AutoCloseable {
public static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogWrapperManager.class);

private static final ImmutableMap<String, String> ICEBERG_CATALOG_CONFIG_PROVIDER_NAMES =
ImmutableMap.of(
IcebergConstants.STATIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME,
StaticIcebergCatalogConfigProvider.class.getCanonicalName(),
IcebergConstants.DYNAMIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME,
DynamicIcebergCatalogConfigProvider.class.getCanonicalName());

private final Cache<String, IcebergCatalogWrapper> icebergCatalogWrapperCache;

private final IcebergCatalogConfigProvider provider;
private final IcebergConfigProvider configProvider;

private CredentialProviderManager credentialProviderManager;

public IcebergCatalogWrapperManager(Map<String, String> properties) {
public IcebergCatalogWrapperManager(
Map<String, String> properties, IcebergConfigProvider configProvider) {
this.credentialProviderManager = new CredentialProviderManager();
this.provider = createIcebergCatalogConfigProvider(properties);
this.provider.initialize(properties);
this.configProvider = configProvider;
this.icebergCatalogWrapperCache =
Caffeine.newBuilder()
.expireAfterWrite(
Expand Down Expand Up @@ -92,17 +80,21 @@ public IcebergCatalogWrapperManager(Map<String, String> properties) {
* @return the instance of IcebergCatalogWrapper.
*/
public IcebergCatalogWrapper getOps(String rawPrefix) {
String catalogName = getCatalogName(rawPrefix);
IcebergCatalogWrapper tableOps =
String catalogName = IcebergRestUtils.getCatalogName(rawPrefix);
return getCatalogWrapper(catalogName);
}

public IcebergCatalogWrapper getCatalogWrapper(String catalogName) {
IcebergCatalogWrapper catalogWrapper =
icebergCatalogWrapperCache.get(catalogName, k -> createCatalogWrapper(catalogName));
// Reload conf to reset UserGroupInformation or icebergTableOps will always use
// Simple auth.
tableOps.reloadHadoopConf();
return tableOps;
catalogWrapper.reloadHadoopConf();
return catalogWrapper;
}

public CredentialProvider getCredentialProvider(String prefix) {
String catalogName = getCatalogName(prefix);
String catalogName = IcebergRestUtils.getCatalogName(prefix);
return credentialProviderManager.getCredentialProvider(catalogName);
}

Expand All @@ -112,7 +104,7 @@ protected IcebergCatalogWrapper createIcebergCatalogWrapper(IcebergConfig iceber
}

private IcebergCatalogWrapper createCatalogWrapper(String catalogName) {
Optional<IcebergConfig> icebergConfig = provider.getIcebergCatalogConfig(catalogName);
Optional<IcebergConfig> icebergConfig = configProvider.getIcebergCatalogConfig(catalogName);
if (!icebergConfig.isPresent()) {
throw new RuntimeException("Couldn't find Iceberg configuration for " + catalogName);
}
Expand All @@ -128,43 +120,6 @@ private IcebergCatalogWrapper createCatalogWrapper(String catalogName) {
return createIcebergCatalogWrapper(icebergConfig.get());
}

private String getCatalogName(String rawPrefix) {
String prefix = shelling(rawPrefix);
Preconditions.checkArgument(
!IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(prefix),
String.format("%s is conflict with reserved key, please replace it", prefix));
if (StringUtils.isBlank(prefix)) {
return IcebergConstants.GRAVITINO_DEFAULT_CATALOG;
}
return prefix;
}

private IcebergCatalogConfigProvider createIcebergCatalogConfigProvider(
Map<String, String> properties) {
String providerName =
(new IcebergConfig(properties)).get(IcebergConfig.ICEBERG_REST_CATALOG_CONFIG_PROVIDER);
String className =
ICEBERG_CATALOG_CONFIG_PROVIDER_NAMES.getOrDefault(providerName, providerName);
LOG.info("Load Iceberg catalog provider: {}.", className);
try {
Class<?> providerClz = Class.forName(className);
return (IcebergCatalogConfigProvider) providerClz.getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private String shelling(String rawPrefix) {
if (StringUtils.isBlank(rawPrefix)) {
return rawPrefix;
} else {
// rawPrefix is a string matching ([^/]*/) which end with /
Preconditions.checkArgument(
rawPrefix.endsWith("/"), String.format("rawPrefix %s format is illegal", rawPrefix));
return rawPrefix.substring(0, rawPrefix.length() - 1);
}
}

private void closeIcebergCatalogWrapper(IcebergCatalogWrapper catalogWrapper) {
try {
catalogWrapper.close();
Expand All @@ -176,8 +131,5 @@ private void closeIcebergCatalogWrapper(IcebergCatalogWrapper catalogWrapper) {
@Override
public void close() throws Exception {
icebergCatalogWrapperCache.invalidateAll();
if (provider instanceof AutoCloseable) {
((AutoCloseable) provider).close();
}
}
}
Loading
Loading