Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2083] feat(core): Add JDBC backend operations for table #2384

Merged
merged 54 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
4432c85
add rel interface
Feb 2, 2024
ca62a8a
fix the comment
Feb 4, 2024
b29b0b9
add the java doc for RelationBackend
Feb 4, 2024
0f28556
fix comments
Feb 5, 2024
eb70ecf
add the code skeleton for mysql backend operating metalake
Feb 2, 2024
a464241
fix update
Feb 4, 2024
3f29d1f
add unit test
Feb 5, 2024
5cc6d40
fix comments
Feb 6, 2024
4a152c8
fix comments and uts
Feb 6, 2024
42ce5ca
fix some comments
Feb 6, 2024
ff736d8
rename to jdbc backend
Feb 17, 2024
146256d
fix comments
Feb 17, 2024
45c88b2
fix comments
Feb 18, 2024
5a45836
fix comments
Feb 18, 2024
956b5a6
fix comments
Feb 18, 2024
dd806c6
remove type config
Feb 19, 2024
fdffa5c
refactor the schema
Feb 22, 2024
4130dce
fix the import order
xloya Feb 22, 2024
e5c25f2
update
xloya Feb 22, 2024
7760554
fix comments
Feb 25, 2024
ea83dcc
add the code skeleton for mysql backend operating metalake
Feb 2, 2024
97a00c7
fix comments and uts
Feb 6, 2024
5ad6b8d
fix some comments
Feb 6, 2024
723541f
rename to jdbc backend
Feb 17, 2024
f004dc3
fix comments
Feb 17, 2024
3057e0b
fix comments
Feb 6, 2024
51f45f4
add the code skeleton for mysql backend operating metalake
Feb 2, 2024
38403be
fix update
Feb 4, 2024
0740200
add unit test
Feb 5, 2024
11f4ed6
add mysql backend ops for catalog
Feb 5, 2024
bb5334e
refactor the catalog code
Feb 26, 2024
fada86d
Merge branch 'main' into add-mysql-backend-ops-for-catalog
Feb 26, 2024
70a91d0
fix comments
Feb 27, 2024
25a60b2
catch constraint exception when update
Feb 27, 2024
4adcac8
add ut for update unique key constraints
Feb 27, 2024
5fa006a
add the code skeleton
Feb 27, 2024
f15595e
support schema ops
Feb 28, 2024
1d10a0f
Merge branch 'main' into add-ops-for-schema
Feb 29, 2024
4d444a7
change metalake id to a normal index
Feb 29, 2024
fbb0a84
fix comments
Feb 29, 2024
26b83ca
extract common the code
Mar 1, 2024
8e8a817
support ops for table
Feb 28, 2024
355b1f9
update the exception msg
Mar 1, 2024
385128b
extract common code
Mar 1, 2024
f78c2d5
extract an exception util
Mar 1, 2024
2bc14cc
Merge branch 'add-ops-for-schema' into add-ops-for-table
Mar 1, 2024
66c1dff
improve
Mar 1, 2024
3aa11e5
add common meta service
Mar 1, 2024
405db25
fix comments
Mar 1, 2024
1e40b2b
fix comments
Mar 1, 2024
5a2b797
Merge branch 'add-ops-for-schema' into add-ops-for-table
Mar 1, 2024
c521d75
update
Mar 1, 2024
0ad35fe
Merge branch 'main' into add-ops-for-table
Mar 4, 2024
9cf7272
fix comments
Mar 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import com.datastrato.gravitino.meta.BaseMetalake;
import com.datastrato.gravitino.meta.CatalogEntity;
import com.datastrato.gravitino.meta.SchemaEntity;
import com.datastrato.gravitino.meta.TableEntity;
import com.datastrato.gravitino.storage.relational.service.CatalogMetaService;
import com.datastrato.gravitino.storage.relational.service.MetalakeMetaService;
import com.datastrato.gravitino.storage.relational.service.SchemaMetaService;
import com.datastrato.gravitino.storage.relational.service.TableMetaService;
import com.datastrato.gravitino.storage.relational.session.SqlSessionFactoryHelper;
import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -50,6 +52,8 @@ public <E extends Entity & HasIdentifier> List<E> list(
return (List<E>) CatalogMetaService.getInstance().listCatalogsByNamespace(namespace);
case SCHEMA:
return (List<E>) SchemaMetaService.getInstance().listSchemasByNamespace(namespace);
case TABLE:
return (List<E>) TableMetaService.getInstance().listTablesByNamespace(namespace);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for list operation", entityType);
Expand All @@ -75,6 +79,8 @@ public <E extends Entity & HasIdentifier> void insert(E e, boolean overwritten)
CatalogMetaService.getInstance().insertCatalog((CatalogEntity) e, overwritten);
} else if (e instanceof SchemaEntity) {
SchemaMetaService.getInstance().insertSchema((SchemaEntity) e, overwritten);
} else if (e instanceof TableEntity) {
TableMetaService.getInstance().insertTable((TableEntity) e, overwritten);
} else {
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for insert operation", e.getClass());
Expand All @@ -92,6 +98,8 @@ public <E extends Entity & HasIdentifier> E update(
return (E) CatalogMetaService.getInstance().updateCatalog(ident, updater);
case SCHEMA:
return (E) SchemaMetaService.getInstance().updateSchema(ident, updater);
case TABLE:
return (E) TableMetaService.getInstance().updateTable(ident, updater);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for update operation", entityType);
Expand All @@ -108,6 +116,8 @@ public <E extends Entity & HasIdentifier> E get(
return (E) CatalogMetaService.getInstance().getCatalogByIdentifier(ident);
case SCHEMA:
return (E) SchemaMetaService.getInstance().getSchemaByIdentifier(ident);
case TABLE:
return (E) TableMetaService.getInstance().getTableByIdentifier(ident);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for get operation", entityType);
Expand All @@ -123,6 +133,8 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea
return CatalogMetaService.getInstance().deleteCatalog(ident, cascade);
case SCHEMA:
return SchemaMetaService.getInstance().deleteSchema(ident, cascade);
case TABLE:
return TableMetaService.getInstance().deleteTable(ident);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for delete operation", entityType);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.storage.relational.mapper;

import com.datastrato.gravitino.storage.relational.po.TablePO;
import java.util.List;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;

/**
* A MyBatis Mapper for table meta operation SQLs.
*
* <p>This interface class is a specification defined by MyBatis. It requires this interface class
* to identify the corresponding SQLs for execution. We can write SQLs in an additional XML file, or
* write SQLs with annotations in this interface Mapper. See: <a
* href="https://mybatis.org/mybatis-3/getting-started.html"></a>
*/
public interface TableMetaMapper {
String TABLE_NAME = "table_meta";

@Select(
"SELECT table_id as tableId, table_name as tableName,"
+ " metalake_id as metalakeId, catalog_id as catalogId,"
+ " schema_id as schemaId, audit_info as auditInfo,"
+ " current_version as currentVersion, last_version as lastVersion,"
+ " deleted_at as deletedAt"
+ " FROM "
+ TABLE_NAME
+ " WHERE schema_id = #{schemaId} AND deleted_at = 0")
List<TablePO> listTablePOsBySchemaId(@Param("schemaId") Long schemaId);

@Select(
"SELECT table_id as tableId FROM "
+ TABLE_NAME
+ " WHERE schema_id = #{schemaId} AND table_name = #{tableName}"
+ " AND deleted_at = 0")
Long selectTableIdBySchemaIdAndName(
@Param("schemaId") Long schemaId, @Param("tableName") String name);

@Select(
"SELECT table_id as tableId, table_name as tableName,"
+ " metalake_id as metalakeId, catalog_id as catalogId,"
+ " schema_id as schemaId, audit_info as auditInfo,"
+ " current_version as currentVersion, last_version as lastVersion,"
+ " deleted_at as deletedAt"
+ " FROM "
+ TABLE_NAME
+ " WHERE schema_id = #{schemaId} AND table_name = #{tableName} AND deleted_at = 0")
TablePO selectTableMetaBySchemaIdAndName(
@Param("schemaId") Long schemaId, @Param("tableName") String name);

@Insert(
"INSERT INTO "
+ TABLE_NAME
+ "(table_id, table_name, metalake_id,"
+ " catalog_id, schema_id, audit_info,"
+ " current_version, last_version, deleted_at)"
+ " VALUES("
+ " #{tableMeta.tableId},"
+ " #{tableMeta.tableName},"
+ " #{tableMeta.metalakeId},"
+ " #{tableMeta.catalogId},"
+ " #{tableMeta.schemaId},"
+ " #{tableMeta.auditInfo},"
+ " #{tableMeta.currentVersion},"
+ " #{tableMeta.lastVersion},"
+ " #{tableMeta.deletedAt}"
+ " )")
void insertTableMeta(@Param("tableMeta") TablePO tablePO);

@Insert(
"INSERT INTO "
+ TABLE_NAME
+ "(table_id, table_name, metalake_id,"
+ " catalog_id, schema_id, audit_info,"
+ " current_version, last_version, deleted_at)"
+ " VALUES("
+ " #{tableMeta.tableId},"
+ " #{tableMeta.tableName},"
+ " #{tableMeta.metalakeId},"
+ " #{tableMeta.catalogId},"
+ " #{tableMeta.schemaId},"
+ " #{tableMeta.auditInfo},"
+ " #{tableMeta.currentVersion},"
+ " #{tableMeta.lastVersion},"
+ " #{tableMeta.deletedAt}"
+ " )"
+ " ON DUPLICATE KEY UPDATE"
+ " table_name = #{tableMeta.tableName},"
+ " metalake_id = #{tableMeta.metalakeId},"
+ " catalog_id = #{tableMeta.catalogId},"
+ " schema_id = #{tableMeta.schemaId},"
+ " audit_info = #{tableMeta.auditInfo},"
+ " current_version = #{tableMeta.currentVersion},"
+ " last_version = #{tableMeta.lastVersion},"
+ " deleted_at = #{tableMeta.deletedAt}")
void insertTableMetaOnDuplicateKeyUpdate(@Param("tableMeta") TablePO tablePO);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET table_name = #{newTableMeta.tableName},"
+ " metalake_id = #{newTableMeta.metalakeId},"
+ " catalog_id = #{newTableMeta.catalogId},"
+ " schema_id = #{newTableMeta.schemaId},"
+ " audit_info = #{newTableMeta.auditInfo},"
+ " current_version = #{newTableMeta.currentVersion},"
+ " last_version = #{newTableMeta.lastVersion},"
+ " deleted_at = #{newTableMeta.deletedAt}"
+ " WHERE table_id = #{oldTableMeta.tableId}"
+ " AND table_name = #{oldTableMeta.tableName}"
+ " AND metalake_id = #{oldTableMeta.metalakeId}"
+ " AND catalog_id = #{oldTableMeta.catalogId}"
+ " AND schema_id = #{oldTableMeta.schemaId}"
+ " AND audit_info = #{oldTableMeta.auditInfo}"
+ " AND current_version = #{oldTableMeta.currentVersion}"
+ " AND last_version = #{oldTableMeta.lastVersion}"
+ " AND deleted_at = 0")
Integer updateTableMeta(
@Param("newTableMeta") TablePO newTablePO, @Param("oldTableMeta") TablePO oldTablePO);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE table_id = #{tableId} AND deleted_at = 0")
Integer softDeleteTableMetasByTableId(@Param("tableId") Long tableId);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0")
Integer softDeleteTableMetasByMetalakeId(@Param("metalakeId") Long metalakeId);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE catalog_id = #{catalogId} AND deleted_at = 0")
Integer softDeleteTableMetasByCatalogId(@Param("catalogId") Long catalogId);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE schema_id = #{schemaId} AND deleted_at = 0")
Integer softDeleteTableMetasBySchemaId(@Param("schemaId") Long schemaId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.storage.relational.po;

import com.google.common.base.Objects;
import com.google.common.base.Preconditions;

public class TablePO {
private Long tableId;
private String tableName;
private Long metalakeId;
private Long catalogId;
private Long schemaId;
private String auditInfo;
private Long currentVersion;
private Long lastVersion;
private Long deletedAt;

public Long getTableId() {
return tableId;
}

public String getTableName() {
return tableName;
}

public Long getMetalakeId() {
return metalakeId;
}

public Long getCatalogId() {
return catalogId;
}

public Long getSchemaId() {
return schemaId;
}

public String getAuditInfo() {
return auditInfo;
}

public Long getCurrentVersion() {
return currentVersion;
}

public Long getLastVersion() {
return lastVersion;
}

public Long getDeletedAt() {
return deletedAt;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof TablePO)) {
return false;
}
TablePO tablePO = (TablePO) o;
return Objects.equal(getTableId(), tablePO.getTableId())
&& Objects.equal(getTableName(), tablePO.getTableName())
&& Objects.equal(getMetalakeId(), tablePO.getMetalakeId())
&& Objects.equal(getCatalogId(), tablePO.getCatalogId())
&& Objects.equal(getSchemaId(), tablePO.getSchemaId())
&& Objects.equal(getAuditInfo(), tablePO.getAuditInfo())
&& Objects.equal(getCurrentVersion(), tablePO.getCurrentVersion())
&& Objects.equal(getLastVersion(), tablePO.getLastVersion())
&& Objects.equal(getDeletedAt(), tablePO.getDeletedAt());
}

@Override
public int hashCode() {
return Objects.hashCode(
getTableId(),
getTableName(),
getMetalakeId(),
getCatalogId(),
getSchemaId(),
getAuditInfo(),
getCurrentVersion(),
getLastVersion(),
getDeletedAt());
}

public static class Builder {
private final TablePO tablePO;

public Builder() {
tablePO = new TablePO();
}

public Builder withTableId(Long tableId) {
tablePO.tableId = tableId;
return this;
}

public Builder withTableName(String tableName) {
tablePO.tableName = tableName;
return this;
}

public Builder withMetalakeId(Long metalakeId) {
tablePO.metalakeId = metalakeId;
return this;
}

public Builder withCatalogId(Long catalogId) {
tablePO.catalogId = catalogId;
return this;
}

public Builder withSchemaId(Long schemaId) {
tablePO.schemaId = schemaId;
return this;
}

public Builder withAuditInfo(String auditInfo) {
tablePO.auditInfo = auditInfo;
return this;
}

public Builder withCurrentVersion(Long currentVersion) {
tablePO.currentVersion = currentVersion;
return this;
}

public Builder withLastVersion(Long lastVersion) {
tablePO.lastVersion = lastVersion;
return this;
}

public Builder withDeletedAt(Long deletedAt) {
tablePO.deletedAt = deletedAt;
return this;
}

private void validate() {
Preconditions.checkArgument(tablePO.tableId != null, "Table id is required");
Preconditions.checkArgument(tablePO.tableName != null, "Table name is required");
Preconditions.checkArgument(tablePO.metalakeId != null, "Metalake id is required");
Preconditions.checkArgument(tablePO.catalogId != null, "Catalog id is required");
Preconditions.checkArgument(tablePO.schemaId != null, "Schema id is required");
Preconditions.checkArgument(tablePO.auditInfo != null, "Audit info is required");
Preconditions.checkArgument(tablePO.currentVersion != null, "Current version is required");
Preconditions.checkArgument(tablePO.lastVersion != null, "Last version is required");
Preconditions.checkArgument(tablePO.deletedAt != null, "Deleted at is required");
}

public TablePO build() {
validate();
return tablePO;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.datastrato.gravitino.meta.SchemaEntity;
import com.datastrato.gravitino.storage.relational.mapper.CatalogMetaMapper;
import com.datastrato.gravitino.storage.relational.mapper.SchemaMetaMapper;
import com.datastrato.gravitino.storage.relational.mapper.TableMetaMapper;
import com.datastrato.gravitino.storage.relational.po.CatalogPO;
import com.datastrato.gravitino.storage.relational.utils.ExceptionUtils;
import com.datastrato.gravitino.storage.relational.utils.POConverters;
Expand Down Expand Up @@ -174,6 +175,10 @@ public boolean deleteCatalog(NameIdentifier identifier, boolean cascade) {
SessionUtils.doWithoutCommit(
SchemaMetaMapper.class,
mapper -> mapper.softDeleteSchemaMetasByCatalogId(catalogId)),
() ->
SessionUtils.doWithoutCommit(
TableMetaMapper.class,
mapper -> mapper.softDeleteTableMetasByCatalogId(catalogId)),
() -> {
// TODO We will cascade delete the metadata of sub-resources under the catalog
});
Expand Down
Loading
Loading