Skip to content

Commit

Permalink
refactor the catalog code
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaojiebao committed Feb 26, 2024
1 parent 11f4ed6 commit bb5334e
Show file tree
Hide file tree
Showing 11 changed files with 784 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package com.datastrato.gravitino.storage.relational;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.Configs;
import com.datastrato.gravitino.Entity;
import com.datastrato.gravitino.EntityAlreadyExistsException;
import com.datastrato.gravitino.HasIdentifier;
Expand All @@ -15,6 +16,8 @@
import com.datastrato.gravitino.exceptions.AlreadyExistsException;
import com.datastrato.gravitino.exceptions.NoSuchEntityException;
import com.datastrato.gravitino.meta.BaseMetalake;
import com.datastrato.gravitino.meta.CatalogEntity;
import com.datastrato.gravitino.storage.relational.service.CatalogMetaService;
import com.datastrato.gravitino.storage.relational.service.MetalakeMetaService;
import com.datastrato.gravitino.storage.relational.session.SqlSessionFactoryHelper;
import java.io.IOException;
Expand All @@ -41,6 +44,8 @@ public <E extends Entity & HasIdentifier> List<E> list(
switch (entityType) {
case METALAKE:
return (List<E>) MetalakeMetaService.getInstance().listMetalakes();
case CATALOG:
return (List<E>) CatalogMetaService.getInstance().listCatalogsByNamespace(namespace);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for list operation", entityType);
Expand All @@ -62,6 +67,8 @@ public <E extends Entity & HasIdentifier> void insert(E e, boolean overwritten)
throws EntityAlreadyExistsException {
if (e instanceof BaseMetalake) {
MetalakeMetaService.getInstance().insertMetalake((BaseMetalake) e, overwritten);
} else if (e instanceof CatalogEntity) {
CatalogMetaService.getInstance().insertCatalog((CatalogEntity) e, overwritten);
} else {
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for insert operation", e.getClass());
Expand All @@ -75,6 +82,8 @@ public <E extends Entity & HasIdentifier> E update(
switch (entityType) {
case METALAKE:
return (E) MetalakeMetaService.getInstance().updateMetalake(ident, updater);
case CATALOG:
return (E) CatalogMetaService.getInstance().updateCatalog(ident, updater);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for update operation", entityType);
Expand All @@ -86,7 +95,9 @@ public <E extends Entity & HasIdentifier> E get(
NameIdentifier ident, Entity.EntityType entityType) throws NoSuchEntityException {
switch (entityType) {
case METALAKE:
return (E) MetalakeMetaService.getInstance().getMetalakeByIdent(ident);
return (E) MetalakeMetaService.getInstance().getMetalakeByIdentifier(ident);
case CATALOG:
return (E) CatalogMetaService.getInstance().getCatalogByIdentifier(ident);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for get operation", entityType);
Expand All @@ -98,6 +109,8 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea
switch (entityType) {
case METALAKE:
return MetalakeMetaService.getInstance().deleteMetalake(ident, cascade);
case CATALOG:
return CatalogMetaService.getInstance().deleteCatalog(ident, cascade);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for delete operation", entityType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import com.datastrato.gravitino.storage.relational.po.CatalogPO;
import java.util.List;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
Expand All @@ -17,61 +16,74 @@ public interface CatalogMetaMapper {
String TABLE_NAME = "catalog_meta";

@Select(
"SELECT id, catalog_name as catalogName, metalake_id as metalakeId,"
+ " type, provider, catalog_comment as catalogComment,"
+ " properties, audit_info as auditInfo"
"SELECT catalog_id as catalogId, catalog_name as catalogName,"
+ " metalake_id as metalakeId, type, provider,"
+ " catalog_comment as catalogComment, properties, audit_info as auditInfo,"
+ " current_version as currentVersion, last_version as lastVersion,"
+ " deleted_at as deletedAt"
+ " FROM "
+ TABLE_NAME
+ " WHERE metalake_id = #{metalakeId}")
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0")
List<CatalogPO> listCatalogPOsByMetalakeId(@Param("metalakeId") Long metalakeId);

@Select(
"SELECT id FROM "
"SELECT catalog_id as catalogId FROM "
+ TABLE_NAME
+ " WHERE catalog_name = #{catalogName} and metalake_id = #{metalakeId}")
Long selectCatalogIdByNameAndMetalakeId(
@Param("catalogName") String name, @Param("metalakeId") Long metalakeId);
+ " WHERE metalake_id = #{metalakeId} AND catalog_name = #{catalogName} AND deleted_at = 0")
Long selectCatalogIdByMetalakeIdAndName(
@Param("metalakeId") Long metalakeId, @Param("catalogName") String name);

@Select(
"SELECT id, catalog_name as catalogName,"
"SELECT catalog_id as catalogId, catalog_name as catalogName,"
+ " metalake_id as metalakeId, type, provider,"
+ " catalog_comment as catalogComment, properties,"
+ " audit_info as auditInfo"
+ " catalog_comment as catalogComment, properties, audit_info as auditInfo,"
+ " current_version as currentVersion, last_version as lastVersion,"
+ " deleted_at as deletedAt"
+ " FROM "
+ TABLE_NAME
+ " WHERE catalog_name = #{catalogName} and metalake_id = #{metalakeId}")
CatalogPO selectCatalogMetaByNameAndMetalakeId(
@Param("catalogName") String name, @Param("metalakeId") Long metalakeId);
+ " WHERE metalake_id = #{metalakeId} AND catalog_name = #{catalogName} AND deleted_at = 0")
CatalogPO selectCatalogMetaByMetalakeIdAndName(
@Param("metalakeId") Long metalakeId, @Param("catalogName") String name);

@Insert(
"INSERT INTO "
+ TABLE_NAME
+ "(id, catalog_name, metalake_id, type, provider, catalog_comment, properties, audit_info)"
+ "(catalog_id, catalog_name, metalake_id,"
+ " type, provider, catalog_comment, properties, audit_info,"
+ " current_version, last_version, deleted_at)"
+ " VALUES("
+ " #{catalogMeta.id},"
+ " #{catalogMeta.catalogId},"
+ " #{catalogMeta.catalogName},"
+ " #{catalogMeta.metalakeId},"
+ " #{catalogMeta.type},"
+ " #{catalogMeta.provider},"
+ " #{catalogMeta.catalogComment},"
+ " #{catalogMeta.properties},"
+ " #{catalogMeta.auditInfo}"
+ " #{catalogMeta.auditInfo},"
+ " #{catalogMeta.currentVersion},"
+ " #{catalogMeta.lastVersion},"
+ " #{catalogMeta.deletedAt}"
+ " )")
void insertCatalogMeta(@Param("catalogMeta") CatalogPO catalogPO);

@Insert(
"INSERT INTO "
+ TABLE_NAME
+ "(id, catalog_name, metalake_id, type, provider, metalake_comment, properties, audit_info)"
+ "(catalog_id, catalog_name, metalake_id,"
+ " type, provider, catalog_comment, properties, audit_info,"
+ " current_version, last_version, deleted_at)"
+ " VALUES("
+ " #{catalogMeta.id},"
+ " #{catalogMeta.catalogId},"
+ " #{catalogMeta.catalogName},"
+ " #{catalogMeta.metalakeId},"
+ " #{catalogMeta.type},"
+ " #{catalogMeta.provider},"
+ " #{catalogMeta.catalogComment},"
+ " #{catalogMeta.properties},"
+ " #{catalogMeta.auditInfo}"
+ " #{catalogMeta.auditInfo},"
+ " #{catalogMeta.currentVersion},"
+ " #{catalogMeta.lastVersion},"
+ " #{catalogMeta.deletedAt}"
+ " )"
+ " ON DUPLICATE KEY UPDATE"
+ " catalog_name = #{catalogMeta.catalogName},"
Expand All @@ -80,34 +92,51 @@ CatalogPO selectCatalogMetaByNameAndMetalakeId(
+ " provider = #{catalogMeta.provider},"
+ " catalog_comment = #{catalogMeta.catalogComment},"
+ " properties = #{catalogMeta.properties},"
+ " audit_info = #{catalogMeta.auditInfo}")
+ " audit_info = #{catalogMeta.auditInfo},"
+ " current_version = #{catalogMeta.currentVersion},"
+ " last_version = #{catalogMeta.lastVersion},"
+ " deleted_at = #{catalogMeta.deletedAt}")
void insertCatalogMetaOnDuplicateKeyUpdate(@Param("catalogMeta") CatalogPO catalogPO);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET catalog_name = #{newCatalogMeta.metalakeName},"
+ " SET catalog_name = #{newCatalogMeta.catalogName},"
+ " metalake_id = #{newCatalogMeta.metalakeId},"
+ " type = #{newCatalogMeta.type},"
+ " provider = #{newCatalogMeta.provider},"
+ " catalog_comment = #{newCatalogMeta.catalogComment},"
+ " properties = #{newCatalogMeta.properties},"
+ " audit_info = #{newCatalogMeta.auditInfo}"
+ " WHERE id = #{oldCatalogMeta.id}"
+ " and catalog_name = #{oldCatalogMeta.catalogName}"
+ " and metalake_id = #{oldCatalogMeta.metalakeId}"
+ " and type = #{oldCatalogMeta.type}"
+ " and provider = #{oldCatalogMeta.provider}"
+ " and catalog_comment = #{oldCatalogMeta.catalogComment}"
+ " and properties = #{oldCatalogMeta.properties}"
+ " and audit_info = #{oldCatalogMeta.auditInfo}")
+ " audit_info = #{newCatalogMeta.auditInfo},"
+ " current_version = #{newCatalogMeta.currentVersion},"
+ " last_version = #{newCatalogMeta.lastVersion},"
+ " deleted_at = #{newCatalogMeta.deletedAt}"
+ " WHERE catalog_id = #{oldCatalogMeta.catalogId}"
+ " AND catalog_name = #{oldCatalogMeta.catalogName}"
+ " AND metalake_id = #{oldCatalogMeta.metalakeId}"
+ " AND type = #{oldCatalogMeta.type}"
+ " AND provider = #{oldCatalogMeta.provider}"
+ " AND catalog_comment = #{oldCatalogMeta.catalogComment}"
+ " AND properties = #{oldCatalogMeta.properties}"
+ " AND audit_info = #{oldCatalogMeta.auditInfo}"
+ " AND current_version = #{oldCatalogMeta.currentVersion}"
+ " AND last_version = #{oldCatalogMeta.lastVersion}"
+ " AND deleted_at = 0")
Integer updateCatalogMeta(
@Param("newCatalogMeta") CatalogPO newCatalogPO,
@Param("oldCatalogMeta") CatalogPO oldCatalogPO);

@Delete("DELETE FROM " + TABLE_NAME + " WHERE id = #{id}")
Integer deleteCatalogMetasById(@Param("id") Long id);
@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE catalog_id = #{catalogId} AND deleted_at = 0")
Integer softDeleteCatalogMetasByCatalogId(@Param("catalogId") Long catalogId);

@Delete("DELETE FROM " + TABLE_NAME + " WHERE metalake_id = #{metalakeId}")
Integer deleteCatalogMetasByMetalakeId(@Param("metalakeId") Long metalakeId);
@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0")
Integer softDeleteCatalogMetasByMetalakeId(@Param("metalakeId") Long metalakeId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,21 +107,22 @@ public interface MetalakeMetaMapper {
+ " current_version = #{newMetalakeMeta.currentVersion},"
+ " last_version = #{newMetalakeMeta.lastVersion}"
+ " WHERE metalake_id = #{oldMetalakeMeta.metalakeId}"
+ " and metalake_name = #{oldMetalakeMeta.metalakeName}"
+ " and metalake_comment = #{oldMetalakeMeta.metalakeComment}"
+ " and properties = #{oldMetalakeMeta.properties}"
+ " and audit_info = #{oldMetalakeMeta.auditInfo}"
+ " and schema_version = #{oldMetalakeMeta.schemaVersion}"
+ " and current_version = #{oldMetalakeMeta.currentVersion}"
+ " and last_version = #{oldMetalakeMeta.lastVersion}"
+ " and deleted_at = 0")
+ " AND metalake_name = #{oldMetalakeMeta.metalakeName}"
+ " AND metalake_comment = #{oldMetalakeMeta.metalakeComment}"
+ " AND properties = #{oldMetalakeMeta.properties}"
+ " AND audit_info = #{oldMetalakeMeta.auditInfo}"
+ " AND schema_version = #{oldMetalakeMeta.schemaVersion}"
+ " AND current_version = #{oldMetalakeMeta.currentVersion}"
+ " AND last_version = #{oldMetalakeMeta.lastVersion}"
+ " AND deleted_at = 0")
Integer updateMetalakeMeta(
@Param("newMetalakeMeta") MetalakePO newMetalakePO,
@Param("oldMetalakeMeta") MetalakePO oldMetalakePO);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP() WHERE metalake_id = #{metalakeId}")
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0")
Integer softDeleteMetalakeMetaByMetalakeId(@Param("metalakeId") Long metalakeId);
}
Loading

0 comments on commit bb5334e

Please sign in to comment.