From f78a236686bf6407c45fffc040a903d11422cac8 Mon Sep 17 00:00:00 2001 From: YxAc Date: Fri, 5 Apr 2024 17:26:43 +0800 Subject: [PATCH 01/14] fix conflict --- .../gravitino/storage/LogHelper.java | 42 +++++++ .../storage/kv/KvGarbageCollector.java | 23 +--- .../relational/RelationalEntityStore.java | 4 + .../RelationalGarbageCollector.java | 118 ++++++++++++++++++ .../storage/kv/TestKvGarbageCollector.java | 2 +- 5 files changed, 166 insertions(+), 23 deletions(-) create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/LogHelper.java create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java diff --git a/core/src/main/java/com/datastrato/gravitino/storage/LogHelper.java b/core/src/main/java/com/datastrato/gravitino/storage/LogHelper.java new file mode 100644 index 00000000000..186e07e4949 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/LogHelper.java @@ -0,0 +1,42 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage; + +import com.datastrato.gravitino.Entity; +import com.datastrato.gravitino.NameIdentifier; +import com.google.common.annotations.VisibleForTesting; + +/** + * LogHelper is a utility class that provides logging-related functionality for both KV and Relational or other backends in the storage. + * It holds information about an entity such as its identifier, type, and creation time. + * The class also provides a NONE instance which is an implementation of the Null Object pattern. + * This instance can be used to avoid null checks and NullPointerExceptions. + */ +public class LogHelper { + + @VisibleForTesting + public final NameIdentifier identifier; + @VisibleForTesting + public final Entity.EntityType type; + @VisibleForTesting + public final long createTimeInMs; + // Formatted createTime + @VisibleForTesting + public final String createTimeAsString; + + public static final LogHelper NONE = new LogHelper(null, null, 0L, null); + + public LogHelper( + NameIdentifier identifier, + Entity.EntityType type, + long createTimeInMs, + String createTimeAsString) { + this.identifier = identifier; + this.type = type; + this.createTimeInMs = createTimeInMs; + this.createTimeAsString = createTimeAsString; + } +} \ No newline at end of file diff --git a/core/src/main/java/com/datastrato/gravitino/storage/kv/KvGarbageCollector.java b/core/src/main/java/com/datastrato/gravitino/storage/kv/KvGarbageCollector.java index 309323f490c..6d22d0d267f 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/kv/KvGarbageCollector.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/kv/KvGarbageCollector.java @@ -17,6 +17,7 @@ import com.datastrato.gravitino.Entity.EntityType; import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.storage.EntityKeyEncoder; +import com.datastrato.gravitino.storage.LogHelper; import com.datastrato.gravitino.utils.Bytes; import com.google.common.annotations.VisibleForTesting; import java.io.Closeable; @@ -323,28 +324,6 @@ private void removeAllVersionsOfKey(byte[] rawKey, byte[] key, boolean includeSt } } - static class LogHelper { - - @VisibleForTesting final NameIdentifier identifier; - @VisibleForTesting final EntityType type; - @VisibleForTesting final long createTimeInMs; - // Formatted createTime - @VisibleForTesting final String createTimeAsString; - - public static final LogHelper NONE = new LogHelper(null, null, 0L, null); - - public LogHelper( - NameIdentifier identifier, - EntityType type, - long createTimeInMs, - String createTimeAsString) { - this.identifier = identifier; - this.type = type; - this.createTimeInMs = createTimeInMs; - this.createTimeAsString = createTimeAsString; - } - } - @VisibleForTesting LogHelper decodeKey(byte[] key, byte[] timestampArray) { if (entityKeyEncoder == null) { diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalEntityStore.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalEntityStore.java index 8427e7484af..8e6a3d2b661 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalEntityStore.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalEntityStore.java @@ -36,10 +36,13 @@ public class RelationalEntityStore implements EntityStore { ImmutableMap.of( Configs.DEFAULT_ENTITY_RELATIONAL_STORE, JDBCBackend.class.getCanonicalName()); private RelationalBackend backend; + private RelationalGarbageCollector garbageCollector; @Override public void initialize(Config config) throws RuntimeException { this.backend = createRelationalEntityBackend(config); + this.garbageCollector = new RelationalGarbageCollector(backend, config); + this.garbageCollector.start(); } private static RelationalBackend createRelationalEntityBackend(Config config) { @@ -109,6 +112,7 @@ public R executeInTransaction(Executable executab @Override public void close() throws IOException { + garbageCollector.close(); backend.close(); } } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java new file mode 100644 index 00000000000..0e0920d2f20 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java @@ -0,0 +1,118 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.relational; + +import com.datastrato.gravitino.Config; +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.*; + +import static com.datastrato.gravitino.Configs.STORE_DELETE_AFTER_TIME; + +public final class RelationalGarbageCollector implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(RelationalGarbageCollector.class); + private final RelationalBackend backend; + + private final Config config; + + @VisibleForTesting + final ScheduledExecutorService garbageCollectorPool = + new ScheduledThreadPoolExecutor( + 2, + r -> { + Thread t = new Thread(r, "RelationalBackend-Garbage-Collector"); + t.setDaemon(true); + return t; + }, + new ThreadPoolExecutor.AbortPolicy()); + + public RelationalGarbageCollector(RelationalBackend backend, Config config) { + this.backend = backend; + this.config = config; + } + + public void start() { + long dateTimeLineMinute = config.get(STORE_DELETE_AFTER_TIME) / 1000 / 60; + + // We will collect garbage every 10 minutes at least. If the dateTimeLineMinute is larger than + // 100 minutes, we would collect garbage every dateTimeLineMinute/10 minutes. + long frequency = Math.max(dateTimeLineMinute / 10, 10); + garbageCollectorPool.scheduleAtFixedRate(this::collectAndClean, 5, frequency, TimeUnit.MINUTES); + } + + private void collectAndClean() { + long threadId = Thread.currentThread().getId(); + LOG.info("Thread {} start to collect garbage...", threadId); + try { + LOG.info("Start to collect and delete old version data by thread {}", threadId); + collectAndRemoveOldVersionData(); + } catch (Exception e) { + LOG.error("Thread {} failed to collect and clean garbage.", threadId, e); + } finally { + LOG.info("Thread {} finish to collect garbage.", threadId); + } + } + + private void collectAndRemoveOldVersionData() throws SQLException { + long deleteTimeLine = System.currentTimeMillis() - STORE_DELETE_AFTER_TIME; + + List tables = Arrays.asList("metalake", "catalog", "schema", "table", "fileset"); + + for (String table : tables) { + String sql = "SELECT * FROM " + table + " WHERE deleted_at != 0 AND deleted_at < ?"; + try (PreparedStatement stmt = connection.prepareStatement(sql)) { + stmt.setLong(1, deleteTimeLine); + try (ResultSet rs = stmt.executeQuery()) { + while (rs.next()) { + String deleteSql = "DELETE FROM " + table + " WHERE id = ?"; + try (PreparedStatement deleteStmt = connection.prepareStatement(deleteSql)) { + deleteStmt.setLong(1, rs.getLong("id")); + deleteStmt.executeUpdate(); + } + + String checkSql = "SELECT * FROM " + table + " WHERE id > ? LIMIT 1"; + try (PreparedStatement checkStmt = connection.prepareStatement(checkSql)) { + checkStmt.setLong(1, rs.getLong("id")); + try (ResultSet checkRs = checkStmt.executeQuery()) { + if (checkRs.next()) { + String deleteOldVersionSql = "DELETE FROM " + table + " WHERE id = ?"; + try (PreparedStatement deleteOldVersionStmt = connection.prepareStatement(deleteOldVersionSql)) { + deleteOldVersionStmt.setLong(1, rs.getLong("id")); + deleteOldVersionStmt.executeUpdate(); + } + } + } + } + } + } + } + } + } + + + @Override + public void close() throws IOException { + this.garbageCollectorPool.shutdown(); + try { + if (!this.garbageCollectorPool.awaitTermination(5, TimeUnit.SECONDS)) { + this.garbageCollectorPool.shutdownNow(); + } + } catch (InterruptedException ex) { + this.garbageCollectorPool.shutdownNow(); + Thread.currentThread().interrupt(); + } + } +} \ No newline at end of file diff --git a/core/src/test/java/com/datastrato/gravitino/storage/kv/TestKvGarbageCollector.java b/core/src/test/java/com/datastrato/gravitino/storage/kv/TestKvGarbageCollector.java index 2ac7468906a..b9c598b46e0 100644 --- a/core/src/test/java/com/datastrato/gravitino/storage/kv/TestKvGarbageCollector.java +++ b/core/src/test/java/com/datastrato/gravitino/storage/kv/TestKvGarbageCollector.java @@ -35,8 +35,8 @@ import com.datastrato.gravitino.meta.SchemaEntity; import com.datastrato.gravitino.meta.TableEntity; import com.datastrato.gravitino.meta.UserEntity; +import com.datastrato.gravitino.storage.LogHelper; import com.datastrato.gravitino.storage.TransactionIdGenerator; -import com.datastrato.gravitino.storage.kv.KvGarbageCollector.LogHelper; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; From 81786c2c0903a2f22d74acd49f7a91ef87d8381c Mon Sep 17 00:00:00 2001 From: YxAc Date: Tue, 26 Mar 2024 22:29:36 +0800 Subject: [PATCH 02/14] define AllTables --- .../gravitino/storage/LogHelper.java | 23 +++++------ .../storage/relational/AllTables.java | 38 +++++++++++++++++++ .../RelationalGarbageCollector.java | 36 ++++++++++-------- .../relational/mapper/CatalogMetaMapper.java | 3 +- .../relational/mapper/FilesetMetaMapper.java | 5 ++- .../mapper/FilesetVersionMapper.java | 3 +- .../relational/mapper/MetalakeMetaMapper.java | 3 +- .../relational/mapper/SchemaMetaMapper.java | 3 +- .../relational/mapper/TableMetaMapper.java | 3 +- 9 files changed, 81 insertions(+), 36 deletions(-) create mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relational/AllTables.java diff --git a/core/src/main/java/com/datastrato/gravitino/storage/LogHelper.java b/core/src/main/java/com/datastrato/gravitino/storage/LogHelper.java index 186e07e4949..bfd1c8ca316 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/LogHelper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/LogHelper.java @@ -10,22 +10,19 @@ import com.google.common.annotations.VisibleForTesting; /** - * LogHelper is a utility class that provides logging-related functionality for both KV and Relational or other backends in the storage. - * It holds information about an entity such as its identifier, type, and creation time. - * The class also provides a NONE instance which is an implementation of the Null Object pattern. - * This instance can be used to avoid null checks and NullPointerExceptions. + * LogHelper is a utility class that provides logging-related functionality for both KV and + * Relational or other backends in the storage. It holds information about an entity such as its + * identifier, type, and creation time. The class also provides a NONE instance which is an + * implementation of the Null Object pattern. This instance can be used to avoid null checks and + * NullPointerExceptions. */ public class LogHelper { - @VisibleForTesting - public final NameIdentifier identifier; - @VisibleForTesting - public final Entity.EntityType type; - @VisibleForTesting - public final long createTimeInMs; + @VisibleForTesting public final NameIdentifier identifier; + @VisibleForTesting public final Entity.EntityType type; + @VisibleForTesting public final long createTimeInMs; // Formatted createTime - @VisibleForTesting - public final String createTimeAsString; + @VisibleForTesting public final String createTimeAsString; public static final LogHelper NONE = new LogHelper(null, null, 0L, null); @@ -39,4 +36,4 @@ public LogHelper( this.createTimeInMs = createTimeInMs; this.createTimeAsString = createTimeAsString; } -} \ No newline at end of file +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/AllTables.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/AllTables.java new file mode 100644 index 00000000000..f8c05da1d38 --- /dev/null +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/AllTables.java @@ -0,0 +1,38 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.relational; + +public class AllTables { + public static final String METALAKE_TABLE_NAME = "metalake_meta"; + public static final String CATALOG_TABLE_NAME = "catalog_meta"; + public static final String SCHEMA_TABLE_NAME = "schema_meta"; + public static final String TABLE_TABLE_NAME = "table_meta"; + public static final String FILESET_TABLE_NAME = "fileset_meta"; + public static final String FILESET_VERSION_TABLE_NAME = "fileset_version_info"; + + private AllTables() { + // Prevent instantiation. + } + + public static enum TABLE_NAMES { + METALAKE_TABLE_NAME(AllTables.METALAKE_TABLE_NAME), + CATALOG_TABLE_NAME(AllTables.CATALOG_TABLE_NAME), + SCHEMA_TABLE_NAME(AllTables.SCHEMA_TABLE_NAME), + TABLE_TABLE_NAME(AllTables.TABLE_TABLE_NAME), + FILESET_TABLE_NAME(AllTables.FILESET_TABLE_NAME), + FILESET_VERSION_TABLE_NAME(AllTables.FILESET_VERSION_TABLE_NAME); + + private final String tableName; + + TABLE_NAMES(String tableName) { + this.tableName = tableName; + } + + public String getTableName() { + return this.tableName; + } + } +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java index 0e0920d2f20..b455f5d8872 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java @@ -5,21 +5,23 @@ package com.datastrato.gravitino.storage.relational; +import static com.datastrato.gravitino.Configs.KV_DELETE_AFTER_TIME; + import com.datastrato.gravitino.Config; import com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.Closeable; import java.io.IOException; +import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.*; - -import static com.datastrato.gravitino.Configs.STORE_DELETE_AFTER_TIME; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.commons.dbcp2.DelegatingConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class RelationalGarbageCollector implements Closeable { @@ -45,7 +47,7 @@ public RelationalGarbageCollector(RelationalBackend backend, Config config) { } public void start() { - long dateTimeLineMinute = config.get(STORE_DELETE_AFTER_TIME) / 1000 / 60; + long dateTimeLineMinute = config.get(KV_DELETE_AFTER_TIME) / 1000 / 60; // We will collect garbage every 10 minutes at least. If the dateTimeLineMinute is larger than // 100 minutes, we would collect garbage every dateTimeLineMinute/10 minutes. @@ -67,12 +69,14 @@ private void collectAndClean() { } private void collectAndRemoveOldVersionData() throws SQLException { - long deleteTimeLine = System.currentTimeMillis() - STORE_DELETE_AFTER_TIME; + long deleteTimeLine = System.currentTimeMillis() - config.get(KV_DELETE_AFTER_TIME); - List tables = Arrays.asList("metalake", "catalog", "schema", "table", "fileset"); + for (AllTables.TABLE_NAMES table : AllTables.TABLE_NAMES.values()) { + System.out.println(table.getTableName()); - for (String table : tables) { + /* String sql = "SELECT * FROM " + table + " WHERE deleted_at != 0 AND deleted_at < ?"; + DelegatingConnection connection = null; try (PreparedStatement stmt = connection.prepareStatement(sql)) { stmt.setLong(1, deleteTimeLine); try (ResultSet rs = stmt.executeQuery()) { @@ -89,7 +93,8 @@ private void collectAndRemoveOldVersionData() throws SQLException { try (ResultSet checkRs = checkStmt.executeQuery()) { if (checkRs.next()) { String deleteOldVersionSql = "DELETE FROM " + table + " WHERE id = ?"; - try (PreparedStatement deleteOldVersionStmt = connection.prepareStatement(deleteOldVersionSql)) { + try (PreparedStatement deleteOldVersionStmt = + connection.prepareStatement(deleteOldVersionSql)) { deleteOldVersionStmt.setLong(1, rs.getLong("id")); deleteOldVersionStmt.executeUpdate(); } @@ -98,11 +103,10 @@ private void collectAndRemoveOldVersionData() throws SQLException { } } } - } + } */ } } - @Override public void close() throws IOException { this.garbageCollectorPool.shutdown(); @@ -115,4 +119,4 @@ public void close() throws IOException { Thread.currentThread().interrupt(); } } -} \ No newline at end of file +} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java index 038b4256f0b..f3f60376927 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java @@ -5,6 +5,7 @@ package com.datastrato.gravitino.storage.relational.mapper; +import com.datastrato.gravitino.storage.relational.AllTables; import com.datastrato.gravitino.storage.relational.po.CatalogPO; import java.util.List; import org.apache.ibatis.annotations.Insert; @@ -21,7 +22,7 @@ * href="https://mybatis.org/mybatis-3/getting-started.html"> */ public interface CatalogMetaMapper { - String TABLE_NAME = "catalog_meta"; + String TABLE_NAME = AllTables.CATALOG_TABLE_NAME; @Select( "SELECT catalog_id as catalogId, catalog_name as catalogName," diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetMetaMapper.java index 885a8d89908..5532f388906 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetMetaMapper.java @@ -5,6 +5,7 @@ package com.datastrato.gravitino.storage.relational.mapper; +import com.datastrato.gravitino.storage.relational.AllTables; import com.datastrato.gravitino.storage.relational.po.FilesetPO; import java.util.List; import org.apache.ibatis.annotations.Insert; @@ -23,9 +24,9 @@ * href="https://mybatis.org/mybatis-3/getting-started.html"> */ public interface FilesetMetaMapper { - String META_TABLE_NAME = "fileset_meta"; + String META_TABLE_NAME = AllTables.FILESET_TABLE_NAME; - String VERSION_TABLE_NAME = "fileset_version_info"; + String VERSION_TABLE_NAME = AllTables.FILESET_VERSION_TABLE_NAME; @Select( "SELECT fm.fileset_id, fm.fileset_name, fm.metalake_id, fm.catalog_id, fm.schema_id," diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java index af08416ea09..e28ffc952c8 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java @@ -5,6 +5,7 @@ package com.datastrato.gravitino.storage.relational.mapper; +import com.datastrato.gravitino.storage.relational.AllTables; import com.datastrato.gravitino.storage.relational.po.FilesetVersionPO; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Param; @@ -19,7 +20,7 @@ * href="https://mybatis.org/mybatis-3/getting-started.html"> */ public interface FilesetVersionMapper { - String VERSION_TABLE_NAME = "fileset_version_info"; + String VERSION_TABLE_NAME = AllTables.FILESET_VERSION_TABLE_NAME; @Insert( "INSERT INTO " diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java index d1cb2d1ea73..a5d4f7f4dc1 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java @@ -5,6 +5,7 @@ package com.datastrato.gravitino.storage.relational.mapper; +import com.datastrato.gravitino.storage.relational.AllTables; import com.datastrato.gravitino.storage.relational.po.MetalakePO; import java.util.List; import org.apache.ibatis.annotations.Insert; @@ -21,7 +22,7 @@ * href="https://mybatis.org/mybatis-3/getting-started.html"> */ public interface MetalakeMetaMapper { - String TABLE_NAME = "metalake_meta"; + String TABLE_NAME = AllTables.METALAKE_TABLE_NAME; @Select( "SELECT metalake_id as metalakeId, metalake_name as metalakeName," diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java index 7f36478cf0a..9883f8db238 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java @@ -5,6 +5,7 @@ package com.datastrato.gravitino.storage.relational.mapper; +import com.datastrato.gravitino.storage.relational.AllTables; import com.datastrato.gravitino.storage.relational.po.SchemaPO; import java.util.List; import org.apache.ibatis.annotations.Insert; @@ -21,7 +22,7 @@ * href="https://mybatis.org/mybatis-3/getting-started.html"> */ public interface SchemaMetaMapper { - String TABLE_NAME = "schema_meta"; + String TABLE_NAME = AllTables.SCHEMA_TABLE_NAME; @Select( "SELECT schema_id as schemaId, schema_name as schemaName," diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java index 331847f8cdc..a900498fda4 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java @@ -5,6 +5,7 @@ package com.datastrato.gravitino.storage.relational.mapper; +import com.datastrato.gravitino.storage.relational.AllTables; import com.datastrato.gravitino.storage.relational.po.TablePO; import java.util.List; import org.apache.ibatis.annotations.Insert; @@ -21,7 +22,7 @@ * href="https://mybatis.org/mybatis-3/getting-started.html"> */ public interface TableMetaMapper { - String TABLE_NAME = "table_meta"; + String TABLE_NAME = AllTables.TABLE_TABLE_NAME; @Select( "SELECT table_id as tableId, table_name as tableName," From 8582fd4a71a6353bfb2fa83dad44ee7690d0bcac Mon Sep 17 00:00:00 2001 From: YxAc Date: Fri, 29 Mar 2024 09:54:02 +0800 Subject: [PATCH 03/14] update --- .../RelationalGarbageCollector.java | 80 ++++++++++--------- .../relational/mapper/CatalogMetaMapper.java | 8 ++ .../relational/mapper/FilesetMetaMapper.java | 8 ++ .../mapper/FilesetVersionMapper.java | 8 ++ .../relational/mapper/MetalakeMetaMapper.java | 8 ++ .../relational/mapper/SchemaMetaMapper.java | 8 ++ .../relational/mapper/TableMetaMapper.java | 8 ++ .../service/CatalogMetaService.java | 8 ++ .../service/FilesetMetaService.java | 13 +++ .../service/MetalakeMetaService.java | 8 ++ .../relational/service/SchemaMetaService.java | 8 ++ .../relational/service/TableMetaService.java | 8 ++ 12 files changed, 134 insertions(+), 39 deletions(-) diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java index b455f5d8872..1c8679d15d1 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java @@ -8,18 +8,19 @@ import static com.datastrato.gravitino.Configs.KV_DELETE_AFTER_TIME; import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.storage.relational.service.CatalogMetaService; +import com.datastrato.gravitino.storage.relational.service.FilesetMetaService; +import com.datastrato.gravitino.storage.relational.service.MetalakeMetaService; +import com.datastrato.gravitino.storage.relational.service.SchemaMetaService; +import com.datastrato.gravitino.storage.relational.service.TableMetaService; import com.google.common.annotations.VisibleForTesting; import java.io.Closeable; import java.io.IOException; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; import java.sql.SQLException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.commons.dbcp2.DelegatingConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,6 +60,8 @@ private void collectAndClean() { long threadId = Thread.currentThread().getId(); LOG.info("Thread {} start to collect garbage...", threadId); try { + LOG.info("Start to collect and delete legacy data by thread {}", threadId); + collectAndRemoveLegacyData(); LOG.info("Start to collect and delete old version data by thread {}", threadId); collectAndRemoveOldVersionData(); } catch (Exception e) { @@ -68,43 +71,42 @@ private void collectAndClean() { } } - private void collectAndRemoveOldVersionData() throws SQLException { + private void collectAndRemoveLegacyData() throws SQLException { + long legacyTimeLine = System.currentTimeMillis() - config.get(KV_DELETE_AFTER_TIME); + // TODO: put limit to configuration + int limit = 20; + + for (AllTables.TABLE_NAMES tableName : AllTables.TABLE_NAMES.values()) { + switch (tableName) { + case METALAKE_TABLE_NAME: + MetalakeMetaService.getInstance() + .deleteMetalakeMetasByLegacyTimeLine(legacyTimeLine, limit); + case CATALOG_TABLE_NAME: + CatalogMetaService.getInstance() + .deleteCatalogMetasByLegacyTimeLine(legacyTimeLine, limit); + case SCHEMA_TABLE_NAME: + SchemaMetaService.getInstance().deleteSchemaMetasByLegacyTimeLine(legacyTimeLine, limit); + case TABLE_TABLE_NAME: + TableMetaService.getInstance().deleteTableMetasByLegacyTimeLine(legacyTimeLine, limit); + case FILESET_TABLE_NAME: + FilesetMetaService.getInstance() + .deleteFilesetAndVersionMetasByLegacyTimeLine(legacyTimeLine, limit); + case FILESET_VERSION_TABLE_NAME: + FilesetMetaService.getInstance() + .deleteFilesetAndVersionMetasByLegacyTimeLine(legacyTimeLine, limit); + default: + throw new IllegalArgumentException("Unsupported table name: " + tableName); + } + } + } + + private void collectAndRemoveOldVersionData() { long deleteTimeLine = System.currentTimeMillis() - config.get(KV_DELETE_AFTER_TIME); - for (AllTables.TABLE_NAMES table : AllTables.TABLE_NAMES.values()) { - System.out.println(table.getTableName()); - - /* - String sql = "SELECT * FROM " + table + " WHERE deleted_at != 0 AND deleted_at < ?"; - DelegatingConnection connection = null; - try (PreparedStatement stmt = connection.prepareStatement(sql)) { - stmt.setLong(1, deleteTimeLine); - try (ResultSet rs = stmt.executeQuery()) { - while (rs.next()) { - String deleteSql = "DELETE FROM " + table + " WHERE id = ?"; - try (PreparedStatement deleteStmt = connection.prepareStatement(deleteSql)) { - deleteStmt.setLong(1, rs.getLong("id")); - deleteStmt.executeUpdate(); - } - - String checkSql = "SELECT * FROM " + table + " WHERE id > ? LIMIT 1"; - try (PreparedStatement checkStmt = connection.prepareStatement(checkSql)) { - checkStmt.setLong(1, rs.getLong("id")); - try (ResultSet checkRs = checkStmt.executeQuery()) { - if (checkRs.next()) { - String deleteOldVersionSql = "DELETE FROM " + table + " WHERE id = ?"; - try (PreparedStatement deleteOldVersionStmt = - connection.prepareStatement(deleteOldVersionSql)) { - deleteOldVersionStmt.setLong(1, rs.getLong("id")); - deleteOldVersionStmt.executeUpdate(); - } - } - } - } - } - } - } */ - } + int version_retention_count = 1; + + + for (AllTables.TABLE_NAMES table : AllTables.TABLE_NAMES.values()) {} } @Override diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java index f3f60376927..1a61717edbd 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java @@ -8,6 +8,7 @@ import com.datastrato.gravitino.storage.relational.AllTables; import com.datastrato.gravitino.storage.relational.po.CatalogPO; import java.util.List; +import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; @@ -148,4 +149,11 @@ Integer updateCatalogMeta( + " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0" + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") Integer softDeleteCatalogMetasByMetalakeId(@Param("metalakeId") Long metalakeId); + + @Delete( + "DELETE FROM " + + TABLE_NAME + + " WHERE deleted_at != 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") + Integer deleteCatalogMetasByLegacyTimeLine( + @Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetMetaMapper.java index 5532f388906..eed1aea24e3 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetMetaMapper.java @@ -8,6 +8,7 @@ import com.datastrato.gravitino.storage.relational.AllTables; import com.datastrato.gravitino.storage.relational.po.FilesetPO; import java.util.List; +import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Result; @@ -215,4 +216,11 @@ Integer updateFilesetMeta( + " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0" + " WHERE fileset_id = #{filesetId} AND deleted_at = 0") Integer softDeleteFilesetMetasByFilesetId(@Param("filesetId") Long filesetId); + + @Delete( + "DELETE FROM " + + META_TABLE_NAME + + " WHERE deleted_at != 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") + Integer deleteFilesetMetasByLegacyTimeLine( + @Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java index e28ffc952c8..59db50bd341 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java @@ -7,6 +7,7 @@ import com.datastrato.gravitino.storage.relational.AllTables; import com.datastrato.gravitino.storage.relational.po.FilesetVersionPO; +import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Update; @@ -98,4 +99,11 @@ void insertFilesetVersionOnDuplicateKeyUpdate( + " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0" + " WHERE fileset_id = #{filesetId} AND deleted_at = 0") Integer softDeleteFilesetVersionsByFilesetId(@Param("filesetId") Long filesetId); + + @Delete( + "DELETE FROM " + + VERSION_TABLE_NAME + + " WHERE deleted_at != 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") + Integer deleteFilesetVersionsByLegacyTimeLine( + @Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java index a5d4f7f4dc1..1ca6322a4ce 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java @@ -8,6 +8,7 @@ import com.datastrato.gravitino.storage.relational.AllTables; import com.datastrato.gravitino.storage.relational.po.MetalakePO; import java.util.List; +import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; @@ -126,4 +127,11 @@ Integer updateMetalakeMeta( + " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0" + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0") Integer softDeleteMetalakeMetaByMetalakeId(@Param("metalakeId") Long metalakeId); + + @Delete( + "DELETE FROM " + + TABLE_NAME + + " WHERE deleted_at != 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") + Integer deleteMetalakeMetasByLegacyTimeLine( + @Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java index 9883f8db238..3610dc2a770 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java @@ -8,6 +8,7 @@ import com.datastrato.gravitino.storage.relational.AllTables; import com.datastrato.gravitino.storage.relational.po.SchemaPO; import java.util.List; +import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; @@ -150,4 +151,11 @@ Integer updateSchemaMeta( + " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0" + " WHERE catalog_id = #{catalogId} AND deleted_at = 0") Integer softDeleteSchemaMetasByCatalogId(@Param("catalogId") Long catalogId); + + @Delete( + "DELETE FROM " + + TABLE_NAME + + " WHERE deleted_at != 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") + Integer deleteSchemaMetasByLegacyTimeLine( + @Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java index a900498fda4..0af901c4518 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java @@ -8,6 +8,7 @@ import com.datastrato.gravitino.storage.relational.AllTables; import com.datastrato.gravitino.storage.relational.po.TablePO; import java.util.List; +import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; @@ -152,4 +153,11 @@ Integer updateTableMeta( + " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0" + " WHERE schema_id = #{schemaId} AND deleted_at = 0") Integer softDeleteTableMetasBySchemaId(@Param("schemaId") Long schemaId); + + @Delete( + "DELETE FROM " + + TABLE_NAME + + " WHERE deleted_at != 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") + Integer deleteTableMetasByLegacyTimeLine( + @Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java index a075dbf8d39..dca26b21c44 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java @@ -209,4 +209,12 @@ public boolean deleteCatalog(NameIdentifier identifier, boolean cascade) { return true; } + + public void deleteCatalogMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { + SessionUtils.doWithCommit( + CatalogMetaMapper.class, + mapper -> { + mapper.deleteCatalogMetasByLegacyTimeLine(legacyTimeLine, limit); + }); + } } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java index eb8b1924ac2..7ad5793a553 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java @@ -213,6 +213,19 @@ public boolean deleteFileset(NameIdentifier identifier) { return true; } + public void deleteFilesetAndVersionMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { + SessionUtils.doWithCommit( + FilesetMetaMapper.class, + mapper -> { + mapper.deleteFilesetMetasByLegacyTimeLine(legacyTimeLine, limit); + }); + SessionUtils.doWithCommit( + FilesetVersionMapper.class, + mapper -> { + mapper.deleteFilesetVersionsByLegacyTimeLine(legacyTimeLine, limit); + }); + } + private void fillFilesetPOBuilderParentEntityId(FilesetPO.Builder builder, Namespace namespace) { Namespace.checkFileset(namespace); Long parentEntityId = null; diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java index 7c76ab4ebb5..eb35ec49e00 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java @@ -186,4 +186,12 @@ public boolean deleteMetalake(NameIdentifier ident, boolean cascade) { } return true; } + + public void deleteMetalakeMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { + SessionUtils.doWithCommit( + MetalakeMetaMapper.class, + mapper -> { + mapper.deleteMetalakeMetasByLegacyTimeLine(legacyTimeLine, limit); + }); + } } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java index c1b8ba490d2..8c852bb978d 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java @@ -215,6 +215,14 @@ public boolean deleteSchema(NameIdentifier identifier, boolean cascade) { return true; } + public void deleteSchemaMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { + SessionUtils.doWithCommit( + SchemaMetaMapper.class, + mapper -> { + mapper.deleteSchemaMetasByLegacyTimeLine(legacyTimeLine, limit); + }); + } + private void fillSchemaPOBuilderParentEntityId(SchemaPO.Builder builder, Namespace namespace) { Namespace.checkSchema(namespace); Long parentEntityId = null; diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TableMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TableMetaService.java index fc6a03db757..5279d7abacc 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TableMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TableMetaService.java @@ -163,6 +163,14 @@ public boolean deleteTable(NameIdentifier identifier) { return true; } + public void deleteTableMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { + SessionUtils.doWithCommit( + TableMetaMapper.class, + mapper -> { + mapper.deleteTableMetasByLegacyTimeLine(legacyTimeLine, limit); + }); + } + private void fillTablePOBuilderParentEntityId(TablePO.Builder builder, Namespace namespace) { Namespace.checkTable(namespace); Long parentEntityId = null; From 9e60f961294e436052be51e24d42b77b2f21f071 Mon Sep 17 00:00:00 2001 From: YxAc Date: Tue, 16 Apr 2024 14:28:36 +0800 Subject: [PATCH 04/14] update --- .../com/datastrato/gravitino/Configs.java | 1 + .../gravitino/storage/LogHelper.java | 39 ----------- .../storage/kv/KvGarbageCollector.java | 23 ++++++- .../RelationalGarbageCollector.java | 68 ++++++++++++++----- .../mapper/FilesetVersionMapper.java | 23 +++++++ .../service/FilesetMetaService.java | 15 ++++ .../storage/kv/TestKvGarbageCollector.java | 2 +- 7 files changed, 113 insertions(+), 58 deletions(-) delete mode 100644 core/src/main/java/com/datastrato/gravitino/storage/LogHelper.java diff --git a/core/src/main/java/com/datastrato/gravitino/Configs.java b/core/src/main/java/com/datastrato/gravitino/Configs.java index 4d361f43436..f92faee88da 100644 --- a/core/src/main/java/com/datastrato/gravitino/Configs.java +++ b/core/src/main/java/com/datastrato/gravitino/Configs.java @@ -57,6 +57,7 @@ public interface Configs { String DEFAULT_KV_ROCKSDB_BACKEND_PATH = String.join(File.separator, System.getenv("GRAVITINO_HOME"), "data", "rocksdb"); + int GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT = 100; long MAX_NODE_IN_MEMORY = 100000L; long MIN_NODE_IN_MEMORY = 1000L; diff --git a/core/src/main/java/com/datastrato/gravitino/storage/LogHelper.java b/core/src/main/java/com/datastrato/gravitino/storage/LogHelper.java deleted file mode 100644 index bfd1c8ca316..00000000000 --- a/core/src/main/java/com/datastrato/gravitino/storage/LogHelper.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2024 Datastrato Pvt Ltd. - * This software is licensed under the Apache License version 2. - */ - -package com.datastrato.gravitino.storage; - -import com.datastrato.gravitino.Entity; -import com.datastrato.gravitino.NameIdentifier; -import com.google.common.annotations.VisibleForTesting; - -/** - * LogHelper is a utility class that provides logging-related functionality for both KV and - * Relational or other backends in the storage. It holds information about an entity such as its - * identifier, type, and creation time. The class also provides a NONE instance which is an - * implementation of the Null Object pattern. This instance can be used to avoid null checks and - * NullPointerExceptions. - */ -public class LogHelper { - - @VisibleForTesting public final NameIdentifier identifier; - @VisibleForTesting public final Entity.EntityType type; - @VisibleForTesting public final long createTimeInMs; - // Formatted createTime - @VisibleForTesting public final String createTimeAsString; - - public static final LogHelper NONE = new LogHelper(null, null, 0L, null); - - public LogHelper( - NameIdentifier identifier, - Entity.EntityType type, - long createTimeInMs, - String createTimeAsString) { - this.identifier = identifier; - this.type = type; - this.createTimeInMs = createTimeInMs; - this.createTimeAsString = createTimeAsString; - } -} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/kv/KvGarbageCollector.java b/core/src/main/java/com/datastrato/gravitino/storage/kv/KvGarbageCollector.java index 6d22d0d267f..309323f490c 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/kv/KvGarbageCollector.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/kv/KvGarbageCollector.java @@ -17,7 +17,6 @@ import com.datastrato.gravitino.Entity.EntityType; import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.storage.EntityKeyEncoder; -import com.datastrato.gravitino.storage.LogHelper; import com.datastrato.gravitino.utils.Bytes; import com.google.common.annotations.VisibleForTesting; import java.io.Closeable; @@ -324,6 +323,28 @@ private void removeAllVersionsOfKey(byte[] rawKey, byte[] key, boolean includeSt } } + static class LogHelper { + + @VisibleForTesting final NameIdentifier identifier; + @VisibleForTesting final EntityType type; + @VisibleForTesting final long createTimeInMs; + // Formatted createTime + @VisibleForTesting final String createTimeAsString; + + public static final LogHelper NONE = new LogHelper(null, null, 0L, null); + + public LogHelper( + NameIdentifier identifier, + EntityType type, + long createTimeInMs, + String createTimeAsString) { + this.identifier = identifier; + this.type = type; + this.createTimeInMs = createTimeInMs; + this.createTimeAsString = createTimeAsString; + } + } + @VisibleForTesting LogHelper decodeKey(byte[] key, byte[] timestampArray) { if (entityKeyEncoder == null) { diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java index 1c8679d15d1..d8b1197f030 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java @@ -5,9 +5,12 @@ package com.datastrato.gravitino.storage.relational; -import static com.datastrato.gravitino.Configs.KV_DELETE_AFTER_TIME; +import static com.datastrato.gravitino.Configs.GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT; +import static com.datastrato.gravitino.Configs.STORE_DELETE_AFTER_TIME; +import static com.datastrato.gravitino.Configs.VERSION_RETENTION_COUNT; import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.storage.relational.po.FilesetVersionPO; import com.datastrato.gravitino.storage.relational.service.CatalogMetaService; import com.datastrato.gravitino.storage.relational.service.FilesetMetaService; import com.datastrato.gravitino.storage.relational.service.MetalakeMetaService; @@ -17,6 +20,7 @@ import java.io.Closeable; import java.io.IOException; import java.sql.SQLException; +import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; @@ -48,7 +52,7 @@ public RelationalGarbageCollector(RelationalBackend backend, Config config) { } public void start() { - long dateTimeLineMinute = config.get(KV_DELETE_AFTER_TIME) / 1000 / 60; + long dateTimeLineMinute = config.get(STORE_DELETE_AFTER_TIME) / 1000 / 60; // We will collect garbage every 10 minutes at least. If the dateTimeLineMinute is larger than // 100 minutes, we would collect garbage every dateTimeLineMinute/10 minutes. @@ -72,41 +76,71 @@ private void collectAndClean() { } private void collectAndRemoveLegacyData() throws SQLException { - long legacyTimeLine = System.currentTimeMillis() - config.get(KV_DELETE_AFTER_TIME); - // TODO: put limit to configuration - int limit = 20; + long legacyTimeLine = System.currentTimeMillis() - config.get(STORE_DELETE_AFTER_TIME); for (AllTables.TABLE_NAMES tableName : AllTables.TABLE_NAMES.values()) { switch (tableName) { case METALAKE_TABLE_NAME: MetalakeMetaService.getInstance() - .deleteMetalakeMetasByLegacyTimeLine(legacyTimeLine, limit); + .deleteMetalakeMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); case CATALOG_TABLE_NAME: CatalogMetaService.getInstance() - .deleteCatalogMetasByLegacyTimeLine(legacyTimeLine, limit); + .deleteCatalogMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); case SCHEMA_TABLE_NAME: - SchemaMetaService.getInstance().deleteSchemaMetasByLegacyTimeLine(legacyTimeLine, limit); + SchemaMetaService.getInstance() + .deleteSchemaMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); case TABLE_TABLE_NAME: - TableMetaService.getInstance().deleteTableMetasByLegacyTimeLine(legacyTimeLine, limit); + TableMetaService.getInstance() + .deleteTableMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); case FILESET_TABLE_NAME: FilesetMetaService.getInstance() - .deleteFilesetAndVersionMetasByLegacyTimeLine(legacyTimeLine, limit); + .deleteFilesetAndVersionMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); case FILESET_VERSION_TABLE_NAME: FilesetMetaService.getInstance() - .deleteFilesetAndVersionMetasByLegacyTimeLine(legacyTimeLine, limit); + .deleteFilesetAndVersionMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); default: - throw new IllegalArgumentException("Unsupported table name: " + tableName); + throw new IllegalArgumentException( + "Unsupported table name when collectAndRemoveLegacyData: " + tableName); } } } private void collectAndRemoveOldVersionData() { - long deleteTimeLine = System.currentTimeMillis() - config.get(KV_DELETE_AFTER_TIME); + long version_retention_count = config.get(VERSION_RETENTION_COUNT); - int version_retention_count = 1; - - - for (AllTables.TABLE_NAMES table : AllTables.TABLE_NAMES.values()) {} + for (AllTables.TABLE_NAMES tableName : AllTables.TABLE_NAMES.values()) { + switch (tableName) { + case FILESET_VERSION_TABLE_NAME: + List filesetCurVersions = + FilesetMetaService.getInstance() + .getFilesetVersionPOsByRetentionCount(version_retention_count); + + for (FilesetVersionPO filesetVersionPO : filesetCurVersions) { + long versionRetentionLine = + filesetVersionPO.getVersion().longValue() - version_retention_count; + FilesetMetaService.getInstance() + .deleteFilesetVersionsByRetentionLine( + filesetVersionPO.getFilesetId(), + versionRetentionLine, + GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + } + case METALAKE_TABLE_NAME: + case CATALOG_TABLE_NAME: + case SCHEMA_TABLE_NAME: + case TABLE_TABLE_NAME: + case FILESET_TABLE_NAME: + continue; + default: + throw new IllegalArgumentException( + "Unsupported table name when collectAndRemoveOldVersionData: " + tableName); + } + } } @Override diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java index 59db50bd341..13b9ba5e116 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java @@ -7,9 +7,11 @@ import com.datastrato.gravitino.storage.relational.AllTables; import com.datastrato.gravitino.storage.relational.po.FilesetVersionPO; +import java.util.List; import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; import org.apache.ibatis.annotations.Update; /** @@ -106,4 +108,25 @@ void insertFilesetVersionOnDuplicateKeyUpdate( + " WHERE deleted_at != 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") Integer deleteFilesetVersionsByLegacyTimeLine( @Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit); + + @Select( + "SELECT id, fileset_id as filesetId," + + " Max(version) as version, deleted_at as deletedAt," + + " metalake_id as metalakeId, catalog_id as catalogId, schema_id as schemaId," + + " storage_location as storageLocation" + + " FROM " + + VERSION_TABLE_NAME + + " WHERE version > #{versionRetentionCount} AND deleted_at = 0" + + " GROUP BY fileset_id") + List selectFilesetVersionsByRetentionCount( + @Param("versionRetentionCount") Long versionRetentionCount); + + @Delete( + "DELETE FROM " + + VERSION_TABLE_NAME + + " WHERE fileset_id = #{filesetId} AND version <= #{versionRetentionLine} AND deleted_at = 0 LIMIT #{limit}") + Integer deleteFilesetVersionsByRetentionLine( + @Param("filesetId") Long filesetId, + @Param("versionRetentionLine") Long versionRetentionLine, + @Param("limit") int limit); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java index 7ad5793a553..32f95afe085 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java @@ -13,6 +13,7 @@ import com.datastrato.gravitino.storage.relational.mapper.FilesetMetaMapper; import com.datastrato.gravitino.storage.relational.mapper.FilesetVersionMapper; import com.datastrato.gravitino.storage.relational.po.FilesetPO; +import com.datastrato.gravitino.storage.relational.po.FilesetVersionPO; import com.datastrato.gravitino.storage.relational.utils.ExceptionUtils; import com.datastrato.gravitino.storage.relational.utils.POConverters; import com.datastrato.gravitino.storage.relational.utils.SessionUtils; @@ -50,6 +51,12 @@ public FilesetPO getFilesetPOBySchemaIdAndName(Long schemaId, String filesetName return filesetPO; } + public List getFilesetVersionPOsByRetentionCount(Long versionRetentionCount) { + return SessionUtils.getWithoutCommit( + FilesetVersionMapper.class, + mapper -> mapper.selectFilesetVersionsByRetentionCount(versionRetentionCount)); + } + public Long getFilesetIdBySchemaIdAndName(Long schemaId, String filesetName) { Long filesetId = SessionUtils.getWithoutCommit( @@ -226,6 +233,14 @@ public void deleteFilesetAndVersionMetasByLegacyTimeLine(Long legacyTimeLine, in }); } + public Integer deleteFilesetVersionsByRetentionLine( + Long filesetId, Long versionRetentionLine, int limit) { + return SessionUtils.doWithCommitAndFetchResult( + FilesetVersionMapper.class, + mapper -> + mapper.deleteFilesetVersionsByRetentionLine(filesetId, versionRetentionLine, limit)); + } + private void fillFilesetPOBuilderParentEntityId(FilesetPO.Builder builder, Namespace namespace) { Namespace.checkFileset(namespace); Long parentEntityId = null; diff --git a/core/src/test/java/com/datastrato/gravitino/storage/kv/TestKvGarbageCollector.java b/core/src/test/java/com/datastrato/gravitino/storage/kv/TestKvGarbageCollector.java index b9c598b46e0..2ac7468906a 100644 --- a/core/src/test/java/com/datastrato/gravitino/storage/kv/TestKvGarbageCollector.java +++ b/core/src/test/java/com/datastrato/gravitino/storage/kv/TestKvGarbageCollector.java @@ -35,8 +35,8 @@ import com.datastrato.gravitino.meta.SchemaEntity; import com.datastrato.gravitino.meta.TableEntity; import com.datastrato.gravitino.meta.UserEntity; -import com.datastrato.gravitino.storage.LogHelper; import com.datastrato.gravitino.storage.TransactionIdGenerator; +import com.datastrato.gravitino.storage.kv.KvGarbageCollector.LogHelper; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; From 4b13ed049a68b3c67cf05b92e02254c54e13506c Mon Sep 17 00:00:00 2001 From: YxAc Date: Tue, 16 Apr 2024 17:58:01 +0800 Subject: [PATCH 05/14] print log when physically delete --- .../storage/relational/RelationalBackend.java | 2 +- .../RelationalGarbageCollector.java | 12 +++++++--- .../relational/po/FilesetVersionPO.java | 24 +++++++++++++++++++ 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java index ea9506ed147..c8e11f1c3d4 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java @@ -84,7 +84,7 @@ E get(NameIdentifier ident, Entity.EntityType throws IOException; /** - * Deletes the entity associated with the identifier and the entity type. + * Soft deletes the entity associated with the identifier and the entity type. * * @param ident The identifier of the entity. * @param entityType The type of the entity. diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java index d8b1197f030..ac2cf3ef3e7 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java @@ -77,6 +77,9 @@ private void collectAndClean() { private void collectAndRemoveLegacyData() throws SQLException { long legacyTimeLine = System.currentTimeMillis() - config.get(STORE_DELETE_AFTER_TIME); + LOG.info( + "Try to physically delete legacy data that has been marked deleted before {}", + legacyTimeLine); for (AllTables.TABLE_NAMES tableName : AllTables.TABLE_NAMES.values()) { switch (tableName) { @@ -101,9 +104,8 @@ private void collectAndRemoveLegacyData() throws SQLException { .deleteFilesetAndVersionMetasByLegacyTimeLine( legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); case FILESET_VERSION_TABLE_NAME: - FilesetMetaService.getInstance() - .deleteFilesetAndVersionMetasByLegacyTimeLine( - legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + // using the same method as FILESET_TABLE_NAME + continue; default: throw new IllegalArgumentException( "Unsupported table name when collectAndRemoveLegacyData: " + tableName); @@ -129,6 +131,10 @@ private void collectAndRemoveOldVersionData() { filesetVersionPO.getFilesetId(), versionRetentionLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + LOG.info( + "Physically delete FilesetVersion: {} which version is older than or equal to versionRetentionLine: {}.", + filesetVersionPO, + versionRetentionLine); } case METALAKE_TABLE_NAME: case CATALOG_TABLE_NAME: diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/po/FilesetVersionPO.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/po/FilesetVersionPO.java index ebfd7517ebe..e647e642b9d 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/po/FilesetVersionPO.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/po/FilesetVersionPO.java @@ -96,6 +96,30 @@ public int hashCode() { getDeletedAt()); } + @Override + public String toString() { + return new StringBuilder() + .append("FilesetVersionPO{id=") + .append(id) + .append(", metalakeId=") + .append(metalakeId) + .append(", catalogId=") + .append(catalogId) + .append(", schemaId=") + .append(schemaId) + .append(", filesetId=") + .append(filesetId) + .append(", version=") + .append(version) + .append(", storageLocation='") + .append(storageLocation) + .append('\'') + .append(", deletedAt=") + .append(deletedAt) + .append('}') + .toString(); + } + public static class Builder { private final FilesetVersionPO filesetVersionPO; From 587e78127ef22d9b31b5284af9e866287670db24 Mon Sep 17 00:00:00 2001 From: YxAc Date: Tue, 16 Apr 2024 23:05:04 +0800 Subject: [PATCH 06/14] update jdbc backend --- .../storage/relational/AllTables.java | 38 ------- .../storage/relational/JDBCBackend.java | 106 ++++++++++++++++++ .../storage/relational/RelationalBackend.java | 16 +++ .../RelationalGarbageCollector.java | 90 +-------------- .../relational/mapper/CatalogMetaMapper.java | 3 +- .../relational/mapper/FilesetMetaMapper.java | 5 +- .../mapper/FilesetVersionMapper.java | 3 +- .../relational/mapper/MetalakeMetaMapper.java | 3 +- .../relational/mapper/SchemaMetaMapper.java | 3 +- .../relational/mapper/TableMetaMapper.java | 3 +- .../TestRelationalGarbageCollector.java | 8 ++ 11 files changed, 142 insertions(+), 136 deletions(-) delete mode 100644 core/src/main/java/com/datastrato/gravitino/storage/relational/AllTables.java create mode 100644 core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalGarbageCollector.java diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/AllTables.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/AllTables.java deleted file mode 100644 index f8c05da1d38..00000000000 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/AllTables.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2024 Datastrato Pvt Ltd. - * This software is licensed under the Apache License version 2. - */ - -package com.datastrato.gravitino.storage.relational; - -public class AllTables { - public static final String METALAKE_TABLE_NAME = "metalake_meta"; - public static final String CATALOG_TABLE_NAME = "catalog_meta"; - public static final String SCHEMA_TABLE_NAME = "schema_meta"; - public static final String TABLE_TABLE_NAME = "table_meta"; - public static final String FILESET_TABLE_NAME = "fileset_meta"; - public static final String FILESET_VERSION_TABLE_NAME = "fileset_version_info"; - - private AllTables() { - // Prevent instantiation. - } - - public static enum TABLE_NAMES { - METALAKE_TABLE_NAME(AllTables.METALAKE_TABLE_NAME), - CATALOG_TABLE_NAME(AllTables.CATALOG_TABLE_NAME), - SCHEMA_TABLE_NAME(AllTables.SCHEMA_TABLE_NAME), - TABLE_TABLE_NAME(AllTables.TABLE_TABLE_NAME), - FILESET_TABLE_NAME(AllTables.FILESET_TABLE_NAME), - FILESET_VERSION_TABLE_NAME(AllTables.FILESET_VERSION_TABLE_NAME); - - private final String tableName; - - TABLE_NAMES(String tableName) { - this.tableName = tableName; - } - - public String getTableName() { - return this.tableName; - } - } -} diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java index 14a0df4fcac..6228e1b61eb 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java @@ -5,6 +5,8 @@ package com.datastrato.gravitino.storage.relational; +import static com.datastrato.gravitino.Configs.GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT; + import com.datastrato.gravitino.Config; import com.datastrato.gravitino.Configs; import com.datastrato.gravitino.Entity; @@ -22,6 +24,7 @@ import com.datastrato.gravitino.meta.TableEntity; import com.datastrato.gravitino.meta.TopicEntity; import com.datastrato.gravitino.storage.relational.converters.SQLExceptionConverterFactory; +import com.datastrato.gravitino.storage.relational.po.FilesetVersionPO; import com.datastrato.gravitino.storage.relational.service.CatalogMetaService; import com.datastrato.gravitino.storage.relational.service.FilesetMetaService; import com.datastrato.gravitino.storage.relational.service.MetalakeMetaService; @@ -32,6 +35,8 @@ import java.io.IOException; import java.util.List; import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@link JDBCBackend} is a jdbc implementation of {@link RelationalBackend} interface. You can use @@ -41,6 +46,8 @@ */ public class JDBCBackend implements RelationalBackend { + private static final Logger LOG = LoggerFactory.getLogger(JDBCBackend.class); + /** Initialize the jdbc backend instance. */ @Override public void initialize(Config config) { @@ -167,6 +174,105 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea } } + @Override + public void hardDeleteLegacyData(long legacyTimeLine) { + LOG.info( + "Try to physically delete legacy data that has been marked deleted before {}", + legacyTimeLine); + + for (Entity.EntityType entityType : Entity.EntityType.values()) { + switch (entityType) { + case METALAKE: + MetalakeMetaService.getInstance() + .deleteMetalakeMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + break; + case CATALOG: + CatalogMetaService.getInstance() + .deleteCatalogMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + break; + case SCHEMA: + SchemaMetaService.getInstance() + .deleteSchemaMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + break; + case TABLE: + TableMetaService.getInstance() + .deleteTableMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + break; + case FILESET: + FilesetMetaService.getInstance() + .deleteFilesetAndVersionMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + break; + + case COLUMN: + case TOPIC: + case USER: + case GROUP: + case AUDIT: + continue; + // TODO: Implement the delete logic for these entity types. + + default: + throw new IllegalArgumentException( + "Unsupported entity type when collectAndRemoveLegacyData: " + entityType); + } + } + } + + @Override + public void hardDeleteOldVersionData(long versionRetentionCount) { + for (Entity.EntityType entityType : Entity.EntityType.values()) { + switch (entityType) { + case METALAKE: + case CATALOG: + case SCHEMA: + case TABLE: + case COLUMN: + case TOPIC: + case USER: + case GROUP: + case AUDIT: + // These entity types have not implemented multi-versions, so we can skip. + continue; + + case FILESET: + // Get the current version of all filesets. + List filesetCurVersions = + FilesetMetaService.getInstance() + .getFilesetVersionPOsByRetentionCount(versionRetentionCount); + + // Delete old versions that are older than or equal to (currentVersion - + // versionRetentionCount). + for (FilesetVersionPO filesetVersionPO : filesetCurVersions) { + long versionRetentionLine = + filesetVersionPO.getVersion().longValue() - versionRetentionCount; + int deletedCount = + FilesetMetaService.getInstance() + .deleteFilesetVersionsByRetentionLine( + filesetVersionPO.getFilesetId(), + versionRetentionLine, + GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + + // Log the deletion by current fileset version. + LOG.info( + "Physically deleted count: {} which fileset version is older than or equal to versionRetentionLine: {} by current FilesetVersion: {}.", + deletedCount, + versionRetentionLine, + filesetVersionPO); + } + break; + + default: + throw new IllegalArgumentException( + "Unsupported entity type when collectAndRemoveOldVersionData: " + entityType); + } + } + } + @Override public void close() throws IOException { SqlSessionFactoryHelper.getInstance().close(); diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java index c8e11f1c3d4..67fd0d49876 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java @@ -92,4 +92,20 @@ E get(NameIdentifier ident, Entity.EntityType * @return True, if the entity was successfully deleted, else false. */ boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolean cascade); + + /** + * Permanent deletes the legacy data that has been marked as deleted before the given legacy + * timeline. + * + * @param legacyTimeLine The time before which the data has been marked as deleted. + */ + void hardDeleteLegacyData(long legacyTimeLine); + + /** + * Permanent deletes the old version data that is older than or equal to the given version + * retention count. + * + * @param versionRetentionCount The count of versions to retain. + */ + void hardDeleteOldVersionData(long versionRetentionCount); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java index ac2cf3ef3e7..25d66f19043 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java @@ -5,22 +5,13 @@ package com.datastrato.gravitino.storage.relational; -import static com.datastrato.gravitino.Configs.GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT; import static com.datastrato.gravitino.Configs.STORE_DELETE_AFTER_TIME; import static com.datastrato.gravitino.Configs.VERSION_RETENTION_COUNT; import com.datastrato.gravitino.Config; -import com.datastrato.gravitino.storage.relational.po.FilesetVersionPO; -import com.datastrato.gravitino.storage.relational.service.CatalogMetaService; -import com.datastrato.gravitino.storage.relational.service.FilesetMetaService; -import com.datastrato.gravitino.storage.relational.service.MetalakeMetaService; -import com.datastrato.gravitino.storage.relational.service.SchemaMetaService; -import com.datastrato.gravitino.storage.relational.service.TableMetaService; import com.google.common.annotations.VisibleForTesting; import java.io.Closeable; import java.io.IOException; -import java.sql.SQLException; -import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; @@ -63,11 +54,14 @@ public void start() { private void collectAndClean() { long threadId = Thread.currentThread().getId(); LOG.info("Thread {} start to collect garbage...", threadId); + try { LOG.info("Start to collect and delete legacy data by thread {}", threadId); - collectAndRemoveLegacyData(); + long legacyTimeLine = System.currentTimeMillis() - config.get(STORE_DELETE_AFTER_TIME); + backend.hardDeleteLegacyData(legacyTimeLine); + LOG.info("Start to collect and delete old version data by thread {}", threadId); - collectAndRemoveOldVersionData(); + backend.hardDeleteOldVersionData(config.get(VERSION_RETENTION_COUNT)); } catch (Exception e) { LOG.error("Thread {} failed to collect and clean garbage.", threadId, e); } finally { @@ -75,80 +69,6 @@ private void collectAndClean() { } } - private void collectAndRemoveLegacyData() throws SQLException { - long legacyTimeLine = System.currentTimeMillis() - config.get(STORE_DELETE_AFTER_TIME); - LOG.info( - "Try to physically delete legacy data that has been marked deleted before {}", - legacyTimeLine); - - for (AllTables.TABLE_NAMES tableName : AllTables.TABLE_NAMES.values()) { - switch (tableName) { - case METALAKE_TABLE_NAME: - MetalakeMetaService.getInstance() - .deleteMetalakeMetasByLegacyTimeLine( - legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); - case CATALOG_TABLE_NAME: - CatalogMetaService.getInstance() - .deleteCatalogMetasByLegacyTimeLine( - legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); - case SCHEMA_TABLE_NAME: - SchemaMetaService.getInstance() - .deleteSchemaMetasByLegacyTimeLine( - legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); - case TABLE_TABLE_NAME: - TableMetaService.getInstance() - .deleteTableMetasByLegacyTimeLine( - legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); - case FILESET_TABLE_NAME: - FilesetMetaService.getInstance() - .deleteFilesetAndVersionMetasByLegacyTimeLine( - legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); - case FILESET_VERSION_TABLE_NAME: - // using the same method as FILESET_TABLE_NAME - continue; - default: - throw new IllegalArgumentException( - "Unsupported table name when collectAndRemoveLegacyData: " + tableName); - } - } - } - - private void collectAndRemoveOldVersionData() { - long version_retention_count = config.get(VERSION_RETENTION_COUNT); - - for (AllTables.TABLE_NAMES tableName : AllTables.TABLE_NAMES.values()) { - switch (tableName) { - case FILESET_VERSION_TABLE_NAME: - List filesetCurVersions = - FilesetMetaService.getInstance() - .getFilesetVersionPOsByRetentionCount(version_retention_count); - - for (FilesetVersionPO filesetVersionPO : filesetCurVersions) { - long versionRetentionLine = - filesetVersionPO.getVersion().longValue() - version_retention_count; - FilesetMetaService.getInstance() - .deleteFilesetVersionsByRetentionLine( - filesetVersionPO.getFilesetId(), - versionRetentionLine, - GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); - LOG.info( - "Physically delete FilesetVersion: {} which version is older than or equal to versionRetentionLine: {}.", - filesetVersionPO, - versionRetentionLine); - } - case METALAKE_TABLE_NAME: - case CATALOG_TABLE_NAME: - case SCHEMA_TABLE_NAME: - case TABLE_TABLE_NAME: - case FILESET_TABLE_NAME: - continue; - default: - throw new IllegalArgumentException( - "Unsupported table name when collectAndRemoveOldVersionData: " + tableName); - } - } - } - @Override public void close() throws IOException { this.garbageCollectorPool.shutdown(); diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java index 1a61717edbd..0682c43db9b 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java @@ -5,7 +5,6 @@ package com.datastrato.gravitino.storage.relational.mapper; -import com.datastrato.gravitino.storage.relational.AllTables; import com.datastrato.gravitino.storage.relational.po.CatalogPO; import java.util.List; import org.apache.ibatis.annotations.Delete; @@ -23,7 +22,7 @@ * href="https://mybatis.org/mybatis-3/getting-started.html"> */ public interface CatalogMetaMapper { - String TABLE_NAME = AllTables.CATALOG_TABLE_NAME; + String TABLE_NAME = "catalog_meta"; @Select( "SELECT catalog_id as catalogId, catalog_name as catalogName," diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetMetaMapper.java index eed1aea24e3..956b566066a 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetMetaMapper.java @@ -5,7 +5,6 @@ package com.datastrato.gravitino.storage.relational.mapper; -import com.datastrato.gravitino.storage.relational.AllTables; import com.datastrato.gravitino.storage.relational.po.FilesetPO; import java.util.List; import org.apache.ibatis.annotations.Delete; @@ -25,9 +24,9 @@ * href="https://mybatis.org/mybatis-3/getting-started.html"> */ public interface FilesetMetaMapper { - String META_TABLE_NAME = AllTables.FILESET_TABLE_NAME; + String META_TABLE_NAME = "fileset_meta"; - String VERSION_TABLE_NAME = AllTables.FILESET_VERSION_TABLE_NAME; + String VERSION_TABLE_NAME = "fileset_version_info"; @Select( "SELECT fm.fileset_id, fm.fileset_name, fm.metalake_id, fm.catalog_id, fm.schema_id," diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java index 13b9ba5e116..8d2e1b41bd9 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java @@ -5,7 +5,6 @@ package com.datastrato.gravitino.storage.relational.mapper; -import com.datastrato.gravitino.storage.relational.AllTables; import com.datastrato.gravitino.storage.relational.po.FilesetVersionPO; import java.util.List; import org.apache.ibatis.annotations.Delete; @@ -23,7 +22,7 @@ * href="https://mybatis.org/mybatis-3/getting-started.html"> */ public interface FilesetVersionMapper { - String VERSION_TABLE_NAME = AllTables.FILESET_VERSION_TABLE_NAME; + String VERSION_TABLE_NAME = "fileset_version_info"; @Insert( "INSERT INTO " diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java index 1ca6322a4ce..7bad3386e0d 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java @@ -5,7 +5,6 @@ package com.datastrato.gravitino.storage.relational.mapper; -import com.datastrato.gravitino.storage.relational.AllTables; import com.datastrato.gravitino.storage.relational.po.MetalakePO; import java.util.List; import org.apache.ibatis.annotations.Delete; @@ -23,7 +22,7 @@ * href="https://mybatis.org/mybatis-3/getting-started.html"> */ public interface MetalakeMetaMapper { - String TABLE_NAME = AllTables.METALAKE_TABLE_NAME; + String TABLE_NAME = "metalake_meta"; @Select( "SELECT metalake_id as metalakeId, metalake_name as metalakeName," diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java index 3610dc2a770..597e2ce9ba2 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java @@ -5,7 +5,6 @@ package com.datastrato.gravitino.storage.relational.mapper; -import com.datastrato.gravitino.storage.relational.AllTables; import com.datastrato.gravitino.storage.relational.po.SchemaPO; import java.util.List; import org.apache.ibatis.annotations.Delete; @@ -23,7 +22,7 @@ * href="https://mybatis.org/mybatis-3/getting-started.html"> */ public interface SchemaMetaMapper { - String TABLE_NAME = AllTables.SCHEMA_TABLE_NAME; + String TABLE_NAME = "schema_meta"; @Select( "SELECT schema_id as schemaId, schema_name as schemaName," diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java index 0af901c4518..8a284d70741 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java @@ -5,7 +5,6 @@ package com.datastrato.gravitino.storage.relational.mapper; -import com.datastrato.gravitino.storage.relational.AllTables; import com.datastrato.gravitino.storage.relational.po.TablePO; import java.util.List; import org.apache.ibatis.annotations.Delete; @@ -23,7 +22,7 @@ * href="https://mybatis.org/mybatis-3/getting-started.html"> */ public interface TableMetaMapper { - String TABLE_NAME = AllTables.TABLE_TABLE_NAME; + String TABLE_NAME = "table_meta"; @Select( "SELECT table_id as tableId, table_name as tableName," diff --git a/core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalGarbageCollector.java b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalGarbageCollector.java new file mode 100644 index 00000000000..7590244cec3 --- /dev/null +++ b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalGarbageCollector.java @@ -0,0 +1,8 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.storage.relational; + +public class TestRelationalGarbageCollector {} From ed5fee9c97f2e9481227384eb0f2ccc2aff315d7 Mon Sep 17 00:00:00 2001 From: YxAc Date: Thu, 18 Apr 2024 18:07:14 +0800 Subject: [PATCH 07/14] add backend ut --- .../storage/relational/JDBCBackend.java | 13 +- .../mapper/FilesetVersionMapper.java | 2 +- .../relational/mapper/TopicMetaMapper.java | 8 + .../relational/service/TopicMetaService.java | 8 + .../storage/relational/TestJDBCBackend.java | 248 +++++++++++++++++- 5 files changed, 274 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java index 6228e1b61eb..5dbcace396c 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java @@ -207,14 +207,19 @@ public void hardDeleteLegacyData(long legacyTimeLine) { .deleteFilesetAndVersionMetasByLegacyTimeLine( legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); break; + case TOPIC: + TopicMetaService.getInstance() + .deleteTopicMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + break; case COLUMN: - case TOPIC: case USER: case GROUP: case AUDIT: + case ROLE: continue; - // TODO: Implement the delete logic for these entity types. + // TODO: Implement hard delete logic for these entity types. default: throw new IllegalArgumentException( @@ -236,6 +241,7 @@ public void hardDeleteOldVersionData(long versionRetentionCount) { case USER: case GROUP: case AUDIT: + case ROLE: // These entity types have not implemented multi-versions, so we can skip. continue; @@ -259,7 +265,8 @@ public void hardDeleteOldVersionData(long versionRetentionCount) { // Log the deletion by current fileset version. LOG.info( - "Physically deleted count: {} which fileset version is older than or equal to versionRetentionLine: {} by current FilesetVersion: {}.", + "Physically delete filesetVersions count: {} which versions are older than or equal to" + + " versionRetentionLine: {}, the current FilesetVersion is: {}.", deletedCount, versionRetentionLine, filesetVersionPO); diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java index 8d2e1b41bd9..0e467b9cf69 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java @@ -109,7 +109,7 @@ Integer deleteFilesetVersionsByLegacyTimeLine( @Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit); @Select( - "SELECT id, fileset_id as filesetId," + "SELECT fileset_id as filesetId," + " Max(version) as version, deleted_at as deletedAt," + " metalake_id as metalakeId, catalog_id as catalogId, schema_id as schemaId," + " storage_location as storageLocation" diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TopicMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TopicMetaMapper.java index 5db879f67e8..06f0d611db0 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TopicMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TopicMetaMapper.java @@ -6,6 +6,7 @@ import com.datastrato.gravitino.storage.relational.po.TopicPO; import java.util.List; +import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; @@ -152,4 +153,11 @@ Long selectTopicIdBySchemaIdAndName( + " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0" + " WHERE schema_id = #{schemaId} AND deleted_at = 0") Integer softDeleteTopicMetasBySchemaId(@Param("schemaId") Long schemaId); + + @Delete( + "DELETE FROM " + + TABLE_NAME + + " WHERE deleted_at != 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") + Integer deleteTopicMetasByLegacyTimeLine( + @Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TopicMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TopicMetaService.java index cc60e266f2d..36f5eddd3e8 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TopicMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TopicMetaService.java @@ -176,6 +176,14 @@ public boolean deleteTopic(NameIdentifier identifier) { return true; } + public void deleteTopicMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { + SessionUtils.doWithCommit( + TopicMetaMapper.class, + mapper -> { + mapper.deleteTopicMetasByLegacyTimeLine(legacyTimeLine, limit); + }); + } + private Long getTopicIdBySchemaIdAndName(Long schemaId, String topicName) { Long topicId = SessionUtils.getWithoutCommit( diff --git a/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java index 965e6043678..249d6b74842 100644 --- a/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java +++ b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java @@ -12,7 +12,10 @@ import static com.datastrato.gravitino.Configs.ENTITY_RELATIONAL_STORE; import static com.datastrato.gravitino.Configs.ENTITY_STORE; import static com.datastrato.gravitino.Configs.RELATIONAL_ENTITY_STORE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.datastrato.gravitino.Catalog; import com.datastrato.gravitino.Config; @@ -41,6 +44,7 @@ import java.sql.Statement; import java.time.Instant; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Objects; import java.util.UUID; @@ -394,6 +398,248 @@ public void testUpdateAlreadyExistsException() { e -> createTopicEntity(topicCopy.id(), topicCopy.namespace(), "topic", auditInfo))); } + @Test + public void testMetaLifeCycleFromCreationToDeletion() throws IOException { + AuditInfo auditInfo = + AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build(); + + // meta data creation + BaseMetalake metalake = + createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), "metalake", auditInfo); + backend.insert(metalake, false); + + CatalogEntity catalog = + createCatalog( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofCatalog("metalake"), + "catalog", + auditInfo); + backend.insert(catalog, false); + + SchemaEntity schema = + createSchemaEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofSchema("metalake", "catalog"), + "schema", + auditInfo); + backend.insert(schema, false); + + TableEntity table = + createTableEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofTable("metalake", "catalog", "schema"), + "table", + auditInfo); + backend.insert(table, false); + + FilesetEntity fileset = + createFilesetEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofFileset("metalake", "catalog", "schema"), + "fileset", + auditInfo); + backend.insert(fileset, false); + + TopicEntity topic = + createTopicEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofFileset("metalake", "catalog", "schema"), + "topic", + auditInfo); + backend.insert(topic, false); + + // update fileset properties and version + FilesetEntity filesetV2 = + createFilesetEntity( + fileset.id(), + Namespace.ofFileset("metalake", "catalog", "schema"), + "fileset", + auditInfo); + filesetV2.properties().put("version", "2"); + backend.update(fileset.nameIdentifier(), Entity.EntityType.FILESET, e -> filesetV2); + + // another meta data creation + BaseMetalake anotherMetaLake = + createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), "another-metalake", auditInfo); + backend.insert(anotherMetaLake, false); + + CatalogEntity anotherCatalog = + createCatalog( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofCatalog("another-metalake"), + "another-catalog", + auditInfo); + backend.insert(anotherCatalog, false); + + SchemaEntity anotherSchema = + createSchemaEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofSchema("another-metalake", "another-catalog"), + "another-schema", + auditInfo); + backend.insert(anotherSchema, false); + + FilesetEntity anotherFileset = + createFilesetEntity( + RandomIdGenerator.INSTANCE.nextId(), + Namespace.ofFileset("another-metalake", "another-catalog", "another-schema"), + "anotherFileset", + auditInfo); + backend.insert(anotherFileset, false); + + FilesetEntity anotherFilesetV2 = + createFilesetEntity( + anotherFileset.id(), + Namespace.ofFileset("another-metalake", "another-catalog", "another-schema"), + "anotherFileset", + auditInfo); + anotherFilesetV2.properties().put("version", "2"); + backend.update( + anotherFileset.nameIdentifier(), Entity.EntityType.FILESET, e -> anotherFilesetV2); + + FilesetEntity anotherFilesetV3 = + createFilesetEntity( + anotherFileset.id(), + Namespace.ofFileset("another-metalake", "another-catalog", "another-schema"), + "anotherFileset", + auditInfo); + anotherFilesetV3.properties().put("version", "3"); + backend.update( + anotherFileset.nameIdentifier(), Entity.EntityType.FILESET, e -> anotherFilesetV3); + + // meta data list + List metaLakes = backend.list(metalake.namespace(), Entity.EntityType.METALAKE); + assertTrue(metaLakes.contains(metalake)); + + List catalogs = backend.list(catalog.namespace(), Entity.EntityType.CATALOG); + assertTrue(catalogs.contains(catalog)); + + List schemas = backend.list(schema.namespace(), Entity.EntityType.SCHEMA); + assertTrue(schemas.contains(schema)); + + List tables = backend.list(table.namespace(), Entity.EntityType.TABLE); + assertTrue(tables.contains(table)); + + List filesets = backend.list(fileset.namespace(), Entity.EntityType.FILESET); + assertFalse(filesets.contains(fileset)); + assertTrue(filesets.contains(filesetV2)); + assertEquals("2", filesets.get(filesets.indexOf(filesetV2)).properties().get("version")); + + List topics = backend.list(topic.namespace(), Entity.EntityType.TOPIC); + assertTrue(topics.contains(topic)); + + // meta data soft delete + backend.delete(metalake.nameIdentifier(), Entity.EntityType.METALAKE, true); + + // check existence after soft delete + assertFalse(backend.exists(metalake.nameIdentifier(), Entity.EntityType.METALAKE)); + assertTrue(backend.exists(anotherMetaLake.nameIdentifier(), Entity.EntityType.METALAKE)); + + assertFalse(backend.exists(catalog.nameIdentifier(), Entity.EntityType.CATALOG)); + assertTrue(backend.exists(anotherCatalog.nameIdentifier(), Entity.EntityType.CATALOG)); + + assertFalse(backend.exists(schema.nameIdentifier(), Entity.EntityType.SCHEMA)); + assertTrue(backend.exists(anotherSchema.nameIdentifier(), Entity.EntityType.SCHEMA)); + + assertFalse(backend.exists(fileset.nameIdentifier(), Entity.EntityType.FILESET)); + assertTrue(backend.exists(anotherFileset.nameIdentifier(), Entity.EntityType.FILESET)); + + assertFalse(backend.exists(table.nameIdentifier(), Entity.EntityType.TABLE)); + assertFalse(backend.exists(topic.nameIdentifier(), Entity.EntityType.TOPIC)); + + // check legacy record after soft delete + assertTrue(legacyRecordExistsInDB(metalake.id(), Entity.EntityType.METALAKE)); + assertTrue(legacyRecordExistsInDB(catalog.id(), Entity.EntityType.CATALOG)); + assertTrue(legacyRecordExistsInDB(schema.id(), Entity.EntityType.SCHEMA)); + assertTrue(legacyRecordExistsInDB(table.id(), Entity.EntityType.TABLE)); + assertTrue(legacyRecordExistsInDB(topic.id(), Entity.EntityType.TOPIC)); + assertTrue(legacyRecordExistsInDB(fileset.id(), Entity.EntityType.FILESET)); + assertEquals(2, listFilesetVersions(fileset.id()).size()); + assertEquals(3, listFilesetVersions(anotherFileset.id()).size()); + + // meta data hard delete + backend.hardDeleteLegacyData(Instant.now().toEpochMilli() + 1000); + assertFalse(legacyRecordExistsInDB(metalake.id(), Entity.EntityType.METALAKE)); + assertFalse(legacyRecordExistsInDB(catalog.id(), Entity.EntityType.CATALOG)); + assertFalse(legacyRecordExistsInDB(schema.id(), Entity.EntityType.SCHEMA)); + assertFalse(legacyRecordExistsInDB(table.id(), Entity.EntityType.TABLE)); + assertFalse(legacyRecordExistsInDB(fileset.id(), Entity.EntityType.FILESET)); + assertFalse(legacyRecordExistsInDB(topic.id(), Entity.EntityType.TOPIC)); + assertEquals(0, listFilesetVersions(fileset.id()).size()); + + // hard delete for old version fileset + assertEquals(3, listFilesetVersions(anotherFileset.id()).size()); + backend.hardDeleteOldVersionData(1); + assertEquals(1, listFilesetVersions(anotherFileset.id()).size()); + } + + private boolean legacyRecordExistsInDB(Long id, Entity.EntityType entityType) { + String tableName; + String idColumnName; + + switch (entityType) { + case METALAKE: + tableName = "metalake_meta"; + idColumnName = "metalake_id"; + break; + case CATALOG: + tableName = "catalog_meta"; + idColumnName = "catalog_id"; + break; + case SCHEMA: + tableName = "schema_meta"; + idColumnName = "schema_id"; + break; + case TABLE: + tableName = "table_meta"; + idColumnName = "table_id"; + break; + case FILESET: + tableName = "fileset_meta"; + idColumnName = "fileset_id"; + break; + case TOPIC: + tableName = "topic_meta"; + idColumnName = "topic_id"; + break; + default: + throw new IllegalArgumentException("Unsupported entity type: " + entityType); + } + + try (SqlSession sqlSession = + SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true); + Connection connection = sqlSession.getConnection(); + Statement statement = connection.createStatement(); + ResultSet rs = + statement.executeQuery( + String.format( + "SELECT * FROM %s WHERE %s = %d AND deleted_at != 0", + tableName, idColumnName, id))) { + return rs.next(); + } catch (SQLException e) { + throw new RuntimeException("SQL execution failed", e); + } + } + + private List listFilesetVersions(Long filesetId) { + List versions = new ArrayList<>(); + try (SqlSession sqlSession = + SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true); + Connection connection = sqlSession.getConnection(); + Statement statement = connection.createStatement(); + ResultSet rs = + statement.executeQuery( + String.format( + "SELECT version FROM fileset_version_info WHERE fileset_id = %d", filesetId))) { + while (rs.next()) { + versions.add(rs.getInt("version")); + } + } catch (SQLException e) { + throw new RuntimeException("SQL execution failed", e); + } + return versions; + } + public static BaseMetalake createBaseMakeLake(Long id, String name, AuditInfo auditInfo) { return BaseMetalake.builder() .withId(id) @@ -450,7 +696,7 @@ public static FilesetEntity createFilesetEntity( .withFilesetType(Fileset.Type.MANAGED) .withStorageLocation("/tmp") .withComment("") - .withProperties(null) + .withProperties(new HashMap<>()) .withAuditInfo(auditInfo) .build(); } From b95b6e9a643f404f1bffc8699580224d041649f0 Mon Sep 17 00:00:00 2001 From: YxAc Date: Thu, 18 Apr 2024 18:11:59 +0800 Subject: [PATCH 08/14] update --- .../relational/TestRelationalGarbageCollector.java | 8 -------- 1 file changed, 8 deletions(-) delete mode 100644 core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalGarbageCollector.java diff --git a/core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalGarbageCollector.java b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalGarbageCollector.java deleted file mode 100644 index 7590244cec3..00000000000 --- a/core/src/test/java/com/datastrato/gravitino/storage/relational/TestRelationalGarbageCollector.java +++ /dev/null @@ -1,8 +0,0 @@ -/* - * Copyright 2024 Datastrato Pvt Ltd. - * This software is licensed under the Apache License version 2. - */ - -package com.datastrato.gravitino.storage.relational; - -public class TestRelationalGarbageCollector {} From b58f1a1c6dd73eb490d8e65e5fcd4ae88978ba78 Mon Sep 17 00:00:00 2001 From: YxAc Date: Thu, 18 Apr 2024 19:52:40 +0800 Subject: [PATCH 09/14] update ut --- .../java/com/datastrato/gravitino/storage/TestEntityStorage.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/java/com/datastrato/gravitino/storage/TestEntityStorage.java b/core/src/test/java/com/datastrato/gravitino/storage/TestEntityStorage.java index 3bd7cf8a70d..7e2befbaa88 100644 --- a/core/src/test/java/com/datastrato/gravitino/storage/TestEntityStorage.java +++ b/core/src/test/java/com/datastrato/gravitino/storage/TestEntityStorage.java @@ -114,6 +114,7 @@ private void init(String type, Config config) { Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_USER)).thenReturn("root"); Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD)).thenReturn("123"); Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER)).thenReturn("org.h2.Driver"); + Mockito.when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L); } else { throw new UnsupportedOperationException("Unsupported entity store type: " + type); } From 01acfb6432170a1ae5c1dde23bd34c2e750e9bd1 Mon Sep 17 00:00:00 2001 From: YxAc Date: Thu, 18 Apr 2024 21:36:30 +0800 Subject: [PATCH 10/14] refine by review comment --- .../storage/relational/JDBCBackend.java | 171 +++++++++--------- .../storage/relational/RelationalBackend.java | 8 +- .../RelationalGarbageCollector.java | 23 ++- .../relational/mapper/CatalogMetaMapper.java | 2 +- .../relational/mapper/FilesetMetaMapper.java | 2 +- .../mapper/FilesetVersionMapper.java | 2 +- .../relational/mapper/MetalakeMetaMapper.java | 2 +- .../relational/mapper/SchemaMetaMapper.java | 2 +- .../relational/mapper/TableMetaMapper.java | 2 +- .../service/CatalogMetaService.java | 6 +- .../service/FilesetMetaService.java | 25 +-- .../service/MetalakeMetaService.java | 6 +- .../relational/service/SchemaMetaService.java | 6 +- .../relational/service/TableMetaService.java | 6 +- .../relational/service/TopicMetaService.java | 6 +- .../storage/relational/TestJDBCBackend.java | 8 +- 16 files changed, 148 insertions(+), 129 deletions(-) diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java index 5dbcace396c..831d2eda86a 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java @@ -175,108 +175,101 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea } @Override - public void hardDeleteLegacyData(long legacyTimeLine) { + public int hardDeleteLegacyData(Entity.EntityType entityType, long legacyTimeLine) { LOG.info( - "Try to physically delete legacy data that has been marked deleted before {}", + "Try to physically delete {} legacy data that has been marked deleted before {}", + entityType, legacyTimeLine); - for (Entity.EntityType entityType : Entity.EntityType.values()) { - switch (entityType) { - case METALAKE: - MetalakeMetaService.getInstance() - .deleteMetalakeMetasByLegacyTimeLine( - legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); - break; - case CATALOG: - CatalogMetaService.getInstance() - .deleteCatalogMetasByLegacyTimeLine( - legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); - break; - case SCHEMA: - SchemaMetaService.getInstance() - .deleteSchemaMetasByLegacyTimeLine( - legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); - break; - case TABLE: - TableMetaService.getInstance() - .deleteTableMetasByLegacyTimeLine( - legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); - break; - case FILESET: - FilesetMetaService.getInstance() - .deleteFilesetAndVersionMetasByLegacyTimeLine( - legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); - break; - case TOPIC: - TopicMetaService.getInstance() - .deleteTopicMetasByLegacyTimeLine( - legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); - break; + switch (entityType) { + case METALAKE: + return MetalakeMetaService.getInstance() + .deleteMetalakeMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + case CATALOG: + return CatalogMetaService.getInstance() + .deleteCatalogMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + case SCHEMA: + return SchemaMetaService.getInstance() + .deleteSchemaMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + case TABLE: + return TableMetaService.getInstance() + .deleteTableMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + case FILESET: + return FilesetMetaService.getInstance() + .deleteFilesetAndVersionMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + case TOPIC: + return TopicMetaService.getInstance() + .deleteTopicMetasByLegacyTimeLine( + legacyTimeLine, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); - case COLUMN: - case USER: - case GROUP: - case AUDIT: - case ROLE: - continue; - // TODO: Implement hard delete logic for these entity types. + case COLUMN: + case USER: + case GROUP: + case AUDIT: + case ROLE: + return 0; + // TODO: Implement hard delete logic for these entity types. - default: - throw new IllegalArgumentException( - "Unsupported entity type when collectAndRemoveLegacyData: " + entityType); - } + default: + throw new IllegalArgumentException( + "Unsupported entity type when collectAndRemoveLegacyData: " + entityType); } } @Override - public void hardDeleteOldVersionData(long versionRetentionCount) { - for (Entity.EntityType entityType : Entity.EntityType.values()) { - switch (entityType) { - case METALAKE: - case CATALOG: - case SCHEMA: - case TABLE: - case COLUMN: - case TOPIC: - case USER: - case GROUP: - case AUDIT: - case ROLE: - // These entity types have not implemented multi-versions, so we can skip. - continue; + public int hardDeleteOldVersionData(Entity.EntityType entityType, long versionRetentionCount) { + switch (entityType) { + case METALAKE: + case CATALOG: + case SCHEMA: + case TABLE: + case COLUMN: + case TOPIC: + case USER: + case GROUP: + case AUDIT: + case ROLE: + // These entity types have not implemented multi-versions, so we can skip. + return 0; - case FILESET: - // Get the current version of all filesets. - List filesetCurVersions = - FilesetMetaService.getInstance() - .getFilesetVersionPOsByRetentionCount(versionRetentionCount); + case FILESET: + // Get the current version of all filesets. + List filesetCurVersions = + FilesetMetaService.getInstance() + .getFilesetVersionPOsByRetentionCount(versionRetentionCount); - // Delete old versions that are older than or equal to (currentVersion - - // versionRetentionCount). - for (FilesetVersionPO filesetVersionPO : filesetCurVersions) { - long versionRetentionLine = - filesetVersionPO.getVersion().longValue() - versionRetentionCount; - int deletedCount = - FilesetMetaService.getInstance() - .deleteFilesetVersionsByRetentionLine( - filesetVersionPO.getFilesetId(), - versionRetentionLine, - GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + // Delete old versions that are older than or equal to (currentVersion - + // versionRetentionCount). + int totalDeletedCount = 0; + for (FilesetVersionPO filesetVersionPO : filesetCurVersions) { + long versionRetentionLine = + filesetVersionPO.getVersion().longValue() - versionRetentionCount; + int deletedCount = + FilesetMetaService.getInstance() + .deleteFilesetVersionsByRetentionLine( + filesetVersionPO.getFilesetId(), + versionRetentionLine, + GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); + totalDeletedCount += deletedCount; - // Log the deletion by current fileset version. - LOG.info( - "Physically delete filesetVersions count: {} which versions are older than or equal to" - + " versionRetentionLine: {}, the current FilesetVersion is: {}.", - deletedCount, - versionRetentionLine, - filesetVersionPO); - } - break; + // Log the deletion by current fileset version. + LOG.info( + "Physically delete filesetVersions count: {} which versions are older than or equal to" + + " versionRetentionLine: {}, the current FilesetVersion is: {}.", + deletedCount, + versionRetentionLine, + filesetVersionPO); + } + return totalDeletedCount; - default: - throw new IllegalArgumentException( - "Unsupported entity type when collectAndRemoveOldVersionData: " + entityType); - } + default: + throw new IllegalArgumentException( + "Unsupported entity type when collectAndRemoveOldVersionData: " + entityType); } } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java index 67fd0d49876..988dfc73838 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java @@ -97,15 +97,19 @@ E get(NameIdentifier ident, Entity.EntityType * Permanent deletes the legacy data that has been marked as deleted before the given legacy * timeline. * + * @param entityType The type of the entity. * @param legacyTimeLine The time before which the data has been marked as deleted. + * @return The count of the deleted data. */ - void hardDeleteLegacyData(long legacyTimeLine); + int hardDeleteLegacyData(Entity.EntityType entityType, long legacyTimeLine); /** * Permanent deletes the old version data that is older than or equal to the given version * retention count. * + * @param entityType The type of the entity. * @param versionRetentionCount The count of versions to retain. + * @return The count of the deleted data. */ - void hardDeleteOldVersionData(long versionRetentionCount); + int hardDeleteOldVersionData(Entity.EntityType entityType, long versionRetentionCount); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java index 25d66f19043..4affc8f5b0a 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java @@ -9,6 +9,7 @@ import static com.datastrato.gravitino.Configs.VERSION_RETENTION_COUNT; import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.Entity; import com.google.common.annotations.VisibleForTesting; import java.io.Closeable; import java.io.IOException; @@ -25,6 +26,8 @@ public final class RelationalGarbageCollector implements Closeable { private final RelationalBackend backend; private final Config config; + private final long storeDeleteAfterTimeMillis; + private final long versionRetentionCount; @VisibleForTesting final ScheduledExecutorService garbageCollectorPool = @@ -40,10 +43,12 @@ public final class RelationalGarbageCollector implements Closeable { public RelationalGarbageCollector(RelationalBackend backend, Config config) { this.backend = backend; this.config = config; + storeDeleteAfterTimeMillis = this.config.get(STORE_DELETE_AFTER_TIME); + versionRetentionCount = this.config.get(VERSION_RETENTION_COUNT); } public void start() { - long dateTimeLineMinute = config.get(STORE_DELETE_AFTER_TIME) / 1000 / 60; + long dateTimeLineMinute = storeDeleteAfterTimeMillis / 1000 / 60; // We will collect garbage every 10 minutes at least. If the dateTimeLineMinute is larger than // 100 minutes, we would collect garbage every dateTimeLineMinute/10 minutes. @@ -57,11 +62,21 @@ private void collectAndClean() { try { LOG.info("Start to collect and delete legacy data by thread {}", threadId); - long legacyTimeLine = System.currentTimeMillis() - config.get(STORE_DELETE_AFTER_TIME); - backend.hardDeleteLegacyData(legacyTimeLine); + long legacyTimeLine = System.currentTimeMillis() - storeDeleteAfterTimeMillis; + for (Entity.EntityType entityType : Entity.EntityType.values()) { + long deletedCount = Long.MAX_VALUE; + while (deletedCount > 0) { + deletedCount = backend.hardDeleteLegacyData(entityType, legacyTimeLine); + } + } LOG.info("Start to collect and delete old version data by thread {}", threadId); - backend.hardDeleteOldVersionData(config.get(VERSION_RETENTION_COUNT)); + for (Entity.EntityType entityType : Entity.EntityType.values()) { + long deletedCount = Long.MAX_VALUE; + while (deletedCount > 0) { + deletedCount = backend.hardDeleteOldVersionData(entityType, versionRetentionCount); + } + } } catch (Exception e) { LOG.error("Thread {} failed to collect and clean garbage.", threadId, e); } finally { diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java index 0682c43db9b..5e40deb39b1 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/CatalogMetaMapper.java @@ -152,7 +152,7 @@ Integer updateCatalogMeta( @Delete( "DELETE FROM " + TABLE_NAME - + " WHERE deleted_at != 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") Integer deleteCatalogMetasByLegacyTimeLine( @Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetMetaMapper.java index 956b566066a..ee87daa43fd 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetMetaMapper.java @@ -219,7 +219,7 @@ Integer updateFilesetMeta( @Delete( "DELETE FROM " + META_TABLE_NAME - + " WHERE deleted_at != 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") Integer deleteFilesetMetasByLegacyTimeLine( @Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java index 0e467b9cf69..ba30e4ce598 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java @@ -104,7 +104,7 @@ void insertFilesetVersionOnDuplicateKeyUpdate( @Delete( "DELETE FROM " + VERSION_TABLE_NAME - + " WHERE deleted_at != 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") Integer deleteFilesetVersionsByLegacyTimeLine( @Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit); diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java index 7bad3386e0d..29b87d64b50 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/MetalakeMetaMapper.java @@ -130,7 +130,7 @@ Integer updateMetalakeMeta( @Delete( "DELETE FROM " + TABLE_NAME - + " WHERE deleted_at != 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") Integer deleteMetalakeMetasByLegacyTimeLine( @Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java index 597e2ce9ba2..8a27a440ab4 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/SchemaMetaMapper.java @@ -154,7 +154,7 @@ Integer updateSchemaMeta( @Delete( "DELETE FROM " + TABLE_NAME - + " WHERE deleted_at != 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") Integer deleteSchemaMetasByLegacyTimeLine( @Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java index 8a284d70741..aeeaa1eac14 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/TableMetaMapper.java @@ -156,7 +156,7 @@ Integer updateTableMeta( @Delete( "DELETE FROM " + TABLE_NAME - + " WHERE deleted_at != 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") + + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}") Integer deleteTableMetasByLegacyTimeLine( @Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java index dca26b21c44..9264fac3464 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/CatalogMetaService.java @@ -210,11 +210,11 @@ public boolean deleteCatalog(NameIdentifier identifier, boolean cascade) { return true; } - public void deleteCatalogMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { - SessionUtils.doWithCommit( + public int deleteCatalogMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { + return SessionUtils.doWithCommitAndFetchResult( CatalogMetaMapper.class, mapper -> { - mapper.deleteCatalogMetasByLegacyTimeLine(legacyTimeLine, limit); + return mapper.deleteCatalogMetasByLegacyTimeLine(legacyTimeLine, limit); }); } } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java index 32f95afe085..268e3595254 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java @@ -220,17 +220,20 @@ public boolean deleteFileset(NameIdentifier identifier) { return true; } - public void deleteFilesetAndVersionMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { - SessionUtils.doWithCommit( - FilesetMetaMapper.class, - mapper -> { - mapper.deleteFilesetMetasByLegacyTimeLine(legacyTimeLine, limit); - }); - SessionUtils.doWithCommit( - FilesetVersionMapper.class, - mapper -> { - mapper.deleteFilesetVersionsByLegacyTimeLine(legacyTimeLine, limit); - }); + public int deleteFilesetAndVersionMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { + int filesetDeletedCount = + SessionUtils.doWithCommitAndFetchResult( + FilesetMetaMapper.class, + mapper -> { + return mapper.deleteFilesetMetasByLegacyTimeLine(legacyTimeLine, limit); + }); + int filesetVersionDeletedCount = + SessionUtils.doWithCommitAndFetchResult( + FilesetVersionMapper.class, + mapper -> { + return mapper.deleteFilesetVersionsByLegacyTimeLine(legacyTimeLine, limit); + }); + return filesetDeletedCount + filesetVersionDeletedCount; } public Integer deleteFilesetVersionsByRetentionLine( diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java index eb35ec49e00..558c7568ea4 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/MetalakeMetaService.java @@ -187,11 +187,11 @@ public boolean deleteMetalake(NameIdentifier ident, boolean cascade) { return true; } - public void deleteMetalakeMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { - SessionUtils.doWithCommit( + public int deleteMetalakeMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { + return SessionUtils.doWithCommitAndFetchResult( MetalakeMetaMapper.class, mapper -> { - mapper.deleteMetalakeMetasByLegacyTimeLine(legacyTimeLine, limit); + return mapper.deleteMetalakeMetasByLegacyTimeLine(legacyTimeLine, limit); }); } } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java index 8c852bb978d..a5ae1063880 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/SchemaMetaService.java @@ -215,11 +215,11 @@ public boolean deleteSchema(NameIdentifier identifier, boolean cascade) { return true; } - public void deleteSchemaMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { - SessionUtils.doWithCommit( + public int deleteSchemaMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { + return SessionUtils.doWithCommitAndFetchResult( SchemaMetaMapper.class, mapper -> { - mapper.deleteSchemaMetasByLegacyTimeLine(legacyTimeLine, limit); + return mapper.deleteSchemaMetasByLegacyTimeLine(legacyTimeLine, limit); }); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TableMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TableMetaService.java index 5279d7abacc..f71bbc41b10 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TableMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TableMetaService.java @@ -163,11 +163,11 @@ public boolean deleteTable(NameIdentifier identifier) { return true; } - public void deleteTableMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { - SessionUtils.doWithCommit( + public int deleteTableMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { + return SessionUtils.doWithCommitAndFetchResult( TableMetaMapper.class, mapper -> { - mapper.deleteTableMetasByLegacyTimeLine(legacyTimeLine, limit); + return mapper.deleteTableMetasByLegacyTimeLine(legacyTimeLine, limit); }); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TopicMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TopicMetaService.java index 36f5eddd3e8..06042a9659f 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TopicMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/TopicMetaService.java @@ -176,11 +176,11 @@ public boolean deleteTopic(NameIdentifier identifier) { return true; } - public void deleteTopicMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { - SessionUtils.doWithCommit( + public int deleteTopicMetasByLegacyTimeLine(Long legacyTimeLine, int limit) { + return SessionUtils.doWithCommitAndFetchResult( TopicMetaMapper.class, mapper -> { - mapper.deleteTopicMetasByLegacyTimeLine(legacyTimeLine, limit); + return mapper.deleteTopicMetasByLegacyTimeLine(legacyTimeLine, limit); }); } diff --git a/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java index 249d6b74842..f279cfdda62 100644 --- a/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java +++ b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java @@ -558,7 +558,9 @@ public void testMetaLifeCycleFromCreationToDeletion() throws IOException { assertEquals(3, listFilesetVersions(anotherFileset.id()).size()); // meta data hard delete - backend.hardDeleteLegacyData(Instant.now().toEpochMilli() + 1000); + for (Entity.EntityType entityType : Entity.EntityType.values()) { + backend.hardDeleteLegacyData(entityType, Instant.now().toEpochMilli() + 1000); + } assertFalse(legacyRecordExistsInDB(metalake.id(), Entity.EntityType.METALAKE)); assertFalse(legacyRecordExistsInDB(catalog.id(), Entity.EntityType.CATALOG)); assertFalse(legacyRecordExistsInDB(schema.id(), Entity.EntityType.SCHEMA)); @@ -569,7 +571,9 @@ public void testMetaLifeCycleFromCreationToDeletion() throws IOException { // hard delete for old version fileset assertEquals(3, listFilesetVersions(anotherFileset.id()).size()); - backend.hardDeleteOldVersionData(1); + for (Entity.EntityType entityType : Entity.EntityType.values()) { + backend.hardDeleteOldVersionData(entityType, 1); + } assertEquals(1, listFilesetVersions(anotherFileset.id()).size()); } From 4032b6524f0fc3f2dabbfc79be9feefbb890a1c7 Mon Sep 17 00:00:00 2001 From: YxAc Date: Thu, 18 Apr 2024 22:13:55 +0800 Subject: [PATCH 11/14] refine the hard delete to be simpler --- .../storage/relational/JDBCBackend.java | 32 ++----------- .../RelationalGarbageCollector.java | 6 +-- .../service/FilesetMetaService.java | 45 ++++++++++++++----- .../gravitino/storage/TestEntityStorage.java | 3 ++ 4 files changed, 41 insertions(+), 45 deletions(-) diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java index 831d2eda86a..17cf6eaac2a 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java @@ -24,7 +24,6 @@ import com.datastrato.gravitino.meta.TableEntity; import com.datastrato.gravitino.meta.TopicEntity; import com.datastrato.gravitino.storage.relational.converters.SQLExceptionConverterFactory; -import com.datastrato.gravitino.storage.relational.po.FilesetVersionPO; import com.datastrato.gravitino.storage.relational.service.CatalogMetaService; import com.datastrato.gravitino.storage.relational.service.FilesetMetaService; import com.datastrato.gravitino.storage.relational.service.MetalakeMetaService; @@ -238,34 +237,9 @@ public int hardDeleteOldVersionData(Entity.EntityType entityType, long versionRe return 0; case FILESET: - // Get the current version of all filesets. - List filesetCurVersions = - FilesetMetaService.getInstance() - .getFilesetVersionPOsByRetentionCount(versionRetentionCount); - - // Delete old versions that are older than or equal to (currentVersion - - // versionRetentionCount). - int totalDeletedCount = 0; - for (FilesetVersionPO filesetVersionPO : filesetCurVersions) { - long versionRetentionLine = - filesetVersionPO.getVersion().longValue() - versionRetentionCount; - int deletedCount = - FilesetMetaService.getInstance() - .deleteFilesetVersionsByRetentionLine( - filesetVersionPO.getFilesetId(), - versionRetentionLine, - GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); - totalDeletedCount += deletedCount; - - // Log the deletion by current fileset version. - LOG.info( - "Physically delete filesetVersions count: {} which versions are older than or equal to" - + " versionRetentionLine: {}, the current FilesetVersion is: {}.", - deletedCount, - versionRetentionLine, - filesetVersionPO); - } - return totalDeletedCount; + return FilesetMetaService.getInstance() + .deleteFilesetVersionsByRetentionCount( + versionRetentionCount, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT); default: throw new IllegalArgumentException( diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java index 4affc8f5b0a..828a279cc09 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java @@ -25,7 +25,6 @@ public final class RelationalGarbageCollector implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(RelationalGarbageCollector.class); private final RelationalBackend backend; - private final Config config; private final long storeDeleteAfterTimeMillis; private final long versionRetentionCount; @@ -42,9 +41,8 @@ public final class RelationalGarbageCollector implements Closeable { public RelationalGarbageCollector(RelationalBackend backend, Config config) { this.backend = backend; - this.config = config; - storeDeleteAfterTimeMillis = this.config.get(STORE_DELETE_AFTER_TIME); - versionRetentionCount = this.config.get(VERSION_RETENTION_COUNT); + storeDeleteAfterTimeMillis = config.get(STORE_DELETE_AFTER_TIME); + versionRetentionCount = config.get(VERSION_RETENTION_COUNT); } public void start() { diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java index 268e3595254..6e2057e239d 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.Objects; import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The service class for fileset metadata and version info. It provides the basic database @@ -30,6 +32,8 @@ public class FilesetMetaService { private static final FilesetMetaService INSTANCE = new FilesetMetaService(); + private static final Logger LOG = LoggerFactory.getLogger(FilesetMetaService.class); + public static FilesetMetaService getInstance() { return INSTANCE; } @@ -51,12 +55,6 @@ public FilesetPO getFilesetPOBySchemaIdAndName(Long schemaId, String filesetName return filesetPO; } - public List getFilesetVersionPOsByRetentionCount(Long versionRetentionCount) { - return SessionUtils.getWithoutCommit( - FilesetVersionMapper.class, - mapper -> mapper.selectFilesetVersionsByRetentionCount(versionRetentionCount)); - } - public Long getFilesetIdBySchemaIdAndName(Long schemaId, String filesetName) { Long filesetId = SessionUtils.getWithoutCommit( @@ -236,12 +234,35 @@ public int deleteFilesetAndVersionMetasByLegacyTimeLine(Long legacyTimeLine, int return filesetDeletedCount + filesetVersionDeletedCount; } - public Integer deleteFilesetVersionsByRetentionLine( - Long filesetId, Long versionRetentionLine, int limit) { - return SessionUtils.doWithCommitAndFetchResult( - FilesetVersionMapper.class, - mapper -> - mapper.deleteFilesetVersionsByRetentionLine(filesetId, versionRetentionLine, limit)); + public Integer deleteFilesetVersionsByRetentionCount(Long versionRetentionCount, int limit) { + // Get the current version of all filesets. + List filesetCurVersions = + SessionUtils.getWithoutCommit( + FilesetVersionMapper.class, + mapper -> mapper.selectFilesetVersionsByRetentionCount(versionRetentionCount)); + + // Delete old versions that are older than or equal to (currentVersion - + // versionRetentionCount). + int totalDeletedCount = 0; + for (FilesetVersionPO filesetVersionPO : filesetCurVersions) { + long versionRetentionLine = filesetVersionPO.getVersion().longValue() - versionRetentionCount; + int deletedCount = + SessionUtils.doWithCommitAndFetchResult( + FilesetVersionMapper.class, + mapper -> + mapper.deleteFilesetVersionsByRetentionLine( + filesetVersionPO.getFilesetId(), versionRetentionLine, limit)); + totalDeletedCount += deletedCount; + + // Log the deletion by current fileset version. + LOG.info( + "Physically delete filesetVersions count: {} which versions are older than or equal to" + + " versionRetentionLine: {}, the current FilesetVersion is: {}.", + deletedCount, + versionRetentionLine, + filesetVersionPO); + } + return totalDeletedCount; } private void fillFilesetPOBuilderParentEntityId(FilesetPO.Builder builder, Namespace namespace) { diff --git a/core/src/test/java/com/datastrato/gravitino/storage/TestEntityStorage.java b/core/src/test/java/com/datastrato/gravitino/storage/TestEntityStorage.java index 7e2befbaa88..239c7513530 100644 --- a/core/src/test/java/com/datastrato/gravitino/storage/TestEntityStorage.java +++ b/core/src/test/java/com/datastrato/gravitino/storage/TestEntityStorage.java @@ -18,6 +18,7 @@ import static com.datastrato.gravitino.Configs.RELATIONAL_ENTITY_STORE; import static com.datastrato.gravitino.Configs.STORE_DELETE_AFTER_TIME; import static com.datastrato.gravitino.Configs.STORE_TRANSACTION_MAX_SKEW_TIME; +import static com.datastrato.gravitino.Configs.VERSION_RETENTION_COUNT; import com.datastrato.gravitino.Catalog; import com.datastrato.gravitino.Config; @@ -101,6 +102,7 @@ private void init(String type, Config config) { Assertions.assertEquals(KV_STORE_PATH, config.get(ENTRY_KV_ROCKSDB_BACKEND_PATH)); Mockito.when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L); Mockito.when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L); + Mockito.when(config.get(VERSION_RETENTION_COUNT)).thenReturn(1L); } else if (type.equals(Configs.RELATIONAL_ENTITY_STORE)) { File dir = new File(DB_DIR); if (dir.exists() || !dir.isDirectory()) { @@ -115,6 +117,7 @@ private void init(String type, Config config) { Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD)).thenReturn("123"); Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER)).thenReturn("org.h2.Driver"); Mockito.when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 1000L); + Mockito.when(config.get(VERSION_RETENTION_COUNT)).thenReturn(1L); } else { throw new UnsupportedOperationException("Unsupported entity store type: " + type); } From 93e9e0b73aeaddf0fce944fa8e525ed91756d49f Mon Sep 17 00:00:00 2001 From: YxAc Date: Fri, 19 Apr 2024 16:48:59 +0800 Subject: [PATCH 12/14] firstly soft delete for old version data --- .../storage/relational/JDBCBackend.java | 2 +- .../storage/relational/RelationalBackend.java | 8 +++---- .../RelationalGarbageCollector.java | 2 +- .../mapper/FilesetVersionMapper.java | 7 +++--- .../service/FilesetMetaService.java | 12 +++++----- .../storage/relational/TestJDBCBackend.java | 23 +++++++++++++------ 6 files changed, 32 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java index 17cf6eaac2a..aefeff0f269 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java @@ -221,7 +221,7 @@ public int hardDeleteLegacyData(Entity.EntityType entityType, long legacyTimeLin } @Override - public int hardDeleteOldVersionData(Entity.EntityType entityType, long versionRetentionCount) { + public int deleteOldVersionData(Entity.EntityType entityType, long versionRetentionCount) { switch (entityType) { case METALAKE: case CATALOG: diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java index 988dfc73838..17e77533e42 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalBackend.java @@ -94,7 +94,7 @@ E get(NameIdentifier ident, Entity.EntityType boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolean cascade); /** - * Permanent deletes the legacy data that has been marked as deleted before the given legacy + * Permanently deletes the legacy data that has been marked as deleted before the given legacy * timeline. * * @param entityType The type of the entity. @@ -104,12 +104,12 @@ E get(NameIdentifier ident, Entity.EntityType int hardDeleteLegacyData(Entity.EntityType entityType, long legacyTimeLine); /** - * Permanent deletes the old version data that is older than or equal to the given version - * retention count. + * Soft deletes the old version data that is older than or equal to the given version retention + * count. * * @param entityType The type of the entity. * @param versionRetentionCount The count of versions to retain. * @return The count of the deleted data. */ - int hardDeleteOldVersionData(Entity.EntityType entityType, long versionRetentionCount); + int deleteOldVersionData(Entity.EntityType entityType, long versionRetentionCount); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java index 828a279cc09..3cc1a852782 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java @@ -72,7 +72,7 @@ private void collectAndClean() { for (Entity.EntityType entityType : Entity.EntityType.values()) { long deletedCount = Long.MAX_VALUE; while (deletedCount > 0) { - deletedCount = backend.hardDeleteOldVersionData(entityType, versionRetentionCount); + deletedCount = backend.deleteOldVersionData(entityType, versionRetentionCount); } } } catch (Exception e) { diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java index ba30e4ce598..ee500f155a3 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java @@ -120,11 +120,12 @@ Integer deleteFilesetVersionsByLegacyTimeLine( List selectFilesetVersionsByRetentionCount( @Param("versionRetentionCount") Long versionRetentionCount); - @Delete( - "DELETE FROM " + @Update( + "UPDATE " + VERSION_TABLE_NAME + + " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0" + " WHERE fileset_id = #{filesetId} AND version <= #{versionRetentionLine} AND deleted_at = 0 LIMIT #{limit}") - Integer deleteFilesetVersionsByRetentionLine( + Integer softDeleteFilesetVersionsByRetentionLine( @Param("filesetId") Long filesetId, @Param("versionRetentionLine") Long versionRetentionLine, @Param("limit") int limit); diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java index 6e2057e239d..21c47d2069e 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java @@ -234,14 +234,14 @@ public int deleteFilesetAndVersionMetasByLegacyTimeLine(Long legacyTimeLine, int return filesetDeletedCount + filesetVersionDeletedCount; } - public Integer deleteFilesetVersionsByRetentionCount(Long versionRetentionCount, int limit) { - // Get the current version of all filesets. + public int deleteFilesetVersionsByRetentionCount(Long versionRetentionCount, int limit) { + // get the current version of all filesets. List filesetCurVersions = SessionUtils.getWithoutCommit( FilesetVersionMapper.class, mapper -> mapper.selectFilesetVersionsByRetentionCount(versionRetentionCount)); - // Delete old versions that are older than or equal to (currentVersion - + // soft delete old versions that are older than or equal to (currentVersion - // versionRetentionCount). int totalDeletedCount = 0; for (FilesetVersionPO filesetVersionPO : filesetCurVersions) { @@ -250,13 +250,13 @@ public Integer deleteFilesetVersionsByRetentionCount(Long versionRetentionCount, SessionUtils.doWithCommitAndFetchResult( FilesetVersionMapper.class, mapper -> - mapper.deleteFilesetVersionsByRetentionLine( + mapper.softDeleteFilesetVersionsByRetentionLine( filesetVersionPO.getFilesetId(), versionRetentionLine, limit)); totalDeletedCount += deletedCount; - // Log the deletion by current fileset version. + // log the deletion by current fileset version. LOG.info( - "Physically delete filesetVersions count: {} which versions are older than or equal to" + "Soft delete filesetVersions count: {} which versions are older than or equal to" + " versionRetentionLine: {}, the current FilesetVersion is: {}.", deletedCount, versionRetentionLine, diff --git a/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java index f279cfdda62..b8d7589fe73 100644 --- a/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java +++ b/core/src/test/java/com/datastrato/gravitino/storage/relational/TestJDBCBackend.java @@ -46,6 +46,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.UUID; import org.apache.commons.io.IOUtils; @@ -569,11 +570,18 @@ public void testMetaLifeCycleFromCreationToDeletion() throws IOException { assertFalse(legacyRecordExistsInDB(topic.id(), Entity.EntityType.TOPIC)); assertEquals(0, listFilesetVersions(fileset.id()).size()); - // hard delete for old version fileset + // soft delete for old version fileset assertEquals(3, listFilesetVersions(anotherFileset.id()).size()); for (Entity.EntityType entityType : Entity.EntityType.values()) { - backend.hardDeleteOldVersionData(entityType, 1); + backend.deleteOldVersionData(entityType, 1); } + Map versionDeletedMap = listFilesetVersions(anotherFileset.id()); + assertEquals(3, versionDeletedMap.size()); + assertEquals(1, versionDeletedMap.values().stream().filter(value -> value == 0L).count()); + assertEquals(2, versionDeletedMap.values().stream().filter(value -> value != 0L).count()); + + // hard delete for old version fileset + backend.hardDeleteLegacyData(Entity.EntityType.FILESET, Instant.now().toEpochMilli() + 1000); assertEquals(1, listFilesetVersions(anotherFileset.id()).size()); } @@ -625,8 +633,8 @@ private boolean legacyRecordExistsInDB(Long id, Entity.EntityType entityType) { } } - private List listFilesetVersions(Long filesetId) { - List versions = new ArrayList<>(); + private Map listFilesetVersions(Long filesetId) { + Map versionDeletedTime = new HashMap<>(); try (SqlSession sqlSession = SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true); Connection connection = sqlSession.getConnection(); @@ -634,14 +642,15 @@ private List listFilesetVersions(Long filesetId) { ResultSet rs = statement.executeQuery( String.format( - "SELECT version FROM fileset_version_info WHERE fileset_id = %d", filesetId))) { + "SELECT version, deleted_at FROM fileset_version_info WHERE fileset_id = %d", + filesetId))) { while (rs.next()) { - versions.add(rs.getInt("version")); + versionDeletedTime.put(rs.getInt("version"), rs.getLong("deleted_at")); } } catch (SQLException e) { throw new RuntimeException("SQL execution failed", e); } - return versions; + return versionDeletedTime; } public static BaseMetalake createBaseMakeLake(Long id, String name, AuditInfo auditInfo) { From 668503ebd6bd4361447d4eace33eea269723ba55 Mon Sep 17 00:00:00 2001 From: YxAc Date: Fri, 19 Apr 2024 17:31:12 +0800 Subject: [PATCH 13/14] refine the sql query to reduce data returned --- .../mapper/FilesetVersionMapper.java | 7 +++--- .../relational/po/FilesetVersionPO.java | 24 ------------------- .../service/FilesetMetaService.java | 15 ++++++------ 3 files changed, 11 insertions(+), 35 deletions(-) diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java index ee500f155a3..f520a010b2e 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java @@ -7,6 +7,7 @@ import com.datastrato.gravitino.storage.relational.po.FilesetVersionPO; import java.util.List; +import org.apache.commons.lang3.tuple.Pair; import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Param; @@ -110,14 +111,12 @@ Integer deleteFilesetVersionsByLegacyTimeLine( @Select( "SELECT fileset_id as filesetId," - + " Max(version) as version, deleted_at as deletedAt," - + " metalake_id as metalakeId, catalog_id as catalogId, schema_id as schemaId," - + " storage_location as storageLocation" + + " Max(version) as version" + " FROM " + VERSION_TABLE_NAME + " WHERE version > #{versionRetentionCount} AND deleted_at = 0" + " GROUP BY fileset_id") - List selectFilesetVersionsByRetentionCount( + List> selectFilesetVersionsByRetentionCount( @Param("versionRetentionCount") Long versionRetentionCount); @Update( diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/po/FilesetVersionPO.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/po/FilesetVersionPO.java index e647e642b9d..ebfd7517ebe 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/po/FilesetVersionPO.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/po/FilesetVersionPO.java @@ -96,30 +96,6 @@ public int hashCode() { getDeletedAt()); } - @Override - public String toString() { - return new StringBuilder() - .append("FilesetVersionPO{id=") - .append(id) - .append(", metalakeId=") - .append(metalakeId) - .append(", catalogId=") - .append(catalogId) - .append(", schemaId=") - .append(schemaId) - .append(", filesetId=") - .append(filesetId) - .append(", version=") - .append(version) - .append(", storageLocation='") - .append(storageLocation) - .append('\'') - .append(", deletedAt=") - .append(deletedAt) - .append('}') - .toString(); - } - public static class Builder { private final FilesetVersionPO filesetVersionPO; diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java index 21c47d2069e..0df61dce30f 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java @@ -13,7 +13,6 @@ import com.datastrato.gravitino.storage.relational.mapper.FilesetMetaMapper; import com.datastrato.gravitino.storage.relational.mapper.FilesetVersionMapper; import com.datastrato.gravitino.storage.relational.po.FilesetPO; -import com.datastrato.gravitino.storage.relational.po.FilesetVersionPO; import com.datastrato.gravitino.storage.relational.utils.ExceptionUtils; import com.datastrato.gravitino.storage.relational.utils.POConverters; import com.datastrato.gravitino.storage.relational.utils.SessionUtils; @@ -22,6 +21,7 @@ import java.util.List; import java.util.Objects; import java.util.function.Function; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -236,7 +236,7 @@ public int deleteFilesetAndVersionMetasByLegacyTimeLine(Long legacyTimeLine, int public int deleteFilesetVersionsByRetentionCount(Long versionRetentionCount, int limit) { // get the current version of all filesets. - List filesetCurVersions = + List> filesetCurVersions = SessionUtils.getWithoutCommit( FilesetVersionMapper.class, mapper -> mapper.selectFilesetVersionsByRetentionCount(versionRetentionCount)); @@ -244,23 +244,24 @@ public int deleteFilesetVersionsByRetentionCount(Long versionRetentionCount, int // soft delete old versions that are older than or equal to (currentVersion - // versionRetentionCount). int totalDeletedCount = 0; - for (FilesetVersionPO filesetVersionPO : filesetCurVersions) { - long versionRetentionLine = filesetVersionPO.getVersion().longValue() - versionRetentionCount; + for (Pair filesetCurVersion : filesetCurVersions) { + long versionRetentionLine = filesetCurVersion.getValue() - versionRetentionCount; int deletedCount = SessionUtils.doWithCommitAndFetchResult( FilesetVersionMapper.class, mapper -> mapper.softDeleteFilesetVersionsByRetentionLine( - filesetVersionPO.getFilesetId(), versionRetentionLine, limit)); + filesetCurVersion.getKey(), versionRetentionLine, limit)); totalDeletedCount += deletedCount; // log the deletion by current fileset version. LOG.info( "Soft delete filesetVersions count: {} which versions are older than or equal to" - + " versionRetentionLine: {}, the current FilesetVersion is: {}.", + + " versionRetentionLine: {}, the current filesetId and version is: <{}, {}>.", deletedCount, versionRetentionLine, - filesetVersionPO); + filesetCurVersion.getKey(), + filesetCurVersion.getValue()); } return totalDeletedCount; } From 511e3f0ad92361f2ef74ff8990038ebaacf91b5e Mon Sep 17 00:00:00 2001 From: YxAc Date: Fri, 19 Apr 2024 20:26:03 +0800 Subject: [PATCH 14/14] fix type cast problem --- .../storage/relational/mapper/FilesetVersionMapper.java | 6 +++--- .../storage/relational/service/FilesetMetaService.java | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java index f520a010b2e..d5d9053c330 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java @@ -7,7 +7,7 @@ import com.datastrato.gravitino.storage.relational.po.FilesetVersionPO; import java.util.List; -import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Param; @@ -116,7 +116,7 @@ Integer deleteFilesetVersionsByLegacyTimeLine( + VERSION_TABLE_NAME + " WHERE version > #{versionRetentionCount} AND deleted_at = 0" + " GROUP BY fileset_id") - List> selectFilesetVersionsByRetentionCount( + List> selectFilesetVersionsByRetentionCount( @Param("versionRetentionCount") Long versionRetentionCount); @Update( @@ -126,6 +126,6 @@ List> selectFilesetVersionsByRetentionCount( + " WHERE fileset_id = #{filesetId} AND version <= #{versionRetentionLine} AND deleted_at = 0 LIMIT #{limit}") Integer softDeleteFilesetVersionsByRetentionLine( @Param("filesetId") Long filesetId, - @Param("versionRetentionLine") Long versionRetentionLine, + @Param("versionRetentionLine") long versionRetentionLine, @Param("limit") int limit); } diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java index 0df61dce30f..ad041aee418 100644 --- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java +++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java @@ -21,7 +21,7 @@ import java.util.List; import java.util.Objects; import java.util.function.Function; -import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -236,7 +236,7 @@ public int deleteFilesetAndVersionMetasByLegacyTimeLine(Long legacyTimeLine, int public int deleteFilesetVersionsByRetentionCount(Long versionRetentionCount, int limit) { // get the current version of all filesets. - List> filesetCurVersions = + List> filesetCurVersions = SessionUtils.getWithoutCommit( FilesetVersionMapper.class, mapper -> mapper.selectFilesetVersionsByRetentionCount(versionRetentionCount)); @@ -244,7 +244,7 @@ public int deleteFilesetVersionsByRetentionCount(Long versionRetentionCount, int // soft delete old versions that are older than or equal to (currentVersion - // versionRetentionCount). int totalDeletedCount = 0; - for (Pair filesetCurVersion : filesetCurVersions) { + for (ImmutablePair filesetCurVersion : filesetCurVersions) { long versionRetentionLine = filesetCurVersion.getValue() - versionRetentionCount; int deletedCount = SessionUtils.doWithCommitAndFetchResult(