Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#4429] Add hook dispatcher #12

Merged
merged 1 commit into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ public class SecurableObjects {

private static final Splitter DOT_SPLITTER = Splitter.on('.');

/**
* Create the metalake {@link SecurableObject} with the given metalake name.
*
* @param metalake The metalake name
* @param privileges The privileges of the metalake
* @return The created metalake {@link SecurableObject}
*/
public static SecurableObject ofMetalake(String metalake, List<Privilege> privileges) {
return of(MetadataObject.Type.METALAKE, Lists.newArrayList(metalake), privileges);
}

/**
* Create the catalog {@link SecurableObject} with the given catalog name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@ public class TestSecurableObjects {

@Test
public void testSecurableObjects() {

SecurableObject metalake =
SecurableObjects.ofMetalake(
"metalake", Lists.newArrayList(Privileges.CreateCatalog.allow()));
Assertions.assertEquals("metalake", metalake.fullName());
Assertions.assertEquals(MetadataObject.Type.METALAKE, metalake.type());
SecurableObject anotherMetalake =
SecurableObjects.of(
MetadataObject.Type.METALAKE,
Lists.newArrayList("metalake"),
Lists.newArrayList(Privileges.CreateCatalog.allow()));
Assertions.assertEquals(metalake, anotherMetalake);

SecurableObject catalog =
SecurableObjects.ofCatalog("catalog", Lists.newArrayList(Privileges.UseCatalog.allow()));
Assertions.assertEquals("catalog", catalog.fullName());
Expand Down
62 changes: 25 additions & 37 deletions core/src/main/java/org/apache/gravitino/GravitinoEnv.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.base.Preconditions;
import org.apache.gravitino.authorization.AccessControlDispatcher;
import org.apache.gravitino.authorization.AccessControlManager;
import org.apache.gravitino.authorization.AuthorizationUtils;
import org.apache.gravitino.authorization.OwnerManager;
import org.apache.gravitino.auxiliary.AuxiliaryServiceManager;
import org.apache.gravitino.catalog.CatalogDispatcher;
Expand All @@ -42,8 +41,13 @@
import org.apache.gravitino.catalog.TopicDispatcher;
import org.apache.gravitino.catalog.TopicNormalizeDispatcher;
import org.apache.gravitino.catalog.TopicOperationDispatcher;
import org.apache.gravitino.hook.DispatcherHookHelper;
import org.apache.gravitino.hook.DispatcherHooks;
import org.apache.gravitino.hook.AccessControlHookDispatcher;
import org.apache.gravitino.hook.CatalogHookDispatcher;
import org.apache.gravitino.hook.FilesetHookDispatcher;
import org.apache.gravitino.hook.MetalakeHookDispatcher;
import org.apache.gravitino.hook.SchemaHookDispatcher;
import org.apache.gravitino.hook.TableHookDispatcher;
import org.apache.gravitino.hook.TopicHookDispatcher;
import org.apache.gravitino.listener.CatalogEventDispatcher;
import org.apache.gravitino.listener.EventBus;
import org.apache.gravitino.listener.EventListenerManager;
Expand Down Expand Up @@ -348,28 +352,30 @@ private void initGravitinoServerComponents() {

// Create and initialize metalake related modules
MetalakeDispatcher metalakeManager = new MetalakeManager(entityStore, idGenerator);
MetalakeHookDispatcher metalakeHookDispatcher = new MetalakeHookDispatcher(metalakeManager);
MetalakeNormalizeDispatcher metalakeNormalizeDispatcher =
new MetalakeNormalizeDispatcher(installDispatcherHooks(metalakeManager));
new MetalakeNormalizeDispatcher(metalakeHookDispatcher);
this.metalakeDispatcher = new MetalakeEventDispatcher(eventBus, metalakeNormalizeDispatcher);

// Create and initialize Catalog related modules
this.catalogManager = new CatalogManager(config, entityStore, idGenerator);
CatalogHookDispatcher catalogHookDispatcher = new CatalogHookDispatcher(catalogManager);
CatalogNormalizeDispatcher catalogNormalizeDispatcher =
new CatalogNormalizeDispatcher(installDispatcherHooks((CatalogDispatcher) catalogManager));
new CatalogNormalizeDispatcher(catalogHookDispatcher);
this.catalogDispatcher = new CatalogEventDispatcher(eventBus, catalogNormalizeDispatcher);

SchemaOperationDispatcher schemaOperationDispatcher =
new SchemaOperationDispatcher(catalogManager, entityStore, idGenerator);
SchemaHookDispatcher schemaHookDispatcher = new SchemaHookDispatcher(schemaOperationDispatcher);
SchemaNormalizeDispatcher schemaNormalizeDispatcher =
new SchemaNormalizeDispatcher(
installDispatcherHooks((SchemaDispatcher) schemaOperationDispatcher), catalogManager);
new SchemaNormalizeDispatcher(schemaHookDispatcher, catalogManager);
this.schemaDispatcher = new SchemaEventDispatcher(eventBus, schemaNormalizeDispatcher);

TableOperationDispatcher tableOperationDispatcher =
new TableOperationDispatcher(catalogManager, entityStore, idGenerator);
TableHookDispatcher tableHookDispatcher = new TableHookDispatcher(tableOperationDispatcher);
TableNormalizeDispatcher tableNormalizeDispatcher =
new TableNormalizeDispatcher(
installDispatcherHooks((TableDispatcher) tableOperationDispatcher), catalogManager);
new TableNormalizeDispatcher(tableHookDispatcher, catalogManager);
this.tableDispatcher = new TableEventDispatcher(eventBus, tableNormalizeDispatcher);

// TODO: We can install hooks when we need, we only supports ownership post hook,
Expand All @@ -382,24 +388,27 @@ private void initGravitinoServerComponents() {

FilesetOperationDispatcher filesetOperationDispatcher =
new FilesetOperationDispatcher(catalogManager, entityStore, idGenerator);
FilesetHookDispatcher filesetHookDispatcher =
new FilesetHookDispatcher(filesetOperationDispatcher);
FilesetNormalizeDispatcher filesetNormalizeDispatcher =
new FilesetNormalizeDispatcher(
installDispatcherHooks((FilesetDispatcher) filesetOperationDispatcher), catalogManager);
new FilesetNormalizeDispatcher(filesetHookDispatcher, catalogManager);
this.filesetDispatcher = new FilesetEventDispatcher(eventBus, filesetNormalizeDispatcher);

TopicOperationDispatcher topicOperationDispatcher =
new TopicOperationDispatcher(catalogManager, entityStore, idGenerator);
TopicHookDispatcher topicHookDispatcher = new TopicHookDispatcher(topicOperationDispatcher);
TopicNormalizeDispatcher topicNormalizeDispatcher =
new TopicNormalizeDispatcher(
installDispatcherHooks((TopicDispatcher) topicOperationDispatcher), catalogManager);
new TopicNormalizeDispatcher(topicHookDispatcher, catalogManager);
this.topicDispatcher = new TopicEventDispatcher(eventBus, topicNormalizeDispatcher);

// Create and initialize access control related modules
boolean enableAuthorization = config.get(Configs.ENABLE_AUTHORIZATION);
if (enableAuthorization) {
this.accessControlDispatcher =
installDispatcherHooks(
(AccessControlDispatcher) new AccessControlManager(entityStore, idGenerator, config));
AccessControlHookDispatcher accessControlHookDispatcher =
new AccessControlHookDispatcher(
new AccessControlManager(entityStore, idGenerator, config));

this.accessControlDispatcher = accessControlHookDispatcher;
this.ownerManager = new OwnerManager(entityStore);
} else {
this.accessControlDispatcher = null;
Expand All @@ -415,25 +424,4 @@ private void initGravitinoServerComponents() {
// Tag manager
this.tagManager = new TagManager(idGenerator, entityStore);
}

// Provides a universal entrance to install dispatcher hooks. This method
// focuses the logic of installing hooks.
// We should reuse the ability of (Metalake|Schema|Table|Fileset|...)NormalizeDispatcher to avoid
// solving
// normalization names, this is useful for pre-hooks.
// so we can't install the hooks for the outside of
// (Metalake|Schema|Table|Fileset|...)NormalizeDispatcher.
private <T> T installDispatcherHooks(T manager) {
boolean enableAuthorization = config.get(Configs.ENABLE_AUTHORIZATION);
DispatcherHooks hooks = new DispatcherHooks();
if (enableAuthorization) {
AuthorizationUtils.prepareAuthorizationHooks(manager, hooks);
}

if (hooks.isEmpty()) {
return manager;
}

return DispatcherHookHelper.installHooks(manager, hooks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.exceptions.NoSuchMetalakeException;
import org.apache.gravitino.hook.DispatcherHooks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -117,10 +116,4 @@ public static void checkRoleNamespace(Namespace namespace) {
"Role namespace must have 3 levels, the input namespace is %s",
namespace);
}

// Install some post hooks used for owner. The owner will have the all privileges
// of securable objects, users, groups, roles.
public static <T> void prepareAuthorizationHooks(T manager, DispatcherHooks hooks) {
// TODO: Refactor the post hook by adding new dispatcher
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@

/**
* OwnerManager is used for manage the owner of metadata object. The user and group don't have an
* owner
* owner. Because the post hook will call the methods. We shouldn't add the lock of the metadata
* object. Otherwise, it will cause deadlock.
*/
public class OwnerManager {
private static final Logger LOG = LoggerFactory.getLogger(OwnerManager.class);
Expand Down Expand Up @@ -71,45 +72,37 @@ public void setOwner(
if (ownerType == Owner.Type.USER) {
NameIdentifier ownerIdent = AuthorizationUtils.ofUser(metalake, ownerName);
TreeLockUtils.doWithTreeLock(
objectIdent,
ownerIdent,
LockType.READ,
() ->
TreeLockUtils.doWithTreeLock(
ownerIdent,
LockType.READ,
() -> {
store
.relationOperations()
.insertRelation(
SupportsRelationOperations.Type.OWNER_REL,
objectIdent,
MetadataObjectUtil.toEntityType(metadataObject),
ownerIdent,
Entity.EntityType.USER,
true);
return null;
}));
() -> {
store
.relationOperations()
.insertRelation(
SupportsRelationOperations.Type.OWNER_REL,
objectIdent,
MetadataObjectUtil.toEntityType(metadataObject),
ownerIdent,
Entity.EntityType.USER,
true);
return null;
});
} else if (ownerType == Owner.Type.GROUP) {
NameIdentifier ownerIdent = AuthorizationUtils.ofGroup(metalake, ownerName);
TreeLockUtils.doWithTreeLock(
objectIdent,
ownerIdent,
LockType.READ,
() ->
TreeLockUtils.doWithTreeLock(
ownerIdent,
LockType.READ,
() -> {
store
.relationOperations()
.insertRelation(
SupportsRelationOperations.Type.OWNER_REL,
objectIdent,
MetadataObjectUtil.toEntityType(metadataObject),
ownerIdent,
Entity.EntityType.GROUP,
true);
return null;
}));
() -> {
store
.relationOperations()
.insertRelation(
SupportsRelationOperations.Type.OWNER_REL,
objectIdent,
MetadataObjectUtil.toEntityType(metadataObject),
ownerIdent,
Entity.EntityType.GROUP,
true);
return null;
});
}
} catch (NoSuchEntityException nse) {
LOG.warn(
Expand Down
Loading
Loading