Skip to content

Commit

Permalink
[#4236] feat(core): Rework of post hook for dispatcher (#4449)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Rework of post hook for dispatcher .

1. Implement a new class dispatcher instead dynamic proxy.
2. Optimize the lock logic
3. Add the securable object metalake back. We just remove the privileges
of the metalake. We should remove the securable object metalake.

### Why are the changes needed?

Fix: #4236 

### Does this PR introduce _any_ user-facing change?
NO.

### How was this patch tested?

Add new ITs.
  • Loading branch information
jerqi authored Aug 13, 2024
1 parent c8b1702 commit 0804634
Show file tree
Hide file tree
Showing 18 changed files with 1,124 additions and 297 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,18 @@ public class SecurableObjects {
private static final Splitter DOT_SPLITTER = Splitter.on('.');

/**
* Create the catalog {@link SecurableObject} with the given catalog name.
* Create the metalake {@link SecurableObject} with the given metalake name and privileges.
*
* @param metalake The metalake name
* @param privileges The privileges of the metalake
* @return The created metalake {@link SecurableObject}
*/
public static SecurableObject ofMetalake(String metalake, List<Privilege> privileges) {
return of(MetadataObject.Type.METALAKE, Lists.newArrayList(metalake), privileges);
}

/**
* Create the catalog {@link SecurableObject} with the given catalog name and privileges.
*
* @param catalog The catalog name
* @param privileges The privileges of the catalog
Expand All @@ -45,8 +56,8 @@ public static SecurableObject ofCatalog(String catalog, List<Privilege> privileg
}

/**
* Create the schema {@link SecurableObject} with the given securable catalog object and schema
* name.
* Create the schema {@link SecurableObject} with the given securable catalog object, schema name
* and privileges.
*
* @param catalog The catalog securable object.
* @param schema The schema name
Expand All @@ -60,7 +71,8 @@ public static SecurableObject ofSchema(
}

/**
* Create the table {@link SecurableObject} with the given securable schema object and table name.
* Create the table {@link SecurableObject} with the given securable schema object, table name and
* privileges.
*
* @param schema The schema securable object
* @param table The table name
Expand All @@ -75,7 +87,8 @@ public static SecurableObject ofTable(
}

/**
* Create the topic {@link SecurableObject} with the given securable schema object and topic name.
* Create the topic {@link SecurableObject} with the given securable schema object ,topic name and
* privileges.
*
* @param schema The schema securable object
* @param topic The topic name
Expand All @@ -90,8 +103,8 @@ public static SecurableObject ofTopic(
}

/**
* Create the table {@link SecurableObject} with the given securable schema object and fileset
* name.
* Create the table {@link SecurableObject} with the given securable schema object, fileset name
* and privileges.
*
* @param schema The schema securable object
* @param fileset The fileset name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@ public class TestSecurableObjects {

@Test
public void testSecurableObjects() {

SecurableObject metalake =
SecurableObjects.ofMetalake(
"metalake", Lists.newArrayList(Privileges.CreateCatalog.allow()));
Assertions.assertEquals("metalake", metalake.fullName());
Assertions.assertEquals(MetadataObject.Type.METALAKE, metalake.type());
SecurableObject anotherMetalake =
SecurableObjects.of(
MetadataObject.Type.METALAKE,
Lists.newArrayList("metalake"),
Lists.newArrayList(Privileges.CreateCatalog.allow()));
Assertions.assertEquals(metalake, anotherMetalake);

SecurableObject catalog =
SecurableObjects.ofCatalog("catalog", Lists.newArrayList(Privileges.UseCatalog.allow()));
Assertions.assertEquals("catalog", catalog.fullName());
Expand Down
62 changes: 25 additions & 37 deletions core/src/main/java/org/apache/gravitino/GravitinoEnv.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.base.Preconditions;
import org.apache.gravitino.authorization.AccessControlDispatcher;
import org.apache.gravitino.authorization.AccessControlManager;
import org.apache.gravitino.authorization.AuthorizationUtils;
import org.apache.gravitino.authorization.OwnerManager;
import org.apache.gravitino.auxiliary.AuxiliaryServiceManager;
import org.apache.gravitino.catalog.CatalogDispatcher;
Expand All @@ -42,8 +41,13 @@
import org.apache.gravitino.catalog.TopicDispatcher;
import org.apache.gravitino.catalog.TopicNormalizeDispatcher;
import org.apache.gravitino.catalog.TopicOperationDispatcher;
import org.apache.gravitino.hook.DispatcherHookHelper;
import org.apache.gravitino.hook.DispatcherHooks;
import org.apache.gravitino.hook.AccessControlHookDispatcher;
import org.apache.gravitino.hook.CatalogHookDispatcher;
import org.apache.gravitino.hook.FilesetHookDispatcher;
import org.apache.gravitino.hook.MetalakeHookDispatcher;
import org.apache.gravitino.hook.SchemaHookDispatcher;
import org.apache.gravitino.hook.TableHookDispatcher;
import org.apache.gravitino.hook.TopicHookDispatcher;
import org.apache.gravitino.listener.CatalogEventDispatcher;
import org.apache.gravitino.listener.EventBus;
import org.apache.gravitino.listener.EventListenerManager;
Expand Down Expand Up @@ -348,28 +352,30 @@ private void initGravitinoServerComponents() {

// Create and initialize metalake related modules
MetalakeDispatcher metalakeManager = new MetalakeManager(entityStore, idGenerator);
MetalakeHookDispatcher metalakeHookDispatcher = new MetalakeHookDispatcher(metalakeManager);
MetalakeNormalizeDispatcher metalakeNormalizeDispatcher =
new MetalakeNormalizeDispatcher(installDispatcherHooks(metalakeManager));
new MetalakeNormalizeDispatcher(metalakeHookDispatcher);
this.metalakeDispatcher = new MetalakeEventDispatcher(eventBus, metalakeNormalizeDispatcher);

// Create and initialize Catalog related modules
this.catalogManager = new CatalogManager(config, entityStore, idGenerator);
CatalogHookDispatcher catalogHookDispatcher = new CatalogHookDispatcher(catalogManager);
CatalogNormalizeDispatcher catalogNormalizeDispatcher =
new CatalogNormalizeDispatcher(installDispatcherHooks((CatalogDispatcher) catalogManager));
new CatalogNormalizeDispatcher(catalogHookDispatcher);
this.catalogDispatcher = new CatalogEventDispatcher(eventBus, catalogNormalizeDispatcher);

SchemaOperationDispatcher schemaOperationDispatcher =
new SchemaOperationDispatcher(catalogManager, entityStore, idGenerator);
SchemaHookDispatcher schemaHookDispatcher = new SchemaHookDispatcher(schemaOperationDispatcher);
SchemaNormalizeDispatcher schemaNormalizeDispatcher =
new SchemaNormalizeDispatcher(
installDispatcherHooks((SchemaDispatcher) schemaOperationDispatcher), catalogManager);
new SchemaNormalizeDispatcher(schemaHookDispatcher, catalogManager);
this.schemaDispatcher = new SchemaEventDispatcher(eventBus, schemaNormalizeDispatcher);

TableOperationDispatcher tableOperationDispatcher =
new TableOperationDispatcher(catalogManager, entityStore, idGenerator);
TableHookDispatcher tableHookDispatcher = new TableHookDispatcher(tableOperationDispatcher);
TableNormalizeDispatcher tableNormalizeDispatcher =
new TableNormalizeDispatcher(
installDispatcherHooks((TableDispatcher) tableOperationDispatcher), catalogManager);
new TableNormalizeDispatcher(tableHookDispatcher, catalogManager);
this.tableDispatcher = new TableEventDispatcher(eventBus, tableNormalizeDispatcher);

// TODO: We can install hooks when we need, we only supports ownership post hook,
Expand All @@ -382,24 +388,27 @@ private void initGravitinoServerComponents() {

FilesetOperationDispatcher filesetOperationDispatcher =
new FilesetOperationDispatcher(catalogManager, entityStore, idGenerator);
FilesetHookDispatcher filesetHookDispatcher =
new FilesetHookDispatcher(filesetOperationDispatcher);
FilesetNormalizeDispatcher filesetNormalizeDispatcher =
new FilesetNormalizeDispatcher(
installDispatcherHooks((FilesetDispatcher) filesetOperationDispatcher), catalogManager);
new FilesetNormalizeDispatcher(filesetHookDispatcher, catalogManager);
this.filesetDispatcher = new FilesetEventDispatcher(eventBus, filesetNormalizeDispatcher);

TopicOperationDispatcher topicOperationDispatcher =
new TopicOperationDispatcher(catalogManager, entityStore, idGenerator);
TopicHookDispatcher topicHookDispatcher = new TopicHookDispatcher(topicOperationDispatcher);
TopicNormalizeDispatcher topicNormalizeDispatcher =
new TopicNormalizeDispatcher(
installDispatcherHooks((TopicDispatcher) topicOperationDispatcher), catalogManager);
new TopicNormalizeDispatcher(topicHookDispatcher, catalogManager);
this.topicDispatcher = new TopicEventDispatcher(eventBus, topicNormalizeDispatcher);

// Create and initialize access control related modules
boolean enableAuthorization = config.get(Configs.ENABLE_AUTHORIZATION);
if (enableAuthorization) {
this.accessControlDispatcher =
installDispatcherHooks(
(AccessControlDispatcher) new AccessControlManager(entityStore, idGenerator, config));
AccessControlHookDispatcher accessControlHookDispatcher =
new AccessControlHookDispatcher(
new AccessControlManager(entityStore, idGenerator, config));

this.accessControlDispatcher = accessControlHookDispatcher;
this.ownerManager = new OwnerManager(entityStore);
} else {
this.accessControlDispatcher = null;
Expand All @@ -415,25 +424,4 @@ private void initGravitinoServerComponents() {
// Tag manager
this.tagManager = new TagManager(idGenerator, entityStore);
}

// Provides a universal entrance to install dispatcher hooks. This method
// focuses the logic of installing hooks.
// We should reuse the ability of (Metalake|Schema|Table|Fileset|...)NormalizeDispatcher to avoid
// solving
// normalization names, this is useful for pre-hooks.
// so we can't install the hooks for the outside of
// (Metalake|Schema|Table|Fileset|...)NormalizeDispatcher.
private <T> T installDispatcherHooks(T manager) {
boolean enableAuthorization = config.get(Configs.ENABLE_AUTHORIZATION);
DispatcherHooks hooks = new DispatcherHooks();
if (enableAuthorization) {
AuthorizationUtils.prepareAuthorizationHooks(manager, hooks);
}

if (hooks.isEmpty()) {
return manager;
}

return DispatcherHookHelper.installHooks(manager, hooks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.exceptions.NoSuchMetalakeException;
import org.apache.gravitino.hook.DispatcherHooks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -117,10 +116,4 @@ public static void checkRoleNamespace(Namespace namespace) {
"Role namespace must have 3 levels, the input namespace is %s",
namespace);
}

// Install some post hooks used for owner. The owner will have the all privileges
// of securable objects, users, groups, roles.
public static <T> void prepareAuthorizationHooks(T manager, DispatcherHooks hooks) {
// TODO: Refactor the post hook by adding new dispatcher
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@

/**
* OwnerManager is used for manage the owner of metadata object. The user and group don't have an
* owner
* owner. Because the post hook will call the methods. We shouldn't add the lock of the metadata
* object. Otherwise, it will cause deadlock.
*/
public class OwnerManager {
private static final Logger LOG = LoggerFactory.getLogger(OwnerManager.class);
Expand Down Expand Up @@ -71,45 +72,37 @@ public void setOwner(
if (ownerType == Owner.Type.USER) {
NameIdentifier ownerIdent = AuthorizationUtils.ofUser(metalake, ownerName);
TreeLockUtils.doWithTreeLock(
objectIdent,
ownerIdent,
LockType.READ,
() ->
TreeLockUtils.doWithTreeLock(
ownerIdent,
LockType.READ,
() -> {
store
.relationOperations()
.insertRelation(
SupportsRelationOperations.Type.OWNER_REL,
objectIdent,
MetadataObjectUtil.toEntityType(metadataObject),
ownerIdent,
Entity.EntityType.USER,
true);
return null;
}));
() -> {
store
.relationOperations()
.insertRelation(
SupportsRelationOperations.Type.OWNER_REL,
objectIdent,
MetadataObjectUtil.toEntityType(metadataObject),
ownerIdent,
Entity.EntityType.USER,
true);
return null;
});
} else if (ownerType == Owner.Type.GROUP) {
NameIdentifier ownerIdent = AuthorizationUtils.ofGroup(metalake, ownerName);
TreeLockUtils.doWithTreeLock(
objectIdent,
ownerIdent,
LockType.READ,
() ->
TreeLockUtils.doWithTreeLock(
ownerIdent,
LockType.READ,
() -> {
store
.relationOperations()
.insertRelation(
SupportsRelationOperations.Type.OWNER_REL,
objectIdent,
MetadataObjectUtil.toEntityType(metadataObject),
ownerIdent,
Entity.EntityType.GROUP,
true);
return null;
}));
() -> {
store
.relationOperations()
.insertRelation(
SupportsRelationOperations.Type.OWNER_REL,
objectIdent,
MetadataObjectUtil.toEntityType(metadataObject),
ownerIdent,
Entity.EntityType.GROUP,
true);
return null;
});
}
} catch (NoSuchEntityException nse) {
LOG.warn(
Expand Down
Loading

0 comments on commit 0804634

Please sign in to comment.