Skip to content

Commit

Permalink
Move PostgreSQL related mapper to the new package postgresql.
Browse files Browse the repository at this point in the history
  • Loading branch information
yuqi1129 committed Sep 6, 2024
1 parent c8ba80f commit 9d192b2
Show file tree
Hide file tree
Showing 32 changed files with 1,372 additions and 1,009 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@

package org.apache.gravitino.storage.relational.mapper;

import static org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper.TABLE_NAME;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
import org.apache.gravitino.storage.relational.mapper.postgresql.CatalogMetaPostgreSQLProvider;
import org.apache.gravitino.storage.relational.po.CatalogPO;
import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
import org.apache.ibatis.annotations.Param;
Expand Down Expand Up @@ -52,88 +51,6 @@ static class CatalogMetaMySQLProvider extends CatalogMetaBaseSQLProvider {}

static class CatalogMetaH2Provider extends CatalogMetaBaseSQLProvider {}

static class CatalogMetaPostgreSQLProvider extends CatalogMetaBaseSQLProvider {

@Override
public String softDeleteCatalogMetasByCatalogId(Long catalogId) {
return "UPDATE "
+ TABLE_NAME
+ " SET deleted_at = floor(extract(epoch from((current_timestamp - timestamp '1970-01-01 00:00:00')*1000)))"
+ " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
}

@Override
public String softDeleteCatalogMetasByMetalakeId(Long metalakeId) {
return "UPDATE "
+ TABLE_NAME
+ " SET deleted_at = floor(extract(epoch from((current_timestamp - timestamp '1970-01-01 00:00:00')*1000)))"
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
}

@Override
public String insertCatalogMetaOnDuplicateKeyUpdate(CatalogPO catalogPO) {
return "INSERT INTO "
+ TABLE_NAME
+ "(catalog_id, catalog_name, metalake_id,"
+ " type, provider, catalog_comment, properties, audit_info,"
+ " current_version, last_version, deleted_at)"
+ " VALUES("
+ " #{catalogMeta.catalogId},"
+ " #{catalogMeta.catalogName},"
+ " #{catalogMeta.metalakeId},"
+ " #{catalogMeta.type},"
+ " #{catalogMeta.provider},"
+ " #{catalogMeta.catalogComment},"
+ " #{catalogMeta.properties},"
+ " #{catalogMeta.auditInfo},"
+ " #{catalogMeta.currentVersion},"
+ " #{catalogMeta.lastVersion},"
+ " #{catalogMeta.deletedAt}"
+ " )"
+ " ON CONFLICT(catalog_id) DO UPDATE SET"
+ " catalog_name = #{catalogMeta.catalogName},"
+ " metalake_id = #{catalogMeta.metalakeId},"
+ " type = #{catalogMeta.type},"
+ " provider = #{catalogMeta.provider},"
+ " catalog_comment = #{catalogMeta.catalogComment},"
+ " properties = #{catalogMeta.properties},"
+ " audit_info = #{catalogMeta.auditInfo},"
+ " current_version = #{catalogMeta.currentVersion},"
+ " last_version = #{catalogMeta.lastVersion},"
+ " deleted_at = #{catalogMeta.deletedAt}";
}

public String updateCatalogMeta(
@Param("newCatalogMeta") CatalogPO newCatalogPO,
@Param("oldCatalogMeta") CatalogPO oldCatalogPO) {
return "UPDATE "
+ TABLE_NAME
+ " SET catalog_name = #{newCatalogMeta.catalogName},"
+ " metalake_id = #{newCatalogMeta.metalakeId},"
+ " type = #{newCatalogMeta.type},"
+ " provider = #{newCatalogMeta.provider},"
+ " catalog_comment = #{newCatalogMeta.catalogComment},"
+ " properties = #{newCatalogMeta.properties},"
+ " audit_info = #{newCatalogMeta.auditInfo},"
+ " current_version = #{newCatalogMeta.currentVersion},"
+ " last_version = #{newCatalogMeta.lastVersion},"
+ " deleted_at = #{newCatalogMeta.deletedAt}"
+ " WHERE catalog_id = #{oldCatalogMeta.catalogId}"
+ " AND catalog_name = #{oldCatalogMeta.catalogName}"
+ " AND metalake_id = #{oldCatalogMeta.metalakeId}"
+ " AND type = #{oldCatalogMeta.type}"
+ " AND provider = #{oldCatalogMeta.provider}"
+ " AND (catalog_comment = #{oldCatalogMeta.catalogComment} "
+ " OR (CAST(catalog_comment AS VARCHAR) IS NULL AND "
+ " CAST(#{oldCatalogMeta.catalogComment} AS VARCHAR) IS NULL))"
+ " AND properties = #{oldCatalogMeta.properties}"
+ " AND audit_info = #{oldCatalogMeta.auditInfo}"
+ " AND current_version = #{oldCatalogMeta.currentVersion}"
+ " AND last_version = #{oldCatalogMeta.lastVersion}"
+ " AND deleted_at = 0";
}
}

public static String listCatalogPOsByMetalakeId(@Param("metalakeId") Long metalakeId) {
return getProvider().listCatalogPOsByMetalakeId(metalakeId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@

package org.apache.gravitino.storage.relational.mapper;

import static org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper.META_TABLE_NAME;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
import org.apache.gravitino.storage.relational.mapper.postgresql.FilesetMetaPostgreSQLProvider;
import org.apache.gravitino.storage.relational.po.FilesetPO;
import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
import org.apache.ibatis.annotations.Param;
Expand Down Expand Up @@ -51,72 +50,6 @@ static class FilesetMetaMySQLProvider extends FilesetMetaBaseSQLProvider {}

static class FilesetMetaH2Provider extends FilesetMetaBaseSQLProvider {}

static class FilesetMetaPostgreSQLProvider extends FilesetMetaBaseSQLProvider {

@Override
public String softDeleteFilesetMetasByMetalakeId(Long metalakeId) {
return "UPDATE "
+ META_TABLE_NAME
+ " SET deleted_at = floor(extract(epoch from((current_timestamp - timestamp '1970-01-01 00:00:00')*1000)))"
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
}

@Override
public String softDeleteFilesetMetasByCatalogId(Long catalogId) {
return "UPDATE "
+ META_TABLE_NAME
+ " SET deleted_at = floor(extract(epoch from((current_timestamp - timestamp '1970-01-01 00:00:00')*1000)))"
+ " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
}

@Override
public String softDeleteFilesetMetasBySchemaId(Long schemaId) {
return "UPDATE "
+ META_TABLE_NAME
+ " SET deleted_at = floor(extract(epoch from((current_timestamp - timestamp '1970-01-01 00:00:00')*1000)))"
+ " WHERE schema_id = #{schemaId} AND deleted_at = 0";
}

@Override
public String softDeleteFilesetMetasByFilesetId(Long filesetId) {
return "UPDATE "
+ META_TABLE_NAME
+ " SET deleted_at = floor(extract(epoch from((current_timestamp - timestamp '1970-01-01 00:00:00')*1000)))"
+ " WHERE fileset_id = #{filesetId} AND deleted_at = 0";
}

@Override
public String insertFilesetMetaOnDuplicateKeyUpdate(FilesetPO filesetPO) {
return "INSERT INTO "
+ META_TABLE_NAME
+ "(fileset_id, fileset_name, metalake_id,"
+ " catalog_id, schema_id, type, audit_info,"
+ " current_version, last_version, deleted_at)"
+ " VALUES("
+ " #{filesetMeta.filesetId},"
+ " #{filesetMeta.filesetName},"
+ " #{filesetMeta.metalakeId},"
+ " #{filesetMeta.catalogId},"
+ " #{filesetMeta.schemaId},"
+ " #{filesetMeta.type},"
+ " #{filesetMeta.auditInfo},"
+ " #{filesetMeta.currentVersion},"
+ " #{filesetMeta.lastVersion},"
+ " #{filesetMeta.deletedAt}"
+ " )"
+ " ON CONFLICT(fileset_id) DO UPDATE SET"
+ " fileset_name = #{filesetMeta.filesetName},"
+ " metalake_id = #{filesetMeta.metalakeId},"
+ " catalog_id = #{filesetMeta.catalogId},"
+ " schema_id = #{filesetMeta.schemaId},"
+ " type = #{filesetMeta.type},"
+ " audit_info = #{filesetMeta.auditInfo},"
+ " current_version = #{filesetMeta.currentVersion},"
+ " last_version = #{filesetMeta.lastVersion},"
+ " deleted_at = #{filesetMeta.deletedAt}";
}
}

public static String listFilesetPOsBySchemaId(@Param("schemaId") Long schemaId) {
return getProvider().listFilesetPOsBySchemaId(schemaId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
* under the License.
*/

import static org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper.VERSION_TABLE_NAME;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
import org.apache.gravitino.storage.relational.mapper.postgresql.FilesetVersionPostgreSQLProvider;
import org.apache.gravitino.storage.relational.po.FilesetVersionPO;
import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
import org.apache.ibatis.annotations.Param;
Expand Down Expand Up @@ -50,80 +49,6 @@ static class FilesetVersionMySQLProvider extends FilesetVersionBaseSQLProvider {

static class FilesetVersionH2Provider extends FilesetVersionBaseSQLProvider {}

static class FilesetVersionPostgreSQLProvider extends FilesetVersionBaseSQLProvider {

@Override
public String softDeleteFilesetVersionsByMetalakeId(Long metalakeId) {
return "UPDATE "
+ VERSION_TABLE_NAME
+ " SET deleted_at = floor(extract(epoch from((current_timestamp - timestamp '1970-01-01 00:00:00')*1000)))"
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
}

@Override
public String softDeleteFilesetVersionsByCatalogId(Long catalogId) {
return "UPDATE "
+ VERSION_TABLE_NAME
+ " SET deleted_at = floor(extract(epoch from((current_timestamp - timestamp '1970-01-01 00:00:00')*1000)))"
+ " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
}

@Override
public String softDeleteFilesetVersionsBySchemaId(Long schemaId) {
return "UPDATE "
+ VERSION_TABLE_NAME
+ " SET deleted_at = floor(extract(epoch from((current_timestamp - timestamp '1970-01-01 00:00:00')*1000)))"
+ " WHERE schema_id = #{schemaId} AND deleted_at = 0";
}

@Override
public String softDeleteFilesetVersionsByFilesetId(Long filesetId) {
return "UPDATE "
+ VERSION_TABLE_NAME
+ " SET deleted_at = floor(extract(epoch from((current_timestamp - timestamp '1970-01-01 00:00:00')*1000)))"
+ " WHERE fileset_id = #{filesetId} AND deleted_at = 0";
}

@Override
public String softDeleteFilesetVersionsByRetentionLine(
Long filesetId, long versionRetentionLine, int limit) {
return "UPDATE "
+ VERSION_TABLE_NAME
+ " SET deleted_at = floor(extract(epoch from((current_timestamp - timestamp '1970-01-01 00:00:00')*1000)))"
+ " WHERE fileset_id = #{filesetId} AND version <= #{versionRetentionLine} AND deleted_at = 0 LIMIT #{limit}";
}

@Override
public String insertFilesetVersionOnDuplicateKeyUpdate(FilesetVersionPO filesetVersionPO) {
return "INSERT INTO "
+ VERSION_TABLE_NAME
+ "(metalake_id, catalog_id, schema_id, fileset_id,"
+ " version, fileset_comment, properties, storage_location,"
+ " deleted_at)"
+ " VALUES("
+ " #{filesetVersion.metalakeId},"
+ " #{filesetVersion.catalogId},"
+ " #{filesetVersion.schemaId},"
+ " #{filesetVersion.filesetId},"
+ " #{filesetVersion.version},"
+ " #{filesetVersion.filesetComment},"
+ " #{filesetVersion.properties},"
+ " #{filesetVersion.storageLocation},"
+ " #{filesetVersion.deletedAt}"
+ " )"
+ " ON CONFLICT(fileset_id, version, deleted_at) DO UPDATE SET"
+ " metalake_id = #{filesetVersion.metalakeId},"
+ " catalog_id = #{filesetVersion.catalogId},"
+ " schema_id = #{filesetVersion.schemaId},"
+ " fileset_id = #{filesetVersion.filesetId},"
+ " version = #{filesetVersion.version},"
+ " fileset_comment = #{filesetVersion.filesetComment},"
+ " properties = #{filesetVersion.properties},"
+ " storage_location = #{filesetVersion.storageLocation},"
+ " deleted_at = #{filesetVersion.deletedAt}";
}
}

public static String insertFilesetVersion(
@Param("filesetVersion") FilesetVersionPO filesetVersionPO) {
return getProvider().insertFilesetVersion(filesetVersionPO);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
*/
package org.apache.gravitino.storage.relational.mapper;

import static org.apache.gravitino.storage.relational.mapper.GroupMetaMapper.GROUP_TABLE_NAME;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
import org.apache.gravitino.storage.relational.mapper.postgresql.GroupMetaPostgreSQLProvider;
import org.apache.gravitino.storage.relational.po.GroupPO;
import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
import org.apache.ibatis.annotations.Param;
Expand Down Expand Up @@ -50,50 +49,6 @@ static class GroupMetaMySQLProvider extends GroupMetaBaseSQLProvider {}

static class GroupMetaH2Provider extends GroupMetaBaseSQLProvider {}

static class GroupMetaPostgreSQLProvider extends GroupMetaBaseSQLProvider {

@Override
public String softDeleteGroupMetaByGroupId(Long groupId) {
return "UPDATE "
+ GROUP_TABLE_NAME
+ " SET deleted_at = floor(extract(epoch from((current_timestamp - timestamp '1970-01-01 00:00:00')*1000)))"
+ " WHERE group_id = #{groupId} AND deleted_at = 0";
}

@Override
public String softDeleteGroupMetasByMetalakeId(Long metalakeId) {
return "UPDATE "
+ GROUP_TABLE_NAME
+ " SET deleted_at = floor(extract(epoch from((current_timestamp - timestamp '1970-01-01 00:00:00')*1000)))"
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
}

@Override
public String insertGroupMetaOnDuplicateKeyUpdate(GroupPO groupPO) {
return "INSERT INTO "
+ GROUP_TABLE_NAME
+ "(group_id, group_name,"
+ "metalake_id, audit_info,"
+ " current_version, last_version, deleted_at)"
+ " VALUES("
+ " #{groupMeta.groupId},"
+ " #{groupMeta.groupName},"
+ " #{groupMeta.metalakeId},"
+ " #{groupMeta.auditInfo},"
+ " #{groupMeta.currentVersion},"
+ " #{groupMeta.lastVersion},"
+ " #{groupMeta.deletedAt}"
+ " )"
+ " ON CONFLICT(group_id) DO UPDATE SET"
+ " group_name = #{groupMeta.groupName},"
+ " metalake_id = #{groupMeta.metalakeId},"
+ " audit_info = #{groupMeta.auditInfo},"
+ " current_version = #{groupMeta.currentVersion},"
+ " last_version = #{groupMeta.lastVersion},"
+ " deleted_at = #{groupMeta.deletedAt}";
}
}

public static String selectGroupIdBySchemaIdAndName(
@Param("metalakeId") Long metalakeId, @Param("groupName") String name) {
return getProvider().selectGroupIdBySchemaIdAndName(metalakeId, name);
Expand Down
Loading

0 comments on commit 9d192b2

Please sign in to comment.