diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/MySQLBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/MySQLBackend.java index 097b9c3ff82..7db6eaf5b1e 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/MySQLBackend.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/MySQLBackend.java @@ -13,20 +13,21 @@ import com.datastrato.gravitino.exceptions.AlreadyExistsException; import com.datastrato.gravitino.exceptions.NoSuchEntityException; import com.datastrato.gravitino.meta.BaseMetalake; +import com.datastrato.gravitino.meta.CatalogEntity; import com.datastrato.gravitino.storage.relation.RelationBackend; +import com.datastrato.gravitino.storage.relation.mysql.mapper.CatalogMetaMapper; import com.datastrato.gravitino.storage.relation.mysql.mapper.MetalakeMetaMapper; import com.datastrato.gravitino.storage.relation.mysql.orm.SqlSessionFactoryHelper; import com.datastrato.gravitino.storage.relation.mysql.orm.SqlSessions; +import com.datastrato.gravitino.storage.relation.mysql.po.CatalogPO; import com.datastrato.gravitino.storage.relation.mysql.po.MetalakePO; import com.datastrato.gravitino.storage.relation.mysql.utils.POConverters; -import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.base.Preconditions; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.function.Function; -import java.util.stream.Collectors; import org.apache.ibatis.session.SqlSession; /** @@ -52,18 +53,18 @@ public List list( ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) .listMetalakePOs(); return metalakePOS != null - ? metalakePOS.stream() - .map( - metalakePO -> { - try { - return (E) POConverters.fromMetalakePO(metalakePO); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - }) - .collect(Collectors.toList()) + ? (List) POConverters.fromMetalakePOs(metalakePOS) : new ArrayList<>(); case CATALOG: + Preconditions.checkArgument( + namespace.levels().length == 1, + "Catalog entity namespace should have only one level"); + List catalogPOS = + ((CatalogMetaMapper) SqlSessions.getMapper(CatalogMetaMapper.class)) + .listCatalogPOsByMetalakeId(getMetalakeIdByName(namespace.level(0))); + return catalogPOS != null + ? (List) POConverters.fromCatalogPOs(catalogPOS, namespace) + : new ArrayList<>(); case SCHEMA: case TABLE: case FILESET: @@ -81,11 +82,22 @@ public boolean exists(NameIdentifier ident, Entity.EntityType entityType) { try (SqlSession session = SqlSessions.getSqlSession()) { switch (entityType) { case METALAKE: - MetalakePO metalakePO = + Long metalakeId = ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) - .selectMetalakeMetaByName(ident.name()); - return metalakePO != null; + .selectMetalakeIdMetaByName(ident.name()); + return metalakeId != null; case CATALOG: + Preconditions.checkArgument( + ident.hasNamespace() && ident.namespace().levels().length == 1, + "Catalog entity namespace should have only one level"); + try { + Long catalogId = + getCatalogIdByNameAndMetalakeId( + ident.name(), getMetalakeIdByName(ident.namespace().level(0))); + return catalogId != null; + } catch (NoSuchEntityException e) { + return false; + } case SCHEMA: case TABLE: case FILESET: @@ -111,7 +123,6 @@ public void insert(E e, boolean overwritten) throw new EntityAlreadyExistsException( String.format("Metalake entity: %s already exists", e.nameIdentifier().name())); } - if (overwritten) { ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) .insertMetalakeMetaWithUpdate(POConverters.toMetalakePO((BaseMetalake) e)); @@ -120,6 +131,28 @@ public void insert(E e, boolean overwritten) .insertMetalakeMeta(POConverters.toMetalakePO((BaseMetalake) e)); } SqlSessions.commitAndCloseSqlSession(); + } else if (e instanceof CatalogEntity) { + Preconditions.checkArgument( + e.nameIdentifier().hasNamespace() + && e.nameIdentifier().namespace().levels().length == 1, + "Catalog entity namespace should have only one level"); + Long metalakeId = getMetalakeIdByName(e.nameIdentifier().namespace().level(0)); + CatalogPO catalogPO = + ((CatalogMetaMapper) SqlSessions.getMapper(CatalogMetaMapper.class)) + .selectCatalogMetaByNameAndMetalakeId(e.nameIdentifier().name(), metalakeId); + if (!overwritten && catalogPO != null) { + throw new EntityAlreadyExistsException( + String.format("Catalog entity: %s already exists", e.nameIdentifier().name())); + } + if (overwritten) { + ((CatalogMetaMapper) SqlSessions.getMapper(CatalogMetaMapper.class)) + .insertCatalogMetaWithUpdate( + POConverters.toCatalogPO((CatalogEntity) e, metalakeId)); + } else { + ((CatalogMetaMapper) SqlSessions.getMapper(CatalogMetaMapper.class)) + .insertCatalogMeta(POConverters.toCatalogPO((CatalogEntity) e, metalakeId)); + } + SqlSessions.commitAndCloseSqlSession(); } else { SqlSessions.closeSqlSession(); throw new IllegalArgumentException( @@ -155,6 +188,26 @@ public E update( SqlSessions.commitAndCloseSqlSession(); return (E) newMetalakeEntity; case CATALOG: + Preconditions.checkArgument( + ident.hasNamespace() && ident.namespace().levels().length == 1, + "Catalog entity namespace should have only one level"); + Long metalakeId = getMetalakeIdByName(ident.namespace().level(0)); + CatalogPO oldCatalogPO = + ((CatalogMetaMapper) SqlSessions.getMapper(CatalogMetaMapper.class)) + .selectCatalogMetaByNameAndMetalakeId(ident.name(), metalakeId); + CatalogEntity oldCatalogEntity = + POConverters.fromCatalogPO(oldCatalogPO, ident.namespace()); + CatalogEntity newCatalogEntity = (CatalogEntity) updater.apply((E) oldCatalogEntity); + Preconditions.checkArgument( + Objects.equals(oldCatalogEntity.id(), newCatalogEntity.id()), + String.format( + "The updated catalog entity id: %s should same with the catalog entity id before: %s", + newCatalogEntity.id(), oldCatalogEntity.id())); + ((CatalogMetaMapper) SqlSessions.getMapper(CatalogMetaMapper.class)) + .updateCatalogMeta( + POConverters.toCatalogPO(newCatalogEntity, metalakeId), oldCatalogPO); + SqlSessions.commitAndCloseSqlSession(); + return (E) newCatalogEntity; case SCHEMA: case TABLE: case FILESET: @@ -184,6 +237,17 @@ public E get( } return (E) POConverters.fromMetalakePO(metalakePO); case CATALOG: + Preconditions.checkArgument( + ident.hasNamespace() && ident.namespace().levels().length == 1, + "Catalog entity namespace should have only one level"); + CatalogPO catalogPO = + ((CatalogMetaMapper) SqlSessions.getMapper(CatalogMetaMapper.class)) + .selectCatalogMetaByNameAndMetalakeId( + ident.name(), getMetalakeIdByName(ident.namespace().level(0))); + if (catalogPO == null) { + return null; + } + return (E) POConverters.fromCatalogPO(catalogPO, ident.namespace()); case SCHEMA: case TABLE: case FILESET: @@ -202,20 +266,32 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea try { switch (entityType) { case METALAKE: - Long metalakeId = - ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) - .selectMetalakeIdMetaByName(ident.name()); - if (metalakeId != null) { - // delete metalake - ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) - .deleteMetalakeMetaById(metalakeId); - if (cascade) { - // TODO We will cascade delete the metadata of sub-resources under the metalake - } - SqlSessions.commitAndCloseSqlSession(); + Long metalakeId = getMetalakeIdByName(ident.name()); + // delete metalake + ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) + .deleteMetalakeMetaById(metalakeId); + if (cascade) { + ((CatalogMetaMapper) SqlSessions.getMapper(CatalogMetaMapper.class)) + .deleteCatalogMetasByMetalakeId(metalakeId); + // TODO delete schema, table, fileset } + SqlSessions.commitAndCloseSqlSession(); return true; case CATALOG: + Preconditions.checkArgument( + ident.hasNamespace() && ident.namespace().levels().length == 1, + "Catalog entity namespace should have only one level"); + Long catalogId = + getCatalogIdByNameAndMetalakeId( + ident.name(), getMetalakeIdByName(ident.namespace().level(0))); + // delete catalog + ((CatalogMetaMapper) SqlSessions.getMapper(CatalogMetaMapper.class)) + .deleteCatalogMetasById(catalogId); + if (cascade) { + // TODO delete schema, table, fileset + } + SqlSessions.commitAndCloseSqlSession(); + return true; case SCHEMA: case TABLE: case FILESET: @@ -230,6 +306,26 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea } } + private Long getMetalakeIdByName(String name) { + Long metalakeId = + ((MetalakeMetaMapper) SqlSessions.getMapper(MetalakeMetaMapper.class)) + .selectMetalakeIdMetaByName(name); + if (metalakeId == null) { + throw new NoSuchEntityException(String.format("Metalake entity: %s not exists", name)); + } + return metalakeId; + } + + private Long getCatalogIdByNameAndMetalakeId(String name, Long metalakeId) { + Long catalogId = + ((CatalogMetaMapper) SqlSessions.getMapper(CatalogMetaMapper.class)) + .selectCatalogIdByNameAndMetalakeId(name, metalakeId); + if (catalogId == null) { + throw new NoSuchEntityException(String.format("Catalog entity: %s not exists", name)); + } + return catalogId; + } + @Override public void close() throws IOException {} } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/mapper/CatalogMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/mapper/CatalogMetaMapper.java new file mode 100644 index 00000000000..8bb47599549 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/mapper/CatalogMetaMapper.java @@ -0,0 +1,112 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.relation.mysql.mapper; + +import com.datastrato.gravitino.storage.relation.mysql.po.CatalogPO; +import java.util.List; +import org.apache.ibatis.annotations.Delete; +import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; +import org.apache.ibatis.annotations.Update; + +public interface CatalogMetaMapper { + String TABLE_NAME = "catalog_meta"; + + @Select( + "SELECT id, catalog_name as catalogName, metalake_id as metalakeId," + + " type, provider, catalog_comment as catalogComment," + + " properties, audit_info as auditInfo" + + " FROM " + + TABLE_NAME) + List listCatalogPOsByMetalakeId(@Param("metalakeId") Long metalakeId); + + @Select( + "SELECT id FROM " + + TABLE_NAME + + " WHERE catalog_name = #{catalogName} and metalake_id = #{metalakeId}") + Long selectCatalogIdByNameAndMetalakeId( + @Param("catalogName") String name, @Param("metalakeId") Long metalakeId); + + @Select( + "SELECT id, catalog_name as catalogName," + + " metalake_id as metalakeId, type, provider," + + " catalog_comment as catalogComment, properties," + + " audit_info as auditInfo" + + " FROM " + + TABLE_NAME + + " WHERE catalog_name = #{catalogName} and metalake_id = #{metalakeId}") + CatalogPO selectCatalogMetaByNameAndMetalakeId( + @Param("catalogName") String name, @Param("metalakeId") Long metalakeId); + + @Insert( + "INSERT INTO " + + TABLE_NAME + + "(id, catalog_name, metalake_id, type, provider, catalog_comment, properties, audit_info)" + + " VALUES(" + + " #{catalogMeta.id}," + + " #{catalogMeta.catalogName}," + + " #{catalogMeta.metalakeId}," + + " #{catalogMeta.type}," + + " #{catalogMeta.provider}," + + " #{catalogMeta.catalogComment}," + + " #{catalogMeta.properties}," + + " #{catalogMeta.auditInfo}" + + " )") + void insertCatalogMeta(@Param("catalogMeta") CatalogPO catalogPO); + + @Insert( + "INSERT INTO " + + TABLE_NAME + + "(id, catalog_name, metalake_id, type, provider, metalake_comment, properties, audit_info)" + + " VALUES(" + + " #{catalogMeta.id}," + + " #{catalogMeta.catalogName}," + + " #{catalogMeta.metalakeId}," + + " #{catalogMeta.type}," + + " #{catalogMeta.provider}," + + " #{catalogMeta.catalogComment}," + + " #{catalogMeta.properties}," + + " #{catalogMeta.auditInfo}" + + " )" + + " ON DUPLICATE KEY UPDATE" + + " catalog_name = #{catalogMeta.catalogName}," + + " metalake_id = #{catalogMeta.metalakeId}," + + " type = #{catalogMeta.type}," + + " provider = #{catalogMeta.provider}," + + " catalog_comment = #{catalogMeta.catalogComment}," + + " properties = #{catalogMeta.properties}," + + " audit_info = #{catalogMeta.auditInfo}") + void insertCatalogMetaWithUpdate(@Param("catalogMeta") CatalogPO catalogPO); + + @Update( + "UPDATE " + + TABLE_NAME + + " SET catalog_name = #{newCatalogMeta.metalakeName}," + + " metalake_id = #{newCatalogMeta.metalakeId}," + + " type = #{newCatalogMeta.type}," + + " provider = #{newCatalogMeta.provider}," + + " catalog_comment = #{newCatalogMeta.catalogComment}," + + " properties = #{newCatalogMeta.properties}," + + " audit_info = #{newCatalogMeta.auditInfo}" + + " WHERE id = #{oldCatalogMeta.id}" + + " and catalog_name = #{oldCatalogMeta.catalogName}" + + " and metalake_id = #{oldCatalogMeta.metalakeId}" + + " and type = #{oldCatalogMeta.type}" + + " and provider = #{oldCatalogMeta.provider}" + + " and catalog_comment = #{oldCatalogMeta.catalogComment}" + + " and properties = #{oldCatalogMeta.properties}" + + " and audit_info = #{oldCatalogMeta.auditInfo}") + void updateCatalogMeta( + @Param("newCatalogMeta") CatalogPO newCatalogPO, + @Param("oldCatalogMeta") CatalogPO oldCatalogPO); + + @Delete("DELETE FROM " + TABLE_NAME + " WHERE id = #{id}") + Integer deleteCatalogMetasById(@Param("id") Long id); + + @Delete("DELETE FROM " + TABLE_NAME + " WHERE metalake_id = #{metalakeId}") + Integer deleteCatalogMetasByMetalakeId(@Param("metalakeId") Long metalakeId); +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/orm/SqlSessionFactoryHelper.java b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/orm/SqlSessionFactoryHelper.java index 67162dd761c..0ad804b5983 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/orm/SqlSessionFactoryHelper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/orm/SqlSessionFactoryHelper.java @@ -7,6 +7,7 @@ import com.datastrato.gravitino.Config; import com.datastrato.gravitino.Configs; +import com.datastrato.gravitino.storage.relation.mysql.mapper.CatalogMetaMapper; import com.datastrato.gravitino.storage.relation.mysql.mapper.MetalakeMetaMapper; import com.google.common.base.Preconditions; import java.time.Duration; @@ -68,6 +69,7 @@ public void init(Config config) { Environment environment = new Environment("development", transactionFactory, dataSource); Configuration configuration = new Configuration(environment); configuration.addMapper(MetalakeMetaMapper.class); + configuration.addMapper(CatalogMetaMapper.class); if (sqlSessionFactory == null) { synchronized (SqlSessionFactoryHelper.class) { if (sqlSessionFactory == null) { diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/po/CatalogPO.java b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/po/CatalogPO.java new file mode 100644 index 00000000000..8a64a8bc4e4 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/po/CatalogPO.java @@ -0,0 +1,168 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.relation.mysql.po; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.base.Objects; + +public class CatalogPO { + private Long id; + private String catalogName; + private Long metalakeId; + private String type; + private String provider; + private String catalogComment; + private String properties; + private String auditInfo; + + public Long getId() { + return id; + } + + public void setId(Long id) { + this.id = id; + } + + public String getCatalogName() { + return catalogName; + } + + public void setCatalogName(String catalogName) { + this.catalogName = catalogName; + } + + public Long getMetalakeId() { + return metalakeId; + } + + public void setMetalakeId(Long metalakeId) { + this.metalakeId = metalakeId; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getProvider() { + return provider; + } + + public void setProvider(String provider) { + this.provider = provider; + } + + public String getCatalogComment() { + return catalogComment; + } + + public void setCatalogComment(String catalogComment) { + this.catalogComment = catalogComment; + } + + public String getProperties() { + return properties; + } + + public void setProperties(String properties) { + this.properties = properties; + } + + public String getAuditInfo() { + return auditInfo; + } + + public void setAuditInfo(String auditInfo) { + this.auditInfo = auditInfo; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof CatalogPO)) { + return false; + } + CatalogPO catalogPO = (CatalogPO) o; + return Objects.equal(getId(), catalogPO.getId()) + && Objects.equal(getCatalogName(), catalogPO.getCatalogName()) + && Objects.equal(getMetalakeId(), catalogPO.getMetalakeId()) + && Objects.equal(getType(), catalogPO.getType()) + && Objects.equal(getProvider(), catalogPO.getProvider()) + && Objects.equal(getCatalogComment(), catalogPO.getCatalogComment()) + && Objects.equal(getProperties(), catalogPO.getProperties()) + && Objects.equal(getAuditInfo(), catalogPO.getAuditInfo()); + } + + @Override + public int hashCode() { + return Objects.hashCode( + getId(), + getCatalogName(), + getMetalakeId(), + getType(), + getProvider(), + getCatalogComment(), + getProperties(), + getAuditInfo()); + } + + public static class Builder { + private final CatalogPO metalakePO; + + public Builder() { + metalakePO = new CatalogPO(); + } + + public CatalogPO.Builder withId(Long id) { + metalakePO.id = id; + return this; + } + + public CatalogPO.Builder withCatalogName(String name) { + metalakePO.catalogName = name; + return this; + } + + public CatalogPO.Builder withMetalakeId(Long metalakeId) { + metalakePO.metalakeId = metalakeId; + return this; + } + + public CatalogPO.Builder withType(String type) { + metalakePO.type = type; + return this; + } + + public CatalogPO.Builder withProvider(String provider) { + metalakePO.provider = provider; + return this; + } + + public CatalogPO.Builder withCatalogComment(String comment) { + metalakePO.catalogComment = comment; + return this; + } + + public CatalogPO.Builder withProperties(String properties) throws JsonProcessingException { + metalakePO.properties = properties; + return this; + } + + public CatalogPO.Builder withAuditInfo(String auditInfo) throws JsonProcessingException { + metalakePO.auditInfo = auditInfo; + return this; + } + + public CatalogPO build() { + return metalakePO; + } + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/utils/POConverters.java b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/utils/POConverters.java index d15c3a1f918..8381ec68eae 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/utils/POConverters.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relation/mysql/utils/POConverters.java @@ -5,13 +5,19 @@ package com.datastrato.gravitino.storage.relation.mysql.utils; +import com.datastrato.gravitino.Catalog; +import com.datastrato.gravitino.Namespace; import com.datastrato.gravitino.json.JsonUtils; import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.meta.BaseMetalake; +import com.datastrato.gravitino.meta.CatalogEntity; import com.datastrato.gravitino.meta.SchemaVersion; +import com.datastrato.gravitino.storage.relation.mysql.po.CatalogPO; import com.datastrato.gravitino.storage.relation.mysql.po.MetalakePO; import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** POConverters is a utility class to convert PO to Base and vice versa. */ public class POConverters { @@ -22,7 +28,7 @@ private POConverters() {} * * @param baseMetalake BaseMetalake object * @return MetalakePO object from BaseMetalake object - * @throws JsonProcessingException + * @throws JsonProcessingException if there is an error in processing the JSON */ public static MetalakePO toMetalakePO(BaseMetalake baseMetalake) throws JsonProcessingException { return new MetalakePO.Builder() @@ -40,7 +46,7 @@ public static MetalakePO toMetalakePO(BaseMetalake baseMetalake) throws JsonProc * * @param metalakePO MetalakePO object * @return BaseMetalake object from MetalakePO object - * @throws JsonProcessingException + * @throws JsonProcessingException if there is an error in processing the JSON */ public static BaseMetalake fromMetalakePO(MetalakePO metalakePO) throws JsonProcessingException { return new BaseMetalake.Builder() @@ -54,4 +60,89 @@ public static BaseMetalake fromMetalakePO(MetalakePO metalakePO) throws JsonProc JsonUtils.objectMapper().readValue(metalakePO.getSchemaVersion(), SchemaVersion.class)) .build(); } + + /** + * Convert list of {@link MetalakePO} to list of {@link BaseMetalake} + * + * @param metalakePOS list of MetalakePO objects + * @return list of BaseMetalake objects from list of MetalakePO objects + */ + public static List fromMetalakePOs(List metalakePOS) { + return metalakePOS.stream() + .map( + metalakePO -> { + try { + return POConverters.fromMetalakePO(metalakePO); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + } + + /** + * Convert {@link CatalogEntity} to {@link CatalogPO} + * + * @param catalogEntity CatalogEntity object to be converted + * @param metalakeId Metalake id to be associated with the catalog + * @return CatalogPO object from CatalogEntity object + * @throws JsonProcessingException if there is an error in processing the JSON + */ + public static CatalogPO toCatalogPO(CatalogEntity catalogEntity, Long metalakeId) + throws JsonProcessingException { + return new CatalogPO.Builder() + .withId(catalogEntity.id()) + .withCatalogName(catalogEntity.name()) + .withMetalakeId(metalakeId) + .withType(catalogEntity.type().name()) + .withProvider(catalogEntity.getProvider()) + .withCatalogComment(catalogEntity.getComment()) + .withProperties(JsonUtils.objectMapper().writeValueAsString(catalogEntity.getProperties())) + .withAuditInfo(JsonUtils.objectMapper().writeValueAsString(catalogEntity.auditInfo())) + .build(); + } + + /** + * Convert {@link CatalogPO} to {@link CatalogEntity} + * + * @param catalogPO CatalogPO object to be converted + * @param namespace Namespace object to be associated with the catalog + * @return CatalogEntity object from CatalogPO object + * @throws JsonProcessingException if there is an error in processing the JSON + */ + public static CatalogEntity fromCatalogPO(CatalogPO catalogPO, Namespace namespace) + throws JsonProcessingException { + return new CatalogEntity.Builder() + .withId(catalogPO.getId()) + .withName(catalogPO.getCatalogName()) + .withNamespace(namespace) + .withType(Catalog.Type.valueOf(catalogPO.getType())) + .withProvider(catalogPO.getProvider()) + .withComment(catalogPO.getCatalogComment()) + .withProperties(JsonUtils.objectMapper().readValue(catalogPO.getProperties(), Map.class)) + .withAuditInfo( + JsonUtils.objectMapper().readValue(catalogPO.getAuditInfo(), AuditInfo.class)) + .build(); + } + + /** + * Convert list of {@link MetalakePO} to list of {@link BaseMetalake} + * + * @param catalogPOS list of MetalakePO objects + * @param namespace Namespace object to be associated with the metalake + * @return list of BaseMetalake objects from list of MetalakePO objects + */ + public static List fromCatalogPOs( + List catalogPOS, Namespace namespace) { + return catalogPOS.stream() + .map( + catalogPO -> { + try { + return POConverters.fromCatalogPO(catalogPO, namespace); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }) + .collect(Collectors.toList()); + } } diff --git a/core/src/main/resources/mysql/mysql_init.sql b/core/src/main/resources/mysql/mysql_init.sql index 54b28a6d7c8..ebcd2d3da1f 100644 --- a/core/src/main/resources/mysql/mysql_init.sql +++ b/core/src/main/resources/mysql/mysql_init.sql @@ -8,4 +8,18 @@ CREATE TABLE IF NOT EXISTS `metalake_meta` `schema_version` TEXT NOT NULL COMMENT 'metalake schema version info', PRIMARY KEY (`id`), UNIQUE KEY `uk_mn` (`metalake_name`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'metalake metadata'; \ No newline at end of file +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'metalake metadata'; + +CREATE TABLE IF NOT EXISTS `catalog_meta` +( + `id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'catalog id', + `catalog_name` VARCHAR(128) NOT NULL COMMENT 'catalog name', + `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id', + `type` VARCHAR(64) NOT NULL COMMENT 'catalog type', + `provider` VARCHAR(64) NOT NULL COMMENT 'catalog provider', + `catalog_comment` VARCHAR(256) DEFAULT '' COMMENT 'catalog comment', + `properties` MEDIUMTEXT DEFAULT NULL COMMENT 'catalog properties', + `audit_info` MEDIUMTEXT NOT NULL COMMENT 'catalog audit info', + PRIMARY KEY (`id`), + UNIQUE KEY `uk_cn_mid` (`catalog_name`, `metalake_id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'catalog metadata'; \ No newline at end of file diff --git a/core/src/test/resources/h2/h2-init.sql b/core/src/test/resources/h2/h2-init.sql index 7f5590ac8e6..77f8e11a93f 100644 --- a/core/src/test/resources/h2/h2-init.sql +++ b/core/src/test/resources/h2/h2-init.sql @@ -6,5 +6,18 @@ CREATE TABLE IF NOT EXISTS `metalake_meta` ( `audit_info` MEDIUMTEXT NOT NULL COMMENT 'metalake audit info', `schema_version` TEXT NOT NULL COMMENT 'metalake schema version info', PRIMARY KEY (id), - CONSTRAINT uk_mn UNIQUE (metalake_name) -) ENGINE = InnoDB; \ No newline at end of file + CONSTRAINT uk_mn UNIQUE (metalake_name) +) ENGINE = InnoDB; + +CREATE TABLE IF NOT EXISTS `catalog_meta` ( + `id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'catalog id', + `catalog_name` VARCHAR(128) NOT NULL COMMENT 'catalog name', + `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id', + `type` VARCHAR(64) NOT NULL COMMENT 'catalog type', + `provider` VARCHAR(64) NOT NULL COMMENT 'catalog provider', + `catalog_comment` VARCHAR(256) DEFAULT '' COMMENT 'catalog comment', + `properties` MEDIUMTEXT DEFAULT NULL COMMENT 'catalog properties', + `audit_info` MEDIUMTEXT NOT NULL COMMENT 'catalog audit info', + PRIMARY KEY (id), + CONSTRAINT uk_cn_mid UNIQUE (catalog_name, metalake_id) +) ENGINE = InnoDB;