Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#4236] feat(core): Rework of post hook for dispatcher #4429

Merged
merged 1 commit into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,18 @@ public class SecurableObjects {
private static final Splitter DOT_SPLITTER = Splitter.on('.');

/**
* Create the catalog {@link SecurableObject} with the given catalog name.
* Create the metalake {@link SecurableObject} with the given metalake name and privileges.
*
* @param metalake The metalake name
* @param privileges The privileges of the metalake
* @return The created metalake {@link SecurableObject}
*/
public static SecurableObject ofMetalake(String metalake, List<Privilege> privileges) {
return of(MetadataObject.Type.METALAKE, Lists.newArrayList(metalake), privileges);
}

/**
* Create the catalog {@link SecurableObject} with the given catalog name and privileges.
*
* @param catalog The catalog name
* @param privileges The privileges of the catalog
Expand All @@ -45,8 +56,8 @@ public static SecurableObject ofCatalog(String catalog, List<Privilege> privileg
}

/**
* Create the schema {@link SecurableObject} with the given securable catalog object and schema
* name.
* Create the schema {@link SecurableObject} with the given securable catalog object, schema name
* and privileges.
*
* @param catalog The catalog securable object.
* @param schema The schema name
Expand All @@ -60,7 +71,8 @@ public static SecurableObject ofSchema(
}

/**
* Create the table {@link SecurableObject} with the given securable schema object and table name.
* Create the table {@link SecurableObject} with the given securable schema object, table name and
* privileges.
*
* @param schema The schema securable object
* @param table The table name
Expand All @@ -75,7 +87,8 @@ public static SecurableObject ofTable(
}

/**
* Create the topic {@link SecurableObject} with the given securable schema object and topic name.
* Create the topic {@link SecurableObject} with the given securable schema object ,topic name and
* privileges.
*
* @param schema The schema securable object
* @param topic The topic name
Expand All @@ -90,8 +103,8 @@ public static SecurableObject ofTopic(
}

/**
* Create the table {@link SecurableObject} with the given securable schema object and fileset
* name.
* Create the table {@link SecurableObject} with the given securable schema object, fileset name
* and privileges.
*
* @param schema The schema securable object
* @param fileset The fileset name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@ public class TestSecurableObjects {

@Test
public void testSecurableObjects() {

SecurableObject metalake =
SecurableObjects.ofMetalake(
"metalake", Lists.newArrayList(Privileges.CreateCatalog.allow()));
Assertions.assertEquals("metalake", metalake.fullName());
Assertions.assertEquals(MetadataObject.Type.METALAKE, metalake.type());
SecurableObject anotherMetalake =
SecurableObjects.of(
MetadataObject.Type.METALAKE,
Lists.newArrayList("metalake"),
Lists.newArrayList(Privileges.CreateCatalog.allow()));
Assertions.assertEquals(metalake, anotherMetalake);

SecurableObject catalog =
SecurableObjects.ofCatalog("catalog", Lists.newArrayList(Privileges.UseCatalog.allow()));
Assertions.assertEquals("catalog", catalog.fullName());
Expand Down
62 changes: 25 additions & 37 deletions core/src/main/java/org/apache/gravitino/GravitinoEnv.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.base.Preconditions;
import org.apache.gravitino.authorization.AccessControlDispatcher;
import org.apache.gravitino.authorization.AccessControlManager;
import org.apache.gravitino.authorization.AuthorizationUtils;
import org.apache.gravitino.authorization.OwnerManager;
import org.apache.gravitino.auxiliary.AuxiliaryServiceManager;
import org.apache.gravitino.catalog.CatalogDispatcher;
Expand All @@ -42,8 +41,13 @@
import org.apache.gravitino.catalog.TopicDispatcher;
import org.apache.gravitino.catalog.TopicNormalizeDispatcher;
import org.apache.gravitino.catalog.TopicOperationDispatcher;
import org.apache.gravitino.hook.DispatcherHookHelper;
import org.apache.gravitino.hook.DispatcherHooks;
import org.apache.gravitino.hook.AccessControlHookDispatcher;
import org.apache.gravitino.hook.CatalogHookDispatcher;
import org.apache.gravitino.hook.FilesetHookDispatcher;
import org.apache.gravitino.hook.MetalakeHookDispatcher;
import org.apache.gravitino.hook.SchemaHookDispatcher;
import org.apache.gravitino.hook.TableHookDispatcher;
import org.apache.gravitino.hook.TopicHookDispatcher;
import org.apache.gravitino.listener.CatalogEventDispatcher;
import org.apache.gravitino.listener.EventBus;
import org.apache.gravitino.listener.EventListenerManager;
Expand Down Expand Up @@ -348,28 +352,30 @@ private void initGravitinoServerComponents() {

// Create and initialize metalake related modules
MetalakeDispatcher metalakeManager = new MetalakeManager(entityStore, idGenerator);
MetalakeHookDispatcher metalakeHookDispatcher = new MetalakeHookDispatcher(metalakeManager);
MetalakeNormalizeDispatcher metalakeNormalizeDispatcher =
new MetalakeNormalizeDispatcher(installDispatcherHooks(metalakeManager));
new MetalakeNormalizeDispatcher(metalakeHookDispatcher);
this.metalakeDispatcher = new MetalakeEventDispatcher(eventBus, metalakeNormalizeDispatcher);

// Create and initialize Catalog related modules
this.catalogManager = new CatalogManager(config, entityStore, idGenerator);
CatalogHookDispatcher catalogHookDispatcher = new CatalogHookDispatcher(catalogManager);
CatalogNormalizeDispatcher catalogNormalizeDispatcher =
new CatalogNormalizeDispatcher(installDispatcherHooks((CatalogDispatcher) catalogManager));
new CatalogNormalizeDispatcher(catalogHookDispatcher);
Comment on lines 361 to +364
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that we have wrapped the dispatcher too many times.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Known issues. But we don't have better solution.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can put normalizeDispatcher to HookDispatcher in the future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also feel the current Dispatcher mechanism inelegant.
I create a issue track it

this.catalogDispatcher = new CatalogEventDispatcher(eventBus, catalogNormalizeDispatcher);

SchemaOperationDispatcher schemaOperationDispatcher =
new SchemaOperationDispatcher(catalogManager, entityStore, idGenerator);
SchemaHookDispatcher schemaHookDispatcher = new SchemaHookDispatcher(schemaOperationDispatcher);
SchemaNormalizeDispatcher schemaNormalizeDispatcher =
new SchemaNormalizeDispatcher(
installDispatcherHooks((SchemaDispatcher) schemaOperationDispatcher), catalogManager);
new SchemaNormalizeDispatcher(schemaHookDispatcher, catalogManager);
this.schemaDispatcher = new SchemaEventDispatcher(eventBus, schemaNormalizeDispatcher);

TableOperationDispatcher tableOperationDispatcher =
new TableOperationDispatcher(catalogManager, entityStore, idGenerator);
TableHookDispatcher tableHookDispatcher = new TableHookDispatcher(tableOperationDispatcher);
TableNormalizeDispatcher tableNormalizeDispatcher =
new TableNormalizeDispatcher(
installDispatcherHooks((TableDispatcher) tableOperationDispatcher), catalogManager);
new TableNormalizeDispatcher(tableHookDispatcher, catalogManager);
this.tableDispatcher = new TableEventDispatcher(eventBus, tableNormalizeDispatcher);

// TODO: We can install hooks when we need, we only supports ownership post hook,
Expand All @@ -382,24 +388,27 @@ private void initGravitinoServerComponents() {

FilesetOperationDispatcher filesetOperationDispatcher =
new FilesetOperationDispatcher(catalogManager, entityStore, idGenerator);
FilesetHookDispatcher filesetHookDispatcher =
new FilesetHookDispatcher(filesetOperationDispatcher);
FilesetNormalizeDispatcher filesetNormalizeDispatcher =
new FilesetNormalizeDispatcher(
installDispatcherHooks((FilesetDispatcher) filesetOperationDispatcher), catalogManager);
new FilesetNormalizeDispatcher(filesetHookDispatcher, catalogManager);
this.filesetDispatcher = new FilesetEventDispatcher(eventBus, filesetNormalizeDispatcher);

TopicOperationDispatcher topicOperationDispatcher =
new TopicOperationDispatcher(catalogManager, entityStore, idGenerator);
TopicHookDispatcher topicHookDispatcher = new TopicHookDispatcher(topicOperationDispatcher);
TopicNormalizeDispatcher topicNormalizeDispatcher =
new TopicNormalizeDispatcher(
installDispatcherHooks((TopicDispatcher) topicOperationDispatcher), catalogManager);
new TopicNormalizeDispatcher(topicHookDispatcher, catalogManager);
this.topicDispatcher = new TopicEventDispatcher(eventBus, topicNormalizeDispatcher);

// Create and initialize access control related modules
boolean enableAuthorization = config.get(Configs.ENABLE_AUTHORIZATION);
if (enableAuthorization) {
this.accessControlDispatcher =
installDispatcherHooks(
(AccessControlDispatcher) new AccessControlManager(entityStore, idGenerator, config));
AccessControlHookDispatcher accessControlHookDispatcher =
new AccessControlHookDispatcher(
new AccessControlManager(entityStore, idGenerator, config));

this.accessControlDispatcher = accessControlHookDispatcher;
this.ownerManager = new OwnerManager(entityStore);
} else {
this.accessControlDispatcher = null;
Expand All @@ -415,25 +424,4 @@ private void initGravitinoServerComponents() {
// Tag manager
this.tagManager = new TagManager(idGenerator, entityStore);
}

// Provides a universal entrance to install dispatcher hooks. This method
// focuses the logic of installing hooks.
// We should reuse the ability of (Metalake|Schema|Table|Fileset|...)NormalizeDispatcher to avoid
// solving
// normalization names, this is useful for pre-hooks.
// so we can't install the hooks for the outside of
// (Metalake|Schema|Table|Fileset|...)NormalizeDispatcher.
private <T> T installDispatcherHooks(T manager) {
boolean enableAuthorization = config.get(Configs.ENABLE_AUTHORIZATION);
DispatcherHooks hooks = new DispatcherHooks();
if (enableAuthorization) {
AuthorizationUtils.prepareAuthorizationHooks(manager, hooks);
}

if (hooks.isEmpty()) {
return manager;
}

return DispatcherHookHelper.installHooks(manager, hooks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.exceptions.NoSuchMetalakeException;
import org.apache.gravitino.hook.DispatcherHooks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -117,10 +116,4 @@ public static void checkRoleNamespace(Namespace namespace) {
"Role namespace must have 3 levels, the input namespace is %s",
namespace);
}

// Install some post hooks used for owner. The owner will have the all privileges
// of securable objects, users, groups, roles.
public static <T> void prepareAuthorizationHooks(T manager, DispatcherHooks hooks) {
// TODO: Refactor the post hook by adding new dispatcher
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@

/**
* OwnerManager is used for manage the owner of metadata object. The user and group don't have an
* owner
* owner. Because the post hook will call the methods. We shouldn't add the lock of the metadata
* object. Otherwise, it will cause deadlock.
*/
public class OwnerManager {
private static final Logger LOG = LoggerFactory.getLogger(OwnerManager.class);
Expand Down Expand Up @@ -71,45 +72,37 @@ public void setOwner(
if (ownerType == Owner.Type.USER) {
NameIdentifier ownerIdent = AuthorizationUtils.ofUser(metalake, ownerName);
TreeLockUtils.doWithTreeLock(
objectIdent,
ownerIdent,
LockType.READ,
() ->
TreeLockUtils.doWithTreeLock(
ownerIdent,
LockType.READ,
() -> {
store
.relationOperations()
.insertRelation(
SupportsRelationOperations.Type.OWNER_REL,
objectIdent,
MetadataObjectUtil.toEntityType(metadataObject),
ownerIdent,
Entity.EntityType.USER,
true);
return null;
}));
() -> {
store
.relationOperations()
.insertRelation(
SupportsRelationOperations.Type.OWNER_REL,
objectIdent,
MetadataObjectUtil.toEntityType(metadataObject),
ownerIdent,
Entity.EntityType.USER,
true);
return null;
});
} else if (ownerType == Owner.Type.GROUP) {
NameIdentifier ownerIdent = AuthorizationUtils.ofGroup(metalake, ownerName);
TreeLockUtils.doWithTreeLock(
objectIdent,
ownerIdent,
LockType.READ,
() ->
TreeLockUtils.doWithTreeLock(
ownerIdent,
LockType.READ,
() -> {
store
.relationOperations()
.insertRelation(
SupportsRelationOperations.Type.OWNER_REL,
objectIdent,
MetadataObjectUtil.toEntityType(metadataObject),
ownerIdent,
Entity.EntityType.GROUP,
true);
return null;
}));
() -> {
store
.relationOperations()
.insertRelation(
SupportsRelationOperations.Type.OWNER_REL,
objectIdent,
MetadataObjectUtil.toEntityType(metadataObject),
ownerIdent,
Entity.EntityType.GROUP,
true);
return null;
});
}
} catch (NoSuchEntityException nse) {
LOG.warn(
Expand Down
Loading
Loading