diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java
index 31bc78c1a7d..219ef6c27ea 100644
--- a/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java
+++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/JDBCBackend.java
@@ -40,8 +40,6 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.function.Function;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * {@link JDBCBackend} is a jdbc implementation of {@link RelationalBackend} interface. You can use
@@ -51,8 +49,6 @@
  */
 public class JDBCBackend implements RelationalBackend {
 
-  private static final Logger LOG = LoggerFactory.getLogger(JDBCBackend.class);
-
   /** Initialize the jdbc backend instance. */
   @Override
   public void initialize(Config config) {
@@ -203,11 +199,6 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea
 
   @Override
   public int hardDeleteLegacyData(Entity.EntityType entityType, long legacyTimeLine) {
-    LOG.info(
-        "Try to physically delete {} legacy data that has been marked deleted before {}",
-        entityType,
-        legacyTimeLine);
-
     switch (entityType) {
       case METALAKE:
         return MetalakeMetaService.getInstance()
diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java
index 3cc1a852782..87e99ba39ef 100644
--- a/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java
+++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/RelationalGarbageCollector.java
@@ -63,16 +63,32 @@ private void collectAndClean() {
       long legacyTimeLine = System.currentTimeMillis() - storeDeleteAfterTimeMillis;
       for (Entity.EntityType entityType : Entity.EntityType.values()) {
         long deletedCount = Long.MAX_VALUE;
-        while (deletedCount > 0) {
-          deletedCount = backend.hardDeleteLegacyData(entityType, legacyTimeLine);
+        LOG.info(
+            "Try to physically delete {} legacy data that has been marked deleted before {}",
+            entityType,
+            legacyTimeLine);
+        try {
+          while (deletedCount > 0) {
+            deletedCount = backend.hardDeleteLegacyData(entityType, legacyTimeLine);
+          }
+        } catch (RuntimeException e) {
+          LOG.error("Failed to physically delete type of " + entityType + "'s legacy data: ", e);
         }
       }
 
       LOG.info("Start to collect and delete old version data by thread {}", threadId);
       for (Entity.EntityType entityType : Entity.EntityType.values()) {
         long deletedCount = Long.MAX_VALUE;
-        while (deletedCount > 0) {
-          deletedCount = backend.deleteOldVersionData(entityType, versionRetentionCount);
+        LOG.info(
+            "Try to softly delete {} old version data that has been over retention count {}",
+            entityType,
+            versionRetentionCount);
+        try {
+          while (deletedCount > 0) {
+            deletedCount = backend.deleteOldVersionData(entityType, versionRetentionCount);
+          }
+        } catch (RuntimeException e) {
+          LOG.error("Failed to softly delete type of " + entityType + "'s old version data: ", e);
         }
       }
     } catch (Exception e) {
diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java
index d5d9053c330..a16ee520916 100644
--- a/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java
+++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/mapper/FilesetVersionMapper.java
@@ -5,9 +5,9 @@
 package com.datastrato.gravitino.storage.relational.mapper;
 
+import com.datastrato.gravitino.storage.relational.po.FilesetMaxVersionPO;
 import com.datastrato.gravitino.storage.relational.po.FilesetVersionPO;
 import java.util.List;
-import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.ibatis.annotations.Delete;
 import org.apache.ibatis.annotations.Insert;
 import org.apache.ibatis.annotations.Param;
@@ -116,7 +116,7 @@ Integer deleteFilesetVersionsByLegacyTimeLine(
           + VERSION_TABLE_NAME
           + " WHERE version > #{versionRetentionCount} AND deleted_at = 0"
           + " GROUP BY fileset_id")
-  List<ImmutablePair<Long, Long>> selectFilesetVersionsByRetentionCount(
+  List<FilesetMaxVersionPO> selectFilesetVersionsByRetentionCount(
      @Param("versionRetentionCount") Long versionRetentionCount);
 
   @Update(
diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/po/FilesetMaxVersionPO.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/po/FilesetMaxVersionPO.java
new file mode 100644
index 00000000000..f14777f819d
--- /dev/null
+++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/po/FilesetMaxVersionPO.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.storage.relational.po;
+
+import com.google.common.base.Objects;
+
+public class FilesetMaxVersionPO {
+  private Long filesetId;
+  private Long version;
+
+  public Long getFilesetId() {
+    return filesetId;
+  }
+
+  public Long getVersion() {
+    return version;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof FilesetMaxVersionPO)) return false;
+    FilesetMaxVersionPO that = (FilesetMaxVersionPO) o;
+    return Objects.equal(getFilesetId(), that.getFilesetId())
+        && Objects.equal(getVersion(), that.getVersion());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(getFilesetId(), getVersion());
+  }
+}
diff --git a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java
index ad041aee418..058b7464703 100644
--- a/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java
+++ b/core/src/main/java/com/datastrato/gravitino/storage/relational/service/FilesetMetaService.java
@@ -12,6 +12,7 @@
 import com.datastrato.gravitino.meta.FilesetEntity;
 import com.datastrato.gravitino.storage.relational.mapper.FilesetMetaMapper;
 import com.datastrato.gravitino.storage.relational.mapper.FilesetVersionMapper;
+import com.datastrato.gravitino.storage.relational.po.FilesetMaxVersionPO;
 import com.datastrato.gravitino.storage.relational.po.FilesetPO;
 import com.datastrato.gravitino.storage.relational.utils.ExceptionUtils;
 import com.datastrato.gravitino.storage.relational.utils.POConverters;
@@ -21,7 +22,6 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Function;
-import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -236,7 +236,7 @@ public int deleteFilesetAndVersionMetasByLegacyTimeLine(Long legacyTimeLine, int
 
   public int deleteFilesetVersionsByRetentionCount(Long versionRetentionCount, int limit) {
     // get the current version of all filesets.
-    List<ImmutablePair<Long, Long>> filesetCurVersions =
+    List<FilesetMaxVersionPO> filesetCurVersions =
         SessionUtils.getWithoutCommit(
             FilesetVersionMapper.class,
             mapper -> mapper.selectFilesetVersionsByRetentionCount(versionRetentionCount));
@@ -244,14 +244,14 @@ public int deleteFilesetVersionsByRetentionCount(Long versionRetentionCount, int
     // soft delete old versions that are older than or equal to (currentVersion -
     // versionRetentionCount).
     int totalDeletedCount = 0;
-    for (ImmutablePair<Long, Long> filesetCurVersion : filesetCurVersions) {
-      long versionRetentionLine = filesetCurVersion.getValue() - versionRetentionCount;
+    for (FilesetMaxVersionPO filesetCurVersion : filesetCurVersions) {
+      long versionRetentionLine = filesetCurVersion.getVersion() - versionRetentionCount;
       int deletedCount =
           SessionUtils.doWithCommitAndFetchResult(
               FilesetVersionMapper.class,
               mapper ->
                   mapper.softDeleteFilesetVersionsByRetentionLine(
-                      filesetCurVersion.getKey(), versionRetentionLine, limit));
+                      filesetCurVersion.getFilesetId(), versionRetentionLine, limit));
       totalDeletedCount += deletedCount;
 
       // log the deletion by current fileset version.
@@ -260,8 +260,8 @@ public int deleteFilesetVersionsByRetentionCount(Long versionRetentionCount, int
               + " versionRetentionLine: {}, the current filesetId and version is: <{}, {}>.",
           deletedCount,
           versionRetentionLine,
-          filesetCurVersion.getKey(),
-          filesetCurVersion.getValue());
+          filesetCurVersion.getFilesetId(),
+          filesetCurVersion.getVersion());
     }
     return totalDeletedCount;
   }
diff --git a/integration-test/build.gradle.kts b/integration-test/build.gradle.kts
index c9b68e75a8d..384f8417b18 100644
--- a/integration-test/build.gradle.kts
+++ b/integration-test/build.gradle.kts
@@ -99,6 +99,7 @@ dependencies {
     exclude("org.apache.directory.api", "api-ldap-schema-data")
   }
   testImplementation(libs.mockito.core)
+  testImplementation(libs.mybatis)
   testImplementation(libs.mysql.driver)
   testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") {
diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/store/relational/service/FilesetMetaServiceIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/store/relational/service/FilesetMetaServiceIT.java
new file mode 100644
index 00000000000..b3259ae8ed3
--- /dev/null
+++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/store/relational/service/FilesetMetaServiceIT.java
@@ -0,0 +1,242 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.integration.test.store.relational.service;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.datastrato.gravitino.Catalog;
+import com.datastrato.gravitino.Config;
+import com.datastrato.gravitino.Configs;
+import com.datastrato.gravitino.NameIdentifier;
+import com.datastrato.gravitino.Namespace;
+import com.datastrato.gravitino.config.ConfigConstants;
+import com.datastrato.gravitino.file.Fileset;
+import com.datastrato.gravitino.integration.test.container.ContainerSuite;
+import com.datastrato.gravitino.integration.test.container.MySQLContainer;
+import com.datastrato.gravitino.integration.test.util.GravitinoITUtils;
+import com.datastrato.gravitino.integration.test.util.TestDatabaseName;
+import com.datastrato.gravitino.meta.AuditInfo;
+import com.datastrato.gravitino.meta.BaseMetalake;
+import com.datastrato.gravitino.meta.CatalogEntity;
+import com.datastrato.gravitino.meta.FilesetEntity;
+import com.datastrato.gravitino.meta.SchemaEntity;
+import com.datastrato.gravitino.meta.SchemaVersion;
+import com.datastrato.gravitino.storage.IdGenerator;
+import com.datastrato.gravitino.storage.RandomIdGenerator;
+import com.datastrato.gravitino.storage.relational.service.CatalogMetaService;
+import com.datastrato.gravitino.storage.relational.service.FilesetMetaService;
+import com.datastrato.gravitino.storage.relational.service.MetalakeMetaService;
+import com.datastrato.gravitino.storage.relational.service.SchemaMetaService;
+import com.datastrato.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.ibatis.session.SqlSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Tag("gravitino-docker-it")
+public class FilesetMetaServiceIT {
+  private static final Logger LOG = LoggerFactory.getLogger(FilesetMetaServiceIT.class);
+  private static final ContainerSuite containerSuite = ContainerSuite.getInstance();
+
+  @BeforeAll
+  public static void setup() {
+    Assumptions.assumeTrue("true".equals(System.getenv("jdbcBackend")));
+    TestDatabaseName META_DATA = TestDatabaseName.MYSQL_JDBC_BACKEND;
+    containerSuite.startMySQLContainer(META_DATA);
+    MySQLContainer MYSQL_CONTAINER = containerSuite.getMySQLContainer();
+
+    String mysqlUrl = MYSQL_CONTAINER.getJdbcUrl(META_DATA);
+    LOG.info("MySQL URL: {}", mysqlUrl);
+    // Connect to the mysql docker and create a databases
+    try (Connection connection =
+            DriverManager.getConnection(
+                StringUtils.substring(mysqlUrl, 0, mysqlUrl.lastIndexOf("/")), "root", "root");
+        final Statement statement = connection.createStatement()) {
+      statement.execute("drop database if exists " + META_DATA);
+      statement.execute("create database " + META_DATA);
+      String gravitinoHome = System.getenv("GRAVITINO_ROOT_DIR");
+      String mysqlContent =
+          FileUtils.readFileToString(
+              new File(
+                  gravitinoHome
+                      + String.format(
+                          "/scripts/mysql/schema-%s-mysql.sql", ConfigConstants.VERSION_0_5_0)),
+              "UTF-8");
+      String[] initMySQLBackendSqls = mysqlContent.split(";");
+      initMySQLBackendSqls = ArrayUtils.addFirst(initMySQLBackendSqls, "use " + META_DATA + ";");
+      for (String sql : initMySQLBackendSqls) {
+        statement.execute(sql);
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to create database in mysql", e);
+      throw new RuntimeException(e);
+    }
+
+    Config config = Mockito.mock(Config.class);
+    Mockito.when(config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL)).thenReturn(mysqlUrl);
+    Mockito.when(config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER))
+        .thenReturn("com.mysql.cj.jdbc.Driver");
+    Mockito.when(config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_USER)).thenReturn("root");
+    Mockito.when(config.get(Configs.ENTITY_RELATIONAL_JDBC_BACKEND_PASSWORD)).thenReturn("root");
+
+    SqlSessionFactoryHelper.getInstance().init(config);
+  }
+
+  @AfterAll
+  public static void tearDown() {}
+
+  @Test
+  public void testDeleteFilesetVersionsByRetentionCount() throws IOException {
+    Assumptions.assumeTrue("true".equals(System.getenv("jdbcBackend")));
+    IdGenerator idGenerator = new RandomIdGenerator();
+    AuditInfo auditInfo =
+        AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();
+    String metalakeName = GravitinoITUtils.genRandomName("tst_metalake");
+    BaseMetalake metalake = createBaseMakeLake(idGenerator.nextId(), metalakeName, auditInfo);
+    MetalakeMetaService.getInstance().insertMetalake(metalake, true);
+    assertNotNull(
+        MetalakeMetaService.getInstance()
+            .getMetalakeByIdentifier(NameIdentifier.ofMetalake(metalakeName)));
+    String catalogName = GravitinoITUtils.genRandomName("tst_fs_catalog");
+    CatalogEntity catalogEntity =
+        createCatalog(
+            idGenerator.nextId(), Namespace.ofCatalog(metalakeName), catalogName, auditInfo);
+    CatalogMetaService.getInstance().insertCatalog(catalogEntity, true);
+    assertNotNull(
+        CatalogMetaService.getInstance()
+            .getCatalogByIdentifier(NameIdentifier.ofCatalog(metalakeName, catalogName)));
+    String schemaName = GravitinoITUtils.genRandomName("tst_fs_schema");
+    SchemaEntity schemaEntity =
+        createSchemaEntity(
+            idGenerator.nextId(),
+            Namespace.ofSchema(metalakeName, catalogName),
+            schemaName,
+            auditInfo);
+    SchemaMetaService.getInstance().insertSchema(schemaEntity, true);
+    assertNotNull(
+        SchemaMetaService.getInstance()
+            .getSchemaByIdentifier(NameIdentifier.ofSchema(metalakeName, catalogName, schemaName)));
+    String filesetName = GravitinoITUtils.genRandomName("tst_fs_fileset");
+    FilesetEntity filesetEntity =
+        createFilesetEntity(
+            idGenerator.nextId(),
+            Namespace.ofFileset(metalakeName, catalogName, schemaName),
+            filesetName,
+            auditInfo,
+            "/tmp");
+    FilesetMetaService.getInstance().insertFileset(filesetEntity, true);
+    assertNotNull(
+        FilesetMetaService.getInstance()
+            .getFilesetByIdentifier(
+                NameIdentifier.ofFileset(metalakeName, catalogName, schemaName, filesetName)));
+    FilesetMetaService.getInstance()
+        .updateFileset(
+            NameIdentifier.ofFileset(metalakeName, catalogName, schemaName, filesetName),
+            e -> {
+              AuditInfo auditInfo1 =
+                  AuditInfo.builder().withCreator("creator5").withCreateTime(Instant.now()).build();
+              return createFilesetEntity(
+                  filesetEntity.id(),
+                  Namespace.of(metalakeName, catalogName, schemaName),
+                  "filesetChanged",
+                  auditInfo1,
+                  "/tmp1");
+            });
+    FilesetMetaService.getInstance().deleteFilesetVersionsByRetentionCount(1L, 100);
+    Map<Integer, Long> versionInfo = listFilesetValidVersions(filesetEntity.id());
+    // version 1 should be softly deleted
+    assertTrue(versionInfo.get(1) > 0);
+  }
+
+  private Map<Integer, Long> listFilesetValidVersions(Long filesetId) {
+    Map<Integer, Long> versionDeletedTime = new HashMap<>();
+    try (SqlSession sqlSession =
+            SqlSessionFactoryHelper.getInstance().getSqlSessionFactory().openSession(true);
+        Connection connection = sqlSession.getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet rs =
+            statement.executeQuery(
+                String.format(
+                    "SELECT version, deleted_at FROM fileset_version_info WHERE fileset_id = %d",
+                    filesetId))) {
+      while (rs.next()) {
+        versionDeletedTime.put(rs.getInt("version"), rs.getLong("deleted_at"));
+      }
+    } catch (SQLException e) {
+      throw new RuntimeException("SQL execution failed", e);
+    }
+    return versionDeletedTime;
+  }
+
+  public static BaseMetalake createBaseMakeLake(Long id, String name, AuditInfo auditInfo) {
+    return BaseMetalake.builder()
+        .withId(id)
+        .withName(name)
+        .withAuditInfo(auditInfo)
+        .withComment("")
+        .withProperties(null)
+        .withVersion(SchemaVersion.V_0_1)
+        .build();
+  }
+
+  public static CatalogEntity createCatalog(
+      Long id, Namespace namespace, String name, AuditInfo auditInfo) {
+    return CatalogEntity.builder()
+        .withId(id)
+        .withName(name)
+        .withNamespace(namespace)
+        .withType(Catalog.Type.RELATIONAL)
+        .withProvider("test")
+        .withComment("")
+        .withProperties(null)
+        .withAuditInfo(auditInfo)
+        .build();
+  }
+
+  public static SchemaEntity createSchemaEntity(
+      Long id, Namespace namespace, String name, AuditInfo auditInfo) {
+    return SchemaEntity.builder()
+        .withId(id)
+        .withName(name)
+        .withNamespace(namespace)
+        .withComment("")
+        .withProperties(null)
+        .withAuditInfo(auditInfo)
+        .build();
+  }
+
+  public static FilesetEntity createFilesetEntity(
+      Long id, Namespace namespace, String name, AuditInfo auditInfo, String location) {
+    return FilesetEntity.builder()
+        .withId(id)
+        .withName(name)
+        .withNamespace(namespace)
+        .withFilesetType(Fileset.Type.MANAGED)
+        .withStorageLocation(location)
+        .withComment("")
+        .withProperties(null)
+        .withAuditInfo(auditInfo)
+        .build();
+  }
+}