Skip to content

Commit

Permalink
[4958] feat(iceberg): support event listener for Iceberg REST server (#…
Browse files Browse the repository at this point in the history
…5002)

### What changes were proposed in this pull request?

1.   Integrate Event listener system to Iceberg REST server
2. only dispatch create table event like `CreateIcebergTableEvent`
`CreateIcebergTablePreEvent` `CreateIcebergTableFailureEvent`


### Why are the changes needed?

Fix: #4958 

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
1. add configuration to add `IcebergEventLogger`  in event listener
```
gravitino.eventListener.names = iceberg
gravitino.eventListener.iceberg.class = org.apache.gravitino.iceberg.extension.IcebergEventLogger
```
2. run query will see event logs.

---------

Co-authored-by: Qi Yu <[email protected]>
  • Loading branch information
FANNG1 and yuqi1129 authored Oct 21, 2024
1 parent dffff6b commit d35ab31
Show file tree
Hide file tree
Showing 34 changed files with 929 additions and 227 deletions.
2 changes: 1 addition & 1 deletion bundles/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@

tasks.all {
enabled = false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,6 @@ public class IcebergConstants {

public static final String GRAVITINO_METALAKE = "gravitino-metalake";

public static final String GRAVITINO_DEFAULT_CATALOG = "__gravitino_default_catalog";
public static final String ICEBERG_REST_DEFAULT_METALAKE = "gravitino";
public static final String ICEBERG_REST_DEFAULT_CATALOG = "default_catalog";
}
35 changes: 24 additions & 11 deletions core/src/main/java/org/apache/gravitino/GravitinoEnv.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ public class GravitinoEnv {
private static final Logger LOG = LoggerFactory.getLogger(GravitinoEnv.class);

private Config config;
// Iceberg REST server use base components while Gravitino Server use full components.
private boolean manageFullComponents = true;

private EntityStore entityStore;

Expand Down Expand Up @@ -133,21 +135,30 @@ public static GravitinoEnv getInstance() {
}

/**
* Initialize the Gravitino environment.
* Initialize base components, used for Iceberg REST server.
*
* @param config The configuration object to initialize the environment.
* @param isGravitinoServer A boolean flag indicating whether the initialization is for the
* Gravitino server. If true, server-specific components will be initialized in addition to
* the base components.
*/
public void initialize(Config config, boolean isGravitinoServer) {
LOG.info("Initializing Gravitino Environment...");
public void initializeBaseComponents(Config config) {
LOG.info("Initializing Gravitino base environment...");
this.config = config;
this.manageFullComponents = false;
initBaseComponents();
if (isGravitinoServer) {
initGravitinoServerComponents();
}
LOG.info("Gravitino Environment is initialized.");
LOG.info("Gravitino base environment is initialized.");
}

/**
* Initialize all components, used for Gravitino server.
*
* @param config The configuration object to initialize the environment.
*/
public void initializeFullComponents(Config config) {
LOG.info("Initializing Gravitino full environment...");
this.config = config;
this.manageFullComponents = true;
initBaseComponents();
initGravitinoServerComponents();
LOG.info("Gravitino full environment is initialized.");
}

/**
Expand Down Expand Up @@ -311,9 +322,11 @@ public FutureGrantManager futureGrantManager() {
}

public void start() {
auxServiceManager.serviceStart();
metricsSystem.start();
eventListenerManager.start();
if (manageFullComponents) {
auxServiceManager.serviceStart();
}
}

/** Shutdown the Gravitino environment. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig {
public static final ConfigEntry<String> CATALOG_BACKEND_IMPL =
new ConfigBuilder(IcebergConstants.CATALOG_BACKEND_IMPL)
.doc(
"The fully-qualified class name of a custom catalog implementation, only worked if `catalog-backend` is `custom`")
"The fully-qualified class name of a custom catalog implementation, "
+ "only worked if `catalog-backend` is `custom`")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.create();
Expand Down Expand Up @@ -175,7 +176,8 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig {
public static final ConfigEntry<Integer> ICEBERG_METRICS_STORE_RETAIN_DAYS =
new ConfigBuilder(IcebergConstants.ICEBERG_METRICS_STORE_RETAIN_DAYS)
.doc(
"The retain days of Iceberg metrics, the value not greater than 0 means retain forever")
"The retain days of Iceberg metrics, the value not greater than 0 means "
+ "retain forever")
.version(ConfigConstants.VERSION_0_4_0)
.intConf()
.createWithDefault(-1);
Expand Down Expand Up @@ -205,23 +207,27 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig {
public static final ConfigEntry<String> ICEBERG_REST_CATALOG_CONFIG_PROVIDER =
new ConfigBuilder(IcebergConstants.ICEBERG_REST_CATALOG_CONFIG_PROVIDER)
.doc(
"Catalog provider class name, you can develop a class that implements `IcebergCatalogConfigProvider` and add the corresponding jar file to the Iceberg REST service classpath directory.")
"Catalog provider class name, you can develop a class that implements "
+ "`IcebergConfigProvider` and add the corresponding jar file to the Iceberg "
+ "REST service classpath directory.")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.createWithDefault(IcebergConstants.STATIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME);

public static final ConfigEntry<String> GRAVITINO_URI =
new ConfigBuilder(IcebergConstants.GRAVITINO_URI)
.doc(
"The uri of Gravitino server address, only worked if `catalog-provider` is `gravitino-based-provider`.")
"The uri of Gravitino server address, only worked if `catalog-provider` is "
+ "`gravitino-based-provider`.")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.create();

public static final ConfigEntry<String> GRAVITINO_METALAKE =
new ConfigBuilder(IcebergConstants.GRAVITINO_METALAKE)
.doc(
"The metalake name that `gravitino-based-provider` used to request to Gravitino, only worked if `catalog-provider` is `gravitino-based-provider`.")
"The metalake name that `gravitino-based-provider` used to request to Gravitino, "
+ "only worked if `catalog-provider` is `gravitino-based-provider`.")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.create();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@
import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
import org.apache.gravitino.iceberg.service.IcebergExceptionMapper;
import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider;
import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableEventDispatcher;
import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher;
import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationExecutor;
import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager;
import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
import org.apache.gravitino.iceberg.service.provider.IcebergConfigProviderFactory;
import org.apache.gravitino.listener.EventBus;
import org.apache.gravitino.metrics.MetricsSystem;
import org.apache.gravitino.metrics.source.MetricsSource;
import org.apache.gravitino.server.web.HttpServerMetricsSource;
Expand All @@ -54,6 +60,7 @@ public class RESTService implements GravitinoAuxiliaryService {

private IcebergCatalogWrapperManager icebergCatalogWrapperManager;
private IcebergMetricsManager icebergMetricsManager;
private IcebergConfigProvider configProvider;

private void initServer(IcebergConfig icebergConfig) {
JettyServerConfig serverConfig = JettyServerConfig.fromConfig(icebergConfig);
Expand All @@ -70,14 +77,27 @@ private void initServer(IcebergConfig icebergConfig) {
new HttpServerMetricsSource(MetricsSource.ICEBERG_REST_SERVER_METRIC_NAME, config, server);
metricsSystem.register(httpServerMetricsSource);

icebergCatalogWrapperManager = new IcebergCatalogWrapperManager(icebergConfig.getAllConfig());
icebergMetricsManager = new IcebergMetricsManager(icebergConfig);
Map<String, String> configProperties = icebergConfig.getAllConfig();
this.configProvider = IcebergConfigProviderFactory.create(configProperties);
configProvider.initialize(configProperties);
String metalakeName = configProvider.getMetalakeName();

EventBus eventBus = GravitinoEnv.getInstance().eventBus();
this.icebergCatalogWrapperManager =
new IcebergCatalogWrapperManager(configProperties, configProvider);
this.icebergMetricsManager = new IcebergMetricsManager(icebergConfig);
IcebergTableOperationExecutor icebergTableOperationExecutor =
new IcebergTableOperationExecutor(icebergCatalogWrapperManager);
IcebergTableEventDispatcher icebergTableEventDispatcher =
new IcebergTableEventDispatcher(icebergTableOperationExecutor, eventBus, metalakeName);

config.register(
new AbstractBinder() {
@Override
protected void configure() {
bind(icebergCatalogWrapperManager).to(IcebergCatalogWrapperManager.class).ranked(1);
bind(icebergMetricsManager).to(IcebergMetricsManager.class).ranked(1);
bind(icebergTableEventDispatcher).to(IcebergTableOperationDispatcher.class).ranked(1);
}
});

Expand Down Expand Up @@ -118,6 +138,9 @@ public void serviceStop() throws Exception {
server.stop();
LOG.info("Iceberg REST service stopped");
}
if (configProvider != null) {
configProvider.close();
}
if (icebergCatalogWrapperManager != null) {
icebergCatalogWrapperManager.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@ public GravitinoIcebergRESTServer(Config config) {
}

private void initialize() {
gravitinoEnv.initialize(serverConfig, false);
gravitinoEnv.initializeBaseComponents(serverConfig);
icebergRESTService.serviceInit(
serverConfig.getConfigsWithPrefix(IcebergConfig.ICEBERG_CONFIG_PREFIX));
ServerAuthenticator.getInstance().initialize(serverConfig);
}

private void start() {
gravitinoEnv.start();
icebergRESTService.serviceStart();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,46 +22,34 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.credential.CredentialProvider;
import org.apache.gravitino.credential.CredentialProviderFactory;
import org.apache.gravitino.credential.CredentialProviderManager;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogConfigProvider;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
import org.apache.gravitino.iceberg.provider.DynamicIcebergCatalogConfigProvider;
import org.apache.gravitino.iceberg.provider.StaticIcebergCatalogConfigProvider;
import org.apache.gravitino.iceberg.service.provider.IcebergConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergCatalogWrapperManager implements AutoCloseable {
public static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogWrapperManager.class);

private static final ImmutableMap<String, String> ICEBERG_CATALOG_CONFIG_PROVIDER_NAMES =
ImmutableMap.of(
IcebergConstants.STATIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME,
StaticIcebergCatalogConfigProvider.class.getCanonicalName(),
IcebergConstants.DYNAMIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME,
DynamicIcebergCatalogConfigProvider.class.getCanonicalName());

private final Cache<String, IcebergCatalogWrapper> icebergCatalogWrapperCache;

private final IcebergCatalogConfigProvider provider;
private final IcebergConfigProvider configProvider;

private CredentialProviderManager credentialProviderManager;

public IcebergCatalogWrapperManager(Map<String, String> properties) {
public IcebergCatalogWrapperManager(
Map<String, String> properties, IcebergConfigProvider configProvider) {
this.credentialProviderManager = new CredentialProviderManager();
this.provider = createIcebergCatalogConfigProvider(properties);
this.provider.initialize(properties);
this.configProvider = configProvider;
this.icebergCatalogWrapperCache =
Caffeine.newBuilder()
.expireAfterWrite(
Expand Down Expand Up @@ -92,17 +80,21 @@ public IcebergCatalogWrapperManager(Map<String, String> properties) {
* @return the instance of IcebergCatalogWrapper.
*/
public IcebergCatalogWrapper getOps(String rawPrefix) {
String catalogName = getCatalogName(rawPrefix);
IcebergCatalogWrapper tableOps =
String catalogName = IcebergRestUtils.getCatalogName(rawPrefix);
return getCatalogWrapper(catalogName);
}

public IcebergCatalogWrapper getCatalogWrapper(String catalogName) {
IcebergCatalogWrapper catalogWrapper =
icebergCatalogWrapperCache.get(catalogName, k -> createCatalogWrapper(catalogName));
// Reload conf to reset UserGroupInformation or icebergTableOps will always use
// Simple auth.
tableOps.reloadHadoopConf();
return tableOps;
catalogWrapper.reloadHadoopConf();
return catalogWrapper;
}

public CredentialProvider getCredentialProvider(String prefix) {
String catalogName = getCatalogName(prefix);
String catalogName = IcebergRestUtils.getCatalogName(prefix);
return credentialProviderManager.getCredentialProvider(catalogName);
}

Expand All @@ -112,7 +104,7 @@ protected IcebergCatalogWrapper createIcebergCatalogWrapper(IcebergConfig iceber
}

private IcebergCatalogWrapper createCatalogWrapper(String catalogName) {
Optional<IcebergConfig> icebergConfig = provider.getIcebergCatalogConfig(catalogName);
Optional<IcebergConfig> icebergConfig = configProvider.getIcebergCatalogConfig(catalogName);
if (!icebergConfig.isPresent()) {
throw new RuntimeException("Couldn't find Iceberg configuration for " + catalogName);
}
Expand All @@ -128,43 +120,6 @@ private IcebergCatalogWrapper createCatalogWrapper(String catalogName) {
return createIcebergCatalogWrapper(icebergConfig.get());
}

private String getCatalogName(String rawPrefix) {
String prefix = shelling(rawPrefix);
Preconditions.checkArgument(
!IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(prefix),
String.format("%s is conflict with reserved key, please replace it", prefix));
if (StringUtils.isBlank(prefix)) {
return IcebergConstants.GRAVITINO_DEFAULT_CATALOG;
}
return prefix;
}

private IcebergCatalogConfigProvider createIcebergCatalogConfigProvider(
Map<String, String> properties) {
String providerName =
(new IcebergConfig(properties)).get(IcebergConfig.ICEBERG_REST_CATALOG_CONFIG_PROVIDER);
String className =
ICEBERG_CATALOG_CONFIG_PROVIDER_NAMES.getOrDefault(providerName, providerName);
LOG.info("Load Iceberg catalog provider: {}.", className);
try {
Class<?> providerClz = Class.forName(className);
return (IcebergCatalogConfigProvider) providerClz.getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private String shelling(String rawPrefix) {
if (StringUtils.isBlank(rawPrefix)) {
return rawPrefix;
} else {
// rawPrefix is a string matching ([^/]*/) which end with /
Preconditions.checkArgument(
rawPrefix.endsWith("/"), String.format("rawPrefix %s format is illegal", rawPrefix));
return rawPrefix.substring(0, rawPrefix.length() - 1);
}
}

private void closeIcebergCatalogWrapper(IcebergCatalogWrapper catalogWrapper) {
try {
catalogWrapper.close();
Expand All @@ -176,8 +131,5 @@ private void closeIcebergCatalogWrapper(IcebergCatalogWrapper catalogWrapper) {
@Override
public void close() throws Exception {
icebergCatalogWrapperCache.invalidateAll();
if (provider instanceof AutoCloseable) {
((AutoCloseable) provider).close();
}
}
}
Loading

0 comments on commit d35ab31

Please sign in to comment.