[HUDI-7235] Fix checkpoint bug for S3/GCS Incremental Source (#10336)
Co-authored-by: Balaji Varadarajan <[email protected]>
3 people authored; committed by yihua on May 15, 2024
Parent: d4ef0b6 · Commit: d42f399
Showing 6 changed files with 35 additions and 13 deletions.

File 1 of 6:

@@ -177,8 +177,8 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastChec
         IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
             filteredSourceData, sourceLimit, queryInfo, cloudObjectIncrCheckpoint);
     if (!checkPointAndDataset.getRight().isPresent()) {
-      LOG.info("Empty source, returning endpoint:" + queryInfo.getEndInstant());
-      return Pair.of(Option.empty(), queryInfo.getEndInstant());
+      LOG.info("Empty source, returning endpoint:" + checkPointAndDataset.getLeft());
+      return Pair.of(Option.empty(), checkPointAndDataset.getLeft().toString());
     }
     LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft());

File 2 of 6:

@@ -152,8 +152,8 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastChec
         IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
             filteredSourceData, sourceLimit, queryInfo, cloudObjectIncrCheckpoint);
     if (!checkPointAndDataset.getRight().isPresent()) {
-      LOG.info("Empty source, returning endpoint:" + queryInfo.getEndInstant());
-      return Pair.of(Option.empty(), queryInfo.getEndInstant());
+      LOG.info("Empty source, returning endpoint:" + checkPointAndDataset.getLeft());
+      return Pair.of(Option.empty(), checkPointAndDataset.getLeft().toString());
     }
     LOG.info("Adjusted end checkpoint :" + checkPointAndDataset.getLeft());
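
The identical changes in files 1 and 2 apply the same fix to both incremental sources named in the commit title: when the filtered batch comes back empty, fetchNextBatch used to return the bare queryInfo.getEndInstant(), silently dropping the object-key half of a checkpoint such as 2#path/to/skip4.json; it now returns the string form of the adjusted CloudObjectIncrCheckpoint computed by the helper. A minimal sketch of the serialized checkpoint shape this relies on — the Checkpoint class below is a hypothetical stand-in inferred from this commit's test expectations, not Hudi's actual class:

// Hypothetical stand-in for CloudObjectIncrCheckpoint, sketching only the
// serialized "commit#key" form implied by the tests later in this commit.
final class Checkpoint {
  final String commit; // last consumed commit on the source timeline
  final String key;    // last consumed object key within that commit; null if none

  Checkpoint(String commit, String key) {
    this.commit = commit;
    this.key = key;
  }

  @Override
  public String toString() {
    // Tests expect "commit3#path/to/file8.json" with a key, bare "commit4" without.
    return key == null ? commit : commit + "#" + key;
  }
}

Depending on the code path, losing that key could re-ingest objects (key dropped, so the commit is scanned from the start again) or skip them (a stale key attached to a newer instant filters out objects never seen).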

File 3 of 6:

@@ -183,7 +183,12 @@ public static Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> filterAndGen
       long sourceLimit, QueryInfo queryInfo,
       CloudObjectIncrCheckpoint cloudObjectIncrCheckpoint) {
     if (sourceData.isEmpty()) {
-      return Pair.of(cloudObjectIncrCheckpoint, Option.empty());
+      // There is no file matching the prefix.
+      CloudObjectIncrCheckpoint updatedCheckpoint =
+          queryInfo.getEndInstant().equals(cloudObjectIncrCheckpoint.getCommit())
+              ? cloudObjectIncrCheckpoint
+              : new CloudObjectIncrCheckpoint(queryInfo.getEndInstant(), null);
+      return Pair.of(updatedCheckpoint, Option.empty());
     }
     // Let's persist the dataset to avoid triggering the dag repeatedly
     sourceData.persist(StorageLevel.MEMORY_AND_DISK());

@@ -199,11 +204,18 @@ public static Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> filterAndGen
         functions.concat(functions.col(queryInfo.getOrderColumn()), functions.col(queryInfo.getKeyColumn())));
     // Apply incremental filter
     orderedDf = orderedDf.filter(functions.col("commit_key").gt(concatenatedKey.get())).drop("commit_key");
-    // We could be just at the end of the commit, so return empty
+    // If there are no more files where commit_key is greater than lastCheckpointCommit#lastCheckpointKey
     if (orderedDf.isEmpty()) {
       LOG.info("Empty ordered source, returning endpoint:" + queryInfo.getEndInstant());
       sourceData.unpersist();
-      return Pair.of(new CloudObjectIncrCheckpoint(queryInfo.getEndInstant(), lastCheckpointKey.get()), Option.empty());
+      // queryInfo.getEndInstant() represents source table's last completed instant
+      // If current checkpoint is c1#abc and queryInfo.getEndInstant() is c1, return c1#abc.
+      // If current checkpoint is c1#abc and queryInfo.getEndInstant() is c2, return c2.
+      CloudObjectIncrCheckpoint updatedCheckpoint =
+          queryInfo.getEndInstant().equals(cloudObjectIncrCheckpoint.getCommit())
+              ? cloudObjectIncrCheckpoint
+              : new CloudObjectIncrCheckpoint(queryInfo.getEndInstant(), null);
+      return Pair.of(updatedCheckpoint, Option.empty());
     }
   }
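
Both empty-result branches now converge on a single rule: if the source table's last completed instant (queryInfo.getEndInstant()) still equals the checkpointed commit, keep the existing commit#key checkpoint; once the timeline has moved past that commit, jump to the new end instant with no key. A method-level sketch of that rule under the same assumptions as above, using plain strings in place of the real CloudObjectIncrCheckpoint and QueryInfo types:

// Hypothetical restatement of the checkpoint-advance rule in the diff above;
// endInstant plays the role of queryInfo.getEndInstant(), and commit/key the
// role of the current CloudObjectIncrCheckpoint.
static String advanceCheckpoint(String endInstant, String commit, String key) {
  if (endInstant.equals(commit)) {
    // Same commit as the checkpoint: keep commit#key so objects already
    // consumed from this commit are not re-ingested on the next round.
    return key == null ? commit : commit + "#" + key;
  }
  // The timeline has moved on: restart at the new end instant with no key,
  // since no object under that instant has been consumed yet.
  return endInstant;
}

Contrast this with the deleted return, which stitched the old key onto the new end instant: a checkpoint of c1#abc with end instant c2 became c2#abc, so any object in c2 whose key sorted at or below abc would be filtered out and never ingested.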

File 4 of 6:

@@ -242,7 +242,7 @@ public void testTwoFilesAndContinueAcrossCommits(String extension) throws IOExce
   @CsvSource({
       "1,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,1",
       "2,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,2",
-      "3,3#path/to/file5.json,3,1#path/to/file1.json,3"
+      "3,3#path/to/file5.json,3#path/to/file5.json,1#path/to/file1.json,3"
   })
   public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, String exptected2, String exptected3, String exptected4) throws IOException {

File 5 of 6:

@@ -358,8 +358,8 @@ public void testEmptyDataAfterFilter() throws IOException {

     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 1000L, "2", typedProperties);
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 1000L, "2", typedProperties);
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2#path/to/skip4.json"), 1000L, "2", typedProperties);
-    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2#path/to/skip5.json"), 1000L, "2", typedProperties);
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2#path/to/skip4.json"), 1000L, "2#path/to/skip4.json", typedProperties);
+    readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2#path/to/skip5.json"), 1000L, "2#path/to/skip5.json", typedProperties);
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("2"), 1000L, "2", typedProperties);
   }

@@ -434,7 +434,7 @@ public void testFilterAnEntireMiddleCommit() throws IOException {
   @CsvSource({
       "1,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,1",
       "2,1#path/to/file2.json,3#path/to/file4.json,1#path/to/file1.json,2",
-      "3,3#path/to/file5.json,3,1#path/to/file1.json,3"
+      "3,3#path/to/file5.json,3#path/to/file5.json,1#path/to/file1.json,3"
   })
   public void testSplitSnapshotLoad(String snapshotCheckPoint, String exptected1, String exptected2, String exptected3, String exptected4) throws IOException {

File 6 of 6:

@@ -60,7 +60,6 @@
 import java.util.stream.Collectors;

 import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
-import static org.apache.hudi.common.table.timeline.HoodieTimeline.INIT_INSTANT_TS;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;

@@ -122,7 +121,7 @@ void testEmptySource() {
         "s3.object.key", "s3.object.size");
     Pair<CloudObjectIncrCheckpoint, Option<Dataset<Row>>> result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
         emptyDataset, 50L, queryInfo, new CloudObjectIncrCheckpoint(null, null));
-    assertEquals(INIT_INSTANT_TS, result.getKey().toString());
+    assertEquals("commit2", result.getKey().toString());
     assertTrue(!result.getRight().isPresent());
   }

@@ -261,8 +260,10 @@ void testLastObjectInCommit() {
     filePathSizeAndCommitTime.add(Triple.of("path/to/file8.json", 100L, "commit3"));
     filePathSizeAndCommitTime.add(Triple.of("path/to/file6.json", 250L, "commit3"));
     filePathSizeAndCommitTime.add(Triple.of("path/to/file7.json", 50L, "commit3"));
+    filePathSizeAndCommitTime.add(Triple.of("path/to/file8.json", 50L, "commit3"));
     Dataset<Row> inputDs = generateDataset(filePathSizeAndCommitTime);

+    // Test case 1 when queryInfo.endInstant() is equal to lastCheckpointCommit
     QueryInfo queryInfo = new QueryInfo(
         QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1",
         "commit3", "_hoodie_commit_time",

@@ -271,6 +272,15 @@ void testLastObjectInCommit() {
         inputDs, 1500L, queryInfo, new CloudObjectIncrCheckpoint("commit3", "path/to/file8.json"));
     assertEquals("commit3#path/to/file8.json", result.getKey().toString());
     assertTrue(!result.getRight().isPresent());
+    // Test case 2 when queryInfo.endInstant() is greater than lastCheckpointCommit
+    queryInfo = new QueryInfo(
+        QUERY_TYPE_INCREMENTAL_OPT_VAL(), "commit1", "commit1",
+        "commit4", "_hoodie_commit_time",
+        "s3.object.key", "s3.object.size");
+    result = IncrSourceHelper.filterAndGenerateCheckpointBasedOnSourceLimit(
+        inputDs, 1500L, queryInfo, new CloudObjectIncrCheckpoint("commit3", "path/to/file8.json"));
+    assertEquals("commit4", result.getKey().toString());
+    assertTrue(!result.getRight().isPresent());
   }

 private HoodieRecord generateS3EventMetadata(String commitTime, String bucketName, String objectKey, Long objectSize) {
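
The updated tests pin the rule down at three points: an empty source advances a null checkpoint to the end instant (commit2), a checkpoint already at the end instant is returned unchanged (commit3#path/to/file8.json), and a checkpoint behind the end instant jumps forward (commit4). A self-contained, hypothetical demo tying those expectations back to the rule sketched earlier — the rule and the "commit#key" serialization are inferred from this commit's diff and tests, not from Hudi's public API:

public class CheckpointAdvanceDemo {
  // Same rule as the advanceCheckpoint sketch above, repeated so this compiles alone.
  static String advanceCheckpoint(String endInstant, String commit, String key) {
    if (endInstant.equals(commit)) {
      return key == null ? commit : commit + "#" + key;
    }
    return endInstant;
  }

  public static void main(String[] args) {
    // testEmptySource: empty checkpoint, end instant commit2 -> prints "commit2"
    System.out.println(advanceCheckpoint("commit2", null, null));
    // testLastObjectInCommit, case 1: end instant equals the checkpointed commit
    // -> prints "commit3#path/to/file8.json" (nothing gets re-ingested)
    System.out.println(advanceCheckpoint("commit3", "commit3", "path/to/file8.json"));
    // testLastObjectInCommit, case 2: timeline moved on -> prints "commit4"
    System.out.println(advanceCheckpoint("commit4", "commit3", "path/to/file8.json"));
  }
}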
