Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2084] feat(core): Add JDBC backend operations for fileset #2389

Merged
merged 66 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
4432c85
add rel interface
Feb 2, 2024
ca62a8a
fix the comment
Feb 4, 2024
b29b0b9
add the java doc for RelationBackend
Feb 4, 2024
0f28556
fix comments
Feb 5, 2024
eb70ecf
add the code skeleton for mysql backend operating metalake
Feb 2, 2024
a464241
fix update
Feb 4, 2024
3f29d1f
add unit test
Feb 5, 2024
5cc6d40
fix comments
Feb 6, 2024
4a152c8
fix comments and uts
Feb 6, 2024
42ce5ca
fix some comments
Feb 6, 2024
ff736d8
rename to jdbc backend
Feb 17, 2024
146256d
fix comments
Feb 17, 2024
45c88b2
fix comments
Feb 18, 2024
5a45836
fix comments
Feb 18, 2024
956b5a6
fix comments
Feb 18, 2024
dd806c6
remove type config
Feb 19, 2024
fdffa5c
refactor the schema
Feb 22, 2024
4130dce
fix the import order
xloya Feb 22, 2024
e5c25f2
update
xloya Feb 22, 2024
7760554
fix comments
Feb 25, 2024
ea83dcc
add the code skeleton for mysql backend operating metalake
Feb 2, 2024
97a00c7
fix comments and uts
Feb 6, 2024
5ad6b8d
fix some comments
Feb 6, 2024
723541f
rename to jdbc backend
Feb 17, 2024
f004dc3
fix comments
Feb 17, 2024
3057e0b
fix comments
Feb 6, 2024
51f45f4
add the code skeleton for mysql backend operating metalake
Feb 2, 2024
38403be
fix update
Feb 4, 2024
0740200
add unit test
Feb 5, 2024
11f4ed6
add mysql backend ops for catalog
Feb 5, 2024
bb5334e
refactor the catalog code
Feb 26, 2024
fada86d
Merge branch 'main' into add-mysql-backend-ops-for-catalog
Feb 26, 2024
70a91d0
fix comments
Feb 27, 2024
25a60b2
catch constraint exception when update
Feb 27, 2024
4adcac8
add ut for update unique key constraints
Feb 27, 2024
5fa006a
add the code skeleton
Feb 27, 2024
f15595e
support schema ops
Feb 28, 2024
1d10a0f
Merge branch 'main' into add-ops-for-schema
Feb 29, 2024
4d444a7
change metalake id to a normal index
Feb 29, 2024
fbb0a84
fix comments
Feb 29, 2024
fa6dea2
support ops for table
Feb 28, 2024
841bc01
update the exception msg
Mar 1, 2024
7f4472d
support ops for fileset
Feb 28, 2024
26194e3
update the exception msg
Mar 1, 2024
26b83ca
extract common the code
Mar 1, 2024
8e8a817
support ops for table
Feb 28, 2024
355b1f9
update the exception msg
Mar 1, 2024
385128b
extract common code
Mar 1, 2024
6e429cb
Merge branch 'add-ops-for-table' into add-ops-for-fileset
Mar 1, 2024
b7106f9
extract common code
Mar 1, 2024
f78c2d5
extract an exception util
Mar 1, 2024
2bc14cc
Merge branch 'add-ops-for-schema' into add-ops-for-table
Mar 1, 2024
66c1dff
improve
Mar 1, 2024
2be0128
Merge branch 'add-ops-for-table' into add-ops-for-fileset
Mar 1, 2024
3aa11e5
add common meta service
Mar 1, 2024
405db25
fix comments
Mar 1, 2024
1e40b2b
fix comments
Mar 1, 2024
5a2b797
Merge branch 'add-ops-for-schema' into add-ops-for-table
Mar 1, 2024
c521d75
update
Mar 1, 2024
aba9c06
Merge branch 'add-ops-for-table' into add-ops-for-fileset
Mar 1, 2024
bba2ce6
update
Mar 1, 2024
0ad35fe
Merge branch 'main' into add-ops-for-table
Mar 4, 2024
c4e03fe
Merge branch 'add-ops-for-table' into add-ops-for-fileset
Mar 4, 2024
1784706
add get fileset id method
Mar 4, 2024
aa5d005
Merge branch 'main' into add-ops-for-fileset
Mar 5, 2024
117cfb6
using StringUtils
Mar 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import com.datastrato.gravitino.exceptions.NoSuchEntityException;
import com.datastrato.gravitino.meta.BaseMetalake;
import com.datastrato.gravitino.meta.CatalogEntity;
import com.datastrato.gravitino.meta.FilesetEntity;
import com.datastrato.gravitino.meta.SchemaEntity;
import com.datastrato.gravitino.meta.TableEntity;
import com.datastrato.gravitino.storage.relational.service.CatalogMetaService;
import com.datastrato.gravitino.storage.relational.service.FilesetMetaService;
import com.datastrato.gravitino.storage.relational.service.MetalakeMetaService;
import com.datastrato.gravitino.storage.relational.service.SchemaMetaService;
import com.datastrato.gravitino.storage.relational.service.TableMetaService;
Expand Down Expand Up @@ -54,6 +56,8 @@ public <E extends Entity & HasIdentifier> List<E> list(
return (List<E>) SchemaMetaService.getInstance().listSchemasByNamespace(namespace);
case TABLE:
return (List<E>) TableMetaService.getInstance().listTablesByNamespace(namespace);
case FILESET:
return (List<E>) FilesetMetaService.getInstance().listFilesetsByNamespace(namespace);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for list operation", entityType);
Expand Down Expand Up @@ -81,6 +85,8 @@ public <E extends Entity & HasIdentifier> void insert(E e, boolean overwritten)
SchemaMetaService.getInstance().insertSchema((SchemaEntity) e, overwritten);
} else if (e instanceof TableEntity) {
TableMetaService.getInstance().insertTable((TableEntity) e, overwritten);
} else if (e instanceof FilesetEntity) {
FilesetMetaService.getInstance().insertFileset((FilesetEntity) e, overwritten);
} else {
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for insert operation", e.getClass());
Expand All @@ -100,6 +106,8 @@ public <E extends Entity & HasIdentifier> E update(
return (E) SchemaMetaService.getInstance().updateSchema(ident, updater);
case TABLE:
return (E) TableMetaService.getInstance().updateTable(ident, updater);
case FILESET:
return (E) FilesetMetaService.getInstance().updateFileset(ident, updater);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for update operation", entityType);
Expand All @@ -118,6 +126,8 @@ public <E extends Entity & HasIdentifier> E get(
return (E) SchemaMetaService.getInstance().getSchemaByIdentifier(ident);
case TABLE:
return (E) TableMetaService.getInstance().getTableByIdentifier(ident);
case FILESET:
return (E) FilesetMetaService.getInstance().getFilesetByIdentifier(ident);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for get operation", entityType);
Expand All @@ -135,6 +145,8 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea
return SchemaMetaService.getInstance().deleteSchema(ident, cascade);
case TABLE:
return TableMetaService.getInstance().deleteTable(ident);
case FILESET:
return FilesetMetaService.getInstance().deleteFileset(ident);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for delete operation", entityType);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.storage.relational.mapper;

import com.datastrato.gravitino.storage.relational.po.FilesetPO;
import java.util.List;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Result;
import org.apache.ibatis.annotations.Results;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;

/**
* A MyBatis Mapper for fileset meta operation SQLs.
*
* <p>This interface class is a specification defined by MyBatis. It requires this interface class
* to identify the corresponding SQLs for execution. We can write SQLs in an additional XML file, or
* write SQLs with annotations in this interface Mapper. See: <a
* href="https://mybatis.org/mybatis-3/getting-started.html"></a>
*/
public interface FilesetMetaMapper {
String META_TABLE_NAME = "fileset_meta";

String VERSION_TABLE_NAME = "fileset_version_info";

@Select(
"SELECT fm.fileset_id, fm.fileset_name, fm.metalake_id, fm.catalog_id, fm.schema_id,"
+ " fm.type, fm.audit_info, fm.current_version, fm.last_version, fm.deleted_at,"
+ " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as version_catalog_id,"
+ " vi.schema_id as version_schema_id, vi.fileset_id as version_fileset_id,"
+ " vi.version, vi.fileset_comment, vi.properties, vi.storage_location,"
+ " vi.deleted_at as version_deleted_at"
+ " FROM "
+ META_TABLE_NAME
+ " fm INNER JOIN "
+ VERSION_TABLE_NAME
+ " vi ON fm.fileset_id = vi.fileset_id AND fm.current_version = vi.version"
+ " WHERE fm.schema_id = #{schemaId} AND fm.deleted_at = 0 AND vi.deleted_at = 0")
@Results({
@Result(property = "filesetId", column = "fileset_id"),
@Result(property = "filesetName", column = "fileset_name"),
@Result(property = "metalakeId", column = "metalake_id"),
@Result(property = "catalogId", column = "catalog_id"),
@Result(property = "schemaId", column = "schema_id"),
@Result(property = "type", column = "type"),
@Result(property = "auditInfo", column = "audit_info"),
@Result(property = "currentVersion", column = "current_version"),
@Result(property = "lastVersion", column = "last_version"),
@Result(property = "deletedAt", column = "deleted_at"),
@Result(property = "filesetVersionPO.id", column = "id"),
@Result(property = "filesetVersionPO.metalakeId", column = "version_metalake_id"),
@Result(property = "filesetVersionPO.catalogId", column = "version_catalog_id"),
@Result(property = "filesetVersionPO.schemaId", column = "version_schema_id"),
@Result(property = "filesetVersionPO.filesetId", column = "version_fileset_id"),
@Result(property = "filesetVersionPO.version", column = "version"),
@Result(property = "filesetVersionPO.filesetComment", column = "fileset_comment"),
@Result(property = "filesetVersionPO.properties", column = "properties"),
@Result(property = "filesetVersionPO.storageLocation", column = "storage_location"),
@Result(property = "filesetVersionPO.deletedAt", column = "version_deleted_at")
})
List<FilesetPO> listFilesetPOsBySchemaId(@Param("schemaId") Long schemaId);

@Select(
"SELECT fileset_id as filesetId FROM "
+ META_TABLE_NAME
+ " WHERE schema_id = #{schemaId} AND fileset_name = #{filesetName}"
+ " AND deleted_at = 0")
Long selectFilesetIdBySchemaIdAndName(
@Param("schemaId") Long schemaId, @Param("filesetName") String name);

@Select(
"SELECT fm.fileset_id, fm.fileset_name, fm.metalake_id, fm.catalog_id, fm.schema_id,"
+ " fm.type, fm.audit_info, fm.current_version, fm.last_version, fm.deleted_at,"
+ " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as version_catalog_id,"
+ " vi.schema_id as version_schema_id, vi.fileset_id as version_fileset_id,"
+ " vi.version, vi.fileset_comment, vi.properties, vi.storage_location,"
+ " vi.deleted_at as version_deleted_at"
+ " FROM "
+ META_TABLE_NAME
+ " fm INNER JOIN "
+ VERSION_TABLE_NAME
+ " vi ON fm.fileset_id = vi.fileset_id AND fm.current_version = vi.version"
+ " WHERE fm.schema_id = #{schemaId} AND fm.fileset_name = #{filesetName}"
+ " AND fm.deleted_at = 0 AND vi.deleted_at = 0")
@Results({
@Result(property = "filesetId", column = "fileset_id"),
@Result(property = "filesetName", column = "fileset_name"),
@Result(property = "metalakeId", column = "metalake_id"),
@Result(property = "catalogId", column = "catalog_id"),
@Result(property = "schemaId", column = "schema_id"),
@Result(property = "type", column = "type"),
@Result(property = "auditInfo", column = "audit_info"),
@Result(property = "currentVersion", column = "current_version"),
@Result(property = "lastVersion", column = "last_version"),
@Result(property = "deletedAt", column = "deleted_at"),
@Result(property = "filesetVersionPO.id", column = "id"),
@Result(property = "filesetVersionPO.metalakeId", column = "version_metalake_id"),
@Result(property = "filesetVersionPO.catalogId", column = "version_catalog_id"),
@Result(property = "filesetVersionPO.schemaId", column = "version_schema_id"),
@Result(property = "filesetVersionPO.filesetId", column = "version_fileset_id"),
@Result(property = "filesetVersionPO.version", column = "version"),
@Result(property = "filesetVersionPO.filesetComment", column = "fileset_comment"),
@Result(property = "filesetVersionPO.properties", column = "properties"),
@Result(property = "filesetVersionPO.storageLocation", column = "storage_location"),
@Result(property = "filesetVersionPO.deletedAt", column = "version_deleted_at")
})
FilesetPO selectFilesetMetaBySchemaIdAndName(
@Param("schemaId") Long schemaId, @Param("filesetName") String name);

@Insert(
"INSERT INTO "
+ META_TABLE_NAME
+ "(fileset_id, fileset_name, metalake_id,"
+ " catalog_id, schema_id, type, audit_info,"
+ " current_version, last_version, deleted_at)"
+ " VALUES("
+ " #{filesetMeta.filesetId},"
+ " #{filesetMeta.filesetName},"
+ " #{filesetMeta.metalakeId},"
+ " #{filesetMeta.catalogId},"
+ " #{filesetMeta.schemaId},"
+ " #{filesetMeta.type},"
+ " #{filesetMeta.auditInfo},"
+ " #{filesetMeta.currentVersion},"
+ " #{filesetMeta.lastVersion},"
+ " #{filesetMeta.deletedAt}"
+ " )")
void insertFilesetMeta(@Param("filesetMeta") FilesetPO filesetPO);

@Insert(
"INSERT INTO "
+ META_TABLE_NAME
+ "(fileset_id, fileset_name, metalake_id,"
+ " catalog_id, schema_id, type, audit_info,"
+ " current_version, last_version, deleted_at)"
+ " VALUES("
+ " #{filesetMeta.filesetId},"
+ " #{filesetMeta.filesetName},"
+ " #{filesetMeta.metalakeId},"
+ " #{filesetMeta.catalogId},"
+ " #{filesetMeta.schemaId},"
+ " #{filesetMeta.type},"
+ " #{filesetMeta.auditInfo},"
+ " #{filesetMeta.currentVersion},"
+ " #{filesetMeta.lastVersion},"
+ " #{filesetMeta.deletedAt}"
+ " )"
+ " ON DUPLICATE KEY UPDATE"
+ " fileset_name = #{filesetMeta.filesetName},"
+ " metalake_id = #{filesetMeta.metalakeId},"
+ " catalog_id = #{filesetMeta.catalogId},"
+ " schema_id = #{filesetMeta.schemaId},"
+ " type = #{filesetMeta.type},"
+ " audit_info = #{filesetMeta.auditInfo},"
+ " current_version = #{filesetMeta.currentVersion},"
+ " last_version = #{filesetMeta.lastVersion},"
+ " deleted_at = #{filesetMeta.deletedAt}")
void insertFilesetMetaOnDuplicateKeyUpdate(@Param("filesetMeta") FilesetPO filesetPO);

@Update(
"UPDATE "
+ META_TABLE_NAME
+ " SET fileset_name = #{newFilesetMeta.filesetName},"
+ " metalake_id = #{newFilesetMeta.metalakeId},"
+ " catalog_id = #{newFilesetMeta.catalogId},"
+ " schema_id = #{newFilesetMeta.schemaId},"
+ " type = #{newFilesetMeta.type},"
+ " audit_info = #{newFilesetMeta.auditInfo},"
+ " current_version = #{newFilesetMeta.currentVersion},"
+ " last_version = #{newFilesetMeta.lastVersion},"
+ " deleted_at = #{newFilesetMeta.deletedAt}"
+ " WHERE fileset_id = #{oldFilesetMeta.filesetId}"
+ " AND fileset_name = #{oldFilesetMeta.filesetName}"
+ " AND metalake_id = #{oldFilesetMeta.metalakeId}"
+ " AND catalog_id = #{oldFilesetMeta.catalogId}"
+ " AND schema_id = #{oldFilesetMeta.schemaId}"
+ " AND type = #{oldFilesetMeta.type}"
+ " AND audit_info = #{oldFilesetMeta.auditInfo}"
+ " AND current_version = #{oldFilesetMeta.currentVersion}"
+ " AND last_version = #{oldFilesetMeta.lastVersion}"
+ " AND deleted_at = 0")
Integer updateFilesetMeta(
@Param("newFilesetMeta") FilesetPO newFilesetPO,
@Param("oldFilesetMeta") FilesetPO oldFilesetPO);

@Update(
"UPDATE "
+ META_TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0")
Integer softDeleteFilesetMetasByMetalakeId(@Param("metalakeId") Long metalakeId);

@Update(
"UPDATE "
+ META_TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE catalog_id = #{catalogId} AND deleted_at = 0")
Integer softDeleteFilesetMetasByCatalogId(@Param("catalogId") Long catalogId);

@Update(
"UPDATE "
+ META_TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE schema_id = #{schemaId} AND deleted_at = 0")
Integer softDeleteFilesetMetasBySchemaId(@Param("schemaId") Long schemaId);

@Update(
"UPDATE "
+ META_TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE fileset_id = #{filesetId} AND deleted_at = 0")
Integer softDeleteFilesetMetasByFilesetId(@Param("filesetId") Long filesetId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.storage.relational.mapper;

import com.datastrato.gravitino.storage.relational.po.FilesetVersionPO;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;

/**
* A MyBatis Mapper for fileset version info operation SQLs.
*
* <p>This interface class is a specification defined by MyBatis. It requires this interface class
* to identify the corresponding SQLs for execution. We can write SQLs in an additional XML file, or
* write SQLs with annotations in this interface Mapper. See: <a
* href="https://mybatis.org/mybatis-3/getting-started.html"></a>
*/
public interface FilesetVersionMapper {
String VERSION_TABLE_NAME = "fileset_version_info";

@Insert(
"INSERT INTO "
+ VERSION_TABLE_NAME
+ "(metalake_id, catalog_id, schema_id, fileset_id,"
+ " version, fileset_comment, properties, storage_location,"
+ " deleted_at)"
+ " VALUES("
+ " #{filesetVersion.metalakeId},"
+ " #{filesetVersion.catalogId},"
+ " #{filesetVersion.schemaId},"
+ " #{filesetVersion.filesetId},"
+ " #{filesetVersion.version},"
+ " #{filesetVersion.filesetComment},"
+ " #{filesetVersion.properties},"
+ " #{filesetVersion.storageLocation},"
+ " #{filesetVersion.deletedAt}"
+ " )")
void insertFilesetVersion(@Param("filesetVersion") FilesetVersionPO filesetVersionPO);

@Insert(
"INSERT INTO "
+ VERSION_TABLE_NAME
+ "(metalake_id, catalog_id, schema_id, fileset_id,"
+ " version, fileset_comment, properties, storage_location,"
+ " deleted_at)"
+ " VALUES("
+ " #{filesetVersion.metalakeId},"
+ " #{filesetVersion.catalogId},"
+ " #{filesetVersion.schemaId},"
+ " #{filesetVersion.filesetId},"
+ " #{filesetVersion.version},"
+ " #{filesetVersion.filesetComment},"
+ " #{filesetVersion.properties},"
+ " #{filesetVersion.storageLocation},"
+ " #{filesetVersion.deletedAt}"
+ " )"
+ " ON DUPLICATE KEY UPDATE"
+ " metalake_id = #{filesetVersion.metalakeId},"
+ " catalog_id = #{filesetVersion.catalogId},"
+ " schema_id = #{filesetVersion.schemaId},"
+ " fileset_id = #{filesetVersion.filesetId},"
+ " version = #{filesetVersion.version},"
+ " fileset_comment = #{filesetVersion.filesetComment},"
+ " properties = #{filesetVersion.properties},"
+ " storage_location = #{filesetVersion.storageLocation},"
+ " deleted_at = #{filesetVersion.deletedAt}")
void insertFilesetVersionOnDuplicateKeyUpdate(
@Param("filesetVersion") FilesetVersionPO filesetVersionPO);

@Update(
"UPDATE "
+ VERSION_TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0")
Integer softDeleteFilesetVersionsByMetalakeId(@Param("metalakeId") Long metalakeId);

@Update(
"UPDATE "
+ VERSION_TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE catalog_id = #{catalogId} AND deleted_at = 0")
Integer softDeleteFilesetVersionsByCatalogId(@Param("catalogId") Long catalogId);

@Update(
"UPDATE "
+ VERSION_TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE schema_id = #{schemaId} AND deleted_at = 0")
Integer softDeleteFilesetVersionsBySchemaId(@Param("schemaId") Long schemaId);

@Update(
"UPDATE "
+ VERSION_TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE fileset_id = #{filesetId} AND deleted_at = 0")
Integer softDeleteFilesetVersionsByFilesetId(@Param("filesetId") Long filesetId);
}
Loading
Loading