diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java index 81e111a289b..b23c7667388 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java @@ -417,9 +417,10 @@ public void insertRelation( } } - enum JDBCBackendType { + public enum JDBCBackendType { H2(true), - MYSQL(false); + MYSQL(false), + POSTGRESQL(false); private final boolean embedded; @@ -432,10 +433,25 @@ public static JDBCBackendType fromURI(String jdbcURI) { return JDBCBackendType.H2; } else if (jdbcURI.startsWith("jdbc:mysql")) { return JDBCBackendType.MYSQL; + } else if (jdbcURI.startsWith("jdbc:postgresql")) { + return JDBCBackendType.POSTGRESQL; } else { throw new IllegalArgumentException("Unknown JDBC URI: " + jdbcURI); } } + + public static JDBCBackendType fromString(String jdbcType) { + switch (jdbcType) { + case "h2": + return JDBCBackendType.H2; + case "mysql": + return JDBCBackendType.MYSQL; + case "postgresql": + return JDBCBackendType.POSTGRESQL; + default: + throw new IllegalArgumentException("Unknown JDBC type: " + jdbcType); + } + } } /** Start JDBC database if necessary. For example, start the H2 database if the backend is H2. */ diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaBaseSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaBaseSQLProvider.java new file mode 100644 index 00000000000..cce5783898f --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaBaseSQLProvider.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.storage.relational.mapper; + +import static org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper.TABLE_NAME; + +import org.apache.gravitino.storage.relational.po.CatalogPO; +import org.apache.ibatis.annotations.Param; + +public class CatalogMetaBaseSQLProvider { + public String listCatalogPOsByMetalakeId(@Param("metalakeId") Long metalakeId) { + return "SELECT catalog_id as catalogId, catalog_name as catalogName," + + " metalake_id as metalakeId, type, provider," + + " catalog_comment as catalogComment, properties, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0"; + } + + public String selectCatalogIdByMetalakeIdAndName( + @Param("metalakeId") Long metalakeId, @Param("catalogName") String name) { + return "SELECT catalog_id as catalogId FROM " + + TABLE_NAME + + " WHERE metalake_id = #{metalakeId} AND catalog_name = #{catalogName} AND deleted_at = 0"; + } + + public String selectCatalogMetaByMetalakeIdAndName( + @Param("metalakeId") Long metalakeId, @Param("catalogName") String name) { + return "SELECT catalog_id as catalogId, catalog_name as catalogName," + + " metalake_id as metalakeId, type, provider," + + " catalog_comment as catalogComment, properties, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE metalake_id = #{metalakeId} AND catalog_name = #{catalogName} AND deleted_at = 0"; + } + + public String selectCatalogMetaById(@Param("catalogId") Long catalogId) { + return "SELECT catalog_id as catalogId, catalog_name as catalogName," + + " metalake_id as metalakeId, type, provider," + + " catalog_comment as catalogComment, properties, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE catalog_id = #{catalogId} AND deleted_at = 0"; + } + + public String insertCatalogMeta(@Param("catalogMeta") CatalogPO catalogPO) { + return "INSERT INTO " + + TABLE_NAME + + "(catalog_id, catalog_name, metalake_id," + + " type, provider, catalog_comment, properties, audit_info," + + " current_version, last_version, deleted_at)" + + " VALUES(" + + " #{catalogMeta.catalogId}," + + " #{catalogMeta.catalogName}," + + " #{catalogMeta.metalakeId}," + + " #{catalogMeta.type}," + + " #{catalogMeta.provider}," + + " #{catalogMeta.catalogComment}," + + " #{catalogMeta.properties}," + + " #{catalogMeta.auditInfo}," + + " #{catalogMeta.currentVersion}," + + " #{catalogMeta.lastVersion}," + + " #{catalogMeta.deletedAt}" + + " )"; + } + + public String insertCatalogMetaOnDuplicateKeyUpdate(@Param("catalogMeta") CatalogPO catalogPO) { + return "INSERT INTO " + + TABLE_NAME + + "(catalog_id, catalog_name, metalake_id," + + " type, provider, catalog_comment, properties, audit_info," + + " current_version, last_version, deleted_at)" + + " VALUES(" + + " #{catalogMeta.catalogId}," + + " #{catalogMeta.catalogName}," + + " #{catalogMeta.metalakeId}," + + " #{catalogMeta.type}," + + " #{catalogMeta.provider}," + + " #{catalogMeta.catalogComment}," + + " #{catalogMeta.properties}," + + " #{catalogMeta.auditInfo}," + + " #{catalogMeta.currentVersion}," + + " #{catalogMeta.lastVersion}," + + " #{catalogMeta.deletedAt}" + + " )" + + " ON DUPLICATE KEY UPDATE" + + " catalog_name = #{catalogMeta.catalogName}," + + " metalake_id = #{catalogMeta.metalakeId}," + + " type = #{catalogMeta.type}," + + " provider = #{catalogMeta.provider}," + + " catalog_comment = #{catalogMeta.catalogComment}," + + " properties = #{catalogMeta.properties}," + + " audit_info = #{catalogMeta.auditInfo}," + + " current_version = #{catalogMeta.currentVersion}," + + " last_version = #{catalogMeta.lastVersion}," + + " deleted_at = #{catalogMeta.deletedAt}"; + } + + public String updateCatalogMeta( + @Param("newCatalogMeta") CatalogPO newCatalogPO, + @Param("oldCatalogMeta") CatalogPO oldCatalogPO) { + return "UPDATE " + + TABLE_NAME + + " SET catalog_name = #{newCatalogMeta.catalogName}," + + " metalake_id = #{newCatalogMeta.metalakeId}," + + " type = #{newCatalogMeta.type}," + + " provider = #{newCatalogMeta.provider}," + + " catalog_comment = #{newCatalogMeta.catalogComment}," + + " properties = #{newCatalogMeta.properties}," + + " audit_info = #{newCatalogMeta.auditInfo}," + + " current_version = #{newCatalogMeta.currentVersion}," + + " last_version = #{newCatalogMeta.lastVersion}," + + " deleted_at = #{newCatalogMeta.deletedAt}" + + " WHERE catalog_id = #{oldCatalogMeta.catalogId}" + + " AND catalog_name = #{oldCatalogMeta.catalogName}" + + " AND metalake_id = #{oldCatalogMeta.metalakeId}" + + " AND type = #{oldCatalogMeta.type}" + + " AND provider = #{oldCatalogMeta.provider}" + + " AND (catalog_comment = #{oldCatalogMeta.catalogComment} " + + " OR (catalog_comment IS NULL and #{oldCatalogMeta.catalogComment} IS NULL))" + + " AND properties = #{oldCatalogMeta.properties}" + + " AND audit_info = #{oldCatalogMeta.auditInfo}" + + " AND current_version = #{oldCatalogMeta.currentVersion}" + + " AND last_version = #{oldCatalogMeta.lastVersion}" + + " AND deleted_at = 0"; + } + + public String softDeleteCatalogMetasByCatalogId(@Param("catalogId") Long catalogId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE catalog_id = #{catalogId} AND deleted_at = 0"; + } + + public String softDeleteCatalogMetasByMetalakeId(@Param("metalakeId") Long metalakeId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0"; + } + + public String deleteCatalogMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return "DELETE FROM " + + TABLE_NAME + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}"; + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaMapper.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaMapper.java index faedbcd9642..01cadbb6e8b 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaMapper.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaMapper.java @@ -21,11 +21,11 @@ import java.util.List; import org.apache.gravitino.storage.relational.po.CatalogPO; -import org.apache.ibatis.annotations.Delete; -import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.DeleteProvider; +import org.apache.ibatis.annotations.InsertProvider; import org.apache.ibatis.annotations.Param; -import org.apache.ibatis.annotations.Select; -import org.apache.ibatis.annotations.Update; +import org.apache.ibatis.annotations.SelectProvider; +import org.apache.ibatis.annotations.UpdateProvider; /** * A MyBatis Mapper for catalog meta operation SQLs. @@ -38,149 +38,50 @@ public interface CatalogMetaMapper { String TABLE_NAME = "catalog_meta"; - @Select( - "SELECT catalog_id as catalogId, catalog_name as catalogName," - + " metalake_id as metalakeId, type, provider," - + " catalog_comment as catalogComment, properties, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") + @SelectProvider(type = CatalogMetaSQLProviderFactory.class, method = "listCatalogPOsByMetalakeId") List listCatalogPOsByMetalakeId(@Param("metalakeId") Long metalakeId); - @Select( - "SELECT catalog_id as catalogId FROM " - + TABLE_NAME - + " WHERE metalake_id = #{metalakeId} AND catalog_name = #{catalogName} AND deleted_at = 0") + @SelectProvider( + type = CatalogMetaSQLProviderFactory.class, + method = "selectCatalogIdByMetalakeIdAndName") Long selectCatalogIdByMetalakeIdAndName( @Param("metalakeId") Long metalakeId, @Param("catalogName") String name); - @Select( - "SELECT catalog_id as catalogId, catalog_name as catalogName," - + " metalake_id as metalakeId, type, provider," - + " catalog_comment as catalogComment, properties, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE metalake_id = #{metalakeId} AND catalog_name = #{catalogName} AND deleted_at = 0") + @SelectProvider( + type = CatalogMetaSQLProviderFactory.class, + method = "selectCatalogMetaByMetalakeIdAndName") CatalogPO selectCatalogMetaByMetalakeIdAndName( @Param("metalakeId") Long metalakeId, @Param("catalogName") String name); - @Select( - "SELECT catalog_id as catalogId, catalog_name as catalogName," - + " metalake_id as metalakeId, type, provider," - + " catalog_comment as catalogComment, properties, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE catalog_id = #{catalogId} AND deleted_at = 0") + @SelectProvider(type = CatalogMetaSQLProviderFactory.class, method = "selectCatalogMetaById") CatalogPO selectCatalogMetaById(@Param("catalogId") Long catalogId); - @Insert( - "INSERT INTO " - + TABLE_NAME - + "(catalog_id, catalog_name, metalake_id," - + " type, provider, catalog_comment, properties, audit_info," - + " current_version, last_version, deleted_at)" - + " VALUES(" - + " #{catalogMeta.catalogId}," - + " #{catalogMeta.catalogName}," - + " #{catalogMeta.metalakeId}," - + " #{catalogMeta.type}," - + " #{catalogMeta.provider}," - + " #{catalogMeta.catalogComment}," - + " #{catalogMeta.properties}," - + " #{catalogMeta.auditInfo}," - + " #{catalogMeta.currentVersion}," - + " #{catalogMeta.lastVersion}," - + " #{catalogMeta.deletedAt}" - + " )") + @InsertProvider(type = CatalogMetaSQLProviderFactory.class, method = "insertCatalogMeta") void insertCatalogMeta(@Param("catalogMeta") CatalogPO catalogPO); - @Insert( - "INSERT INTO " - + TABLE_NAME - + "(catalog_id, catalog_name, metalake_id," - + " type, provider, catalog_comment, properties, audit_info," - + " current_version, last_version, deleted_at)" - + " VALUES(" - + " #{catalogMeta.catalogId}," - + " #{catalogMeta.catalogName}," - + " #{catalogMeta.metalakeId}," - + " #{catalogMeta.type}," - + " #{catalogMeta.provider}," - + " #{catalogMeta.catalogComment}," - + " #{catalogMeta.properties}," - + " #{catalogMeta.auditInfo}," - + " #{catalogMeta.currentVersion}," - + " #{catalogMeta.lastVersion}," - + " #{catalogMeta.deletedAt}" - + " )" - + " ON DUPLICATE KEY UPDATE" - + " catalog_name = #{catalogMeta.catalogName}," - + " metalake_id = #{catalogMeta.metalakeId}," - + " type = #{catalogMeta.type}," - + " provider = #{catalogMeta.provider}," - + " catalog_comment = #{catalogMeta.catalogComment}," - + " properties = #{catalogMeta.properties}," - + " audit_info = #{catalogMeta.auditInfo}," - + " current_version = #{catalogMeta.currentVersion}," - + " last_version = #{catalogMeta.lastVersion}," - + " deleted_at = #{catalogMeta.deletedAt}") + @InsertProvider( + type = CatalogMetaSQLProviderFactory.class, + method = "insertCatalogMetaOnDuplicateKeyUpdate") void insertCatalogMetaOnDuplicateKeyUpdate(@Param("catalogMeta") CatalogPO catalogPO); - @Update( - "UPDATE " - + TABLE_NAME - + " SET catalog_name = #{newCatalogMeta.catalogName}," - + " metalake_id = #{newCatalogMeta.metalakeId}," - + " type = #{newCatalogMeta.type}," - + " provider = #{newCatalogMeta.provider}," - + " catalog_comment = #{newCatalogMeta.catalogComment}," - + " properties = #{newCatalogMeta.properties}," - + " audit_info = #{newCatalogMeta.auditInfo}," - + " current_version = #{newCatalogMeta.currentVersion}," - + " last_version = #{newCatalogMeta.lastVersion}," - + " deleted_at = #{newCatalogMeta.deletedAt}" - + " WHERE catalog_id = #{oldCatalogMeta.catalogId}" - + " AND catalog_name = #{oldCatalogMeta.catalogName}" - + " AND metalake_id = #{oldCatalogMeta.metalakeId}" - + " AND type = #{oldCatalogMeta.type}" - + " AND provider = #{oldCatalogMeta.provider}" - + " AND (catalog_comment = #{oldCatalogMeta.catalogComment} " - + " OR (catalog_comment IS NULL and #{oldCatalogMeta.catalogComment} IS NULL))" - + " AND properties = #{oldCatalogMeta.properties}" - + " AND audit_info = #{oldCatalogMeta.auditInfo}" - + " AND current_version = #{oldCatalogMeta.currentVersion}" - + " AND last_version = #{oldCatalogMeta.lastVersion}" - + " AND deleted_at = 0") + @UpdateProvider(type = CatalogMetaSQLProviderFactory.class, method = "updateCatalogMeta") Integer updateCatalogMeta( @Param("newCatalogMeta") CatalogPO newCatalogPO, @Param("oldCatalogMeta") CatalogPO oldCatalogPO); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE catalog_id = #{catalogId} AND deleted_at = 0") + @UpdateProvider( + type = CatalogMetaSQLProviderFactory.class, + method = "softDeleteCatalogMetasByCatalogId") Integer softDeleteCatalogMetasByCatalogId(@Param("catalogId") Long catalogId); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") + @UpdateProvider( + type = CatalogMetaSQLProviderFactory.class, + method = "softDeleteCatalogMetasByMetalakeId") Integer softDeleteCatalogMetasByMetalakeId(@Param("metalakeId") Long metalakeId); - @Delete( - "DELETE FROM " - + TABLE_NAME - + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}") + @DeleteProvider( + type = CatalogMetaSQLProviderFactory.class, + method = "deleteCatalogMetasByLegacyTimeline") Integer deleteCatalogMetasByLegacyTimeline( @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit); } diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaSQLProviderFactory.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaSQLProviderFactory.java new file mode 100644 index 00000000000..5c0e63f531b --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/CatalogMetaSQLProviderFactory.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.storage.relational.mapper; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType; +import org.apache.gravitino.storage.relational.po.CatalogPO; +import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper; +import org.apache.ibatis.annotations.Param; + +public class CatalogMetaSQLProviderFactory { + + private static final Map + METALAKE_META_SQL_PROVIDER_MAP = + ImmutableMap.of( + JDBCBackendType.MYSQL, new CatalogMetaMySQLProvider(), + JDBCBackendType.H2, new CatalogMetaH2Provider()); + + public static CatalogMetaBaseSQLProvider getProvider() { + String databaseId = + SqlSessionFactoryHelper.getInstance() + .getSqlSessionFactory() + .getConfiguration() + .getDatabaseId(); + + JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId); + return METALAKE_META_SQL_PROVIDER_MAP.get(jdbcBackendType); + } + + static class CatalogMetaMySQLProvider extends CatalogMetaBaseSQLProvider {} + + static class CatalogMetaH2Provider extends CatalogMetaBaseSQLProvider {} + + public static String listCatalogPOsByMetalakeId(@Param("metalakeId") Long metalakeId) { + return getProvider().listCatalogPOsByMetalakeId(metalakeId); + } + + public static String selectCatalogIdByMetalakeIdAndName( + @Param("metalakeId") Long metalakeId, @Param("catalogName") String name) { + return getProvider().selectCatalogIdByMetalakeIdAndName(metalakeId, name); + } + + public static String selectCatalogMetaByMetalakeIdAndName( + @Param("metalakeId") Long metalakeId, @Param("catalogName") String name) { + return getProvider().selectCatalogMetaByMetalakeIdAndName(metalakeId, name); + } + + public static String selectCatalogMetaById(@Param("catalogId") Long catalogId) { + return getProvider().selectCatalogMetaById(catalogId); + } + + public static String insertCatalogMeta(@Param("catalogMeta") CatalogPO catalogPO) { + return getProvider().insertCatalogMeta(catalogPO); + } + + public static String insertCatalogMetaOnDuplicateKeyUpdate( + @Param("catalogMeta") CatalogPO catalogPO) { + return getProvider().insertCatalogMetaOnDuplicateKeyUpdate(catalogPO); + } + + public static String updateCatalogMeta( + @Param("newCatalogMeta") CatalogPO newCatalogPO, + @Param("oldCatalogMeta") CatalogPO oldCatalogPO) { + return getProvider().updateCatalogMeta(newCatalogPO, oldCatalogPO); + } + + public static String softDeleteCatalogMetasByCatalogId(@Param("catalogId") Long catalogId) { + return getProvider().softDeleteCatalogMetasByCatalogId(catalogId); + } + + public static String softDeleteCatalogMetasByMetalakeId(@Param("metalakeId") Long metalakeId) { + return getProvider().softDeleteCatalogMetasByMetalakeId(metalakeId); + } + + public static String deleteCatalogMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return getProvider().deleteCatalogMetasByLegacyTimeline(legacyTimeline, limit); + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaBaseSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaBaseSQLProvider.java new file mode 100644 index 00000000000..fc79e6d8ead --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaBaseSQLProvider.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.storage.relational.mapper; + +import static org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper.META_TABLE_NAME; +import static org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper.VERSION_TABLE_NAME; + +import org.apache.gravitino.storage.relational.po.FilesetPO; +import org.apache.ibatis.annotations.Param; + +public class FilesetMetaBaseSQLProvider { + + public String listFilesetPOsBySchemaId(@Param("schemaId") Long schemaId) { + return "SELECT fm.fileset_id, fm.fileset_name, fm.metalake_id, fm.catalog_id, fm.schema_id," + + " fm.type, fm.audit_info, fm.current_version, fm.last_version, fm.deleted_at," + + " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as version_catalog_id," + + " vi.schema_id as version_schema_id, vi.fileset_id as version_fileset_id," + + " vi.version, vi.fileset_comment, vi.properties, vi.storage_location," + + " vi.deleted_at as version_deleted_at" + + " FROM " + + META_TABLE_NAME + + " fm INNER JOIN " + + VERSION_TABLE_NAME + + " vi ON fm.fileset_id = vi.fileset_id AND fm.current_version = vi.version" + + " WHERE fm.schema_id = #{schemaId} AND fm.deleted_at = 0 AND vi.deleted_at = 0"; + } + + public String selectFilesetIdBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("filesetName") String name) { + return "SELECT fileset_id as filesetId FROM " + + META_TABLE_NAME + + " WHERE schema_id = #{schemaId} AND fileset_name = #{filesetName}" + + " AND deleted_at = 0"; + } + + public String selectFilesetMetaBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("filesetName") String name) { + return "SELECT fm.fileset_id, fm.fileset_name, fm.metalake_id, fm.catalog_id, fm.schema_id," + + " fm.type, fm.audit_info, fm.current_version, fm.last_version, fm.deleted_at," + + " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as version_catalog_id," + + " vi.schema_id as version_schema_id, vi.fileset_id as version_fileset_id," + + " vi.version, vi.fileset_comment, vi.properties, vi.storage_location," + + " vi.deleted_at as version_deleted_at" + + " FROM " + + META_TABLE_NAME + + " fm INNER JOIN " + + VERSION_TABLE_NAME + + " vi ON fm.fileset_id = vi.fileset_id AND fm.current_version = vi.version" + + " WHERE fm.schema_id = #{schemaId} AND fm.fileset_name = #{filesetName}" + + " AND fm.deleted_at = 0 AND vi.deleted_at = 0"; + } + + public String selectFilesetMetaById(@Param("filesetId") Long filesetId) { + return "SELECT fm.fileset_id, fm.fileset_name, fm.metalake_id, fm.catalog_id, fm.schema_id," + + " fm.type, fm.audit_info, fm.current_version, fm.last_version, fm.deleted_at," + + " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as version_catalog_id," + + " vi.schema_id as version_schema_id, vi.fileset_id as version_fileset_id," + + " vi.version, vi.fileset_comment, vi.properties, vi.storage_location," + + " vi.deleted_at as version_deleted_at" + + " FROM " + + META_TABLE_NAME + + " fm INNER JOIN " + + VERSION_TABLE_NAME + + " vi ON fm.fileset_id = vi.fileset_id AND fm.current_version = vi.version" + + " WHERE fm.fileset_id = #{filesetId}" + + " AND fm.deleted_at = 0 AND vi.deleted_at = 0"; + } + + public String insertFilesetMeta(@Param("filesetMeta") FilesetPO filesetPO) { + return "INSERT INTO " + + META_TABLE_NAME + + "(fileset_id, fileset_name, metalake_id," + + " catalog_id, schema_id, type, audit_info," + + " current_version, last_version, deleted_at)" + + " VALUES(" + + " #{filesetMeta.filesetId}," + + " #{filesetMeta.filesetName}," + + " #{filesetMeta.metalakeId}," + + " #{filesetMeta.catalogId}," + + " #{filesetMeta.schemaId}," + + " #{filesetMeta.type}," + + " #{filesetMeta.auditInfo}," + + " #{filesetMeta.currentVersion}," + + " #{filesetMeta.lastVersion}," + + " #{filesetMeta.deletedAt}" + + " )"; + } + + public String insertFilesetMetaOnDuplicateKeyUpdate(@Param("filesetMeta") FilesetPO filesetPO) { + return "INSERT INTO " + + META_TABLE_NAME + + "(fileset_id, fileset_name, metalake_id," + + " catalog_id, schema_id, type, audit_info," + + " current_version, last_version, deleted_at)" + + " VALUES(" + + " #{filesetMeta.filesetId}," + + " #{filesetMeta.filesetName}," + + " #{filesetMeta.metalakeId}," + + " #{filesetMeta.catalogId}," + + " #{filesetMeta.schemaId}," + + " #{filesetMeta.type}," + + " #{filesetMeta.auditInfo}," + + " #{filesetMeta.currentVersion}," + + " #{filesetMeta.lastVersion}," + + " #{filesetMeta.deletedAt}" + + " )" + + " ON DUPLICATE KEY UPDATE" + + " fileset_name = #{filesetMeta.filesetName}," + + " metalake_id = #{filesetMeta.metalakeId}," + + " catalog_id = #{filesetMeta.catalogId}," + + " schema_id = #{filesetMeta.schemaId}," + + " type = #{filesetMeta.type}," + + " audit_info = #{filesetMeta.auditInfo}," + + " current_version = #{filesetMeta.currentVersion}," + + " last_version = #{filesetMeta.lastVersion}," + + " deleted_at = #{filesetMeta.deletedAt}"; + } + + public String updateFilesetMeta( + @Param("newFilesetMeta") FilesetPO newFilesetPO, + @Param("oldFilesetMeta") FilesetPO oldFilesetPO) { + return "UPDATE " + + META_TABLE_NAME + + " SET fileset_name = #{newFilesetMeta.filesetName}," + + " metalake_id = #{newFilesetMeta.metalakeId}," + + " catalog_id = #{newFilesetMeta.catalogId}," + + " schema_id = #{newFilesetMeta.schemaId}," + + " type = #{newFilesetMeta.type}," + + " audit_info = #{newFilesetMeta.auditInfo}," + + " current_version = #{newFilesetMeta.currentVersion}," + + " last_version = #{newFilesetMeta.lastVersion}," + + " deleted_at = #{newFilesetMeta.deletedAt}" + + " WHERE fileset_id = #{oldFilesetMeta.filesetId}" + + " AND fileset_name = #{oldFilesetMeta.filesetName}" + + " AND metalake_id = #{oldFilesetMeta.metalakeId}" + + " AND catalog_id = #{oldFilesetMeta.catalogId}" + + " AND schema_id = #{oldFilesetMeta.schemaId}" + + " AND type = #{oldFilesetMeta.type}" + + " AND audit_info = #{oldFilesetMeta.auditInfo}" + + " AND current_version = #{oldFilesetMeta.currentVersion}" + + " AND last_version = #{oldFilesetMeta.lastVersion}" + + " AND deleted_at = 0"; + } + + public String softDeleteFilesetMetasByMetalakeId(@Param("metalakeId") Long metalakeId) { + return "UPDATE " + + META_TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0"; + } + + public String softDeleteFilesetMetasByCatalogId(@Param("catalogId") Long catalogId) { + return "UPDATE " + + META_TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE catalog_id = #{catalogId} AND deleted_at = 0"; + } + + public String softDeleteFilesetMetasBySchemaId(@Param("schemaId") Long schemaId) { + return "UPDATE " + + META_TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE schema_id = #{schemaId} AND deleted_at = 0"; + } + + public String softDeleteFilesetMetasByFilesetId(@Param("filesetId") Long filesetId) { + return "UPDATE " + + META_TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE fileset_id = #{filesetId} AND deleted_at = 0"; + } + + public String deleteFilesetMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return "DELETE FROM " + + META_TABLE_NAME + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}"; + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaMapper.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaMapper.java index 61e4d2f95ff..8692f8f890d 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaMapper.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaMapper.java @@ -21,13 +21,13 @@ import java.util.List; import org.apache.gravitino.storage.relational.po.FilesetPO; -import org.apache.ibatis.annotations.Delete; -import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.DeleteProvider; +import org.apache.ibatis.annotations.InsertProvider; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Result; import org.apache.ibatis.annotations.Results; -import org.apache.ibatis.annotations.Select; -import org.apache.ibatis.annotations.Update; +import org.apache.ibatis.annotations.SelectProvider; +import org.apache.ibatis.annotations.UpdateProvider; /** * A MyBatis Mapper for fileset meta operation SQLs. @@ -42,19 +42,6 @@ public interface FilesetMetaMapper { String VERSION_TABLE_NAME = "fileset_version_info"; - @Select( - "SELECT fm.fileset_id, fm.fileset_name, fm.metalake_id, fm.catalog_id, fm.schema_id," - + " fm.type, fm.audit_info, fm.current_version, fm.last_version, fm.deleted_at," - + " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as version_catalog_id," - + " vi.schema_id as version_schema_id, vi.fileset_id as version_fileset_id," - + " vi.version, vi.fileset_comment, vi.properties, vi.storage_location," - + " vi.deleted_at as version_deleted_at" - + " FROM " - + META_TABLE_NAME - + " fm INNER JOIN " - + VERSION_TABLE_NAME - + " vi ON fm.fileset_id = vi.fileset_id AND fm.current_version = vi.version" - + " WHERE fm.schema_id = #{schemaId} AND fm.deleted_at = 0 AND vi.deleted_at = 0") @Results({ @Result(property = "filesetId", column = "fileset_id"), @Result(property = "filesetName", column = "fileset_name"), @@ -77,30 +64,15 @@ public interface FilesetMetaMapper { @Result(property = "filesetVersionPO.storageLocation", column = "storage_location"), @Result(property = "filesetVersionPO.deletedAt", column = "version_deleted_at") }) + @SelectProvider(type = FilesetMetaSQLProviderFactory.class, method = "listFilesetPOsBySchemaId") List listFilesetPOsBySchemaId(@Param("schemaId") Long schemaId); - @Select( - "SELECT fileset_id as filesetId FROM " - + META_TABLE_NAME - + " WHERE schema_id = #{schemaId} AND fileset_name = #{filesetName}" - + " AND deleted_at = 0") + @SelectProvider( + type = FilesetMetaSQLProviderFactory.class, + method = "selectFilesetIdBySchemaIdAndName") Long selectFilesetIdBySchemaIdAndName( @Param("schemaId") Long schemaId, @Param("filesetName") String name); - @Select( - "SELECT fm.fileset_id, fm.fileset_name, fm.metalake_id, fm.catalog_id, fm.schema_id," - + " fm.type, fm.audit_info, fm.current_version, fm.last_version, fm.deleted_at," - + " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as version_catalog_id," - + " vi.schema_id as version_schema_id, vi.fileset_id as version_fileset_id," - + " vi.version, vi.fileset_comment, vi.properties, vi.storage_location," - + " vi.deleted_at as version_deleted_at" - + " FROM " - + META_TABLE_NAME - + " fm INNER JOIN " - + VERSION_TABLE_NAME - + " vi ON fm.fileset_id = vi.fileset_id AND fm.current_version = vi.version" - + " WHERE fm.schema_id = #{schemaId} AND fm.fileset_name = #{filesetName}" - + " AND fm.deleted_at = 0 AND vi.deleted_at = 0") @Results({ @Result(property = "filesetId", column = "fileset_id"), @Result(property = "filesetName", column = "fileset_name"), @@ -123,23 +95,12 @@ Long selectFilesetIdBySchemaIdAndName( @Result(property = "filesetVersionPO.storageLocation", column = "storage_location"), @Result(property = "filesetVersionPO.deletedAt", column = "version_deleted_at") }) + @SelectProvider( + type = FilesetMetaSQLProviderFactory.class, + method = "selectFilesetMetaBySchemaIdAndName") FilesetPO selectFilesetMetaBySchemaIdAndName( @Param("schemaId") Long schemaId, @Param("filesetName") String name); - @Select( - "SELECT fm.fileset_id, fm.fileset_name, fm.metalake_id, fm.catalog_id, fm.schema_id," - + " fm.type, fm.audit_info, fm.current_version, fm.last_version, fm.deleted_at," - + " vi.id, vi.metalake_id as version_metalake_id, vi.catalog_id as version_catalog_id," - + " vi.schema_id as version_schema_id, vi.fileset_id as version_fileset_id," - + " vi.version, vi.fileset_comment, vi.properties, vi.storage_location," - + " vi.deleted_at as version_deleted_at" - + " FROM " - + META_TABLE_NAME - + " fm INNER JOIN " - + VERSION_TABLE_NAME - + " vi ON fm.fileset_id = vi.fileset_id AND fm.current_version = vi.version" - + " WHERE fm.fileset_id = #{filesetId}" - + " AND fm.deleted_at = 0 AND vi.deleted_at = 0") @Results({ @Result(property = "filesetId", column = "fileset_id"), @Result(property = "filesetName", column = "fileset_name"), @@ -162,120 +123,45 @@ FilesetPO selectFilesetMetaBySchemaIdAndName( @Result(property = "filesetVersionPO.storageLocation", column = "storage_location"), @Result(property = "filesetVersionPO.deletedAt", column = "version_deleted_at") }) + @SelectProvider(type = FilesetMetaSQLProviderFactory.class, method = "selectFilesetMetaById") FilesetPO selectFilesetMetaById(@Param("filesetId") Long filesetId); - @Insert( - "INSERT INTO " - + META_TABLE_NAME - + "(fileset_id, fileset_name, metalake_id," - + " catalog_id, schema_id, type, audit_info," - + " current_version, last_version, deleted_at)" - + " VALUES(" - + " #{filesetMeta.filesetId}," - + " #{filesetMeta.filesetName}," - + " #{filesetMeta.metalakeId}," - + " #{filesetMeta.catalogId}," - + " #{filesetMeta.schemaId}," - + " #{filesetMeta.type}," - + " #{filesetMeta.auditInfo}," - + " #{filesetMeta.currentVersion}," - + " #{filesetMeta.lastVersion}," - + " #{filesetMeta.deletedAt}" - + " )") + @InsertProvider(type = FilesetMetaSQLProviderFactory.class, method = "insertFilesetMeta") void insertFilesetMeta(@Param("filesetMeta") FilesetPO filesetPO); - @Insert( - "INSERT INTO " - + META_TABLE_NAME - + "(fileset_id, fileset_name, metalake_id," - + " catalog_id, schema_id, type, audit_info," - + " current_version, last_version, deleted_at)" - + " VALUES(" - + " #{filesetMeta.filesetId}," - + " #{filesetMeta.filesetName}," - + " #{filesetMeta.metalakeId}," - + " #{filesetMeta.catalogId}," - + " #{filesetMeta.schemaId}," - + " #{filesetMeta.type}," - + " #{filesetMeta.auditInfo}," - + " #{filesetMeta.currentVersion}," - + " #{filesetMeta.lastVersion}," - + " #{filesetMeta.deletedAt}" - + " )" - + " ON DUPLICATE KEY UPDATE" - + " fileset_name = #{filesetMeta.filesetName}," - + " metalake_id = #{filesetMeta.metalakeId}," - + " catalog_id = #{filesetMeta.catalogId}," - + " schema_id = #{filesetMeta.schemaId}," - + " type = #{filesetMeta.type}," - + " audit_info = #{filesetMeta.auditInfo}," - + " current_version = #{filesetMeta.currentVersion}," - + " last_version = #{filesetMeta.lastVersion}," - + " deleted_at = #{filesetMeta.deletedAt}") + @InsertProvider( + type = FilesetMetaSQLProviderFactory.class, + method = "insertFilesetMetaOnDuplicateKeyUpdate") void insertFilesetMetaOnDuplicateKeyUpdate(@Param("filesetMeta") FilesetPO filesetPO); - @Update( - "UPDATE " - + META_TABLE_NAME - + " SET fileset_name = #{newFilesetMeta.filesetName}," - + " metalake_id = #{newFilesetMeta.metalakeId}," - + " catalog_id = #{newFilesetMeta.catalogId}," - + " schema_id = #{newFilesetMeta.schemaId}," - + " type = #{newFilesetMeta.type}," - + " audit_info = #{newFilesetMeta.auditInfo}," - + " current_version = #{newFilesetMeta.currentVersion}," - + " last_version = #{newFilesetMeta.lastVersion}," - + " deleted_at = #{newFilesetMeta.deletedAt}" - + " WHERE fileset_id = #{oldFilesetMeta.filesetId}" - + " AND fileset_name = #{oldFilesetMeta.filesetName}" - + " AND metalake_id = #{oldFilesetMeta.metalakeId}" - + " AND catalog_id = #{oldFilesetMeta.catalogId}" - + " AND schema_id = #{oldFilesetMeta.schemaId}" - + " AND type = #{oldFilesetMeta.type}" - + " AND audit_info = #{oldFilesetMeta.auditInfo}" - + " AND current_version = #{oldFilesetMeta.currentVersion}" - + " AND last_version = #{oldFilesetMeta.lastVersion}" - + " AND deleted_at = 0") + @UpdateProvider(type = FilesetMetaSQLProviderFactory.class, method = "updateFilesetMeta") Integer updateFilesetMeta( @Param("newFilesetMeta") FilesetPO newFilesetPO, @Param("oldFilesetMeta") FilesetPO oldFilesetPO); - @Update( - "UPDATE " - + META_TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") + @UpdateProvider( + type = FilesetMetaSQLProviderFactory.class, + method = "softDeleteFilesetMetasByMetalakeId") Integer softDeleteFilesetMetasByMetalakeId(@Param("metalakeId") Long metalakeId); - @Update( - "UPDATE " - + META_TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE catalog_id = #{catalogId} AND deleted_at = 0") + @UpdateProvider( + type = FilesetMetaSQLProviderFactory.class, + method = "softDeleteFilesetMetasByCatalogId") Integer softDeleteFilesetMetasByCatalogId(@Param("catalogId") Long catalogId); - @Update( - "UPDATE " - + META_TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE schema_id = #{schemaId} AND deleted_at = 0") + @UpdateProvider( + type = FilesetMetaSQLProviderFactory.class, + method = "softDeleteFilesetMetasBySchemaId") Integer softDeleteFilesetMetasBySchemaId(@Param("schemaId") Long schemaId); - @Update( - "UPDATE " - + META_TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE fileset_id = #{filesetId} AND deleted_at = 0") + @UpdateProvider( + type = FilesetMetaSQLProviderFactory.class, + method = "softDeleteFilesetMetasByFilesetId") Integer softDeleteFilesetMetasByFilesetId(@Param("filesetId") Long filesetId); - @Delete( - "DELETE FROM " - + META_TABLE_NAME - + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}") + @DeleteProvider( + type = FilesetMetaSQLProviderFactory.class, + method = "deleteFilesetMetasByLegacyTimeline") Integer deleteFilesetMetasByLegacyTimeline( @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit); } diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaSQLProviderFactory.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaSQLProviderFactory.java new file mode 100644 index 00000000000..36ea94d5862 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetMetaSQLProviderFactory.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.storage.relational.mapper; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType; +import org.apache.gravitino.storage.relational.po.FilesetPO; +import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper; +import org.apache.ibatis.annotations.Param; + +public class FilesetMetaSQLProviderFactory { + private static final Map + METALAKE_META_SQL_PROVIDER_MAP = + ImmutableMap.of( + JDBCBackendType.MYSQL, new FilesetMetaMySQLProvider(), + JDBCBackendType.H2, new FilesetMetaH2Provider()); + + public static FilesetMetaBaseSQLProvider getProvider() { + String databaseId = + SqlSessionFactoryHelper.getInstance() + .getSqlSessionFactory() + .getConfiguration() + .getDatabaseId(); + + JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId); + return METALAKE_META_SQL_PROVIDER_MAP.get(jdbcBackendType); + } + + static class FilesetMetaMySQLProvider extends FilesetMetaBaseSQLProvider {} + + static class FilesetMetaH2Provider extends FilesetMetaBaseSQLProvider {} + + public static String listFilesetPOsBySchemaId(@Param("schemaId") Long schemaId) { + return getProvider().listFilesetPOsBySchemaId(schemaId); + } + + public static String selectFilesetIdBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("filesetName") String name) { + return getProvider().selectFilesetIdBySchemaIdAndName(schemaId, name); + } + + public static String selectFilesetMetaBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("filesetName") String name) { + return getProvider().selectFilesetMetaBySchemaIdAndName(schemaId, name); + } + + public static String selectFilesetMetaById(@Param("filesetId") Long filesetId) { + return getProvider().selectFilesetMetaById(filesetId); + } + + public static String insertFilesetMeta(@Param("filesetMeta") FilesetPO filesetPO) { + return getProvider().insertFilesetMeta(filesetPO); + } + + public static String insertFilesetMetaOnDuplicateKeyUpdate( + @Param("filesetMeta") FilesetPO filesetPO) { + return getProvider().insertFilesetMetaOnDuplicateKeyUpdate(filesetPO); + } + + public static String updateFilesetMeta( + @Param("newFilesetMeta") FilesetPO newFilesetPO, + @Param("oldFilesetMeta") FilesetPO oldFilesetPO) { + return getProvider().updateFilesetMeta(newFilesetPO, oldFilesetPO); + } + + public static String softDeleteFilesetMetasByMetalakeId(@Param("metalakeId") Long metalakeId) { + return getProvider().softDeleteFilesetMetasByMetalakeId(metalakeId); + } + + public static String softDeleteFilesetMetasByCatalogId(@Param("catalogId") Long catalogId) { + return getProvider().softDeleteFilesetMetasByCatalogId(catalogId); + } + + public static String softDeleteFilesetMetasBySchemaId(@Param("schemaId") Long schemaId) { + return getProvider().softDeleteFilesetMetasBySchemaId(schemaId); + } + + public String softDeleteFilesetMetasByFilesetId(@Param("filesetId") Long filesetId) { + return getProvider().softDeleteFilesetMetasByFilesetId(filesetId); + } + + public String deleteFilesetMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return getProvider().deleteFilesetMetasByLegacyTimeline(legacyTimeline, limit); + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionBaseSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionBaseSQLProvider.java new file mode 100644 index 00000000000..f6ab85b38be --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionBaseSQLProvider.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.storage.relational.mapper; + +import static org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper.VERSION_TABLE_NAME; + +import org.apache.gravitino.storage.relational.po.FilesetVersionPO; +import org.apache.ibatis.annotations.Param; + +public class FilesetVersionBaseSQLProvider { + public String insertFilesetVersion(@Param("filesetVersion") FilesetVersionPO filesetVersionPO) { + return "INSERT INTO " + + VERSION_TABLE_NAME + + "(metalake_id, catalog_id, schema_id, fileset_id," + + " version, fileset_comment, properties, storage_location," + + " deleted_at)" + + " VALUES(" + + " #{filesetVersion.metalakeId}," + + " #{filesetVersion.catalogId}," + + " #{filesetVersion.schemaId}," + + " #{filesetVersion.filesetId}," + + " #{filesetVersion.version}," + + " #{filesetVersion.filesetComment}," + + " #{filesetVersion.properties}," + + " #{filesetVersion.storageLocation}," + + " #{filesetVersion.deletedAt}" + + " )"; + } + + public String insertFilesetVersionOnDuplicateKeyUpdate( + @Param("filesetVersion") FilesetVersionPO filesetVersionPO) { + return "INSERT INTO " + + VERSION_TABLE_NAME + + "(metalake_id, catalog_id, schema_id, fileset_id," + + " version, fileset_comment, properties, storage_location," + + " deleted_at)" + + " VALUES(" + + " #{filesetVersion.metalakeId}," + + " #{filesetVersion.catalogId}," + + " #{filesetVersion.schemaId}," + + " #{filesetVersion.filesetId}," + + " #{filesetVersion.version}," + + " #{filesetVersion.filesetComment}," + + " #{filesetVersion.properties}," + + " #{filesetVersion.storageLocation}," + + " #{filesetVersion.deletedAt}" + + " )" + + " ON DUPLICATE KEY UPDATE" + + " metalake_id = #{filesetVersion.metalakeId}," + + " catalog_id = #{filesetVersion.catalogId}," + + " schema_id = #{filesetVersion.schemaId}," + + " fileset_id = #{filesetVersion.filesetId}," + + " version = #{filesetVersion.version}," + + " fileset_comment = #{filesetVersion.filesetComment}," + + " properties = #{filesetVersion.properties}," + + " storage_location = #{filesetVersion.storageLocation}," + + " deleted_at = #{filesetVersion.deletedAt}"; + } + + public String softDeleteFilesetVersionsByMetalakeId(@Param("metalakeId") Long metalakeId) { + return "UPDATE " + + VERSION_TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0"; + } + + public String softDeleteFilesetVersionsByCatalogId(@Param("catalogId") Long catalogId) { + return "UPDATE " + + VERSION_TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE catalog_id = #{catalogId} AND deleted_at = 0"; + } + + public String softDeleteFilesetVersionsBySchemaId(@Param("schemaId") Long schemaId) { + return "UPDATE " + + VERSION_TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE schema_id = #{schemaId} AND deleted_at = 0"; + } + + public String softDeleteFilesetVersionsByFilesetId(@Param("filesetId") Long filesetId) { + return "UPDATE " + + VERSION_TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE fileset_id = #{filesetId} AND deleted_at = 0"; + } + + public String deleteFilesetVersionsByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return "DELETE FROM " + + VERSION_TABLE_NAME + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}"; + } + + public String selectFilesetVersionsByRetentionCount( + @Param("versionRetentionCount") Long versionRetentionCount) { + return "SELECT fileset_id as filesetId," + + " Max(version) as version" + + " FROM " + + VERSION_TABLE_NAME + + " WHERE version > #{versionRetentionCount} AND deleted_at = 0" + + " GROUP BY fileset_id"; + } + + public String softDeleteFilesetVersionsByRetentionLine( + @Param("filesetId") Long filesetId, + @Param("versionRetentionLine") long versionRetentionLine, + @Param("limit") int limit) { + return "UPDATE " + + VERSION_TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE fileset_id = #{filesetId} AND version <= #{versionRetentionLine} AND deleted_at = 0 LIMIT #{limit}"; + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionMapper.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionMapper.java index cd4bfb9b613..09eca4b90e0 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionMapper.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionMapper.java @@ -22,11 +22,11 @@ import java.util.List; import org.apache.gravitino.storage.relational.po.FilesetMaxVersionPO; import org.apache.gravitino.storage.relational.po.FilesetVersionPO; -import org.apache.ibatis.annotations.Delete; -import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.DeleteProvider; +import org.apache.ibatis.annotations.InsertProvider; import org.apache.ibatis.annotations.Param; -import org.apache.ibatis.annotations.Select; -import org.apache.ibatis.annotations.Update; +import org.apache.ibatis.annotations.SelectProvider; +import org.apache.ibatis.annotations.UpdateProvider; /** * A MyBatis Mapper for fileset version info operation SQLs. @@ -39,110 +39,50 @@ public interface FilesetVersionMapper { String VERSION_TABLE_NAME = "fileset_version_info"; - @Insert( - "INSERT INTO " - + VERSION_TABLE_NAME - + "(metalake_id, catalog_id, schema_id, fileset_id," - + " version, fileset_comment, properties, storage_location," - + " deleted_at)" - + " VALUES(" - + " #{filesetVersion.metalakeId}," - + " #{filesetVersion.catalogId}," - + " #{filesetVersion.schemaId}," - + " #{filesetVersion.filesetId}," - + " #{filesetVersion.version}," - + " #{filesetVersion.filesetComment}," - + " #{filesetVersion.properties}," - + " #{filesetVersion.storageLocation}," - + " #{filesetVersion.deletedAt}" - + " )") + @InsertProvider(type = FilesetVersionSQLProviderFactory.class, method = "insertFilesetVersion") void insertFilesetVersion(@Param("filesetVersion") FilesetVersionPO filesetVersionPO); - @Insert( - "INSERT INTO " - + VERSION_TABLE_NAME - + "(metalake_id, catalog_id, schema_id, fileset_id," - + " version, fileset_comment, properties, storage_location," - + " deleted_at)" - + " VALUES(" - + " #{filesetVersion.metalakeId}," - + " #{filesetVersion.catalogId}," - + " #{filesetVersion.schemaId}," - + " #{filesetVersion.filesetId}," - + " #{filesetVersion.version}," - + " #{filesetVersion.filesetComment}," - + " #{filesetVersion.properties}," - + " #{filesetVersion.storageLocation}," - + " #{filesetVersion.deletedAt}" - + " )" - + " ON DUPLICATE KEY UPDATE" - + " metalake_id = #{filesetVersion.metalakeId}," - + " catalog_id = #{filesetVersion.catalogId}," - + " schema_id = #{filesetVersion.schemaId}," - + " fileset_id = #{filesetVersion.filesetId}," - + " version = #{filesetVersion.version}," - + " fileset_comment = #{filesetVersion.filesetComment}," - + " properties = #{filesetVersion.properties}," - + " storage_location = #{filesetVersion.storageLocation}," - + " deleted_at = #{filesetVersion.deletedAt}") + @InsertProvider( + type = FilesetVersionSQLProviderFactory.class, + method = "insertFilesetVersionOnDuplicateKeyUpdate") void insertFilesetVersionOnDuplicateKeyUpdate( @Param("filesetVersion") FilesetVersionPO filesetVersionPO); - @Update( - "UPDATE " - + VERSION_TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") + @UpdateProvider( + type = FilesetVersionSQLProviderFactory.class, + method = "softDeleteFilesetVersionsByMetalakeId") Integer softDeleteFilesetVersionsByMetalakeId(@Param("metalakeId") Long metalakeId); - @Update( - "UPDATE " - + VERSION_TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE catalog_id = #{catalogId} AND deleted_at = 0") + @UpdateProvider( + type = FilesetVersionSQLProviderFactory.class, + method = "softDeleteFilesetVersionsByCatalogId") Integer softDeleteFilesetVersionsByCatalogId(@Param("catalogId") Long catalogId); - @Update( - "UPDATE " - + VERSION_TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE schema_id = #{schemaId} AND deleted_at = 0") + @UpdateProvider( + type = FilesetVersionSQLProviderFactory.class, + method = "softDeleteFilesetVersionsBySchemaId") Integer softDeleteFilesetVersionsBySchemaId(@Param("schemaId") Long schemaId); - @Update( - "UPDATE " - + VERSION_TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE fileset_id = #{filesetId} AND deleted_at = 0") + @UpdateProvider( + type = FilesetVersionSQLProviderFactory.class, + method = "softDeleteFilesetVersionsByFilesetId") Integer softDeleteFilesetVersionsByFilesetId(@Param("filesetId") Long filesetId); - @Delete( - "DELETE FROM " - + VERSION_TABLE_NAME - + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}") + @DeleteProvider( + type = FilesetVersionSQLProviderFactory.class, + method = "deleteFilesetVersionsByLegacyTimeline") Integer deleteFilesetVersionsByLegacyTimeline( @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit); - @Select( - "SELECT fileset_id as filesetId," - + " Max(version) as version" - + " FROM " - + VERSION_TABLE_NAME - + " WHERE version > #{versionRetentionCount} AND deleted_at = 0" - + " GROUP BY fileset_id") + @SelectProvider( + type = FilesetVersionSQLProviderFactory.class, + method = "selectFilesetVersionsByRetentionCount") List selectFilesetVersionsByRetentionCount( @Param("versionRetentionCount") Long versionRetentionCount); - @Update( - "UPDATE " - + VERSION_TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE fileset_id = #{filesetId} AND version <= #{versionRetentionLine} AND deleted_at = 0 LIMIT #{limit}") + @UpdateProvider( + type = FilesetVersionSQLProviderFactory.class, + method = "softDeleteFilesetVersionsByRetentionLine") Integer softDeleteFilesetVersionsByRetentionLine( @Param("filesetId") Long filesetId, @Param("versionRetentionLine") long versionRetentionLine, diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionSQLProviderFactory.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionSQLProviderFactory.java new file mode 100644 index 00000000000..163f2c882fe --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/FilesetVersionSQLProviderFactory.java @@ -0,0 +1,93 @@ +package org.apache.gravitino.storage.relational.mapper; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType; +import org.apache.gravitino.storage.relational.po.FilesetVersionPO; +import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper; +import org.apache.ibatis.annotations.Param; + +public class FilesetVersionSQLProviderFactory { + private static final Map + METALAKE_META_SQL_PROVIDER_MAP = + ImmutableMap.of( + JDBCBackendType.MYSQL, new FilesetVersionMySQLProvider(), + JDBCBackendType.H2, new FilesetVersionH2Provider()); + + public static FilesetVersionBaseSQLProvider getProvider() { + String databaseId = + SqlSessionFactoryHelper.getInstance() + .getSqlSessionFactory() + .getConfiguration() + .getDatabaseId(); + + JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId); + return METALAKE_META_SQL_PROVIDER_MAP.get(jdbcBackendType); + } + + static class FilesetVersionMySQLProvider extends FilesetVersionBaseSQLProvider {} + + static class FilesetVersionH2Provider extends FilesetVersionBaseSQLProvider {} + + public static String insertFilesetVersion( + @Param("filesetVersion") FilesetVersionPO filesetVersionPO) { + return getProvider().insertFilesetVersion(filesetVersionPO); + } + + public static String insertFilesetVersionOnDuplicateKeyUpdate( + @Param("filesetVersion") FilesetVersionPO filesetVersionPO) { + return getProvider().insertFilesetVersionOnDuplicateKeyUpdate(filesetVersionPO); + } + + public static String softDeleteFilesetVersionsByMetalakeId(@Param("metalakeId") Long metalakeId) { + return getProvider().softDeleteFilesetVersionsByMetalakeId(metalakeId); + } + + public static String softDeleteFilesetVersionsByCatalogId(@Param("catalogId") Long catalogId) { + return getProvider().softDeleteFilesetVersionsByCatalogId(catalogId); + } + + public static String softDeleteFilesetVersionsBySchemaId(@Param("schemaId") Long schemaId) { + return getProvider().softDeleteFilesetVersionsBySchemaId(schemaId); + } + + public static String softDeleteFilesetVersionsByFilesetId(@Param("filesetId") Long filesetId) { + return getProvider().softDeleteFilesetVersionsByFilesetId(filesetId); + } + + public static String deleteFilesetVersionsByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return getProvider().deleteFilesetVersionsByLegacyTimeline(legacyTimeline, limit); + } + + public static String selectFilesetVersionsByRetentionCount( + @Param("versionRetentionCount") Long versionRetentionCount) { + return getProvider().selectFilesetVersionsByRetentionCount(versionRetentionCount); + } + + public static String softDeleteFilesetVersionsByRetentionLine( + @Param("filesetId") Long filesetId, + @Param("versionRetentionLine") long versionRetentionLine, + @Param("limit") int limit) { + return getProvider() + .softDeleteFilesetVersionsByRetentionLine(filesetId, versionRetentionLine, limit); + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaBaseSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaBaseSQLProvider.java new file mode 100644 index 00000000000..a782ddb8f0f --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaBaseSQLProvider.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.storage.relational.mapper; + +import static org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper.TABLE_NAME; + +import org.apache.gravitino.storage.relational.po.MetalakePO; +import org.apache.ibatis.annotations.Param; + +public class MetalakeMetaBaseSQLProvider { + + public String listMetalakePOs() { + return "SELECT metalake_id as metalakeId, metalake_name as metalakeName," + + " metalake_comment as metalakeComment, properties," + + " audit_info as auditInfo, schema_version as schemaVersion," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE deleted_at = 0"; + } + + public String selectMetalakeMetaByName(@Param("metalakeName") String metalakeName) { + return "SELECT metalake_id as metalakeId, metalake_name as metalakeName," + + " metalake_comment as metalakeComment, properties," + + " audit_info as auditInfo, schema_version as schemaVersion," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE metalake_name = #{metalakeName} and deleted_at = 0"; + } + + public String selectMetalakeMetaById(@Param("metalakeId") Long metalakeId) { + return "SELECT metalake_id as metalakeId, metalake_name as metalakeName," + + " metalake_comment as metalakeComment, properties," + + " audit_info as auditInfo, schema_version as schemaVersion," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE metalake_id = #{metalakeId} and deleted_at = 0"; + } + + public String selectMetalakeIdMetaByName(@Param("metalakeName") String metalakeName) { + return "SELECT metalake_id as metalakeId" + + " FROM " + + TABLE_NAME + + " WHERE metalake_name = #{metalakeName} and deleted_at = 0"; + } + + public String insertMetalakeMeta(@Param("metalakeMeta") MetalakePO metalakePO) { + return "INSERT INTO " + + TABLE_NAME + + "(metalake_id, metalake_name, metalake_comment, properties, audit_info," + + " schema_version, current_version, last_version, deleted_at)" + + " VALUES(" + + " #{metalakeMeta.metalakeId}," + + " #{metalakeMeta.metalakeName}," + + " #{metalakeMeta.metalakeComment}," + + " #{metalakeMeta.properties}," + + " #{metalakeMeta.auditInfo}," + + " #{metalakeMeta.schemaVersion}," + + " #{metalakeMeta.currentVersion}," + + " #{metalakeMeta.lastVersion}," + + " #{metalakeMeta.deletedAt}" + + " )"; + } + + public String insertMetalakeMetaOnDuplicateKeyUpdate( + @Param("metalakeMeta") MetalakePO metalakePO) { + return "INSERT INTO " + + TABLE_NAME + + "(metalake_id, metalake_name, metalake_comment, properties, audit_info," + + " schema_version, current_version, last_version, deleted_at)" + + " VALUES(" + + " #{metalakeMeta.metalakeId}," + + " #{metalakeMeta.metalakeName}," + + " #{metalakeMeta.metalakeComment}," + + " #{metalakeMeta.properties}," + + " #{metalakeMeta.auditInfo}," + + " #{metalakeMeta.schemaVersion}," + + " #{metalakeMeta.currentVersion}," + + " #{metalakeMeta.lastVersion}," + + " #{metalakeMeta.deletedAt}" + + " )" + + " ON DUPLICATE KEY UPDATE" + + " metalake_name = #{metalakeMeta.metalakeName}," + + " metalake_comment = #{metalakeMeta.metalakeComment}," + + " properties = #{metalakeMeta.properties}," + + " audit_info = #{metalakeMeta.auditInfo}," + + " schema_version = #{metalakeMeta.schemaVersion}," + + " current_version = #{metalakeMeta.currentVersion}," + + " last_version = #{metalakeMeta.lastVersion}," + + " deleted_at = #{metalakeMeta.deletedAt}"; + } + + public String updateMetalakeMeta( + @Param("newMetalakeMeta") MetalakePO newMetalakePO, + @Param("oldMetalakeMeta") MetalakePO oldMetalakePO) { + return "UPDATE " + + TABLE_NAME + + " SET metalake_name = #{newMetalakeMeta.metalakeName}," + + " metalake_comment = #{newMetalakeMeta.metalakeComment}," + + " properties = #{newMetalakeMeta.properties}," + + " audit_info = #{newMetalakeMeta.auditInfo}," + + " schema_version = #{newMetalakeMeta.schemaVersion}," + + " current_version = #{newMetalakeMeta.currentVersion}," + + " last_version = #{newMetalakeMeta.lastVersion}" + + " WHERE metalake_id = #{oldMetalakeMeta.metalakeId}" + + " AND metalake_name = #{oldMetalakeMeta.metalakeName}" + + " AND (metalake_comment = #{oldMetalakeMeta.metalakeComment} " + + " OR (metalake_comment IS NULL and #{oldMetalakeMeta.metalakeComment} IS NULL))" + + " AND properties = #{oldMetalakeMeta.properties}" + + " AND audit_info = #{oldMetalakeMeta.auditInfo}" + + " AND schema_version = #{oldMetalakeMeta.schemaVersion}" + + " AND current_version = #{oldMetalakeMeta.currentVersion}" + + " AND last_version = #{oldMetalakeMeta.lastVersion}" + + " AND deleted_at = 0"; + } + + public String softDeleteMetalakeMetaByMetalakeId(@Param("metalakeId") Long metalakeId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0"; + } + + public String deleteMetalakeMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return "DELETE FROM " + + TABLE_NAME + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}"; + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaMapper.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaMapper.java index 89f8d13ceb1..d5dc809bfe2 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaMapper.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaMapper.java @@ -21,11 +21,11 @@ import java.util.List; import org.apache.gravitino.storage.relational.po.MetalakePO; -import org.apache.ibatis.annotations.Delete; -import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.DeleteProvider; +import org.apache.ibatis.annotations.InsertProvider; import org.apache.ibatis.annotations.Param; -import org.apache.ibatis.annotations.Select; -import org.apache.ibatis.annotations.Update; +import org.apache.ibatis.annotations.SelectProvider; +import org.apache.ibatis.annotations.UpdateProvider; /** * A MyBatis Mapper for metalake meta operation SQLs. @@ -38,126 +38,41 @@ public interface MetalakeMetaMapper { String TABLE_NAME = "metalake_meta"; - @Select( - "SELECT metalake_id as metalakeId, metalake_name as metalakeName," - + " metalake_comment as metalakeComment, properties," - + " audit_info as auditInfo, schema_version as schemaVersion," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE deleted_at = 0") + @SelectProvider(type = MetalakeMetaSQLProviderFactory.class, method = "listMetalakePOs") List listMetalakePOs(); - @Select( - "SELECT metalake_id as metalakeId, metalake_name as metalakeName," - + " metalake_comment as metalakeComment, properties," - + " audit_info as auditInfo, schema_version as schemaVersion," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE metalake_name = #{metalakeName} and deleted_at = 0") + @SelectProvider(type = MetalakeMetaSQLProviderFactory.class, method = "selectMetalakeMetaByName") MetalakePO selectMetalakeMetaByName(@Param("metalakeName") String name); - @Select( - "SELECT metalake_id as metalakeId, metalake_name as metalakeName," - + " metalake_comment as metalakeComment, properties," - + " audit_info as auditInfo, schema_version as schemaVersion," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE metalake_id = #{metalakeId} and deleted_at = 0") + @SelectProvider(type = MetalakeMetaSQLProviderFactory.class, method = "selectMetalakeMetaById") MetalakePO selectMetalakeMetaById(@Param("metalakeId") Long metalakeId); - @Select( - "SELECT metalake_id FROM " - + TABLE_NAME - + " WHERE metalake_name = #{metalakeName} and deleted_at = 0") + @SelectProvider( + type = MetalakeMetaSQLProviderFactory.class, + method = "selectMetalakeIdMetaByName") Long selectMetalakeIdMetaByName(@Param("metalakeName") String name); - @Insert( - "INSERT INTO " - + TABLE_NAME - + "(metalake_id, metalake_name, metalake_comment, properties, audit_info," - + " schema_version, current_version, last_version, deleted_at)" - + " VALUES(" - + " #{metalakeMeta.metalakeId}," - + " #{metalakeMeta.metalakeName}," - + " #{metalakeMeta.metalakeComment}," - + " #{metalakeMeta.properties}," - + " #{metalakeMeta.auditInfo}," - + " #{metalakeMeta.schemaVersion}," - + " #{metalakeMeta.currentVersion}," - + " #{metalakeMeta.lastVersion}," - + " #{metalakeMeta.deletedAt}" - + " )") + @InsertProvider(type = MetalakeMetaSQLProviderFactory.class, method = "insertMetalakeMeta") void insertMetalakeMeta(@Param("metalakeMeta") MetalakePO metalakePO); - @Insert( - "INSERT INTO " - + TABLE_NAME - + "(metalake_id, metalake_name, metalake_comment, properties, audit_info," - + " schema_version, current_version, last_version, deleted_at)" - + " VALUES(" - + " #{metalakeMeta.metalakeId}," - + " #{metalakeMeta.metalakeName}," - + " #{metalakeMeta.metalakeComment}," - + " #{metalakeMeta.properties}," - + " #{metalakeMeta.auditInfo}," - + " #{metalakeMeta.schemaVersion}," - + " #{metalakeMeta.currentVersion}," - + " #{metalakeMeta.lastVersion}," - + " #{metalakeMeta.deletedAt}" - + " )" - + " ON DUPLICATE KEY UPDATE" - + " metalake_name = #{metalakeMeta.metalakeName}," - + " metalake_comment = #{metalakeMeta.metalakeComment}," - + " properties = #{metalakeMeta.properties}," - + " audit_info = #{metalakeMeta.auditInfo}," - + " schema_version = #{metalakeMeta.schemaVersion}," - + " current_version = #{metalakeMeta.currentVersion}," - + " last_version = #{metalakeMeta.lastVersion}," - + " deleted_at = #{metalakeMeta.deletedAt}") + @InsertProvider( + type = MetalakeMetaSQLProviderFactory.class, + method = "insertMetalakeMetaOnDuplicateKeyUpdate") void insertMetalakeMetaOnDuplicateKeyUpdate(@Param("metalakeMeta") MetalakePO metalakePO); - @Update( - "UPDATE " - + TABLE_NAME - + " SET metalake_name = #{newMetalakeMeta.metalakeName}," - + " metalake_comment = #{newMetalakeMeta.metalakeComment}," - + " properties = #{newMetalakeMeta.properties}," - + " audit_info = #{newMetalakeMeta.auditInfo}," - + " schema_version = #{newMetalakeMeta.schemaVersion}," - + " current_version = #{newMetalakeMeta.currentVersion}," - + " last_version = #{newMetalakeMeta.lastVersion}" - + " WHERE metalake_id = #{oldMetalakeMeta.metalakeId}" - + " AND metalake_name = #{oldMetalakeMeta.metalakeName}" - + " AND (metalake_comment = #{oldMetalakeMeta.metalakeComment} " - + " OR (metalake_comment IS NULL and #{oldMetalakeMeta.metalakeComment} IS NULL))" - + " AND properties = #{oldMetalakeMeta.properties}" - + " AND audit_info = #{oldMetalakeMeta.auditInfo}" - + " AND schema_version = #{oldMetalakeMeta.schemaVersion}" - + " AND current_version = #{oldMetalakeMeta.currentVersion}" - + " AND last_version = #{oldMetalakeMeta.lastVersion}" - + " AND deleted_at = 0") + @UpdateProvider(type = MetalakeMetaSQLProviderFactory.class, method = "updateMetalakeMeta") Integer updateMetalakeMeta( @Param("newMetalakeMeta") MetalakePO newMetalakePO, @Param("oldMetalakeMeta") MetalakePO oldMetalakePO); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") + @UpdateProvider( + type = MetalakeMetaSQLProviderFactory.class, + method = "softDeleteMetalakeMetaByMetalakeId") Integer softDeleteMetalakeMetaByMetalakeId(@Param("metalakeId") Long metalakeId); - @Delete( - "DELETE FROM " - + TABLE_NAME - + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}") + @DeleteProvider( + type = MetalakeMetaSQLProviderFactory.class, + method = "deleteMetalakeMetasByLegacyTimeline") Integer deleteMetalakeMetasByLegacyTimeline( @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit); } diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaSQLProviderFactory.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaSQLProviderFactory.java new file mode 100644 index 00000000000..e28cbc9d774 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/MetalakeMetaSQLProviderFactory.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.storage.relational.mapper; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType; +import org.apache.gravitino.storage.relational.po.MetalakePO; +import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper; +import org.apache.ibatis.annotations.Param; + +/** SQL Provider for Metalake Meta operations. */ +public class MetalakeMetaSQLProviderFactory { + private static final Map + METALAKE_META_SQL_PROVIDER_MAP = + ImmutableMap.of( + JDBCBackendType.MYSQL, new MetalakeMetaMySQLProvider(), + JDBCBackendType.H2, new MetalakeMetaH2Provider()); + + public static MetalakeMetaBaseSQLProvider getProvider() { + String databaseId = + SqlSessionFactoryHelper.getInstance() + .getSqlSessionFactory() + .getConfiguration() + .getDatabaseId(); + + JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId); + return METALAKE_META_SQL_PROVIDER_MAP.get(jdbcBackendType); + } + + static class MetalakeMetaMySQLProvider extends MetalakeMetaBaseSQLProvider {} + + static class MetalakeMetaH2Provider extends MetalakeMetaBaseSQLProvider {} + + public String listMetalakePOs() { + return getProvider().listMetalakePOs(); + } + + public static String selectMetalakeMetaByName(@Param("metalakeName") String metalakeName) { + return getProvider().selectMetalakeMetaByName(metalakeName); + } + + public static String selectMetalakeMetaById(@Param("metalakeId") Long metalakeId) { + return getProvider().selectMetalakeMetaById(metalakeId); + } + + public static String selectMetalakeIdMetaByName(@Param("metalakeName") String metalakeName) { + return getProvider().selectMetalakeIdMetaByName(metalakeName); + } + + public static String insertMetalakeMeta(@Param("metalakeMeta") MetalakePO metalakePO) { + return getProvider().insertMetalakeMeta(metalakePO); + } + + public static String insertMetalakeMetaOnDuplicateKeyUpdate( + @Param("metalakeMeta") MetalakePO metalakePO) { + return getProvider().insertMetalakeMetaOnDuplicateKeyUpdate(metalakePO); + } + + public static String updateMetalakeMeta( + @Param("newMetalakeMeta") MetalakePO newMetalakePO, + @Param("oldMetalakeMeta") MetalakePO oldMetalakePO) { + return getProvider().updateMetalakeMeta(newMetalakePO, oldMetalakePO); + } + + public static String softDeleteMetalakeMetaByMetalakeId(@Param("metalakeId") Long metalakeId) { + return getProvider().softDeleteMetalakeMetaByMetalakeId(metalakeId); + } + + public static String deleteMetalakeMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return getProvider().deleteMetalakeMetasByLegacyTimeline(legacyTimeline, limit); + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaBaseSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaBaseSQLProvider.java new file mode 100644 index 00000000000..056b8d2b581 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaBaseSQLProvider.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.storage.relational.mapper; + +import static org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper.TABLE_NAME; + +import org.apache.gravitino.storage.relational.po.SchemaPO; +import org.apache.ibatis.annotations.Param; + +public class SchemaMetaBaseSQLProvider { + public String listSchemaPOsByCatalogId(@Param("catalogId") Long catalogId) { + return "SELECT schema_id as schemaId, schema_name as schemaName," + + " metalake_id as metalakeId, catalog_id as catalogId," + + " schema_comment as schemaComment, properties, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE catalog_id = #{catalogId} AND deleted_at = 0"; + } + + public String selectSchemaIdByCatalogIdAndName( + @Param("catalogId") Long catalogId, @Param("schemaName") String name) { + return "SELECT schema_id as schemaId FROM " + + TABLE_NAME + + " WHERE catalog_id = #{catalogId} AND schema_name = #{schemaName}" + + " AND deleted_at = 0"; + } + + public String selectSchemaMetaByCatalogIdAndName( + @Param("catalogId") Long catalogId, @Param("schemaName") String name) { + return "SELECT schema_id as schemaId, schema_name as schemaName," + + " metalake_id as metalakeId, catalog_id as catalogId," + + " schema_comment as schemaComment, properties, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE catalog_id = #{catalogId} AND schema_name = #{schemaName} AND deleted_at = 0"; + } + + public String selectSchemaMetaById(@Param("schemaId") Long schemaId) { + return "SELECT schema_id as schemaId, schema_name as schemaName," + + " metalake_id as metalakeId, catalog_id as catalogId," + + " schema_comment as schemaComment, properties, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE schema_id = #{schemaId} AND deleted_at = 0"; + } + + public String insertSchemaMeta(@Param("schemaMeta") SchemaPO schemaPO) { + return "INSERT INTO " + + TABLE_NAME + + "(schema_id, schema_name, metalake_id," + + " catalog_id, schema_comment, properties, audit_info," + + " current_version, last_version, deleted_at)" + + " VALUES(" + + " #{schemaMeta.schemaId}," + + " #{schemaMeta.schemaName}," + + " #{schemaMeta.metalakeId}," + + " #{schemaMeta.catalogId}," + + " #{schemaMeta.schemaComment}," + + " #{schemaMeta.properties}," + + " #{schemaMeta.auditInfo}," + + " #{schemaMeta.currentVersion}," + + " #{schemaMeta.lastVersion}," + + " #{schemaMeta.deletedAt}" + + " )"; + } + + public String insertSchemaMetaOnDuplicateKeyUpdate(@Param("schemaMeta") SchemaPO schemaPO) { + return "INSERT INTO " + + TABLE_NAME + + "(schema_id, schema_name, metalake_id," + + " catalog_id, schema_comment, properties, audit_info," + + " current_version, last_version, deleted_at)" + + " VALUES(" + + " #{schemaMeta.schemaId}," + + " #{schemaMeta.schemaName}," + + " #{schemaMeta.metalakeId}," + + " #{schemaMeta.catalogId}," + + " #{schemaMeta.schemaComment}," + + " #{schemaMeta.properties}," + + " #{schemaMeta.auditInfo}," + + " #{schemaMeta.currentVersion}," + + " #{schemaMeta.lastVersion}," + + " #{schemaMeta.deletedAt}" + + " )" + + " ON DUPLICATE KEY UPDATE" + + " schema_name = #{schemaMeta.schemaName}," + + " metalake_id = #{schemaMeta.metalakeId}," + + " catalog_id = #{schemaMeta.catalogId}," + + " schema_comment = #{schemaMeta.schemaComment}," + + " properties = #{schemaMeta.properties}," + + " audit_info = #{schemaMeta.auditInfo}," + + " current_version = #{schemaMeta.currentVersion}," + + " last_version = #{schemaMeta.lastVersion}," + + " deleted_at = #{schemaMeta.deletedAt}"; + } + + public String updateSchemaMeta( + @Param("newSchemaMeta") SchemaPO newSchemaPO, @Param("oldSchemaMeta") SchemaPO oldSchemaPO) { + return "UPDATE " + + TABLE_NAME + + " SET schema_name = #{newSchemaMeta.schemaName}," + + " metalake_id = #{newSchemaMeta.metalakeId}," + + " catalog_id = #{newSchemaMeta.catalogId}," + + " schema_comment = #{newSchemaMeta.schemaComment}," + + " properties = #{newSchemaMeta.properties}," + + " audit_info = #{newSchemaMeta.auditInfo}," + + " current_version = #{newSchemaMeta.currentVersion}," + + " last_version = #{newSchemaMeta.lastVersion}," + + " deleted_at = #{newSchemaMeta.deletedAt}" + + " WHERE schema_id = #{oldSchemaMeta.schemaId}" + + " AND schema_name = #{oldSchemaMeta.schemaName}" + + " AND metalake_id = #{oldSchemaMeta.metalakeId}" + + " AND catalog_id = #{oldSchemaMeta.catalogId}" + + " AND (schema_comment IS NULL OR schema_comment = #{oldSchemaMeta.schemaComment})" + + " AND properties = #{oldSchemaMeta.properties}" + + " AND audit_info = #{oldSchemaMeta.auditInfo}" + + " AND current_version = #{oldSchemaMeta.currentVersion}" + + " AND last_version = #{oldSchemaMeta.lastVersion}" + + " AND deleted_at = 0"; + } + + public String softDeleteSchemaMetasBySchemaId(@Param("schemaId") Long schemaId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE schema_id = #{schemaId} AND deleted_at = 0"; + } + + public String softDeleteSchemaMetasByMetalakeId(@Param("metalakeId") Long metalakeId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0"; + } + + public String softDeleteSchemaMetasByCatalogId(@Param("catalogId") Long catalogId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE catalog_id = #{catalogId} AND deleted_at = 0"; + } + + public String deleteSchemaMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return "DELETE FROM " + + TABLE_NAME + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}"; + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaMapper.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaMapper.java index a71a4b7b6f3..054248dac0c 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaMapper.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaMapper.java @@ -21,11 +21,11 @@ import java.util.List; import org.apache.gravitino.storage.relational.po.SchemaPO; -import org.apache.ibatis.annotations.Delete; -import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.DeleteProvider; +import org.apache.ibatis.annotations.InsertProvider; import org.apache.ibatis.annotations.Param; -import org.apache.ibatis.annotations.Select; -import org.apache.ibatis.annotations.Update; +import org.apache.ibatis.annotations.SelectProvider; +import org.apache.ibatis.annotations.UpdateProvider; /** * A MyBatis Mapper for schema meta operation SQLs. @@ -38,151 +38,54 @@ public interface SchemaMetaMapper { String TABLE_NAME = "schema_meta"; - @Select( - "SELECT schema_id as schemaId, schema_name as schemaName," - + " metalake_id as metalakeId, catalog_id as catalogId," - + " schema_comment as schemaComment, properties, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE catalog_id = #{catalogId} AND deleted_at = 0") + @SelectProvider(type = SchemaMetaSQLProviderFactory.class, method = "listSchemaPOsByCatalogId") List listSchemaPOsByCatalogId(@Param("catalogId") Long catalogId); - @Select( - "SELECT schema_id as schemaId FROM " - + TABLE_NAME - + " WHERE catalog_id = #{catalogId} AND schema_name = #{schemaName}" - + " AND deleted_at = 0") + @SelectProvider( + type = SchemaMetaSQLProviderFactory.class, + method = "selectSchemaIdByCatalogIdAndName") Long selectSchemaIdByCatalogIdAndName( @Param("catalogId") Long catalogId, @Param("schemaName") String name); - @Select( - "SELECT schema_id as schemaId, schema_name as schemaName," - + " metalake_id as metalakeId, catalog_id as catalogId," - + " schema_comment as schemaComment, properties, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE catalog_id = #{catalogId} AND schema_name = #{schemaName} AND deleted_at = 0") + @SelectProvider( + type = SchemaMetaSQLProviderFactory.class, + method = "selectSchemaMetaByCatalogIdAndName") SchemaPO selectSchemaMetaByCatalogIdAndName( @Param("catalogId") Long catalogId, @Param("schemaName") String name); - @Select( - "SELECT schema_id as schemaId, schema_name as schemaName," - + " metalake_id as metalakeId, catalog_id as catalogId," - + " schema_comment as schemaComment, properties, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE schema_id = #{schemaId} AND deleted_at = 0") + @SelectProvider(type = SchemaMetaSQLProviderFactory.class, method = "selectSchemaMetaById") SchemaPO selectSchemaMetaById(@Param("schemaId") Long schemaId); - @Insert( - "INSERT INTO " - + TABLE_NAME - + "(schema_id, schema_name, metalake_id," - + " catalog_id, schema_comment, properties, audit_info," - + " current_version, last_version, deleted_at)" - + " VALUES(" - + " #{schemaMeta.schemaId}," - + " #{schemaMeta.schemaName}," - + " #{schemaMeta.metalakeId}," - + " #{schemaMeta.catalogId}," - + " #{schemaMeta.schemaComment}," - + " #{schemaMeta.properties}," - + " #{schemaMeta.auditInfo}," - + " #{schemaMeta.currentVersion}," - + " #{schemaMeta.lastVersion}," - + " #{schemaMeta.deletedAt}" - + " )") + @InsertProvider(type = SchemaMetaSQLProviderFactory.class, method = "insertSchemaMeta") void insertSchemaMeta(@Param("schemaMeta") SchemaPO schemaPO); - @Insert( - "INSERT INTO " - + TABLE_NAME - + "(schema_id, schema_name, metalake_id," - + " catalog_id, schema_comment, properties, audit_info," - + " current_version, last_version, deleted_at)" - + " VALUES(" - + " #{schemaMeta.schemaId}," - + " #{schemaMeta.schemaName}," - + " #{schemaMeta.metalakeId}," - + " #{schemaMeta.catalogId}," - + " #{schemaMeta.schemaComment}," - + " #{schemaMeta.properties}," - + " #{schemaMeta.auditInfo}," - + " #{schemaMeta.currentVersion}," - + " #{schemaMeta.lastVersion}," - + " #{schemaMeta.deletedAt}" - + " )" - + " ON DUPLICATE KEY UPDATE" - + " schema_name = #{schemaMeta.schemaName}," - + " metalake_id = #{schemaMeta.metalakeId}," - + " catalog_id = #{schemaMeta.catalogId}," - + " schema_comment = #{schemaMeta.schemaComment}," - + " properties = #{schemaMeta.properties}," - + " audit_info = #{schemaMeta.auditInfo}," - + " current_version = #{schemaMeta.currentVersion}," - + " last_version = #{schemaMeta.lastVersion}," - + " deleted_at = #{schemaMeta.deletedAt}") + @InsertProvider( + type = SchemaMetaSQLProviderFactory.class, + method = "insertSchemaMetaOnDuplicateKeyUpdate") void insertSchemaMetaOnDuplicateKeyUpdate(@Param("schemaMeta") SchemaPO schemaPO); - @Update( - "UPDATE " - + TABLE_NAME - + " SET schema_name = #{newSchemaMeta.schemaName}," - + " metalake_id = #{newSchemaMeta.metalakeId}," - + " catalog_id = #{newSchemaMeta.catalogId}," - + " schema_comment = #{newSchemaMeta.schemaComment}," - + " properties = #{newSchemaMeta.properties}," - + " audit_info = #{newSchemaMeta.auditInfo}," - + " current_version = #{newSchemaMeta.currentVersion}," - + " last_version = #{newSchemaMeta.lastVersion}," - + " deleted_at = #{newSchemaMeta.deletedAt}" - + " WHERE schema_id = #{oldSchemaMeta.schemaId}" - + " AND schema_name = #{oldSchemaMeta.schemaName}" - + " AND metalake_id = #{oldSchemaMeta.metalakeId}" - + " AND catalog_id = #{oldSchemaMeta.catalogId}" - + " AND (schema_comment IS NULL OR schema_comment = #{oldSchemaMeta.schemaComment})" - + " AND properties = #{oldSchemaMeta.properties}" - + " AND audit_info = #{oldSchemaMeta.auditInfo}" - + " AND current_version = #{oldSchemaMeta.currentVersion}" - + " AND last_version = #{oldSchemaMeta.lastVersion}" - + " AND deleted_at = 0") + @UpdateProvider(type = SchemaMetaSQLProviderFactory.class, method = "updateSchemaMeta") Integer updateSchemaMeta( @Param("newSchemaMeta") SchemaPO newSchemaPO, @Param("oldSchemaMeta") SchemaPO oldSchemaPO); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE schema_id = #{schemaId} AND deleted_at = 0") + @UpdateProvider( + type = SchemaMetaSQLProviderFactory.class, + method = "softDeleteSchemaMetasBySchemaId") Integer softDeleteSchemaMetasBySchemaId(@Param("schemaId") Long schemaId); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") + @UpdateProvider( + type = SchemaMetaSQLProviderFactory.class, + method = "softDeleteSchemaMetasByMetalakeId") Integer softDeleteSchemaMetasByMetalakeId(@Param("metalakeId") Long metalakeId); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE catalog_id = #{catalogId} AND deleted_at = 0") + @UpdateProvider( + type = SchemaMetaSQLProviderFactory.class, + method = "softDeleteSchemaMetasByCatalogId") Integer softDeleteSchemaMetasByCatalogId(@Param("catalogId") Long catalogId); - @Delete( - "DELETE FROM " - + TABLE_NAME - + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}") + @DeleteProvider( + type = SchemaMetaSQLProviderFactory.class, + method = "deleteSchemaMetasByLegacyTimeline") Integer deleteSchemaMetasByLegacyTimeline( @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit); } diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaSQLProviderFactory.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaSQLProviderFactory.java new file mode 100644 index 00000000000..5fa6252d5b6 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/SchemaMetaSQLProviderFactory.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.storage.relational.mapper; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType; +import org.apache.gravitino.storage.relational.po.SchemaPO; +import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper; +import org.apache.ibatis.annotations.Param; + +public class SchemaMetaSQLProviderFactory { + private static final Map + METALAKE_META_SQL_PROVIDER_MAP = + ImmutableMap.of( + JDBCBackendType.MYSQL, new SchemaMetaMySQLProvider(), + JDBCBackendType.H2, new SchemaMetaH2Provider()); + + public static SchemaMetaBaseSQLProvider getProvider() { + String databaseId = + SqlSessionFactoryHelper.getInstance() + .getSqlSessionFactory() + .getConfiguration() + .getDatabaseId(); + + JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId); + return METALAKE_META_SQL_PROVIDER_MAP.get(jdbcBackendType); + } + + static class SchemaMetaMySQLProvider extends SchemaMetaBaseSQLProvider {} + + static class SchemaMetaH2Provider extends SchemaMetaBaseSQLProvider {} + + public static String listSchemaPOsByCatalogId(@Param("catalogId") Long catalogId) { + return getProvider().listSchemaPOsByCatalogId(catalogId); + } + + public static String selectSchemaIdByCatalogIdAndName( + @Param("catalogId") Long catalogId, @Param("schemaName") String name) { + return getProvider().selectSchemaIdByCatalogIdAndName(catalogId, name); + } + + public static String selectSchemaMetaByCatalogIdAndName( + @Param("catalogId") Long catalogId, @Param("schemaName") String name) { + return getProvider().selectSchemaMetaByCatalogIdAndName(catalogId, name); + } + + public static String selectSchemaMetaById(@Param("schemaId") Long schemaId) { + return getProvider().selectSchemaMetaById(schemaId); + } + + public static String insertSchemaMeta(@Param("schemaMeta") SchemaPO schemaPO) { + return getProvider().insertSchemaMeta(schemaPO); + } + + public static String insertSchemaMetaOnDuplicateKeyUpdate( + @Param("schemaMeta") SchemaPO schemaPO) { + return getProvider().insertSchemaMetaOnDuplicateKeyUpdate(schemaPO); + } + + public static String updateSchemaMeta( + @Param("newSchemaMeta") SchemaPO newSchemaPO, @Param("oldSchemaMeta") SchemaPO oldSchemaPO) { + return getProvider().updateSchemaMeta(newSchemaPO, oldSchemaPO); + } + + public static String softDeleteSchemaMetasBySchemaId(@Param("schemaId") Long schemaId) { + return getProvider().softDeleteSchemaMetasBySchemaId(schemaId); + } + + public static String softDeleteSchemaMetasByMetalakeId(@Param("metalakeId") Long metalakeId) { + return getProvider().softDeleteSchemaMetasByMetalakeId(metalakeId); + } + + public static String softDeleteSchemaMetasByCatalogId(@Param("catalogId") Long catalogId) { + return getProvider().softDeleteSchemaMetasByCatalogId(catalogId); + } + + public static String deleteSchemaMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return getProvider().deleteSchemaMetasByLegacyTimeline(legacyTimeline, limit); + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaBaseSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaBaseSQLProvider.java new file mode 100644 index 00000000000..03cf2582ff6 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaBaseSQLProvider.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.storage.relational.mapper; + +import static org.apache.gravitino.storage.relational.mapper.TableMetaMapper.TABLE_NAME; + +import org.apache.gravitino.storage.relational.po.TablePO; +import org.apache.ibatis.annotations.Param; + +public class TableMetaBaseSQLProvider { + + public String listTablePOsBySchemaId(@Param("schemaId") Long schemaId) { + return "SELECT table_id as tableId, table_name as tableName," + + " metalake_id as metalakeId, catalog_id as catalogId," + + " schema_id as schemaId, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE schema_id = #{schemaId} AND deleted_at = 0"; + } + + public String selectTableIdBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("tableName") String name) { + return "SELECT table_id as tableId FROM " + + TABLE_NAME + + " WHERE schema_id = #{schemaId} AND table_name = #{tableName}" + + " AND deleted_at = 0"; + } + + public String selectTableMetaBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("tableName") String name) { + return "SELECT table_id as tableId, table_name as tableName," + + " metalake_id as metalakeId, catalog_id as catalogId," + + " schema_id as schemaId, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE schema_id = #{schemaId} AND table_name = #{tableName} AND deleted_at = 0"; + } + + public String selectTableMetaById(@Param("tableId") Long tableId) { + return "SELECT table_id as tableId, table_name as tableName," + + " metalake_id as metalakeId, catalog_id as catalogId," + + " schema_id as schemaId, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE table_id = #{tableId} AND deleted_at = 0"; + } + + public String insertTableMeta(@Param("tableMeta") TablePO tablePO) { + return "INSERT INTO " + + TABLE_NAME + + "(table_id, table_name, metalake_id," + + " catalog_id, schema_id, audit_info," + + " current_version, last_version, deleted_at)" + + " VALUES(" + + " #{tableMeta.tableId}," + + " #{tableMeta.tableName}," + + " #{tableMeta.metalakeId}," + + " #{tableMeta.catalogId}," + + " #{tableMeta.schemaId}," + + " #{tableMeta.auditInfo}," + + " #{tableMeta.currentVersion}," + + " #{tableMeta.lastVersion}," + + " #{tableMeta.deletedAt}" + + " )"; + } + + public String insertTableMetaOnDuplicateKeyUpdate(@Param("tableMeta") TablePO tablePO) { + return "INSERT INTO " + + TABLE_NAME + + "(table_id, table_name, metalake_id," + + " catalog_id, schema_id, audit_info," + + " current_version, last_version, deleted_at)" + + " VALUES(" + + " #{tableMeta.tableId}," + + " #{tableMeta.tableName}," + + " #{tableMeta.metalakeId}," + + " #{tableMeta.catalogId}," + + " #{tableMeta.schemaId}," + + " #{tableMeta.auditInfo}," + + " #{tableMeta.currentVersion}," + + " #{tableMeta.lastVersion}," + + " #{tableMeta.deletedAt}" + + " )" + + " ON DUPLICATE KEY UPDATE" + + " table_name = #{tableMeta.tableName}," + + " metalake_id = #{tableMeta.metalakeId}," + + " catalog_id = #{tableMeta.catalogId}," + + " schema_id = #{tableMeta.schemaId}," + + " audit_info = #{tableMeta.auditInfo}," + + " current_version = #{tableMeta.currentVersion}," + + " last_version = #{tableMeta.lastVersion}," + + " deleted_at = #{tableMeta.deletedAt}"; + } + + public String updateTableMeta( + @Param("newTableMeta") TablePO newTablePO, @Param("oldTableMeta") TablePO oldTablePO) { + return "UPDATE " + + TABLE_NAME + + " SET table_name = #{newTableMeta.tableName}," + + " metalake_id = #{newTableMeta.metalakeId}," + + " catalog_id = #{newTableMeta.catalogId}," + + " schema_id = #{newTableMeta.schemaId}," + + " audit_info = #{newTableMeta.auditInfo}," + + " current_version = #{newTableMeta.currentVersion}," + + " last_version = #{newTableMeta.lastVersion}," + + " deleted_at = #{newTableMeta.deletedAt}" + + " WHERE table_id = #{oldTableMeta.tableId}" + + " AND table_name = #{oldTableMeta.tableName}" + + " AND metalake_id = #{oldTableMeta.metalakeId}" + + " AND catalog_id = #{oldTableMeta.catalogId}" + + " AND schema_id = #{oldTableMeta.schemaId}" + + " AND audit_info = #{oldTableMeta.auditInfo}" + + " AND current_version = #{oldTableMeta.currentVersion}" + + " AND last_version = #{oldTableMeta.lastVersion}" + + " AND deleted_at = 0"; + } + + public String softDeleteTableMetasByTableId(@Param("tableId") Long tableId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE table_id = #{tableId} AND deleted_at = 0"; + } + + public String softDeleteTableMetasByMetalakeId(@Param("metalakeId") Long metalakeId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0"; + } + + public String softDeleteTableMetasByCatalogId(@Param("catalogId") Long catalogId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE catalog_id = #{catalogId} AND deleted_at = 0"; + } + + public String softDeleteTableMetasBySchemaId(@Param("schemaId") Long schemaId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE schema_id = #{schemaId} AND deleted_at = 0"; + } + + public String deleteTableMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return "DELETE FROM " + + TABLE_NAME + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}"; + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java index 98edc595fd7..a5224593765 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaMapper.java @@ -21,11 +21,11 @@ import java.util.List; import org.apache.gravitino.storage.relational.po.TablePO; -import org.apache.ibatis.annotations.Delete; -import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.DeleteProvider; +import org.apache.ibatis.annotations.InsertProvider; import org.apache.ibatis.annotations.Param; -import org.apache.ibatis.annotations.Select; -import org.apache.ibatis.annotations.Update; +import org.apache.ibatis.annotations.SelectProvider; +import org.apache.ibatis.annotations.UpdateProvider; /** * A MyBatis Mapper for table meta operation SQLs. @@ -38,154 +38,59 @@ public interface TableMetaMapper { String TABLE_NAME = "table_meta"; - @Select( - "SELECT table_id as tableId, table_name as tableName," - + " metalake_id as metalakeId, catalog_id as catalogId," - + " schema_id as schemaId, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE schema_id = #{schemaId} AND deleted_at = 0") + @SelectProvider(type = TableMetaSQLProviderFactory.class, method = "listTablePOsBySchemaId") List listTablePOsBySchemaId(@Param("schemaId") Long schemaId); - @Select( - "SELECT table_id as tableId FROM " - + TABLE_NAME - + " WHERE schema_id = #{schemaId} AND table_name = #{tableName}" - + " AND deleted_at = 0") + @SelectProvider( + type = TableMetaSQLProviderFactory.class, + method = "selectTableIdBySchemaIdAndName") Long selectTableIdBySchemaIdAndName( @Param("schemaId") Long schemaId, @Param("tableName") String name); - @Select( - "SELECT table_id as tableId, table_name as tableName," - + " metalake_id as metalakeId, catalog_id as catalogId," - + " schema_id as schemaId, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE schema_id = #{schemaId} AND table_name = #{tableName} AND deleted_at = 0") + @SelectProvider( + type = TableMetaSQLProviderFactory.class, + method = "selectTableMetaBySchemaIdAndName") TablePO selectTableMetaBySchemaIdAndName( @Param("schemaId") Long schemaId, @Param("tableName") String name); - @Select( - "SELECT table_id as tableId, table_name as tableName," - + " metalake_id as metalakeId, catalog_id as catalogId," - + " schema_id as schemaId, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE table_id = #{tableId} AND deleted_at = 0") + @SelectProvider(type = TableMetaSQLProviderFactory.class, method = "selectTableMetaById") TablePO selectTableMetaById(@Param("tableId") Long tableId); - @Insert( - "INSERT INTO " - + TABLE_NAME - + "(table_id, table_name, metalake_id," - + " catalog_id, schema_id, audit_info," - + " current_version, last_version, deleted_at)" - + " VALUES(" - + " #{tableMeta.tableId}," - + " #{tableMeta.tableName}," - + " #{tableMeta.metalakeId}," - + " #{tableMeta.catalogId}," - + " #{tableMeta.schemaId}," - + " #{tableMeta.auditInfo}," - + " #{tableMeta.currentVersion}," - + " #{tableMeta.lastVersion}," - + " #{tableMeta.deletedAt}" - + " )") + @InsertProvider(type = TableMetaSQLProviderFactory.class, method = "insertTableMeta") void insertTableMeta(@Param("tableMeta") TablePO tablePO); - @Insert( - "INSERT INTO " - + TABLE_NAME - + "(table_id, table_name, metalake_id," - + " catalog_id, schema_id, audit_info," - + " current_version, last_version, deleted_at)" - + " VALUES(" - + " #{tableMeta.tableId}," - + " #{tableMeta.tableName}," - + " #{tableMeta.metalakeId}," - + " #{tableMeta.catalogId}," - + " #{tableMeta.schemaId}," - + " #{tableMeta.auditInfo}," - + " #{tableMeta.currentVersion}," - + " #{tableMeta.lastVersion}," - + " #{tableMeta.deletedAt}" - + " )" - + " ON DUPLICATE KEY UPDATE" - + " table_name = #{tableMeta.tableName}," - + " metalake_id = #{tableMeta.metalakeId}," - + " catalog_id = #{tableMeta.catalogId}," - + " schema_id = #{tableMeta.schemaId}," - + " audit_info = #{tableMeta.auditInfo}," - + " current_version = #{tableMeta.currentVersion}," - + " last_version = #{tableMeta.lastVersion}," - + " deleted_at = #{tableMeta.deletedAt}") + @InsertProvider( + type = TableMetaSQLProviderFactory.class, + method = "insertTableMetaOnDuplicateKeyUpdate") void insertTableMetaOnDuplicateKeyUpdate(@Param("tableMeta") TablePO tablePO); - @Update( - "UPDATE " - + TABLE_NAME - + " SET table_name = #{newTableMeta.tableName}," - + " metalake_id = #{newTableMeta.metalakeId}," - + " catalog_id = #{newTableMeta.catalogId}," - + " schema_id = #{newTableMeta.schemaId}," - + " audit_info = #{newTableMeta.auditInfo}," - + " current_version = #{newTableMeta.currentVersion}," - + " last_version = #{newTableMeta.lastVersion}," - + " deleted_at = #{newTableMeta.deletedAt}" - + " WHERE table_id = #{oldTableMeta.tableId}" - + " AND table_name = #{oldTableMeta.tableName}" - + " AND metalake_id = #{oldTableMeta.metalakeId}" - + " AND catalog_id = #{oldTableMeta.catalogId}" - + " AND schema_id = #{oldTableMeta.schemaId}" - + " AND audit_info = #{oldTableMeta.auditInfo}" - + " AND current_version = #{oldTableMeta.currentVersion}" - + " AND last_version = #{oldTableMeta.lastVersion}" - + " AND deleted_at = 0") + @UpdateProvider(type = TableMetaSQLProviderFactory.class, method = "updateTableMeta") Integer updateTableMeta( @Param("newTableMeta") TablePO newTablePO, @Param("oldTableMeta") TablePO oldTablePO); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE table_id = #{tableId} AND deleted_at = 0") + @UpdateProvider( + type = TableMetaSQLProviderFactory.class, + method = "softDeleteTableMetasByTableId") Integer softDeleteTableMetasByTableId(@Param("tableId") Long tableId); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") + @UpdateProvider( + type = TableMetaSQLProviderFactory.class, + method = "softDeleteTableMetasByMetalakeId") Integer softDeleteTableMetasByMetalakeId(@Param("metalakeId") Long metalakeId); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE catalog_id = #{catalogId} AND deleted_at = 0") + @UpdateProvider( + type = TableMetaSQLProviderFactory.class, + method = "softDeleteTableMetasByCatalogId") Integer softDeleteTableMetasByCatalogId(@Param("catalogId") Long catalogId); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE schema_id = #{schemaId} AND deleted_at = 0") + @UpdateProvider( + type = TableMetaSQLProviderFactory.class, + method = "softDeleteTableMetasBySchemaId") Integer softDeleteTableMetasBySchemaId(@Param("schemaId") Long schemaId); - @Delete( - "DELETE FROM " - + TABLE_NAME - + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}") + @DeleteProvider( + type = TableMetaSQLProviderFactory.class, + method = "deleteTableMetasByLegacyTimeline") Integer deleteTableMetasByLegacyTimeline( @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit); } diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaSQLProviderFactory.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaSQLProviderFactory.java new file mode 100644 index 00000000000..833ba9a059d --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TableMetaSQLProviderFactory.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.storage.relational.mapper; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType; +import org.apache.gravitino.storage.relational.po.TablePO; +import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper; +import org.apache.ibatis.annotations.Param; + +public class TableMetaSQLProviderFactory { + + private static final Map + METALAKE_META_SQL_PROVIDER_MAP = + ImmutableMap.of( + JDBCBackendType.MYSQL, new TableMetaMySQLProvider(), + JDBCBackendType.H2, new TableMetaH2Provider()); + + public static TableMetaBaseSQLProvider getProvider() { + String databaseId = + SqlSessionFactoryHelper.getInstance() + .getSqlSessionFactory() + .getConfiguration() + .getDatabaseId(); + + JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId); + return METALAKE_META_SQL_PROVIDER_MAP.get(jdbcBackendType); + } + + static class TableMetaMySQLProvider extends TableMetaBaseSQLProvider {} + + static class TableMetaH2Provider extends TableMetaBaseSQLProvider {} + + public static String listTablePOsBySchemaId(@Param("schemaId") Long schemaId) { + return getProvider().listTablePOsBySchemaId(schemaId); + } + + public static String selectTableIdBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("tableName") String name) { + return getProvider().selectTableIdBySchemaIdAndName(schemaId, name); + } + + public static String selectTableMetaBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("tableName") String name) { + return getProvider().selectTableMetaBySchemaIdAndName(schemaId, name); + } + + public static String selectTableMetaById(@Param("tableId") Long tableId) { + return getProvider().selectTableMetaById(tableId); + } + + public static String insertTableMeta(@Param("tableMeta") TablePO tablePO) { + return getProvider().insertTableMeta(tablePO); + } + + public static String insertTableMetaOnDuplicateKeyUpdate(@Param("tableMeta") TablePO tablePO) { + return getProvider().insertTableMetaOnDuplicateKeyUpdate(tablePO); + } + + public static String updateTableMeta( + @Param("newTableMeta") TablePO newTablePO, @Param("oldTableMeta") TablePO oldTablePO) { + return getProvider().updateTableMeta(newTablePO, oldTablePO); + } + + public static String softDeleteTableMetasByTableId(@Param("tableId") Long tableId) { + return getProvider().softDeleteTableMetasByTableId(tableId); + } + + public static String softDeleteTableMetasByMetalakeId(@Param("metalakeId") Long metalakeId) { + return getProvider().softDeleteTableMetasByMetalakeId(metalakeId); + } + + public static String softDeleteTableMetasByCatalogId(@Param("catalogId") Long catalogId) { + return getProvider().softDeleteTableMetasByCatalogId(catalogId); + } + + public static String softDeleteTableMetasBySchemaId(@Param("schemaId") Long schemaId) { + return getProvider().softDeleteTableMetasBySchemaId(schemaId); + } + + public static String deleteTableMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return getProvider().deleteTableMetasByLegacyTimeline(legacyTimeline, limit); + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaBaseSQLProvider.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaBaseSQLProvider.java new file mode 100644 index 00000000000..aa46bbd7f9c --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaBaseSQLProvider.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.storage.relational.mapper; + +import static org.apache.gravitino.storage.relational.mapper.TopicMetaMapper.TABLE_NAME; + +import org.apache.gravitino.storage.relational.po.TopicPO; +import org.apache.ibatis.annotations.Param; + +public class TopicMetaBaseSQLProvider { + + public String insertTopicMeta(@Param("topicMeta") TopicPO topicPO) { + return "INSERT INTO " + + TABLE_NAME + + "(topic_id, topic_name, metalake_id, catalog_id, schema_id," + + " comment, properties, audit_info, current_version, last_version," + + " deleted_at)" + + " VALUES(" + + " #{topicMeta.topicId}," + + " #{topicMeta.topicName}," + + " #{topicMeta.metalakeId}," + + " #{topicMeta.catalogId}," + + " #{topicMeta.schemaId}," + + " #{topicMeta.comment}," + + " #{topicMeta.properties}," + + " #{topicMeta.auditInfo}," + + " #{topicMeta.currentVersion}," + + " #{topicMeta.lastVersion}," + + " #{topicMeta.deletedAt}" + + " )"; + } + + public String insertTopicMetaOnDuplicateKeyUpdate(@Param("topicMeta") TopicPO topicPO) { + return "INSERT INTO " + + TABLE_NAME + + "(topic_id, topic_name, metalake_id, catalog_id, schema_id," + + " comment, properties, audit_info, current_version, last_version," + + " deleted_at)" + + " VALUES(" + + " #{topicMeta.topicId}," + + " #{topicMeta.topicName}," + + " #{topicMeta.metalakeId}," + + " #{topicMeta.catalogId}," + + " #{topicMeta.schemaId}," + + " #{topicMeta.comment}," + + " #{topicMeta.properties}," + + " #{topicMeta.auditInfo}," + + " #{topicMeta.currentVersion}," + + " #{topicMeta.lastVersion}," + + " #{topicMeta.deletedAt}" + + " )" + + " ON DUPLICATE KEY UPDATE" + + " topic_name = #{topicMeta.topicName}," + + " metalake_id = #{topicMeta.metalakeId}," + + " catalog_id = #{topicMeta.catalogId}," + + " schema_id = #{topicMeta.schemaId}," + + " comment = #{topicMeta.comment}," + + " properties = #{topicMeta.properties}," + + " audit_info = #{topicMeta.auditInfo}," + + " current_version = #{topicMeta.currentVersion}," + + " last_version = #{topicMeta.lastVersion}," + + " deleted_at = #{topicMeta.deletedAt}"; + } + + public String listTopicPOsBySchemaId(@Param("schemaId") Long schemaId) { + return "SELECT topic_id as topicId, topic_name as topicName, metalake_id as metalakeId," + + " catalog_id as catalogId, schema_id as schemaId," + + " comment as comment, properties as properties, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE schema_id = #{schemaId} AND deleted_at = 0"; + } + + public String selectTopicMetaBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("topicName") String topicName) { + return "SELECT topic_id as topicId, topic_name as topicName," + + " metalake_id as metalakeId, catalog_id as catalogId, schema_id as schemaId," + + " comment as comment, properties as properties, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE schema_id = #{schemaId} AND topic_name = #{topicName} AND deleted_at = 0"; + } + + public String selectTopicMetaById(@Param("topicId") Long topicId) { + return "SELECT topic_id as topicId, topic_name as topicName," + + " metalake_id as metalakeId, catalog_id as catalogId, schema_id as schemaId," + + " comment as comment, properties as properties, audit_info as auditInfo," + + " current_version as currentVersion, last_version as lastVersion," + + " deleted_at as deletedAt" + + " FROM " + + TABLE_NAME + + " WHERE topic_id = #{topicId} AND deleted_at = 0"; + } + + public String updateTopicMeta( + @Param("newTopicMeta") TopicPO newTopicPO, @Param("oldTopicMeta") TopicPO oldTopicPO) { + return "UPDATE " + + TABLE_NAME + + " SET topic_name = #{newTopicMeta.topicName}," + + " metalake_id = #{newTopicMeta.metalakeId}," + + " catalog_id = #{newTopicMeta.catalogId}," + + " schema_id = #{newTopicMeta.schemaId}," + + " comment = #{newTopicMeta.comment}," + + " properties = #{newTopicMeta.properties}," + + " audit_info = #{newTopicMeta.auditInfo}," + + " current_version = #{newTopicMeta.currentVersion}," + + " last_version = #{newTopicMeta.lastVersion}," + + " deleted_at = #{newTopicMeta.deletedAt}" + + " WHERE topic_id = #{oldTopicMeta.topicId}" + + " AND topic_name = #{oldTopicMeta.topicName}" + + " AND metalake_id = #{oldTopicMeta.metalakeId}" + + " AND catalog_id = #{oldTopicMeta.catalogId}" + + " AND schema_id = #{oldTopicMeta.schemaId}" + + " AND comment = #{oldTopicMeta.comment}" + + " AND properties = #{oldTopicMeta.properties}" + + " AND audit_info = #{oldTopicMeta.auditInfo}" + + " AND current_version = #{oldTopicMeta.currentVersion}" + + " AND last_version = #{oldTopicMeta.lastVersion}" + + " AND deleted_at = 0"; + } + + public String selectTopicIdBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("topicName") String name) { + return "SELECT topic_id as topicId FROM " + + TABLE_NAME + + " WHERE schema_id = #{schemaId} AND topic_name = #{topicName}" + + " AND deleted_at = 0"; + } + + public String softDeleteTopicMetasByTopicId(@Param("topicId") Long topicId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE topic_id = #{topicId} AND deleted_at = 0"; + } + + public String softDeleteTopicMetasByCatalogId(@Param("catalogId") Long catalogId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE catalog_id = #{catalogId} AND deleted_at = 0"; + } + + public String softDeleteTopicMetasByMetalakeId(@Param("metalakeId") Long metalakeId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0"; + } + + public String softDeleteTopicMetasBySchemaId(@Param("schemaId") Long schemaId) { + return "UPDATE " + + TABLE_NAME + + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" + + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" + + " WHERE schema_id = #{schemaId} AND deleted_at = 0"; + } + + public String deleteTopicMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return "DELETE FROM " + + TABLE_NAME + + " WHERE deleted_at != 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}"; + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaMapper.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaMapper.java index d8d08286e38..8c194caff4f 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaMapper.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaMapper.java @@ -20,173 +20,68 @@ import java.util.List; import org.apache.gravitino.storage.relational.po.TopicPO; -import org.apache.ibatis.annotations.Delete; -import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.DeleteProvider; +import org.apache.ibatis.annotations.InsertProvider; import org.apache.ibatis.annotations.Param; -import org.apache.ibatis.annotations.Select; -import org.apache.ibatis.annotations.Update; +import org.apache.ibatis.annotations.SelectProvider; +import org.apache.ibatis.annotations.UpdateProvider; public interface TopicMetaMapper { String TABLE_NAME = "topic_meta"; - @Insert( - "INSERT INTO " - + TABLE_NAME - + "(topic_id, topic_name, metalake_id, catalog_id, schema_id," - + " comment, properties, audit_info, current_version, last_version," - + " deleted_at)" - + " VALUES(" - + " #{topicMeta.topicId}," - + " #{topicMeta.topicName}," - + " #{topicMeta.metalakeId}," - + " #{topicMeta.catalogId}," - + " #{topicMeta.schemaId}," - + " #{topicMeta.comment}," - + " #{topicMeta.properties}," - + " #{topicMeta.auditInfo}," - + " #{topicMeta.currentVersion}," - + " #{topicMeta.lastVersion}," - + " #{topicMeta.deletedAt}" - + " )") + @InsertProvider(type = TopicMetaSQLProviderFactory.class, method = "insertTopicMeta") void insertTopicMeta(@Param("topicMeta") TopicPO topicPO); - @Insert( - "INSERT INTO " - + TABLE_NAME - + "(topic_id, topic_name, metalake_id, catalog_id, schema_id," - + " comment, properties, audit_info, current_version, last_version," - + " deleted_at)" - + " VALUES(" - + " #{topicMeta.topicId}," - + " #{topicMeta.topicName}," - + " #{topicMeta.metalakeId}," - + " #{topicMeta.catalogId}," - + " #{topicMeta.schemaId}," - + " #{topicMeta.comment}," - + " #{topicMeta.properties}," - + " #{topicMeta.auditInfo}," - + " #{topicMeta.currentVersion}," - + " #{topicMeta.lastVersion}," - + " #{topicMeta.deletedAt}" - + " )" - + " ON DUPLICATE KEY UPDATE" - + " topic_name = #{topicMeta.topicName}," - + " metalake_id = #{topicMeta.metalakeId}," - + " catalog_id = #{topicMeta.catalogId}," - + " schema_id = #{topicMeta.schemaId}," - + " comment = #{topicMeta.comment}," - + " properties = #{topicMeta.properties}," - + " audit_info = #{topicMeta.auditInfo}," - + " current_version = #{topicMeta.currentVersion}," - + " last_version = #{topicMeta.lastVersion}," - + " deleted_at = #{topicMeta.deletedAt}") + @InsertProvider( + type = TopicMetaSQLProviderFactory.class, + method = "insertTopicMetaOnDuplicateKeyUpdate") void insertTopicMetaOnDuplicateKeyUpdate(@Param("topicMeta") TopicPO topicPO); - @Select( - "SELECT topic_id as topicId, topic_name as topicName, metalake_id as metalakeId," - + " catalog_id as catalogId, schema_id as schemaId," - + " comment as comment, properties as properties, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE schema_id = #{schemaId} AND deleted_at = 0") + @SelectProvider(type = TopicMetaSQLProviderFactory.class, method = "listTopicPOsBySchemaId") List listTopicPOsBySchemaId(@Param("schemaId") Long schemaId); - @Select( - "SELECT topic_id as topicId, topic_name as topicName," - + " metalake_id as metalakeId, catalog_id as catalogId, schema_id as schemaId," - + " comment as comment, properties as properties, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE schema_id = #{schemaId} AND topic_name = #{topicName} AND deleted_at = 0") + @SelectProvider( + type = TopicMetaSQLProviderFactory.class, + method = "selectTopicMetaBySchemaIdAndName") TopicPO selectTopicMetaBySchemaIdAndName( @Param("schemaId") Long schemaId, @Param("topicName") String topicName); - @Select( - "SELECT topic_id as topicId, topic_name as topicName," - + " metalake_id as metalakeId, catalog_id as catalogId, schema_id as schemaId," - + " comment as comment, properties as properties, audit_info as auditInfo," - + " current_version as currentVersion, last_version as lastVersion," - + " deleted_at as deletedAt" - + " FROM " - + TABLE_NAME - + " WHERE topic_id = #{topicId} AND deleted_at = 0") + @SelectProvider(type = TopicMetaSQLProviderFactory.class, method = "selectTopicMetaById") TopicPO selectTopicMetaById(@Param("topicId") Long topicId); - @Update( - "UPDATE " - + TABLE_NAME - + " SET topic_name = #{newTopicMeta.topicName}," - + " metalake_id = #{newTopicMeta.metalakeId}," - + " catalog_id = #{newTopicMeta.catalogId}," - + " schema_id = #{newTopicMeta.schemaId}," - + " comment = #{newTopicMeta.comment}," - + " properties = #{newTopicMeta.properties}," - + " audit_info = #{newTopicMeta.auditInfo}," - + " current_version = #{newTopicMeta.currentVersion}," - + " last_version = #{newTopicMeta.lastVersion}," - + " deleted_at = #{newTopicMeta.deletedAt}" - + " WHERE topic_id = #{oldTopicMeta.topicId}" - + " AND topic_name = #{oldTopicMeta.topicName}" - + " AND metalake_id = #{oldTopicMeta.metalakeId}" - + " AND catalog_id = #{oldTopicMeta.catalogId}" - + " AND schema_id = #{oldTopicMeta.schemaId}" - + " AND comment = #{oldTopicMeta.comment}" - + " AND properties = #{oldTopicMeta.properties}" - + " AND audit_info = #{oldTopicMeta.auditInfo}" - + " AND current_version = #{oldTopicMeta.currentVersion}" - + " AND last_version = #{oldTopicMeta.lastVersion}" - + " AND deleted_at = 0") + @UpdateProvider(type = TopicMetaSQLProviderFactory.class, method = "updateTopicMeta") Integer updateTopicMeta( @Param("newTopicMeta") TopicPO newTopicPO, @Param("oldTopicMeta") TopicPO oldTopicPO); - @Select( - "SELECT topic_id as topicId FROM " - + TABLE_NAME - + " WHERE schema_id = #{schemaId} AND topic_name = #{topicName}" - + " AND deleted_at = 0") + @SelectProvider( + type = TopicMetaSQLProviderFactory.class, + method = "selectTopicIdBySchemaIdAndName") Long selectTopicIdBySchemaIdAndName( @Param("schemaId") Long schemaId, @Param("topicName") String name); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE topic_id = #{topicId} AND deleted_at = 0") + @UpdateProvider( + type = TopicMetaSQLProviderFactory.class, + method = "softDeleteTopicMetasByTopicId") Integer softDeleteTopicMetasByTopicId(@Param("topicId") Long topicId); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE catalog_id = #{catalogId} AND deleted_at = 0") + @UpdateProvider( + type = TopicMetaSQLProviderFactory.class, + method = "softDeleteTopicMetasByCatalogId") Integer softDeleteTopicMetasByCatalogId(@Param("catalogId") Long catalogId); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") + @UpdateProvider( + type = TopicMetaSQLProviderFactory.class, + method = "softDeleteTopicMetasByMetalakeId") Integer softDeleteTopicMetasByMetalakeId(@Param("metalakeId") Long metalakeId); - @Update( - "UPDATE " - + TABLE_NAME - + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)" - + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000" - + " WHERE schema_id = #{schemaId} AND deleted_at = 0") + @UpdateProvider( + type = TopicMetaSQLProviderFactory.class, + method = "softDeleteTopicMetasBySchemaId") Integer softDeleteTopicMetasBySchemaId(@Param("schemaId") Long schemaId); - @Delete( - "DELETE FROM " - + TABLE_NAME - + " WHERE deleted_at != 0 AND deleted_at < #{legacyTimeline} LIMIT #{limit}") + @DeleteProvider( + type = TopicMetaSQLProviderFactory.class, + method = "deleteTopicMetasByLegacyTimeline") Integer deleteTopicMetasByLegacyTimeline( @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit); } diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java new file mode 100644 index 00000000000..9a417e011a7 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/TopicMetaSQLProviderFactory.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.storage.relational.mapper; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType; +import org.apache.gravitino.storage.relational.po.TopicPO; +import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper; +import org.apache.ibatis.annotations.Param; + +public class TopicMetaSQLProviderFactory { + + private static final Map + METALAKE_META_SQL_PROVIDER_MAP = + ImmutableMap.of( + JDBCBackendType.MYSQL, new TopicMetaMySQLProvider(), + JDBCBackendType.H2, new TopicMetaH2Provider()); + + public static TopicMetaBaseSQLProvider getProvider() { + String databaseId = + SqlSessionFactoryHelper.getInstance() + .getSqlSessionFactory() + .getConfiguration() + .getDatabaseId(); + + JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId); + return METALAKE_META_SQL_PROVIDER_MAP.get(jdbcBackendType); + } + + static class TopicMetaMySQLProvider extends TopicMetaBaseSQLProvider {} + + static class TopicMetaH2Provider extends TopicMetaBaseSQLProvider {} + + public static String insertTopicMeta(@Param("topicMeta") TopicPO topicPO) { + return getProvider().insertTopicMeta(topicPO); + } + + public static String insertTopicMetaOnDuplicateKeyUpdate(@Param("topicMeta") TopicPO topicPO) { + return getProvider().insertTopicMetaOnDuplicateKeyUpdate(topicPO); + } + + public static String listTopicPOsBySchemaId(@Param("schemaId") Long schemaId) { + return getProvider().listTopicPOsBySchemaId(schemaId); + } + + public static String selectTopicMetaBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("topicName") String topicName) { + return getProvider().selectTopicMetaBySchemaIdAndName(schemaId, topicName); + } + + public static String selectTopicMetaById(@Param("topicId") Long topicId) { + return getProvider().selectTopicMetaById(topicId); + } + + public static String updateTopicMeta( + @Param("newTopicMeta") TopicPO newTopicPO, @Param("oldTopicMeta") TopicPO oldTopicPO) { + return getProvider().updateTopicMeta(newTopicPO, oldTopicPO); + } + + public static String selectTopicIdBySchemaIdAndName( + @Param("schemaId") Long schemaId, @Param("topicName") String name) { + return getProvider().selectTopicIdBySchemaIdAndName(schemaId, name); + } + + public static String softDeleteTopicMetasByTopicId(@Param("topicId") Long topicId) { + return getProvider().softDeleteTopicMetasByTopicId(topicId); + } + + public static String softDeleteTopicMetasByCatalogId(@Param("catalogId") Long catalogId) { + return getProvider().softDeleteTopicMetasByCatalogId(catalogId); + } + + public static String softDeleteTopicMetasByMetalakeId(@Param("metalakeId") Long metalakeId) { + return getProvider().softDeleteTopicMetasByMetalakeId(metalakeId); + } + + public static String softDeleteTopicMetasBySchemaId(@Param("schemaId") Long schemaId) { + return getProvider().softDeleteTopicMetasBySchemaId(schemaId); + } + + public static String deleteTopicMetasByLegacyTimeline( + @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) { + return getProvider().deleteTopicMetasByLegacyTimeline(legacyTimeline, limit); + } +} diff --git a/core/src/main/java/org/apache/gravitino/storage/relational/session/SqlSessionFactoryHelper.java b/core/src/main/java/org/apache/gravitino/storage/relational/session/SqlSessionFactoryHelper.java index 684252f17ee..9d928271623 100644 --- a/core/src/main/java/org/apache/gravitino/storage/relational/session/SqlSessionFactoryHelper.java +++ b/core/src/main/java/org/apache/gravitino/storage/relational/session/SqlSessionFactoryHelper.java @@ -26,6 +26,7 @@ import org.apache.commons.pool2.impl.BaseObjectPoolConfig; import org.apache.gravitino.Config; import org.apache.gravitino.Configs; +import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType; import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper; import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper; import org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper; @@ -73,7 +74,9 @@ private SqlSessionFactoryHelper() {} public void init(Config config) { // Initialize the data source BasicDataSource dataSource = new BasicDataSource(); - dataSource.setUrl(config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL)); + String jdbcUrl = config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL); + JDBCBackendType jdbcType = JDBCBackendType.fromURI(jdbcUrl); + dataSource.setUrl(jdbcUrl); dataSource.setDriverClassName(config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER)); dataSource.setUsername(config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_USER)); dataSource.setPassword(config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD)); @@ -102,6 +105,7 @@ public void init(Config config) { // Initialize the configuration Configuration configuration = new Configuration(environment); + configuration.setDatabaseId(jdbcType.name().toLowerCase()); configuration.addMapper(MetalakeMetaMapper.class); configuration.addMapper(CatalogMetaMapper.class); configuration.addMapper(SchemaMetaMapper.class);