[#3607] feat(core): Support to import the entities when loading entities #3623

Merged
merged 17 commits into from
Jun 18, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private TableDTO() {}
* @param properties The properties associated with the table.
* @param audit The audit information for the table.
* @param partitioning The partitioning of the table.
* @param indexes Teh indexes of the table.
* @param indexes The indexes of the table.
*/
private TableDTO(
String name,
Expand Down
67 changes: 64 additions & 3 deletions core/src/main/java/com/datastrato/gravitino/GravitinoEnv.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ public static GravitinoEnv getInstance() {
}

/**
* This method is used for testing purposes only to set the lock manager for test in package
* `com.datastrato.gravitino.server.web.rest`, as tree lock depends on the lock manager and we did
* not mock the lock manager in the test, so we need to set the lock manager for test.
* This method is used for testing purposes only to set the lock manager for test in package as
* tree lock depends on the lock manager and we did not mock the lock manager in the test, so we
* need to set the lock manager for test.
*
* @param lockManager The lock manager to be set.
*/
Expand All @@ -121,6 +121,67 @@ public void setAccessControlManager(AccessControlManager accessControlManager) {
this.accessControlManager = accessControlManager;
}

/**
* This method is used for testing purposes only to set the metalake dispatcher for test.
*
* @param metalakeDispatcher The metalake dispatcher to be set.
*/
@VisibleForTesting
public void setMetalakeDispatcher(MetalakeDispatcher metalakeDispatcher) {
this.metalakeDispatcher = metalakeDispatcher;
}

/**
* This method is used for testing purposes only to set the catalog dispatcher for test in package
* `com.datastrato.gravitino.server.web.rest`.
*
* @param catalogDispatcher The catalog dispatcher to be set.
*/
@VisibleForTesting
public void setCatalogDispatcher(CatalogDispatcher catalogDispatcher) {
this.catalogDispatcher = catalogDispatcher;
}

/**
* This method is used for testing purposes only to set the schema dispatcher for test.
*
* @param schemaDispatcher The schema dispatcher to be set.
*/
@VisibleForTesting
public void setSchemaDispatcher(SchemaDispatcher schemaDispatcher) {
this.schemaDispatcher = schemaDispatcher;
}

/**
* This method is used for testing purposes only to set the table dispatcher for test.
*
* @param tableDispatcher The table dispatcher to be set.
*/
@VisibleForTesting
public void setTableDispatcher(TableDispatcher tableDispatcher) {
this.tableDispatcher = tableDispatcher;
}

/**
* This method is used for testing purposes only to set the topic dispatcher for test.
*
* @param topicDispatcher The topic dispatcher to be set.
*/
@VisibleForTesting
public void setTopicDispatcher(TopicDispatcher topicDispatcher) {
this.topicDispatcher = topicDispatcher;
}

/**
* This method is used for testing purposes only to set the fileset dispatcher for test.
*
* @param filesetDispatcher The fileset dispatcher to be set.
*/
@VisibleForTesting
public void setFilesetDispatcher(FilesetDispatcher filesetDispatcher) {
this.filesetDispatcher = filesetDispatcher;
}
Contributor

I'm not sure whether you could use mocks or a JUnit reflection-based solution to test these interfaces, instead of creating a bunch of methods only for tests.

Contributor

I can give it a try.

Contributor

Java reflection can do the same thing, but I'm not sure whether it's more elegant than the current implementation.

Contributor Author

Done.
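
For readers following the thread above, here is a minimal sketch of the reflection alternative to test-only setters. `Env` and `ReflectionInjection` are hypothetical names invented for this illustration; this is not the code that was merged, only one way the reviewer's suggestion could look.

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for an environment singleton; not the real GravitinoEnv.
final class Env {
  private Object catalogDispatcher; // deliberately has no setter

  Object catalogDispatcher() {
    return catalogDispatcher;
  }
}

final class ReflectionInjection {
  // Writes a private field by name so that production code needs no test-only setter.
  static void setField(Object target, String fieldName, Object value) {
    try {
      Field field = target.getClass().getDeclaredField(fieldName);
      field.setAccessible(true);
      field.set(target, value);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot set field " + fieldName, e);
    }
  }

  public static void main(String[] args) {
    Env env = new Env();
    Object fakeDispatcher = new Object();
    setField(env, "catalogDispatcher", fakeDispatcher);
    System.out.println(env.catalogDispatcher() == fakeDispatcher);
  }
}
```

The trade-off raised in the thread still applies: the field name becomes a string that the compiler cannot check, so a rename breaks the test only at run time.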


/**
* This method is used for testing purposes only to set the entity store for test in package
* `com.datastrato.gravitino.authorization`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.datastrato.gravitino.Schema;
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.meta.SchemaEntity;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -23,6 +24,7 @@ public final class EntityCombinedSchema implements Schema {

// Sets of properties that should be hidden from the user.
private Set<String> hiddenProperties;
Contributor

Perhaps we can handle this outside this PR: we may need a class named 'combinedEntity' to unify the handling of entities from different sources.

Contributor

@mchades what do you mean here? Can you explain more?

Contributor

I mean do we need to provide an abstract class CombinedEntity<T> for EntityCombinedSchema, EntityCombinedTable, and EntityCombinedTopic?
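
A rough sketch of what such a shared base class might look like. Only the name `CombinedEntity<T>` comes from the comment above; the members, the `rawProperties()` hook, and `CombinedSchemaSketch` are assumptions made for illustration, not part of this PR.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical base class holding the state that the three EntityCombined* classes repeat.
abstract class CombinedEntity<T> {
  protected final T delegate;
  private Set<String> hiddenProperties = Collections.emptySet();
  private boolean imported;

  protected CombinedEntity(T delegate) {
    this.delegate = delegate;
  }

  // Each subclass knows how to read the raw properties of its own delegate type.
  protected abstract Map<String, String> rawProperties();

  CombinedEntity<T> withHiddenPropertiesSet(Set<String> hiddenProperties) {
    this.hiddenProperties = hiddenProperties;
    return this;
  }

  CombinedEntity<T> withImported(boolean imported) {
    this.imported = imported;
    return this;
  }

  boolean imported() {
    return imported;
  }

  // Properties visible to the user: everything except the hidden keys.
  Map<String, String> properties() {
    return rawProperties().entrySet().stream()
        .filter(e -> !hiddenProperties.contains(e.getKey()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }
}

// Toy subclass whose delegate is simply a property map.
final class CombinedSchemaSketch extends CombinedEntity<Map<String, String>> {
  CombinedSchemaSketch(Map<String, String> schemaProperties) {
    super(schemaProperties);
  }

  @Override
  protected Map<String, String> rawProperties() {
    return delegate;
  }
}

final class CombinedEntityDemo {
  public static void main(String[] args) {
    CombinedEntity<Map<String, String>> schema =
        new CombinedSchemaSketch(Map.of("comment", "demo", "hidden.key", "x"))
            .withHiddenPropertiesSet(Set.of("hidden.key"))
            .withImported(true);
    System.out.println(schema.properties() + " imported=" + schema.imported());
  }
}
```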

private boolean imported;
Contributor

I think you should add some comments about what it is used for. What should we do if it's true or false?

Contributor Author

Done.

Contributor

Further information may be required, such as why we need to import it to the Gravitino-managed store.

Contributor

> Further information may be required, such as why we need to import it to the Gravitino-managed store.

@qqqttt123 could you take a look at these comments and check whether they need to be resolved?

Contributor

Let me update the doc.


private EntityCombinedSchema(Schema schema, SchemaEntity schemaEntity) {
this.schema = schema;
Expand All @@ -42,6 +44,11 @@ public EntityCombinedSchema withHiddenPropertiesSet(Set<String> hiddenProperties
return this;
}

public EntityCombinedSchema withImported(boolean imported) {
this.imported = imported;
return this;
}

@Override
public String name() {
return schema.name();
Expand Down Expand Up @@ -73,4 +80,12 @@ public Audit auditInfo() {
? schema.auditInfo()
: mergedAudit.merge(schemaEntity.auditInfo(), true /* overwrite */);
}

public boolean imported() {
return imported;
}

Map<String, String> schemaProperties() {
Contributor

Why do you need this method? Can you use properties()?

Contributor

I need to get the StringIdentifier.

Contributor

I think you should define a better method for your requirement. Also, should it be package-private or public?

Contributor Author

Done. Package-private is enough for now.

return Collections.unmodifiableMap(schema.properties());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.indexes.Index;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -30,6 +31,7 @@ public final class EntityCombinedTable implements Table {

// Sets of properties that should be hidden from the user.
private Set<String> hiddenProperties;
private boolean imported;

private EntityCombinedTable(Table table, TableEntity tableEntity) {
this.table = table;
Expand All @@ -49,6 +51,11 @@ public EntityCombinedTable withHiddenPropertiesSet(Set<String> hiddenProperties)
return this;
}

public EntityCombinedTable withImported(boolean imported) {
this.imported = imported;
return this;
}

@Override
public String name() {
return table.name();
Expand Down Expand Up @@ -96,6 +103,10 @@ public Index[] index() {
return table.index();
}

public boolean imported() {
return imported;
}

@Override
public Audit auditInfo() {
AuditInfo mergedAudit =
Expand All @@ -110,4 +121,8 @@ public Audit auditInfo() {
? table.auditInfo()
: mergedAudit.merge(tableEntity.auditInfo(), true /* overwrite */);
}

Map<String, String> tableProperties() {
return Collections.unmodifiableMap(table.properties());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.datastrato.gravitino.messaging.Topic;
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.meta.TopicEntity;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -23,6 +24,7 @@ public class EntityCombinedTopic implements Topic {

// Sets of properties that should be hidden from the user.
private Set<String> hiddenProperties;
private boolean imported;

private EntityCombinedTopic(Topic topic, TopicEntity topicEntity) {
this.topic = topic;
Expand All @@ -42,6 +44,11 @@ public EntityCombinedTopic withHiddenPropertiesSet(Set<String> hiddenProperties)
return this;
}

public EntityCombinedTopic withImported(boolean imported) {
this.imported = imported;
return this;
}

@Override
public String name() {
return topic.name();
Expand Down Expand Up @@ -73,4 +80,12 @@ public Audit auditInfo() {
? topic.auditInfo()
: mergedAudit.merge(topicEntity.auditInfo(), true /* overwrite */);
}

public boolean imported() {
return imported;
}

Map<String, String> topicProperties() {
return Collections.unmodifiableMap(topic.properties());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.NonEmptySchemaException;
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import com.datastrato.gravitino.lock.LockType;
import com.datastrato.gravitino.lock.TreeLockUtils;
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.meta.SchemaEntity;
import com.datastrato.gravitino.storage.IdGenerator;
Expand Down Expand Up @@ -159,47 +161,17 @@ public Schema createSchema(NameIdentifier ident, String comment, Map<String, Str
*/
@Override
public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
NameIdentifier catalogIdentifier = getCatalogIdentifier(ident);
Schema schema =
doWithCatalog(
catalogIdentifier,
c -> c.doWithSchemaOps(s -> s.loadSchema(ident)),
NoSuchSchemaException.class);
EntityCombinedSchema schema =
TreeLockUtils.doWithTreeLock(ident, LockType.READ, () -> loadCombinedSchema(ident));

// If the Schema is maintained by the Gravitino's store, we don't have to load again.
boolean isManagedSchema = isManagedEntity(catalogIdentifier, Capability.Scope.SCHEMA);
if (isManagedSchema) {
return EntityCombinedSchema.of(schema)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdentifier,
HasPropertyMetadata::schemaPropertiesMetadata,
schema.properties()));
if (schema == null || schema.imported()) {
Contributor

You should throw a NoSuchSchemaException instead of returning null.

Contributor

Do you really think that method will return null? Can you please check the code carefully?

Contributor

The exists method assumes that loading the entity may return null. I kept the same behaviour here.

Contributor

The method definition here throws a NoSuchSchemaException instead of returning null, and you should follow that contract.

Contributor

Again, did you actually check whether the code above returns null or throws an exception?

Contributor Author

I checked: the method won't return null. I will modify this. I was just following the same style as the exists method.

Contributor Author

@qqqttt123 qqqttt123 Jun 12, 2024

I had assumed that this method might return null, so I added the null check.

  default boolean tableExists(NameIdentifier ident) {
    try {
      return loadTable(ident) != null;
    } catch (NoSuchTableException e) {
      return false;
    }
  }

Do we need to modify here, too?
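
To make the contract under discussion concrete, here is a small sketch in which the load method never returns null and signals absence only by throwing, so the exists check relies on the catch branch alone. `TableLoader`, `InMemoryTables`, and `NoSuchTableSketchException` are hypothetical names for this illustration; whether the real interface should drop its null check is the open question in the thread, not something this sketch settles.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical exception standing in for NoSuchTableException.
class NoSuchTableSketchException extends RuntimeException {
  NoSuchTableSketchException(String message) {
    super(message);
  }
}

interface TableLoader {
  // Contract assumed here: never returns null; throws when the table is absent.
  String loadTable(String ident);

  // With that contract, existence is decided by the exception alone.
  default boolean tableExists(String ident) {
    try {
      loadTable(ident);
      return true;
    } catch (NoSuchTableSketchException e) {
      return false;
    }
  }
}

final class InMemoryTables implements TableLoader {
  private final Set<String> tables = new HashSet<>();

  InMemoryTables(String... names) {
    for (String name : names) {
      tables.add(name);
    }
  }

  @Override
  public String loadTable(String ident) {
    if (!tables.contains(ident)) {
      throw new NoSuchTableSketchException("No such table: " + ident);
    }
    return ident;
  }
}

final class TableExistsDemo {
  public static void main(String[] args) {
    TableLoader loader = new InMemoryTables("orders");
    System.out.println(loader.tableExists("orders") + " " + loader.tableExists("missing"));
  }
}
```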

return schema;
}

StringIdentifier stringId = getStringIdFromProperties(schema.properties());
// Case 1: The schema is not created by Gravitino.
if (stringId == null) {
return EntityCombinedSchema.of(schema)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdentifier,
HasPropertyMetadata::schemaPropertiesMetadata,
schema.properties()));
}
TreeLockUtils.doWithTreeLock(
NameIdentifier.of(ident.namespace().levels()), LockType.WRITE, () -> importSchema(ident));

SchemaEntity schemaEntity =
operateOnEntity(
ident,
identifier -> store.get(identifier, SCHEMA, SchemaEntity.class),
"GET",
stringId.id());
return EntityCombinedSchema.of(schema, schemaEntity)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdentifier,
HasPropertyMetadata::schemaPropertiesMetadata,
schema.properties()));
return schema;
Contributor

Why do you return the schema you got previously here? IIUC, we should return the schema after it is imported.

Contributor Author

If schema is imported, we don't need to import it again, so we can return it directly.

Contributor

No, what I mean is that you will have a new entity combined schema in importSchema(), you should return the new one, not the old one. If you don't understand what I'm talking about, just ping me offline.

Contributor

Done.

Contributor

Has this comment been resolved? I have the same question: I think this should return the EntityCombinedSchema after import.

Contributor

After digging into the code: the only difference between the schema before and after import is the AuditInfo, and since the imported entity doesn't carry any new AuditInfo, the schema is the same before and after import, so there is no need to create a new one. The same holds for table and topic. @mchades

Contributor

Got it. If there is no change in the auditInfo then it is acceptable not to use imported EntityCombinedSchema here.

}

/**
Expand Down Expand Up @@ -280,6 +252,7 @@ public Schema alterSchema(NameIdentifier ident, SchemaChange... changes)
.build()),
"UPDATE",
stringId.id());

return EntityCombinedSchema.of(alteredSchema, updatedSchemaEntity)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
Expand Down Expand Up @@ -330,4 +303,103 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty
? droppedFromStore
: droppedFromCatalog;
}

private boolean importSchema(NameIdentifier identifier) {
Contributor

You didn't leverage the return value of this method, so why do you need to define a return value?

Contributor Author

OK, maybe I should add some logs.

EntityCombinedSchema combinedSchema = loadCombinedSchema(identifier);
Contributor

Why do you need to loadCombinedSchema here again?

Contributor

The table may be imported by another thread after we release the read lock and before we acquire the write lock.
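
The race described above is the usual reason for re-checking state after upgrading from a read lock to a write lock. Below is a minimal sketch of that pattern using a plain `ReentrantReadWriteLock` as a stand-in for Gravitino's tree lock; `ImportOnLoad`, its in-memory store, and the import counter are all assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: load under a read lock, import under a write lock with a re-check.
final class ImportOnLoad {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final Set<String> store = new HashSet<>();
  private int importCount;

  String load(String name) {
    boolean imported;
    lock.readLock().lock();
    try {
      imported = store.contains(name);
    } finally {
      lock.readLock().unlock();
    }
    // The read lock is released here, so another thread may import before we do.
    if (!imported) {
      importEntity(name);
    }
    return name;
  }

  private boolean importEntity(String name) {
    lock.writeLock().lock();
    try {
      // Re-check under the write lock: the state observed under the read lock may be stale.
      if (store.contains(name)) {
        return false;
      }
      store.add(name);
      importCount++;
      return true;
    } finally {
      lock.writeLock().unlock();
    }
  }

  int importCount() {
    lock.readLock().lock();
    try {
      return importCount;
    } finally {
      lock.readLock().unlock();
    }
  }

  public static void main(String[] args) {
    ImportOnLoad loader = new ImportOnLoad();
    loader.load("schema1");
    loader.load("schema1");
    System.out.println(loader.importCount());
  }
}
```

Without the second check, two threads that both saw "not imported" under the read lock would each write the entity.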

if (combinedSchema.imported()) {
return false;
}

StringIdentifier stringId = getStringIdFromProperties(combinedSchema.schemaProperties());
long uid;
if (stringId != null) {
// If the entity in the store doesn't match the external system, we use the data
// of external system to correct it.
uid = stringId.id();
} else {
// If store doesn't exist entity, we sync the entity from the external system.
Contributor

Correct this comment's grammar.

Contributor

I will.

Contributor Author

Done.

uid = idGenerator.nextId();
}

SchemaEntity schemaEntity =
SchemaEntity.builder()
.withId(uid)
.withName(identifier.name())
.withNamespace(identifier.namespace())
.withAuditInfo(
AuditInfo.builder()
.withCreator(combinedSchema.auditInfo().creator())
.withCreateTime(combinedSchema.auditInfo().createTime())
.withLastModifier(combinedSchema.auditInfo().lastModifier())
.withLastModifiedTime(combinedSchema.auditInfo().lastModifiedTime())
.build())
.build();
try {
store.put(schemaEntity, true);
} catch (Exception e) {
LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", identifier, e);
throw new RuntimeException("Fail to access underlying storage");
Contributor

I think you need to pass the exception e to RuntimeException.

Contributor

If I pass e, users will run into a confusing situation: when they load a table, it may throw a write exception.
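
For reference, the reviewer's suggestion amounts to standard exception chaining. The sketch below shows how the cause stays reachable while the outer message remains generic; `StoreAccess` and `storageFailure` are hypothetical names, and the sketch does not decide whether exposing the write failure to callers of a load is desirable.

```java
import java.io.IOException;

// Hypothetical helper illustrating exception chaining with a generic outer message.
final class StoreAccess {
  static RuntimeException storageFailure(Exception cause) {
    // The two-argument constructor keeps the original exception as the cause.
    return new RuntimeException("Fail to access underlying storage", cause);
  }

  public static void main(String[] args) {
    try {
      throw new IOException("simulated write failure");
    } catch (IOException e) {
      RuntimeException wrapped = storageFailure(e);
      System.out.println(wrapped.getMessage() + " / cause: " + wrapped.getCause().getMessage());
    }
  }
}
```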

}

return true;
}

private EntityCombinedSchema loadCombinedSchema(NameIdentifier ident) {
NameIdentifier catalogIdentifier = getCatalogIdentifier(ident);
Schema schema =
doWithCatalog(
catalogIdentifier,
c -> c.doWithSchemaOps(s -> s.loadSchema(ident)),
NoSuchSchemaException.class);

// If the Schema is maintained by the Gravitino's store, we don't have to load again.
Contributor

I can't understand why we do not need to load the schema again, as s.loadSchema only loads the schema from backend storage.

Contributor Author

Fileset schema is only stored in the backend storage.

boolean isManagedSchema = isManagedEntity(catalogIdentifier, Capability.Scope.SCHEMA);
if (isManagedSchema) {
return EntityCombinedSchema.of(schema)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdentifier,
HasPropertyMetadata::schemaPropertiesMetadata,
schema.properties()))
.withImported(true);
}

StringIdentifier stringId = getStringIdFromProperties(schema.properties());
// Case 1: The schema is not created by Gravitino.
Contributor

The schema is not created by Gravitino or the backend storage does not support storing string identifiers.

Contributor Author

Done.

Contributor

You didn't update this as @yuqi1129 suggested here.

Contributor Author

Sorry, I reverted the change by mistake. I have redone it.

if (stringId == null) {
return EntityCombinedSchema.of(schema)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdentifier,
HasPropertyMetadata::schemaPropertiesMetadata,
schema.properties()))
.withImported(isEntityExist(ident));
Contributor

Why do we need to check isEntityExist? What kind of scenario do you need to cover?

Contributor

First case: a PG schema doesn't have properties, so we should judge whether it is imported by whether the entity exists, not by the StringIdentifier.
Second case: if a table wasn't created by us, it doesn't have a StringIdentifier. But once we import it, the entity exists.

Contributor

You should explain this in a code comment, not only here when we ask.

Contributor

OK, I will.

Contributor Author

Done.
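
The decision logic discussed in this thread can be summarized in a small sketch. It models the store as a map from entity name to id, which is a simplification assumed for illustration; `ImportedCheck` and its parameters are hypothetical and only mirror the three branches visible in the diff (managed entity, no string identifier, string identifier present).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical condensation of how the "imported" flag is derived.
final class ImportedCheck {
  static boolean isImported(
      boolean managedByGravitino, Long stringId, String name, Map<String, Long> store) {
    if (managedByGravitino) {
      // Entities kept only in Gravitino's own store never need importing.
      return true;
    }
    if (stringId == null) {
      // No identifier in the external properties (for example a source without
      // properties, or an entity not created through Gravitino): fall back to
      // checking whether the entity exists in the store.
      return store.containsKey(name);
    }
    // Identifier present: imported only if the stored entity carries the same id.
    return stringId.equals(store.get(name));
  }

  public static void main(String[] args) {
    Map<String, Long> store = new HashMap<>();
    store.put("schema1", 1L);
    System.out.println(isImported(false, null, "schema1", store));
    System.out.println(isImported(false, 2L, "schema1", store));
  }
}
```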

}

SchemaEntity schemaEntity =
operateOnEntity(
ident,
identifier -> store.get(identifier, SCHEMA, SchemaEntity.class),
"GET",
stringId.id());
Contributor

For entity store, it throws a NoSuchEntityException instead of returning null. So the code below is not valid.

Contributor Author

operateOnEntity will catch NoSuchEntityException and return null.
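
A minimal sketch of the behaviour described in that reply: a wrapper that turns a "not found" exception into null and wraps any other failure. The names `EntityOps`, `StoreOperation`, and `NoSuchEntitySketchException` are invented for this example, and the real `operateOnEntity` takes different parameters and may differ in detail.

```java
// Hypothetical exception standing in for NoSuchEntityException.
class NoSuchEntitySketchException extends Exception {
  NoSuchEntitySketchException(String message) {
    super(message);
  }
}

@FunctionalInterface
interface StoreOperation<R> {
  R apply() throws Exception;
}

final class EntityOps {
  // Returns null when the entity is missing; wraps any other failure.
  static <R> R getOrNull(StoreOperation<R> operation) {
    try {
      return operation.apply();
    } catch (NoSuchEntitySketchException e) {
      return null;
    } catch (Exception e) {
      throw new RuntimeException("Fail to access underlying storage", e);
    }
  }

  public static void main(String[] args) {
    String found = getOrNull(() -> "schemaEntity");
    String missing =
        EntityOps.<String>getOrNull(
            () -> {
              throw new NoSuchEntitySketchException("not found");
            });
    System.out.println(found + " " + (missing == null));
  }
}
```

With a helper of this shape, the caller's `schemaEntity != null` check is what distinguishes an imported entity from a missing one.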


boolean imported = schemaEntity != null;

return EntityCombinedSchema.of(schema, schemaEntity)
Contributor

Should you check whether the two uids are different here?

Contributor

@jerqi jerqi Jun 10, 2024

No need. If the two uids are different, the schemaEntity will be null, and we will overwrite the entity using the external system's data.

.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdentifier,
HasPropertyMetadata::schemaPropertiesMetadata,
schema.properties()))
.withImported(imported);
}

private boolean isEntityExist(NameIdentifier ident) {
try {
return store.exists(ident, SCHEMA);
} catch (Exception e) {
LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "exists", ident, e);
throw new RuntimeException("Fail to access underlying storage");
}
}
}