From 8726cc596cb84af8db5df535d42476e76b87983b Mon Sep 17 00:00:00 2001 From: xloya <982052490@qq.com> Date: Wed, 28 Feb 2024 22:13:06 +0800 Subject: [PATCH] [#2081] feat(core): Add JDBC backend operations for catalog (#2078) ### What changes were proposed in this pull request? The purpose of this PR is to implement JDBC backend operation `Catalog` metadata. Depend on #1980 . Metadata operations of Schema, Table and Fileset will be supported in the remaining PRs. ### Why are the changes needed? Fix: #2081 ### How was this patch tested? Add unit tests to test the catalog metadata ops. --------- Co-authored-by: xiaojiebao --- .../exceptions/NoSuchEntityException.java | 2 + .../storage/relational/JDBCBackend.java | 14 +- .../relational/mapper/CatalogMetaMapper.java | 150 ++++++++++ .../relational/mapper/MetalakeMetaMapper.java | 19 +- .../storage/relational/po/CatalogPO.java | 171 +++++++++++ .../service/CatalogMetaService.java | 206 +++++++++++++ .../service/MetalakeMetaService.java | 58 +++- .../session/SqlSessionFactoryHelper.java | 2 + .../relational/utils/POConverters.java | 163 ++++++++--- core/src/main/resources/mysql/mysql_init.sql | 20 +- .../relational/TestRelationalEntityStore.java | 270 +++++++++++++++++- .../relational/utils/TestPOConverters.java | 138 +++++++-- core/src/test/resources/h2/h2-init.sql | 22 +- 13 files changed, 1152 insertions(+), 83 deletions(-) create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relational/po/CatalogPO.java create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java diff --git a/api/src/main/java/com/datastrato/gravitino/exceptions/NoSuchEntityException.java b/api/src/main/java/com/datastrato/gravitino/exceptions/NoSuchEntityException.java index 8a625957024..27ba0569e3b 100644 --- a/api/src/main/java/com/datastrato/gravitino/exceptions/NoSuchEntityException.java +++ b/api/src/main/java/com/datastrato/gravitino/exceptions/NoSuchEntityException.java @@ -9,6 +9,8 @@ /** This exception is thrown when an entity is not found. */ public class NoSuchEntityException extends RuntimeException { + /** The no such entity message for the exception. */ + public static final String NO_SUCH_ENTITY_MESSAGE = "No such entity: %s"; /** * Constructs a new NoSuchEntityException. diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java index d7825445bcd..c62676293d4 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java @@ -16,6 +16,8 @@ import com.datastrato.gravitino.exceptions.AlreadyExistsException; import com.datastrato.gravitino.exceptions.NoSuchEntityException; import com.datastrato.gravitino.meta.BaseMetalake; +import com.datastrato.gravitino.meta.CatalogEntity; +import com.datastrato.gravitino.storage.relational.service.CatalogMetaService; import com.datastrato.gravitino.storage.relational.service.MetalakeMetaService; import com.datastrato.gravitino.storage.relational.session.SqlSessionFactoryHelper; import java.io.IOException; @@ -42,6 +44,8 @@ public List list( switch (entityType) { case METALAKE: return (List) MetalakeMetaService.getInstance().listMetalakes(); + case CATALOG: + return (List) CatalogMetaService.getInstance().listCatalogsByNamespace(namespace); default: throw new UnsupportedEntityTypeException( "Unsupported entity type: %s for list operation", entityType); @@ -63,6 +67,8 @@ public void insert(E e, boolean overwritten) throws EntityAlreadyExistsException { if (e instanceof BaseMetalake) { MetalakeMetaService.getInstance().insertMetalake((BaseMetalake) e, overwritten); + } else if (e instanceof CatalogEntity) { + CatalogMetaService.getInstance().insertCatalog((CatalogEntity) e, overwritten); } else { throw new UnsupportedEntityTypeException( "Unsupported entity type: %s for insert operation", e.getClass()); @@ -76,6 +82,8 @@ public E update( switch (entityType) { case METALAKE: return (E) MetalakeMetaService.getInstance().updateMetalake(ident, updater); + case CATALOG: + return (E) CatalogMetaService.getInstance().updateCatalog(ident, updater); default: throw new UnsupportedEntityTypeException( "Unsupported entity type: %s for update operation", entityType); @@ -87,7 +95,9 @@ public E get( NameIdentifier ident, Entity.EntityType entityType) throws NoSuchEntityException { switch (entityType) { case METALAKE: - return (E) MetalakeMetaService.getInstance().getMetalakeByIdent(ident); + return (E) MetalakeMetaService.getInstance().getMetalakeByIdentifier(ident); + case CATALOG: + return (E) CatalogMetaService.getInstance().getCatalogByIdentifier(ident); default: throw new UnsupportedEntityTypeException( "Unsupported entity type: %s for get operation", entityType); @@ -99,6 +109,8 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea switch (entityType) { case METALAKE: return MetalakeMetaService.getInstance().deleteMetalake(ident, cascade); + case CATALOG: + return CatalogMetaService.getInstance().deleteCatalog(ident, cascade); default: throw new UnsupportedEntityTypeException( "Unsupported entity type: %s for delete operation", entityType); diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java new file mode 100644 index 00000000000..652cf52cadd --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java @@ -0,0 +1,150 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.relational.mapper; + +import com.datastrato.gravitino.storage.relational.po.CatalogPO; +import java.util.List; +import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; +import org.apache.ibatis.annotations.Update; + +/** + * A MyBatis Mapper for catalog meta operation SQLs. + * + *

This interface class is a specification defined by MyBatis. It requires this interface class + * to identify the corresponding SQLs for execution. We can write SQLs in an additional XML file, or + * write SQLs with annotations in this interface Mapper. See: + */ +public interface CatalogMetaMapper { + String TABLE_NAME = "catalog_meta"; + + @Select( + "SELECT catalog_id as catalogId, catalog_name as catalogName," + + " metalake_id as metalakeId, type, provider," + + " catalog_comment as catalogComment, properties, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") + List listCatalogPOsByMetalakeId(@Param("metalakeId") Long metalakeId); + + @Select( + "SELECT catalog_id as catalogId FROM " + + TABLE_NAME + + " WHERE metalake_id = #{metalakeId} AND catalog_name = #{catalogName} AND deleted_at = 0") + Long selectCatalogIdByMetalakeIdAndName( + @Param("metalakeId") Long metalakeId, @Param("catalogName") String name); + + @Select( + "SELECT catalog_id as catalogId, catalog_name as catalogName," + + " metalake_id as metalakeId, type, provider," + + " catalog_comment as catalogComment, properties, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE metalake_id = #{metalakeId} AND catalog_name = #{catalogName} AND deleted_at = 0") + CatalogPO selectCatalogMetaByMetalakeIdAndName( + @Param("metalakeId") Long metalakeId, @Param("catalogName") String name); + + @Insert( + "INSERT INTO " + + TABLE_NAME + + "(catalog_id, catalog_name, metalake_id," + + " type, provider, catalog_comment, properties, audit_info," + + " current_version, last_version, deleted_at)" + + " VALUES(" + + " #{catalogMeta.catalogId}," + + " #{catalogMeta.catalogName}," + + " #{catalogMeta.metalakeId}," + + " #{catalogMeta.type}," + + " #{catalogMeta.provider}," + + " #{catalogMeta.catalogComment}," + + " #{catalogMeta.properties}," + + " #{catalogMeta.auditInfo}," + + " #{catalogMeta.currentVersion}," + + " #{catalogMeta.lastVersion}," + + " #{catalogMeta.deletedAt}" + + " )") + void insertCatalogMeta(@Param("catalogMeta") CatalogPO catalogPO); + + @Insert( + "INSERT INTO " + + TABLE_NAME + + "(catalog_id, catalog_name, metalake_id," + + " type, provider, catalog_comment, properties, audit_info," + + " current_version, last_version, deleted_at)" + + " VALUES(" + + " #{catalogMeta.catalogId}," + + " #{catalogMeta.catalogName}," + + " #{catalogMeta.metalakeId}," + + " #{catalogMeta.type}," + + " #{catalogMeta.provider}," + + " #{catalogMeta.catalogComment}," + + " #{catalogMeta.properties}," + + " #{catalogMeta.auditInfo}," + + " #{catalogMeta.currentVersion}," + + " #{catalogMeta.lastVersion}," + + " #{catalogMeta.deletedAt}" + + " )" + + " ON DUPLICATE KEY UPDATE" + + " catalog_name = #{catalogMeta.catalogName}," + + " metalake_id = #{catalogMeta.metalakeId}," + + " type = #{catalogMeta.type}," + + " provider = #{catalogMeta.provider}," + + " catalog_comment = #{catalogMeta.catalogComment}," + + " properties = #{catalogMeta.properties}," + + " audit_info = #{catalogMeta.auditInfo}," + + " current_version = #{catalogMeta.currentVersion}," + + " last_version = #{catalogMeta.lastVersion}," + + " deleted_at = #{catalogMeta.deletedAt}") + void insertCatalogMetaOnDuplicateKeyUpdate(@Param("catalogMeta") CatalogPO catalogPO); + + @Update( + "UPDATE " + + TABLE_NAME + + " SET catalog_name = #{newCatalogMeta.catalogName}," + + " metalake_id = #{newCatalogMeta.metalakeId}," + + " type = #{newCatalogMeta.type}," + + " provider = #{newCatalogMeta.provider}," + + " catalog_comment = #{newCatalogMeta.catalogComment}," + + " properties = #{newCatalogMeta.properties}," + + " audit_info = #{newCatalogMeta.auditInfo}," + + " current_version = #{newCatalogMeta.currentVersion}," + + " last_version = #{newCatalogMeta.lastVersion}," + + " deleted_at = #{newCatalogMeta.deletedAt}" + + " WHERE catalog_id = #{oldCatalogMeta.catalogId}" + + " AND catalog_name = #{oldCatalogMeta.catalogName}" + + " AND metalake_id = #{oldCatalogMeta.metalakeId}" + + " AND type = #{oldCatalogMeta.type}" + + " AND provider = #{oldCatalogMeta.provider}" + + " AND catalog_comment = #{oldCatalogMeta.catalogComment}" + + " AND properties = #{oldCatalogMeta.properties}" + + " AND audit_info = #{oldCatalogMeta.auditInfo}" + + " AND current_version = #{oldCatalogMeta.currentVersion}" + + " AND last_version = #{oldCatalogMeta.lastVersion}" + + " AND deleted_at = 0") + Integer updateCatalogMeta( + @Param("newCatalogMeta") CatalogPO newCatalogPO, + @Param("oldCatalogMeta") CatalogPO oldCatalogPO); + + @Update( + "UPDATE " + + TABLE_NAME + + " SET deleted_at = UNIX_TIMESTAMP()" + + " WHERE catalog_id = #{catalogId} AND deleted_at = 0") + Integer softDeleteCatalogMetasByCatalogId(@Param("catalogId") Long catalogId); + + @Update( + "UPDATE " + + TABLE_NAME + + " SET deleted_at = UNIX_TIMESTAMP()" + + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") + Integer softDeleteCatalogMetasByMetalakeId(@Param("metalakeId") Long metalakeId); +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java index b731b718442..2336a6761a6 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java @@ -107,14 +107,14 @@ public interface MetalakeMetaMapper { + " current_version = #{newMetalakeMeta.currentVersion}," + " last_version = #{newMetalakeMeta.lastVersion}" + " WHERE metalake_id = #{oldMetalakeMeta.metalakeId}" - + " and metalake_name = #{oldMetalakeMeta.metalakeName}" - + " and metalake_comment = #{oldMetalakeMeta.metalakeComment}" - + " and properties = #{oldMetalakeMeta.properties}" - + " and audit_info = #{oldMetalakeMeta.auditInfo}" - + " and schema_version = #{oldMetalakeMeta.schemaVersion}" - + " and current_version = #{oldMetalakeMeta.currentVersion}" - + " and last_version = #{oldMetalakeMeta.lastVersion}" - + " and deleted_at = 0") + + " AND metalake_name = #{oldMetalakeMeta.metalakeName}" + + " AND metalake_comment = #{oldMetalakeMeta.metalakeComment}" + + " AND properties = #{oldMetalakeMeta.properties}" + + " AND audit_info = #{oldMetalakeMeta.auditInfo}" + + " AND schema_version = #{oldMetalakeMeta.schemaVersion}" + + " AND current_version = #{oldMetalakeMeta.currentVersion}" + + " AND last_version = #{oldMetalakeMeta.lastVersion}" + + " AND deleted_at = 0") Integer updateMetalakeMeta( @Param("newMetalakeMeta") MetalakePO newMetalakePO, @Param("oldMetalakeMeta") MetalakePO oldMetalakePO); @@ -122,6 +122,7 @@ Integer updateMetalakeMeta( @Update( "UPDATE " + TABLE_NAME - + " SET deleted_at = UNIX_TIMESTAMP() WHERE metalake_id = #{metalakeId}") + + " SET deleted_at = UNIX_TIMESTAMP()" + + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") Integer softDeleteMetalakeMetaByMetalakeId(@Param("metalakeId") Long metalakeId); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/po/CatalogPO.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/po/CatalogPO.java new file mode 100644 index 00000000000..10f7df52c06 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/po/CatalogPO.java @@ -0,0 +1,171 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.relational.po; + +import com.google.common.base.Objects; + +public class CatalogPO { + private Long catalogId; + private String catalogName; + private Long metalakeId; + private String type; + private String provider; + private String catalogComment; + private String properties; + private String auditInfo; + private Long currentVersion; + private Long lastVersion; + private Long deletedAt; + + public Long getCatalogId() { + return catalogId; + } + + public String getCatalogName() { + return catalogName; + } + + public Long getMetalakeId() { + return metalakeId; + } + + public String getType() { + return type; + } + + public String getProvider() { + return provider; + } + + public String getCatalogComment() { + return catalogComment; + } + + public String getProperties() { + return properties; + } + + public String getAuditInfo() { + return auditInfo; + } + + public Long getCurrentVersion() { + return currentVersion; + } + + public Long getLastVersion() { + return lastVersion; + } + + public Long getDeletedAt() { + return deletedAt; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof CatalogPO)) { + return false; + } + CatalogPO catalogPO = (CatalogPO) o; + return Objects.equal(getCatalogId(), catalogPO.getCatalogId()) + && Objects.equal(getCatalogName(), catalogPO.getCatalogName()) + && Objects.equal(getMetalakeId(), catalogPO.getMetalakeId()) + && Objects.equal(getType(), catalogPO.getType()) + && Objects.equal(getProvider(), catalogPO.getProvider()) + && Objects.equal(getCatalogComment(), catalogPO.getCatalogComment()) + && Objects.equal(getProperties(), catalogPO.getProperties()) + && Objects.equal(getAuditInfo(), catalogPO.getAuditInfo()) + && Objects.equal(getCurrentVersion(), catalogPO.getCurrentVersion()) + && Objects.equal(getLastVersion(), catalogPO.getLastVersion()) + && Objects.equal(getDeletedAt(), catalogPO.getDeletedAt()); + } + + @Override + public int hashCode() { + return Objects.hashCode( + getCatalogId(), + getCatalogName(), + getMetalakeId(), + getType(), + getProvider(), + getCatalogComment(), + getProperties(), + getAuditInfo(), + getCurrentVersion(), + getLastVersion(), + getDeletedAt()); + } + + public static class Builder { + private final CatalogPO metalakePO; + + public Builder() { + metalakePO = new CatalogPO(); + } + + public CatalogPO.Builder withCatalogId(Long catalogId) { + metalakePO.catalogId = catalogId; + return this; + } + + public CatalogPO.Builder withCatalogName(String name) { + metalakePO.catalogName = name; + return this; + } + + public CatalogPO.Builder withMetalakeId(Long metalakeId) { + metalakePO.metalakeId = metalakeId; + return this; + } + + public CatalogPO.Builder withType(String type) { + metalakePO.type = type; + return this; + } + + public CatalogPO.Builder withProvider(String provider) { + metalakePO.provider = provider; + return this; + } + + public CatalogPO.Builder withCatalogComment(String comment) { + metalakePO.catalogComment = comment; + return this; + } + + public CatalogPO.Builder withProperties(String properties) { + metalakePO.properties = properties; + return this; + } + + public CatalogPO.Builder withAuditInfo(String auditInfo) { + metalakePO.auditInfo = auditInfo; + return this; + } + + public CatalogPO.Builder withCurrentVersion(Long currentVersion) { + metalakePO.currentVersion = currentVersion; + return this; + } + + public CatalogPO.Builder withLastVersion(Long lastVersion) { + metalakePO.lastVersion = lastVersion; + return this; + } + + public CatalogPO.Builder withDeletedAt(Long deletedAt) { + metalakePO.deletedAt = deletedAt; + return this; + } + + public CatalogPO build() { + return metalakePO; + } + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java new file mode 100644 index 00000000000..27e22d47f9a --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java @@ -0,0 +1,206 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.storage.relational.service; + +import com.datastrato.gravitino.Entity; +import com.datastrato.gravitino.EntityAlreadyExistsException; +import com.datastrato.gravitino.HasIdentifier; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.exceptions.NoSuchEntityException; +import com.datastrato.gravitino.meta.CatalogEntity; +import com.datastrato.gravitino.storage.relational.mapper.CatalogMetaMapper; +import com.datastrato.gravitino.storage.relational.mapper.MetalakeMetaMapper; +import com.datastrato.gravitino.storage.relational.po.CatalogPO; +import com.datastrato.gravitino.storage.relational.utils.POConverters; +import com.datastrato.gravitino.storage.relational.utils.SessionUtils; +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.sql.SQLIntegrityConstraintViolationException; +import java.util.List; +import java.util.Objects; +import java.util.function.Function; + +/** + * The service class for catalog metadata. It provides the basic database operations for catalog. + */ +public class CatalogMetaService { + private static final CatalogMetaService INSTANCE = new CatalogMetaService(); + + public static CatalogMetaService getInstance() { + return INSTANCE; + } + + private CatalogMetaService() {} + + public CatalogEntity getCatalogByIdentifier(NameIdentifier identifier) { + NameIdentifier.checkCatalog(identifier); + String metalakeName = identifier.namespace().level(0); + String catalogName = identifier.name(); + Long metalakeId = + SessionUtils.getWithoutCommit( + MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeIdMetaByName(metalakeName)); + if (metalakeId == null) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, identifier.namespace().toString()); + } + CatalogPO catalogPO = + SessionUtils.getWithoutCommit( + CatalogMetaMapper.class, + mapper -> mapper.selectCatalogMetaByMetalakeIdAndName(metalakeId, catalogName)); + if (catalogPO == null) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, identifier.toString()); + } + return POConverters.fromCatalogPO(catalogPO, identifier.namespace()); + } + + public List listCatalogsByNamespace(Namespace namespace) { + Namespace.checkCatalog(namespace); + String metalakeName = namespace.level(0); + Long metalakeId = + SessionUtils.getWithoutCommit( + MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeIdMetaByName(metalakeName)); + if (metalakeId == null) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, namespace.toString()); + } + List catalogPOS = + SessionUtils.getWithoutCommit( + CatalogMetaMapper.class, mapper -> mapper.listCatalogPOsByMetalakeId(metalakeId)); + return POConverters.fromCatalogPOs(catalogPOS, namespace); + } + + public void insertCatalog(CatalogEntity catalogEntity, boolean overwrite) { + try { + NameIdentifier.checkCatalog(catalogEntity.nameIdentifier()); + Long metalakeId = + SessionUtils.getWithoutCommit( + MetalakeMetaMapper.class, + mapper -> mapper.selectMetalakeIdMetaByName(catalogEntity.namespace().level(0))); + if (metalakeId == null) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, catalogEntity.namespace().toString()); + } + SessionUtils.doWithCommit( + CatalogMetaMapper.class, + mapper -> { + CatalogPO po = POConverters.initializeCatalogPOWithVersion(catalogEntity, metalakeId); + if (overwrite) { + mapper.insertCatalogMetaOnDuplicateKeyUpdate(po); + } else { + mapper.insertCatalogMeta(po); + } + }); + } catch (RuntimeException re) { + if (re.getCause() != null + && re.getCause().getCause() != null + && re.getCause().getCause() instanceof SQLIntegrityConstraintViolationException) { + // TODO We should make more fine-grained exception judgments + // Usually throwing `SQLIntegrityConstraintViolationException` means that + // SQL violates the constraints of `primary key` and `unique key`. + // We simply think that the entity already exists at this time. + throw new EntityAlreadyExistsException( + String.format("Catalog entity: %s already exists", catalogEntity.nameIdentifier())); + } + throw re; + } + } + + public CatalogEntity updateCatalog( + NameIdentifier identifier, Function updater) throws IOException { + NameIdentifier.checkCatalog(identifier); + String metalakeName = identifier.namespace().level(0); + String catalogName = identifier.name(); + Long metalakeId = + SessionUtils.getWithoutCommit( + MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeIdMetaByName(metalakeName)); + if (metalakeId == null) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, identifier.namespace().toString()); + } + + CatalogPO oldCatalogPO = + SessionUtils.getWithoutCommit( + CatalogMetaMapper.class, + mapper -> mapper.selectCatalogMetaByMetalakeIdAndName(metalakeId, catalogName)); + if (oldCatalogPO == null) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, identifier.toString()); + } + + CatalogEntity oldCatalogEntity = + POConverters.fromCatalogPO(oldCatalogPO, identifier.namespace()); + CatalogEntity newEntity = (CatalogEntity) updater.apply((E) oldCatalogEntity); + Preconditions.checkArgument( + Objects.equals(oldCatalogEntity.id(), newEntity.id()), + "The updated catalog entity id: %s should be same with the catalog entity id before: %s", + newEntity.id(), + oldCatalogEntity.id()); + Integer updateResult; + try { + updateResult = + SessionUtils.doWithCommitAndFetchResult( + CatalogMetaMapper.class, + mapper -> + mapper.updateCatalogMeta( + POConverters.updateCatalogPOWithVersion(oldCatalogPO, newEntity, metalakeId), + oldCatalogPO)); + } catch (RuntimeException re) { + if (re.getCause() != null + && re.getCause().getCause() != null + && re.getCause().getCause() instanceof SQLIntegrityConstraintViolationException) { + // TODO We should make more fine-grained exception judgments + // Usually throwing `SQLIntegrityConstraintViolationException` means that + // SQL violates the constraints of `primary key` and `unique key`. + // We simply think that the entity already exists at this time. + throw new EntityAlreadyExistsException( + String.format("Catalog entity: %s already exists", newEntity.nameIdentifier())); + } + throw re; + } + + if (updateResult > 0) { + return newEntity; + } else { + throw new IOException("Failed to update the entity: " + identifier); + } + } + + public boolean deleteCatalog(NameIdentifier identifier, boolean cascade) { + NameIdentifier.checkCatalog(identifier); + String metalakeName = identifier.namespace().level(0); + String catalogName = identifier.name(); + Long metalakeId = + SessionUtils.getWithoutCommit( + MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeIdMetaByName(metalakeName)); + if (metalakeId == null) { + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, identifier.namespace().toString()); + } + Long catalogId = + SessionUtils.getWithoutCommit( + CatalogMetaMapper.class, + mapper -> mapper.selectCatalogIdByMetalakeIdAndName(metalakeId, catalogName)); + if (catalogId != null) { + if (cascade) { + SessionUtils.doMultipleWithCommit( + () -> + SessionUtils.doWithoutCommit( + CatalogMetaMapper.class, + mapper -> mapper.softDeleteCatalogMetasByCatalogId(catalogId)), + () -> { + // TODO We will cascade delete the metadata of sub-resources under the catalog + }); + } else { + // TODO Check whether the sub-resources are empty. If the sub-resources are not empty, + // deletion is not allowed. + SessionUtils.doWithCommit( + CatalogMetaMapper.class, mapper -> mapper.softDeleteCatalogMetasByCatalogId(catalogId)); + } + } + return true; + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java index 9cb1418e444..3850e1e47e1 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java @@ -9,8 +9,12 @@ import com.datastrato.gravitino.EntityAlreadyExistsException; import com.datastrato.gravitino.HasIdentifier; import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; import com.datastrato.gravitino.exceptions.NoSuchEntityException; +import com.datastrato.gravitino.exceptions.NonEmptyEntityException; import com.datastrato.gravitino.meta.BaseMetalake; +import com.datastrato.gravitino.meta.CatalogEntity; +import com.datastrato.gravitino.storage.relational.mapper.CatalogMetaMapper; import com.datastrato.gravitino.storage.relational.mapper.MetalakeMetaMapper; import com.datastrato.gravitino.storage.relational.po.MetalakePO; import com.datastrato.gravitino.storage.relational.utils.POConverters; @@ -22,6 +26,9 @@ import java.util.Objects; import java.util.function.Function; +/** + * The service class for metalake metadata. It provides the basic database operations for metalake. + */ public class MetalakeMetaService { private static final MetalakeMetaService INSTANCE = new MetalakeMetaService(); @@ -38,18 +45,21 @@ public List listMetalakes() { return POConverters.fromMetalakePOs(metalakePOS); } - public BaseMetalake getMetalakeByIdent(NameIdentifier ident) { + public BaseMetalake getMetalakeByIdentifier(NameIdentifier ident) { + NameIdentifier.checkMetalake(ident); MetalakePO metalakePO = SessionUtils.getWithoutCommit( MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeMetaByName(ident.name())); if (metalakePO == null) { - throw new NoSuchEntityException("No such entity: %s", ident.toString()); + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, ident.toString()); } return POConverters.fromMetalakePO(metalakePO); } public void insertMetalake(BaseMetalake baseMetalake, boolean overwrite) { try { + NameIdentifier.checkMetalake(baseMetalake.nameIdentifier()); SessionUtils.doWithCommit( MetalakeMetaMapper.class, mapper -> { @@ -69,8 +79,7 @@ public void insertMetalake(BaseMetalake baseMetalake, boolean overwrite) { // SQL violates the constraints of `primary key` and `unique key`. // We simply think that the entity already exists at this time. throw new EntityAlreadyExistsException( - String.format( - "Metalake entity: %s already exists", baseMetalake.nameIdentifier().name())); + String.format("Metalake entity: %s already exists", baseMetalake.nameIdentifier())); } throw re; } @@ -78,11 +87,13 @@ public void insertMetalake(BaseMetalake baseMetalake, boolean overwrite) { public BaseMetalake updateMetalake( NameIdentifier ident, Function updater) throws IOException { + NameIdentifier.checkMetalake(ident); MetalakePO oldMetalakePO = SessionUtils.getWithoutCommit( MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeMetaByName(ident.name())); if (oldMetalakePO == null) { - throw new NoSuchEntityException("No such entity: %s", ident.toString()); + throw new NoSuchEntityException( + NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE, ident.toString()); } BaseMetalake oldMetalakeEntity = POConverters.fromMetalakePO(oldMetalakePO); @@ -94,11 +105,26 @@ public BaseMetalake updateMetalake( oldMetalakeEntity.id()); MetalakePO newMetalakePO = POConverters.updateMetalakePOWithVersion(oldMetalakePO, newMetalakeEntity); + Integer updateResult; + try { + updateResult = + SessionUtils.doWithCommitAndFetchResult( + MetalakeMetaMapper.class, + mapper -> mapper.updateMetalakeMeta(newMetalakePO, oldMetalakePO)); + } catch (RuntimeException re) { + if (re.getCause() != null + && re.getCause().getCause() != null + && re.getCause().getCause() instanceof SQLIntegrityConstraintViolationException) { + // TODO We should make more fine-grained exception judgments + // Usually throwing `SQLIntegrityConstraintViolationException` means that + // SQL violates the constraints of `primary key` and `unique key`. + // We simply think that the entity already exists at this time. + throw new EntityAlreadyExistsException( + String.format("Catalog entity: %s already exists", newMetalakeEntity.nameIdentifier())); + } + throw re; + } - Integer updateResult = - SessionUtils.doWithCommitAndFetchResult( - MetalakeMetaMapper.class, - mapper -> mapper.updateMetalakeMeta(newMetalakePO, oldMetalakePO)); if (updateResult > 0) { return newMetalakeEntity; } else { @@ -107,6 +133,7 @@ public BaseMetalake updateMetalake( } public boolean deleteMetalake(NameIdentifier ident, boolean cascade) { + NameIdentifier.checkMetalake(ident); Long metalakeId = SessionUtils.getWithoutCommit( MetalakeMetaMapper.class, mapper -> mapper.selectMetalakeIdMetaByName(ident.name())); @@ -117,12 +144,21 @@ public boolean deleteMetalake(NameIdentifier ident, boolean cascade) { SessionUtils.doWithoutCommit( MetalakeMetaMapper.class, mapper -> mapper.softDeleteMetalakeMetaByMetalakeId(metalakeId)), + () -> + SessionUtils.doWithoutCommit( + CatalogMetaMapper.class, + mapper -> mapper.softDeleteCatalogMetasByMetalakeId(metalakeId)), () -> { // TODO We will cascade delete the metadata of sub-resources under the metalake }); } else { - // TODO Check whether the sub-resources are empty. If the sub-resources are not empty, - // deletion is not allowed. + List catalogEntities = + CatalogMetaService.getInstance() + .listCatalogsByNamespace(Namespace.ofCatalog(ident.name())); + if (!catalogEntities.isEmpty()) { + throw new NonEmptyEntityException( + "Entity %s has sub-entities, you should remove sub-entities first", ident); + } SessionUtils.doWithCommit( MetalakeMetaMapper.class, mapper -> mapper.softDeleteMetalakeMetaByMetalakeId(metalakeId)); diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessionFactoryHelper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessionFactoryHelper.java index aaff7ce6130..b8dcd440115 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessionFactoryHelper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/session/SqlSessionFactoryHelper.java @@ -7,6 +7,7 @@ import com.datastrato.gravitino.Config; import com.datastrato.gravitino.Configs; +import com.datastrato.gravitino.storage.relational.mapper.CatalogMetaMapper; import com.datastrato.gravitino.storage.relational.mapper.MetalakeMetaMapper; import com.google.common.base.Preconditions; import java.sql.SQLException; @@ -74,6 +75,7 @@ public void init(Config config) { // Initialize the configuration Configuration configuration = new Configuration(environment); configuration.addMapper(MetalakeMetaMapper.class); + configuration.addMapper(CatalogMetaMapper.class); // Create the SqlSessionFactory object, it is a singleton object if (sqlSessionFactory == null) { diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java index 0c03faa77ba..4aa72329d73 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/utils/POConverters.java @@ -5,13 +5,16 @@ package com.datastrato.gravitino.storage.relational.utils; +import com.datastrato.gravitino.Catalog; +import com.datastrato.gravitino.Namespace; import com.datastrato.gravitino.json.JsonUtils; import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.meta.BaseMetalake; +import com.datastrato.gravitino.meta.CatalogEntity; import com.datastrato.gravitino.meta.SchemaVersion; +import com.datastrato.gravitino.storage.relational.po.CatalogPO; import com.datastrato.gravitino.storage.relational.po.MetalakePO; import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.common.annotations.VisibleForTesting; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -22,13 +25,12 @@ public class POConverters { private POConverters() {} /** - * Convert {@link BaseMetalake} to {@link MetalakePO} + * Initialize MetalakePO * * @param baseMetalake BaseMetalake object - * @return MetalakePO object from BaseMetalake object + * @return MetalakePO object with version initialized */ - @VisibleForTesting - static MetalakePO toMetalakePO(BaseMetalake baseMetalake) { + public static MetalakePO initializeMetalakePOWithVersion(BaseMetalake baseMetalake) { try { return new MetalakePO.Builder() .withMetalakeId(baseMetalake.id()) @@ -38,33 +40,15 @@ static MetalakePO toMetalakePO(BaseMetalake baseMetalake) { .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(baseMetalake.auditInfo())) .withSchemaVersion( JsonUtils.anyFieldMapper().writeValueAsString(baseMetalake.getVersion())) + .withCurrentVersion(1L) + .withLastVersion(1L) + .withDeletedAt(0L) .build(); } catch (JsonProcessingException e) { throw new RuntimeException("Failed to serialize json object:", e); } } - /** - * Initialize MetalakePO - * - * @param baseMetalake BaseMetalake object - * @return MetalakePO object with version initialized - */ - public static MetalakePO initializeMetalakePOWithVersion(BaseMetalake baseMetalake) { - MetalakePO metalakePO = toMetalakePO(baseMetalake); - return new MetalakePO.Builder() - .withMetalakeId(metalakePO.getMetalakeId()) - .withMetalakeName(metalakePO.getMetalakeName()) - .withMetalakeComment(metalakePO.getMetalakeComment()) - .withProperties(metalakePO.getProperties()) - .withAuditInfo(metalakePO.getAuditInfo()) - .withSchemaVersion(metalakePO.getSchemaVersion()) - .withCurrentVersion(1L) - .withLastVersion(1L) - .withDeletedAt(0L) - .build(); - } - /** * Update MetalakePO version * @@ -74,21 +58,25 @@ public static MetalakePO initializeMetalakePOWithVersion(BaseMetalake baseMetala */ public static MetalakePO updateMetalakePOWithVersion( MetalakePO oldMetalakePO, BaseMetalake newMetalake) { - MetalakePO newMetalakePO = toMetalakePO(newMetalake); Long lastVersion = oldMetalakePO.getLastVersion(); // Will set the version to the last version + 1 when having some fields need be multiple version Long nextVersion = lastVersion; - return new MetalakePO.Builder() - .withMetalakeId(newMetalakePO.getMetalakeId()) - .withMetalakeName(newMetalakePO.getMetalakeName()) - .withMetalakeComment(newMetalakePO.getMetalakeComment()) - .withProperties(newMetalakePO.getProperties()) - .withAuditInfo(newMetalakePO.getAuditInfo()) - .withSchemaVersion(newMetalakePO.getSchemaVersion()) - .withCurrentVersion(nextVersion) - .withLastVersion(nextVersion) - .withDeletedAt(0L) - .build(); + try { + return new MetalakePO.Builder() + .withMetalakeId(newMetalake.id()) + .withMetalakeName(newMetalake.name()) + .withMetalakeComment(newMetalake.comment()) + .withProperties(JsonUtils.anyFieldMapper().writeValueAsString(newMetalake.properties())) + .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(newMetalake.auditInfo())) + .withSchemaVersion( + JsonUtils.anyFieldMapper().writeValueAsString(newMetalake.getVersion())) + .withCurrentVersion(nextVersion) + .withLastVersion(nextVersion) + .withDeletedAt(0L) + .build(); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize json object:", e); + } } /** @@ -125,4 +113,103 @@ public static BaseMetalake fromMetalakePO(MetalakePO metalakePO) { public static List fromMetalakePOs(List metalakePOS) { return metalakePOS.stream().map(POConverters::fromMetalakePO).collect(Collectors.toList()); } + + /** + * Initialize CatalogPO + * + * @param catalogEntity CatalogEntity object + * @return CatalogPO object with version initialized + */ + public static CatalogPO initializeCatalogPOWithVersion( + CatalogEntity catalogEntity, Long metalakeId) { + try { + return new CatalogPO.Builder() + .withCatalogId(catalogEntity.id()) + .withCatalogName(catalogEntity.name()) + .withMetalakeId(metalakeId) + .withType(catalogEntity.getType().name()) + .withProvider(catalogEntity.getProvider()) + .withCatalogComment(catalogEntity.getComment()) + .withProperties( + JsonUtils.anyFieldMapper().writeValueAsString(catalogEntity.getProperties())) + .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(catalogEntity.auditInfo())) + .withCurrentVersion(1L) + .withLastVersion(1L) + .withDeletedAt(0L) + .build(); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize json object:", e); + } + } + + /** + * Update CatalogPO version + * + * @param oldCatalogPO the old CatalogPO object + * @param newCatalog the new CatalogEntity object + * @return CatalogPO object with updated version + */ + public static CatalogPO updateCatalogPOWithVersion( + CatalogPO oldCatalogPO, CatalogEntity newCatalog, Long metalakeId) { + Long lastVersion = oldCatalogPO.getLastVersion(); + // Will set the version to the last version + 1 when having some fields need be multiple version + Long nextVersion = lastVersion; + try { + return new CatalogPO.Builder() + .withCatalogId(newCatalog.id()) + .withCatalogName(newCatalog.name()) + .withMetalakeId(metalakeId) + .withType(newCatalog.getType().name()) + .withProvider(newCatalog.getProvider()) + .withCatalogComment(newCatalog.getComment()) + .withProperties(JsonUtils.anyFieldMapper().writeValueAsString(newCatalog.getProperties())) + .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(newCatalog.auditInfo())) + .withCurrentVersion(nextVersion) + .withLastVersion(nextVersion) + .withDeletedAt(0L) + .build(); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize json object:", e); + } + } + + /** + * Convert {@link CatalogPO} to {@link CatalogEntity} + * + * @param catalogPO CatalogPO object to be converted + * @param namespace Namespace object to be associated with the catalog + * @return CatalogEntity object from CatalogPO object + */ + public static CatalogEntity fromCatalogPO(CatalogPO catalogPO, Namespace namespace) { + try { + return CatalogEntity.builder() + .withId(catalogPO.getCatalogId()) + .withName(catalogPO.getCatalogName()) + .withNamespace(namespace) + .withType(Catalog.Type.valueOf(catalogPO.getType())) + .withProvider(catalogPO.getProvider()) + .withComment(catalogPO.getCatalogComment()) + .withProperties( + JsonUtils.anyFieldMapper().readValue(catalogPO.getProperties(), Map.class)) + .withAuditInfo( + JsonUtils.anyFieldMapper().readValue(catalogPO.getAuditInfo(), AuditInfo.class)) + .build(); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to deserialize json object:", e); + } + } + + /** + * Convert list of {@link MetalakePO} to list of {@link BaseMetalake} + * + * @param catalogPOS list of MetalakePO objects + * @param namespace Namespace object to be associated with the metalake + * @return list of BaseMetalake objects from list of MetalakePO objects + */ + public static List fromCatalogPOs( + List catalogPOS, Namespace namespace) { + return catalogPOS.stream() + .map(catalogPO -> POConverters.fromCatalogPO(catalogPO, namespace)) + .collect(Collectors.toList()); + } } diff --git a/core/src/main/resources/mysql/mysql_init.sql b/core/src/main/resources/mysql/mysql_init.sql index 77f300810b6..613d0b692b3 100644 --- a/core/src/main/resources/mysql/mysql_init.sql +++ b/core/src/main/resources/mysql/mysql_init.sql @@ -12,7 +12,23 @@ CREATE TABLE IF NOT EXISTS `metalake_meta` ( `schema_version` MEDIUMTEXT NOT NULL COMMENT 'metalake schema version info', `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'metalake current version', `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'metalake last version', - `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 NULL COMMENT 'metalake deleted at', + `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'metalake deleted at', PRIMARY KEY (`metalake_id`), UNIQUE KEY `uk_mn_del` (`metalake_name`, `deleted_at`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'metalake metadata'; \ No newline at end of file +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'metalake metadata'; + +CREATE TABLE IF NOT EXISTS `catalog_meta` ( + `catalog_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'catalog id', + `catalog_name` VARCHAR(128) NOT NULL COMMENT 'catalog name', + `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id', + `type` VARCHAR(64) NOT NULL COMMENT 'catalog type', + `provider` VARCHAR(64) NOT NULL COMMENT 'catalog provider', + `catalog_comment` VARCHAR(256) DEFAULT '' COMMENT 'catalog comment', + `properties` MEDIUMTEXT DEFAULT NULL COMMENT 'catalog properties', + `audit_info` MEDIUMTEXT NOT NULL COMMENT 'catalog audit info', + `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'catalog current version', + `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'catalog last version', + `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'catalog deleted at', + PRIMARY KEY (`catalog_id`), + UNIQUE KEY `uk_mid_cn_del` (`metalake_id`, `catalog_name`, `deleted_at`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'catalog metadata'; \ No newline at end of file diff --git a/core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalEntityStore.java b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalEntityStore.java index e1703c360ff..ccf84585d20 100644 --- a/core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalEntityStore.java +++ b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalEntityStore.java @@ -14,10 +14,12 @@ import static com.datastrato.gravitino.Configs.ENTITY_STORE; import static com.datastrato.gravitino.Configs.RELATIONAL_ENTITY_STORE; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.datastrato.gravitino.Catalog; import com.datastrato.gravitino.Config; import com.datastrato.gravitino.Entity; import com.datastrato.gravitino.EntityAlreadyExistsException; @@ -25,8 +27,10 @@ import com.datastrato.gravitino.EntityStoreFactory; import com.datastrato.gravitino.Namespace; import com.datastrato.gravitino.exceptions.NoSuchEntityException; +import com.datastrato.gravitino.exceptions.NonEmptyEntityException; import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.meta.BaseMetalake; +import com.datastrato.gravitino.meta.CatalogEntity; import com.datastrato.gravitino.meta.SchemaVersion; import com.datastrato.gravitino.storage.relational.session.SqlSessionFactoryHelper; import java.io.BufferedReader; @@ -43,9 +47,11 @@ import java.sql.Statement; import java.time.Instant; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.UUID; +import java.util.stream.Collectors; import org.apache.ibatis.session.SqlSession; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -130,6 +136,7 @@ public static void tearDown() { @Test public void testPutAndGet() throws IOException { + // metalake BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); entityStore.put(metalake, false); BaseMetalake insertedMetalake = @@ -153,10 +160,44 @@ public void testPutAndGet() throws IOException { entityStore.list(Namespace.empty(), BaseMetalake.class, Entity.EntityType.METALAKE).size()); assertEquals("test_metalake2", insertedMetalake1.name()); assertEquals("this is test2", insertedMetalake1.comment()); + + // catalog + CatalogEntity catalog = + createCatalog( + 1L, "test_catalog", Namespace.ofCatalog("test_metalake2"), "this is catalog test"); + entityStore.put(catalog, false); + CatalogEntity insertedCatalog = + entityStore.get(catalog.nameIdentifier(), Entity.EntityType.CATALOG, CatalogEntity.class); + assertNotNull(insertedCatalog); + assertTrue(checkCatalogEquals(catalog, insertedCatalog)); + + // overwrite false + CatalogEntity duplicateCatalog = + createCatalog( + 1L, "test_catalog", Namespace.ofCatalog("test_metalake2"), "this is catalog test"); + assertThrows( + EntityAlreadyExistsException.class, () -> entityStore.put(duplicateCatalog, false)); + + // overwrite true + CatalogEntity overittenCatalog = + createCatalog( + 1L, "test_catalog1", Namespace.ofCatalog("test_metalake2"), "this is catalog test1"); + entityStore.put(overittenCatalog, true); + CatalogEntity insertedCatalog1 = + entityStore.get( + overittenCatalog.nameIdentifier(), Entity.EntityType.CATALOG, CatalogEntity.class); + assertEquals( + 1, + entityStore + .list(overittenCatalog.namespace(), CatalogEntity.class, Entity.EntityType.CATALOG) + .size()); + assertEquals("test_catalog1", insertedCatalog1.name()); + assertEquals("this is catalog test1", insertedCatalog1.getComment()); } @Test public void testPutAndList() throws IOException { + // metalake BaseMetalake metalake1 = createMetalake(1L, "test_metalake1", "this is test 1"); BaseMetalake metalake2 = createMetalake(2L, "test_metalake2", "this is test 2"); List beforePutList = @@ -167,27 +208,112 @@ public void testPutAndList() throws IOException { entityStore.put(metalake1, false); entityStore.put(metalake2, false); List metalakes = - entityStore.list(metalake1.namespace(), BaseMetalake.class, Entity.EntityType.METALAKE); + entityStore.list(metalake1.namespace(), BaseMetalake.class, Entity.EntityType.METALAKE) + .stream() + .sorted(Comparator.comparing(BaseMetalake::id)) + .collect(Collectors.toList()); assertNotNull(metalakes); assertEquals(2, metalakes.size()); assertTrue(checkMetalakeEquals(metalake1, metalakes.get(0))); assertTrue(checkMetalakeEquals(metalake2, metalakes.get(1))); + + // catalog + CatalogEntity catalog1 = + createCatalog( + 1L, "test_catalog1", Namespace.ofCatalog(metalake1.name()), "this is catalog 1"); + CatalogEntity catalog2 = + createCatalog( + 2L, "test_catalog2", Namespace.ofCatalog(metalake1.name()), "this is catalog 2"); + List beforeCatalogList = + entityStore.list(catalog1.namespace(), CatalogEntity.class, Entity.EntityType.CATALOG); + assertNotNull(beforeCatalogList); + assertEquals(0, beforeCatalogList.size()); + + entityStore.put(catalog1, false); + entityStore.put(catalog2, false); + List catalogEntities = + entityStore.list(catalog1.namespace(), CatalogEntity.class, Entity.EntityType.CATALOG) + .stream() + .sorted(Comparator.comparing(CatalogEntity::id)) + .collect(Collectors.toList()); + assertNotNull(catalogEntities); + assertEquals(2, catalogEntities.size()); + assertTrue(checkCatalogEquals(catalog1, catalogEntities.get(0))); + assertTrue(checkCatalogEquals(catalog2, catalogEntities.get(1))); } @Test - public void testPutAndDelete() throws IOException { + public void testPutAndDelete() throws IOException, InterruptedException { + // metalake BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); entityStore.put(metalake, false); + assertNotNull( + entityStore.get(metalake.nameIdentifier(), Entity.EntityType.METALAKE, BaseMetalake.class)); entityStore.delete(metalake.nameIdentifier(), Entity.EntityType.METALAKE, false); assertThrows( NoSuchEntityException.class, () -> entityStore.get( metalake.nameIdentifier(), Entity.EntityType.METALAKE, BaseMetalake.class)); + + // sleep 1s to make delete_at seconds differently + Thread.sleep(1000); + + // test cascade delete + BaseMetalake metalake1 = createMetalake(2L, "test_metalake", "this is test"); + entityStore.put(metalake1, false); + CatalogEntity subCatalog = + createCatalog( + 1L, "test_catalog", Namespace.ofCatalog(metalake1.name()), "test cascade deleted"); + entityStore.put(subCatalog, false); + + // cascade is false + assertThrows( + NonEmptyEntityException.class, + () -> entityStore.delete(metalake1.nameIdentifier(), Entity.EntityType.METALAKE, false)); + + // cascade is true + entityStore.delete(metalake1.nameIdentifier(), Entity.EntityType.METALAKE, true); + assertFalse(entityStore.exists(metalake1.nameIdentifier(), Entity.EntityType.METALAKE)); + assertFalse(entityStore.exists(subCatalog.nameIdentifier(), Entity.EntityType.CATALOG)); + + // catalog + BaseMetalake metalake2 = createMetalake(3L, "test_metalake", "this is test"); + entityStore.put(metalake2, false); + CatalogEntity catalog = + createCatalog(2L, "test_catalog", Namespace.ofCatalog("test_metalake"), "this is test"); + entityStore.put(catalog, false); + assertNotNull( + entityStore.get(catalog.nameIdentifier(), Entity.EntityType.CATALOG, CatalogEntity.class)); + entityStore.delete(catalog.nameIdentifier(), Entity.EntityType.CATALOG, false); + assertThrows( + NoSuchEntityException.class, + () -> + entityStore.get( + catalog.nameIdentifier(), Entity.EntityType.CATALOG, CatalogEntity.class)); + + // sleep 1s to make delete_at seconds differently + Thread.sleep(1000); + + // test cascade delete + CatalogEntity catalog1 = + createCatalog( + 3L, "test_catalog1", Namespace.ofCatalog(metalake2.name()), "test cascade deleted"); + entityStore.put(catalog1, false); + + // cascade is false + assertThrows( + NonEmptyEntityException.class, + () -> entityStore.delete(metalake2.nameIdentifier(), Entity.EntityType.METALAKE, false)); + + // cascade is true + entityStore.delete(catalog1.nameIdentifier(), Entity.EntityType.CATALOG, true); + assertFalse(entityStore.exists(catalog1.nameIdentifier(), Entity.EntityType.CATALOG)); } @Test public void testPutAndUpdate() throws IOException { + // metalake BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); entityStore.put(metalake, false); @@ -236,6 +362,119 @@ public void testPutAndUpdate() throws IOException { assertEquals("test_metalake2", updatedMetalake.name()); assertEquals("this is test 2", updatedMetalake.comment()); assertEquals(changedAuditInfo.creator(), updatedMetalake.auditInfo().creator()); + + BaseMetalake metalake3 = createMetalake(3L, "test_metalake3", "this is test 3"); + entityStore.put(metalake3, false); + assertThrows( + EntityAlreadyExistsException.class, + () -> + entityStore.update( + metalake3.nameIdentifier(), + BaseMetalake.class, + Entity.EntityType.METALAKE, + m -> { + BaseMetalake.Builder builder = + new BaseMetalake.Builder() + .withId(metalake3.id()) + .withName("test_metalake2") + .withComment(metalake3.comment()) + .withProperties(new HashMap<>()) + .withAuditInfo((AuditInfo) m.auditInfo()) + .withVersion(m.getVersion()); + return builder.build(); + })); + + // catalog + CatalogEntity catalog = + createCatalog( + 1L, "test_catalog", Namespace.ofCatalog("test_metalake2"), "this is catalog test"); + entityStore.put(catalog, false); + assertThrows( + RuntimeException.class, + () -> + entityStore.update( + catalog.nameIdentifier(), + CatalogEntity.class, + Entity.EntityType.CATALOG, + c -> { + CatalogEntity.Builder builder = + CatalogEntity.builder() + // Change the id, which is not allowed + .withId(2L) + .withName("test_catalog2") + .withNamespace(Namespace.ofCatalog(updatedMetalake.name())) + .withType(Catalog.Type.RELATIONAL) + .withProvider("test") + .withComment("this is catalog test 2") + .withProperties(new HashMap<>()) + .withAuditInfo((AuditInfo) c.auditInfo()); + return builder.build(); + })); + + CatalogEntity updatedCatalog = + entityStore.update( + catalog.nameIdentifier(), + CatalogEntity.class, + Entity.EntityType.CATALOG, + c -> { + CatalogEntity.Builder builder = + CatalogEntity.builder() + .withId(c.id()) + .withName("test_catalog2") + .withNamespace(Namespace.ofCatalog(updatedMetalake.name())) + .withType(Catalog.Type.RELATIONAL) + .withProvider("test") + .withComment("this is catalog test 2") + .withProperties(new HashMap<>()) + .withAuditInfo(changedAuditInfo); + return builder.build(); + }); + CatalogEntity storedCatalog = + entityStore.get( + updatedCatalog.nameIdentifier(), Entity.EntityType.CATALOG, CatalogEntity.class); + assertEquals(catalog.id(), storedCatalog.id()); + assertEquals("test_catalog2", updatedCatalog.name()); + assertEquals("this is catalog test 2", updatedCatalog.getComment()); + assertEquals(changedAuditInfo.creator(), updatedCatalog.auditInfo().creator()); + + CatalogEntity catalog3 = + createCatalog( + 3L, "test_catalog3", Namespace.ofCatalog("test_metalake2"), "this is catalog test 3"); + entityStore.put(catalog3, false); + assertThrows( + EntityAlreadyExistsException.class, + () -> + entityStore.update( + catalog3.nameIdentifier(), + CatalogEntity.class, + Entity.EntityType.CATALOG, + c -> { + CatalogEntity.Builder builder = + CatalogEntity.builder() + .withId(catalog3.id()) + .withName("test_catalog2") + .withNamespace(Namespace.ofCatalog(updatedMetalake.name())) + .withType(Catalog.Type.RELATIONAL) + .withProvider("test") + .withComment(catalog3.getComment()) + .withProperties(new HashMap<>()) + .withAuditInfo((AuditInfo) c.auditInfo()); + return builder.build(); + })); + } + + @Test + public void testPutAndExists() throws IOException, InterruptedException { + // metalake + BaseMetalake metalake = createMetalake(1L, "test_metalake", "this is test"); + entityStore.put(metalake, false); + assertTrue(entityStore.exists(metalake.nameIdentifier(), Entity.EntityType.METALAKE)); + + // catalog + CatalogEntity catalog = + createCatalog(1L, "test_catalog", Namespace.ofCatalog("test_metalake"), "this is test"); + entityStore.put(catalog, false); + assertTrue(entityStore.exists(catalog.nameIdentifier(), Entity.EntityType.CATALOG)); } private static BaseMetalake createMetalake(Long id, String name, String comment) { @@ -260,6 +499,33 @@ private static boolean checkMetalakeEquals(BaseMetalake expected, BaseMetalake a && expected.getVersion().equals(actual.getVersion()); } + private static CatalogEntity createCatalog( + Long id, String name, Namespace namespace, String comment) { + AuditInfo auditInfo = + AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build(); + return CatalogEntity.builder() + .withId(id) + .withName(name) + .withNamespace(namespace) + .withType(Catalog.Type.RELATIONAL) + .withProvider("test") + .withComment(comment) + .withProperties(new HashMap<>()) + .withAuditInfo(auditInfo) + .build(); + } + + private static boolean checkCatalogEquals(CatalogEntity expected, CatalogEntity actual) { + return expected.id().equals(actual.id()) + && expected.name().equals(actual.name()) + && expected.namespace().equals(actual.namespace()) + && expected.getType().equals(actual.getType()) + && expected.getProvider().equals(actual.getProvider()) + && expected.getProperties() != null + && expected.getProperties().equals(actual.getProperties()) + && expected.auditInfo().equals(actual.auditInfo()); + } + private static void truncateAllTables() { try (SqlSession sqlSession = SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true)) { diff --git a/core/src/test/java/com/datastrato/gravitino/storage/relational/utils/TestPOConverters.java b/core/src/test/java/com/datastrato/gravitino/storage/relational/utils/TestPOConverters.java index 285e756c578..544b5e713e6 100644 --- a/core/src/test/java/com/datastrato/gravitino/storage/relational/utils/TestPOConverters.java +++ b/core/src/test/java/com/datastrato/gravitino/storage/relational/utils/TestPOConverters.java @@ -7,10 +7,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals; +import com.datastrato.gravitino.Catalog; +import com.datastrato.gravitino.Namespace; import com.datastrato.gravitino.json.JsonUtils; import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.meta.BaseMetalake; +import com.datastrato.gravitino.meta.CatalogEntity; import com.datastrato.gravitino.meta.SchemaVersion; +import com.datastrato.gravitino.storage.relational.po.CatalogPO; import com.datastrato.gravitino.storage.relational.po.MetalakePO; import com.fasterxml.jackson.core.JsonProcessingException; import java.time.Instant; @@ -46,6 +50,28 @@ public void testFromMetalakePO() throws JsonProcessingException { assertEquals(expectedMetalake.getVersion(), convertedMetalake.getVersion()); } + @Test + public void testFromCatalogPO() throws JsonProcessingException { + CatalogPO catalogPO = createCatalogPO(1L, "test", 1L, "this is test"); + + CatalogEntity expectedCatalog = + createCatalog(1L, "test", Namespace.ofCatalog("test_metalake"), "this is test"); + + CatalogEntity convertedCatalog = + POConverters.fromCatalogPO(catalogPO, Namespace.ofCatalog("test_metalake")); + + // Assert + assertEquals(expectedCatalog.id(), convertedCatalog.id()); + assertEquals(expectedCatalog.name(), convertedCatalog.name()); + assertEquals(expectedCatalog.getComment(), convertedCatalog.getComment()); + assertEquals(expectedCatalog.getType(), convertedCatalog.getType()); + assertEquals(expectedCatalog.getProvider(), convertedCatalog.getProvider()); + assertEquals(expectedCatalog.namespace(), convertedCatalog.namespace()); + assertEquals( + expectedCatalog.getProperties().get("key"), convertedCatalog.getProperties().get("key")); + assertEquals(expectedCatalog.auditInfo().creator(), convertedCatalog.auditInfo().creator()); + } + @Test public void testFromMetalakePOs() throws JsonProcessingException { MetalakePO metalakePO1 = createMetalakePO(1L, "test", "this is test"); @@ -74,16 +100,59 @@ public void testFromMetalakePOs() throws JsonProcessingException { } @Test - public void testInitMetalakePOVersion() throws JsonProcessingException { - BaseMetalake metalakePO = createMetalake(1L, "test", "this is test"); - MetalakePO initPO = POConverters.initializeMetalakePOWithVersion(metalakePO); + public void testFromCatalogPOs() throws JsonProcessingException { + CatalogPO catalogPO1 = createCatalogPO(1L, "test", 1L, "this is test"); + CatalogPO catalogPO2 = createCatalogPO(2L, "test2", 1L, "this is test2"); + List catalogPOs = new ArrayList<>(Arrays.asList(catalogPO1, catalogPO2)); + List convertedCatalogs = + POConverters.fromCatalogPOs(catalogPOs, Namespace.ofCatalog("test_metalake")); + + CatalogEntity expectedCatalog1 = + createCatalog(1L, "test", Namespace.ofCatalog("test_metalake"), "this is test"); + CatalogEntity expectedCatalog2 = + createCatalog(2L, "test2", Namespace.ofCatalog("test_metalake"), "this is test2"); + List expectedCatalogs = + new ArrayList<>(Arrays.asList(expectedCatalog1, expectedCatalog2)); + + // Assert + int index = 0; + for (CatalogEntity catalog : convertedCatalogs) { + assertEquals(expectedCatalogs.get(index).id(), catalog.id()); + assertEquals(expectedCatalogs.get(index).name(), catalog.name()); + assertEquals(expectedCatalogs.get(index).getComment(), catalog.getComment()); + assertEquals(expectedCatalogs.get(index).getType(), catalog.getType()); + assertEquals(expectedCatalogs.get(index).getProvider(), catalog.getProvider()); + assertEquals(expectedCatalogs.get(index).namespace(), catalog.namespace()); + assertEquals( + expectedCatalogs.get(index).getProperties().get("key"), + catalog.getProperties().get("key")); + assertEquals( + expectedCatalogs.get(index).auditInfo().creator(), catalog.auditInfo().creator()); + index++; + } + } + + @Test + public void testInitMetalakePOVersion() { + BaseMetalake metalake = createMetalake(1L, "test", "this is test"); + MetalakePO initPO = POConverters.initializeMetalakePOWithVersion(metalake); + assertEquals(1, initPO.getCurrentVersion()); + assertEquals(1, initPO.getLastVersion()); + assertEquals(0, initPO.getDeletedAt()); + } + + @Test + public void testInitCatalogPOVersion() { + CatalogEntity catalog = + createCatalog(1L, "test", Namespace.ofCatalog("test_metalake"), "this is test"); + CatalogPO initPO = POConverters.initializeCatalogPOWithVersion(catalog, 1L); assertEquals(1, initPO.getCurrentVersion()); assertEquals(1, initPO.getLastVersion()); assertEquals(0, initPO.getDeletedAt()); } @Test - public void testUpdateMetalakePOVersion() throws JsonProcessingException { + public void testUpdateMetalakePOVersion() { BaseMetalake metalake = createMetalake(1L, "test", "this is test"); BaseMetalake updatedMetalake = createMetalake(1L, "test", "this is test2"); MetalakePO initPO = POConverters.initializeMetalakePOWithVersion(metalake); @@ -95,20 +164,17 @@ public void testUpdateMetalakePOVersion() throws JsonProcessingException { } @Test - public void testToMetalakePO() throws JsonProcessingException { - BaseMetalake metalake = createMetalake(1L, "test", "this is test"); - - MetalakePO expectedMetalakePO = createMetalakePO(1L, "test", "this is test"); - - MetalakePO actualMetalakePO = POConverters.toMetalakePO(metalake); - - // Assert - assertEquals(expectedMetalakePO.getMetalakeId(), actualMetalakePO.getMetalakeId()); - assertEquals(expectedMetalakePO.getMetalakeName(), actualMetalakePO.getMetalakeName()); - assertEquals(expectedMetalakePO.getMetalakeComment(), actualMetalakePO.getMetalakeComment()); - assertEquals(expectedMetalakePO.getProperties(), actualMetalakePO.getProperties()); - assertEquals(expectedMetalakePO.getAuditInfo(), actualMetalakePO.getAuditInfo()); - assertEquals(expectedMetalakePO.getSchemaVersion(), actualMetalakePO.getSchemaVersion()); + public void testUpdateCatalogPOVersion() { + CatalogEntity catalog = + createCatalog(1L, "test", Namespace.ofCatalog("test_metalake"), "this is test"); + CatalogEntity updatedCatalog = + createCatalog(1L, "test", Namespace.ofCatalog("test_metalake"), "this is test2"); + CatalogPO initPO = POConverters.initializeCatalogPOWithVersion(catalog, 1L); + CatalogPO updatePO = POConverters.updateCatalogPOWithVersion(initPO, updatedCatalog, 1L); + assertEquals(1, initPO.getCurrentVersion()); + assertEquals(1, initPO.getLastVersion()); + assertEquals(0, initPO.getDeletedAt()); + assertEquals("this is test2", updatePO.getCatalogComment()); } private static BaseMetalake createMetalake(Long id, String name, String comment) { @@ -141,4 +207,40 @@ private static MetalakePO createMetalakePO(Long id, String name, String comment) .withSchemaVersion(JsonUtils.anyFieldMapper().writeValueAsString(SchemaVersion.V_0_1)) .build(); } + + private static CatalogEntity createCatalog( + Long id, String name, Namespace namespace, String comment) { + AuditInfo auditInfo = + AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build(); + Map properties = new HashMap<>(); + properties.put("key", "value"); + return CatalogEntity.builder() + .withId(id) + .withName(name) + .withNamespace(namespace) + .withType(Catalog.Type.RELATIONAL) + .withProvider("test") + .withComment(comment) + .withProperties(properties) + .withAuditInfo(auditInfo) + .build(); + } + + private static CatalogPO createCatalogPO(Long id, String name, Long metalakeId, String comment) + throws JsonProcessingException { + AuditInfo auditInfo = + AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build(); + Map properties = new HashMap<>(); + properties.put("key", "value"); + return new CatalogPO.Builder() + .withCatalogId(id) + .withCatalogName(name) + .withMetalakeId(metalakeId) + .withType(Catalog.Type.RELATIONAL.name()) + .withProvider("test") + .withCatalogComment(comment) + .withProperties(JsonUtils.anyFieldMapper().writeValueAsString(properties)) + .withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(auditInfo)) + .build(); + } } diff --git a/core/src/test/resources/h2/h2-init.sql b/core/src/test/resources/h2/h2-init.sql index 74d00874577..9f47ee11dfe 100644 --- a/core/src/test/resources/h2/h2-init.sql +++ b/core/src/test/resources/h2/h2-init.sql @@ -12,7 +12,25 @@ CREATE TABLE IF NOT EXISTS `metalake_meta` ( `schema_version` MEDIUMTEXT NOT NULL COMMENT 'metalake schema version info', `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'metalake current version', `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'metalake last version', - `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 NULL COMMENT 'metalake deleted at', + `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'metalake deleted at', PRIMARY KEY (metalake_id), CONSTRAINT uk_mn_del UNIQUE (metalake_name, deleted_at) -) ENGINE = InnoDB; \ No newline at end of file +) ENGINE = InnoDB; + + +CREATE TABLE IF NOT EXISTS `catalog_meta` +( + `catalog_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'catalog id', + `catalog_name` VARCHAR(128) NOT NULL COMMENT 'catalog name', + `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id', + `type` VARCHAR(64) NOT NULL COMMENT 'catalog type', + `provider` VARCHAR(64) NOT NULL COMMENT 'catalog provider', + `catalog_comment` VARCHAR(256) DEFAULT '' COMMENT 'catalog comment', + `properties` MEDIUMTEXT DEFAULT NULL COMMENT 'catalog properties', + `audit_info` MEDIUMTEXT NOT NULL COMMENT 'catalog audit info', + `current_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'catalog current version', + `last_version` INT UNSIGNED NOT NULL DEFAULT 1 COMMENT 'catalog last version', + `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'catalog deleted at', + PRIMARY KEY (catalog_id), + CONSTRAINT uk_mid_cn_del UNIQUE (metalake_id, catalog_name, deleted_at) +) ENGINE=InnoDB; \ No newline at end of file