Skip to content

Commit

Permalink
[#3190] fix(jdbc-backend): Fix the cast issue when Relational Garbage…
Browse files Browse the repository at this point in the history
… Collector cleaning fileset version info (#3191)

### What changes were proposed in this pull request?

When obtaining the maximum version of each fileset, map the query result into a
dedicated `FilesetMaxVersionPO` object (instead of an `ImmutablePair`) to solve the `ClassCastException` problem.

### Why are the changes needed?

Fix: #3190 

### How was this patch tested?

Add some ITs.

---------

Co-authored-by: xiaojiebao <[email protected]>
  • Loading branch information
2 people authored and web-flow committed Apr 30, 2024
1 parent a507530 commit 76be09d
Show file tree
Hide file tree
Showing 7 changed files with 306 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@link JDBCBackend} is a jdbc implementation of {@link RelationalBackend} interface. You can use
Expand All @@ -51,8 +49,6 @@
*/
public class JDBCBackend implements RelationalBackend {

private static final Logger LOG = LoggerFactory.getLogger(JDBCBackend.class);

/** Initialize the jdbc backend instance. */
@Override
public void initialize(Config config) {
Expand Down Expand Up @@ -203,11 +199,6 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea

@Override
public int hardDeleteLegacyData(Entity.EntityType entityType, long legacyTimeLine) {
LOG.info(
"Try to physically delete {} legacy data that has been marked deleted before {}",
entityType,
legacyTimeLine);

switch (entityType) {
case METALAKE:
return MetalakeMetaService.getInstance()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,32 @@ private void collectAndClean() {
long legacyTimeLine = System.currentTimeMillis() - storeDeleteAfterTimeMillis;
for (Entity.EntityType entityType : Entity.EntityType.values()) {
long deletedCount = Long.MAX_VALUE;
while (deletedCount > 0) {
deletedCount = backend.hardDeleteLegacyData(entityType, legacyTimeLine);
LOG.info(
"Try to physically delete {} legacy data that has been marked deleted before {}",
entityType,
legacyTimeLine);
try {
while (deletedCount > 0) {
deletedCount = backend.hardDeleteLegacyData(entityType, legacyTimeLine);
}
} catch (RuntimeException e) {
LOG.error("Failed to physically delete type of " + entityType + "'s legacy data: ", e);
}
}

LOG.info("Start to collect and delete old version data by thread {}", threadId);
for (Entity.EntityType entityType : Entity.EntityType.values()) {
long deletedCount = Long.MAX_VALUE;
while (deletedCount > 0) {
deletedCount = backend.deleteOldVersionData(entityType, versionRetentionCount);
LOG.info(
"Try to softly delete {} old version data that has been over retention count {}",
entityType,
versionRetentionCount);
try {
while (deletedCount > 0) {
deletedCount = backend.deleteOldVersionData(entityType, versionRetentionCount);
}
} catch (RuntimeException e) {
LOG.error("Failed to softly delete type of " + entityType + "'s old version data: ", e);
}
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

package com.datastrato.gravitino.storage.relational.mapper;

import com.datastrato.gravitino.storage.relational.po.FilesetMaxVersionPO;
import com.datastrato.gravitino.storage.relational.po.FilesetVersionPO;
import java.util.List;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
Expand Down Expand Up @@ -116,7 +116,7 @@ Integer deleteFilesetVersionsByLegacyTimeLine(
+ VERSION_TABLE_NAME
+ " WHERE version > #{versionRetentionCount} AND deleted_at = 0"
+ " GROUP BY fileset_id")
List<ImmutablePair<Long, Integer>> selectFilesetVersionsByRetentionCount(
List<FilesetMaxVersionPO> selectFilesetVersionsByRetentionCount(
@Param("versionRetentionCount") Long versionRetentionCount);

@Update(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.storage.relational.po;

import com.google.common.base.Objects;

public class FilesetMaxVersionPO {
private Long filesetId;
private Long version;

public Long getFilesetId() {
return filesetId;
}

public Long getVersion() {
return version;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof FilesetMaxVersionPO)) return false;
FilesetMaxVersionPO that = (FilesetMaxVersionPO) o;
return Objects.equal(getFilesetId(), that.getFilesetId())
&& Objects.equal(getVersion(), that.getVersion());
}

@Override
public int hashCode() {
return Objects.hashCode(getFilesetId(), getVersion());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.datastrato.gravitino.meta.FilesetEntity;
import com.datastrato.gravitino.storage.relational.mapper.FilesetMetaMapper;
import com.datastrato.gravitino.storage.relational.mapper.FilesetVersionMapper;
import com.datastrato.gravitino.storage.relational.po.FilesetMaxVersionPO;
import com.datastrato.gravitino.storage.relational.po.FilesetPO;
import com.datastrato.gravitino.storage.relational.utils.ExceptionUtils;
import com.datastrato.gravitino.storage.relational.utils.POConverters;
Expand All @@ -21,7 +22,6 @@
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -236,22 +236,22 @@ public int deleteFilesetAndVersionMetasByLegacyTimeLine(Long legacyTimeLine, int

public int deleteFilesetVersionsByRetentionCount(Long versionRetentionCount, int limit) {
// get the current version of all filesets.
List<ImmutablePair<Long, Integer>> filesetCurVersions =
List<FilesetMaxVersionPO> filesetCurVersions =
SessionUtils.getWithoutCommit(
FilesetVersionMapper.class,
mapper -> mapper.selectFilesetVersionsByRetentionCount(versionRetentionCount));

// soft delete old versions that are older than or equal to (currentVersion -
// versionRetentionCount).
int totalDeletedCount = 0;
for (ImmutablePair<Long, Integer> filesetCurVersion : filesetCurVersions) {
long versionRetentionLine = filesetCurVersion.getValue() - versionRetentionCount;
for (FilesetMaxVersionPO filesetCurVersion : filesetCurVersions) {
long versionRetentionLine = filesetCurVersion.getVersion() - versionRetentionCount;
int deletedCount =
SessionUtils.doWithCommitAndFetchResult(
FilesetVersionMapper.class,
mapper ->
mapper.softDeleteFilesetVersionsByRetentionLine(
filesetCurVersion.getKey(), versionRetentionLine, limit));
filesetCurVersion.getFilesetId(), versionRetentionLine, limit));
totalDeletedCount += deletedCount;

// log the deletion by current fileset version.
Expand All @@ -260,8 +260,8 @@ public int deleteFilesetVersionsByRetentionCount(Long versionRetentionCount, int
+ " versionRetentionLine: {}, the current filesetId and version is: <{}, {}>.",
deletedCount,
versionRetentionLine,
filesetCurVersion.getKey(),
filesetCurVersion.getValue());
filesetCurVersion.getFilesetId(),
filesetCurVersion.getVersion());
}
return totalDeletedCount;
}
Expand Down
1 change: 1 addition & 0 deletions integration-test/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ dependencies {
exclude("org.apache.directory.api", "api-ldap-schema-data")
}
testImplementation(libs.mockito.core)
testImplementation(libs.mybatis)
testImplementation(libs.mysql.driver)

testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") {
Expand Down
Loading

0 comments on commit 76be09d

Please sign in to comment.