-
Notifications
You must be signed in to change notification settings - Fork 380
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[4958] feat(iceberg): support event listener for Iceberg REST server #5002
Conversation
...t-server/src/main/java/org/apache/gravitino/iceberg/service/rest/IcebergTableOperations.java
Outdated
Show resolved
Hide resolved
@jerryshao please help to review when you have time, thanks |
* @param config The configuration object to initialize the environment. | ||
*/ | ||
public void initializeAllComponents(Config config) { | ||
LOG.info("Initializing Gravitino all Environment..."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"full environement.."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public class IcebergEventLogger implements EventListenerPlugin { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we don't have to had a IcebergEventLogger
here for demo purpose. A better way is to add audit log support for iceberg rest server when Audit PR is merged.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
|
||
// remove the last '/' from the prefix, for example transform 'iceberg_catalog/' to | ||
// 'iceberg_catalog' | ||
private static String shelling(String rawPrefix) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the meaning of shelling
, it is hard to understand, can we use a better name?
NameIdentifier nameIdentifier = | ||
IcebergRestUtils.getGravitinoNameIdentifier(catalogName, tableIdentifier); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Do we support multiple level namespace for Iceberg rest server?
- Do we need to add metalake to the name identifier?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1, Our implementation doesn't limit on the namespace levels.
2, Iceberg rest server doesn't introduce the concept of metalake for now, should we add a default metalake name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use gravitino
as default metalake name
* @return A {@link LoadTableResponse} object containing the result of the operation. | ||
*/ | ||
LoadTableResponse createTable( | ||
String catalogName, Namespace namespace, CreateTableRequest createTableRequest); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So you only support one operation for now, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will you do this in the follow-up PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, will support the other table operations in another PR
import org.apache.iceberg.rest.responses.LoadTableResponse; | ||
|
||
/** | ||
* The {@code IcebergTableOperationProcessor} locates the corresponding {@code |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the meaning of "locate"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
* The {@code IcebergTableOperationProcessor} locates the corresponding {@code | ||
* IcebergCatalogWrapper} based on the prefix in order to perform the actual table operations. | ||
*/ | ||
public class IcebergTableOperationProcessor implements IcebergTableOperationDispatcher { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it better to use Executor
than Processor
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
public IcebergCreateTablePreEvent( | ||
String user, NameIdentifier resourceIdentifier, CreateTableRequest createTableRequest) { | ||
super(user, resourceIdentifier); | ||
this.createTableRequest = createTableRequest; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you need to clone this object here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it's designed to allow the user to change the object.
|
||
/** Represent a failure event when do Iceberg table operation failed. */ | ||
@DeveloperApi | ||
public class IcebergTableFailureEvent extends IcebergRESTFailureEvent { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need one IcebergRESTFailureEvent
and another IcebergTableFailureEvent
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another thing is that the class name is too verbose, I think it can be simplified like IcebergTableFailureEvent
, IcebergTablePostEvent
, IcebergFailureEvent
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to form a event hierarchy, FailureEvent -> IcebergRESTFailureEvent -> IcebergTableFailureEvent -> IcebergCreateTableFailureEvent
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
import org.apache.gravitino.NameIdentifier; | ||
|
||
/** Represents an abstract table post event in Gravitino Iceberg REST server. */ | ||
public abstract class IcebergTablePostEvent extends IcebergRESTPostEvent { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
* @param config The configuration object to initialize the environment. | ||
*/ | ||
public void initializeFullComponents(Config config) { | ||
LOG.info("Initializing Gravitino full Environment..."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"environment..."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
String prefix = normalizePrefix(rawPrefix); | ||
Preconditions.checkArgument( | ||
!IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG.equals(prefix), | ||
String.format("%s is conflict with reserved key, please replace it", prefix)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"...is conflicted with reserved catalog name"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
|
||
/** Represents an abstract failure event in Gravitino Iceberg REST server. */ | ||
@DeveloperApi | ||
public abstract class IcebergRESTFailureEvent extends FailureEvent { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe IcebergFailureEvent
is enough.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
This seems has a lot of conflicts, please fix it first. |
updated the PR, please help to review again |
@@ -205,7 +205,7 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig { | |||
public static final ConfigEntry<String> ICEBERG_REST_CATALOG_CONFIG_PROVIDER = | |||
new ConfigBuilder(IcebergConstants.ICEBERG_REST_CATALOG_CONFIG_PROVIDER) | |||
.doc( | |||
"Catalog provider class name, you can develop a class that implements `IcebergCatalogConfigProvider` and add the corresponding jar file to the Iceberg REST service classpath directory.") | |||
"Catalog provider class name, you can develop a class that implements `IcebergConfigProvider` and add the corresponding jar file to the Iceberg REST service classpath directory.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is too long, you need to make it less than 100 chars.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
Map<String, String> configProperties = icebergConfig.getAllConfig(); | ||
EventBus eventBus = GravitinoEnv.getInstance().eventBus(); | ||
this.configProvider = IcebergConfigProviderFactory.create(configProperties); | ||
configProvider.initialize(configProperties); | ||
String metalakeName = configProvider.getMetalakeName(); | ||
this.icebergCatalogWrapperManager = | ||
new IcebergCatalogWrapperManager(configProperties, configProvider); | ||
this.icebergMetricsManager = new IcebergMetricsManager(icebergConfig); | ||
IcebergTableOperationExecutor icebergTableOperationExecutor = | ||
new IcebergTableOperationExecutor(icebergCatalogWrapperManager); | ||
IcebergTableEventDispatcher icebergTableEventDispatcher = | ||
new IcebergTableEventDispatcher(icebergTableOperationExecutor, eventBus, metalakeName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add some necessary blank line in this code block to make this code block easy to read.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
|
||
public class IcebergConfigProviderFactory { | ||
public static final Logger LOG = LoggerFactory.getLogger(IcebergConfigProviderFactory.class); | ||
private static final ImmutableMap<String, String> ICEBERG_CATALOG_CONFIG_PROVIDER_NAMES = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a blank line above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
LOG.info( | ||
"Create Iceberg table, namespace: {}, create table request: {}, accessDelegation: {}, isCredentialVending: {}", | ||
namespace, | ||
"Create Iceberg table, catalog: {}, namespace: {}, create table request: {}, accessDelegation: {}, isCredentialVending: {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this line is also too long.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
…pache#5002) ### What changes were proposed in this pull request? 1. Integrate Event listener system to Iceberg REST server 2. only dispatch create table event like `CreateIcebergTableEvent` `CreateIcebergTablePreEvent` `CreateIcebergTableFailureEvent` ### Why are the changes needed? Fix: apache#4958 ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? 1. add configuration to add `IcebergEventLogger` in event listener ``` gravitino.eventListener.names = iceberg gravitino.eventListener.iceberg.class = org.apache.gravitino.iceberg.extension.IcebergEventLogger ``` 2. run query will see event logs. --------- Co-authored-by: Qi Yu <[email protected]>
What changes were proposed in this pull request?
CreateIcebergTableEvent
CreateIcebergTablePreEvent
CreateIcebergTableFailureEvent
Why are the changes needed?
Fix: #4958
Does this PR introduce any user-facing change?
no
How was this patch tested?
IcebergEventLogger
in event listener