Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2385] feat(core): Add the Relational Garbage Collector #3016

Merged
merged 15 commits into from
Apr 20, 2024
1 change: 1 addition & 0 deletions core/src/main/java/com/datastrato/gravitino/Configs.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public interface Configs {
String DEFAULT_KV_ROCKSDB_BACKEND_PATH =
String.join(File.separator, System.getenv("GRAVITINO_HOME"), "data", "rocksdb");

/**
 * Upper bound on the number of rows the relational garbage collector asks the backend to
 * physically delete in one deletion call. The collector keeps calling until a call reports that
 * nothing was deleted, so larger backlogs are drained in batches of this size.
 */
int GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT = 100;
YxAc marked this conversation as resolved.
Show resolved Hide resolved
long MAX_NODE_IN_MEMORY = 100000L;

long MIN_NODE_IN_MEMORY = 1000L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package com.datastrato.gravitino.storage.relational;

import static com.datastrato.gravitino.Configs.GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.Configs;
import com.datastrato.gravitino.Entity;
Expand All @@ -22,6 +24,7 @@
import com.datastrato.gravitino.meta.TableEntity;
import com.datastrato.gravitino.meta.TopicEntity;
import com.datastrato.gravitino.storage.relational.converters.SQLExceptionConverterFactory;
import com.datastrato.gravitino.storage.relational.po.FilesetVersionPO;
import com.datastrato.gravitino.storage.relational.service.CatalogMetaService;
import com.datastrato.gravitino.storage.relational.service.FilesetMetaService;
import com.datastrato.gravitino.storage.relational.service.MetalakeMetaService;
Expand All @@ -32,6 +35,8 @@
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@link JDBCBackend} is a jdbc implementation of {@link RelationalBackend} interface. You can use
Expand All @@ -41,6 +46,8 @@
*/
public class JDBCBackend implements RelationalBackend {

private static final Logger LOG = LoggerFactory.getLogger(JDBCBackend.class);

/** Initialize the jdbc backend instance. */
@Override
public void initialize(Config config) {
Expand Down Expand Up @@ -167,6 +174,105 @@ public boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolea
}
}

/**
 * Physically removes one batch of soft-deleted rows of the given entity type.
 *
 * <p>At most {@code GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT} rows are requested per call, so the
 * caller is expected to invoke this repeatedly until it returns 0.
 *
 * @param entityType The type of entity whose soft-deleted rows should be purged.
 * @param legacyTimeLine Rows marked deleted before this timestamp are eligible.
 * @return The number of rows removed by this call; 0 for entity types without purge support.
 * @throws IllegalArgumentException If the entity type is not known to this backend.
 */
@Override
public int hardDeleteLegacyData(Entity.EntityType entityType, long legacyTimeLine) {
  LOG.info(
      "Try to physically delete {} legacy data that has been marked deleted before {}",
      entityType,
      legacyTimeLine);

  final int batchLimit = GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT;
  final int purgedRows;

  switch (entityType) {
    case METALAKE:
      purgedRows =
          MetalakeMetaService.getInstance()
              .deleteMetalakeMetasByLegacyTimeLine(legacyTimeLine, batchLimit);
      break;
    case CATALOG:
      purgedRows =
          CatalogMetaService.getInstance()
              .deleteCatalogMetasByLegacyTimeLine(legacyTimeLine, batchLimit);
      break;
    case SCHEMA:
      purgedRows =
          SchemaMetaService.getInstance()
              .deleteSchemaMetasByLegacyTimeLine(legacyTimeLine, batchLimit);
      break;
    case TABLE:
      purgedRows =
          TableMetaService.getInstance()
              .deleteTableMetasByLegacyTimeLine(legacyTimeLine, batchLimit);
      break;
    case FILESET:
      purgedRows =
          FilesetMetaService.getInstance()
              .deleteFilesetAndVersionMetasByLegacyTimeLine(legacyTimeLine, batchLimit);
      break;
    case TOPIC:
      purgedRows =
          TopicMetaService.getInstance()
              .deleteTopicMetasByLegacyTimeLine(legacyTimeLine, batchLimit);
      break;

    case COLUMN:
    case USER:
    case GROUP:
    case AUDIT:
    case ROLE:
      // TODO: Implement hard delete logic for these entity types.
      purgedRows = 0;
      break;

    default:
      throw new IllegalArgumentException(
          "Unsupported entity type when collectAndRemoveLegacyData: " + entityType);
  }

  return purgedRows;
}

@Override
public int hardDeleteOldVersionData(Entity.EntityType entityType, long versionRetentionCount) {
switch (entityType) {
case METALAKE:
case CATALOG:
case SCHEMA:
case TABLE:
case COLUMN:
case TOPIC:
case USER:
case GROUP:
case AUDIT:
case ROLE:
// These entity types have not implemented multi-versions, so we can skip.
return 0;

case FILESET:
// Get the current version of all filesets.
List<FilesetVersionPO> filesetCurVersions =
YxAc marked this conversation as resolved.
Show resolved Hide resolved
FilesetMetaService.getInstance()
.getFilesetVersionPOsByRetentionCount(versionRetentionCount);

// Delete old versions that are older than or equal to (currentVersion -
// versionRetentionCount).
int totalDeletedCount = 0;
for (FilesetVersionPO filesetVersionPO : filesetCurVersions) {
long versionRetentionLine =
filesetVersionPO.getVersion().longValue() - versionRetentionCount;
int deletedCount =
FilesetMetaService.getInstance()
.deleteFilesetVersionsByRetentionLine(
filesetVersionPO.getFilesetId(),
versionRetentionLine,
GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
totalDeletedCount += deletedCount;

// Log the deletion by current fileset version.
LOG.info(
"Physically delete filesetVersions count: {} which versions are older than or equal to"
+ " versionRetentionLine: {}, the current FilesetVersion is: {}.",
deletedCount,
versionRetentionLine,
filesetVersionPO);
}
return totalDeletedCount;

default:
throw new IllegalArgumentException(
"Unsupported entity type when collectAndRemoveOldVersionData: " + entityType);
}
}

@Override
public void close() throws IOException {
SqlSessionFactoryHelper.getInstance().close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,32 @@ <E extends Entity & HasIdentifier> E get(NameIdentifier ident, Entity.EntityType
throws IOException;

/**
 * Soft deletes the entity associated with the identifier and the entity type. The entity is only
 * marked as deleted; data that has been marked as deleted is physically removed later through
 * {@link #hardDeleteLegacyData(Entity.EntityType, long)}.
 *
 * @param ident The identifier of the entity.
 * @param entityType The type of the entity.
 * @param cascade True if the delete should cascade to related entities, else false.
 * @return True, if the entity was successfully deleted, else false.
 */
boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolean cascade);

/**
 * Permanently deletes the legacy data that has been marked as deleted before the given legacy
 * timeline.
 *
 * @param entityType The type of the entity.
 * @param legacyTimeLine The time before which the data has been marked as deleted.
 * @return The count of the deleted data.
 */
int hardDeleteLegacyData(Entity.EntityType entityType, long legacyTimeLine);

/**
 * Permanently deletes old version data of the given entity type, so that no more than the given
 * number of versions is retained.
 *
 * @param entityType The type of the entity.
 * @param versionRetentionCount The count of versions to retain.
 * @return The count of the deleted data.
 */
int hardDeleteOldVersionData(Entity.EntityType entityType, long versionRetentionCount);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@ public class RelationalEntityStore implements EntityStore {
ImmutableMap.of(
Configs.DEFAULT_ENTITY_RELATIONAL_STORE, JDBCBackend.class.getCanonicalName());
private RelationalBackend backend;
private RelationalGarbageCollector garbageCollector;

/**
 * Creates the relational backend described by the configuration, then creates and starts the
 * garbage collector that works against that backend.
 *
 * @param config The configuration used to build the backend and the garbage collector.
 */
@Override
public void initialize(Config config) throws RuntimeException {
  backend = createRelationalEntityBackend(config);

  // Build the collector against the freshly created backend and start it right away.
  RelationalGarbageCollector collector = new RelationalGarbageCollector(backend, config);
  garbageCollector = collector;
  collector.start();
}

private static RelationalBackend createRelationalEntityBackend(Config config) {
Expand Down Expand Up @@ -109,6 +112,7 @@ public <R, E extends Exception> R executeInTransaction(Executable<R, E> executab

/**
 * Stops the garbage collector first, so that it no longer issues deletions, and then closes the
 * backend.
 *
 * <p>The backend is closed even when stopping the garbage collector fails, and either component
 * is skipped if {@code initialize} never got far enough to create it.
 *
 * @throws IOException If closing the garbage collector or the backend fails.
 */
@Override
public void close() throws IOException {
  try {
    if (garbageCollector != null) {
      garbageCollector.close();
    }
  } finally {
    // Always release the backend, even if the collector could not be stopped cleanly.
    if (backend != null) {
      backend.close();
    }
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.storage.relational;

import static com.datastrato.gravitino.Configs.STORE_DELETE_AFTER_TIME;
import static com.datastrato.gravitino.Configs.VERSION_RETENTION_COUNT;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.Entity;
import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class RelationalGarbageCollector implements Closeable {

private static final Logger LOG = LoggerFactory.getLogger(RelationalGarbageCollector.class);
private final RelationalBackend backend;

private final Config config;
private final long storeDeleteAfterTimeMillis;
private final long versionRetentionCount;

@VisibleForTesting
final ScheduledExecutorService garbageCollectorPool =
new ScheduledThreadPoolExecutor(
2,
r -> {
Thread t = new Thread(r, "RelationalBackend-Garbage-Collector");
YxAc marked this conversation as resolved.
Show resolved Hide resolved
t.setDaemon(true);
return t;
},
new ThreadPoolExecutor.AbortPolicy());

public RelationalGarbageCollector(RelationalBackend backend, Config config) {
this.backend = backend;
this.config = config;
storeDeleteAfterTimeMillis = this.config.get(STORE_DELETE_AFTER_TIME);
versionRetentionCount = this.config.get(VERSION_RETENTION_COUNT);
}

public void start() {
long dateTimeLineMinute = storeDeleteAfterTimeMillis / 1000 / 60;

// We will collect garbage every 10 minutes at least. If the dateTimeLineMinute is larger than
// 100 minutes, we would collect garbage every dateTimeLineMinute/10 minutes.
long frequency = Math.max(dateTimeLineMinute / 10, 10);
garbageCollectorPool.scheduleAtFixedRate(this::collectAndClean, 5, frequency, TimeUnit.MINUTES);
}

private void collectAndClean() {
long threadId = Thread.currentThread().getId();
LOG.info("Thread {} start to collect garbage...", threadId);

try {
LOG.info("Start to collect and delete legacy data by thread {}", threadId);
long legacyTimeLine = System.currentTimeMillis() - storeDeleteAfterTimeMillis;
for (Entity.EntityType entityType : Entity.EntityType.values()) {
YxAc marked this conversation as resolved.
Show resolved Hide resolved
long deletedCount = Long.MAX_VALUE;
while (deletedCount > 0) {
deletedCount = backend.hardDeleteLegacyData(entityType, legacyTimeLine);
}
}

LOG.info("Start to collect and delete old version data by thread {}", threadId);
for (Entity.EntityType entityType : Entity.EntityType.values()) {
long deletedCount = Long.MAX_VALUE;
while (deletedCount > 0) {
deletedCount = backend.hardDeleteOldVersionData(entityType, versionRetentionCount);
YxAc marked this conversation as resolved.
Show resolved Hide resolved
}
}
} catch (Exception e) {
LOG.error("Thread {} failed to collect and clean garbage.", threadId, e);
} finally {
LOG.info("Thread {} finish to collect garbage.", threadId);
}
}

@Override
public void close() throws IOException {
this.garbageCollectorPool.shutdown();
try {
if (!this.garbageCollectorPool.awaitTermination(5, TimeUnit.SECONDS)) {
this.garbageCollectorPool.shutdownNow();
}
} catch (InterruptedException ex) {
this.garbageCollectorPool.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.datastrato.gravitino.storage.relational.po.CatalogPO;
import java.util.List;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
Expand Down Expand Up @@ -147,4 +148,11 @@ Integer updateCatalogMeta(
+ " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0"
+ " WHERE metalake_id = #{metalakeId} AND deleted_at = 0")
Integer softDeleteCatalogMetasByMetalakeId(@Param("metalakeId") Long metalakeId);

/**
 * Physically deletes rows of {@code TABLE_NAME} that were soft-deleted before the given time.
 *
 * <p>Only rows with {@code 0 < deleted_at < legacyTimeLine} match, so rows that are not marked as
 * deleted ({@code deleted_at = 0}) are never removed.
 *
 * @param legacyTimeLine Exclusive upper bound for {@code deleted_at}.
 * @param limit Maximum number of rows to delete with this statement.
 * @return Presumably the number of rows deleted, as MyBatis reports for delete statements.
 */
@Delete(
"DELETE FROM "
+ TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}")
Integer deleteCatalogMetasByLegacyTimeLine(
@Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.datastrato.gravitino.storage.relational.po.FilesetPO;
import java.util.List;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Result;
Expand Down Expand Up @@ -214,4 +215,11 @@ Integer updateFilesetMeta(
+ " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0"
+ " WHERE fileset_id = #{filesetId} AND deleted_at = 0")
Integer softDeleteFilesetMetasByFilesetId(@Param("filesetId") Long filesetId);

/**
 * Physically deletes rows of {@code META_TABLE_NAME} that were soft-deleted before the given
 * time.
 *
 * <p>Only rows with {@code 0 < deleted_at < legacyTimeLine} match, so rows that are not marked as
 * deleted ({@code deleted_at = 0}) are never removed.
 *
 * @param legacyTimeLine Exclusive upper bound for {@code deleted_at}.
 * @param limit Maximum number of rows to delete with this statement.
 * @return Presumably the number of rows deleted, as MyBatis reports for delete statements.
 */
@Delete(
"DELETE FROM "
+ META_TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}")
Integer deleteFilesetMetasByLegacyTimeLine(
@Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
package com.datastrato.gravitino.storage.relational.mapper;

import com.datastrato.gravitino.storage.relational.po.FilesetVersionPO;
import java.util.List;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;

/**
Expand Down Expand Up @@ -97,4 +100,32 @@ void insertFilesetVersionOnDuplicateKeyUpdate(
+ " SET deleted_at = UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) * 1000.0"
+ " WHERE fileset_id = #{filesetId} AND deleted_at = 0")
Integer softDeleteFilesetVersionsByFilesetId(@Param("filesetId") Long filesetId);

/**
 * Physically deletes rows of {@code VERSION_TABLE_NAME} that were soft-deleted before the given
 * time.
 *
 * <p>Only rows with {@code 0 < deleted_at < legacyTimeLine} match, so rows that are not marked as
 * deleted ({@code deleted_at = 0}) are never removed.
 *
 * @param legacyTimeLine Exclusive upper bound for {@code deleted_at}.
 * @param limit Maximum number of rows to delete with this statement.
 * @return Presumably the number of rows deleted, as MyBatis reports for delete statements.
 */
@Delete(
"DELETE FROM "
+ VERSION_TABLE_NAME
+ " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeLine} LIMIT #{limit}")
Integer deleteFilesetVersionsByLegacyTimeLine(
@Param("legacyTimeLine") Long legacyTimeLine, @Param("limit") int limit);

@Select(
"SELECT fileset_id as filesetId,"
+ " Max(version) as version, deleted_at as deletedAt,"
+ " metalake_id as metalakeId, catalog_id as catalogId, schema_id as schemaId,"
+ " storage_location as storageLocation"
+ " FROM "
+ VERSION_TABLE_NAME
+ " WHERE version > #{versionRetentionCount} AND deleted_at = 0"
+ " GROUP BY fileset_id")
List<FilesetVersionPO> selectFilesetVersionsByRetentionCount(
YxAc marked this conversation as resolved.
Show resolved Hide resolved
@Param("versionRetentionCount") Long versionRetentionCount);

/**
 * Physically deletes live version rows ({@code deleted_at = 0}) of one fileset whose version is
 * less than or equal to the given retention line.
 *
 * @param filesetId The fileset whose old versions are deleted.
 * @param versionRetentionLine Inclusive upper bound for the versions to delete.
 * @param limit Maximum number of rows to delete with this statement.
 * @return Presumably the number of rows deleted, as MyBatis reports for delete statements.
 */
@Delete(
"DELETE FROM "
+ VERSION_TABLE_NAME
+ " WHERE fileset_id = #{filesetId} AND version <= #{versionRetentionLine} AND deleted_at = 0 LIMIT #{limit}")
Integer deleteFilesetVersionsByRetentionLine(
@Param("filesetId") Long filesetId,
@Param("versionRetentionLine") Long versionRetentionLine,
@Param("limit") int limit);
}
Loading
Loading