diff --git a/common/src/main/java/com/datastrato/gravitino/dto/rel/TableDTO.java b/common/src/main/java/com/datastrato/gravitino/dto/rel/TableDTO.java index 84dd54b9917..2633ce34faf 100644 --- a/common/src/main/java/com/datastrato/gravitino/dto/rel/TableDTO.java +++ b/common/src/main/java/com/datastrato/gravitino/dto/rel/TableDTO.java @@ -58,7 +58,7 @@ private TableDTO() {} * @param properties The properties associated with the table. * @param audit The audit information for the table. * @param partitioning The partitioning of the table. - * @param indexes Teh indexes of the table. + * @param indexes The indexes of the table. */ private TableDTO( String name, diff --git a/core/src/main/java/com/datastrato/gravitino/GravitinoEnv.java b/core/src/main/java/com/datastrato/gravitino/GravitinoEnv.java index a05bb3662ee..c48e0fcf02c 100644 --- a/core/src/main/java/com/datastrato/gravitino/GravitinoEnv.java +++ b/core/src/main/java/com/datastrato/gravitino/GravitinoEnv.java @@ -99,9 +99,9 @@ public static GravitinoEnv getInstance() { } /** - * This method is used for testing purposes only to set the lock manager for test in package - * `com.datastrato.gravitino.server.web.rest`, as tree lock depends on the lock manager and we did - * not mock the lock manager in the test, so we need to set the lock manager for test. + * This method is used for testing purposes only to set the lock manager for test in package as + * tree lock depends on the lock manager and we did not mock the lock manager in the test, so we + * need to set the lock manager for test. * * @param lockManager The lock manager to be set. */ @@ -121,6 +121,17 @@ public void setAccessControlManager(AccessControlManager accessControlManager) { this.accessControlManager = accessControlManager; } + /** + * This method is used for testing purposes only to set the access manager for test in package + * `com.datastrato.gravitino.server.web.rest`. + * + * @param catalogDispatcher The catalog dispatcher to be set. + */ + @VisibleForTesting + public void setCatalogDispatcher(CatalogDispatcher catalogDispatcher) { + this.catalogDispatcher = catalogDispatcher; + } + /** * This method is used for testing purposes only to set the entity store for test in package * `com.datastrato.gravitino.authorization`. diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedSchema.java b/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedSchema.java index 02e45e6eb81..f64ab40cc31 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedSchema.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedSchema.java @@ -8,6 +8,7 @@ import com.datastrato.gravitino.Schema; import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.meta.SchemaEntity; +import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -23,6 +24,7 @@ public final class EntityCombinedSchema implements Schema { // Sets of properties that should be hidden from the user. private Set hiddenProperties; + private boolean imported; private EntityCombinedSchema(Schema schema, SchemaEntity schemaEntity) { this.schema = schema; @@ -42,6 +44,11 @@ public EntityCombinedSchema withHiddenPropertiesSet(Set hiddenProperties return this; } + public EntityCombinedSchema withImported(boolean imported) { + this.imported = imported; + return this; + } + @Override public String name() { return schema.name(); @@ -73,4 +80,12 @@ public Audit auditInfo() { ? schema.auditInfo() : mergedAudit.merge(schemaEntity.auditInfo(), true /* overwrite */); } + + public boolean imported() { + return imported; + } + + Map schemaProperties() { + return Collections.unmodifiableMap(schema.properties()); + } } diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedTable.java b/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedTable.java index 593508f9e6e..d0e5efacfc8 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedTable.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedTable.java @@ -14,6 +14,7 @@ import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; import com.datastrato.gravitino.rel.expressions.transforms.Transform; import com.datastrato.gravitino.rel.indexes.Index; +import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -30,6 +31,7 @@ public final class EntityCombinedTable implements Table { // Sets of properties that should be hidden from the user. private Set hiddenProperties; + private boolean imported; private EntityCombinedTable(Table table, TableEntity tableEntity) { this.table = table; @@ -49,6 +51,11 @@ public EntityCombinedTable withHiddenPropertiesSet(Set hiddenProperties) return this; } + public EntityCombinedTable withImported(boolean imported) { + this.imported = imported; + return this; + } + @Override public String name() { return table.name(); @@ -96,6 +103,10 @@ public Index[] index() { return table.index(); } + public boolean imported() { + return imported; + } + @Override public Audit auditInfo() { AuditInfo mergedAudit = @@ -110,4 +121,8 @@ public Audit auditInfo() { ? table.auditInfo() : mergedAudit.merge(tableEntity.auditInfo(), true /* overwrite */); } + + Map tableProperties() { + return Collections.unmodifiableMap(table.properties()); + } } diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedTopic.java b/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedTopic.java index b6fed19ec32..19bef7f63fc 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedTopic.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedTopic.java @@ -8,6 +8,7 @@ import com.datastrato.gravitino.messaging.Topic; import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.meta.TopicEntity; +import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -23,6 +24,7 @@ public class EntityCombinedTopic implements Topic { // Sets of properties that should be hidden from the user. private Set hiddenProperties; + private boolean imported; private EntityCombinedTopic(Topic topic, TopicEntity topicEntity) { this.topic = topic; @@ -42,6 +44,11 @@ public EntityCombinedTopic withHiddenPropertiesSet(Set hiddenProperties) return this; } + public EntityCombinedTopic withImported(boolean imported) { + this.imported = imported; + return this; + } + @Override public String name() { return topic.name(); @@ -73,4 +80,12 @@ public Audit auditInfo() { ? topic.auditInfo() : mergedAudit.merge(topicEntity.auditInfo(), true /* overwrite */); } + + public boolean imported() { + return imported; + } + + Map topicProperties() { + return Collections.unmodifiableMap(topic.properties()); + } } diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/SchemaDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/SchemaDispatcher.java index e61838e9556..3d92101017a 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/SchemaDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/SchemaDispatcher.java @@ -5,6 +5,7 @@ package com.datastrato.gravitino.catalog; +import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.connector.SupportsSchemas; /** @@ -13,4 +14,6 @@ * to dispatching or handling schema-related events or actions that are not covered by the standard * {@code SupportsSchemas} operations. */ -public interface SchemaDispatcher extends SupportsSchemas {} +public interface SchemaDispatcher extends SupportsSchemas { + boolean importSchema(NameIdentifier identifier); +} diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/SchemaNormalizeDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/SchemaNormalizeDispatcher.java index 9c004989e80..32d1be56d76 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/SchemaNormalizeDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/SchemaNormalizeDispatcher.java @@ -71,6 +71,12 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty applyCaseSensitive(ident, Capability.Scope.SCHEMA, dispatcher), cascade); } + @Override + public boolean importSchema(NameIdentifier identifier) { + return dispatcher.importSchema( + applyCaseSensitive(identifier, Capability.Scope.SCHEMA, dispatcher)); + } + private NameIdentifier normalizeNameIdentifier(NameIdentifier ident) { Capability capability = dispatcher.getCatalogCapability(ident); return applyCapabilities(ident, Capability.Scope.SCHEMA, capability); diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/SchemaOperationDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/SchemaOperationDispatcher.java index 1575ded111d..b5ab82276a2 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/SchemaOperationDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/SchemaOperationDispatcher.java @@ -20,6 +20,8 @@ import com.datastrato.gravitino.exceptions.NoSuchSchemaException; import com.datastrato.gravitino.exceptions.NonEmptySchemaException; import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException; +import com.datastrato.gravitino.lock.LockType; +import com.datastrato.gravitino.lock.TreeLockUtils; import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.meta.SchemaEntity; import com.datastrato.gravitino.storage.IdGenerator; @@ -159,47 +161,17 @@ public Schema createSchema(NameIdentifier ident, String comment, Map c.doWithSchemaOps(s -> s.loadSchema(ident)), - NoSuchSchemaException.class); + EntityCombinedSchema schema = + TreeLockUtils.doWithTreeLock(ident, LockType.READ, () -> loadCombinedSchema(ident)); - // If the Schema is maintained by the Gravitino's store, we don't have to load again. - boolean isManagedSchema = isManagedEntity(catalogIdentifier, Capability.Scope.SCHEMA); - if (isManagedSchema) { - return EntityCombinedSchema.of(schema) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdentifier, - HasPropertyMetadata::schemaPropertiesMetadata, - schema.properties())); + if (schema.imported()) { + return schema; } - StringIdentifier stringId = getStringIdFromProperties(schema.properties()); - // Case 1: The schema is not created by Gravitino. - if (stringId == null) { - return EntityCombinedSchema.of(schema) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdentifier, - HasPropertyMetadata::schemaPropertiesMetadata, - schema.properties())); - } + TreeLockUtils.doWithTreeLock( + NameIdentifier.of(ident.namespace().levels()), LockType.WRITE, () -> importSchema(ident)); - SchemaEntity schemaEntity = - operateOnEntity( - ident, - identifier -> store.get(identifier, SCHEMA, SchemaEntity.class), - "GET", - stringId.id()); - return EntityCombinedSchema.of(schema, schemaEntity) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdentifier, - HasPropertyMetadata::schemaPropertiesMetadata, - schema.properties())); + return schema; } /** @@ -242,7 +214,8 @@ public Schema alterSchema(NameIdentifier ident, SchemaChange... changes) getHiddenPropertyNames( catalogIdent, HasPropertyMetadata::schemaPropertiesMetadata, - alteredSchema.properties())); + alteredSchema.properties())) + .withImported(true); } StringIdentifier stringId = getStringIdFromProperties(alteredSchema.properties()); @@ -253,7 +226,8 @@ public Schema alterSchema(NameIdentifier ident, SchemaChange... changes) getHiddenPropertyNames( catalogIdent, HasPropertyMetadata::schemaPropertiesMetadata, - alteredSchema.properties())); + alteredSchema.properties())) + .withImported(isEntityExist(ident)); } SchemaEntity updatedSchemaEntity = @@ -280,12 +254,16 @@ public Schema alterSchema(NameIdentifier ident, SchemaChange... changes) .build()), "UPDATE", stringId.id()); + + boolean imported = updatedSchemaEntity != null; + return EntityCombinedSchema.of(alteredSchema, updatedSchemaEntity) .withHiddenPropertiesSet( getHiddenPropertyNames( catalogIdent, HasPropertyMetadata::schemaPropertiesMetadata, - alteredSchema.properties())); + alteredSchema.properties())) + .withImported(imported); } /** @@ -330,4 +308,104 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty ? droppedFromStore : droppedFromCatalog; } + + @Override + public boolean importSchema(NameIdentifier identifier) { + EntityCombinedSchema combinedSchema = loadCombinedSchema(identifier); + if (combinedSchema.imported()) { + return false; + } + + StringIdentifier stringId = getStringIdFromProperties(combinedSchema.schemaProperties()); + long uid; + if (stringId != null) { + // If the entity in the store doesn't match the external system, we use the data + // of external system to correct it. + uid = stringId.id(); + } else { + // If store doesn't exist entity, we sync the entity from the external system. + uid = idGenerator.nextId(); + } + + SchemaEntity schemaEntity = + SchemaEntity.builder() + .withId(uid) + .withName(identifier.name()) + .withNamespace(identifier.namespace()) + .withAuditInfo( + AuditInfo.builder() + .withCreator(combinedSchema.auditInfo().creator()) + .withCreateTime(combinedSchema.auditInfo().createTime()) + .withLastModifier(combinedSchema.auditInfo().lastModifier()) + .withLastModifiedTime(combinedSchema.auditInfo().lastModifiedTime()) + .build()) + .build(); + try { + store.put(schemaEntity, true); + } catch (Exception e) { + LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", identifier, e); + throw new RuntimeException("Fail to access underlying storage"); + } + + return true; + } + + private EntityCombinedSchema loadCombinedSchema(NameIdentifier ident) { + NameIdentifier catalogIdentifier = getCatalogIdentifier(ident); + Schema schema = + doWithCatalog( + catalogIdentifier, + c -> c.doWithSchemaOps(s -> s.loadSchema(ident)), + NoSuchSchemaException.class); + + // If the Schema is maintained by the Gravitino's store, we don't have to load again. + boolean isManagedSchema = isManagedEntity(catalogIdentifier, Capability.Scope.SCHEMA); + if (isManagedSchema) { + return EntityCombinedSchema.of(schema) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdentifier, + HasPropertyMetadata::schemaPropertiesMetadata, + schema.properties())) + .withImported(true); + } + + StringIdentifier stringId = getStringIdFromProperties(schema.properties()); + // Case 1: The schema is not created by Gravitino. + if (stringId == null) { + return EntityCombinedSchema.of(schema) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdentifier, + HasPropertyMetadata::schemaPropertiesMetadata, + schema.properties())) + .withImported(isEntityExist(ident)); + } + + SchemaEntity schemaEntity = + operateOnEntity( + ident, + identifier -> store.get(identifier, SCHEMA, SchemaEntity.class), + "GET", + stringId.id()); + + boolean imported = schemaEntity != null; + + return EntityCombinedSchema.of(schema, schemaEntity) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdentifier, + HasPropertyMetadata::schemaPropertiesMetadata, + schema.properties())) + .withImported(imported); + } + + private boolean isEntityExist(NameIdentifier ident) { + try { + return store.exists(ident, SCHEMA); + } catch (Exception e) { + LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "exists", ident, e); + throw new RuntimeException("Fail to access underlying storage"); + } + } } diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/TableDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/TableDispatcher.java index 7b54ccd5794..96a604d2a4f 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/TableDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/TableDispatcher.java @@ -5,6 +5,7 @@ package com.datastrato.gravitino.catalog; +import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.rel.TableCatalog; /** @@ -13,4 +14,6 @@ * dispatching or handling table-related events or actions that are not covered by the standard * {@code TableCatalog} operations. */ -public interface TableDispatcher extends TableCatalog {} +public interface TableDispatcher extends TableCatalog { + boolean importTable(NameIdentifier identifier); +} diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/TableNormalizeDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/TableNormalizeDispatcher.java index ac7da6cbc55..aa1730fcfe0 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/TableNormalizeDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/TableNormalizeDispatcher.java @@ -99,6 +99,12 @@ public boolean tableExists(NameIdentifier ident) { return dispatcher.tableExists(applyCaseSensitive(ident, Capability.Scope.TABLE, dispatcher)); } + @Override + public boolean importTable(NameIdentifier identifier) { + return dispatcher.importTable( + applyCaseSensitive(identifier, Capability.Scope.TABLE, dispatcher)); + } + private NameIdentifier normalizeNameIdentifier(NameIdentifier ident) { Capability capability = dispatcher.getCatalogCapability(ident); return applyCapabilities(ident, Capability.Scope.TABLE, capability); diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/TableOperationDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/TableOperationDispatcher.java index df41cdc2634..314781fafd6 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/TableOperationDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/TableOperationDispatcher.java @@ -19,6 +19,8 @@ import com.datastrato.gravitino.exceptions.NoSuchSchemaException; import com.datastrato.gravitino.exceptions.NoSuchTableException; import com.datastrato.gravitino.exceptions.TableAlreadyExistsException; +import com.datastrato.gravitino.lock.LockType; +import com.datastrato.gravitino.lock.TreeLockUtils; import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.meta.TableEntity; import com.datastrato.gravitino.rel.Column; @@ -79,37 +81,17 @@ public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaExcep */ @Override public Table loadTable(NameIdentifier ident) throws NoSuchTableException { - NameIdentifier catalogIdentifier = getCatalogIdentifier(ident); - Table table = - doWithCatalog( - catalogIdentifier, - c -> c.doWithTableOps(t -> t.loadTable(ident)), - NoSuchTableException.class); + EntityCombinedTable table = + TreeLockUtils.doWithTreeLock(ident, LockType.READ, () -> loadCombinedTable(ident)); - StringIdentifier stringId = getStringIdFromProperties(table.properties()); - // Case 1: The table is not created by Gravitino. - if (stringId == null) { - return EntityCombinedTable.of(table) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdentifier, - HasPropertyMetadata::tablePropertiesMetadata, - table.properties())); + if (table.imported()) { + return table; } - TableEntity tableEntity = - operateOnEntity( - ident, - identifier -> store.get(identifier, TABLE, TableEntity.class), - "GET", - stringId.id()); + TreeLockUtils.doWithTreeLock( + NameIdentifier.of(ident.namespace().levels()), LockType.WRITE, () -> importTable(ident)); - return EntityCombinedTable.of(table, tableEntity) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdentifier, - HasPropertyMetadata::tablePropertiesMetadata, - table.properties())); + return table; } /** @@ -249,7 +231,8 @@ public Table alterTable(NameIdentifier ident, TableChange... changes) getHiddenPropertyNames( getCatalogIdentifier(ident), HasPropertyMetadata::tablePropertiesMetadata, - alteredTable.properties())); + alteredTable.properties())) + .withImported(isEntityExist(ident)); } TableEntity updatedTableEntity = @@ -284,12 +267,15 @@ public Table alterTable(NameIdentifier ident, TableChange... changes) "UPDATE", stringId.id()); + boolean imported = updatedTableEntity != null; + return EntityCombinedTable.of(alteredTable, updatedTableEntity) .withHiddenPropertiesSet( getHiddenPropertyNames( getCatalogIdentifier(ident), HasPropertyMetadata::tablePropertiesMetadata, - alteredTable.properties())); + alteredTable.properties())) + .withImported(imported); } /** @@ -379,4 +365,92 @@ public boolean purgeTable(NameIdentifier ident) throws UnsupportedOperationExcep ? droppedFromStore : droppedFromCatalog; } + + @Override + public boolean importTable(NameIdentifier identifier) { + EntityCombinedTable combinedTable = loadCombinedTable(identifier); + + if (combinedTable.imported()) { + return false; + } + + StringIdentifier stringId = getStringIdFromProperties(combinedTable.tableProperties()); + long uid; + if (stringId != null) { + // If the entity in the store doesn't match the external system, we use the data + // of external system to correct it. + uid = stringId.id(); + } else { + // If store doesn't exist entity, we sync the entity from the external system. + uid = idGenerator.nextId(); + } + + TableEntity tableEntity = + TableEntity.builder() + .withId(uid) + .withName(identifier.name()) + .withNamespace(identifier.namespace()) + .withAuditInfo( + AuditInfo.builder() + .withCreator(combinedTable.auditInfo().creator()) + .withCreateTime(combinedTable.auditInfo().createTime()) + .withLastModifier(combinedTable.auditInfo().lastModifier()) + .withLastModifiedTime(combinedTable.auditInfo().lastModifiedTime()) + .build()) + .build(); + try { + store.put(tableEntity, true); + } catch (Exception e) { + LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", identifier, e); + throw new RuntimeException("Fail to access underlying storage"); + } + return true; + } + + private boolean isEntityExist(NameIdentifier ident) { + try { + return store.exists(ident, TABLE); + } catch (Exception e) { + LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "exists", ident, e); + throw new RuntimeException("Fail to access underlying storage"); + } + } + + private EntityCombinedTable loadCombinedTable(NameIdentifier ident) { + NameIdentifier catalogIdentifier = getCatalogIdentifier(ident); + Table table = + doWithCatalog( + catalogIdentifier, + c -> c.doWithTableOps(t -> t.loadTable(ident)), + NoSuchTableException.class); + + StringIdentifier stringId = getStringIdFromProperties(table.properties()); + // Case 1: The table is not created by Gravitino. + if (stringId == null) { + return EntityCombinedTable.of(table) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdentifier, + HasPropertyMetadata::tablePropertiesMetadata, + table.properties())) + .withImported(isEntityExist(ident)); + } + + TableEntity tableEntity = + operateOnEntity( + ident, + identifier -> store.get(identifier, TABLE, TableEntity.class), + "GET", + stringId.id()); + + boolean imported = tableEntity != null; + + return EntityCombinedTable.of(table, tableEntity) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdentifier, + HasPropertyMetadata::tablePropertiesMetadata, + table.properties())) + .withImported(imported); + } } diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/TopicDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/TopicDispatcher.java index 131a600c621..7d96c5abf95 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/TopicDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/TopicDispatcher.java @@ -5,6 +5,7 @@ package com.datastrato.gravitino.catalog; +import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.messaging.TopicCatalog; /** @@ -13,4 +14,6 @@ * dispatching or handling topic-related events or actions that are not covered by the standard * {@code TopicCatalog} operations. */ -public interface TopicDispatcher extends TopicCatalog {} +public interface TopicDispatcher extends TopicCatalog { + boolean importTopic(NameIdentifier identifier); +} diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/TopicNormalizeDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/TopicNormalizeDispatcher.java index aa372b623c1..de05fc04cc4 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/TopicNormalizeDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/TopicNormalizeDispatcher.java @@ -72,6 +72,12 @@ public boolean dropTopic(NameIdentifier ident) { return dispatcher.dropTopic(applyCaseSensitive(ident, Capability.Scope.TOPIC, dispatcher)); } + @Override + public boolean importTopic(NameIdentifier identifier) { + return dispatcher.importTopic( + applyCaseSensitive(identifier, Capability.Scope.TOPIC, dispatcher)); + } + private NameIdentifier normalizeNameIdentifier(NameIdentifier ident) { Capability capability = dispatcher.getCatalogCapability(ident); return applyCapabilities(ident, Capability.Scope.TOPIC, capability); diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/TopicOperationDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/TopicOperationDispatcher.java index 6d52e92d75a..998421d3cbf 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/TopicOperationDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/TopicOperationDispatcher.java @@ -18,6 +18,8 @@ import com.datastrato.gravitino.exceptions.NoSuchSchemaException; import com.datastrato.gravitino.exceptions.NoSuchTopicException; import com.datastrato.gravitino.exceptions.TopicAlreadyExistsException; +import com.datastrato.gravitino.lock.LockType; +import com.datastrato.gravitino.lock.TreeLockUtils; import com.datastrato.gravitino.messaging.DataLayout; import com.datastrato.gravitino.messaging.Topic; import com.datastrato.gravitino.messaging.TopicChange; @@ -70,36 +72,16 @@ public NameIdentifier[] listTopics(Namespace namespace) throws NoSuchSchemaExcep */ @Override public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException { - NameIdentifier catalogIdent = getCatalogIdentifier(ident); - Topic topic = - doWithCatalog( - catalogIdent, - c -> c.doWithTopicOps(t -> t.loadTopic(ident)), - NoSuchTopicException.class); + EntityCombinedTopic topic = + TreeLockUtils.doWithTreeLock(ident, LockType.READ, () -> loadCombinedTopic(ident)); - StringIdentifier stringId = getStringIdFromProperties(topic.properties()); - // Case 1: The topic is not created by Gravitino. - // Note: for Kafka catalog, stringId will not be null. Because there is no way to store the - // Gravitino - // ID in Kafka, therefor we use the topic ID as the Gravitino ID - if (stringId == null) { - return EntityCombinedTopic.of(topic) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties())); + if (topic.imported()) { + return topic; } - TopicEntity topicEntity = - operateOnEntity( - ident, - identifier -> store.get(identifier, TOPIC, TopicEntity.class), - "GET", - getStringIdFromProperties(topic.properties()).id()); - - return EntityCombinedTopic.of(topic, topicEntity) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties())); + TreeLockUtils.doWithTreeLock( + NameIdentifier.of(ident.namespace().levels()), LockType.WRITE, () -> importTopic(ident)); + return topic; } /** @@ -237,12 +219,15 @@ public Topic alterTopic(NameIdentifier ident, TopicChange... changes) "UPDATE", getStringIdFromProperties(alteredTopic.properties()).id()); + boolean imported = updatedTopicEntity != null; + return EntityCombinedTopic.of(alteredTopic, updatedTopicEntity) .withHiddenPropertiesSet( getHiddenPropertyNames( catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, - alteredTopic.properties())); + alteredTopic.properties())) + .withImported(imported); } /** @@ -282,4 +267,96 @@ public boolean dropTopic(NameIdentifier ident) { ? droppedFromStore : droppedFromCatalog; } + + @Override + public boolean importTopic(NameIdentifier identifier) { + + EntityCombinedTopic combinedTopic = loadCombinedTopic(identifier); + + if (combinedTopic.imported()) { + return false; + } + + StringIdentifier stringId = getStringIdFromProperties(combinedTopic.topicProperties()); + + long uid; + if (stringId != null) { + // If the entity in the store doesn't match the external system, we use the data + // of external system to correct it. + uid = stringId.id(); + } else { + // If store doesn't exist entity, we sync the entity from the external system. + uid = idGenerator.nextId(); + } + + TopicEntity topicEntity = + TopicEntity.builder() + .withId(uid) + .withName(combinedTopic.name()) + .withComment(combinedTopic.comment()) + .withNamespace(identifier.namespace()) + .withAuditInfo( + AuditInfo.builder() + .withCreator(combinedTopic.auditInfo().creator()) + .withCreateTime(combinedTopic.auditInfo().createTime()) + .withLastModifier(combinedTopic.auditInfo().lastModifier()) + .withLastModifiedTime(combinedTopic.auditInfo().lastModifiedTime()) + .build()) + .build(); + + try { + store.put(topicEntity, true); + } catch (Exception e) { + LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", identifier, e); + throw new RuntimeException("Fail to access underlying storage"); + } + + return true; + } + + private boolean isEntityExist(NameIdentifier ident) { + try { + return store.exists(ident, TOPIC); + } catch (Exception e) { + LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "exists", ident, e); + throw new RuntimeException("Fail to access underlying storage"); + } + } + + private EntityCombinedTopic loadCombinedTopic(NameIdentifier ident) { + NameIdentifier catalogIdent = getCatalogIdentifier(ident); + Topic topic = + doWithCatalog( + catalogIdent, + c -> c.doWithTopicOps(t -> t.loadTopic(ident)), + NoSuchTopicException.class); + + StringIdentifier stringId = getStringIdFromProperties(topic.properties()); + // Case 1: The topic is not created by Gravitino. + // Note: for Kafka catalog, stringId will not be null. Because there is no way to store the + // Gravitino + // ID in Kafka, therefor we use the topic ID as the Gravitino ID + if (stringId == null) { + return EntityCombinedTopic.of(topic) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties())) + .withImported(isEntityExist(ident)); + } + + TopicEntity topicEntity = + operateOnEntity( + ident, + identifier -> store.get(identifier, TOPIC, TopicEntity.class), + "GET", + getStringIdFromProperties(topic.properties()).id()); + + boolean imported = topicEntity != null; + + return EntityCombinedTopic.of(topic, topicEntity) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties())) + .withImported(imported); + } } diff --git a/core/src/main/java/com/datastrato/gravitino/listener/SchemaEventDispatcher.java b/core/src/main/java/com/datastrato/gravitino/listener/SchemaEventDispatcher.java index 50c6df4ea18..21ec9a704db 100644 --- a/core/src/main/java/com/datastrato/gravitino/listener/SchemaEventDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/listener/SchemaEventDispatcher.java @@ -130,4 +130,9 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty throw e; } } + + @Override + public boolean importSchema(NameIdentifier identifier) { + return dispatcher.importSchema(identifier); + } } diff --git a/core/src/main/java/com/datastrato/gravitino/listener/TableEventDispatcher.java b/core/src/main/java/com/datastrato/gravitino/listener/TableEventDispatcher.java index deacbba2094..e1a78c35840 100644 --- a/core/src/main/java/com/datastrato/gravitino/listener/TableEventDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/listener/TableEventDispatcher.java @@ -169,4 +169,9 @@ public boolean purgeTable(NameIdentifier ident) { public boolean tableExists(NameIdentifier ident) { return dispatcher.tableExists(ident); } + + @Override + public boolean importTable(NameIdentifier identifier) { + return dispatcher.importTable(identifier); + } } diff --git a/core/src/main/java/com/datastrato/gravitino/listener/TopicEventDispatcher.java b/core/src/main/java/com/datastrato/gravitino/listener/TopicEventDispatcher.java index dd628a534d2..61777703064 100644 --- a/core/src/main/java/com/datastrato/gravitino/listener/TopicEventDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/listener/TopicEventDispatcher.java @@ -128,4 +128,9 @@ public Topic createTopic( throw e; } } + + @Override + public boolean importTopic(NameIdentifier identifier) { + return dispatcher.importTopic(identifier); + } } diff --git a/core/src/test/java/com/datastrato/gravitino/catalog/TestSchemaOperationDispatcher.java b/core/src/test/java/com/datastrato/gravitino/catalog/TestSchemaOperationDispatcher.java index c04b8e1a72b..b8e8e303fa7 100644 --- a/core/src/test/java/com/datastrato/gravitino/catalog/TestSchemaOperationDispatcher.java +++ b/core/src/test/java/com/datastrato/gravitino/catalog/TestSchemaOperationDispatcher.java @@ -11,14 +11,19 @@ import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; +import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.Configs; +import com.datastrato.gravitino.GravitinoEnv; import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.Namespace; import com.datastrato.gravitino.Schema; import com.datastrato.gravitino.SchemaChange; import com.datastrato.gravitino.auth.AuthConstants; import com.datastrato.gravitino.exceptions.NoSuchEntityException; +import com.datastrato.gravitino.lock.LockManager; import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.meta.SchemaEntity; import com.google.common.collect.ImmutableMap; @@ -39,6 +44,12 @@ public class TestSchemaOperationDispatcher extends TestOperationDispatcher { @BeforeAll public static void initialize() throws IOException { dispatcher = new SchemaOperationDispatcher(catalogManager, entityStore, idGenerator); + + Config config = mock(Config.class); + doReturn(100000L).when(config).get(Configs.TREE_LOCK_MAX_NODE_IN_MEMORY); + doReturn(1000L).when(config).get(Configs.TREE_LOCK_MIN_NODE_IN_MEMORY); + doReturn(36000L).when(config).get(Configs.TREE_LOCK_CLEAN_INTERVAL); + GravitinoEnv.getInstance().setLockManager(new LockManager(config)); } @Test diff --git a/core/src/test/java/com/datastrato/gravitino/catalog/TestTableOperationDispatcher.java b/core/src/test/java/com/datastrato/gravitino/catalog/TestTableOperationDispatcher.java index 83cee605232..f3c1324b671 100644 --- a/core/src/test/java/com/datastrato/gravitino/catalog/TestTableOperationDispatcher.java +++ b/core/src/test/java/com/datastrato/gravitino/catalog/TestTableOperationDispatcher.java @@ -4,6 +4,9 @@ */ package com.datastrato.gravitino.catalog; +import static com.datastrato.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL; +import static com.datastrato.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY; +import static com.datastrato.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY; import static com.datastrato.gravitino.Entity.EntityType.TABLE; import static com.datastrato.gravitino.StringIdentifier.ID_KEY; import static com.datastrato.gravitino.TestBasePropertiesMetadata.COMMENT_KEY; @@ -11,13 +14,17 @@ import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; +import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.GravitinoEnv; import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.Namespace; import com.datastrato.gravitino.TestColumn; import com.datastrato.gravitino.auth.AuthConstants; import com.datastrato.gravitino.exceptions.NoSuchEntityException; +import com.datastrato.gravitino.lock.LockManager; import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.meta.TableEntity; import com.datastrato.gravitino.rel.Column; @@ -46,6 +53,12 @@ public static void initialize() throws IOException { new SchemaOperationDispatcher(catalogManager, entityStore, idGenerator); tableOperationDispatcher = new TableOperationDispatcher(catalogManager, entityStore, idGenerator); + + Config config = mock(Config.class); + doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY); + doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY); + doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL); + GravitinoEnv.getInstance().setLockManager(new LockManager(config)); } @Test diff --git a/core/src/test/java/com/datastrato/gravitino/catalog/TestTopicOperationDispatcher.java b/core/src/test/java/com/datastrato/gravitino/catalog/TestTopicOperationDispatcher.java index fd06db7614a..ccf55f1a19d 100644 --- a/core/src/test/java/com/datastrato/gravitino/catalog/TestTopicOperationDispatcher.java +++ b/core/src/test/java/com/datastrato/gravitino/catalog/TestTopicOperationDispatcher.java @@ -4,18 +4,25 @@ */ package com.datastrato.gravitino.catalog; +import static com.datastrato.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL; +import static com.datastrato.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY; +import static com.datastrato.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY; import static com.datastrato.gravitino.StringIdentifier.ID_KEY; import static com.datastrato.gravitino.TestBasePropertiesMetadata.COMMENT_KEY; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.reset; +import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.GravitinoEnv; import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.Namespace; import com.datastrato.gravitino.auth.AuthConstants; import com.datastrato.gravitino.exceptions.NoSuchEntityException; +import com.datastrato.gravitino.lock.LockManager; import com.datastrato.gravitino.messaging.Topic; import com.datastrato.gravitino.messaging.TopicChange; import com.datastrato.gravitino.meta.AuditInfo; @@ -39,6 +46,12 @@ public static void initialize() throws IOException { new SchemaOperationDispatcher(catalogManager, entityStore, idGenerator); topicOperationDispatcher = new TopicOperationDispatcher(catalogManager, entityStore, idGenerator); + + Config config = mock(Config.class); + doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY); + doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY); + doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL); + GravitinoEnv.getInstance().setLockManager(new LockManager(config)); } @Test diff --git a/server/src/main/java/com/datastrato/gravitino/server/web/rest/RoleOperations.java b/server/src/main/java/com/datastrato/gravitino/server/web/rest/RoleOperations.java index 147fa42abaa..bfc84ee0936 100644 --- a/server/src/main/java/com/datastrato/gravitino/server/web/rest/RoleOperations.java +++ b/server/src/main/java/com/datastrato/gravitino/server/web/rest/RoleOperations.java @@ -7,15 +7,23 @@ import com.codahale.metrics.annotation.ResponseMetered; import com.codahale.metrics.annotation.Timed; import com.datastrato.gravitino.GravitinoEnv; +import com.datastrato.gravitino.MetadataObject; +import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.authorization.AccessControlManager; import com.datastrato.gravitino.authorization.Privilege; import com.datastrato.gravitino.authorization.Privileges; import com.datastrato.gravitino.authorization.SecurableObject; import com.datastrato.gravitino.authorization.SecurableObjects; +import com.datastrato.gravitino.catalog.SchemaDispatcher; +import com.datastrato.gravitino.catalog.TableDispatcher; +import com.datastrato.gravitino.catalog.TopicDispatcher; +import com.datastrato.gravitino.dto.authorization.SecurableObjectDTO; import com.datastrato.gravitino.dto.requests.RoleCreateRequest; import com.datastrato.gravitino.dto.responses.DeleteResponse; import com.datastrato.gravitino.dto.responses.RoleResponse; import com.datastrato.gravitino.dto.util.DTOConverters; +import com.datastrato.gravitino.lock.LockType; +import com.datastrato.gravitino.lock.TreeLockUtils; import com.datastrato.gravitino.metrics.MetricNames; import com.datastrato.gravitino.server.web.Utils; import com.google.common.base.Preconditions; @@ -77,6 +85,10 @@ public Response createRole(@PathParam("metalake") String metalake, RoleCreateReq request.getSecurableObjects() != null && request.getSecurableObjects().length == 1, "The size of securable objects must be 1"); + for (SecurableObjectDTO object : request.getSecurableObjects()) { + checkSecurableObjects(metalake, object); + } + return Utils.doAs( httpRequest, () -> { @@ -132,4 +144,75 @@ public Response deleteRole( return ExceptionHandlers.handleRoleException(OperationType.DELETE, role, metalake, e); } } + + // Check every securable object whether exists and is imported. + private void checkSecurableObjects(String metalake, SecurableObjectDTO object) { + NameIdentifier identifier; + + // Securable object ignores the metalake namespace, so we should add it back. + if (object.type() == MetadataObject.Type.METALAKE) { + identifier = NameIdentifier.parse(object.fullName()); + } else { + identifier = NameIdentifier.parse(String.format("%s.%s", metalake, object.fullName())); + } + + String existErrMsg = "Securable object % doesn't exist"; + + TreeLockUtils.doWithTreeLock( + identifier, + LockType.READ, + () -> { + switch (object.type()) { + case METALAKE: + if (!GravitinoEnv.getInstance().metalakeDispatcher().metalakeExists(identifier)) { + throw new IllegalArgumentException(String.format(existErrMsg, object.fullName())); + } + + break; + case CATALOG: + if (!GravitinoEnv.getInstance().catalogDispatcher().catalogExists(identifier)) { + throw new IllegalArgumentException(String.format(existErrMsg, object.fullName())); + } + + break; + case SCHEMA: + { + SchemaDispatcher dispatcher = GravitinoEnv.getInstance().schemaDispatcher(); + if (!dispatcher.schemaExists(identifier)) { + throw new IllegalArgumentException(String.format(existErrMsg, object.fullName())); + } + break; + } + case FILESET: + if (!GravitinoEnv.getInstance().filesetDispatcher().filesetExists(identifier)) { + throw new IllegalArgumentException(String.format(existErrMsg, object.fullName())); + } + + break; + case TABLE: + { + TableDispatcher dispatcher = GravitinoEnv.getInstance().tableDispatcher(); + if (!dispatcher.tableExists(identifier)) { + throw new IllegalArgumentException(String.format(existErrMsg, object.fullName())); + } + + break; + } + case TOPIC: + { + TopicDispatcher dispatcher = GravitinoEnv.getInstance().topicDispatcher(); + if (!dispatcher.topicExists(identifier)) { + throw new IllegalArgumentException(String.format(existErrMsg, object.fullName())); + } + + break; + } + default: + throw new IllegalArgumentException( + String.format("Doesn't support the type %s", object.type())); + } + + return null; + }); + } } diff --git a/server/src/main/java/com/datastrato/gravitino/server/web/rest/SchemaOperations.java b/server/src/main/java/com/datastrato/gravitino/server/web/rest/SchemaOperations.java index 4bdcb1c09c3..33531102b7c 100644 --- a/server/src/main/java/com/datastrato/gravitino/server/web/rest/SchemaOperations.java +++ b/server/src/main/java/com/datastrato/gravitino/server/web/rest/SchemaOperations.java @@ -133,9 +133,7 @@ public Response loadSchema( httpRequest, () -> { NameIdentifier ident = NameIdentifierUtil.ofSchema(metalake, catalog, schema); - Schema s = - TreeLockUtils.doWithTreeLock( - ident, LockType.READ, () -> dispatcher.loadSchema(ident)); + Schema s = dispatcher.loadSchema(ident); Response response = Utils.ok(new SchemaResponse(DTOConverters.toDTO(s))); LOG.info("Schema loaded: {}.{}.{}", metalake, catalog, s.name()); return response; diff --git a/server/src/main/java/com/datastrato/gravitino/server/web/rest/TableOperations.java b/server/src/main/java/com/datastrato/gravitino/server/web/rest/TableOperations.java index b0093fd2b9d..42630da8c36 100644 --- a/server/src/main/java/com/datastrato/gravitino/server/web/rest/TableOperations.java +++ b/server/src/main/java/com/datastrato/gravitino/server/web/rest/TableOperations.java @@ -148,9 +148,7 @@ public Response loadTable( httpRequest, () -> { NameIdentifier ident = NameIdentifierUtil.ofTable(metalake, catalog, schema, table); - Table t = - TreeLockUtils.doWithTreeLock( - ident, LockType.READ, () -> dispatcher.loadTable(ident)); + Table t = dispatcher.loadTable(ident); Response response = Utils.ok(new TableResponse(DTOConverters.toDTO(t))); LOG.info("Table loaded: {}.{}.{}.{}", metalake, catalog, schema, table); return response; diff --git a/server/src/main/java/com/datastrato/gravitino/server/web/rest/TopicOperations.java b/server/src/main/java/com/datastrato/gravitino/server/web/rest/TopicOperations.java index c49552ac8b5..1932585ef36 100644 --- a/server/src/main/java/com/datastrato/gravitino/server/web/rest/TopicOperations.java +++ b/server/src/main/java/com/datastrato/gravitino/server/web/rest/TopicOperations.java @@ -142,9 +142,7 @@ public Response loadTopic( () -> { LOG.info("Loading topic: {}.{}.{}.{}", metalake, catalog, schema, topic); NameIdentifier ident = NameIdentifierUtil.ofTopic(metalake, catalog, schema, topic); - Topic t = - TreeLockUtils.doWithTreeLock( - ident, LockType.READ, () -> dispatcher.loadTopic(ident)); + Topic t = dispatcher.loadTopic(ident); Response response = Utils.ok(new TopicResponse(DTOConverters.toDTO(t))); LOG.info("Topic loaded: {}.{}.{}.{}", metalake, catalog, schema, topic); return response; diff --git a/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestRoleOperations.java b/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestRoleOperations.java index c32e201e85f..e8cdced23e6 100644 --- a/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestRoleOperations.java +++ b/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestRoleOperations.java @@ -19,6 +19,7 @@ import com.datastrato.gravitino.authorization.Role; import com.datastrato.gravitino.authorization.SecurableObject; import com.datastrato.gravitino.authorization.SecurableObjects; +import com.datastrato.gravitino.catalog.CatalogDispatcher; import com.datastrato.gravitino.dto.authorization.RoleDTO; import com.datastrato.gravitino.dto.authorization.SecurableObjectDTO; import com.datastrato.gravitino.dto.requests.RoleCreateRequest; @@ -55,6 +56,7 @@ public class TestRoleOperations extends JerseyTest { private static final AccessControlManager manager = mock(AccessControlManager.class); + private static final CatalogDispatcher dispatcher = mock(CatalogDispatcher.class); private static class MockServletRequestFactory extends ServletRequestFactoryBase { @Override @@ -73,6 +75,7 @@ public static void setup() { Mockito.doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL); GravitinoEnv.getInstance().setLockManager(new LockManager(config)); GravitinoEnv.getInstance().setAccessControlManager(manager); + GravitinoEnv.getInstance().setCatalogDispatcher(dispatcher); } @Override @@ -110,6 +113,7 @@ public void testCreateRole() { Role role = buildRole("role1"); when(manager.createRole(any(), any(), any(), any())).thenReturn(role); + when(dispatcher.catalogExists(any())).thenReturn(true); Response resp = target("/metalakes/metalake1/roles")