Skip to content

Commit

Permalink
check schema
Browse files Browse the repository at this point in the history
  • Loading branch information
Heng Qin committed Jun 6, 2024
1 parent d5ca7e7 commit c831685
Show file tree
Hide file tree
Showing 9 changed files with 310 additions and 50 deletions.
50 changes: 50 additions & 0 deletions core/src/main/java/com/datastrato/gravitino/GravitinoEnv.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ public void setAccessControlManager(AccessControlManager accessControlManager) {
this.accessControlManager = accessControlManager;
}

/**
* This method is used for testing purposes only to set the metalake dispatcher for test.
*
* @param metalakeDispatcher The metalake dispatcher to be set.
*/
@VisibleForTesting
public void setMetalakeDispatcher(MetalakeDispatcher metalakeDispatcher) {
this.metalakeDispatcher = metalakeDispatcher;
}

/**
* This method is used for testing purposes only to set the access manager for test in package
* `com.datastrato.gravitino.server.web.rest`.
Expand All @@ -132,6 +142,46 @@ public void setCatalogDispatcher(CatalogDispatcher catalogDispatcher) {
this.catalogDispatcher = catalogDispatcher;
}

/**
* This method is used for testing purposes only to set the schema dispatcher for test.
*
* @param schemaDispatcher The schema dispatcher to be set.
*/
@VisibleForTesting
public void setSchemaDispatcher(SchemaDispatcher schemaDispatcher) {
this.schemaDispatcher = schemaDispatcher;
}

/**
* This method is used for testing purposes only to set the table dispatcher for test.
*
* @param tableDispatcher The table dispatcher to be set.
*/
@VisibleForTesting
public void setTableDispatcher(TableDispatcher tableDispatcher) {
this.tableDispatcher = tableDispatcher;
}

/**
* This method is used for testing purposes only to set the topic dispatcher for test.
*
* @param topicDispatcher The topic dispatcher to be set.
*/
@VisibleForTesting
public void setTopicDispatcher(TopicDispatcher topicDispatcher) {
this.topicDispatcher = topicDispatcher;
}

/**
* This method is used for testing purposes only to set the filset dispatcher for test.
*
* @param filesetDispatcher The fileset dispatcher to be set.
*/
@VisibleForTesting
public void setFilesetDispatcher(FilesetDispatcher filesetDispatcher) {
this.filesetDispatcher = filesetDispatcher;
}

/**
* This method is used for testing purposes only to set the entity store for test in package
* `com.datastrato.gravitino.authorization`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,7 @@ public Schema alterSchema(NameIdentifier ident, SchemaChange... changes)
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::schemaPropertiesMetadata,
alteredSchema.properties()))
.withImported(true);
alteredSchema.properties()));
}

StringIdentifier stringId = getStringIdFromProperties(alteredSchema.properties());
Expand All @@ -226,8 +225,7 @@ public Schema alterSchema(NameIdentifier ident, SchemaChange... changes)
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::schemaPropertiesMetadata,
alteredSchema.properties()))
.withImported(isEntityExist(ident));
alteredSchema.properties()));
}

SchemaEntity updatedSchemaEntity =
Expand Down Expand Up @@ -255,15 +253,12 @@ public Schema alterSchema(NameIdentifier ident, SchemaChange... changes)
"UPDATE",
stringId.id());

boolean imported = updatedSchemaEntity != null;

return EntityCombinedSchema.of(alteredSchema, updatedSchemaEntity)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::schemaPropertiesMetadata,
alteredSchema.properties()))
.withImported(imported);
alteredSchema.properties()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static com.datastrato.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM;

import com.datastrato.gravitino.EntityStore;
import com.datastrato.gravitino.GravitinoEnv;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.StringIdentifier;
Expand Down Expand Up @@ -88,8 +89,12 @@ public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
return table;
}

TreeLockUtils.doWithTreeLock(
NameIdentifier.of(ident.namespace().levels()), LockType.WRITE, () -> importTable(ident));
if (GravitinoEnv.getInstance()
.schemaDispatcher()
.schemaExists(NameIdentifier.of(ident.namespace().levels()))) {
TreeLockUtils.doWithTreeLock(
NameIdentifier.of(ident.namespace().levels()), LockType.WRITE, () -> importTable(ident));
}

return table;
}
Expand Down Expand Up @@ -231,8 +236,7 @@ public Table alterTable(NameIdentifier ident, TableChange... changes)
getHiddenPropertyNames(
getCatalogIdentifier(ident),
HasPropertyMetadata::tablePropertiesMetadata,
alteredTable.properties()))
.withImported(isEntityExist(ident));
alteredTable.properties()));
}

TableEntity updatedTableEntity =
Expand Down Expand Up @@ -267,15 +271,12 @@ public Table alterTable(NameIdentifier ident, TableChange... changes)
"UPDATE",
stringId.id());

boolean imported = updatedTableEntity != null;

return EntityCombinedTable.of(alteredTable, updatedTableEntity)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
getCatalogIdentifier(ident),
HasPropertyMetadata::tablePropertiesMetadata,
alteredTable.properties()))
.withImported(imported);
alteredTable.properties()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static com.datastrato.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForCreate;

import com.datastrato.gravitino.EntityStore;
import com.datastrato.gravitino.GravitinoEnv;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.StringIdentifier;
Expand Down Expand Up @@ -79,8 +80,12 @@ public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException {
return topic;
}

TreeLockUtils.doWithTreeLock(
NameIdentifier.of(ident.namespace().levels()), LockType.WRITE, () -> importTopic(ident));
if (GravitinoEnv.getInstance()
.schemaDispatcher()
.schemaExists(NameIdentifier.of(ident.namespace().levels()))) {
TreeLockUtils.doWithTreeLock(
NameIdentifier.of(ident.namespace().levels()), LockType.WRITE, () -> importTopic(ident));
}
return topic;
}

Expand Down Expand Up @@ -219,15 +224,12 @@ public Topic alterTopic(NameIdentifier ident, TopicChange... changes)
"UPDATE",
getStringIdFromProperties(alteredTopic.properties()).id());

boolean imported = updatedTopicEntity != null;

return EntityCombinedTopic.of(alteredTopic, updatedTopicEntity)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::topicPropertiesMetadata,
alteredTopic.properties()))
.withImported(imported);
alteredTopic.properties()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.Configs;
import com.datastrato.gravitino.Entity;
import com.datastrato.gravitino.GravitinoEnv;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
Expand Down Expand Up @@ -118,6 +119,74 @@ public void testCreateAndListSchemas() throws IOException {
Assertions.assertEquals("test", schema2.auditInfo().creator());
}

@Test
public void testCreateAndLoadSchema() throws IOException {
NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, "schema20");
Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
Schema schema = dispatcher.createSchema(schemaIdent, "comment", props);
Assertions.assertEquals("schema20", schema.name());
Assertions.assertEquals("comment", schema.comment());
testProperties(props, schema.properties());

Schema loadedSchema = dispatcher.loadSchema(schemaIdent);
Assertions.assertEquals(loadedSchema.name(), schema.name());
Assertions.assertEquals(loadedSchema.comment(), schema.comment());
testProperties(loadedSchema.properties(), schema.properties());
// Audit info is gotten from the entity store
Assertions.assertEquals(AuthConstants.ANONYMOUS_USER, loadedSchema.auditInfo().creator());

// Case 2: Test if the schema is not found in entity store
doThrow(new NoSuchEntityException("mock error")).when(entityStore).get(any(), any(), any());
entityStore.delete(schemaIdent, Entity.EntityType.SCHEMA);
Schema loadedSchema1 = dispatcher.loadSchema(schemaIdent);
Assertions.assertEquals(schema.name(), loadedSchema1.name());
Assertions.assertEquals(schema.comment(), loadedSchema1.comment());
testProperties(props, loadedSchema1.properties());
// Succeed to import the topic entity
Assertions.assertTrue(entityStore.exists(schemaIdent, SCHEMA));

// Audit info is gotten from catalog, not from the entity store
Assertions.assertEquals("test", loadedSchema1.auditInfo().creator());

// Case 3: Test if entity store is failed to get the schema entity
reset(entityStore);
doThrow(new IOException()).when(entityStore).get(any(), any(), any());
entityStore.delete(schemaIdent, Entity.EntityType.SCHEMA);
Schema loadedSchema2 = dispatcher.loadSchema(schemaIdent);
// Succeed to import the topic entity
Assertions.assertTrue(entityStore.exists(schemaIdent, SCHEMA));
Assertions.assertEquals(schema.name(), loadedSchema2.name());
Assertions.assertEquals(schema.comment(), loadedSchema2.comment());
testProperties(props, loadedSchema2.properties());
// Audit info is gotten from catalog, not from the entity store
Assertions.assertEquals("test", loadedSchema2.auditInfo().creator());

// Case 4: Test if the fetched schema entity is matched.
reset(entityStore);
SchemaEntity unmatchedEntity =
SchemaEntity.builder()
.withId(1L)
.withName("schema21")
.withNamespace(Namespace.of(metalake, catalog))
.withAuditInfo(
AuditInfo.builder()
.withCreator(AuthConstants.ANONYMOUS_USER)
.withCreateTime(Instant.now())
.build())
.build();
doReturn(unmatchedEntity).when(entityStore).get(any(), any(), any());
Schema loadedSchema3 = dispatcher.loadSchema(schemaIdent);
// Succeed to import the schema entity
reset(entityStore);
SchemaEntity schemaEntity = entityStore.get(schemaIdent, SCHEMA, SchemaEntity.class);
Assertions.assertEquals("test", schemaEntity.auditInfo().creator());
Assertions.assertEquals(schema.name(), loadedSchema3.name());
Assertions.assertEquals(schema.comment(), loadedSchema3.comment());
testProperties(props, loadedSchema3.properties());
// Audit info is gotten from catalog, not from the entity store
Assertions.assertEquals("test", loadedSchema3.auditInfo().creator());
}

@Test
public void testCreateAndAlterSchema() throws IOException {
NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog, "schema21");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static com.datastrato.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
import static com.datastrato.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
import static com.datastrato.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
import static com.datastrato.gravitino.Entity.EntityType.SCHEMA;
import static com.datastrato.gravitino.Entity.EntityType.TABLE;
import static com.datastrato.gravitino.StringIdentifier.ID_KEY;
import static com.datastrato.gravitino.TestBasePropertiesMetadata.COMMENT_KEY;
Expand Down Expand Up @@ -59,6 +60,7 @@ public static void initialize() throws IOException {
doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
GravitinoEnv.getInstance().setLockManager(new LockManager(config));
GravitinoEnv.getInstance().setSchemaDispatcher(schemaOperationDispatcher);
}

@Test
Expand Down Expand Up @@ -167,15 +169,25 @@ public void testCreateAndLoadTable() throws IOException {

// Case 2: Test if the table entity is not found in the entity store
reset(entityStore);
entityStore.delete(tableIdent1, TABLE);
entityStore.delete(NameIdentifier.of(tableNs.levels()), SCHEMA);
doThrow(new NoSuchEntityException("")).when(entityStore).get(any(), any(), any());
Table loadedTable2 = tableOperationDispatcher.loadTable(tableIdent1);
// Succeed to import the topic entity
Assertions.assertTrue(entityStore.exists(NameIdentifier.of(tableNs.levels()), SCHEMA));
Assertions.assertTrue(entityStore.exists(tableIdent1, TABLE));
// Audit info is gotten from the catalog, not from the entity store
Assertions.assertEquals("test", loadedTable2.auditInfo().creator());

// Case 3: Test if the entity store is failed to get the table entity
reset(entityStore);
entityStore.delete(tableIdent1, TABLE);
entityStore.delete(NameIdentifier.of(tableNs.levels()), SCHEMA);
doThrow(new IOException()).when(entityStore).get(any(), any(), any());
Table loadedTable3 = tableOperationDispatcher.loadTable(tableIdent1);
// Succeed to import the topic entity
Assertions.assertTrue(entityStore.exists(NameIdentifier.of(tableNs.levels()), SCHEMA));
Assertions.assertTrue(entityStore.exists(tableIdent1, TABLE));
// Audit info is gotten from the catalog, not from the entity store
Assertions.assertEquals("test", loadedTable3.auditInfo().creator());

Expand All @@ -191,6 +203,10 @@ public void testCreateAndLoadTable() throws IOException {
.build();
doReturn(tableEntity).when(entityStore).get(any(), any(), any());
Table loadedTable4 = tableOperationDispatcher.loadTable(tableIdent1);
// Succeed to import the topic entity
reset(entityStore);
TableEntity tableImportedEntity = entityStore.get(tableIdent1, TABLE, TableEntity.class);
Assertions.assertEquals("test", tableImportedEntity.auditInfo().creator());
// Audit info is gotten from the catalog, not from the entity store
Assertions.assertEquals("test", loadedTable4.auditInfo().creator());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.mockito.Mockito.reset;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.Entity;
import com.datastrato.gravitino.GravitinoEnv;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
Expand Down Expand Up @@ -52,6 +53,7 @@ public static void initialize() throws IOException {
doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
GravitinoEnv.getInstance().setLockManager(new LockManager(config));
GravitinoEnv.getInstance().setSchemaDispatcher(schemaOperationDispatcher);
}

@Test
Expand Down Expand Up @@ -103,15 +105,27 @@ public void testCreateAndLoadTopic() throws IOException {

// Case 2: Test if the topic entity is not found in the entity store
reset(entityStore);
entityStore.delete(topicIdent1, Entity.EntityType.TOPIC);
entityStore.delete(NameIdentifier.of(topicNs.levels()), Entity.EntityType.SCHEMA);
doThrow(new NoSuchEntityException("")).when(entityStore).get(any(), any(), any());
Topic loadedTopic2 = topicOperationDispatcher.loadTopic(topicIdent1);
// Succeed to import the topic entity
Assertions.assertTrue(entityStore.exists(topicIdent1, Entity.EntityType.TOPIC));
Assertions.assertTrue(
entityStore.exists(NameIdentifier.of(topicNs.levels()), Entity.EntityType.SCHEMA));
// Audit info is gotten from the catalog, not from the entity store
Assertions.assertEquals("test", loadedTopic2.auditInfo().creator());

// Case 3: Test if the entity store is failed to get the topic entity
reset(entityStore);
entityStore.delete(topicIdent1, Entity.EntityType.TOPIC);
entityStore.delete(NameIdentifier.of(topicNs.levels()), Entity.EntityType.SCHEMA);
doThrow(new IOException()).when(entityStore).get(any(), any(), any());
Topic loadedTopic3 = topicOperationDispatcher.loadTopic(topicIdent1);
// Succeed to import the topic entity
Assertions.assertTrue(
entityStore.exists(NameIdentifier.of(topicNs.levels()), Entity.EntityType.SCHEMA));
Assertions.assertTrue(entityStore.exists(topicIdent1, Entity.EntityType.TOPIC));
// Audit info is gotten from the catalog, not from the entity store
Assertions.assertEquals("test", loadedTopic3.auditInfo().creator());

Expand All @@ -127,6 +141,11 @@ public void testCreateAndLoadTopic() throws IOException {
.build();
doReturn(unmatchedEntity).when(entityStore).get(any(), any(), any());
Topic loadedTopic4 = topicOperationDispatcher.loadTopic(topicIdent1);
// Succeed to import the topic entity
reset(entityStore);
TopicEntity topicEntity =
entityStore.get(topicIdent1, Entity.EntityType.TOPIC, TopicEntity.class);
Assertions.assertEquals("test", topicEntity.auditInfo().creator());
// Audit info is gotten from the catalog, not from the entity store
Assertions.assertEquals("test", loadedTopic4.auditInfo().creator());
}
Expand Down
Loading

0 comments on commit c831685

Please sign in to comment.