Skip to content

Commit

Permalink
[#2081] feat(core): Add JDBC backend operations for catalog (#2078)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

The purpose of this PR is to implement JDBC backend operation `Catalog`
metadata. Depend on #1980 . Metadata operations of Schema, Table and
Fileset will be supported in the remaining PRs.

### Why are the changes needed?

Fix: #2081 

### How was this patch tested?

Add unit tests to test the catalog metadata ops.

---------

Co-authored-by: xiaojiebao <[email protected]>
  • Loading branch information
xloya and xiaojiebao authored Feb 28, 2024
1 parent f59c36f commit 8726cc5
Show file tree
Hide file tree
Showing 13 changed files with 1,152 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

/** This exception is thrown when an entity is not found. */
public class NoSuchEntityException extends RuntimeException {
/** The no such entity message for the exception. */
public static final String NO_SUCH_ENTITY_MESSAGE = "No such entity: %s";

/**
* Constructs a new NoSuchEntityException.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import com.datastrato.gravitino.exceptions.AlreadyExistsException;
import com.datastrato.gravitino.exceptions.NoSuchEntityException;
import com.datastrato.gravitino.meta.BaseMetalake;
import com.datastrato.gravitino.meta.CatalogEntity;
import com.datastrato.gravitino.storage.relational.service.CatalogMetaService;
import com.datastrato.gravitino.storage.relational.service.MetalakeMetaService;
import com.datastrato.gravitino.storage.relational.session.SqlSessionFactoryHelper;
import java.io.IOException;
Expand All @@ -42,6 +44,8 @@ public <E extends Entity & HasIdentifier> List<E> list(
switch (entityType) {
case METALAKE:
return (List<E>) MetalakeMetaService.getInstance().listMetalakes();
case CATALOG:
return (List<E>) CatalogMetaService.getInstance().listCatalogsByNamespace(namespace);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for list operation", entityType);
Expand All @@ -63,6 +67,8 @@ public <E extends Entity & HasIdentifier> void insert(E e, boolean overwritten)
throws EntityAlreadyExistsException {
if (e instanceof BaseMetalake) {
MetalakeMetaService.getInstance().insertMetalake((BaseMetalake) e, overwritten);
} else if (e instanceof CatalogEntity) {
CatalogMetaService.getInstance().insertCatalog((CatalogEntity) e, overwritten);
} else {
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for insert operation", e.getClass());
Expand All @@ -76,6 +82,8 @@ public <E extends Entity & HasIdentifier> E update(
switch (entityType) {
case METALAKE:
return (E) MetalakeMetaService.getInstance().updateMetalake(ident, updater);
case CATALOG:
return (E) CatalogMetaService.getInstance().updateCatalog(ident, updater);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for update operation", entityType);
Expand All @@ -87,7 +95,9 @@ public <E extends Entity & HasIdentifier> E get(
NameIdentifier ident, Entity.EntityType entityType) throws NoSuchEntityException {
switch (entityType) {
case METALAKE:
return (E) MetalakeMetaService.getInstance().getMetalakeByIdent(ident);
return (E) MetalakeMetaService.getInstance().getMetalakeByIdentifier(ident);
case CATALOG:
return (E) CatalogMetaService.getInstance().getCatalogByIdentifier(ident);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for get operation", entityType);
Expand All @@ -99,6 +109,8 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea
switch (entityType) {
case METALAKE:
return MetalakeMetaService.getInstance().deleteMetalake(ident, cascade);
case CATALOG:
return CatalogMetaService.getInstance().deleteCatalog(ident, cascade);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for delete operation", entityType);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.storage.relational.mapper;

import com.datastrato.gravitino.storage.relational.po.CatalogPO;
import java.util.List;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;

/**
* A MyBatis Mapper for catalog meta operation SQLs.
*
* <p>This interface class is a specification defined by MyBatis. It requires this interface class
* to identify the corresponding SQLs for execution. We can write SQLs in an additional XML file, or
* write SQLs with annotations in this interface Mapper. See: <a
* href="https://mybatis.org/mybatis-3/getting-started.html"></a>
*/
public interface CatalogMetaMapper {
String TABLE_NAME = "catalog_meta";

@Select(
"SELECT catalog_id as catalogId, catalog_name as catalogName,"
+ " metalake_id as metalakeId, type, provider,"
+ " catalog_comment as catalogComment, properties, audit_info as auditInfo,"
+ " current_version as currentVersion, last_version as lastVersion,"
+ " deleted_at as deletedAt"
+ " FROM "
+ TABLE_NAME
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0")
List<CatalogPO> listCatalogPOsByMetalakeId(@Param("metalakeId") Long metalakeId);

@Select(
"SELECT catalog_id as catalogId FROM "
+ TABLE_NAME
+ " WHERE metalake_id = #{metalakeId} AND catalog_name = #{catalogName} AND deleted_at = 0")
Long selectCatalogIdByMetalakeIdAndName(
@Param("metalakeId") Long metalakeId, @Param("catalogName") String name);

@Select(
"SELECT catalog_id as catalogId, catalog_name as catalogName,"
+ " metalake_id as metalakeId, type, provider,"
+ " catalog_comment as catalogComment, properties, audit_info as auditInfo,"
+ " current_version as currentVersion, last_version as lastVersion,"
+ " deleted_at as deletedAt"
+ " FROM "
+ TABLE_NAME
+ " WHERE metalake_id = #{metalakeId} AND catalog_name = #{catalogName} AND deleted_at = 0")
CatalogPO selectCatalogMetaByMetalakeIdAndName(
@Param("metalakeId") Long metalakeId, @Param("catalogName") String name);

@Insert(
"INSERT INTO "
+ TABLE_NAME
+ "(catalog_id, catalog_name, metalake_id,"
+ " type, provider, catalog_comment, properties, audit_info,"
+ " current_version, last_version, deleted_at)"
+ " VALUES("
+ " #{catalogMeta.catalogId},"
+ " #{catalogMeta.catalogName},"
+ " #{catalogMeta.metalakeId},"
+ " #{catalogMeta.type},"
+ " #{catalogMeta.provider},"
+ " #{catalogMeta.catalogComment},"
+ " #{catalogMeta.properties},"
+ " #{catalogMeta.auditInfo},"
+ " #{catalogMeta.currentVersion},"
+ " #{catalogMeta.lastVersion},"
+ " #{catalogMeta.deletedAt}"
+ " )")
void insertCatalogMeta(@Param("catalogMeta") CatalogPO catalogPO);

@Insert(
"INSERT INTO "
+ TABLE_NAME
+ "(catalog_id, catalog_name, metalake_id,"
+ " type, provider, catalog_comment, properties, audit_info,"
+ " current_version, last_version, deleted_at)"
+ " VALUES("
+ " #{catalogMeta.catalogId},"
+ " #{catalogMeta.catalogName},"
+ " #{catalogMeta.metalakeId},"
+ " #{catalogMeta.type},"
+ " #{catalogMeta.provider},"
+ " #{catalogMeta.catalogComment},"
+ " #{catalogMeta.properties},"
+ " #{catalogMeta.auditInfo},"
+ " #{catalogMeta.currentVersion},"
+ " #{catalogMeta.lastVersion},"
+ " #{catalogMeta.deletedAt}"
+ " )"
+ " ON DUPLICATE KEY UPDATE"
+ " catalog_name = #{catalogMeta.catalogName},"
+ " metalake_id = #{catalogMeta.metalakeId},"
+ " type = #{catalogMeta.type},"
+ " provider = #{catalogMeta.provider},"
+ " catalog_comment = #{catalogMeta.catalogComment},"
+ " properties = #{catalogMeta.properties},"
+ " audit_info = #{catalogMeta.auditInfo},"
+ " current_version = #{catalogMeta.currentVersion},"
+ " last_version = #{catalogMeta.lastVersion},"
+ " deleted_at = #{catalogMeta.deletedAt}")
void insertCatalogMetaOnDuplicateKeyUpdate(@Param("catalogMeta") CatalogPO catalogPO);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET catalog_name = #{newCatalogMeta.catalogName},"
+ " metalake_id = #{newCatalogMeta.metalakeId},"
+ " type = #{newCatalogMeta.type},"
+ " provider = #{newCatalogMeta.provider},"
+ " catalog_comment = #{newCatalogMeta.catalogComment},"
+ " properties = #{newCatalogMeta.properties},"
+ " audit_info = #{newCatalogMeta.auditInfo},"
+ " current_version = #{newCatalogMeta.currentVersion},"
+ " last_version = #{newCatalogMeta.lastVersion},"
+ " deleted_at = #{newCatalogMeta.deletedAt}"
+ " WHERE catalog_id = #{oldCatalogMeta.catalogId}"
+ " AND catalog_name = #{oldCatalogMeta.catalogName}"
+ " AND metalake_id = #{oldCatalogMeta.metalakeId}"
+ " AND type = #{oldCatalogMeta.type}"
+ " AND provider = #{oldCatalogMeta.provider}"
+ " AND catalog_comment = #{oldCatalogMeta.catalogComment}"
+ " AND properties = #{oldCatalogMeta.properties}"
+ " AND audit_info = #{oldCatalogMeta.auditInfo}"
+ " AND current_version = #{oldCatalogMeta.currentVersion}"
+ " AND last_version = #{oldCatalogMeta.lastVersion}"
+ " AND deleted_at = 0")
Integer updateCatalogMeta(
@Param("newCatalogMeta") CatalogPO newCatalogPO,
@Param("oldCatalogMeta") CatalogPO oldCatalogPO);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE catalog_id = #{catalogId} AND deleted_at = 0")
Integer softDeleteCatalogMetasByCatalogId(@Param("catalogId") Long catalogId);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0")
Integer softDeleteCatalogMetasByMetalakeId(@Param("metalakeId") Long metalakeId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,21 +107,22 @@ public interface MetalakeMetaMapper {
+ " current_version = #{newMetalakeMeta.currentVersion},"
+ " last_version = #{newMetalakeMeta.lastVersion}"
+ " WHERE metalake_id = #{oldMetalakeMeta.metalakeId}"
+ " and metalake_name = #{oldMetalakeMeta.metalakeName}"
+ " and metalake_comment = #{oldMetalakeMeta.metalakeComment}"
+ " and properties = #{oldMetalakeMeta.properties}"
+ " and audit_info = #{oldMetalakeMeta.auditInfo}"
+ " and schema_version = #{oldMetalakeMeta.schemaVersion}"
+ " and current_version = #{oldMetalakeMeta.currentVersion}"
+ " and last_version = #{oldMetalakeMeta.lastVersion}"
+ " and deleted_at = 0")
+ " AND metalake_name = #{oldMetalakeMeta.metalakeName}"
+ " AND metalake_comment = #{oldMetalakeMeta.metalakeComment}"
+ " AND properties = #{oldMetalakeMeta.properties}"
+ " AND audit_info = #{oldMetalakeMeta.auditInfo}"
+ " AND schema_version = #{oldMetalakeMeta.schemaVersion}"
+ " AND current_version = #{oldMetalakeMeta.currentVersion}"
+ " AND last_version = #{oldMetalakeMeta.lastVersion}"
+ " AND deleted_at = 0")
Integer updateMetalakeMeta(
@Param("newMetalakeMeta") MetalakePO newMetalakePO,
@Param("oldMetalakeMeta") MetalakePO oldMetalakePO);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP() WHERE metalake_id = #{metalakeId}")
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0")
Integer softDeleteMetalakeMetaByMetalakeId(@Param("metalakeId") Long metalakeId);
}
Loading

0 comments on commit 8726cc5

Please sign in to comment.