Skip to content

Commit

Permalink
[#2385] feat(core): Add the Relational Garbage Collector (#3016)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
- Add the JDBC Backend Garbage Collector

### Why are the changes needed?
Fix: #2385 

### Does this PR introduce _any_ user-facing change?
N/A

### How was this patch tested?
UT

---------

Co-authored-by: YxAc <[email protected]>
  • Loading branch information
YxAc and YxAc authored Apr 20, 2024
1 parent 008adb4 commit a1dd279
Show file tree
Hide file tree
Showing 20 changed files with 639 additions and 2 deletions.
1 change: 1 addition & 0 deletions core/src/main/java/com/datastrato/gravitino/Configs.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public interface Configs {
String DEFAULT_KV_ROCKSDB_BACKEND_PATH =
String.join(File.separator, System.getenv("GRAVITINO_HOME"), "data", "rocksdb");

int GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT = 100;
long MAX_NODE_IN_MEMORY = 100000L;

long MIN_NODE_IN_MEMORY = 1000L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package com.datastrato.gravitino.storage.relational;

import static com.datastrato.gravitino.Configs.GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.Configs;
import com.datastrato.gravitino.Entity;
Expand Down Expand Up @@ -36,6 +38,8 @@
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@link JDBCBackend} is a jdbc implementation of {@link RelationalBackend} interface. You can use
Expand All @@ -45,6 +49,8 @@
*/
public class JDBCBackend implements RelationalBackend {

private static final Logger LOG = LoggerFactory.getLogger(JDBCBackend.class);

/** Initialize the jdbc backend instance. */
@Override
public void initialize(Config config) {
Expand Down Expand Up @@ -181,6 +187,80 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea
}
}

/**
 * Dispatches a hard delete of legacy (already soft-deleted) data to the meta service that owns
 * the given entity type.
 *
 * <p>Each service call receives {@code legacyTimeLine} together with {@code
 * GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT} as its limit argument. Entity types without a hard
 * delete implementation yet report {@code 0}.
 *
 * @param entityType The type of the entity whose legacy data should be removed.
 * @param legacyTimeLine The timeline handed to the meta service; data marked deleted before it is
 *     eligible for removal.
 * @return The count reported by the meta service, or 0 for entity types that are not handled yet.
 * @throws IllegalArgumentException If the entity type is not known to this backend.
 */
@Override
public int hardDeleteLegacyData(Entity.EntityType entityType, long legacyTimeLine) {
  LOG.info(
      "Try to physically delete {} legacy data that has been marked deleted before {}",
      entityType,
      legacyTimeLine);

  final int batchLimit = GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT;
  final int deletedCount;

  switch (entityType) {
    case COLUMN:
    case USER:
    case GROUP:
    case AUDIT:
    case ROLE:
      // TODO: Implement hard delete logic for these entity types.
      deletedCount = 0;
      break;

    case METALAKE:
      deletedCount =
          MetalakeMetaService.getInstance()
              .deleteMetalakeMetasByLegacyTimeLine(legacyTimeLine, batchLimit);
      break;

    case CATALOG:
      deletedCount =
          CatalogMetaService.getInstance()
              .deleteCatalogMetasByLegacyTimeLine(legacyTimeLine, batchLimit);
      break;

    case SCHEMA:
      deletedCount =
          SchemaMetaService.getInstance()
              .deleteSchemaMetasByLegacyTimeLine(legacyTimeLine, batchLimit);
      break;

    case TABLE:
      deletedCount =
          TableMetaService.getInstance()
              .deleteTableMetasByLegacyTimeLine(legacyTimeLine, batchLimit);
      break;

    case FILESET:
      deletedCount =
          FilesetMetaService.getInstance()
              .deleteFilesetAndVersionMetasByLegacyTimeLine(legacyTimeLine, batchLimit);
      break;

    case TOPIC:
      deletedCount =
          TopicMetaService.getInstance()
              .deleteTopicMetasByLegacyTimeLine(legacyTimeLine, batchLimit);
      break;

    default:
      throw new IllegalArgumentException(
          "Unsupported entity type when collectAndRemoveLegacyData: " + entityType);
  }

  return deletedCount;
}

/**
 * Removes old version data for entity types that keep multiple versions.
 *
 * <p>Only {@code FILESET} is delegated to its meta service (with {@code
 * GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT} as the limit argument); every other known entity type
 * is a no-op that reports {@code 0}.
 *
 * @param entityType The type of the entity whose old versions should be cleaned.
 * @param versionRetentionCount The count of versions to retain.
 * @return The count reported by the fileset meta service, or 0 for non-versioned entity types.
 * @throws IllegalArgumentException If the entity type is not known to this backend.
 */
@Override
public int deleteOldVersionData(Entity.EntityType entityType, long versionRetentionCount) {
  final int removedVersions;

  switch (entityType) {
    case FILESET:
      removedVersions =
          FilesetMetaService.getInstance()
              .deleteFilesetVersionsByRetentionCount(
                  versionRetentionCount, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
      break;

    case METALAKE:
    case CATALOG:
    case SCHEMA:
    case TABLE:
    case COLUMN:
    case TOPIC:
    case USER:
    case GROUP:
    case AUDIT:
    case ROLE:
      // These entity types have not implemented multi-versions, so we can skip.
      removedVersions = 0;
      break;

    default:
      throw new IllegalArgumentException(
          "Unsupported entity type when collectAndRemoveOldVersionData: " + entityType);
  }

  return removedVersions;
}

@Override
public void close() throws IOException {
SqlSessionFactoryHelper.getInstance().close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,32 @@ <E extends Entity & HasIdentifier> E get(NameIdentifier ident, Entity.EntityType
throws IOException;

/**
 * Soft deletes the entity associated with the identifier and the entity type.
 *
 * @param ident The identifier of the entity.
 * @param entityType The type of the entity.
 * @param cascade True if dependent entities should be deleted as well (cascade delete), else
 *     false.
 * @return True if the entity was successfully deleted, else false.
 */
boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolean cascade);

/**
 * Permanently deletes the legacy data that has been marked as deleted before the given legacy
 * timeline.
 *
 * <p>NOTE(review): an implementation may remove only a bounded batch per call, so a caller that
 * wants everything removed presumably has to invoke this repeatedly until it returns 0 - confirm
 * against the implementation in use.
 *
 * @param entityType The type of the entity.
 * @param legacyTimeLine The time before which the data has been marked as deleted.
 * @return The count of the data deleted by this call.
 */
int hardDeleteLegacyData(Entity.EntityType entityType, long legacyTimeLine);

/**
 * Soft deletes old version data of the given entity type so that the number of retained versions
 * is governed by the given version retention count.
 *
 * <p>NOTE(review): exactly which versions count as "old" (and whether the boundary version is
 * included) is decided by the implementation - confirm there before relying on it.
 *
 * @param entityType The type of the entity.
 * @param versionRetentionCount The count of versions to retain.
 * @return The count of the data deleted by this call.
 */
int deleteOldVersionData(Entity.EntityType entityType, long versionRetentionCount);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@ public class RelationalEntityStore implements EntityStore {
ImmutableMap.of(
Configs.DEFAULT_ENTITY_RELATIONAL_STORE, JDBCBackend.class.getCanonicalName());
private RelationalBackend backend;
private RelationalGarbageCollector garbageCollector;

/**
 * Creates the relational backend from the configuration, then builds and starts the garbage
 * collector that operates on that backend.
 *
 * @param config The configuration used to create the backend and the garbage collector.
 */
@Override
public void initialize(Config config) throws RuntimeException {
  RelationalBackend createdBackend = createRelationalEntityBackend(config);
  this.backend = createdBackend;

  RelationalGarbageCollector collector = new RelationalGarbageCollector(createdBackend, config);
  this.garbageCollector = collector;
  collector.start();
}

private static RelationalBackend createRelationalEntityBackend(Config config) {
Expand Down Expand Up @@ -109,6 +112,7 @@ public <R, E extends Exception> R executeInTransaction(Executable<R, E> executab

/**
 * Stops the garbage collector and then closes the backend.
 *
 * <p>The backend is closed in a {@code finally} block so that a failure while stopping the
 * garbage collector cannot leak the backend's resources. Both members are null-checked because
 * they are only assigned in {@code initialize}, which may not have run or may have failed midway.
 *
 * @throws IOException If closing the garbage collector or the backend fails.
 */
@Override
public void close() throws IOException {
  try {
    if (garbageCollector != null) {
      garbageCollector.close();
    }
  } finally {
    if (backend != null) {
      backend.close();
    }
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.storage.relational;

import static com.datastrato.gravitino.Configs.STORE_DELETE_AFTER_TIME;
import static com.datastrato.gravitino.Configs.VERSION_RETENTION_COUNT;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.Entity;
import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodically cleans a {@link RelationalBackend}: it hard deletes data that was marked deleted
 * longer ago than the configured retention time, and deletes old version data beyond the
 * configured version retention count.
 *
 * <p>The work runs on a daemon scheduled executor owned by this instance. Call {@link #start()}
 * to schedule it and {@link #close()} to stop it.
 */
public final class RelationalGarbageCollector implements Closeable {

  private static final Logger LOG = LoggerFactory.getLogger(RelationalGarbageCollector.class);
  private final RelationalBackend backend;

  private final long storeDeleteAfterTimeMillis;
  private final long versionRetentionCount;

  @VisibleForTesting
  final ScheduledExecutorService garbageCollectorPool =
      new ScheduledThreadPoolExecutor(
          2,
          r -> {
            Thread t = new Thread(r, "RelationalBackend-Garbage-Collector");
            // Daemon threads so that a forgotten close() cannot keep the JVM alive.
            t.setDaemon(true);
            return t;
          },
          new ThreadPoolExecutor.AbortPolicy());

  /**
   * Creates a garbage collector for the given backend.
   *
   * @param backend The backend whose data is cleaned.
   * @param config The configuration providing {@code STORE_DELETE_AFTER_TIME} and {@code
   *     VERSION_RETENTION_COUNT}.
   */
  public RelationalGarbageCollector(RelationalBackend backend, Config config) {
    this.backend = backend;
    storeDeleteAfterTimeMillis = config.get(STORE_DELETE_AFTER_TIME);
    versionRetentionCount = config.get(VERSION_RETENTION_COUNT);
  }

  /**
   * Schedules the periodic clean-up. The first run happens after 5 minutes; subsequent runs are
   * scheduled at a fixed rate derived from the retention time.
   */
  public void start() {
    long dateTimeLineMinute = storeDeleteAfterTimeMillis / 1000 / 60;

    // We will collect garbage every 10 minutes at least. If the dateTimeLineMinute is larger than
    // 100 minutes, we would collect garbage every dateTimeLineMinute/10 minutes.
    long frequency = Math.max(dateTimeLineMinute / 10, 10);
    garbageCollectorPool.scheduleAtFixedRate(this::collectAndClean, 5, frequency, TimeUnit.MINUTES);
  }

  /**
   * Runs one clean-up cycle: legacy data first, then old version data.
   *
   * <p>Nothing is allowed to escape from this method, because an exception thrown by a task
   * scheduled with {@code scheduleAtFixedRate} suppresses all subsequent executions.
   */
  private void collectAndClean() {
    long threadId = Thread.currentThread().getId();
    LOG.info("Thread {} start to collect garbage...", threadId);

    try {
      LOG.info("Start to collect and delete legacy data by thread {}", threadId);
      long legacyTimeLine = System.currentTimeMillis() - storeDeleteAfterTimeMillis;
      hardDeleteLegacyData(legacyTimeLine);

      LOG.info("Start to collect and delete old version data by thread {}", threadId);
      softDeleteOldVersionData();
    } catch (Exception e) {
      LOG.error("Thread {} failed to collect and clean garbage.", threadId, e);
    } finally {
      LOG.info("Thread {} finish to collect garbage.", threadId);
    }
  }

  /**
   * Hard deletes legacy data of every entity type, calling the backend repeatedly until it
   * reports that nothing more was deleted.
   *
   * <p>A failure for one entity type is logged and does not prevent the remaining entity types
   * (or the following old-version clean-up) from being processed.
   */
  private void hardDeleteLegacyData(long legacyTimeLine) {
    for (Entity.EntityType entityType : Entity.EntityType.values()) {
      // Stop promptly once close() has interrupted this worker via shutdownNow().
      if (Thread.currentThread().isInterrupted()) {
        return;
      }
      try {
        int deletedCount;
        do {
          deletedCount = backend.hardDeleteLegacyData(entityType, legacyTimeLine);
        } while (deletedCount > 0 && !Thread.currentThread().isInterrupted());
      } catch (RuntimeException e) {
        LOG.error("Failed to physically delete legacy data of entity type {}", entityType, e);
      }
    }
  }

  /**
   * Deletes old version data of every entity type, calling the backend repeatedly until it
   * reports that nothing more was deleted.
   *
   * <p>A failure for one entity type is logged and does not prevent the remaining entity types
   * from being processed.
   */
  private void softDeleteOldVersionData() {
    for (Entity.EntityType entityType : Entity.EntityType.values()) {
      // Stop promptly once close() has interrupted this worker via shutdownNow().
      if (Thread.currentThread().isInterrupted()) {
        return;
      }
      try {
        int deletedCount;
        do {
          deletedCount = backend.deleteOldVersionData(entityType, versionRetentionCount);
        } while (deletedCount > 0 && !Thread.currentThread().isInterrupted());
      } catch (RuntimeException e) {
        LOG.error("Failed to delete old version data of entity type {}", entityType, e);
      }
    }
  }

  /**
   * Stops the scheduler. Waits up to 5 seconds for a running cycle to finish before forcing a
   * shutdown; if the waiting thread is interrupted, the shutdown is forced immediately and the
   * interrupt status is restored.
   */
  @Override
  public void close() throws IOException {
    this.garbageCollectorPool.shutdown();
    try {
      if (!this.garbageCollectorPool.awaitTermination(5, TimeUnit.SECONDS)) {
        this.garbageCollectorPool.shutdownNow();
      }
    } catch (InterruptedException ex) {
      this.garbageCollectorPool.shutdownNow();
      Thread.currentThread().interrupt();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.datastrato.gravitino.storage.relational.po.CatalogPO;
import java.util.List;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
Expand Down Expand Up @@ -147,4 +148,11 @@ Integer updateCatalogMeta(
+ " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0"
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0")
Integer softDeleteCatalogMetasByMetalakeId(@Param("metalakeId") Long metalakeId);

/**
 * Physically deletes rows from {@code TABLE_NAME} that have been soft-deleted before the given
 * timeline.
 *
 * <p>A row qualifies when {@code deleted_at > 0} (i.e. it is marked deleted) and {@code
 * deleted_at < legacyTimeLine}. The statement carries a {@code LIMIT}, so a single call removes
 * at most {@code limit} rows.
 *
 * @param legacyTimeLine Exclusive upper bound compared against {@code deleted_at}.
 * @param limit Maximum number of rows to delete in this call.
 * @return The number of rows affected by the statement, as reported by MyBatis.
 */
@Delete(
"DELETE FROM "
+ TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}")
Integer deleteCatalogMetasByLegacyTimeLine(
@Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.datastrato.gravitino.storage.relational.po.FilesetPO;
import java.util.List;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Result;
Expand Down Expand Up @@ -214,4 +215,11 @@ Integer updateFilesetMeta(
+ " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0"
+ " WHERE fileset_id = #{filesetId} AND deleted_at = 0")
Integer softDeleteFilesetMetasByFilesetId(@Param("filesetId") Long filesetId);

/**
 * Physically deletes rows from {@code META_TABLE_NAME} that have been soft-deleted before the
 * given timeline.
 *
 * <p>A row qualifies when {@code deleted_at > 0} (i.e. it is marked deleted) and {@code
 * deleted_at < legacyTimeLine}. The statement carries a {@code LIMIT}, so a single call removes
 * at most {@code limit} rows.
 *
 * @param legacyTimeLine Exclusive upper bound compared against {@code deleted_at}.
 * @param limit Maximum number of rows to delete in this call.
 * @return The number of rows affected by the statement, as reported by MyBatis.
 */
@Delete(
"DELETE FROM "
+ META_TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}")
Integer deleteFilesetMetasByLegacyTimeLine(
@Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
package com.datastrato.gravitino.storage.relational.mapper;

import com.datastrato.gravitino.storage.relational.po.FilesetVersionPO;
import java.util.List;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;

/**
Expand Down Expand Up @@ -97,4 +101,31 @@ void insertFilesetVersionOnDuplicateKeyUpdate(
+ " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0"
+ " WHERE fileset_id = #{filesetId} AND deleted_at = 0")
Integer softDeleteFilesetVersionsByFilesetId(@Param("filesetId") Long filesetId);

/**
 * Physically deletes rows from {@code VERSION_TABLE_NAME} that have been soft-deleted before the
 * given timeline.
 *
 * <p>A row qualifies when {@code deleted_at > 0} (i.e. it is marked deleted) and {@code
 * deleted_at < legacyTimeLine}. The statement carries a {@code LIMIT}, so a single call removes
 * at most {@code limit} rows.
 *
 * @param legacyTimeLine Exclusive upper bound compared against {@code deleted_at}.
 * @param limit Maximum number of rows to delete in this call.
 * @return The number of rows affected by the statement, as reported by MyBatis.
 */
@Delete(
"DELETE FROM "
+ VERSION_TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}")
Integer deleteFilesetVersionsByLegacyTimeLine(
@Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit);

/**
 * Finds the filesets that currently have a live version number above the retention count.
 *
 * <p>Only rows with {@code deleted_at = 0} and {@code version > versionRetentionCount} are
 * considered; they are grouped by {@code fileset_id} and the maximum {@code version} of each
 * group is selected. The columns are aliased as {@code filesetId} and {@code version}.
 *
 * <p>NOTE(review): the result relies on MyBatis being able to populate an {@code ImmutablePair}
 * from the two aliased columns (presumably fileset id on the left, max version on the right) -
 * confirm the mapping works as intended.
 *
 * @param versionRetentionCount Versions must be strictly greater than this value to be selected.
 * @return One entry per matching fileset, pairing the fileset id with its maximum live version.
 */
@Select(
"SELECT fileset_id as filesetId,"
+ " Max(version) as version"
+ " FROM "
+ VERSION_TABLE_NAME
+ " WHERE version > #{versionRetentionCount} AND deleted_at = 0"
+ " GROUP BY fileset_id")
List<ImmutablePair<Long, Integer>> selectFilesetVersionsByRetentionCount(
@Param("versionRetentionCount") Long versionRetentionCount);

/**
 * Soft deletes old versions of a single fileset by stamping {@code deleted_at} with the current
 * time in milliseconds.
 *
 * <p>Only rows of the given fileset that are still live ({@code deleted_at = 0}) and whose
 * {@code version} is less than or equal to {@code versionRetentionLine} are updated. The
 * statement carries a {@code LIMIT}, so a single call updates at most {@code limit} rows.
 *
 * @param filesetId The id of the fileset whose versions are soft deleted.
 * @param versionRetentionLine Inclusive upper bound of the versions to soft delete.
 * @param limit Maximum number of rows to update in this call.
 * @return The number of rows affected by the statement, as reported by MyBatis.
 */
@Update(
"UPDATE "
+ VERSION_TABLE_NAME
+ " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0"
+ " WHERE fileset_id = #{filesetId} AND version <= #{versionRetentionLine} AND deleted_at = 0 LIMIT #{limit}")
Integer softDeleteFilesetVersionsByRetentionLine(
@Param("filesetId") Long filesetId,
@Param("versionRetentionLine") long versionRetentionLine,
@Param("limit") int limit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.datastrato.gravitino.storage.relational.po.MetalakePO;
import java.util.List;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
Expand Down Expand Up @@ -125,4 +126,11 @@ Integer updateMetalakeMeta(
+ " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0"
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0")
Integer softDeleteMetalakeMetaByMetalakeId(@Param("metalakeId") Long metalakeId);

/**
 * Physically deletes rows from {@code TABLE_NAME} that have been soft-deleted before the given
 * timeline.
 *
 * <p>A row qualifies when {@code deleted_at > 0} (i.e. it is marked deleted) and {@code
 * deleted_at < legacyTimeLine}. The statement carries a {@code LIMIT}, so a single call removes
 * at most {@code limit} rows.
 *
 * @param legacyTimeLine Exclusive upper bound compared against {@code deleted_at}.
 * @param limit Maximum number of rows to delete in this call.
 * @return The number of rows affected by the statement, as reported by MyBatis.
 */
@Delete(
"DELETE FROM "
+ TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}")
Integer deleteMetalakeMetasByLegacyTimeLine(
@Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.datastrato.gravitino.storage.relational.po.SchemaPO;
import java.util.List;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
Expand Down Expand Up @@ -149,4 +150,11 @@ Integer updateSchemaMeta(
+ " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0"
+ " WHERE catalog_id = #{catalogId} AND deleted_at = 0")
Integer softDeleteSchemaMetasByCatalogId(@Param("catalogId") Long catalogId);

/**
 * Physically deletes rows from {@code TABLE_NAME} that have been soft-deleted before the given
 * timeline.
 *
 * <p>A row qualifies when {@code deleted_at > 0} (i.e. it is marked deleted) and {@code
 * deleted_at < legacyTimeLine}. The statement carries a {@code LIMIT}, so a single call removes
 * at most {@code limit} rows.
 *
 * @param legacyTimeLine Exclusive upper bound compared against {@code deleted_at}.
 * @param limit Maximum number of rows to delete in this call.
 * @return The number of rows affected by the statement, as reported by MyBatis.
 */
@Delete(
"DELETE FROM "
+ TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}")
Integer deleteSchemaMetasByLegacyTimeLine(
@Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit);
}
Loading

0 comments on commit a1dd279

Please sign in to comment.