Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#3607] feat(core): Support to import the entities when loading entities #3623

Merged
merged 17 commits into from
Jun 18, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private TableDTO() {}
* @param properties The properties associated with the table.
* @param audit The audit information for the table.
* @param partitioning The partitioning of the table.
* @param indexes Teh indexes of the table.
* @param indexes The indexes of the table.
*/
private TableDTO(
String name,
Expand Down
35 changes: 0 additions & 35 deletions core/src/main/java/com/datastrato/gravitino/GravitinoEnv.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import com.datastrato.gravitino.metrics.source.JVMMetricsSource;
import com.datastrato.gravitino.storage.IdGenerator;
import com.datastrato.gravitino.storage.RandomIdGenerator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -98,40 +97,6 @@ public static GravitinoEnv getInstance() {
return InstanceHolder.INSTANCE;
}

/**
* This method is used for testing purposes only to set the lock manager for test in package
* `com.datastrato.gravitino.server.web.rest`, as tree lock depends on the lock manager and we did
* not mock the lock manager in the test, so we need to set the lock manager for test.
*
* @param lockManager The lock manager to be set.
*/
@VisibleForTesting
public void setLockManager(LockManager lockManager) {
this.lockManager = lockManager;
}

/**
* This method is used for testing purposes only to set the access manager for test in package
* `com.datastrato.gravitino.server.web.rest` and `com.datastrato.gravitino.authorization`.
*
* @param accessControlManager The access control manager to be set.
*/
@VisibleForTesting
public void setAccessControlManager(AccessControlManager accessControlManager) {
this.accessControlManager = accessControlManager;
}

/**
* This method is used for testing purposes only to set the entity store for test in package
* `com.datastrato.gravitino.authorization`.
*
* @param entityStore The entity store to be set.
*/
@VisibleForTesting
public void setEntityStore(EntityStore entityStore) {
this.entityStore = entityStore;
}

/**
* Initialize the Gravitino environment.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.datastrato.gravitino.Audit;
import com.datastrato.gravitino.Schema;
import com.datastrato.gravitino.StringIdentifier;
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.meta.SchemaEntity;
import java.util.Map;
Expand All @@ -19,14 +20,21 @@
public final class EntityCombinedSchema implements Schema {

private final Schema schema;

private final SchemaEntity schemaEntity;

// Sets of properties that should be hidden from the user.
private Set<String> hiddenProperties;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we can handle this not in this PR, we may need a class named 'combinedEntity' to unify the handling of entities from different sources.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mchades what is the meaning here, can you explain more?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean do we need to provide an abstract class CombinedEntity<T> for EntityCombinedSchema, EntityCombinedTable, and EntityCombinedTopic?


// Field "imported" is used to indicate whether the entity has been imported to Gravitino
// managed storage backend. If "imported" is true, it means that storage backend have stored
// the correct entity. Otherwise, we should import the external entity to the storage backend.
private boolean imported;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should add some comments about what it was used for. What should we do if it's true or false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Further information may be required, such as why we need to import it to the Gravitino-managed store.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Further information may be required, such as why we need to import it to the Gravitino-managed store.

@qqqttt123 do you take a look at these comments and check whether they need to be resolved?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me update the doc.


private EntityCombinedSchema(Schema schema, SchemaEntity schemaEntity) {
this.schema = schema;
this.schemaEntity = schemaEntity;
this.imported = false;
}

public static EntityCombinedSchema of(Schema schema, SchemaEntity schemaEntity) {
Expand All @@ -42,6 +50,11 @@ public EntityCombinedSchema withHiddenPropertiesSet(Set<String> hiddenProperties
return this;
}

public EntityCombinedSchema withImported(boolean imported) {
this.imported = imported;
return this;
}

@Override
public String name() {
return schema.name();
Expand Down Expand Up @@ -73,4 +86,12 @@ public Audit auditInfo() {
? schema.auditInfo()
: mergedAudit.merge(schemaEntity.auditInfo(), true /* overwrite */);
}

public boolean imported() {
return imported;
}

StringIdentifier stringIdentifier() {
return StringIdentifier.fromProperties(schema.properties());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package com.datastrato.gravitino.catalog;

import com.datastrato.gravitino.Audit;
import com.datastrato.gravitino.StringIdentifier;
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.meta.TableEntity;
import com.datastrato.gravitino.rel.Column;
Expand All @@ -31,9 +32,15 @@ public final class EntityCombinedTable implements Table {
// Sets of properties that should be hidden from the user.
private Set<String> hiddenProperties;

// Field "imported" is used to indicate whether the entity has been imported to Gravitino
// managed storage backend. If "imported" is true, it means that storage backend have stored
// the correct entity. Otherwise, we should import the external entity to the storage backend.
private boolean imported;

private EntityCombinedTable(Table table, TableEntity tableEntity) {
this.table = table;
this.tableEntity = tableEntity;
this.imported = false;
}

public static EntityCombinedTable of(Table table, TableEntity tableEntity) {
Expand All @@ -49,6 +56,11 @@ public EntityCombinedTable withHiddenPropertiesSet(Set<String> hiddenProperties)
return this;
}

public EntityCombinedTable withImported(boolean imported) {
this.imported = imported;
return this;
}

@Override
public String name() {
return table.name();
Expand Down Expand Up @@ -96,6 +108,10 @@ public Index[] index() {
return table.index();
}

public boolean imported() {
return imported;
}

@Override
public Audit auditInfo() {
AuditInfo mergedAudit =
Expand All @@ -110,4 +126,8 @@ public Audit auditInfo() {
? table.auditInfo()
: mergedAudit.merge(tableEntity.auditInfo(), true /* overwrite */);
}

StringIdentifier stringIdentifier() {
return StringIdentifier.fromProperties(table.properties());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package com.datastrato.gravitino.catalog;

import com.datastrato.gravitino.Audit;
import com.datastrato.gravitino.StringIdentifier;
import com.datastrato.gravitino.messaging.Topic;
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.meta.TopicEntity;
Expand All @@ -24,9 +25,15 @@ public class EntityCombinedTopic implements Topic {
// Sets of properties that should be hidden from the user.
private Set<String> hiddenProperties;

// Field "imported" is used to indicate whether the entity has been imported to Gravitino
// managed storage backend. If "imported" is true, it means that storage backend have stored
// the correct entity. Otherwise, we should import the external entity to the storage backend.
private boolean imported;

private EntityCombinedTopic(Topic topic, TopicEntity topicEntity) {
this.topic = topic;
this.topicEntity = topicEntity;
this.imported = false;
}

public static EntityCombinedTopic of(Topic topic, TopicEntity topicEntity) {
Expand All @@ -42,6 +49,11 @@ public EntityCombinedTopic withHiddenPropertiesSet(Set<String> hiddenProperties)
return this;
}

public EntityCombinedTopic withImported(boolean imported) {
this.imported = imported;
return this;
}

@Override
public String name() {
return topic.name();
Expand Down Expand Up @@ -73,4 +85,12 @@ public Audit auditInfo() {
? topic.auditInfo()
: mergedAudit.merge(topicEntity.auditInfo(), true /* overwrite */);
}

public boolean imported() {
return imported;
}

StringIdentifier stringIdentifier() {
return StringIdentifier.fromProperties(topic.properties());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import static com.datastrato.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForAlter;

import com.datastrato.gravitino.Entity;
import com.datastrato.gravitino.EntityStore;
import com.datastrato.gravitino.HasIdentifier;
import com.datastrato.gravitino.NameIdentifier;
Expand Down Expand Up @@ -44,7 +45,7 @@ public abstract class OperationDispatcher {

protected final EntityStore store;

final IdGenerator idGenerator;
protected final IdGenerator idGenerator;

/**
* Creates a new CatalogOperationDispatcher instance.
Expand All @@ -60,7 +61,21 @@ public OperationDispatcher(
this.idGenerator = idGenerator;
}

<R, E extends Throwable> R doWithTable(
protected Capability getCatalogCapability(NameIdentifier ident) {
return doWithCatalog(
getCatalogIdentifier(ident),
CatalogManager.CatalogWrapper::capabilities,
IllegalArgumentException.class);
}

protected Capability getCatalogCapability(Namespace namespace) {
return doWithCatalog(
getCatalogIdentifier(NameIdentifier.of(namespace.levels())),
CatalogManager.CatalogWrapper::capabilities,
IllegalArgumentException.class);
}

protected <R, E extends Throwable> R doWithTable(
NameIdentifier tableIdent, ThrowableFunction<SupportsPartitions, R> fn, Class<E> ex)
throws E {
try {
Expand All @@ -78,7 +93,7 @@ <R, E extends Throwable> R doWithTable(
}
}

<R, E extends Throwable> R doWithCatalog(
protected <R, E extends Throwable> R doWithCatalog(
NameIdentifier ident, ThrowableFunction<CatalogManager.CatalogWrapper, R> fn, Class<E> ex)
throws E {
try {
Expand All @@ -95,7 +110,7 @@ <R, E extends Throwable> R doWithCatalog(
}
}

<R, E1 extends Throwable, E2 extends Throwable> R doWithCatalog(
protected <R, E1 extends Throwable, E2 extends Throwable> R doWithCatalog(
NameIdentifier ident,
ThrowableFunction<CatalogManager.CatalogWrapper, R> fn,
Class<E1> ex1,
Expand All @@ -118,21 +133,7 @@ <R, E1 extends Throwable, E2 extends Throwable> R doWithCatalog(
}
}

Capability getCatalogCapability(NameIdentifier ident) {
return doWithCatalog(
getCatalogIdentifier(ident),
CatalogManager.CatalogWrapper::capabilities,
IllegalArgumentException.class);
}

Capability getCatalogCapability(Namespace namespace) {
return doWithCatalog(
getCatalogIdentifier(NameIdentifier.of(namespace.levels())),
CatalogManager.CatalogWrapper::capabilities,
IllegalArgumentException.class);
}

Set<String> getHiddenPropertyNames(
protected Set<String> getHiddenPropertyNames(
NameIdentifier catalogIdent,
ThrowableFunction<HasPropertyMetadata, PropertiesMetadata> provider,
Map<String, String> properties) {
Expand All @@ -149,7 +150,7 @@ Set<String> getHiddenPropertyNames(
IllegalArgumentException.class);
}

<T> void validateAlterProperties(
protected <T> void validateAlterProperties(
NameIdentifier ident,
ThrowableFunction<HasPropertyMetadata, PropertiesMetadata> provider,
T... changes) {
Expand All @@ -166,27 +167,6 @@ <T> void validateAlterProperties(
IllegalArgumentException.class);
}

private <T> Map<String, String> getPropertiesForSet(T... t) {
Map<String, String> properties = Maps.newHashMap();
for (T item : t) {
if (item instanceof TableChange.SetProperty) {
TableChange.SetProperty setProperty = (TableChange.SetProperty) item;
properties.put(setProperty.getProperty(), setProperty.getValue());
} else if (item instanceof SchemaChange.SetProperty) {
SchemaChange.SetProperty setProperty = (SchemaChange.SetProperty) item;
properties.put(setProperty.getProperty(), setProperty.getValue());
} else if (item instanceof FilesetChange.SetProperty) {
FilesetChange.SetProperty setProperty = (FilesetChange.SetProperty) item;
properties.put(setProperty.getProperty(), setProperty.getValue());
} else if (item instanceof TopicChange.SetProperty) {
TopicChange.SetProperty setProperty = (TopicChange.SetProperty) item;
properties.put(setProperty.getProperty(), setProperty.getValue());
}
}

return properties;
}

private <T> Map<String, String> getPropertiesForDelete(T... t) {
Map<String, String> properties = Maps.newHashMap();
for (T item : t) {
Expand All @@ -208,7 +188,7 @@ private <T> Map<String, String> getPropertiesForDelete(T... t) {
return properties;
}

StringIdentifier getStringIdFromProperties(Map<String, String> properties) {
protected StringIdentifier getStringIdFromProperties(Map<String, String> properties) {
try {
StringIdentifier stringId = StringIdentifier.fromProperties(properties);
if (stringId == null) {
Expand All @@ -221,7 +201,7 @@ StringIdentifier getStringIdFromProperties(Map<String, String> properties) {
}
}

<R extends HasIdentifier> R operateOnEntity(
protected <R extends HasIdentifier> R operateOnEntity(
NameIdentifier ident, ThrowableFunction<NameIdentifier, R> fn, String opName, long id) {
R ret = null;
try {
Expand All @@ -247,7 +227,7 @@ <R extends HasIdentifier> R operateOnEntity(

// TODO(xun): Remove this method when we implement a better way to get the catalog identifier
// [#257] Add an explicit get catalog functions in NameIdentifier
NameIdentifier getCatalogIdentifier(NameIdentifier ident) {
protected NameIdentifier getCatalogIdentifier(NameIdentifier ident) {
NameIdentifier.check(
ident.name() != null, "The name variable in the NameIdentifier must have value.");
Namespace.check(
Expand All @@ -272,6 +252,36 @@ boolean isManagedEntity(NameIdentifier catalogIdent, Capability.Scope scope) {
IllegalArgumentException.class);
}

protected boolean isEntityExist(NameIdentifier ident, Entity.EntityType type) {
try {
return store.exists(ident, type);
} catch (Exception e) {
LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "exists", ident, e);
throw new RuntimeException("Fail to check if entity is existed", e);
}
}

private <T> Map<String, String> getPropertiesForSet(T... t) {
Map<String, String> properties = Maps.newHashMap();
for (T item : t) {
if (item instanceof TableChange.SetProperty) {
TableChange.SetProperty setProperty = (TableChange.SetProperty) item;
properties.put(setProperty.getProperty(), setProperty.getValue());
} else if (item instanceof SchemaChange.SetProperty) {
SchemaChange.SetProperty setProperty = (SchemaChange.SetProperty) item;
properties.put(setProperty.getProperty(), setProperty.getValue());
} else if (item instanceof FilesetChange.SetProperty) {
FilesetChange.SetProperty setProperty = (FilesetChange.SetProperty) item;
properties.put(setProperty.getProperty(), setProperty.getValue());
} else if (item instanceof TopicChange.SetProperty) {
TopicChange.SetProperty setProperty = (TopicChange.SetProperty) item;
properties.put(setProperty.getProperty(), setProperty.getValue());
}
}

return properties;
}

static final class FormattedErrorMessages {
static final String STORE_OP_FAILURE =
"Failed to {} entity for {} in "
Expand Down
Loading
Loading