Skip to content

Commit

Permalink
Merge pull request #15740 from cdapio/cherry_programheartbeat_clean
Browse files Browse the repository at this point in the history
[cherry-pick][6.10][CDAP-14950] Periodic clean up for program heart beat table
  • Loading branch information
itsankit-google authored Nov 20, 2024
2 parents 346ad20 + 7db4e8d commit 14645cb
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.cdap.cdap.data2.metadata.lineage.LineageTable;
import io.cdap.cdap.data2.metadata.lineage.field.FieldLineageTable;
import io.cdap.cdap.internal.app.store.AppMetadataStore;
import io.cdap.cdap.reporting.ProgramHeartbeatTable;
import io.cdap.cdap.spi.data.transaction.TransactionException;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import java.time.Clock;
Expand Down Expand Up @@ -82,7 +83,8 @@ public final class RunDataTimeToLiveService extends AbstractIdleService {
this.cleanupServiceList = ImmutableList.of(
new RunRecordsCleanupService(),
new LineageCleanupService(),
new FieldLineageCleanupService());
new FieldLineageCleanupService(),
new ProgramHeartbeatCleanupService());
}

@Override
Expand Down Expand Up @@ -188,4 +190,23 @@ public void doCleanup(Instant endDate) {
}
}
}

private class ProgramHeartbeatCleanupService implements CleanupService {

@Override
public void doCleanup(Instant endDate) {
LOG.info("Doing scheduled cleanup, deleting all ProgramHeartbeat records before {}", endDate);

try {
transactionRunner.run(
context -> {
ProgramHeartbeatTable programHeartbeatTable = new ProgramHeartbeatTable(context);

programHeartbeatTable.deleteRecordsBefore(endDate);
});
} catch (TransactionException e) {
LOG.error("Failed to clean up old field lineage records", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.cdap.cdap.reporting;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.cdap.cdap.api.dataset.lib.CloseableIterator;
Expand All @@ -33,6 +34,7 @@
import io.cdap.cdap.spi.data.table.field.Range;
import io.cdap.cdap.store.StoreDefinition;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -225,4 +227,22 @@ private ProgramRunId getProgramRunIdFromRow(StructuredRow row) {
row.getString(StoreDefinition.ProgramHeartbeatStore.PROGRAM_FIELD),
row.getString(StoreDefinition.ProgramHeartbeatStore.RUN_FIELD));
}


/**
 * Deletes the program heartbeat records whose timestamp falls before {@code endTime}.
 *
 * <p>The instant is reduced to whole epoch seconds and turned into a range by
 * {@code createTimestampEndRange}; that range is then passed to the table's
 * {@code scanDeleteAll}.
 *
 * @param endTime cutoff instant; only its epoch-second value is used
 * @throws IOException declared for failures raised by the underlying table call
 */
public void deleteRecordsBefore(Instant endTime) throws IOException {
  Range olderThanCutoff = createTimestampEndRange(endTime.getEpochSecond());
  table.scanDeleteAll(olderThanCutoff);
}

/**
 * Builds a range with no explicit start whose end is the given timestamp, end excluded.
 *
 * @param endTime value used for the {@code TIMESTAMP_SECONDS_FIELD} end bound
 * @return a {@link Range} ending at {@code endTime} with an exclusive bound
 */
private Range createTimestampEndRange(long endTime) {
  Field<?> upperBound =
      Fields.longField(StoreDefinition.ProgramHeartbeatStore.TIMESTAMP_SECONDS_FIELD, endTime);
  return Range.to(ImmutableList.of(upperBound), Range.Bound.EXCLUSIVE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@
import io.cdap.cdap.internal.AppFabricTestHelper;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.time.Instant;

public class NoSqlProgramHeartBeatTableTest extends ProgramHeartBeatTableTest {

Expand All @@ -36,4 +40,13 @@ public static void beforeClass() throws Exception {
public static void tearDown() {
AppFabricTestHelper.shutdown();
}

/**
 * Expects {@code deleteRecordsBefore} to surface an {@link UnsupportedOperationException}
 * when run through this test class's transaction runner.
 */
// NOTE(review): presumably the NoSQL-backed table does not implement the scan-delete used
// by deleteRecordsBefore -- confirm against the table implementation.
@Test(expected = UnsupportedOperationException.class)
public void testDeleteRecordsBeforeThrowsException() {
  Instant now = Instant.now();
  TransactionRunners.run(transactionRunner, context -> {
    new ProgramHeartbeatTable(context).deleteRecordsBefore(now);
  });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,35 @@
import com.google.inject.Injector;
import com.google.inject.Scopes;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.guice.ConfigModule;
import io.cdap.cdap.common.guice.LocalLocationModule;
import io.cdap.cdap.common.metrics.NoOpMetricsCollectionService;
import io.cdap.cdap.data.runtime.StorageModule;
import io.cdap.cdap.data.runtime.SystemDatasetRuntimeModule;
import io.cdap.cdap.internal.app.store.RunRecordDetail;
import io.cdap.cdap.messaging.data.MessageId;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.spi.data.StructuredTableAdmin;
import io.cdap.cdap.spi.data.TableAlreadyExistsException;
import io.cdap.cdap.spi.data.sql.PostgresInstantiator;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import io.cdap.cdap.store.StoreDefinition;
import io.zonky.test.db.postgres.embedded.EmbeddedPostgres;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;

import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class SqlProgramHeartBeatTableTest extends ProgramHeartBeatTableTest {
Expand Down Expand Up @@ -71,4 +84,33 @@ protected void configure() {
public static void teardown() throws IOException {
pg.close();
}

/**
 * Writes heartbeats for one run at several timestamps, deletes those before a cutoff and
 * checks how many records a full-range scan of the "default" namespace returns.
 */
@Test
public void testDeleteRecordsBefore() {
  RunRecordDetail heartbeatRecord = RunRecordDetail.builder()
      .setProgramRunId(NamespaceId.DEFAULT.app("app").spark("spark").run(RunIds.generate()))
      .setStartTime(System.currentTimeMillis())
      .setSourceId(new byte[MessageId.RAW_ID_SIZE])
      .build();
  Instant cutOff = Instant.now();
  Instant fiveMinutesEarlier = cutOff.minus(Duration.ofMinutes(5));
  Instant fiveMinutesLater = Instant.now().plus(Duration.ofMinutes(5));

  // Four writes of the same run record: two at the early timestamp, one at the cutoff
  // second and one after it.
  long[] heartbeatSeconds = {
      fiveMinutesEarlier.getEpochSecond(),
      fiveMinutesEarlier.getEpochSecond(),
      cutOff.getEpochSecond(),
      fiveMinutesLater.getEpochSecond()
  };

  TransactionRunners.run(transactionRunner, context -> {
    ProgramHeartbeatTable heartbeatTable = new ProgramHeartbeatTable(context);
    for (long seconds : heartbeatSeconds) {
      heartbeatTable.writeRunRecordMeta(heartbeatRecord, seconds);
    }

    // Delete the heartbeats older than the cutoff.
    heartbeatTable.deleteRecordsBefore(cutOff);

    HashSet<String> namespaces = new HashSet<>(Arrays.asList("default"));
    Collection<RunRecordDetail> remaining = heartbeatTable.scan(0, Long.MAX_VALUE, namespaces);

    // NOTE(review): all writes share one run id, so if scan returns one entry per run this
    // assertion would hold even when nothing was deleted -- confirm, and consider using
    // distinct runs to make the check meaningful.
    Assert.assertEquals(1, remaining.size());
  });
}
}

0 comments on commit 14645cb

Please sign in to comment.