Skip to content

Commit

Permalink
[#2083] feat(core): Add JDBC backend operations for table (#2384)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

The purpose of this PR is to implement JDBC backend operation for Table
metadata. Depend on #2377 . Metadata operations of Fileset will be
supported in the remaining PRs.

### Why are the changes needed?

Fix: #2083 

### How was this patch tested?

Add unit tests to test the table metadata ops.

---------

Co-authored-by: xiaojiebao <[email protected]>
  • Loading branch information
xloya and xiaojiebao authored Mar 5, 2024
1 parent 169da0b commit 49bacdf
Show file tree
Hide file tree
Showing 13 changed files with 1,099 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import com.datastrato.gravitino.meta.BaseMetalake;
import com.datastrato.gravitino.meta.CatalogEntity;
import com.datastrato.gravitino.meta.SchemaEntity;
import com.datastrato.gravitino.meta.TableEntity;
import com.datastrato.gravitino.storage.relational.service.CatalogMetaService;
import com.datastrato.gravitino.storage.relational.service.MetalakeMetaService;
import com.datastrato.gravitino.storage.relational.service.SchemaMetaService;
import com.datastrato.gravitino.storage.relational.service.TableMetaService;
import com.datastrato.gravitino.storage.relational.session.SqlSessionFactoryHelper;
import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -50,6 +52,8 @@ public <E extends Entity & HasIdentifier> List<E> list(
return (List<E>) CatalogMetaService.getInstance().listCatalogsByNamespace(namespace);
case SCHEMA:
return (List<E>) SchemaMetaService.getInstance().listSchemasByNamespace(namespace);
case TABLE:
return (List<E>) TableMetaService.getInstance().listTablesByNamespace(namespace);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for list operation", entityType);
Expand All @@ -75,6 +79,8 @@ public <E extends Entity & HasIdentifier> void insert(E e, boolean overwritten)
CatalogMetaService.getInstance().insertCatalog((CatalogEntity) e, overwritten);
} else if (e instanceof SchemaEntity) {
SchemaMetaService.getInstance().insertSchema((SchemaEntity) e, overwritten);
} else if (e instanceof TableEntity) {
TableMetaService.getInstance().insertTable((TableEntity) e, overwritten);
} else {
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for insert operation", e.getClass());
Expand All @@ -92,6 +98,8 @@ public <E extends Entity & HasIdentifier> E update(
return (E) CatalogMetaService.getInstance().updateCatalog(ident, updater);
case SCHEMA:
return (E) SchemaMetaService.getInstance().updateSchema(ident, updater);
case TABLE:
return (E) TableMetaService.getInstance().updateTable(ident, updater);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for update operation", entityType);
Expand All @@ -108,6 +116,8 @@ public <E extends Entity & HasIdentifier> E get(
return (E) CatalogMetaService.getInstance().getCatalogByIdentifier(ident);
case SCHEMA:
return (E) SchemaMetaService.getInstance().getSchemaByIdentifier(ident);
case TABLE:
return (E) TableMetaService.getInstance().getTableByIdentifier(ident);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for get operation", entityType);
Expand All @@ -123,6 +133,8 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea
return CatalogMetaService.getInstance().deleteCatalog(ident, cascade);
case SCHEMA:
return SchemaMetaService.getInstance().deleteSchema(ident, cascade);
case TABLE:
return TableMetaService.getInstance().deleteTable(ident);
default:
throw new UnsupportedEntityTypeException(
"Unsupported entity type: %s for delete operation", entityType);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.storage.relational.mapper;

import com.datastrato.gravitino.storage.relational.po.TablePO;
import java.util.List;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;

/**
* A MyBatis Mapper for table meta operation SQLs.
*
* <p>This interface class is a specification defined by MyBatis. It requires this interface class
* to identify the corresponding SQLs for execution. We can write SQLs in an additional XML file, or
* write SQLs with annotations in this interface Mapper. See: <a
* href="https://mybatis.org/mybatis-3/getting-started.html"></a>
*/
public interface TableMetaMapper {
String TABLE_NAME = "table_meta";

@Select(
"SELECT table_id as tableId, table_name as tableName,"
+ " metalake_id as metalakeId, catalog_id as catalogId,"
+ " schema_id as schemaId, audit_info as auditInfo,"
+ " current_version as currentVersion, last_version as lastVersion,"
+ " deleted_at as deletedAt"
+ " FROM "
+ TABLE_NAME
+ " WHERE schema_id = #{schemaId} AND deleted_at = 0")
List<TablePO> listTablePOsBySchemaId(@Param("schemaId") Long schemaId);

@Select(
"SELECT table_id as tableId FROM "
+ TABLE_NAME
+ " WHERE schema_id = #{schemaId} AND table_name = #{tableName}"
+ " AND deleted_at = 0")
Long selectTableIdBySchemaIdAndName(
@Param("schemaId") Long schemaId, @Param("tableName") String name);

@Select(
"SELECT table_id as tableId, table_name as tableName,"
+ " metalake_id as metalakeId, catalog_id as catalogId,"
+ " schema_id as schemaId, audit_info as auditInfo,"
+ " current_version as currentVersion, last_version as lastVersion,"
+ " deleted_at as deletedAt"
+ " FROM "
+ TABLE_NAME
+ " WHERE schema_id = #{schemaId} AND table_name = #{tableName} AND deleted_at = 0")
TablePO selectTableMetaBySchemaIdAndName(
@Param("schemaId") Long schemaId, @Param("tableName") String name);

@Insert(
"INSERT INTO "
+ TABLE_NAME
+ "(table_id, table_name, metalake_id,"
+ " catalog_id, schema_id, audit_info,"
+ " current_version, last_version, deleted_at)"
+ " VALUES("
+ " #{tableMeta.tableId},"
+ " #{tableMeta.tableName},"
+ " #{tableMeta.metalakeId},"
+ " #{tableMeta.catalogId},"
+ " #{tableMeta.schemaId},"
+ " #{tableMeta.auditInfo},"
+ " #{tableMeta.currentVersion},"
+ " #{tableMeta.lastVersion},"
+ " #{tableMeta.deletedAt}"
+ " )")
void insertTableMeta(@Param("tableMeta") TablePO tablePO);

@Insert(
"INSERT INTO "
+ TABLE_NAME
+ "(table_id, table_name, metalake_id,"
+ " catalog_id, schema_id, audit_info,"
+ " current_version, last_version, deleted_at)"
+ " VALUES("
+ " #{tableMeta.tableId},"
+ " #{tableMeta.tableName},"
+ " #{tableMeta.metalakeId},"
+ " #{tableMeta.catalogId},"
+ " #{tableMeta.schemaId},"
+ " #{tableMeta.auditInfo},"
+ " #{tableMeta.currentVersion},"
+ " #{tableMeta.lastVersion},"
+ " #{tableMeta.deletedAt}"
+ " )"
+ " ON DUPLICATE KEY UPDATE"
+ " table_name = #{tableMeta.tableName},"
+ " metalake_id = #{tableMeta.metalakeId},"
+ " catalog_id = #{tableMeta.catalogId},"
+ " schema_id = #{tableMeta.schemaId},"
+ " audit_info = #{tableMeta.auditInfo},"
+ " current_version = #{tableMeta.currentVersion},"
+ " last_version = #{tableMeta.lastVersion},"
+ " deleted_at = #{tableMeta.deletedAt}")
void insertTableMetaOnDuplicateKeyUpdate(@Param("tableMeta") TablePO tablePO);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET table_name = #{newTableMeta.tableName},"
+ " metalake_id = #{newTableMeta.metalakeId},"
+ " catalog_id = #{newTableMeta.catalogId},"
+ " schema_id = #{newTableMeta.schemaId},"
+ " audit_info = #{newTableMeta.auditInfo},"
+ " current_version = #{newTableMeta.currentVersion},"
+ " last_version = #{newTableMeta.lastVersion},"
+ " deleted_at = #{newTableMeta.deletedAt}"
+ " WHERE table_id = #{oldTableMeta.tableId}"
+ " AND table_name = #{oldTableMeta.tableName}"
+ " AND metalake_id = #{oldTableMeta.metalakeId}"
+ " AND catalog_id = #{oldTableMeta.catalogId}"
+ " AND schema_id = #{oldTableMeta.schemaId}"
+ " AND audit_info = #{oldTableMeta.auditInfo}"
+ " AND current_version = #{oldTableMeta.currentVersion}"
+ " AND last_version = #{oldTableMeta.lastVersion}"
+ " AND deleted_at = 0")
Integer updateTableMeta(
@Param("newTableMeta") TablePO newTablePO, @Param("oldTableMeta") TablePO oldTablePO);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE table_id = #{tableId} AND deleted_at = 0")
Integer softDeleteTableMetasByTableId(@Param("tableId") Long tableId);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0")
Integer softDeleteTableMetasByMetalakeId(@Param("metalakeId") Long metalakeId);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE catalog_id = #{catalogId} AND deleted_at = 0")
Integer softDeleteTableMetasByCatalogId(@Param("catalogId") Long catalogId);

@Update(
"UPDATE "
+ TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP()"
+ " WHERE schema_id = #{schemaId} AND deleted_at = 0")
Integer softDeleteTableMetasBySchemaId(@Param("schemaId") Long schemaId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.storage.relational.po;

import com.google.common.base.Objects;
import com.google.common.base.Preconditions;

public class TablePO {
private Long tableId;
private String tableName;
private Long metalakeId;
private Long catalogId;
private Long schemaId;
private String auditInfo;
private Long currentVersion;
private Long lastVersion;
private Long deletedAt;

public Long getTableId() {
return tableId;
}

public String getTableName() {
return tableName;
}

public Long getMetalakeId() {
return metalakeId;
}

public Long getCatalogId() {
return catalogId;
}

public Long getSchemaId() {
return schemaId;
}

public String getAuditInfo() {
return auditInfo;
}

public Long getCurrentVersion() {
return currentVersion;
}

public Long getLastVersion() {
return lastVersion;
}

public Long getDeletedAt() {
return deletedAt;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof TablePO)) {
return false;
}
TablePO tablePO = (TablePO) o;
return Objects.equal(getTableId(), tablePO.getTableId())
&& Objects.equal(getTableName(), tablePO.getTableName())
&& Objects.equal(getMetalakeId(), tablePO.getMetalakeId())
&& Objects.equal(getCatalogId(), tablePO.getCatalogId())
&& Objects.equal(getSchemaId(), tablePO.getSchemaId())
&& Objects.equal(getAuditInfo(), tablePO.getAuditInfo())
&& Objects.equal(getCurrentVersion(), tablePO.getCurrentVersion())
&& Objects.equal(getLastVersion(), tablePO.getLastVersion())
&& Objects.equal(getDeletedAt(), tablePO.getDeletedAt());
}

@Override
public int hashCode() {
return Objects.hashCode(
getTableId(),
getTableName(),
getMetalakeId(),
getCatalogId(),
getSchemaId(),
getAuditInfo(),
getCurrentVersion(),
getLastVersion(),
getDeletedAt());
}

public static class Builder {
private final TablePO tablePO;

public Builder() {
tablePO = new TablePO();
}

public Builder withTableId(Long tableId) {
tablePO.tableId = tableId;
return this;
}

public Builder withTableName(String tableName) {
tablePO.tableName = tableName;
return this;
}

public Builder withMetalakeId(Long metalakeId) {
tablePO.metalakeId = metalakeId;
return this;
}

public Builder withCatalogId(Long catalogId) {
tablePO.catalogId = catalogId;
return this;
}

public Builder withSchemaId(Long schemaId) {
tablePO.schemaId = schemaId;
return this;
}

public Builder withAuditInfo(String auditInfo) {
tablePO.auditInfo = auditInfo;
return this;
}

public Builder withCurrentVersion(Long currentVersion) {
tablePO.currentVersion = currentVersion;
return this;
}

public Builder withLastVersion(Long lastVersion) {
tablePO.lastVersion = lastVersion;
return this;
}

public Builder withDeletedAt(Long deletedAt) {
tablePO.deletedAt = deletedAt;
return this;
}

private void validate() {
Preconditions.checkArgument(tablePO.tableId != null, "Table id is required");
Preconditions.checkArgument(tablePO.tableName != null, "Table name is required");
Preconditions.checkArgument(tablePO.metalakeId != null, "Metalake id is required");
Preconditions.checkArgument(tablePO.catalogId != null, "Catalog id is required");
Preconditions.checkArgument(tablePO.schemaId != null, "Schema id is required");
Preconditions.checkArgument(tablePO.auditInfo != null, "Audit info is required");
Preconditions.checkArgument(tablePO.currentVersion != null, "Current version is required");
Preconditions.checkArgument(tablePO.lastVersion != null, "Last version is required");
Preconditions.checkArgument(tablePO.deletedAt != null, "Deleted at is required");
}

public TablePO build() {
validate();
return tablePO;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.datastrato.gravitino.meta.SchemaEntity;
import com.datastrato.gravitino.storage.relational.mapper.CatalogMetaMapper;
import com.datastrato.gravitino.storage.relational.mapper.SchemaMetaMapper;
import com.datastrato.gravitino.storage.relational.mapper.TableMetaMapper;
import com.datastrato.gravitino.storage.relational.po.CatalogPO;
import com.datastrato.gravitino.storage.relational.utils.ExceptionUtils;
import com.datastrato.gravitino.storage.relational.utils.POConverters;
Expand Down Expand Up @@ -174,6 +175,10 @@ public boolean deleteCatalog(NameIdentifier identifier, boolean cascade) {
SessionUtils.doWithoutCommit(
SchemaMetaMapper.class,
mapper -> mapper.softDeleteSchemaMetasByCatalogId(catalogId)),
() ->
SessionUtils.doWithoutCommit(
TableMetaMapper.class,
mapper -> mapper.softDeleteTableMetasByCatalogId(catalogId)),
() -> {
// TODO We will cascade delete the metadata of sub-resources under the catalog
});
Expand Down
Loading

0 comments on commit 49bacdf

Please sign in to comment.