Skip to content

Commit

Permalink
Add hook dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
jerqi committed Aug 8, 2024
1 parent 1a8d59a commit 4454e2b
Show file tree
Hide file tree
Showing 18 changed files with 1,020 additions and 290 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ public class SecurableObjects {

private static final Splitter DOT_SPLITTER = Splitter.on('.');

/**
* Create the metalake {@link SecurableObject} with the given metalake name.
*
* @param metalake The metalake name
* @param privileges The privileges of the metalake
* @return The created metalake {@link SecurableObject}
*/
public static SecurableObject ofMetalake(String metalake, List<Privilege> privileges) {
return of(MetadataObject.Type.METALAKE, Lists.newArrayList(metalake), privileges);
}

/**
* Create the catalog {@link SecurableObject} with the given catalog name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@ public class TestSecurableObjects {

@Test
public void testSecurableObjects() {

SecurableObject metalake =
SecurableObjects.ofMetalake(
"metalake", Lists.newArrayList(Privileges.CreateCatalog.allow()));
Assertions.assertEquals("metalake", metalake.fullName());
Assertions.assertEquals(MetadataObject.Type.METALAKE, metalake.type());
SecurableObject anotherMetalake =
SecurableObjects.of(
MetadataObject.Type.METALAKE,
Lists.newArrayList("metalake"),
Lists.newArrayList(Privileges.CreateCatalog.allow()));
Assertions.assertEquals(metalake, anotherMetalake);

SecurableObject catalog =
SecurableObjects.ofCatalog("catalog", Lists.newArrayList(Privileges.UseCatalog.allow()));
Assertions.assertEquals("catalog", catalog.fullName());
Expand Down
62 changes: 25 additions & 37 deletions core/src/main/java/org/apache/gravitino/GravitinoEnv.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.base.Preconditions;
import org.apache.gravitino.authorization.AccessControlDispatcher;
import org.apache.gravitino.authorization.AccessControlManager;
import org.apache.gravitino.authorization.AuthorizationUtils;
import org.apache.gravitino.authorization.OwnerManager;
import org.apache.gravitino.auxiliary.AuxiliaryServiceManager;
import org.apache.gravitino.catalog.CatalogDispatcher;
Expand All @@ -42,8 +41,13 @@
import org.apache.gravitino.catalog.TopicDispatcher;
import org.apache.gravitino.catalog.TopicNormalizeDispatcher;
import org.apache.gravitino.catalog.TopicOperationDispatcher;
import org.apache.gravitino.hook.DispatcherHookHelper;
import org.apache.gravitino.hook.DispatcherHooks;
import org.apache.gravitino.hook.AccessControlHookDispatcher;
import org.apache.gravitino.hook.CatalogHookDispatcher;
import org.apache.gravitino.hook.FilesetHookDispatcher;
import org.apache.gravitino.hook.MetalakeHookDispatcher;
import org.apache.gravitino.hook.SchemaHookDispatcher;
import org.apache.gravitino.hook.TableHookDispatcher;
import org.apache.gravitino.hook.TopicHookDispatcher;
import org.apache.gravitino.listener.CatalogEventDispatcher;
import org.apache.gravitino.listener.EventBus;
import org.apache.gravitino.listener.EventListenerManager;
Expand Down Expand Up @@ -348,28 +352,30 @@ private void initGravitinoServerComponents() {

// Create and initialize metalake related modules
MetalakeDispatcher metalakeManager = new MetalakeManager(entityStore, idGenerator);
MetalakeHookDispatcher metalakeHookDispatcher = new MetalakeHookDispatcher(metalakeManager);
MetalakeNormalizeDispatcher metalakeNormalizeDispatcher =
new MetalakeNormalizeDispatcher(installDispatcherHooks(metalakeManager));
new MetalakeNormalizeDispatcher(metalakeHookDispatcher);
this.metalakeDispatcher = new MetalakeEventDispatcher(eventBus, metalakeNormalizeDispatcher);

// Create and initialize Catalog related modules
this.catalogManager = new CatalogManager(config, entityStore, idGenerator);
CatalogHookDispatcher catalogHookDispatcher = new CatalogHookDispatcher(catalogManager);
CatalogNormalizeDispatcher catalogNormalizeDispatcher =
new CatalogNormalizeDispatcher(installDispatcherHooks((CatalogDispatcher) catalogManager));
new CatalogNormalizeDispatcher(catalogHookDispatcher);
this.catalogDispatcher = new CatalogEventDispatcher(eventBus, catalogNormalizeDispatcher);

SchemaOperationDispatcher schemaOperationDispatcher =
new SchemaOperationDispatcher(catalogManager, entityStore, idGenerator);
SchemaHookDispatcher schemaHookDispatcher = new SchemaHookDispatcher(schemaOperationDispatcher);
SchemaNormalizeDispatcher schemaNormalizeDispatcher =
new SchemaNormalizeDispatcher(
installDispatcherHooks((SchemaDispatcher) schemaOperationDispatcher), catalogManager);
new SchemaNormalizeDispatcher(schemaHookDispatcher, catalogManager);
this.schemaDispatcher = new SchemaEventDispatcher(eventBus, schemaNormalizeDispatcher);

TableOperationDispatcher tableOperationDispatcher =
new TableOperationDispatcher(catalogManager, entityStore, idGenerator);
TableHookDispatcher tableHookDispatcher = new TableHookDispatcher(tableOperationDispatcher);
TableNormalizeDispatcher tableNormalizeDispatcher =
new TableNormalizeDispatcher(
installDispatcherHooks((TableDispatcher) tableOperationDispatcher), catalogManager);
new TableNormalizeDispatcher(tableHookDispatcher, catalogManager);
this.tableDispatcher = new TableEventDispatcher(eventBus, tableNormalizeDispatcher);

// TODO: We can install hooks when we need, we only supports ownership post hook,
Expand All @@ -382,24 +388,27 @@ private void initGravitinoServerComponents() {

FilesetOperationDispatcher filesetOperationDispatcher =
new FilesetOperationDispatcher(catalogManager, entityStore, idGenerator);
FilesetHookDispatcher filesetHookDispatcher =
new FilesetHookDispatcher(filesetOperationDispatcher);
FilesetNormalizeDispatcher filesetNormalizeDispatcher =
new FilesetNormalizeDispatcher(
installDispatcherHooks((FilesetDispatcher) filesetOperationDispatcher), catalogManager);
new FilesetNormalizeDispatcher(filesetHookDispatcher, catalogManager);
this.filesetDispatcher = new FilesetEventDispatcher(eventBus, filesetNormalizeDispatcher);

TopicOperationDispatcher topicOperationDispatcher =
new TopicOperationDispatcher(catalogManager, entityStore, idGenerator);
TopicHookDispatcher topicHookDispatcher = new TopicHookDispatcher(topicOperationDispatcher);
TopicNormalizeDispatcher topicNormalizeDispatcher =
new TopicNormalizeDispatcher(
installDispatcherHooks((TopicDispatcher) topicOperationDispatcher), catalogManager);
new TopicNormalizeDispatcher(topicHookDispatcher, catalogManager);
this.topicDispatcher = new TopicEventDispatcher(eventBus, topicNormalizeDispatcher);

// Create and initialize access control related modules
boolean enableAuthorization = config.get(Configs.ENABLE_AUTHORIZATION);
if (enableAuthorization) {
this.accessControlDispatcher =
installDispatcherHooks(
(AccessControlDispatcher) new AccessControlManager(entityStore, idGenerator, config));
AccessControlHookDispatcher accessControlHookDispatcher =
new AccessControlHookDispatcher(
new AccessControlManager(entityStore, idGenerator, config));

this.accessControlDispatcher = accessControlHookDispatcher;
this.ownerManager = new OwnerManager(entityStore);
} else {
this.accessControlDispatcher = null;
Expand All @@ -415,25 +424,4 @@ private void initGravitinoServerComponents() {
// Tag manager
this.tagManager = new TagManager(idGenerator, entityStore);
}

// Provides a universal entrance to install dispatcher hooks. This method
// focuses the logic of installing hooks.
// We should reuse the ability of (Metalake|Schema|Table|Fileset|...)NormalizeDispatcher to avoid
// solving
// normalization names, this is useful for pre-hooks.
// so we can't install the hooks for the outside of
// (Metalake|Schema|Table|Fileset|...)NormalizeDispatcher.
private <T> T installDispatcherHooks(T manager) {
boolean enableAuthorization = config.get(Configs.ENABLE_AUTHORIZATION);
DispatcherHooks hooks = new DispatcherHooks();
if (enableAuthorization) {
AuthorizationUtils.prepareAuthorizationHooks(manager, hooks);
}

if (hooks.isEmpty()) {
return manager;
}

return DispatcherHookHelper.installHooks(manager, hooks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.exceptions.NoSuchMetalakeException;
import org.apache.gravitino.hook.DispatcherHooks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -117,10 +116,4 @@ public static void checkRoleNamespace(Namespace namespace) {
"Role namespace must have 3 levels, the input namespace is %s",
namespace);
}

// Install some post hooks used for owner. The owner will have the all privileges
// of securable objects, users, groups, roles.
public static <T> void prepareAuthorizationHooks(T manager, DispatcherHooks hooks) {
// TODO: Refactor the post hook by adding new dispatcher
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@

/**
* OwnerManager is used for manage the owner of metadata object. The user and group don't have an
* owner
* owner. Because the post hook will call the methods. We shouldn't add the lock of the metadata
* object. Otherwise, it will cause deadlock.
*/
public class OwnerManager {
private static final Logger LOG = LoggerFactory.getLogger(OwnerManager.class);
Expand Down Expand Up @@ -71,45 +72,37 @@ public void setOwner(
if (ownerType == Owner.Type.USER) {
NameIdentifier ownerIdent = AuthorizationUtils.ofUser(metalake, ownerName);
TreeLockUtils.doWithTreeLock(
objectIdent,
ownerIdent,
LockType.READ,
() ->
TreeLockUtils.doWithTreeLock(
ownerIdent,
LockType.READ,
() -> {
store
.relationOperations()
.insertRelation(
SupportsRelationOperations.Type.OWNER_REL,
objectIdent,
MetadataObjectUtil.toEntityType(metadataObject),
ownerIdent,
Entity.EntityType.USER,
true);
return null;
}));
() -> {
store
.relationOperations()
.insertRelation(
SupportsRelationOperations.Type.OWNER_REL,
objectIdent,
MetadataObjectUtil.toEntityType(metadataObject),
ownerIdent,
Entity.EntityType.USER,
true);
return null;
});
} else if (ownerType == Owner.Type.GROUP) {
NameIdentifier ownerIdent = AuthorizationUtils.ofGroup(metalake, ownerName);
TreeLockUtils.doWithTreeLock(
objectIdent,
ownerIdent,
LockType.READ,
() ->
TreeLockUtils.doWithTreeLock(
ownerIdent,
LockType.READ,
() -> {
store
.relationOperations()
.insertRelation(
SupportsRelationOperations.Type.OWNER_REL,
objectIdent,
MetadataObjectUtil.toEntityType(metadataObject),
ownerIdent,
Entity.EntityType.GROUP,
true);
return null;
}));
() -> {
store
.relationOperations()
.insertRelation(
SupportsRelationOperations.Type.OWNER_REL,
objectIdent,
MetadataObjectUtil.toEntityType(metadataObject),
ownerIdent,
Entity.EntityType.GROUP,
true);
return null;
});
}
} catch (NoSuchEntityException nse) {
LOG.warn(
Expand Down
Loading

0 comments on commit 4454e2b

Please sign in to comment.