From 30b505319fa9dc443125e5acd83f874a98665edd Mon Sep 17 00:00:00 2001 From: Heng Qin Date: Wed, 29 May 2024 18:49:54 +0800 Subject: [PATCH] [ISSUE-3607] Support to sync the entities --- .../java/com/datastrato/gravitino/Schema.java | 4 + .../datastrato/gravitino/messaging/Topic.java | 4 + .../com/datastrato/gravitino/rel/Table.java | 4 + .../gravitino/dto/MetadataObjectDTO.java | 111 +++++++++++++ .../datastrato/gravitino/dto/SchemaDTO.java | 25 ++- .../gravitino/dto/messaging/TopicDTO.java | 23 ++- .../gravitino/dto/rel/TableDTO.java | 25 ++- .../gravitino/dto/requests/ImportRequest.java | 52 +++++++ .../dto/responses/ImportResponse.java | 43 +++++ .../gravitino/dto/util/DTOConverters.java | 3 + .../catalog/EntityCombinedSchema.java | 16 ++ .../catalog/EntityCombinedTable.java | 16 ++ .../catalog/EntityCombinedTopic.java | 16 ++ .../gravitino/catalog/SchemaDispatcher.java | 7 +- .../catalog/SchemaNormalizeDispatcher.java | 12 ++ .../catalog/SchemaOperationDispatcher.java | 147 +++++++++++++----- .../gravitino/catalog/TableDispatcher.java | 7 +- .../catalog/TableNormalizeDispatcher.java | 12 ++ .../catalog/TableOperationDispatcher.java | 125 +++++++++++---- .../gravitino/catalog/TopicDispatcher.java | 7 +- .../catalog/TopicNormalizeDispatcher.java | 12 ++ .../catalog/TopicOperationDispatcher.java | 128 +++++++++++---- .../listener/SchemaEventDispatcher.java | 10 ++ .../listener/TableEventDispatcher.java | 10 ++ .../listener/TopicEventDispatcher.java | 10 ++ .../server/web/rest/ExceptionHandlers.java | 33 ++++ .../server/web/rest/ImportOperations.java | 72 +++++++++ .../server/web/rest/OperationType.java | 3 +- .../server/web/rest/RoleOperations.java | 83 +++++++++- 29 files changed, 906 insertions(+), 114 deletions(-) create mode 100644 common/src/main/java/com/datastrato/gravitino/dto/MetadataObjectDTO.java create mode 100644 common/src/main/java/com/datastrato/gravitino/dto/requests/ImportRequest.java create mode 100644 common/src/main/java/com/datastrato/gravitino/dto/responses/ImportResponse.java create mode 100644 server/src/main/java/com/datastrato/gravitino/server/web/rest/ImportOperations.java diff --git a/api/src/main/java/com/datastrato/gravitino/Schema.java b/api/src/main/java/com/datastrato/gravitino/Schema.java index b910ebdef3a..e3785b230a8 100644 --- a/api/src/main/java/com/datastrato/gravitino/Schema.java +++ b/api/src/main/java/com/datastrato/gravitino/Schema.java @@ -33,4 +33,8 @@ default String comment() { default Map properties() { return Collections.emptyMap(); } + + default boolean imported() { + return false; + } } diff --git a/api/src/main/java/com/datastrato/gravitino/messaging/Topic.java b/api/src/main/java/com/datastrato/gravitino/messaging/Topic.java index aa356c09c8e..c8f83a9a430 100644 --- a/api/src/main/java/com/datastrato/gravitino/messaging/Topic.java +++ b/api/src/main/java/com/datastrato/gravitino/messaging/Topic.java @@ -34,4 +34,8 @@ default String comment() { default Map properties() { return Collections.emptyMap(); } + + default boolean imported() { + return false; + } } diff --git a/api/src/main/java/com/datastrato/gravitino/rel/Table.java b/api/src/main/java/com/datastrato/gravitino/rel/Table.java index 4efd91a5b63..6c6da299493 100644 --- a/api/src/main/java/com/datastrato/gravitino/rel/Table.java +++ b/api/src/main/java/com/datastrato/gravitino/rel/Table.java @@ -81,4 +81,8 @@ default Map properties() { default SupportsPartitions supportPartitions() throws UnsupportedOperationException { throw new UnsupportedOperationException("Table does not support partition operations."); } + + default boolean imported() { + return false; + } } diff --git a/common/src/main/java/com/datastrato/gravitino/dto/MetadataObjectDTO.java b/common/src/main/java/com/datastrato/gravitino/dto/MetadataObjectDTO.java new file mode 100644 index 00000000000..48124298aa3 --- /dev/null +++ b/common/src/main/java/com/datastrato/gravitino/dto/MetadataObjectDTO.java @@ -0,0 +1,111 @@ +package com.datastrato.gravitino.dto; + +import com.datastrato.gravitino.MetadataObject; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import javax.annotation.Nullable; +import org.apache.commons.lang3.StringUtils; + +public class MetadataObjectDTO implements MetadataObject { + + @JsonProperty("fullName") + private String fullName; + + @JsonProperty("type") + private Type type; + + private String parent; + private String name; + + /** Default constructor for Jackson deserialization. */ + protected MetadataObjectDTO() {} + + /** + * Creates a new instance of MetadataObject DTO. + * + * @param fullName The name of the MetadataObject DTO. + * @param type The type of the meta data object. + */ + protected MetadataObjectDTO(String fullName, Type type) { + this.type = type; + this.fullName = fullName; + int index = fullName.lastIndexOf("."); + + if (index == -1) { + this.parent = null; + this.name = fullName; + } else { + this.parent = fullName.substring(0, index); + this.name = fullName.substring(index + 1); + } + } + + @Nullable + @Override + public String parent() { + return parent; + } + + @Override + public String name() { + return name; + } + + @Override + public String fullName() { + return fullName; + } + + @Override + public Type type() { + return type; + } + + /** @return the builder for creating a new instance of MetadataObjectDTO. */ + public static Builder builder() { + return new Builder(); + } + + /** Builder for {@link MetadataObjectDTO}. */ + public static class Builder { + private String fullName; + private Type type; + + /** + * Sets the full name of the meta data object. + * + * @param fullName The full name of the meta data object. + * @return The builder instance. + */ + public Builder withFullName(String fullName) { + this.fullName = fullName; + return this; + } + + /** + * Sets the type of the meta data object. + * + * @param type The type of the meta data object. + * @return The builder instance. + */ + public Builder withType(Type type) { + this.type = type; + return this; + } + + /** + * Builds an instance of MetadataObjectDTO using the builder's properties. + * + * @return An instance of MetadataObjectDTO. + * @throws IllegalArgumentException If the full name or type are not set. + */ + public MetadataObjectDTO build() { + Preconditions.checkArgument( + StringUtils.isNotBlank(fullName), "full name cannot be null or empty"); + + Preconditions.checkArgument(type != null, "type cannot be null"); + + return new MetadataObjectDTO(fullName, type); + } + } +} diff --git a/common/src/main/java/com/datastrato/gravitino/dto/SchemaDTO.java b/common/src/main/java/com/datastrato/gravitino/dto/SchemaDTO.java index c07a9abdbf1..68e66cad0a5 100644 --- a/common/src/main/java/com/datastrato/gravitino/dto/SchemaDTO.java +++ b/common/src/main/java/com/datastrato/gravitino/dto/SchemaDTO.java @@ -21,6 +21,9 @@ public class SchemaDTO implements Schema { @JsonProperty("properties") private Map properties; + @JsonProperty("imported") + private boolean imported; + @JsonProperty("audit") private AuditDTO audit; @@ -34,10 +37,16 @@ private SchemaDTO() {} * @param properties The properties associated with the schema. * @param audit The audit information for the schema. */ - private SchemaDTO(String name, String comment, Map properties, AuditDTO audit) { + private SchemaDTO( + String name, + String comment, + Map properties, + boolean imported, + AuditDTO audit) { this.name = name; this.comment = comment; this.properties = properties; + this.imported = imported; this.audit = audit; } @@ -59,6 +68,11 @@ public Map properties() { return properties; } + @Override + public boolean imported() { + return imported; + } + /** @return The audit information for the schema. */ @Override public AuditDTO auditInfo() { @@ -79,6 +93,8 @@ public static class Builder { protected Map properties; /** The audit information for the schema. */ protected AuditDTO audit; + /** The imported status for the schema. */ + protected boolean imported; /** Constructs a new Builder instance. */ private Builder() {} @@ -127,6 +143,11 @@ public S withAudit(AuditDTO audit) { return (S) this; } + public S withImported(boolean imported) { + this.imported = imported; + return (S) this; + } + /** * Builds a Schema DTO based on the provided builder parameters. * @@ -137,7 +158,7 @@ public SchemaDTO build() { Preconditions.checkArgument(name != null && !name.isEmpty(), "name cannot be null or empty"); Preconditions.checkArgument(audit != null, "audit cannot be null"); - return new SchemaDTO(name, comment, properties, audit); + return new SchemaDTO(name, comment, properties, imported, audit); } } diff --git a/common/src/main/java/com/datastrato/gravitino/dto/messaging/TopicDTO.java b/common/src/main/java/com/datastrato/gravitino/dto/messaging/TopicDTO.java index f060f3d98f1..8a98769a2f7 100644 --- a/common/src/main/java/com/datastrato/gravitino/dto/messaging/TopicDTO.java +++ b/common/src/main/java/com/datastrato/gravitino/dto/messaging/TopicDTO.java @@ -29,6 +29,9 @@ public static Builder builder() { @JsonProperty("properties") private Map properties; + @JsonProperty("imported") + private boolean imported; + @JsonProperty("audit") private AuditDTO audit; @@ -42,7 +45,12 @@ private TopicDTO() {} * @param properties The properties associated with the topic. * @param audit The audit information for the topic. */ - private TopicDTO(String name, String comment, Map properties, AuditDTO audit) { + private TopicDTO( + String name, + String comment, + Map properties, + boolean imported, + AuditDTO audit) { this.name = name; this.comment = comment; this.properties = properties; @@ -65,6 +73,11 @@ public Map properties() { return properties; } + @Override + public boolean imported() { + return imported; + } + @Override public Audit auditInfo() { return audit; @@ -82,12 +95,13 @@ public boolean equals(Object o) { return Objects.equals(name, topicDTO.name) && Objects.equals(comment, topicDTO.comment) && Objects.equals(properties, topicDTO.properties) + && Objects.equals(imported, topicDTO.imported) && Objects.equals(audit, topicDTO.audit); } @Override public int hashCode() { - return Objects.hash(name, comment, properties, audit); + return Objects.hash(name, comment, properties, imported, audit); } /** A builder for constructing a Topic DTO. */ @@ -142,6 +156,11 @@ public Builder withAudit(AuditDTO audit) { return this; } + public Builder withImported(boolean imported) { + topic.imported = imported; + return this; + } + /** @return The constructed Topic DTO. */ public TopicDTO build() { return topic; diff --git a/common/src/main/java/com/datastrato/gravitino/dto/rel/TableDTO.java b/common/src/main/java/com/datastrato/gravitino/dto/rel/TableDTO.java index 84dd54b9917..bec662acd77 100644 --- a/common/src/main/java/com/datastrato/gravitino/dto/rel/TableDTO.java +++ b/common/src/main/java/com/datastrato/gravitino/dto/rel/TableDTO.java @@ -47,6 +47,9 @@ public class TableDTO implements Table { @JsonProperty("indexes") private IndexDTO[] indexes; + @JsonProperty("imported") + private boolean imported; + private TableDTO() {} /** @@ -58,7 +61,8 @@ private TableDTO() {} * @param properties The properties associated with the table. * @param audit The audit information for the table. * @param partitioning The partitioning of the table. - * @param indexes Teh indexes of the table. + * @param indexes The indexes of the table. + * @param imported The table is whether to be imported to Gravitino. */ private TableDTO( String name, @@ -69,7 +73,8 @@ private TableDTO( Partitioning[] partitioning, DistributionDTO distribution, SortOrderDTO[] sortOrderDTOs, - IndexDTO[] indexes) { + IndexDTO[] indexes, + boolean imported) { this.name = name; this.comment = comment; this.columns = columns; @@ -79,6 +84,7 @@ private TableDTO( this.sortOrders = sortOrderDTOs; this.partitioning = partitioning; this.indexes = indexes; + this.imported = imported; } /** @return The name of the table. */ @@ -135,6 +141,11 @@ public Index[] index() { return indexes; } + @Override + public boolean imported() { + return imported; + } + /** * Creates a new Builder to build a Table DTO. * @@ -169,6 +180,8 @@ public static class Builder { /** The indexes of the table. */ protected IndexDTO[] indexes; + protected boolean imported; + /** Default constructor. */ private Builder() {} @@ -271,6 +284,11 @@ public S withIndex(IndexDTO[] indexes) { return (S) this; } + public S withImported(boolean imported) { + this.imported = imported; + return (S) this; + } + /** * Builds a Table DTO based on the provided builder parameters. * @@ -292,7 +310,8 @@ public TableDTO build() { Partitioning, distributionDTO, sortOrderDTOs, - indexes); + indexes, + imported); } /** diff --git a/common/src/main/java/com/datastrato/gravitino/dto/requests/ImportRequest.java b/common/src/main/java/com/datastrato/gravitino/dto/requests/ImportRequest.java new file mode 100644 index 00000000000..b0934e503b4 --- /dev/null +++ b/common/src/main/java/com/datastrato/gravitino/dto/requests/ImportRequest.java @@ -0,0 +1,52 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.dto.requests; + +import com.datastrato.gravitino.dto.MetadataObjectDTO; +import com.datastrato.gravitino.rest.RESTRequest; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; +import lombok.extern.jackson.Jacksonized; + +@Getter +@EqualsAndHashCode +@ToString +@Builder +@Jacksonized +public class ImportRequest implements RESTRequest { + + @JsonProperty("objects") + MetadataObjectDTO[] objects; + + /** Default constructor for ImportRequest. (Used for Jackson deserialization.) */ + public ImportRequest() { + this(null); + } + + /** + * Creates a new ImportRequest. + * + * @param objects The objects need to import. + */ + public ImportRequest(MetadataObjectDTO[] objects) { + this.objects = objects; + } + + /** + * Validates the {@link ImportRequest} request. + * + * @throws IllegalArgumentException If the request is invalid, this exception is thrown. + */ + @Override + public void validate() throws IllegalArgumentException { + Preconditions.checkArgument( + objects != null && objects.length != 0, + "\"objects\" field is required and cannot be empty"); + } +} diff --git a/common/src/main/java/com/datastrato/gravitino/dto/responses/ImportResponse.java b/common/src/main/java/com/datastrato/gravitino/dto/responses/ImportResponse.java new file mode 100644 index 00000000000..50be0a8adc3 --- /dev/null +++ b/common/src/main/java/com/datastrato/gravitino/dto/responses/ImportResponse.java @@ -0,0 +1,43 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.dto.responses; + +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** Represents a response for an import operation. */ +@ToString +@EqualsAndHashCode(callSuper = true) +public class ImportResponse extends BaseResponse { + + @JsonProperty("imported") + private final boolean imported; + + /** + * Constructor for ImportResponse. + * + * @param imported Whether the import operation was successful. + */ + public ImportResponse(boolean imported) { + super(0); + this.imported = imported; + } + + /** Default constructor for ImportResponse (used by Jackson deserializer). */ + public ImportResponse() { + super(); + this.imported = false; + } + + /** + * Returns whether the import operation was successful. + * + * @return True if the import operation was successful, otherwise false. + */ + public boolean imported() { + return imported; + } +} diff --git a/common/src/main/java/com/datastrato/gravitino/dto/util/DTOConverters.java b/common/src/main/java/com/datastrato/gravitino/dto/util/DTOConverters.java index c140112d59d..7be8c009aa0 100644 --- a/common/src/main/java/com/datastrato/gravitino/dto/util/DTOConverters.java +++ b/common/src/main/java/com/datastrato/gravitino/dto/util/DTOConverters.java @@ -188,6 +188,7 @@ public static SchemaDTO toDTO(Schema schema) { .withComment(schema.comment()) .withProperties(schema.properties()) .withAudit(toDTO(schema.auditInfo())) + .withImported(schema.imported()) .build(); } @@ -230,6 +231,7 @@ public static TableDTO toDTO(Table table) { .withAudit(toDTO(table.auditInfo())) .withPartitioning(toDTOs(table.partitioning())) .withIndex(toDTOs(table.index())) + .withImported(table.imported()) .build(); } @@ -526,6 +528,7 @@ public static TopicDTO toDTO(Topic topic) { .withName(topic.name()) .withComment(topic.comment()) .withProperties(topic.properties()) + .withImported(topic.imported()) .withAudit(toDTO(topic.auditInfo())) .build(); } diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedSchema.java b/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedSchema.java index 02e45e6eb81..35752e9e195 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedSchema.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedSchema.java @@ -8,6 +8,7 @@ import com.datastrato.gravitino.Schema; import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.meta.SchemaEntity; +import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -23,6 +24,7 @@ public final class EntityCombinedSchema implements Schema { // Sets of properties that should be hidden from the user. private Set hiddenProperties; + private boolean imported; private EntityCombinedSchema(Schema schema, SchemaEntity schemaEntity) { this.schema = schema; @@ -42,6 +44,11 @@ public EntityCombinedSchema withHiddenPropertiesSet(Set hiddenProperties return this; } + public EntityCombinedSchema withImported(boolean imported) { + this.imported = imported; + return this; + } + @Override public String name() { return schema.name(); @@ -73,4 +80,13 @@ public Audit auditInfo() { ? schema.auditInfo() : mergedAudit.merge(schemaEntity.auditInfo(), true /* overwrite */); } + + @Override + public boolean imported() { + return imported; + } + + Map schemaProperties() { + return Collections.unmodifiableMap(schema.properties()); + } } diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedTable.java b/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedTable.java index 593508f9e6e..6b37edacae4 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedTable.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedTable.java @@ -14,6 +14,7 @@ import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; import com.datastrato.gravitino.rel.expressions.transforms.Transform; import com.datastrato.gravitino.rel.indexes.Index; +import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -30,6 +31,7 @@ public final class EntityCombinedTable implements Table { // Sets of properties that should be hidden from the user. private Set hiddenProperties; + private boolean imported; private EntityCombinedTable(Table table, TableEntity tableEntity) { this.table = table; @@ -49,6 +51,11 @@ public EntityCombinedTable withHiddenPropertiesSet(Set hiddenProperties) return this; } + public EntityCombinedTable withImported(boolean imported) { + this.imported = imported; + return this; + } + @Override public String name() { return table.name(); @@ -96,6 +103,11 @@ public Index[] index() { return table.index(); } + @Override + public boolean imported() { + return imported; + } + @Override public Audit auditInfo() { AuditInfo mergedAudit = @@ -110,4 +122,8 @@ public Audit auditInfo() { ? table.auditInfo() : mergedAudit.merge(tableEntity.auditInfo(), true /* overwrite */); } + + Map tableProperties() { + return Collections.unmodifiableMap(table.properties()); + } } diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedTopic.java b/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedTopic.java index b6fed19ec32..9d2c270ce06 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedTopic.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/EntityCombinedTopic.java @@ -8,6 +8,7 @@ import com.datastrato.gravitino.messaging.Topic; import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.meta.TopicEntity; +import java.util.Collections; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -23,6 +24,7 @@ public class EntityCombinedTopic implements Topic { // Sets of properties that should be hidden from the user. private Set hiddenProperties; + private boolean imported; private EntityCombinedTopic(Topic topic, TopicEntity topicEntity) { this.topic = topic; @@ -42,6 +44,11 @@ public EntityCombinedTopic withHiddenPropertiesSet(Set hiddenProperties) return this; } + public EntityCombinedTopic withImported(boolean imported) { + this.imported = imported; + return this; + } + @Override public String name() { return topic.name(); @@ -73,4 +80,13 @@ public Audit auditInfo() { ? topic.auditInfo() : mergedAudit.merge(topicEntity.auditInfo(), true /* overwrite */); } + + @Override + public boolean imported() { + return imported; + } + + Map topicProperties() { + return Collections.unmodifiableMap(topic.properties()); + } } diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/SchemaDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/SchemaDispatcher.java index e61838e9556..e33a6405491 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/SchemaDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/SchemaDispatcher.java @@ -5,6 +5,7 @@ package com.datastrato.gravitino.catalog; +import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.connector.SupportsSchemas; /** @@ -13,4 +14,8 @@ * to dispatching or handling schema-related events or actions that are not covered by the standard * {@code SupportsSchemas} operations. */ -public interface SchemaDispatcher extends SupportsSchemas {} +public interface SchemaDispatcher extends SupportsSchemas { + boolean importSchema(NameIdentifier identifier); + + boolean schemaImported(NameIdentifier identifier); +} diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/SchemaNormalizeDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/SchemaNormalizeDispatcher.java index 9c004989e80..b80e429bfcd 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/SchemaNormalizeDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/SchemaNormalizeDispatcher.java @@ -71,6 +71,18 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty applyCaseSensitive(ident, Capability.Scope.SCHEMA, dispatcher), cascade); } + @Override + public boolean importSchema(NameIdentifier identifier) { + return dispatcher.importSchema( + applyCaseSensitive(identifier, Capability.Scope.SCHEMA, dispatcher)); + } + + @Override + public boolean schemaImported(NameIdentifier identifier) { + return dispatcher.schemaImported( + applyCaseSensitive(identifier, Capability.Scope.SCHEMA, dispatcher)); + } + private NameIdentifier normalizeNameIdentifier(NameIdentifier ident) { Capability capability = dispatcher.getCatalogCapability(ident); return applyCapabilities(ident, Capability.Scope.SCHEMA, capability); diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/SchemaOperationDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/SchemaOperationDispatcher.java index 1575ded111d..edf9e0b7cfd 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/SchemaOperationDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/SchemaOperationDispatcher.java @@ -159,47 +159,7 @@ public Schema createSchema(NameIdentifier ident, String comment, Map c.doWithSchemaOps(s -> s.loadSchema(ident)), - NoSuchSchemaException.class); - - // If the Schema is maintained by the Gravitino's store, we don't have to load again. - boolean isManagedSchema = isManagedEntity(catalogIdentifier, Capability.Scope.SCHEMA); - if (isManagedSchema) { - return EntityCombinedSchema.of(schema) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdentifier, - HasPropertyMetadata::schemaPropertiesMetadata, - schema.properties())); - } - - StringIdentifier stringId = getStringIdFromProperties(schema.properties()); - // Case 1: The schema is not created by Gravitino. - if (stringId == null) { - return EntityCombinedSchema.of(schema) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdentifier, - HasPropertyMetadata::schemaPropertiesMetadata, - schema.properties())); - } - - SchemaEntity schemaEntity = - operateOnEntity( - ident, - identifier -> store.get(identifier, SCHEMA, SchemaEntity.class), - "GET", - stringId.id()); - return EntityCombinedSchema.of(schema, schemaEntity) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdentifier, - HasPropertyMetadata::schemaPropertiesMetadata, - schema.properties())); + return loadCombinedSchema(ident); } /** @@ -330,4 +290,109 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty ? droppedFromStore : droppedFromCatalog; } + + @Override + public boolean importSchema(NameIdentifier identifier) { + EntityCombinedSchema combinedSchema = loadCombinedSchema(identifier); + if (combinedSchema.imported()) { + return false; + } + + StringIdentifier stringId = getStringIdFromProperties(combinedSchema.schemaProperties()); + long uid; + if (stringId != null) { + // If the entity in the store doesn't match the external system, we use the data + // of external system to correct it. + uid = stringId.id(); + } else { + // If store doesn't exist entity, we sync the entity from the external system. + uid = idGenerator.nextId(); + } + + SchemaEntity schemaEntity = + SchemaEntity.builder() + .withId(uid) + .withName(identifier.name()) + .withNamespace(identifier.namespace()) + .withAuditInfo( + AuditInfo.builder() + .withCreator(combinedSchema.auditInfo().creator()) + .withCreateTime(combinedSchema.auditInfo().createTime()) + .withLastModifier(combinedSchema.auditInfo().lastModifier()) + .withLastModifiedTime(combinedSchema.auditInfo().lastModifiedTime()) + .build()) + .build(); + try { + store.put(schemaEntity, true); + } catch (Exception e) { + LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", identifier, e); + throw new RuntimeException("Fail to access underlying storage"); + } + + return true; + } + + @Override + public boolean schemaImported(NameIdentifier identifier) { + return loadCombinedSchema(identifier).imported(); + } + + private EntityCombinedSchema loadCombinedSchema(NameIdentifier ident) { + NameIdentifier catalogIdentifier = getCatalogIdentifier(ident); + Schema schema = + doWithCatalog( + catalogIdentifier, + c -> c.doWithSchemaOps(s -> s.loadSchema(ident)), + NoSuchSchemaException.class); + + // If the Schema is maintained by the Gravitino's store, we don't have to load again. + boolean isManagedSchema = isManagedEntity(catalogIdentifier, Capability.Scope.SCHEMA); + if (isManagedSchema) { + return EntityCombinedSchema.of(schema) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdentifier, + HasPropertyMetadata::schemaPropertiesMetadata, + schema.properties())) + .withImported(true); + } + + StringIdentifier stringId = getStringIdFromProperties(schema.properties()); + // Case 1: The schema is not created by Gravitino. + if (stringId == null) { + return EntityCombinedSchema.of(schema) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdentifier, + HasPropertyMetadata::schemaPropertiesMetadata, + schema.properties())) + .withImported(isEntityExist(ident)); + } + + SchemaEntity schemaEntity = + operateOnEntity( + ident, + identifier -> store.get(identifier, SCHEMA, SchemaEntity.class), + "GET", + stringId.id()); + + boolean imported = schemaEntity != null; + + return EntityCombinedSchema.of(schema, schemaEntity) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdentifier, + HasPropertyMetadata::schemaPropertiesMetadata, + schema.properties())) + .withImported(imported); + } + + private boolean isEntityExist(NameIdentifier ident) { + try { + return store.exists(ident, SCHEMA); + } catch (Exception e) { + LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "exists", ident, e); + throw new RuntimeException("Fail to access underlying storage"); + } + } } diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/TableDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/TableDispatcher.java index 7b54ccd5794..cbbeb9b7fbd 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/TableDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/TableDispatcher.java @@ -5,6 +5,7 @@ package com.datastrato.gravitino.catalog; +import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.rel.TableCatalog; /** @@ -13,4 +14,8 @@ * dispatching or handling table-related events or actions that are not covered by the standard * {@code TableCatalog} operations. */ -public interface TableDispatcher extends TableCatalog {} +public interface TableDispatcher extends TableCatalog { + boolean importTable(NameIdentifier identifier); + + boolean tableImported(NameIdentifier identifier); +} diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/TableNormalizeDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/TableNormalizeDispatcher.java index ac7da6cbc55..742a1f96ac1 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/TableNormalizeDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/TableNormalizeDispatcher.java @@ -99,6 +99,18 @@ public boolean tableExists(NameIdentifier ident) { return dispatcher.tableExists(applyCaseSensitive(ident, Capability.Scope.TABLE, dispatcher)); } + @Override + public boolean importTable(NameIdentifier identifier) { + return dispatcher.importTable( + applyCaseSensitive(identifier, Capability.Scope.TABLE, dispatcher)); + } + + @Override + public boolean tableImported(NameIdentifier identifier) { + return dispatcher.tableImported( + applyCaseSensitive(identifier, Capability.Scope.TABLE, dispatcher)); + } + private NameIdentifier normalizeNameIdentifier(NameIdentifier ident) { Capability capability = dispatcher.getCatalogCapability(ident); return applyCapabilities(ident, Capability.Scope.TABLE, capability); diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/TableOperationDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/TableOperationDispatcher.java index df41cdc2634..e1da80d872e 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/TableOperationDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/TableOperationDispatcher.java @@ -79,37 +79,7 @@ public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaExcep */ @Override public Table loadTable(NameIdentifier ident) throws NoSuchTableException { - NameIdentifier catalogIdentifier = getCatalogIdentifier(ident); - Table table = - doWithCatalog( - catalogIdentifier, - c -> c.doWithTableOps(t -> t.loadTable(ident)), - NoSuchTableException.class); - - StringIdentifier stringId = getStringIdFromProperties(table.properties()); - // Case 1: The table is not created by Gravitino. - if (stringId == null) { - return EntityCombinedTable.of(table) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdentifier, - HasPropertyMetadata::tablePropertiesMetadata, - table.properties())); - } - - TableEntity tableEntity = - operateOnEntity( - ident, - identifier -> store.get(identifier, TABLE, TableEntity.class), - "GET", - stringId.id()); - - return EntityCombinedTable.of(table, tableEntity) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdentifier, - HasPropertyMetadata::tablePropertiesMetadata, - table.properties())); + return loadCombinedTable(ident); } /** @@ -379,4 +349,97 @@ public boolean purgeTable(NameIdentifier ident) throws UnsupportedOperationExcep ? droppedFromStore : droppedFromCatalog; } + + @Override + public boolean importTable(NameIdentifier identifier) { + EntityCombinedTable combinedTable = loadCombinedTable(identifier); + + if (combinedTable.imported()) { + return false; + } + + StringIdentifier stringId = getStringIdFromProperties(combinedTable.tableProperties()); + long uid; + if (stringId != null) { + // If the entity in the store doesn't match the external system, we use the data + // of external system to correct it. + uid = stringId.id(); + } else { + // If store doesn't exist entity, we sync the entity from the external system. + uid = idGenerator.nextId(); + } + + TableEntity tableEntity = + TableEntity.builder() + .withId(uid) + .withName(identifier.name()) + .withNamespace(identifier.namespace()) + .withAuditInfo( + AuditInfo.builder() + .withCreator(combinedTable.auditInfo().creator()) + .withCreateTime(combinedTable.auditInfo().createTime()) + .withLastModifier(combinedTable.auditInfo().lastModifier()) + .withLastModifiedTime(combinedTable.auditInfo().lastModifiedTime()) + .build()) + .build(); + try { + store.put(tableEntity, true); + } catch (Exception e) { + LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", identifier, e); + throw new RuntimeException("Fail to access underlying storage"); + } + return true; + } + + @Override + public boolean tableImported(NameIdentifier identifier) { + return loadCombinedTable(identifier).imported(); + } + + private boolean isEntityExist(NameIdentifier ident) { + try { + return store.exists(ident, TABLE); + } catch (Exception e) { + LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "exists", ident, e); + throw new RuntimeException("Fail to access underlying storage"); + } + } + + private EntityCombinedTable loadCombinedTable(NameIdentifier ident) { + NameIdentifier catalogIdentifier = getCatalogIdentifier(ident); + Table table = + doWithCatalog( + catalogIdentifier, + c -> c.doWithTableOps(t -> t.loadTable(ident)), + NoSuchTableException.class); + + StringIdentifier stringId = getStringIdFromProperties(table.properties()); + // Case 1: The table is not created by Gravitino. + if (stringId == null) { + return EntityCombinedTable.of(table) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdentifier, + HasPropertyMetadata::tablePropertiesMetadata, + table.properties())) + .withImported(isEntityExist(ident)); + } + + TableEntity tableEntity = + operateOnEntity( + ident, + identifier -> store.get(identifier, TABLE, TableEntity.class), + "GET", + stringId.id()); + + boolean imported = tableEntity != null; + + return EntityCombinedTable.of(table, tableEntity) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdentifier, + HasPropertyMetadata::tablePropertiesMetadata, + table.properties())) + .withImported(imported); + } } diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/TopicDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/TopicDispatcher.java index 131a600c621..c874be8a3bc 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/TopicDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/TopicDispatcher.java @@ -5,6 +5,7 @@ package com.datastrato.gravitino.catalog; +import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.messaging.TopicCatalog; /** @@ -13,4 +14,8 @@ * dispatching or handling topic-related events or actions that are not covered by the standard * {@code TopicCatalog} operations. */ -public interface TopicDispatcher extends TopicCatalog {} +public interface TopicDispatcher extends TopicCatalog { + boolean importTopic(NameIdentifier identifier); + + boolean topicImported(NameIdentifier identifier); +} diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/TopicNormalizeDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/TopicNormalizeDispatcher.java index aa372b623c1..283872bd8e8 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/TopicNormalizeDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/TopicNormalizeDispatcher.java @@ -72,6 +72,18 @@ public boolean dropTopic(NameIdentifier ident) { return dispatcher.dropTopic(applyCaseSensitive(ident, Capability.Scope.TOPIC, dispatcher)); } + @Override + public boolean importTopic(NameIdentifier identifier) { + return dispatcher.importTopic( + applyCaseSensitive(identifier, Capability.Scope.TOPIC, dispatcher)); + } + + @Override + public boolean topicImported(NameIdentifier identifier) { + return dispatcher.topicImported( + applyCaseSensitive(identifier, Capability.Scope.TOPIC, dispatcher)); + } + private NameIdentifier normalizeNameIdentifier(NameIdentifier ident) { Capability capability = dispatcher.getCatalogCapability(ident); return applyCapabilities(ident, Capability.Scope.TOPIC, capability); diff --git a/core/src/main/java/com/datastrato/gravitino/catalog/TopicOperationDispatcher.java b/core/src/main/java/com/datastrato/gravitino/catalog/TopicOperationDispatcher.java index 6d52e92d75a..70ae7073c91 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/TopicOperationDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/TopicOperationDispatcher.java @@ -70,36 +70,7 @@ public NameIdentifier[] listTopics(Namespace namespace) throws NoSuchSchemaExcep */ @Override public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException { - NameIdentifier catalogIdent = getCatalogIdentifier(ident); - Topic topic = - doWithCatalog( - catalogIdent, - c -> c.doWithTopicOps(t -> t.loadTopic(ident)), - NoSuchTopicException.class); - - StringIdentifier stringId = getStringIdFromProperties(topic.properties()); - // Case 1: The topic is not created by Gravitino. - // Note: for Kafka catalog, stringId will not be null. Because there is no way to store the - // Gravitino - // ID in Kafka, therefor we use the topic ID as the Gravitino ID - if (stringId == null) { - return EntityCombinedTopic.of(topic) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties())); - } - - TopicEntity topicEntity = - operateOnEntity( - ident, - identifier -> store.get(identifier, TOPIC, TopicEntity.class), - "GET", - getStringIdFromProperties(topic.properties()).id()); - - return EntityCombinedTopic.of(topic, topicEntity) - .withHiddenPropertiesSet( - getHiddenPropertyNames( - catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties())); + return loadCombinedTopic(ident); } /** @@ -282,4 +253,101 @@ public boolean dropTopic(NameIdentifier ident) { ? droppedFromStore : droppedFromCatalog; } + + @Override + public boolean importTopic(NameIdentifier identifier) { + + EntityCombinedTopic combinedTopic = loadCombinedTopic(identifier); + + if (combinedTopic.imported()) { + return false; + } + + StringIdentifier stringId = getStringIdFromProperties(combinedTopic.topicProperties()); + + long uid; + if (stringId != null) { + // If the entity in the store doesn't match the external system, we use the data + // of external system to correct it. + uid = stringId.id(); + } else { + // If store doesn't exist entity, we sync the entity from the external system. + uid = idGenerator.nextId(); + } + + TopicEntity topicEntity = + TopicEntity.builder() + .withId(uid) + .withName(combinedTopic.name()) + .withComment(combinedTopic.comment()) + .withNamespace(identifier.namespace()) + .withAuditInfo( + AuditInfo.builder() + .withCreator(combinedTopic.auditInfo().creator()) + .withCreateTime(combinedTopic.auditInfo().createTime()) + .withLastModifier(combinedTopic.auditInfo().lastModifier()) + .withLastModifiedTime(combinedTopic.auditInfo().lastModifiedTime()) + .build()) + .build(); + + try { + store.put(topicEntity, true); + } catch (Exception e) { + LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", identifier, e); + throw new RuntimeException("Fail to access underlying storage"); + } + + return true; + } + + @Override + public boolean topicImported(NameIdentifier identifier) { + return loadCombinedTopic(identifier).imported(); + } + + private boolean isEntityExist(NameIdentifier ident) { + try { + return store.exists(ident, TOPIC); + } catch (Exception e) { + LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "exists", ident, e); + throw new RuntimeException("Fail to access underlying storage"); + } + } + + private EntityCombinedTopic loadCombinedTopic(NameIdentifier ident) { + NameIdentifier catalogIdent = getCatalogIdentifier(ident); + Topic topic = + doWithCatalog( + catalogIdent, + c -> c.doWithTopicOps(t -> t.loadTopic(ident)), + NoSuchTopicException.class); + + StringIdentifier stringId = getStringIdFromProperties(topic.properties()); + // Case 1: The topic is not created by Gravitino. + // Note: for Kafka catalog, stringId will not be null. Because there is no way to store the + // Gravitino + // ID in Kafka, therefor we use the topic ID as the Gravitino ID + if (stringId == null) { + return EntityCombinedTopic.of(topic) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties())) + .withImported(isEntityExist(ident)); + } + + TopicEntity topicEntity = + operateOnEntity( + ident, + identifier -> store.get(identifier, TOPIC, TopicEntity.class), + "GET", + getStringIdFromProperties(topic.properties()).id()); + + boolean imported = topicEntity != null; + + return EntityCombinedTopic.of(topic, topicEntity) + .withHiddenPropertiesSet( + getHiddenPropertyNames( + catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, topic.properties())) + .withImported(imported); + } } diff --git a/core/src/main/java/com/datastrato/gravitino/listener/SchemaEventDispatcher.java b/core/src/main/java/com/datastrato/gravitino/listener/SchemaEventDispatcher.java index 50c6df4ea18..80e6ca727f4 100644 --- a/core/src/main/java/com/datastrato/gravitino/listener/SchemaEventDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/listener/SchemaEventDispatcher.java @@ -130,4 +130,14 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty throw e; } } + + @Override + public boolean importSchema(NameIdentifier identifier) { + return dispatcher.importSchema(identifier); + } + + @Override + public boolean schemaImported(NameIdentifier identifier) { + return dispatcher.schemaImported(identifier); + } } diff --git a/core/src/main/java/com/datastrato/gravitino/listener/TableEventDispatcher.java b/core/src/main/java/com/datastrato/gravitino/listener/TableEventDispatcher.java index deacbba2094..457a0786fd8 100644 --- a/core/src/main/java/com/datastrato/gravitino/listener/TableEventDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/listener/TableEventDispatcher.java @@ -169,4 +169,14 @@ public boolean purgeTable(NameIdentifier ident) { public boolean tableExists(NameIdentifier ident) { return dispatcher.tableExists(ident); } + + @Override + public boolean importTable(NameIdentifier identifier) { + return dispatcher.importTable(identifier); + } + + @Override + public boolean tableImported(NameIdentifier identifier) { + return dispatcher.tableImported(identifier); + } } diff --git a/core/src/main/java/com/datastrato/gravitino/listener/TopicEventDispatcher.java b/core/src/main/java/com/datastrato/gravitino/listener/TopicEventDispatcher.java index dd628a534d2..92de7c1fe44 100644 --- a/core/src/main/java/com/datastrato/gravitino/listener/TopicEventDispatcher.java +++ b/core/src/main/java/com/datastrato/gravitino/listener/TopicEventDispatcher.java @@ -128,4 +128,14 @@ public Topic createTopic( throw e; } } + + @Override + public boolean importTopic(NameIdentifier identifier) { + return dispatcher.importTopic(identifier); + } + + @Override + public boolean topicImported(NameIdentifier identifier) { + return dispatcher.topicImported(identifier); + } } diff --git a/server/src/main/java/com/datastrato/gravitino/server/web/rest/ExceptionHandlers.java b/server/src/main/java/com/datastrato/gravitino/server/web/rest/ExceptionHandlers.java index c51c666b793..b53a97520be 100644 --- a/server/src/main/java/com/datastrato/gravitino/server/web/rest/ExceptionHandlers.java +++ b/server/src/main/java/com/datastrato/gravitino/server/web/rest/ExceptionHandlers.java @@ -89,6 +89,11 @@ public static Response handleGroupPermissionOperationException( return GroupPermissionOperationExceptionHandler.INSTANCE.handle(op, roles, parent, e); } + public static Response handleImportException( + OperationType op, String objects, String parent, Exception e) { + return ImportExceptionHandler.INSTANCE.handle(op, objects, parent, e); + } + private static class PartitionExceptionHandler extends BaseExceptionHandler { private static final ExceptionHandler INSTANCE = new PartitionExceptionHandler(); @@ -474,6 +479,34 @@ public Response handle(OperationType op, String roles, String parent, Exception } } + private static class ImportExceptionHandler extends BaseExceptionHandler { + private static final ExceptionHandler INSTANCE = new ImportExceptionHandler(); + + private static String getImportErrorMsg( + String objects, String operation, String metalake, String reason) { + return String.format( + "Failed to operate objects [%s] operation [%s] under metalake [%s], reason [%s]", + objects, operation, metalake, reason); + } + + @Override + public Response handle(OperationType op, String objects, String metalake, Exception e) { + String formatted = StringUtil.isBlank(objects) ? "" : " [" + objects + "]"; + String errorMsg = getImportErrorMsg(formatted, op.name(), metalake, getErrorMsg(e)); + LOG.warn(errorMsg, e); + + if (e instanceof IllegalArgumentException) { + return Utils.illegalArguments(errorMsg, e); + + } else if (e instanceof NotFoundException) { + return Utils.notFound(errorMsg, e); + + } else { + return super.handle(op, objects, metalake, e); + } + } + } + @VisibleForTesting static class BaseExceptionHandler extends ExceptionHandler { diff --git a/server/src/main/java/com/datastrato/gravitino/server/web/rest/ImportOperations.java b/server/src/main/java/com/datastrato/gravitino/server/web/rest/ImportOperations.java new file mode 100644 index 00000000000..2ec13f948d1 --- /dev/null +++ b/server/src/main/java/com/datastrato/gravitino/server/web/rest/ImportOperations.java @@ -0,0 +1,72 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.server.web.rest; + +import com.codahale.metrics.annotation.ResponseMetered; +import com.codahale.metrics.annotation.Timed; +import com.datastrato.gravitino.GravitinoEnv; +import com.datastrato.gravitino.MetadataObject; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.dto.requests.ImportRequest; +import com.datastrato.gravitino.dto.responses.ImportResponse; +import com.datastrato.gravitino.lock.LockType; +import com.datastrato.gravitino.lock.TreeLockUtils; +import com.datastrato.gravitino.metrics.MetricNames; +import com.datastrato.gravitino.server.web.Utils; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Response; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Path("/metalakes/{metalake}/import") +public class ImportOperations { + private static final Logger LOG = LoggerFactory.getLogger(ImportOperations.class); + + @POST + @Produces("application/vnd.gravitino.v1+json") + @Timed(name = "import." + MetricNames.HTTP_PROCESS_DURATION, absolute = true) + @ResponseMetered(name = "import", absolute = true) + public Response importMetadataObjects( + @PathParam("metalake") String metalake, ImportRequest request) { + try { + + for (MetadataObject object : request.getObjects()) { + if (object.type() != MetadataObject.Type.TABLE + && object.type() != MetadataObject.Type.TOPIC + && object.type() != MetadataObject.Type.SCHEMA) { + throw new IllegalArgumentException( + "Import operation only supports TABLE, TOPIC and SCHEMA"); + } + + // Metadata object ignores the metalake namespace, so we should add it back. + NameIdentifier identifier; + identifier = NameIdentifier.parse(String.format("%s.%s", metalake, object.fullName())); + + String[] levels = identifier.namespace().levels(); + TreeLockUtils.doWithTreeLock( + NameIdentifier.of(levels), + LockType.WRITE, + () -> { + if (object.type() == MetadataObject.Type.TABLE) { + GravitinoEnv.getInstance().tableDispatcher().importTable(identifier); + } else if (object.type() == MetadataObject.Type.TOPIC) { + GravitinoEnv.getInstance().topicDispatcher().importTopic(identifier); + } else { + GravitinoEnv.getInstance().schemaDispatcher().importSchema(identifier); + } + return null; + }); + } + return Utils.ok(new ImportResponse(true)); + } catch (Exception e) { + return ExceptionHandlers.handleImportException( + OperationType.IMPORT, StringUtils.join(",", request.getObjects()), metalake, e); + } + } +} diff --git a/server/src/main/java/com/datastrato/gravitino/server/web/rest/OperationType.java b/server/src/main/java/com/datastrato/gravitino/server/web/rest/OperationType.java index 7649f43b33b..65ff001b248 100644 --- a/server/src/main/java/com/datastrato/gravitino/server/web/rest/OperationType.java +++ b/server/src/main/java/com/datastrato/gravitino/server/web/rest/OperationType.java @@ -16,5 +16,6 @@ public enum OperationType { REMOVE, DELETE, GRANT, - REVOKE + REVOKE, + IMPORT } diff --git a/server/src/main/java/com/datastrato/gravitino/server/web/rest/RoleOperations.java b/server/src/main/java/com/datastrato/gravitino/server/web/rest/RoleOperations.java index 147fa42abaa..9266a0b93bc 100644 --- a/server/src/main/java/com/datastrato/gravitino/server/web/rest/RoleOperations.java +++ b/server/src/main/java/com/datastrato/gravitino/server/web/rest/RoleOperations.java @@ -6,16 +6,21 @@ import com.codahale.metrics.annotation.ResponseMetered; import com.codahale.metrics.annotation.Timed; -import com.datastrato.gravitino.GravitinoEnv; import com.datastrato.gravitino.authorization.AccessControlManager; import com.datastrato.gravitino.authorization.Privilege; import com.datastrato.gravitino.authorization.Privileges; import com.datastrato.gravitino.authorization.SecurableObject; import com.datastrato.gravitino.authorization.SecurableObjects; +import com.datastrato.gravitino.catalog.SchemaDispatcher; +import com.datastrato.gravitino.catalog.TableDispatcher; +import com.datastrato.gravitino.catalog.TopicDispatcher; +import com.datastrato.gravitino.dto.authorization.SecurableObjectDTO; import com.datastrato.gravitino.dto.requests.RoleCreateRequest; import com.datastrato.gravitino.dto.responses.DeleteResponse; import com.datastrato.gravitino.dto.responses.RoleResponse; import com.datastrato.gravitino.dto.util.DTOConverters; +import com.datastrato.gravitino.lock.LockType; +import com.datastrato.gravitino.lock.TreeLockUtils; import com.datastrato.gravitino.metrics.MetricNames; import com.datastrato.gravitino.server.web.Utils; import com.google.common.base.Preconditions; @@ -77,6 +82,10 @@ public Response createRole(@PathParam("metalake") String metalake, RoleCreateReq request.getSecurableObjects() != null && request.getSecurableObjects().length == 1, "The size of securable objects must be 1"); + for (SecurableObjectDTO object : request.getSecurableObjects()) { + checkSecurableObjects(metalake, object); + } + return Utils.doAs( httpRequest, () -> { @@ -132,4 +141,76 @@ public Response deleteRole( return ExceptionHandlers.handleRoleException(OperationType.DELETE, role, metalake, e); } } + + // Check every securable object whether exists and is imported. + private void checkSecurableObjects(String metalake, SecurableObjectDTO object) { + NameIdentifier identifier; + + // Securable object ignores the metalake namespace, so we should add it back. + if (object.type() == MetadataObject.Type.METALAKE) { + identifier = NameIdentifier.parse(object.fullName()); + } else { + identifier = NameIdentifier.parse(String.format("%s.%s", metalake, object.fullName())); + } + + String existErrMsg = "Securable object % doesn't exist"; + String importErrMsg = + "Securable object %s isn't created by Gravitino, you need to import it first"; + + TreeLockUtils.doWithTreeLock( + identifier, + LockType.READ, + () -> { + if (object.type() == MetadataObject.Type.METALAKE) { + if (!GravitinoEnv.getInstance().metalakeDispatcher().metalakeExists(identifier)) { + throw new IllegalArgumentException(String.format(existErrMsg, object.fullName())); + } + + } else if (object.type() == MetadataObject.Type.CATALOG) { + if (!GravitinoEnv.getInstance().catalogDispatcher().catalogExists(identifier)) { + throw new IllegalArgumentException(String.format(existErrMsg, object.fullName())); + } + + } else if (object.type() == MetadataObject.Type.SCHEMA) { + SchemaDispatcher dispatcher = GravitinoEnv.getInstance().schemaDispatcher(); + if (!dispatcher.schemaExists(identifier)) { + throw new IllegalArgumentException(String.format(existErrMsg, object.fullName())); + } + + if (!dispatcher.schemaImported(identifier)) { + throw new IllegalArgumentException(String.format(importErrMsg, object.fullName())); + } + + } else if (object.type() == MetadataObject.Type.FILESET) { + if (!GravitinoEnv.getInstance().filesetDispatcher().filesetExists(identifier)) { + throw new IllegalArgumentException(String.format(existErrMsg, object.fullName())); + } + + } else if (object.type() == MetadataObject.Type.TABLE) { + TableDispatcher dispatcher = GravitinoEnv.getInstance().tableDispatcher(); + if (!dispatcher.tableExists(identifier)) { + throw new IllegalArgumentException(String.format(existErrMsg, object.fullName())); + } + + if (!dispatcher.tableImported(identifier)) { + throw new IllegalArgumentException(String.format(importErrMsg, object.fullName())); + } + + } else if (object.type() == MetadataObject.Type.TOPIC) { + TopicDispatcher dispatcher = GravitinoEnv.getInstance().topicDispatcher(); + if (!dispatcher.topicExists(identifier)) { + throw new IllegalArgumentException(String.format(existErrMsg, object.fullName())); + } + if (!dispatcher.topicImported(identifier)) { + throw new IllegalArgumentException(String.format(importErrMsg, object.fullName())); + } + + } else { + throw new IllegalArgumentException( + String.format("Doesn't support the type %s", object.type())); + } + + return null; + }); + } }