Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2081] feat(core): Add JDBC backend operations for catalog #2078

Merged
merged 35 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
4432c85
add rel interface
Feb 2, 2024
ca62a8a
fix the comment
Feb 4, 2024
b29b0b9
add the java doc for RelationBackend
Feb 4, 2024
0f28556
fix comments
Feb 5, 2024
eb70ecf
add the code skeleton for mysql backend operating metalake
Feb 2, 2024
a464241
fix update
Feb 4, 2024
3f29d1f
add unit test
Feb 5, 2024
5cc6d40
fix comments
Feb 6, 2024
4a152c8
fix comments and uts
Feb 6, 2024
42ce5ca
fix some comments
Feb 6, 2024
ff736d8
rename to jdbc backend
Feb 17, 2024
146256d
fix comments
Feb 17, 2024
45c88b2
fix comments
Feb 18, 2024
5a45836
fix comments
Feb 18, 2024
956b5a6
fix comments
Feb 18, 2024
dd806c6
remove type config
Feb 19, 2024
fdffa5c
refactor the schema
Feb 22, 2024
4130dce
fix the import order
xloya Feb 22, 2024
e5c25f2
update
xloya Feb 22, 2024
7760554
fix comments
Feb 25, 2024
ea83dcc
add the code skeleton for mysql backend operating metalake
Feb 2, 2024
97a00c7
fix comments and uts
Feb 6, 2024
5ad6b8d
fix some comments
Feb 6, 2024
723541f
rename to jdbc backend
Feb 17, 2024
f004dc3
fix comments
Feb 17, 2024
3057e0b
fix comments
Feb 6, 2024
51f45f4
add the code skeleton for mysql backend operating metalake
Feb 2, 2024
38403be
fix update
Feb 4, 2024
0740200
add unit test
Feb 5, 2024
11f4ed6
add mysql backend ops for catalog
Feb 5, 2024
bb5334e
refactor the catalog code
Feb 26, 2024
fada86d
Merge branch 'main' into add-mysql-backend-ops-for-catalog
Feb 26, 2024
70a91d0
fix comments
Feb 27, 2024
25a60b2
catch constraint exception when update
Feb 27, 2024
4adcac8
add ut for update unique key constraints
Feb 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

/** This exception is thrown when an entity is not found. */
public class NoSuchEntityException extends RuntimeException {
/** The no such an entity message for the exception. */
public static final String NO_SUCH_AN_ENTITY_MESSAGE = "No such an entity: %s";

/**
* Constructs a new NoSuchEntityException.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import com.datastrato.gravitino.exceptions.AlreadyExistsException;
import com.datastrato.gravitino.exceptions.NoSuchEntityException;
import com.datastrato.gravitino.meta.BaseMetalake;
import com.datastrato.gravitino.meta.CatalogEntity;
import com.datastrato.gravitino.storage.relational.service.CatalogMetaService;
import com.datastrato.gravitino.storage.relational.service.MetalakeMetaService;
import com.datastrato.gravitino.storage.relational.session.SqlSessionFactoryHelper;
import java.io.IOException;
Expand All @@ -42,6 +44,8 @@ public <E extends Entity & HasIdentifier> List<E> list(
switch (entityType) {
case METALAKE:
return (List<E>) MetalakeMetaService.getInstance().listMetalakes();
case CATALOG:
return (List<E>) CatalogMetaService.getInstance().listCatalogsByNamespace(namespace);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for list operation", entityType);
Expand All @@ -63,6 +67,8 @@ public <E extends Entity & HasIdentifier> void insert(E e, boolean overwritten)
throws EntityAlreadyExistsException {
if (e instanceof BaseMetalake) {
MetalakeMetaService.getInstance().insertMetalake((BaseMetalake) e, overwritten);
} else if (e instanceof CatalogEntity) {
CatalogMetaService.getInstance().insertCatalog((CatalogEntity) e, overwritten);
} else {
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for insert operation", e.getClass());
Expand All @@ -76,6 +82,8 @@ public <E extends Entity & HasIdentifier> E update(
switch (entityType) {
case METALAKE:
return (E) MetalakeMetaService.getInstance().updateMetalake(ident, updater);
case CATALOG:
return (E) CatalogMetaService.getInstance().updateCatalog(ident, updater);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for update operation", entityType);
Expand All @@ -87,7 +95,9 @@ public <E extends Entity & HasIdentifier> E get(
NameIdentifier ident, Entity.EntityType entityType) throws NoSuchEntityException {
switch (entityType) {
case METALAKE:
return (E) MetalakeMetaService.getInstance().getMetalakeByIdent(ident);
return (E) MetalakeMetaService.getInstance().getMetalakeByIdentifier(ident);
case CATALOG:
return (E) CatalogMetaService.getInstance().getCatalogByIdentifier(ident);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for get operation", entityType);
Expand All @@ -99,6 +109,8 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea
switch (entityType) {
case METALAKE:
return MetalakeMetaService.getInstance().deleteMetalake(ident, cascade);
case CATALOG:
return CatalogMetaService.getInstance().deleteCatalog(ident, cascade);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for delete operation", entityType);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.storage.relational.mapper;

import com.datastrato.gravitino.storage.relational.po.CatalogPO;
import java.util.List;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;

/**
* A MyBatis Mapper for catalog meta operation SQLs.
*
* <p>This interface class is a specification defined by MyBatis. It requires this interface class
* to identify the corresponding SQLs for execution. We can write SQLs in an additional XML file, or
* write SQLs with annotations in this interface Mapper. See: <a
* href="https://mybatis.org/mybatis-3/getting-started.html"></a>
*/
public interface CatalogMetaMapper {
String TABLE_NAME = "catalog_meta";

@Select(
"SELECT catalog_id as catalogId, catalog_name as catalogName,"
+ " metalake_id as metalakeId, type, provider,"
+ " catalog_comment as catalogComment, properties, audit_info as auditInfo,"
+ " current_version as currentVersion, last_version as lastVersion,"
+ " deleted_at as deletedAt"
+ " FROM "
+ TABLE_NAME
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0")
List<CatalogPO> listCatalogPOsByMetalakeId(@Param("metalakeId") Long metalakeId);

@Select(
"SELECT catalog_id as catalogId FROM "
+ TABLE_NAME
+ " WHERE metalake_id = #{metalakeId} AND catalog_name = #{catalogName} AND deleted_at = 0")
Long selectCatalogIdByMetalakeIdAndName(
@Param("metalakeId") Long metalakeId, @Param("catalogName") String name);

@Select(
"SELECT catalog_id as catalogId, catalog_name as catalogName,"
+ " metalake_id as metalakeId, type, provider,"
+ " catalog_comment as catalogComment, properties, audit_info as auditInfo,"
+ " current_version as currentVersion, last_version as lastVersion,"
+ " deleted_at as deletedAt"
+ " FROM "
+ TABLE_NAME
+ " WHERE metalake_id = #{metalakeId} AND catalog_name = #{catalogName} AND deleted_at = 0")
CatalogPO selectCatalogMetaByMetalakeIdAndName(
@Param("metalakeId") Long metalakeId, @Param("catalogName") String name);

@Insert(
"INSERT INTO "
+ TABLE_NAME
+ "(catalog_id, catalog_name, metalake_id,"
+ " type, provider, catalog_comment, properties, audit_info,"
+ " current_version, last_version, deleted_at)"
+ " VALUES("
+ " #{catalogMeta.catalogId},"
+ " #{catalogMeta.catalogName},"
+ " #{catalogMeta.metalakeId},"
+ " #{catalogMeta.type},"
+ " #{catalogMeta.provider},"
+ " #{catalogMeta.catalogComment},"
+ " #{catalogMeta.properties},"
+ " #{catalogMeta.auditInfo},"
+ " #{catalogMeta.currentVersion},"
+ " #{catalogMeta.lastVersion},"
+ " #{catalogMeta.deletedAt}"
+ " )")
void insertCatalogMeta(@Param("catalogMeta") CatalogPO catalogPO);

@Insert(
"INSERT INTO "
+ TABLE_NAME
+ "(catalog_id, catalog_name, metalake_id,"
+ " type, provider, catalog_comment, properties, audit_info,"
+ " current_version, last_version, deleted_at)"
+ " VALUES("
+ " #{catalogMeta.catalogId},"
+ " #{catalogMeta.catalogName},"
+ " #{catalogMeta.metalakeId},"
+ " #{catalogMeta.type},"
+ " #{catalogMeta.provider},"
+ " #{catalogMeta.catalogComment},"
+ " #{catalogMeta.properties},"
+ " #{catalogMeta.auditInfo},"
+ " #{catalogMeta.currentVersion},"
+ " #{catalogMeta.lastVersion},"
+ " #{catalogMeta.deletedAt}"
+ " )"
+ " ON DUPLICATE KEY UPDATE"
+ " catalog_name = #{catalogMeta.catalogName},"
+ " metalake_id = #{catalogMeta.metalakeId},"
+ " type = #{catalogMeta.type},"
+ " provider = #{catalogMeta.provider},"
+ " catalog_comment = #{catalogMeta.catalogComment},"
+ " properties = #{catalogMeta.properties},"
+ " audit_info = #{catalogMeta.auditInfo},"
+ " current_version = #{catalogMeta.currentVersion},"
+ " last_version = #{catalogMeta.lastVersion},"
+ " deleted_at = #{catalogMeta.deletedAt}")
void insertCatalogMetaOnDuplicateKeyUpdate(@Param("catalogMeta") CatalogPO catalogPO);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET catalog_name = #{newCatalogMeta.catalogName},"
+ " metalake_id = #{newCatalogMeta.metalakeId},"
+ " type = #{newCatalogMeta.type},"
+ " provider = #{newCatalogMeta.provider},"
+ " catalog_comment = #{newCatalogMeta.catalogComment},"
+ " properties = #{newCatalogMeta.properties},"
+ " audit_info = #{newCatalogMeta.auditInfo},"
+ " current_version = #{newCatalogMeta.currentVersion},"
+ " last_version = #{newCatalogMeta.lastVersion},"
+ " deleted_at = #{newCatalogMeta.deletedAt}"
+ " WHERE catalog_id = #{oldCatalogMeta.catalogId}"
+ " AND catalog_name = #{oldCatalogMeta.catalogName}"
+ " AND metalake_id = #{oldCatalogMeta.metalakeId}"
+ " AND type = #{oldCatalogMeta.type}"
+ " AND provider = #{oldCatalogMeta.provider}"
+ " AND catalog_comment = #{oldCatalogMeta.catalogComment}"
+ " AND properties = #{oldCatalogMeta.properties}"
+ " AND audit_info = #{oldCatalogMeta.auditInfo}"
+ " AND current_version = #{oldCatalogMeta.currentVersion}"
+ " AND last_version = #{oldCatalogMeta.lastVersion}"
+ " AND deleted_at = 0")
Integer updateCatalogMeta(
@Param("newCatalogMeta") CatalogPO newCatalogPO,
@Param("oldCatalogMeta") CatalogPO oldCatalogPO);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE catalog_id = #{catalogId} AND deleted_at = 0")
Integer softDeleteCatalogMetasByCatalogId(@Param("catalogId") Long catalogId);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0")
Integer softDeleteCatalogMetasByMetalakeId(@Param("metalakeId") Long metalakeId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,21 +107,22 @@ public interface MetalakeMetaMapper {
+ " current_version = #{newMetalakeMeta.currentVersion},"
+ " last_version = #{newMetalakeMeta.lastVersion}"
+ " WHERE metalake_id = #{oldMetalakeMeta.metalakeId}"
+ " and metalake_name = #{oldMetalakeMeta.metalakeName}"
+ " and metalake_comment = #{oldMetalakeMeta.metalakeComment}"
+ " and properties = #{oldMetalakeMeta.properties}"
+ " and audit_info = #{oldMetalakeMeta.auditInfo}"
+ " and schema_version = #{oldMetalakeMeta.schemaVersion}"
+ " and current_version = #{oldMetalakeMeta.currentVersion}"
+ " and last_version = #{oldMetalakeMeta.lastVersion}"
+ " and deleted_at = 0")
+ " AND metalake_name = #{oldMetalakeMeta.metalakeName}"
+ " AND metalake_comment = #{oldMetalakeMeta.metalakeComment}"
+ " AND properties = #{oldMetalakeMeta.properties}"
+ " AND audit_info = #{oldMetalakeMeta.auditInfo}"
+ " AND schema_version = #{oldMetalakeMeta.schemaVersion}"
+ " AND current_version = #{oldMetalakeMeta.currentVersion}"
+ " AND last_version = #{oldMetalakeMeta.lastVersion}"
+ " AND deleted_at = 0")
Integer updateMetalakeMeta(
@Param("newMetalakeMeta") MetalakePO newMetalakePO,
@Param("oldMetalakeMeta") MetalakePO oldMetalakePO);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP() WHERE metalake_id = #{metalakeId}")
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0")
Integer softDeleteMetalakeMetaByMetalakeId(@Param("metalakeId") Long metalakeId);
}
Loading
Loading