From 49bacdf2a7f3d5c417412866063234773918d2ff Mon Sep 17 00:00:00 2001 From: xloya <982052490@qq.com> Date: Tue, 5 Mar 2024 15:21:42 +0800 Subject: [PATCH] [#2083] feat(core): Add JDBC backend operations for table (#2384) ### What changes were proposed in this pull request? The purpose of this PR is to implement JDBC backend operation for Table metadata. Depend on #2377 . Metadata operations of Fileset will be supported in the remaining PRs. ### Why are the changes needed? Fix: #2083 ### How was this patch tested? Add unit tests to test the table metadata ops. --------- Co-authored-by: xiaojiebao --- .../storage/relational/JDBCBackend.java | 12 + .../relational/mapper/TableMetaMapper.java | 154 +++++++++ .../storage/relational/po/TablePO.java | 160 +++++++++ .../service/CatalogMetaService.java | 5 + .../service/MetalakeMetaService.java | 5 + .../relational/service/SchemaMetaService.java | 20 +- .../relational/service/TableMetaService.java | 190 ++++++++++ .../session/SqlSessionFactoryHelper.java | 2 + .../relational/utils/POConverters.java | 112 +++++- core/src/main/resources/mysql/mysql_init.sql | 18 +- .../relational/TestRelationalEntityStore.java | 325 +++++++++++++++++- .../relational/utils/TestPOConverters.java | 109 ++++++ core/src/test/resources/h2/h2-init.sql | 20 +- 13 files changed, 1099 insertions(+), 33 deletions(-) create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relational/po/TablePO.java create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relational/service/TableMetaService.java diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java index a02d6499e58..14f932fadd9 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java @@ -18,9 +18,11 @@ import com.datastrato.gravitino.meta.BaseMetalake; import com.datastrato.gravitino.meta.CatalogEntity; import com.datastrato.gravitino.meta.SchemaEntity; +import com.datastrato.gravitino.meta.TableEntity; import com.datastrato.gravitino.storage.relational.service.CatalogMetaService; import com.datastrato.gravitino.storage.relational.service.MetalakeMetaService; import com.datastrato.gravitino.storage.relational.service.SchemaMetaService; +import com.datastrato.gravitino.storage.relational.service.TableMetaService; import com.datastrato.gravitino.storage.relational.session.SqlSessionFactoryHelper; import java.io.IOException; import java.util.List; @@ -50,6 +52,8 @@ public List list( return (List) CatalogMetaService.getInstance().listCatalogsByNamespace(namespace); case SCHEMA: return (List) SchemaMetaService.getInstance().listSchemasByNamespace(namespace); + case TABLE: + return (List) TableMetaService.getInstance().listTablesByNamespace(namespace); default: throw new UnsupportedEntityTypeException( "Unsupported entity type: %s for list operation", entityType); @@ -75,6 +79,8 @@ public void insert(E e, boolean overwritten) CatalogMetaService.getInstance().insertCatalog((CatalogEntity) e, overwritten); } else if (e instanceof SchemaEntity) { SchemaMetaService.getInstance().insertSchema((SchemaEntity) e, overwritten); + } else if (e instanceof TableEntity) { + TableMetaService.getInstance().insertTable((TableEntity) e, overwritten); } else { throw new UnsupportedEntityTypeException( "Unsupported entity type: %s for insert operation", e.getClass()); @@ -92,6 +98,8 @@ public E update( return (E) CatalogMetaService.getInstance().updateCatalog(ident, updater); case SCHEMA: return (E) SchemaMetaService.getInstance().updateSchema(ident, updater); + case TABLE: + return (E) TableMetaService.getInstance().updateTable(ident, updater); default: throw new UnsupportedEntityTypeException( "Unsupported entity type: %s for update operation", entityType); @@ -108,6 +116,8 @@ public E get( return (E) CatalogMetaService.getInstance().getCatalogByIdentifier(ident); case SCHEMA: return (E) SchemaMetaService.getInstance().getSchemaByIdentifier(ident); + case TABLE: + return (E) TableMetaService.getInstance().getTableByIdentifier(ident); default: throw new UnsupportedEntityTypeException( "Unsupported entity type: %s for get operation", entityType); @@ -123,6 +133,8 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea return CatalogMetaService.getInstance().deleteCatalog(ident, cascade); case SCHEMA: return SchemaMetaService.getInstance().deleteSchema(ident, cascade); + case TABLE: + return TableMetaService.getInstance().deleteTable(ident); default: throw new UnsupportedEntityTypeException( "Unsupported entity type: %s for delete operation", entityType); diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java new file mode 100644 index 00000000000..f1de8a51bd5 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java @@ -0,0 +1,154 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.relational.mapper; + +import com.datastrato.gravitino.storage.relational.po.TablePO; +import java.util.List; +import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; +import org.apache.ibatis.annotations.Update; + +/** + * A MyBatis Mapper for table meta operation SQLs. + * + *

This interface class is a specification defined by MyBatis. It requires this interface class + * to identify the corresponding SQLs for execution. We can write SQLs in an additional XML file, or + * write SQLs with annotations in this interface Mapper. See: + */ +public interface TableMetaMapper { + String TABLE_NAME = "table_meta"; + + @Select( + "SELECT table_id as tableId, table_name as tableName," + + " metalake_id as metalakeId, catalog_id as catalogId," + + " schema_id as schemaId, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE schema_id = #{schemaId} AND deleted_at = 0") + List listTablePOsBySchemaId(@Param("schemaId") Long schemaId); + + @Select( + "SELECT table_id as tableId FROM " + + TABLE_NAME + + " WHERE schema_id = #{schemaId} AND table_name = #{tableName}" + + " AND deleted_at = 0") + Long selectTableIdBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("tableName") String name); + + @Select( + "SELECT table_id as tableId, table_name as tableName," + + " metalake_id as metalakeId, catalog_id as catalogId," + + " schema_id as schemaId, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE schema_id = #{schemaId} AND table_name = #{tableName} AND deleted_at = 0") + TablePO selectTableMetaBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("tableName") String name); + + @Insert( + "INSERT INTO " + + TABLE_NAME + + "(table_id, table_name, metalake_id," + + " catalog_id, schema_id, audit_info," + + " current_version, last_version, deleted_at)" + + " VALUES(" + + " #{tableMeta.tableId}," + + " #{tableMeta.tableName}," + + " #{tableMeta.metalakeId}," + + " #{tableMeta.catalogId}," + + " #{tableMeta.schemaId}," + + " #{tableMeta.auditInfo}," + + " #{tableMeta.currentVersion}," + + " #{tableMeta.lastVersion}," + + " #{tableMeta.deletedAt}" + + " )") + void insertTableMeta(@Param("tableMeta") TablePO tablePO); + + @Insert( + "INSERT INTO " + + TABLE_NAME + + "(table_id, table_name, metalake_id," + + " catalog_id, schema_id, audit_info," + + " current_version, last_version, deleted_at)" + + " VALUES(" + + " #{tableMeta.tableId}," + + " #{tableMeta.tableName}," + + " #{tableMeta.metalakeId}," + + " #{tableMeta.catalogId}," + + " #{tableMeta.schemaId}," + + " #{tableMeta.auditInfo}," + + " #{tableMeta.currentVersion}," + + " #{tableMeta.lastVersion}," + + " #{tableMeta.deletedAt}" + + " )" + + " ON DUPLICATE KEY UPDATE" + + " table_name = #{tableMeta.tableName}," + + " metalake_id = #{tableMeta.metalakeId}," + + " catalog_id = #{tableMeta.catalogId}," + + " schema_id = #{tableMeta.schemaId}," + + " audit_info = #{tableMeta.auditInfo}," + + " current_version = #{tableMeta.currentVersion}," + + " last_version = #{tableMeta.lastVersion}," + + " deleted_at = #{tableMeta.deletedAt}") + void insertTableMetaOnDuplicateKeyUpdate(@Param("tableMeta") TablePO tablePO); + + @Update( + "UPDATE " + + TABLE_NAME + + " SET table_name = #{newTableMeta.tableName}," + + " metalake_id = #{newTableMeta.metalakeId}," + + " catalog_id = #{newTableMeta.catalogId}," + + " schema_id = #{newTableMeta.schemaId}," + + " audit_info = #{newTableMeta.auditInfo}," + + " current_version = #{newTableMeta.currentVersion}," + + " last_version = #{newTableMeta.lastVersion}," + + " deleted_at = #{newTableMeta.deletedAt}" + + " WHERE table_id = #{oldTableMeta.tableId}" + + " AND table_name = #{oldTableMeta.tableName}" + + " AND metalake_id = #{oldTableMeta.metalakeId}" + + " AND catalog_id = #{oldTableMeta.catalogId}" + + " AND schema_id = #{oldTableMeta.schemaId}" + + " AND audit_info = #{oldTableMeta.auditInfo}" + + " AND current_version = #{oldTableMeta.currentVersion}" + + " AND last_version = #{oldTableMeta.lastVersion}" + + " AND deleted_at = 0") + Integer updateTableMeta( + @Param("newTableMeta") TablePO newTablePO, @Param("oldTableMeta") TablePO oldTablePO); + + @Update( + "UPDATE " + + TABLE_NAME + + " SET deleted_at = UNIX_TIMESTAMP()" + + " WHERE table_id = #{tableId} AND deleted_at = 0") + Integer softDeleteTableMetasByTableId(@Param("tableId") Long tableId); + + @Update( + "UPDATE " + + TABLE_NAME + + " SET deleted_at = UNIX_TIMESTAMP()" + + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") + Integer softDeleteTableMetasByMetalakeId(@Param("metalakeId") Long metalakeId); + + @Update( + "UPDATE " + + TABLE_NAME + + " SET deleted_at = UNIX_TIMESTAMP()" + + " WHERE catalog_id = #{catalogId} AND deleted_at = 0") + Integer softDeleteTableMetasByCatalogId(@Param("catalogId") Long catalogId); + + @Update( + "UPDATE " + + TABLE_NAME + + " SET deleted_at = UNIX_TIMESTAMP()" + + " WHERE schema_id = #{schemaId} AND deleted_at = 0") + Integer softDeleteTableMetasBySchemaId(@Param("schemaId") Long schemaId); +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/po/TablePO.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/po/TablePO.java new file mode 100644 index 00000000000..a35cfffdf0b --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/po/TablePO.java @@ -0,0 +1,160 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.storage.relational.po; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; + +public class TablePO { + private Long tableId; + private String tableName; + private Long metalakeId; + private Long catalogId; + private Long schemaId; + private String auditInfo; + private Long currentVersion; + private Long lastVersion; + private Long deletedAt; + + public Long getTableId() { + return tableId; + } + + public String getTableName() { + return tableName; + } + + public Long getMetalakeId() { + return metalakeId; + } + + public Long getCatalogId() { + return catalogId; + } + + public Long getSchemaId() { + return schemaId; + } + + public String getAuditInfo() { + return auditInfo; + } + + public Long getCurrentVersion() { + return currentVersion; + } + + public Long getLastVersion() { + return lastVersion; + } + + public Long getDeletedAt() { + return deletedAt; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TablePO)) { + return false; + } + TablePO tablePO = (TablePO) o; + return Objects.equal(getTableId(), tablePO.getTableId()) + && Objects.equal(getTableName(), tablePO.getTableName()) + && Objects.equal(getMetalakeId(), tablePO.getMetalakeId()) + && Objects.equal(getCatalogId(), tablePO.getCatalogId()) + && Objects.equal(getSchemaId(), tablePO.getSchemaId()) + && Objects.equal(getAuditInfo(), tablePO.getAuditInfo()) + && Objects.equal(getCurrentVersion(), tablePO.getCurrentVersion()) + && Objects.equal(getLastVersion(), tablePO.getLastVersion()) + && Objects.equal(getDeletedAt(), tablePO.getDeletedAt()); + } + + @Override + public int hashCode() { + return Objects.hashCode( + getTableId(), + getTableName(), + getMetalakeId(), + getCatalogId(), + getSchemaId(), + getAuditInfo(), + getCurrentVersion(), + getLastVersion(), + getDeletedAt()); + } + + public static class Builder { + private final TablePO tablePO; + + public Builder() { + tablePO = new TablePO(); + } + + public Builder withTableId(Long tableId) { + tablePO.tableId = tableId; + return this; + } + + public Builder withTableName(String tableName) { + tablePO.tableName = tableName; + return this; + } + + public Builder withMetalakeId(Long metalakeId) { + tablePO.metalakeId = metalakeId; + return this; + } + + public Builder withCatalogId(Long catalogId) { + tablePO.catalogId = catalogId; + return this; + } + + public Builder withSchemaId(Long schemaId) { + tablePO.schemaId = schemaId; + return this; + } + + public Builder withAuditInfo(String auditInfo) { + tablePO.auditInfo = auditInfo; + return this; + } + + public Builder withCurrentVersion(Long currentVersion) { + tablePO.currentVersion = currentVersion; + return this; + } + + public Builder withLastVersion(Long lastVersion) { + tablePO.lastVersion = lastVersion; + return this; + } + + public Builder withDeletedAt(Long deletedAt) { + tablePO.deletedAt = deletedAt; + return this; + } + + private void validate() { + Preconditions.checkArgument(tablePO.tableId != null, "Table id is required"); + Preconditions.checkArgument(tablePO.tableName != null, "Table name is required"); + Preconditions.checkArgument(tablePO.metalakeId != null, "Metalake id is required"); + Preconditions.checkArgument(tablePO.catalogId != null, "Catalog id is required"); + Preconditions.checkArgument(tablePO.schemaId != null, "Schema id is required"); + Preconditions.checkArgument(tablePO.auditInfo != null, "Audit info is required"); + Preconditions.checkArgument(tablePO.currentVersion != null, "Current version is required"); + Preconditions.checkArgument(tablePO.lastVersion != null, "Last version is required"); + Preconditions.checkArgument(tablePO.deletedAt != null, "Deleted at is required"); + } + + public TablePO build() { + validate(); + return tablePO; + } + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java index 4590545319d..a6256e9d16e 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java @@ -14,6 +14,7 @@ import com.datastrato.gravitino.meta.SchemaEntity; import com.datastrato.gravitino.storage.relational.mapper.CatalogMetaMapper; import com.datastrato.gravitino.storage.relational.mapper.SchemaMetaMapper; +import com.datastrato.gravitino.storage.relational.mapper.TableMetaMapper; import com.datastrato.gravitino.storage.relational.po.CatalogPO; import com.datastrato.gravitino.storage.relational.utils.ExceptionUtils; import com.datastrato.gravitino.storage.relational.utils.POConverters; @@ -174,6 +175,10 @@ public boolean deleteCatalog(NameIdentifier identifier, boolean cascade) { SessionUtils.doWithoutCommit( SchemaMetaMapper.class, mapper -> mapper.softDeleteSchemaMetasByCatalogId(catalogId)), + () -> + SessionUtils.doWithoutCommit( + TableMetaMapper.class, + mapper -> mapper.softDeleteTableMetasByCatalogId(catalogId)), () -> { // TODO We will cascade delete the metadata of sub-resources under the catalog }); diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java index d4ae1817368..120c364bae7 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java @@ -16,6 +16,7 @@ import com.datastrato.gravitino.storage.relational.mapper.CatalogMetaMapper; import com.datastrato.gravitino.storage.relational.mapper.MetalakeMetaMapper; import com.datastrato.gravitino.storage.relational.mapper.SchemaMetaMapper; +import com.datastrato.gravitino.storage.relational.mapper.TableMetaMapper; import com.datastrato.gravitino.storage.relational.po.MetalakePO; import com.datastrato.gravitino.storage.relational.utils.ExceptionUtils; import com.datastrato.gravitino.storage.relational.utils.POConverters; @@ -151,6 +152,10 @@ public boolean deleteMetalake(NameIdentifier ident, boolean cascade) { SessionUtils.doWithoutCommit( SchemaMetaMapper.class, mapper -> mapper.softDeleteSchemaMetasByMetalakeId(metalakeId)), + () -> + SessionUtils.doWithoutCommit( + TableMetaMapper.class, + mapper -> mapper.softDeleteTableMetasByMetalakeId(metalakeId)), () -> { // TODO We will cascade delete the metadata of sub-resources under the metalake }); diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java index 2d71b405aca..374106a4233 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java @@ -9,8 +9,11 @@ import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.Namespace; import com.datastrato.gravitino.exceptions.NoSuchEntityException; +import com.datastrato.gravitino.exceptions.NonEmptyEntityException; import com.datastrato.gravitino.meta.SchemaEntity; +import com.datastrato.gravitino.meta.TableEntity; import com.datastrato.gravitino.storage.relational.mapper.SchemaMetaMapper; +import com.datastrato.gravitino.storage.relational.mapper.TableMetaMapper; import com.datastrato.gravitino.storage.relational.po.SchemaPO; import com.datastrato.gravitino.storage.relational.utils.ExceptionUtils; import com.datastrato.gravitino.storage.relational.utils.POConverters; @@ -162,12 +165,25 @@ public boolean deleteSchema(NameIdentifier identifier, boolean cascade) { SessionUtils.doWithoutCommit( SchemaMetaMapper.class, mapper -> mapper.softDeleteSchemaMetasBySchemaId(schemaId)), + () -> + SessionUtils.doWithoutCommit( + TableMetaMapper.class, + mapper -> mapper.softDeleteTableMetasBySchemaId(schemaId)), () -> { // TODO We will cascade delete the metadata of sub-resources under the schema }); } else { - // TODO Check whether the sub-resources are empty. If the sub-resources are not empty, - // deletion is not allowed. + List tableEntities = + TableMetaService.getInstance() + .listTablesByNamespace( + Namespace.ofTable( + identifier.namespace().level(0), + identifier.namespace().level(1), + schemaName)); + if (!tableEntities.isEmpty()) { + throw new NonEmptyEntityException( + "Entity %s has sub-entities, you should remove sub-entities first", identifier); + } SessionUtils.doWithCommit( SchemaMetaMapper.class, mapper -> mapper.softDeleteSchemaMetasBySchemaId(schemaId)); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TableMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TableMetaService.java new file mode 100644 index 00000000000..c781c11d3b3 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TableMetaService.java @@ -0,0 +1,190 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.storage.relational.service; + +import com.datastrato.gravitino.Entity; +import com.datastrato.gravitino.HasIdentifier; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.exceptions.NoSuchEntityException; +import com.datastrato.gravitino.meta.TableEntity; +import com.datastrato.gravitino.storage.relational.mapper.TableMetaMapper; +import com.datastrato.gravitino.storage.relational.po.TablePO; +import com.datastrato.gravitino.storage.relational.utils.ExceptionUtils; +import com.datastrato.gravitino.storage.relational.utils.POConverters; +import com.datastrato.gravitino.storage.relational.utils.SessionUtils; +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.util.List; +import java.util.Objects; +import java.util.function.Function; + +/** The service class for table metadata. It provides the basic database operations for table. */ +public class TableMetaService { + private static final TableMetaService INSTANCE = new TableMetaService(); + + public static TableMetaService getInstance() { + return INSTANCE; + } + + private TableMetaService() {} + + public TablePO getTablePOBySchemaIdAndName(Long schemaId, String tableName) { + TablePO tablePO = + SessionUtils.getWithoutCommit( + TableMetaMapper.class, + mapper -> mapper.selectTableMetaBySchemaIdAndName(schemaId, tableName)); + + if (tablePO == null) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, + Entity.EntityType.TABLE.name().toLowerCase(), + tableName); + } + return tablePO; + } + + public Long getTableIdBySchemaIdAndName(Long schemaId, String tableName) { + Long tableId = + SessionUtils.getWithoutCommit( + TableMetaMapper.class, + mapper -> mapper.selectTableIdBySchemaIdAndName(schemaId, tableName)); + + if (tableId == null) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, + Entity.EntityType.TABLE.name().toLowerCase(), + tableName); + } + return tableId; + } + + public TableEntity getTableByIdentifier(NameIdentifier identifier) { + NameIdentifier.checkTable(identifier); + + Long schemaId = + CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace()); + + TablePO tablePO = getTablePOBySchemaIdAndName(schemaId, identifier.name()); + + return POConverters.fromTablePO(tablePO, identifier.namespace()); + } + + public List listTablesByNamespace(Namespace namespace) { + Namespace.checkTable(namespace); + + Long schemaId = CommonMetaService.getInstance().getParentEntityIdByNamespace(namespace); + + List tablePOs = + SessionUtils.getWithoutCommit( + TableMetaMapper.class, mapper -> mapper.listTablePOsBySchemaId(schemaId)); + + return POConverters.fromTablePOs(tablePOs, namespace); + } + + public void insertTable(TableEntity tableEntity, boolean overwrite) { + try { + NameIdentifier.checkTable(tableEntity.nameIdentifier()); + + TablePO.Builder builder = new TablePO.Builder(); + fillTablePOBuilderParentEntityId(builder, tableEntity.namespace()); + + SessionUtils.doWithCommit( + TableMetaMapper.class, + mapper -> { + TablePO po = POConverters.initializeTablePOWithVersion(tableEntity, builder); + if (overwrite) { + mapper.insertTableMetaOnDuplicateKeyUpdate(po); + } else { + mapper.insertTableMeta(po); + } + }); + } catch (RuntimeException re) { + ExceptionUtils.checkSQLConstraintException( + re, Entity.EntityType.TABLE, tableEntity.nameIdentifier().toString()); + throw re; + } + } + + public TableEntity updateTable( + NameIdentifier identifier, Function updater) throws IOException { + NameIdentifier.checkTable(identifier); + + String tableName = identifier.name(); + + Long schemaId = + CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace()); + + TablePO oldTablePO = getTablePOBySchemaIdAndName(schemaId, tableName); + TableEntity oldTableEntity = POConverters.fromTablePO(oldTablePO, identifier.namespace()); + TableEntity newEntity = (TableEntity) updater.apply((E) oldTableEntity); + Preconditions.checkArgument( + Objects.equals(oldTableEntity.id(), newEntity.id()), + "The updated table entity id: %s should be same with the table entity id before: %s", + newEntity.id(), + oldTableEntity.id()); + + Integer updateResult; + try { + updateResult = + SessionUtils.doWithCommitAndFetchResult( + TableMetaMapper.class, + mapper -> + mapper.updateTableMeta( + POConverters.updateTablePOWithVersion(oldTablePO, newEntity), oldTablePO)); + } catch (RuntimeException re) { + ExceptionUtils.checkSQLConstraintException( + re, Entity.EntityType.TABLE, newEntity.nameIdentifier().toString()); + throw re; + } + + if (updateResult > 0) { + return newEntity; + } else { + throw new IOException("Failed to update the entity: " + identifier); + } + } + + public boolean deleteTable(NameIdentifier identifier) { + NameIdentifier.checkTable(identifier); + + String tableName = identifier.name(); + + Long schemaId = + CommonMetaService.getInstance().getParentEntityIdByNamespace(identifier.namespace()); + + Long tableId = getTableIdBySchemaIdAndName(schemaId, tableName); + + SessionUtils.doWithCommit( + TableMetaMapper.class, mapper -> mapper.softDeleteTableMetasByTableId(tableId)); + + return true; + } + + private void fillTablePOBuilderParentEntityId(TablePO.Builder builder, Namespace namespace) { + Namespace.checkTable(namespace); + Long parentEntityId = null; + for (int level = 0; level < namespace.levels().length; level++) { + String name = namespace.level(level); + switch (level) { + case 0: + parentEntityId = MetalakeMetaService.getInstance().getMetalakeIdByName(name); + builder.withMetalakeId(parentEntityId); + continue; + case 1: + parentEntityId = + CatalogMetaService.getInstance() + .getCatalogIdByMetalakeIdAndName(parentEntityId, name); + builder.withCatalogId(parentEntityId); + continue; + case 2: + parentEntityId = + SchemaMetaService.getInstance().getSchemaIdByCatalogIdAndName(parentEntityId, name); + builder.withSchemaId(parentEntityId); + break; + } + } + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessionFactoryHelper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessionFactoryHelper.java index 618e1a65860..a73daf42e31 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessionFactoryHelper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessionFactoryHelper.java @@ -10,6 +10,7 @@ import com.datastrato.gravitino.storage.relational.mapper.CatalogMetaMapper; import com.datastrato.gravitino.storage.relational.mapper.MetalakeMetaMapper; import com.datastrato.gravitino.storage.relational.mapper.SchemaMetaMapper; +import com.datastrato.gravitino.storage.relational.mapper.TableMetaMapper; import com.google.common.base.Preconditions; import java.sql.SQLException; import java.time.Duration; @@ -78,6 +79,7 @@ public void init(Config config) { configuration.addMapper(MetalakeMetaMapper.class); configuration.addMapper(CatalogMetaMapper.class); configuration.addMapper(SchemaMetaMapper.class); + configuration.addMapper(TableMetaMapper.class); // Create the SqlSessionFactory object, it is a singleton object if (sqlSessionFactory == null) { diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java index 8f9ed18dfd0..baf489523ae 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java @@ -13,9 +13,11 @@ import com.datastrato.gravitino.meta.CatalogEntity; import com.datastrato.gravitino.meta.SchemaEntity; import com.datastrato.gravitino.meta.SchemaVersion; +import com.datastrato.gravitino.meta.TableEntity; import com.datastrato.gravitino.storage.relational.po.CatalogPO; import com.datastrato.gravitino.storage.relational.po.MetalakePO; import com.datastrato.gravitino.storage.relational.po.SchemaPO; +import com.datastrato.gravitino.storage.relational.po.TablePO; import com.fasterxml.jackson.core.JsonProcessingException; import java.util.List; import java.util.Map; @@ -23,6 +25,8 @@ /** POConverters is a utility class to convert PO to Base and vice versa. */ public class POConverters { + private static final long INIT_VERSION = 1L; + private static final long DEFAULT_DELETED_AT = 0L; private POConverters() {} @@ -42,9 +46,9 @@ public static MetalakePO initializeMetalakePOWithVersion(BaseMetalake baseMetala .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(baseMetalake.auditInfo())) .withSchemaVersion( JsonUtils.anyFieldMapper().writeValueAsString(baseMetalake.getVersion())) - .withCurrentVersion(1L) - .withLastVersion(1L) - .withDeletedAt(0L) + .withCurrentVersion(INIT_VERSION) + .withLastVersion(INIT_VERSION) + .withDeletedAt(DEFAULT_DELETED_AT) .build(); } catch (JsonProcessingException e) { throw new RuntimeException("Failed to serialize json object:", e); @@ -74,7 +78,7 @@ public static MetalakePO updateMetalakePOWithVersion( JsonUtils.anyFieldMapper().writeValueAsString(newMetalake.getVersion())) .withCurrentVersion(nextVersion) .withLastVersion(nextVersion) - .withDeletedAt(0L) + .withDeletedAt(DEFAULT_DELETED_AT) .build(); } catch (JsonProcessingException e) { throw new RuntimeException("Failed to serialize json object:", e); @@ -135,9 +139,9 @@ public static CatalogPO initializeCatalogPOWithVersion( .withProperties( JsonUtils.anyFieldMapper().writeValueAsString(catalogEntity.getProperties())) .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(catalogEntity.auditInfo())) - .withCurrentVersion(1L) - .withLastVersion(1L) - .withDeletedAt(0L) + .withCurrentVersion(INIT_VERSION) + .withLastVersion(INIT_VERSION) + .withDeletedAt(DEFAULT_DELETED_AT) .build(); } catch (JsonProcessingException e) { throw new RuntimeException("Failed to serialize json object:", e); @@ -168,7 +172,7 @@ public static CatalogPO updateCatalogPOWithVersion( .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(newCatalog.auditInfo())) .withCurrentVersion(nextVersion) .withLastVersion(nextVersion) - .withDeletedAt(0L) + .withDeletedAt(DEFAULT_DELETED_AT) .build(); } catch (JsonProcessingException e) { throw new RuntimeException("Failed to serialize json object:", e); @@ -230,9 +234,9 @@ public static SchemaPO initializeSchemaPOWithVersion( .withSchemaComment(schemaEntity.comment()) .withProperties(JsonUtils.anyFieldMapper().writeValueAsString(schemaEntity.properties())) .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(schemaEntity.auditInfo())) - .withCurrentVersion(1L) - .withLastVersion(1L) - .withDeletedAt(0L) + .withCurrentVersion(INIT_VERSION) + .withLastVersion(INIT_VERSION) + .withDeletedAt(DEFAULT_DELETED_AT) .build(); } catch (JsonProcessingException e) { throw new RuntimeException("Failed to serialize json object:", e); @@ -261,7 +265,7 @@ public static SchemaPO updateSchemaPOWithVersion(SchemaPO oldSchemaPO, SchemaEnt .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(newSchema.auditInfo())) .withCurrentVersion(nextVersion) .withLastVersion(nextVersion) - .withDeletedAt(0L) + .withDeletedAt(DEFAULT_DELETED_AT) .build(); } catch (JsonProcessingException e) { throw new RuntimeException("Failed to serialize json object:", e); @@ -303,4 +307,88 @@ public static List fromSchemaPOs(List schemaPOs, Namespa .map(schemaPO -> POConverters.fromSchemaPO(schemaPO, namespace)) .collect(Collectors.toList()); } + + /** + * Initialize TablePO + * + * @param tableEntity TableEntity object + * @return TablePO object with version initialized + */ + public static TablePO initializeTablePOWithVersion( + TableEntity tableEntity, TablePO.Builder builder) { + try { + return builder + .withTableId(tableEntity.id()) + .withTableName(tableEntity.name()) + .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(tableEntity.auditInfo())) + .withCurrentVersion(INIT_VERSION) + .withLastVersion(INIT_VERSION) + .withDeletedAt(DEFAULT_DELETED_AT) + .build(); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize json object:", e); + } + } + + /** + * Update TablePO version + * + * @param oldTablePO the old TablePO object + * @param newTable the new TableEntity object + * @return TablePO object with updated version + */ + public static TablePO updateTablePOWithVersion(TablePO oldTablePO, TableEntity newTable) { + Long lastVersion = oldTablePO.getLastVersion(); + // Will set the version to the last version + 1 when having some fields need be multiple version + Long nextVersion = lastVersion; + try { + return new TablePO.Builder() + .withTableId(oldTablePO.getTableId()) + .withTableName(newTable.name()) + .withMetalakeId(oldTablePO.getMetalakeId()) + .withCatalogId(oldTablePO.getCatalogId()) + .withSchemaId(oldTablePO.getSchemaId()) + .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(newTable.auditInfo())) + .withCurrentVersion(nextVersion) + .withLastVersion(nextVersion) + .withDeletedAt(DEFAULT_DELETED_AT) + .build(); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize json object:", e); + } + } + + /** + * Convert {@link TablePO} to {@link TableEntity} + * + * @param tablePO TablePO object to be converted + * @param namespace Namespace object to be associated with the table + * @return TableEntity object from SchemaPO object + */ + public static TableEntity fromTablePO(TablePO tablePO, Namespace namespace) { + try { + return new TableEntity.Builder() + .withId(tablePO.getTableId()) + .withName(tablePO.getTableName()) + .withNamespace(namespace) + .withAuditInfo( + JsonUtils.anyFieldMapper().readValue(tablePO.getAuditInfo(), AuditInfo.class)) + .build(); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to deserialize json object:", e); + } + } + + /** + * Convert list of {@link TablePO} to list of {@link TableEntity} + * + * @param tablePOs list of TablePO objects + * @param namespace Namespace object to be associated with the table + * @return list of TableEntity objects from list of SchemaPO objects + */ + public static List fromTablePOs(List tablePOs, Namespace namespace) { + return tablePOs.stream() + .map(tablePO -> POConverters.fromTablePO(tablePO, namespace)) + .collect(Collectors.toList()); + } } diff --git a/core/src/main/resources/mysql/mysql_init.sql b/core/src/main/resources/mysql/mysql_init.sql index 78811a4cb71..0505a93a92f 100644 --- a/core/src/main/resources/mysql/mysql_init.sql +++ b/core/src/main/resources/mysql/mysql_init.sql @@ -47,4 +47,20 @@ CREATE TABLE IF NOT EXISTS `schema_meta` ( PRIMARY KEY (`schema_id`), UNIQUE KEY `uk_cid_sn_del` (`catalog_id`, `schema_name`, `deleted_at`), KEY `idx_mid` (`metalake_id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'schema metadata'; \ No newline at end of file +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'schema metadata'; + +CREATE TABLE IF NOT EXISTS `table_meta` ( + `table_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'table id', + `table_name` VARCHAR(128) NOT NULL COMMENT 'table name', + `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id', + `catalog_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'catalog id', + `schema_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'schema id', + `audit_info` MEDIUMTEXT NOT NULL COMMENT 'table audit info', + `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'table current version', + `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'table last version', + `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'table deleted at', + PRIMARY KEY (`table_id`), + UNIQUE KEY `uk_sid_tn_del` (`schema_id`, `table_name`, `deleted_at`), + KEY `idx_mid` (`metalake_id`), + KEY `idx_cid` (`catalog_id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'table metadata'; \ No newline at end of file diff --git a/core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalEntityStore.java b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalEntityStore.java index f0be65117eb..18e17084479 100644 --- a/core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalEntityStore.java +++ b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalEntityStore.java @@ -33,6 +33,7 @@ import com.datastrato.gravitino.meta.CatalogEntity; import com.datastrato.gravitino.meta.SchemaEntity; import com.datastrato.gravitino.meta.SchemaVersion; +import com.datastrato.gravitino.meta.TableEntity; import com.datastrato.gravitino.storage.relational.session.SqlSessionFactoryHelper; import java.io.BufferedReader; import java.io.File; @@ -150,16 +151,16 @@ public void testMetalakePutAndGet() throws IOException { EntityAlreadyExistsException.class, () -> entityStore.put(duplicateMetalake, false)); // overwrite true - BaseMetalake overittenMetalake = createMetalake(1L, "test_metalake2", "this is test2"); - entityStore.put(overittenMetalake, true); + BaseMetalake overwrittenMetalake = createMetalake(1L, "test_metalake2", "this is test2"); + entityStore.put(overwrittenMetalake, true); BaseMetalake insertedMetalake1 = entityStore.get( - overittenMetalake.nameIdentifier(), Entity.EntityType.METALAKE, BaseMetalake.class); + overwrittenMetalake.nameIdentifier(), Entity.EntityType.METALAKE, BaseMetalake.class); assertEquals( 1, entityStore.list(Namespace.empty(), BaseMetalake.class, Entity.EntityType.METALAKE).size()); - assertEquals(overittenMetalake.name(), insertedMetalake1.name()); - assertEquals(overittenMetalake.comment(), insertedMetalake1.comment()); + assertEquals(overwrittenMetalake.name(), insertedMetalake1.name()); + assertEquals(overwrittenMetalake.comment(), insertedMetalake1.comment()); } @Test @@ -185,20 +186,20 @@ public void testCatalogPutAndGet() throws IOException { EntityAlreadyExistsException.class, () -> entityStore.put(duplicateCatalog, false)); // overwrite true - CatalogEntity overittenCatalog = + CatalogEntity overwrittenCatalog = createCatalog( 1L, "test_catalog1", Namespace.ofCatalog(metalake.name()), "this is catalog test1"); - entityStore.put(overittenCatalog, true); + entityStore.put(overwrittenCatalog, true); CatalogEntity insertedCatalog1 = entityStore.get( - overittenCatalog.nameIdentifier(), Entity.EntityType.CATALOG, CatalogEntity.class); + overwrittenCatalog.nameIdentifier(), Entity.EntityType.CATALOG, CatalogEntity.class); assertEquals( 1, entityStore - .list(overittenCatalog.namespace(), CatalogEntity.class, Entity.EntityType.CATALOG) + .list(overwrittenCatalog.namespace(), CatalogEntity.class, Entity.EntityType.CATALOG) .size()); - assertEquals(overittenCatalog.name(), insertedCatalog1.name()); - assertEquals(overittenCatalog.getComment(), insertedCatalog1.getComment()); + assertEquals(overwrittenCatalog.name(), insertedCatalog1.name()); + assertEquals(overwrittenCatalog.getComment(), insertedCatalog1.getComment()); } @Test @@ -207,7 +208,7 @@ public void testSchemaPutAndGet() throws IOException { entityStore.put(metalake, false); CatalogEntity catalog = - createCatalog(1L, "test_metalake", Namespace.ofCatalog(metalake.name()), "this is test"); + createCatalog(1L, "test_catalog", Namespace.ofCatalog(metalake.name()), "this is test"); entityStore.put(catalog, false); SchemaEntity schema = @@ -233,24 +234,75 @@ public void testSchemaPutAndGet() throws IOException { assertThrows(EntityAlreadyExistsException.class, () -> entityStore.put(duplicateSchema, false)); // overwrite true - SchemaEntity overittenSchema = + SchemaEntity overwrittenSchema = createSchema( 1L, "test_schema1", Namespace.ofSchema(metalake.name(), catalog.name()), "this is schema test1"); - entityStore.put(overittenSchema, true); + entityStore.put(overwrittenSchema, true); SchemaEntity insertedSchema1 = entityStore.get( - overittenSchema.nameIdentifier(), Entity.EntityType.SCHEMA, SchemaEntity.class); + overwrittenSchema.nameIdentifier(), Entity.EntityType.SCHEMA, SchemaEntity.class); assertEquals( 1, entityStore .list(insertedSchema1.namespace(), SchemaEntity.class, Entity.EntityType.SCHEMA) .size()); - assertEquals(overittenSchema.name(), insertedSchema1.name()); - assertEquals(overittenSchema.comment(), insertedSchema1.comment()); + assertEquals(overwrittenSchema.name(), insertedSchema1.name()); + assertEquals(overwrittenSchema.comment(), insertedSchema1.comment()); + } + + @Test + public void testTablePutAndGet() throws IOException { + BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); + entityStore.put(metalake, false); + + CatalogEntity catalog = + createCatalog(1L, "test_catalog", Namespace.ofCatalog(metalake.name()), "this is test"); + entityStore.put(catalog, false); + + SchemaEntity schema = + createSchema( + 1L, + "test_schema", + Namespace.ofSchema(metalake.name(), catalog.name()), + "this is schema test"); + entityStore.put(schema, false); + + TableEntity table = + createTable( + 1L, "test_table", Namespace.ofTable(metalake.name(), catalog.name(), schema.name())); + entityStore.put(table, false); + + TableEntity insertedTable = + entityStore.get(table.nameIdentifier(), Entity.EntityType.TABLE, TableEntity.class); + assertNotNull(insertedTable); + assertTrue(checkTableEquals(table, insertedTable)); + + // overwrite false + TableEntity duplicateTable = + createTable( + 1L, "test_table", Namespace.ofTable(metalake.name(), catalog.name(), schema.name())); + assertThrows(EntityAlreadyExistsException.class, () -> entityStore.put(duplicateTable, false)); + + // overwrite true + TableEntity overwrittenTable = + createTable( + 1L, "test_table1", Namespace.ofTable(metalake.name(), catalog.name(), schema.name())); + entityStore.put(overwrittenTable, true); + TableEntity insertedTable1 = + entityStore.get( + overwrittenTable.nameIdentifier(), Entity.EntityType.TABLE, TableEntity.class); + + assertEquals( + 1, + entityStore + .list(insertedTable1.namespace(), TableEntity.class, Entity.EntityType.TABLE) + .size()); + assertEquals(overwrittenTable.name(), insertedTable1.name()); + assertEquals(overwrittenTable.auditInfo().creator(), insertedTable1.auditInfo().creator()); } @Test @@ -342,6 +394,44 @@ public void testSchemaPutAndList() throws IOException { assertTrue(checkSchemaEquals(schema2, schemaEntities.get(1))); } + @Test + public void testTablePutAndList() throws IOException { + BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test 1"); + entityStore.put(metalake, false); + + CatalogEntity catalog = + createCatalog(1L, "test_catalog", Namespace.ofCatalog(metalake.name()), "this is test"); + entityStore.put(catalog, false); + + SchemaEntity schema = + createSchema( + 1L, "test_schema", Namespace.ofSchema(metalake.name(), catalog.name()), "this is test"); + entityStore.put(schema, false); + + TableEntity table1 = + createTable( + 1L, "test_table1", Namespace.ofTable(metalake.name(), catalog.name(), schema.name())); + TableEntity table2 = + createTable( + 2L, "test_schema2", Namespace.ofTable(metalake.name(), catalog.name(), schema.name())); + + List beforeTableList = + entityStore.list(table1.namespace(), TableEntity.class, Entity.EntityType.TABLE); + assertNotNull(beforeTableList); + assertEquals(0, beforeTableList.size()); + + entityStore.put(table1, false); + entityStore.put(table2, false); + List tableEntities = + entityStore.list(table1.namespace(), TableEntity.class, Entity.EntityType.TABLE).stream() + .sorted(Comparator.comparing(TableEntity::id)) + .collect(Collectors.toList()); + assertNotNull(tableEntities); + assertEquals(2, tableEntities.size()); + assertTrue(checkTableEquals(table1, tableEntities.get(0))); + assertTrue(checkTableEquals(table2, tableEntities.get(1))); + } + @Test public void testMetalakePutAndDelete() throws IOException, InterruptedException { BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); @@ -363,6 +453,7 @@ public void testMetalakePutAndDelete() throws IOException, InterruptedException // test cascade delete BaseMetalake metalake1 = createMetalake(2L, "test_metalake1", "this is test"); entityStore.put(metalake1, false); + CatalogEntity subCatalog = createCatalog( 1L, "test_catalog", Namespace.ofCatalog(metalake1.name()), "test cascade deleted"); @@ -376,6 +467,13 @@ public void testMetalakePutAndDelete() throws IOException, InterruptedException "test cascade deleted"); entityStore.put(subSchema, false); + TableEntity subTable = + createTable( + 1L, + "test_table", + Namespace.ofTable(metalake1.name(), subCatalog.name(), subSchema.name())); + entityStore.put(subTable, false); + // cascade is false assertThrows( NonEmptyEntityException.class, @@ -386,6 +484,7 @@ public void testMetalakePutAndDelete() throws IOException, InterruptedException assertFalse(entityStore.exists(metalake1.nameIdentifier(), Entity.EntityType.METALAKE)); assertFalse(entityStore.exists(subCatalog.nameIdentifier(), Entity.EntityType.CATALOG)); assertFalse(entityStore.exists(subSchema.nameIdentifier(), Entity.EntityType.SCHEMA)); + assertFalse(entityStore.exists(subTable.nameIdentifier(), Entity.EntityType.TABLE)); } @Test @@ -424,6 +523,13 @@ public void testCatalogPutAndDelete() throws IOException, InterruptedException { "test cascade deleted"); entityStore.put(subSchema, false); + TableEntity subTable = + createTable( + 1L, + "test_table", + Namespace.ofTable(metalake.name(), catalog1.name(), subSchema.name())); + entityStore.put(subTable, false); + // cascade is false assertThrows( NonEmptyEntityException.class, @@ -433,6 +539,7 @@ public void testCatalogPutAndDelete() throws IOException, InterruptedException { entityStore.delete(catalog1.nameIdentifier(), Entity.EntityType.CATALOG, true); assertFalse(entityStore.exists(catalog1.nameIdentifier(), Entity.EntityType.CATALOG)); assertFalse(entityStore.exists(subSchema.nameIdentifier(), Entity.EntityType.SCHEMA)); + assertFalse(entityStore.exists(subTable.nameIdentifier(), Entity.EntityType.TABLE)); } @Test @@ -470,9 +577,61 @@ public void testSchemaPutAndDelete() throws IOException, InterruptedException { "test cascade deleted"); entityStore.put(schema1, false); + TableEntity subTable = + createTable( + 1L, "test_table", Namespace.ofTable(metalake.name(), catalog.name(), schema1.name())); + entityStore.put(subTable, false); + + // cascade is false + assertThrows( + NonEmptyEntityException.class, + () -> entityStore.delete(schema1.nameIdentifier(), Entity.EntityType.SCHEMA, false)); + // cascade is true entityStore.delete(schema1.nameIdentifier(), Entity.EntityType.SCHEMA, true); assertFalse(entityStore.exists(schema1.nameIdentifier(), Entity.EntityType.SCHEMA)); + assertFalse(entityStore.exists(subTable.nameIdentifier(), Entity.EntityType.TABLE)); + } + + @Test + public void testTablePutAndDelete() throws IOException, InterruptedException { + BaseMetalake metalake = createMetalake(3L, "test_metalake", "this is test"); + entityStore.put(metalake, false); + + CatalogEntity catalog = + createCatalog(2L, "test_catalog", Namespace.ofCatalog(metalake.name()), "this is test"); + entityStore.put(catalog, false); + + SchemaEntity schema = + createSchema( + 2L, "test_schema", Namespace.ofSchema(metalake.name(), catalog.name()), "this is test"); + entityStore.put(schema, false); + + TableEntity table = + createTable( + 2L, "test_table", Namespace.ofTable(metalake.name(), catalog.name(), schema.name())); + entityStore.put(table, false); + + assertNotNull( + entityStore.get(table.nameIdentifier(), Entity.EntityType.TABLE, TableEntity.class)); + entityStore.delete(table.nameIdentifier(), Entity.EntityType.TABLE, false); + + assertThrows( + NoSuchEntityException.class, + () -> entityStore.get(table.nameIdentifier(), Entity.EntityType.TABLE, TableEntity.class)); + + // sleep 1s to make delete_at seconds differently + Thread.sleep(1000); + + // test cascade delete + TableEntity table1 = + createTable( + 3L, "test_table1", Namespace.ofTable(metalake.name(), catalog.name(), schema.name())); + entityStore.put(table1, false); + + // cascade is true + entityStore.delete(table1.nameIdentifier(), Entity.EntityType.TABLE, true); + assertFalse(entityStore.exists(table1.nameIdentifier(), Entity.EntityType.TABLE)); } @Test @@ -735,6 +894,98 @@ public void testSchemaPutAndUpdate() throws IOException { })); } + @Test + public void testTablePutAndUpdate() throws IOException { + BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); + entityStore.put(metalake, false); + + CatalogEntity catalog = + createCatalog( + 1L, "test_catalog", Namespace.ofCatalog(metalake.name()), "this is catalog test"); + entityStore.put(catalog, false); + + SchemaEntity schema = + createSchema( + 1L, + "test_schema", + Namespace.ofSchema(metalake.name(), catalog.name()), + "this is schema test"); + entityStore.put(schema, false); + + TableEntity table = + createTable( + 1L, "test_table", Namespace.ofTable(metalake.name(), catalog.name(), schema.name())); + entityStore.put(table, false); + + assertThrows( + RuntimeException.class, + () -> + entityStore.update( + table.nameIdentifier(), + TableEntity.class, + Entity.EntityType.TABLE, + s -> { + TableEntity.Builder builder = + new TableEntity.Builder() + // Change the id, which is not allowed + .withId(2L) + .withName("test_table2") + .withNamespace( + Namespace.ofTable(metalake.name(), catalog.name(), schema.name())) + .withAuditInfo(s.auditInfo()); + return builder.build(); + })); + + AuditInfo changedAuditInfo = + AuditInfo.builder().withCreator("changed_creator").withCreateTime(Instant.now()).build(); + TableEntity updatedTable = + entityStore.update( + table.nameIdentifier(), + TableEntity.class, + Entity.EntityType.TABLE, + s -> { + TableEntity.Builder builder = + new TableEntity.Builder() + .withId(s.id()) + .withName("test_table2") + .withNamespace( + Namespace.ofTable(metalake.name(), catalog.name(), schema.name())) + .withAuditInfo(changedAuditInfo); + return builder.build(); + }); + + TableEntity storedTable = + entityStore.get(updatedTable.nameIdentifier(), Entity.EntityType.TABLE, TableEntity.class); + + assertEquals(table.id(), storedTable.id()); + assertEquals("test_table2", storedTable.name()); + assertEquals(changedAuditInfo.creator(), storedTable.auditInfo().creator()); + + TableEntity table3 = + createTable( + 3L, "test_table3", Namespace.ofTable(metalake.name(), catalog.name(), schema.name())); + entityStore.put(table3, false); + + assertThrows( + EntityAlreadyExistsException.class, + () -> + entityStore.update( + table3.nameIdentifier(), + TableEntity.class, + Entity.EntityType.TABLE, + s -> { + TableEntity.Builder builder = + new TableEntity.Builder() + .withId(table3.id()) + // table name already exists + .withName("test_table2") + .withNamespace( + Namespace.ofTable(metalake.name(), catalog.name(), schema.name())) + .withAuditInfo(s.auditInfo()); + return builder.build(); + })); + } + @Test public void testMetalakePutAndExists() throws IOException { BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); @@ -771,6 +1022,28 @@ public void testSchemaPutAndExists() throws IOException { assertTrue(entityStore.exists(schema.nameIdentifier(), Entity.EntityType.SCHEMA)); } + @Test + public void testTablePutAndExists() throws IOException { + BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); + entityStore.put(metalake, false); + + CatalogEntity catalog = + createCatalog(1L, "test_catalog", Namespace.ofCatalog(metalake.name()), "this is test"); + entityStore.put(catalog, false); + + SchemaEntity schema = + createSchema( + 1L, "test_schema", Namespace.ofSchema(metalake.name(), catalog.name()), "this is test"); + entityStore.put(schema, false); + + TableEntity table = + createTable( + 1L, "test_table", Namespace.ofTable(metalake.name(), catalog.name(), schema.name())); + entityStore.put(table, false); + + assertTrue(entityStore.exists(table.nameIdentifier(), Entity.EntityType.TABLE)); + } + private static BaseMetalake createMetalake(Long id, String name, String comment) { AuditInfo auditInfo = AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build(); @@ -845,6 +1118,24 @@ private static boolean checkSchemaEquals(SchemaEntity expected, SchemaEntity act && expected.auditInfo().equals(actual.auditInfo()); } + private static TableEntity createTable(Long id, String name, Namespace namespace) { + AuditInfo auditInfo = + AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build(); + return new TableEntity.Builder() + .withId(id) + .withName(name) + .withNamespace(namespace) + .withAuditInfo(auditInfo) + .build(); + } + + private static boolean checkTableEquals(TableEntity expected, TableEntity actual) { + return expected.id().equals(actual.id()) + && expected.name().equals(actual.name()) + && expected.namespace().equals(actual.namespace()) + && expected.auditInfo().equals(actual.auditInfo()); + } + private static void truncateAllTables() { try (SqlSession sqlSession = SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true)) { diff --git a/core/src/test/java/com/datastrato/gravitino/storage/relational/utils/TestPOConverters.java b/core/src/test/java/com/datastrato/gravitino/storage/relational/utils/TestPOConverters.java index 1babed4e6a7..87d95c93c5d 100644 --- a/core/src/test/java/com/datastrato/gravitino/storage/relational/utils/TestPOConverters.java +++ b/core/src/test/java/com/datastrato/gravitino/storage/relational/utils/TestPOConverters.java @@ -15,9 +15,11 @@ import com.datastrato.gravitino.meta.CatalogEntity; import com.datastrato.gravitino.meta.SchemaEntity; import com.datastrato.gravitino.meta.SchemaVersion; +import com.datastrato.gravitino.meta.TableEntity; import com.datastrato.gravitino.storage.relational.po.CatalogPO; import com.datastrato.gravitino.storage.relational.po.MetalakePO; import com.datastrato.gravitino.storage.relational.po.SchemaPO; +import com.datastrato.gravitino.storage.relational.po.TablePO; import com.fasterxml.jackson.core.JsonProcessingException; import java.time.Instant; import java.time.LocalDateTime; @@ -94,6 +96,24 @@ public void testFromSchemaPO() throws JsonProcessingException { assertEquals(expectedSchema.auditInfo().creator(), convertedSchema.auditInfo().creator()); } + @Test + public void testFromTablePO() throws JsonProcessingException { + TablePO tablePO = createTablePO(1L, "test", 1L, 1L, 1L); + + TableEntity expectedTable = + createTable(1L, "test", Namespace.ofTable("test_metalake", "test_catalog", "test_schema")); + + TableEntity convertedTable = + POConverters.fromTablePO( + tablePO, Namespace.ofTable("test_metalake", "test_catalog", "test_schema")); + + // Assert + assertEquals(expectedTable.id(), convertedTable.id()); + assertEquals(expectedTable.name(), convertedTable.name()); + assertEquals(expectedTable.namespace(), convertedTable.namespace()); + assertEquals(expectedTable.auditInfo().creator(), convertedTable.auditInfo().creator()); + } + @Test public void testFromMetalakePOs() throws JsonProcessingException { MetalakePO metalakePO1 = createMetalakePO(1L, "test", "this is test"); @@ -185,6 +205,34 @@ public void testFromSchemaPOs() throws JsonProcessingException { } } + @Test + public void testFromTablePOs() throws JsonProcessingException { + TablePO tablePO1 = createTablePO(1L, "test", 1L, 1L, 1L); + TablePO tablePO2 = createTablePO(2L, "test2", 1L, 1L, 1L); + List tablePOs = new ArrayList<>(Arrays.asList(tablePO1, tablePO2)); + List convertedTables = + POConverters.fromTablePOs( + tablePOs, Namespace.ofTable("test_metalake", "test_catalog", "test_schema")); + + TableEntity expectedTable1 = + createTable(1L, "test", Namespace.ofTable("test_metalake", "test_catalog", "test_schema")); + TableEntity expectedTable2 = + createTable(2L, "test2", Namespace.ofTable("test_metalake", "test_catalog", "test_schema")); + List expectedTables = + new ArrayList<>(Arrays.asList(expectedTable1, expectedTable2)); + + // Assert + int index = 0; + for (TableEntity tableEntity : convertedTables) { + assertEquals(expectedTables.get(index).id(), tableEntity.id()); + assertEquals(expectedTables.get(index).name(), tableEntity.name()); + assertEquals(expectedTables.get(index).namespace(), tableEntity.namespace()); + assertEquals( + expectedTables.get(index).auditInfo().creator(), tableEntity.auditInfo().creator()); + index++; + } + } + @Test public void testInitMetalakePOVersion() { BaseMetalake metalake = createMetalake(1L, "test", "this is test"); @@ -218,6 +266,20 @@ public void testInitSchemaPOVersion() { assertEquals(0, initPO.getDeletedAt()); } + @Test + public void testInitTablePOVersion() { + TableEntity tableEntity = + createTable(1L, "test", Namespace.ofTable("test_metalake", "test_catalog", "test_schema")); + TablePO.Builder builder = new TablePO.Builder(); + builder.withMetalakeId(1L); + builder.withCatalogId(1L); + builder.withSchemaId(1L); + TablePO initPO = POConverters.initializeTablePOWithVersion(tableEntity, builder); + assertEquals(1, initPO.getCurrentVersion()); + assertEquals(1, initPO.getLastVersion()); + assertEquals(0, initPO.getDeletedAt()); + } + @Test public void testUpdateMetalakePOVersion() { BaseMetalake metalake = createMetalake(1L, "test", "this is test"); @@ -263,6 +325,24 @@ public void testUpdateSchemaPOVersion() { assertEquals("this is test2", updatePO.getSchemaComment()); } + @Test + public void testUpdateTablePOVersion() { + TableEntity tableEntity = + createTable(1L, "test", Namespace.ofTable("test_metalake", "test_catalog", "test_schema")); + TableEntity updatedTable = + createTable(1L, "test", Namespace.ofTable("test_metalake", "test_catalog", "test_schema")); + TablePO.Builder builder = new TablePO.Builder(); + builder.withMetalakeId(1L); + builder.withCatalogId(1L); + builder.withSchemaId(1L); + TablePO initPO = POConverters.initializeTablePOWithVersion(tableEntity, builder); + TablePO updatePO = POConverters.updateTablePOWithVersion(initPO, updatedTable); + assertEquals(1, initPO.getCurrentVersion()); + assertEquals(1, initPO.getLastVersion()); + assertEquals(0, initPO.getDeletedAt()); + assertEquals("test", updatePO.getTableName()); + } + private static BaseMetalake createMetalake(Long id, String name, String comment) { AuditInfo auditInfo = AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build(); @@ -372,4 +452,33 @@ private static SchemaPO createSchemaPO( .withDeletedAt(0L) .build(); } + + private static TableEntity createTable(Long id, String name, Namespace namespace) { + AuditInfo auditInfo = + AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build(); + return new TableEntity.Builder() + .withId(id) + .withName(name) + .withNamespace(namespace) + .withAuditInfo(auditInfo) + .build(); + } + + private static TablePO createTablePO( + Long id, String name, Long metalakeId, Long catalogId, Long schemaId) + throws JsonProcessingException { + AuditInfo auditInfo = + AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build(); + return new TablePO.Builder() + .withTableId(id) + .withTableName(name) + .withMetalakeId(metalakeId) + .withCatalogId(catalogId) + .withSchemaId(schemaId) + .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(auditInfo)) + .withCurrentVersion(1L) + .withLastVersion(1L) + .withDeletedAt(0L) + .build(); + } } diff --git a/core/src/test/resources/h2/h2-init.sql b/core/src/test/resources/h2/h2-init.sql index 4c9f4959531..ec7a9be332f 100644 --- a/core/src/test/resources/h2/h2-init.sql +++ b/core/src/test/resources/h2/h2-init.sql @@ -48,5 +48,23 @@ CREATE TABLE IF NOT EXISTS `schema_meta` ( `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'schema deleted at', PRIMARY KEY (schema_id), CONSTRAINT uk_cid_sn_del UNIQUE (catalog_id, schema_name, deleted_at), - KEY idx_mid (metalake_id) + -- Aliases are used here, and indexes with the same name in H2 can only be created once. + KEY idx_smid (metalake_id) +) ENGINE=InnoDB; + +CREATE TABLE IF NOT EXISTS `table_meta` ( + `table_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'table id', + `table_name` VARCHAR(128) NOT NULL COMMENT 'table name', + `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id', + `catalog_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'catalog id', + `schema_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'schema id', + `audit_info` MEDIUMTEXT NOT NULL COMMENT 'table audit info', + `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'table current version', + `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'table last version', + `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'table deleted at', + PRIMARY KEY (table_id), + CONSTRAINT uk_sid_tn_del UNIQUE (schema_id, table_name, deleted_at), + -- Aliases are used here, and indexes with the same name in H2 can only be created once. + KEY idx_tmid (metalake_id), + KEY idx_tcid (catalog_id) ) ENGINE=InnoDB; \ No newline at end of file