diff --git a/core/src/main/java/com/datastrato/gravitino/Configs.java b/core/src/main/java/com/datastrato/gravitino/Configs.java index 4d361f43436..f92faee88da 100644 --- a/core/src/main/java/com/datastrato/gravitino/Configs.java +++ b/core/src/main/java/com/datastrato/gravitino/Configs.java @@ -57,6 +57,7 @@ public interface Configs { String DEFAULT_KV_ROCKSDB_BACKEND_PATH = String.join(File.separator, System.getenv("GRAVITINO_HOME"), "data", "rocksdb"); + int GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT = 100; long MAX_NODE_IN_MEMORY = 100000L; long MIN_NODE_IN_MEMORY = 1000L; diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java index 14a0df4fcac..aefeff0f269 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java @@ -5,6 +5,8 @@ package com.datastrato.gravitino.storage.relational; +import static com.datastrato.gravitino.Configs.GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT; + import com.datastrato.gravitino.Config; import com.datastrato.gravitino.Configs; import com.datastrato.gravitino.Entity; @@ -32,6 +34,8 @@ import java.io.IOException; import java.util.List; import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@link JDBCBackend} is a jdbc implementation of {@link RelationalBackend} interface. You can use @@ -41,6 +45,8 @@ */ public class JDBCBackend implements RelationalBackend { + private static final Logger LOG = LoggerFactory.getLogger(JDBCBackend.class); + /** Initialize the jdbc backend instance. */ @Override public void initialize(Config config) { @@ -167,6 +173,80 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea } } + @Override + public int hardDeleteLegacyData(Entity.EntityType entityType, long legacyTimeLine) { + LOG.info( + "Try to physically delete {} legacy data that has been marked deleted before {}", + entityType, + legacyTimeLine); + + switch (entityType) { + case METALAKE: + return MetalakeMetaService.getInstance() + .deleteMetalakeMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + case CATALOG: + return CatalogMetaService.getInstance() + .deleteCatalogMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + case SCHEMA: + return SchemaMetaService.getInstance() + .deleteSchemaMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + case TABLE: + return TableMetaService.getInstance() + .deleteTableMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + case FILESET: + return FilesetMetaService.getInstance() + .deleteFilesetAndVersionMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + case TOPIC: + return TopicMetaService.getInstance() + .deleteTopicMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + + case COLUMN: + case USER: + case GROUP: + case AUDIT: + case ROLE: + return 0; + // TODO: Implement hard delete logic for these entity types. + + default: + throw new IllegalArgumentException( + "Unsupported entity type when collectAndRemoveLegacyData: " + entityType); + } + } + + @Override + public int deleteOldVersionData(Entity.EntityType entityType, long versionRetentionCount) { + switch (entityType) { + case METALAKE: + case CATALOG: + case SCHEMA: + case TABLE: + case COLUMN: + case TOPIC: + case USER: + case GROUP: + case AUDIT: + case ROLE: + // These entity types have not implemented multi-versions, so we can skip. + return 0; + + case FILESET: + return FilesetMetaService.getInstance() + .deleteFilesetVersionsByRetentionCount( + versionRetentionCount, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + + default: + throw new IllegalArgumentException( + "Unsupported entity type when collectAndRemoveOldVersionData: " + entityType); + } + } + @Override public void close() throws IOException { SqlSessionFactoryHelper.getInstance().close(); diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java index ea9506ed147..17e77533e42 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java @@ -84,7 +84,7 @@ E get(NameIdentifier ident, Entity.EntityType throws IOException; /** - * Deletes the entity associated with the identifier and the entity type. + * Soft deletes the entity associated with the identifier and the entity type. * * @param ident The identifier of the entity. * @param entityType The type of the entity. @@ -92,4 +92,24 @@ E get(NameIdentifier ident, Entity.EntityType * @return True, if the entity was successfully deleted, else false. */ boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolean cascade); + + /** + * Permanently deletes the legacy data that has been marked as deleted before the given legacy + * timeline. + * + * @param entityType The type of the entity. + * @param legacyTimeLine The time before which the data has been marked as deleted. + * @return The count of the deleted data. + */ + int hardDeleteLegacyData(Entity.EntityType entityType, long legacyTimeLine); + + /** + * Soft deletes the old version data that is older than or equal to the given version retention + * count. + * + * @param entityType The type of the entity. + * @param versionRetentionCount The count of versions to retain. + * @return The count of the deleted data. + */ + int deleteOldVersionData(Entity.EntityType entityType, long versionRetentionCount); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalEntityStore.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalEntityStore.java index 8427e7484af..8e6a3d2b661 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalEntityStore.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalEntityStore.java @@ -36,10 +36,13 @@ public class RelationalEntityStore implements EntityStore { ImmutableMap.of( Configs.DEFAULT_ENTITY_RELATIONAL_STORE, JDBCBackend.class.getCanonicalName()); private RelationalBackend backend; + private RelationalGarbageCollector garbageCollector; @Override public void initialize(Config config) throws RuntimeException { this.backend = createRelationalEntityBackend(config); + this.garbageCollector = new RelationalGarbageCollector(backend, config); + this.garbageCollector.start(); } private static RelationalBackend createRelationalEntityBackend(Config config) { @@ -109,6 +112,7 @@ public R executeInTransaction(Executable executab @Override public void close() throws IOException { + garbageCollector.close(); backend.close(); } } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java new file mode 100644 index 00000000000..3cc1a852782 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java @@ -0,0 +1,97 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.relational; + +import static com.datastrato.gravitino.Configs.STORE_DELETE_AFTER_TIME; +import static com.datastrato.gravitino.Configs.VERSION_RETENTION_COUNT; + +import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.Entity; +import com.google.common.annotations.VisibleForTesting; +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class RelationalGarbageCollector implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(RelationalGarbageCollector.class); + private final RelationalBackend backend; + + private final long storeDeleteAfterTimeMillis; + private final long versionRetentionCount; + + @VisibleForTesting + final ScheduledExecutorService garbageCollectorPool = + new ScheduledThreadPoolExecutor( + 2, + r -> { + Thread t = new Thread(r, "RelationalBackend-Garbage-Collector"); + t.setDaemon(true); + return t; + }, + new ThreadPoolExecutor.AbortPolicy()); + + public RelationalGarbageCollector(RelationalBackend backend, Config config) { + this.backend = backend; + storeDeleteAfterTimeMillis = config.get(STORE_DELETE_AFTER_TIME); + versionRetentionCount = config.get(VERSION_RETENTION_COUNT); + } + + public void start() { + long dateTimeLineMinute = storeDeleteAfterTimeMillis / 1000 / 60; + + // We will collect garbage every 10 minutes at least. If the dateTimeLineMinute is larger than + // 100 minutes, we would collect garbage every dateTimeLineMinute/10 minutes. + long frequency = Math.max(dateTimeLineMinute / 10, 10); + garbageCollectorPool.scheduleAtFixedRate(this::collectAndClean, 5, frequency, TimeUnit.MINUTES); + } + + private void collectAndClean() { + long threadId = Thread.currentThread().getId(); + LOG.info("Thread {} start to collect garbage...", threadId); + + try { + LOG.info("Start to collect and delete legacy data by thread {}", threadId); + long legacyTimeLine = System.currentTimeMillis() - storeDeleteAfterTimeMillis; + for (Entity.EntityType entityType : Entity.EntityType.values()) { + long deletedCount = Long.MAX_VALUE; + while (deletedCount > 0) { + deletedCount = backend.hardDeleteLegacyData(entityType, legacyTimeLine); + } + } + + LOG.info("Start to collect and delete old version data by thread {}", threadId); + for (Entity.EntityType entityType : Entity.EntityType.values()) { + long deletedCount = Long.MAX_VALUE; + while (deletedCount > 0) { + deletedCount = backend.deleteOldVersionData(entityType, versionRetentionCount); + } + } + } catch (Exception e) { + LOG.error("Thread {} failed to collect and clean garbage.", threadId, e); + } finally { + LOG.info("Thread {} finish to collect garbage.", threadId); + } + } + + @Override + public void close() throws IOException { + this.garbageCollectorPool.shutdown(); + try { + if (!this.garbageCollectorPool.awaitTermination(5, TimeUnit.SECONDS)) { + this.garbageCollectorPool.shutdownNow(); + } + } catch (InterruptedException ex) { + this.garbageCollectorPool.shutdownNow(); + Thread.currentThread().interrupt(); + } + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java index 038b4256f0b..5e40deb39b1 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java @@ -7,6 +7,7 @@ import com.datastrato.gravitino.storage.relational.po.CatalogPO; import java.util.List; +import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; @@ -147,4 +148,11 @@ Integer updateCatalogMeta( + " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0" + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") Integer softDeleteCatalogMetasByMetalakeId(@Param("metalakeId") Long metalakeId); + + @Delete( + "DELETE FROM " + + TABLE_NAME + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") + Integer deleteCatalogMetasByLegacyTimeLine( + @Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetMetaMapper.java index 885a8d89908..ee87daa43fd 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetMetaMapper.java @@ -7,6 +7,7 @@ import com.datastrato.gravitino.storage.relational.po.FilesetPO; import java.util.List; +import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Result; @@ -214,4 +215,11 @@ Integer updateFilesetMeta( + " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0" + " WHERE fileset_id = #{filesetId} AND deleted_at = 0") Integer softDeleteFilesetMetasByFilesetId(@Param("filesetId") Long filesetId); + + @Delete( + "DELETE FROM " + + META_TABLE_NAME + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") + Integer deleteFilesetMetasByLegacyTimeLine( + @Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java index af08416ea09..d5d9053c330 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java @@ -6,8 +6,12 @@ package com.datastrato.gravitino.storage.relational.mapper; import com.datastrato.gravitino.storage.relational.po.FilesetVersionPO; +import java.util.List; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; import org.apache.ibatis.annotations.Update; /** @@ -97,4 +101,31 @@ void insertFilesetVersionOnDuplicateKeyUpdate( + " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0" + " WHERE fileset_id = #{filesetId} AND deleted_at = 0") Integer softDeleteFilesetVersionsByFilesetId(@Param("filesetId") Long filesetId); + + @Delete( + "DELETE FROM " + + VERSION_TABLE_NAME + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") + Integer deleteFilesetVersionsByLegacyTimeLine( + @Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit); + + @Select( + "SELECT fileset_id as filesetId," + + " Max(version) as version" + + " FROM " + + VERSION_TABLE_NAME + + " WHERE version > #{versionRetentionCount} AND deleted_at = 0" + + " GROUP BY fileset_id") + List> selectFilesetVersionsByRetentionCount( + @Param("versionRetentionCount") Long versionRetentionCount); + + @Update( + "UPDATE " + + VERSION_TABLE_NAME + + " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0" + + " WHERE fileset_id = #{filesetId} AND version <= #{versionRetentionLine} AND deleted_at = 0 LIMIT #{limit}") + Integer softDeleteFilesetVersionsByRetentionLine( + @Param("filesetId") Long filesetId, + @Param("versionRetentionLine") long versionRetentionLine, + @Param("limit") int limit); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java index d1cb2d1ea73..29b87d64b50 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java @@ -7,6 +7,7 @@ import com.datastrato.gravitino.storage.relational.po.MetalakePO; import java.util.List; +import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; @@ -125,4 +126,11 @@ Integer updateMetalakeMeta( + " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0" + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") Integer softDeleteMetalakeMetaByMetalakeId(@Param("metalakeId") Long metalakeId); + + @Delete( + "DELETE FROM " + + TABLE_NAME + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") + Integer deleteMetalakeMetasByLegacyTimeLine( + @Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java index 7f36478cf0a..8a27a440ab4 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java @@ -7,6 +7,7 @@ import com.datastrato.gravitino.storage.relational.po.SchemaPO; import java.util.List; +import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; @@ -149,4 +150,11 @@ Integer updateSchemaMeta( + " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0" + " WHERE catalog_id = #{catalogId} AND deleted_at = 0") Integer softDeleteSchemaMetasByCatalogId(@Param("catalogId") Long catalogId); + + @Delete( + "DELETE FROM " + + TABLE_NAME + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") + Integer deleteSchemaMetasByLegacyTimeLine( + @Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java index 331847f8cdc..aeeaa1eac14 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java @@ -7,6 +7,7 @@ import com.datastrato.gravitino.storage.relational.po.TablePO; import java.util.List; +import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; @@ -151,4 +152,11 @@ Integer updateTableMeta( + " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0" + " WHERE schema_id = #{schemaId} AND deleted_at = 0") Integer softDeleteTableMetasBySchemaId(@Param("schemaId") Long schemaId); + + @Delete( + "DELETE FROM " + + TABLE_NAME + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") + Integer deleteTableMetasByLegacyTimeLine( + @Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TopicMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TopicMetaMapper.java index 5db879f67e8..06f0d611db0 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TopicMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TopicMetaMapper.java @@ -6,6 +6,7 @@ import com.datastrato.gravitino.storage.relational.po.TopicPO; import java.util.List; +import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; @@ -152,4 +153,11 @@ Long selectTopicIdBySchemaIdAndName( + " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0" + " WHERE schema_id = #{schemaId} AND deleted_at = 0") Integer softDeleteTopicMetasBySchemaId(@Param("schemaId") Long schemaId); + + @Delete( + "DELETE FROM " + + TABLE_NAME + + " WHERE deleted_at != 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") + Integer deleteTopicMetasByLegacyTimeLine( + @Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java index a075dbf8d39..9264fac3464 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java @@ -209,4 +209,12 @@ public boolean deleteCatalog(NameIdentifier identifier, boolean cascade) { return true; } + + public int deleteCatalogMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { + return SessionUtils.doWithCommitAndFetchResult( + CatalogMetaMapper.class, + mapper -> { + return mapper.deleteCatalogMetasByLegacyTimeLine(legacyTimeLine, limit); + }); + } } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java index eb8b1924ac2..ad041aee418 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java @@ -21,6 +21,9 @@ import java.util.List; import java.util.Objects; import java.util.function.Function; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The service class for fileset metadata and version info. It provides the basic database @@ -29,6 +32,8 @@ public class FilesetMetaService { private static final FilesetMetaService INSTANCE = new FilesetMetaService(); + private static final Logger LOG = LoggerFactory.getLogger(FilesetMetaService.class); + public static FilesetMetaService getInstance() { return INSTANCE; } @@ -213,6 +218,54 @@ public boolean deleteFileset(NameIdentifier identifier) { return true; } + public int deleteFilesetAndVersionMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { + int filesetDeletedCount = + SessionUtils.doWithCommitAndFetchResult( + FilesetMetaMapper.class, + mapper -> { + return mapper.deleteFilesetMetasByLegacyTimeLine(legacyTimeLine, limit); + }); + int filesetVersionDeletedCount = + SessionUtils.doWithCommitAndFetchResult( + FilesetVersionMapper.class, + mapper -> { + return mapper.deleteFilesetVersionsByLegacyTimeLine(legacyTimeLine, limit); + }); + return filesetDeletedCount + filesetVersionDeletedCount; + } + + public int deleteFilesetVersionsByRetentionCount(Long versionRetentionCount, int limit) { + // get the current version of all filesets. + List> filesetCurVersions = + SessionUtils.getWithoutCommit( + FilesetVersionMapper.class, + mapper -> mapper.selectFilesetVersionsByRetentionCount(versionRetentionCount)); + + // soft delete old versions that are older than or equal to (currentVersion - + // versionRetentionCount). + int totalDeletedCount = 0; + for (ImmutablePair filesetCurVersion : filesetCurVersions) { + long versionRetentionLine = filesetCurVersion.getValue() - versionRetentionCount; + int deletedCount = + SessionUtils.doWithCommitAndFetchResult( + FilesetVersionMapper.class, + mapper -> + mapper.softDeleteFilesetVersionsByRetentionLine( + filesetCurVersion.getKey(), versionRetentionLine, limit)); + totalDeletedCount += deletedCount; + + // log the deletion by current fileset version. + LOG.info( + "Soft delete filesetVersions count: {} which versions are older than or equal to" + + " versionRetentionLine: {}, the current filesetId and version is: <{}, {}>.", + deletedCount, + versionRetentionLine, + filesetCurVersion.getKey(), + filesetCurVersion.getValue()); + } + return totalDeletedCount; + } + private void fillFilesetPOBuilderParentEntityId(FilesetPO.Builder builder, Namespace namespace) { Namespace.checkFileset(namespace); Long parentEntityId = null; diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java index 7c76ab4ebb5..558c7568ea4 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java @@ -186,4 +186,12 @@ public boolean deleteMetalake(NameIdentifier ident, boolean cascade) { } return true; } + + public int deleteMetalakeMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { + return SessionUtils.doWithCommitAndFetchResult( + MetalakeMetaMapper.class, + mapper -> { + return mapper.deleteMetalakeMetasByLegacyTimeLine(legacyTimeLine, limit); + }); + } } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java index c1b8ba490d2..a5ae1063880 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java @@ -215,6 +215,14 @@ public boolean deleteSchema(NameIdentifier identifier, boolean cascade) { return true; } + public int deleteSchemaMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { + return SessionUtils.doWithCommitAndFetchResult( + SchemaMetaMapper.class, + mapper -> { + return mapper.deleteSchemaMetasByLegacyTimeLine(legacyTimeLine, limit); + }); + } + private void fillSchemaPOBuilderParentEntityId(SchemaPO.Builder builder, Namespace namespace) { Namespace.checkSchema(namespace); Long parentEntityId = null; diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TableMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TableMetaService.java index fc6a03db757..f71bbc41b10 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TableMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TableMetaService.java @@ -163,6 +163,14 @@ public boolean deleteTable(NameIdentifier identifier) { return true; } + public int deleteTableMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { + return SessionUtils.doWithCommitAndFetchResult( + TableMetaMapper.class, + mapper -> { + return mapper.deleteTableMetasByLegacyTimeLine(legacyTimeLine, limit); + }); + } + private void fillTablePOBuilderParentEntityId(TablePO.Builder builder, Namespace namespace) { Namespace.checkTable(namespace); Long parentEntityId = null; diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TopicMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TopicMetaService.java index cc60e266f2d..06042a9659f 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TopicMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TopicMetaService.java @@ -176,6 +176,14 @@ public boolean deleteTopic(NameIdentifier identifier) { return true; } + public int deleteTopicMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { + return SessionUtils.doWithCommitAndFetchResult( + TopicMetaMapper.class, + mapper -> { + return mapper.deleteTopicMetasByLegacyTimeLine(legacyTimeLine, limit); + }); + } + private Long getTopicIdBySchemaIdAndName(Long schemaId, String topicName) { Long topicId = SessionUtils.getWithoutCommit( diff --git a/core/src/test/java/com/datastrato/gravitino/storage/TestEntityStorage.java b/core/src/test/java/com/datastrato/gravitino/storage/TestEntityStorage.java index 3bd7cf8a70d..239c7513530 100644 --- a/core/src/test/java/com/datastrato/gravitino/storage/TestEntityStorage.java +++ b/core/src/test/java/com/datastrato/gravitino/storage/TestEntityStorage.java @@ -18,6 +18,7 @@ import static com.datastrato.gravitino.Configs.RELATIONAL_ENTITY_STORE; import static com.datastrato.gravitino.Configs.STORE_DELETE_AFTER_TIME; import static com.datastrato.gravitino.Configs.STORE_TRANSACTION_MAX_SKEW_TIME; +import static com.datastrato.gravitino.Configs.VERSION_RETENTION_COUNT; import com.datastrato.gravitino.Catalog; import com.datastrato.gravitino.Config; @@ -101,6 +102,7 @@ private void init(String type, Config config) { Assertions.assertEquals(KV_STORE_PATH, config.get(ENTRY_KV_ROCKSDB_BACKEND_PATH)); Mockito.when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L); Mockito.when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L); + Mockito.when(config.get(VERSION_RETENTION_COUNT)).thenReturn(1L); } else if (type.equals(Configs.RELATIONAL_ENTITY_STORE)) { File dir = new File(DB_DIR); if (dir.exists() || !dir.isDirectory()) { @@ -114,6 +116,8 @@ private void init(String type, Config config) { Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_USER)).thenReturn("root"); Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD)).thenReturn("123"); Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER)).thenReturn("org.h2.Driver"); + Mockito.when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L); + Mockito.when(config.get(VERSION_RETENTION_COUNT)).thenReturn(1L); } else { throw new UnsupportedOperationException("Unsupported entity store type: " + type); } diff --git a/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java index 965e6043678..b8d7589fe73 100644 --- a/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java +++ b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java @@ -12,7 +12,10 @@ import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_STORE; import static com.datastrato.gravitino.Configs.ENTITY_STORE; import static com.datastrato.gravitino.Configs.RELATIONAL_ENTITY_STORE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.datastrato.gravitino.Catalog; import com.datastrato.gravitino.Config; @@ -41,7 +44,9 @@ import java.sql.Statement; import java.time.Instant; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.UUID; import org.apache.commons.io.IOUtils; @@ -394,6 +399,260 @@ public void testUpdateAlreadyExistsException() { e -> createTopicEntity(topicCopy.id(), topicCopy.namespace(), "topic", auditInfo))); } + @Test + public void testMetaLifeCycleFromCreationToDeletion() throws IOException { + AuditInfo auditInfo = + AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build(); + + // meta data creation + BaseMetalake metalake = + createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), "metalake", auditInfo); + backend.insert(metalake, false); + + CatalogEntity catalog = + createCatalog( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofCatalog("metalake"), + "catalog", + auditInfo); + backend.insert(catalog, false); + + SchemaEntity schema = + createSchemaEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofSchema("metalake", "catalog"), + "schema", + auditInfo); + backend.insert(schema, false); + + TableEntity table = + createTableEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofTable("metalake", "catalog", "schema"), + "table", + auditInfo); + backend.insert(table, false); + + FilesetEntity fileset = + createFilesetEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofFileset("metalake", "catalog", "schema"), + "fileset", + auditInfo); + backend.insert(fileset, false); + + TopicEntity topic = + createTopicEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofFileset("metalake", "catalog", "schema"), + "topic", + auditInfo); + backend.insert(topic, false); + + // update fileset properties and version + FilesetEntity filesetV2 = + createFilesetEntity( + fileset.id(), + Namespace.ofFileset("metalake", "catalog", "schema"), + "fileset", + auditInfo); + filesetV2.properties().put("version", "2"); + backend.update(fileset.nameIdentifier(), Entity.EntityType.FILESET, e -> filesetV2); + + // another meta data creation + BaseMetalake anotherMetaLake = + createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), "another-metalake", auditInfo); + backend.insert(anotherMetaLake, false); + + CatalogEntity anotherCatalog = + createCatalog( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofCatalog("another-metalake"), + "another-catalog", + auditInfo); + backend.insert(anotherCatalog, false); + + SchemaEntity anotherSchema = + createSchemaEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofSchema("another-metalake", "another-catalog"), + "another-schema", + auditInfo); + backend.insert(anotherSchema, false); + + FilesetEntity anotherFileset = + createFilesetEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofFileset("another-metalake", "another-catalog", "another-schema"), + "anotherFileset", + auditInfo); + backend.insert(anotherFileset, false); + + FilesetEntity anotherFilesetV2 = + createFilesetEntity( + anotherFileset.id(), + Namespace.ofFileset("another-metalake", "another-catalog", "another-schema"), + "anotherFileset", + auditInfo); + anotherFilesetV2.properties().put("version", "2"); + backend.update( + anotherFileset.nameIdentifier(), Entity.EntityType.FILESET, e -> anotherFilesetV2); + + FilesetEntity anotherFilesetV3 = + createFilesetEntity( + anotherFileset.id(), + Namespace.ofFileset("another-metalake", "another-catalog", "another-schema"), + "anotherFileset", + auditInfo); + anotherFilesetV3.properties().put("version", "3"); + backend.update( + anotherFileset.nameIdentifier(), Entity.EntityType.FILESET, e -> anotherFilesetV3); + + // meta data list + List metaLakes = backend.list(metalake.namespace(), Entity.EntityType.METALAKE); + assertTrue(metaLakes.contains(metalake)); + + List catalogs = backend.list(catalog.namespace(), Entity.EntityType.CATALOG); + assertTrue(catalogs.contains(catalog)); + + List schemas = backend.list(schema.namespace(), Entity.EntityType.SCHEMA); + assertTrue(schemas.contains(schema)); + + List tables = backend.list(table.namespace(), Entity.EntityType.TABLE); + assertTrue(tables.contains(table)); + + List filesets = backend.list(fileset.namespace(), Entity.EntityType.FILESET); + assertFalse(filesets.contains(fileset)); + assertTrue(filesets.contains(filesetV2)); + assertEquals("2", filesets.get(filesets.indexOf(filesetV2)).properties().get("version")); + + List topics = backend.list(topic.namespace(), Entity.EntityType.TOPIC); + assertTrue(topics.contains(topic)); + + // meta data soft delete + backend.delete(metalake.nameIdentifier(), Entity.EntityType.METALAKE, true); + + // check existence after soft delete + assertFalse(backend.exists(metalake.nameIdentifier(), Entity.EntityType.METALAKE)); + assertTrue(backend.exists(anotherMetaLake.nameIdentifier(), Entity.EntityType.METALAKE)); + + assertFalse(backend.exists(catalog.nameIdentifier(), Entity.EntityType.CATALOG)); + assertTrue(backend.exists(anotherCatalog.nameIdentifier(), Entity.EntityType.CATALOG)); + + assertFalse(backend.exists(schema.nameIdentifier(), Entity.EntityType.SCHEMA)); + assertTrue(backend.exists(anotherSchema.nameIdentifier(), Entity.EntityType.SCHEMA)); + + assertFalse(backend.exists(fileset.nameIdentifier(), Entity.EntityType.FILESET)); + assertTrue(backend.exists(anotherFileset.nameIdentifier(), Entity.EntityType.FILESET)); + + assertFalse(backend.exists(table.nameIdentifier(), Entity.EntityType.TABLE)); + assertFalse(backend.exists(topic.nameIdentifier(), Entity.EntityType.TOPIC)); + + // check legacy record after soft delete + assertTrue(legacyRecordExistsInDB(metalake.id(), Entity.EntityType.METALAKE)); + assertTrue(legacyRecordExistsInDB(catalog.id(), Entity.EntityType.CATALOG)); + assertTrue(legacyRecordExistsInDB(schema.id(), Entity.EntityType.SCHEMA)); + assertTrue(legacyRecordExistsInDB(table.id(), Entity.EntityType.TABLE)); + assertTrue(legacyRecordExistsInDB(topic.id(), Entity.EntityType.TOPIC)); + assertTrue(legacyRecordExistsInDB(fileset.id(), Entity.EntityType.FILESET)); + assertEquals(2, listFilesetVersions(fileset.id()).size()); + assertEquals(3, listFilesetVersions(anotherFileset.id()).size()); + + // meta data hard delete + for (Entity.EntityType entityType : Entity.EntityType.values()) { + backend.hardDeleteLegacyData(entityType, Instant.now().toEpochMilli() + 1000); + } + assertFalse(legacyRecordExistsInDB(metalake.id(), Entity.EntityType.METALAKE)); + assertFalse(legacyRecordExistsInDB(catalog.id(), Entity.EntityType.CATALOG)); + assertFalse(legacyRecordExistsInDB(schema.id(), Entity.EntityType.SCHEMA)); + assertFalse(legacyRecordExistsInDB(table.id(), Entity.EntityType.TABLE)); + assertFalse(legacyRecordExistsInDB(fileset.id(), Entity.EntityType.FILESET)); + assertFalse(legacyRecordExistsInDB(topic.id(), Entity.EntityType.TOPIC)); + assertEquals(0, listFilesetVersions(fileset.id()).size()); + + // soft delete for old version fileset + assertEquals(3, listFilesetVersions(anotherFileset.id()).size()); + for (Entity.EntityType entityType : Entity.EntityType.values()) { + backend.deleteOldVersionData(entityType, 1); + } + Map versionDeletedMap = listFilesetVersions(anotherFileset.id()); + assertEquals(3, versionDeletedMap.size()); + assertEquals(1, versionDeletedMap.values().stream().filter(value -> value == 0L).count()); + assertEquals(2, versionDeletedMap.values().stream().filter(value -> value != 0L).count()); + + // hard delete for old version fileset + backend.hardDeleteLegacyData(Entity.EntityType.FILESET, Instant.now().toEpochMilli() + 1000); + assertEquals(1, listFilesetVersions(anotherFileset.id()).size()); + } + + private boolean legacyRecordExistsInDB(Long id, Entity.EntityType entityType) { + String tableName; + String idColumnName; + + switch (entityType) { + case METALAKE: + tableName = "metalake_meta"; + idColumnName = "metalake_id"; + break; + case CATALOG: + tableName = "catalog_meta"; + idColumnName = "catalog_id"; + break; + case SCHEMA: + tableName = "schema_meta"; + idColumnName = "schema_id"; + break; + case TABLE: + tableName = "table_meta"; + idColumnName = "table_id"; + break; + case FILESET: + tableName = "fileset_meta"; + idColumnName = "fileset_id"; + break; + case TOPIC: + tableName = "topic_meta"; + idColumnName = "topic_id"; + break; + default: + throw new IllegalArgumentException("Unsupported entity type: " + entityType); + } + + try (SqlSession sqlSession = + SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true); + Connection connection = sqlSession.getConnection(); + Statement statement = connection.createStatement(); + ResultSet rs = + statement.executeQuery( + String.format( + "SELECT * FROM %s WHERE %s = %d AND deleted_at != 0", + tableName, idColumnName, id))) { + return rs.next(); + } catch (SQLException e) { + throw new RuntimeException("SQL execution failed", e); + } + } + + private Map listFilesetVersions(Long filesetId) { + Map versionDeletedTime = new HashMap<>(); + try (SqlSession sqlSession = + SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true); + Connection connection = sqlSession.getConnection(); + Statement statement = connection.createStatement(); + ResultSet rs = + statement.executeQuery( + String.format( + "SELECT version, deleted_at FROM fileset_version_info WHERE fileset_id = %d", + filesetId))) { + while (rs.next()) { + versionDeletedTime.put(rs.getInt("version"), rs.getLong("deleted_at")); + } + } catch (SQLException e) { + throw new RuntimeException("SQL execution failed", e); + } + return versionDeletedTime; + } + public static BaseMetalake createBaseMakeLake(Long id, String name, AuditInfo auditInfo) { return BaseMetalake.builder() .withId(id) @@ -450,7 +709,7 @@ public static FilesetEntity createFilesetEntity( .withFilesetType(Fileset.Type.MANAGED) .withStorageLocation("/tmp") .withComment("") - .withProperties(null) + .withProperties(new HashMap<>()) .withAuditInfo(auditInfo) .build(); }