Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2082] feat(core): Add JDBC backend operations for schema #2377

Merged
merged 45 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
4432c85
add rel interface
Feb 2, 2024
ca62a8a
fix the comment
Feb 4, 2024
b29b0b9
add the java doc for RelationBackend
Feb 4, 2024
0f28556
fix comments
Feb 5, 2024
eb70ecf
add the code skeleton for mysql backend operating metalake
Feb 2, 2024
a464241
fix update
Feb 4, 2024
3f29d1f
add unit test
Feb 5, 2024
5cc6d40
fix comments
Feb 6, 2024
4a152c8
fix comments and uts
Feb 6, 2024
42ce5ca
fix some comments
Feb 6, 2024
ff736d8
rename to jdbc backend
Feb 17, 2024
146256d
fix comments
Feb 17, 2024
45c88b2
fix comments
Feb 18, 2024
5a45836
fix comments
Feb 18, 2024
956b5a6
fix comments
Feb 18, 2024
dd806c6
remove type config
Feb 19, 2024
fdffa5c
refactor the schema
Feb 22, 2024
4130dce
fix the import order
xloya Feb 22, 2024
e5c25f2
update
xloya Feb 22, 2024
7760554
fix comments
Feb 25, 2024
ea83dcc
add the code skeleton for mysql backend operating metalake
Feb 2, 2024
97a00c7
fix comments and uts
Feb 6, 2024
5ad6b8d
fix some comments
Feb 6, 2024
723541f
rename to jdbc backend
Feb 17, 2024
f004dc3
fix comments
Feb 17, 2024
3057e0b
fix comments
Feb 6, 2024
51f45f4
add the code skeleton for mysql backend operating metalake
Feb 2, 2024
38403be
fix update
Feb 4, 2024
0740200
add unit test
Feb 5, 2024
11f4ed6
add mysql backend ops for catalog
Feb 5, 2024
bb5334e
refactor the catalog code
Feb 26, 2024
fada86d
Merge branch 'main' into add-mysql-backend-ops-for-catalog
Feb 26, 2024
70a91d0
fix comments
Feb 27, 2024
25a60b2
catch constraint exception when update
Feb 27, 2024
4adcac8
add ut for update unique key constraints
Feb 27, 2024
5fa006a
add the code skeleton
Feb 27, 2024
f15595e
support schema ops
Feb 28, 2024
1d10a0f
Merge branch 'main' into add-ops-for-schema
Feb 29, 2024
4d444a7
change metalake id to a normal index
Feb 29, 2024
fbb0a84
fix comments
Feb 29, 2024
26b83ca
extract common the code
Mar 1, 2024
f78c2d5
extract an exception util
Mar 1, 2024
3aa11e5
add common meta service
Mar 1, 2024
405db25
fix comments
Mar 1, 2024
1e40b2b
fix comments
Mar 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
/** This exception is thrown when an entity is not found. */
public class NoSuchEntityException extends RuntimeException {
/** The no such entity message for the exception. */
public static final String NO_SUCH_ENTITY_MESSAGE = "No such entity: %s";
public static final String NO_SUCH_ENTITY_MESSAGE = "No such %s entity: %s";

/**
* Constructs a new NoSuchEntityException.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
import com.datastrato.gravitino.exceptions.NoSuchEntityException;
import com.datastrato.gravitino.meta.BaseMetalake;
import com.datastrato.gravitino.meta.CatalogEntity;
import com.datastrato.gravitino.meta.SchemaEntity;
import com.datastrato.gravitino.storage.relational.service.CatalogMetaService;
import com.datastrato.gravitino.storage.relational.service.MetalakeMetaService;
import com.datastrato.gravitino.storage.relational.service.SchemaMetaService;
import com.datastrato.gravitino.storage.relational.session.SqlSessionFactoryHelper;
import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -46,6 +48,8 @@ public <E extends Entity & HasIdentifier> List<E> list(
return (List<E>) MetalakeMetaService.getInstance().listMetalakes();
case CATALOG:
return (List<E>) CatalogMetaService.getInstance().listCatalogsByNamespace(namespace);
case SCHEMA:
return (List<E>) SchemaMetaService.getInstance().listSchemasByNamespace(namespace);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for list operation", entityType);
Expand All @@ -69,6 +73,8 @@ public <E extends Entity & HasIdentifier> void insert(E e, boolean overwritten)
MetalakeMetaService.getInstance().insertMetalake((BaseMetalake) e, overwritten);
} else if (e instanceof CatalogEntity) {
CatalogMetaService.getInstance().insertCatalog((CatalogEntity) e, overwritten);
} else if (e instanceof SchemaEntity) {
SchemaMetaService.getInstance().insertSchema((SchemaEntity) e, overwritten);
} else {
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for insert operation", e.getClass());
Expand All @@ -84,6 +90,8 @@ public <E extends Entity & HasIdentifier> E update(
return (E) MetalakeMetaService.getInstance().updateMetalake(ident, updater);
case CATALOG:
return (E) CatalogMetaService.getInstance().updateCatalog(ident, updater);
case SCHEMA:
return (E) SchemaMetaService.getInstance().updateSchema(ident, updater);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for update operation", entityType);
Expand All @@ -98,6 +106,8 @@ public <E extends Entity & HasIdentifier> E get(
return (E) MetalakeMetaService.getInstance().getMetalakeByIdentifier(ident);
case CATALOG:
return (E) CatalogMetaService.getInstance().getCatalogByIdentifier(ident);
case SCHEMA:
return (E) SchemaMetaService.getInstance().getSchemaByIdentifier(ident);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for get operation", entityType);
Expand All @@ -111,6 +121,8 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea
return MetalakeMetaService.getInstance().deleteMetalake(ident, cascade);
case CATALOG:
return CatalogMetaService.getInstance().deleteCatalog(ident, cascade);
case SCHEMA:
return SchemaMetaService.getInstance().deleteSchema(ident, cascade);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for delete operation", entityType);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.storage.relational.mapper;

import com.datastrato.gravitino.storage.relational.po.SchemaPO;
import java.util.List;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;

/**
* A MyBatis Mapper for schema meta operation SQLs.
*
* <p>This interface class is a specification defined by MyBatis. It requires this interface class
* to identify the corresponding SQLs for execution. We can write SQLs in an additional XML file, or
* write SQLs with annotations in this interface Mapper. See: <a
* href="https://mybatis.org/mybatis-3/getting-started.html"></a>
*/
public interface SchemaMetaMapper {
String TABLE_NAME = "schema_meta";

@Select(
"SELECT schema_id as schemaId, schema_name as schemaName,"
+ " metalake_id as metalakeId, catalog_id as catalogId,"
+ " schema_comment as schemaComment, properties, audit_info as auditInfo,"
+ " current_version as currentVersion, last_version as lastVersion,"
+ " deleted_at as deletedAt"
+ " FROM "
+ TABLE_NAME
+ " WHERE catalog_id = #{catalogId} AND deleted_at = 0")
List<SchemaPO> listSchemaPOsByCatalogId(@Param("catalogId") Long catalogId);

@Select(
"SELECT schema_id as schemaId FROM "
+ TABLE_NAME
+ " WHERE catalog_id = #{catalogId} AND schema_name = #{schemaName}"
+ " AND deleted_at = 0")
Long selectSchemaIdByCatalogIdAndName(
@Param("catalogId") Long catalogId, @Param("schemaName") String name);

@Select(
"SELECT schema_id as schemaId, schema_name as schemaName,"
+ " metalake_id as metalakeId, catalog_id as catalogId,"
+ " schema_comment as schemaComment, properties, audit_info as auditInfo,"
+ " current_version as currentVersion, last_version as lastVersion,"
+ " deleted_at as deletedAt"
+ " FROM "
+ TABLE_NAME
+ " WHERE catalog_id = #{catalogId} AND schema_name = #{schemaName} AND deleted_at = 0")
SchemaPO selectSchemaMetaByCatalogIdAndName(
@Param("catalogId") Long catalogId, @Param("schemaName") String name);

@Insert(
"INSERT INTO "
+ TABLE_NAME
+ "(schema_id, schema_name, metalake_id,"
+ " catalog_id, schema_comment, properties, audit_info,"
+ " current_version, last_version, deleted_at)"
+ " VALUES("
+ " #{schemaMeta.schemaId},"
+ " #{schemaMeta.schemaName},"
+ " #{schemaMeta.metalakeId},"
+ " #{schemaMeta.catalogId},"
+ " #{schemaMeta.schemaComment},"
+ " #{schemaMeta.properties},"
+ " #{schemaMeta.auditInfo},"
+ " #{schemaMeta.currentVersion},"
+ " #{schemaMeta.lastVersion},"
+ " #{schemaMeta.deletedAt}"
+ " )")
void insertSchemaMeta(@Param("schemaMeta") SchemaPO schemaPO);

@Insert(
"INSERT INTO "
+ TABLE_NAME
+ "(schema_id, schema_name, metalake_id,"
+ " catalog_id, schema_comment, properties, audit_info,"
+ " current_version, last_version, deleted_at)"
+ " VALUES("
+ " #{schemaMeta.schemaId},"
+ " #{schemaMeta.schemaName},"
+ " #{schemaMeta.metalakeId},"
+ " #{schemaMeta.catalogId},"
+ " #{schemaMeta.schemaComment},"
+ " #{schemaMeta.properties},"
+ " #{schemaMeta.auditInfo},"
+ " #{schemaMeta.currentVersion},"
+ " #{schemaMeta.lastVersion},"
+ " #{schemaMeta.deletedAt}"
+ " )"
+ " ON DUPLICATE KEY UPDATE"
+ " schema_name = #{schemaMeta.schemaName},"
+ " metalake_id = #{schemaMeta.metalakeId},"
+ " catalog_id = #{schemaMeta.catalogId},"
+ " schema_comment = #{schemaMeta.schemaComment},"
+ " properties = #{schemaMeta.properties},"
+ " audit_info = #{schemaMeta.auditInfo},"
+ " current_version = #{schemaMeta.currentVersion},"
+ " last_version = #{schemaMeta.lastVersion},"
+ " deleted_at = #{schemaMeta.deletedAt}")
void insertSchemaMetaOnDuplicateKeyUpdate(@Param("schemaMeta") SchemaPO schemaPO);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET schema_name = #{newSchemaMeta.schemaName},"
+ " metalake_id = #{newSchemaMeta.metalakeId},"
+ " catalog_id = #{newSchemaMeta.catalogId},"
+ " schema_comment = #{newSchemaMeta.schemaComment},"
+ " properties = #{newSchemaMeta.properties},"
+ " audit_info = #{newSchemaMeta.auditInfo},"
+ " current_version = #{newSchemaMeta.currentVersion},"
+ " last_version = #{newSchemaMeta.lastVersion},"
+ " deleted_at = #{newSchemaMeta.deletedAt}"
+ " WHERE schema_id = #{oldSchemaMeta.schemaId}"
+ " AND schema_name = #{oldSchemaMeta.schemaName}"
+ " AND metalake_id = #{oldSchemaMeta.metalakeId}"
+ " AND catalog_id = #{oldSchemaMeta.catalogId}"
+ " AND schema_comment = #{oldSchemaMeta.schemaComment}"
+ " AND properties = #{oldSchemaMeta.properties}"
+ " AND audit_info = #{oldSchemaMeta.auditInfo}"
+ " AND current_version = #{oldSchemaMeta.currentVersion}"
+ " AND last_version = #{oldSchemaMeta.lastVersion}"
+ " AND deleted_at = 0")
Integer updateSchemaMeta(
@Param("newSchemaMeta") SchemaPO newSchemaPO, @Param("oldSchemaMeta") SchemaPO oldSchemaPO);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE schema_id = #{schemaId} AND deleted_at = 0")
Integer softDeleteSchemaMetasBySchemaId(@Param("schemaId") Long schemaId);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0")
Integer softDeleteSchemaMetasByMetalakeId(@Param("metalakeId") Long metalakeId);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE catalog_id = #{catalogId} AND deleted_at = 0")
Integer softDeleteSchemaMetasByCatalogId(@Param("catalogId") Long catalogId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package com.datastrato.gravitino.storage.relational.po;

import com.google.common.base.Objects;
import com.google.common.base.Preconditions;

public class CatalogPO {
private Long catalogId;
Expand Down Expand Up @@ -164,7 +165,20 @@ public CatalogPO.Builder withDeletedAt(Long deletedAt) {
return this;
}

private void validate() {
Preconditions.checkArgument(metalakePO.catalogId != null, "Catalog id is required");
Preconditions.checkArgument(metalakePO.catalogName != null, "Catalog name is required");
Preconditions.checkArgument(metalakePO.metalakeId != null, "Metalake id is required");
Preconditions.checkArgument(metalakePO.type != null, "Catalog type is required");
Preconditions.checkArgument(metalakePO.provider != null, "Catalog provider is required");
Preconditions.checkArgument(metalakePO.auditInfo != null, "Audit info is required");
Preconditions.checkArgument(metalakePO.currentVersion != null, "Current version is required");
Preconditions.checkArgument(metalakePO.lastVersion != null, "Last version is required");
Preconditions.checkArgument(metalakePO.deletedAt != null, "Deleted at is required");
}

public CatalogPO build() {
validate();
return metalakePO;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package com.datastrato.gravitino.storage.relational.po;

import com.google.common.base.Objects;
import com.google.common.base.Preconditions;

public class MetalakePO {
private Long metalakeId;
Expand Down Expand Up @@ -140,7 +141,18 @@ public MetalakePO.Builder withDeletedAt(Long deletedAt) {
return this;
}

private void validate() {
Preconditions.checkArgument(metalakePO.metalakeId != null, "Metalake id is required");
Preconditions.checkArgument(metalakePO.metalakeName != null, "Metalake name is required");
Preconditions.checkArgument(metalakePO.auditInfo != null, "Audit info is required");
Preconditions.checkArgument(metalakePO.schemaVersion != null, "Schema version is required");
Preconditions.checkArgument(metalakePO.currentVersion != null, "Current version is required");
Preconditions.checkArgument(metalakePO.lastVersion != null, "Last version is required");
Preconditions.checkArgument(metalakePO.deletedAt != null, "Deleted at is required");
}

public MetalakePO build() {
validate();
return metalakePO;
}
}
Expand Down
Loading
Loading