Skip to content

Commit

Permalink
Core: Fix skipped file counts in ManifestReader with deleted entries (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi authored Sep 1, 2023
1 parent 0200cc2 commit 5996b3d
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 22 deletions.
40 changes: 29 additions & 11 deletions core/src/main/java/org/apache/iceberg/ManifestReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -203,32 +203,46 @@ ManifestReader<F> scanMetrics(ScanMetrics newScanMetrics) {
}

/**
 * Returns the entries of this manifest without restricting them to live entries: the visible
 * return statement delegates to {@code entries(false)}.
 *
 * <p>NOTE(review): this listing appears to interleave removed and added diff lines; the compound
 * null / {@code alwaysTrue()} condition below looks like the pre-change text that the
 * {@code hasRowFilter()} / {@code hasPartitionFilter()} helpers replace. Confirm against the
 * real file.
 */
CloseableIterable<ManifestEntry<F>> entries() {
if ((rowFilter != null && rowFilter != Expressions.alwaysTrue())
|| (partFilter != null && partFilter != Expressions.alwaysTrue())
|| (partitionSet != null)) {
return entries(false /* all entries */);
}

/**
 * Opens the manifest with a projection and optionally narrows the result.
 *
 * <p>When a row filter, a partition filter, or a partition set is present, entries are passed
 * through {@code CloseableIterable.filter} with a predicate that requires a non-null entry, a
 * matching partition ({@code evaluator}), matching file metrics ({@code metricsEvaluator}) and
 * membership in the partition set. Otherwise the opened entries are returned as they are.
 *
 * <p>In both branches, {@code onlyLive == true} first wraps the opened entries with
 * {@code filterLiveEntries}.
 *
 * <p>NOTE(review): this listing appears to interleave removed and added diff lines (for example
 * the two consecutive iterable arguments to {@code CloseableIterable.filter}, and the early
 * {@code return open(...)} in the else branch). Confirm against the real file.
 *
 * @param onlyLive whether to apply {@code filterLiveEntries} to the opened entries
 * @return the (possibly filtered) manifest entries
 */
private CloseableIterable<ManifestEntry<F>> entries(boolean onlyLive) {
if (hasRowFilter() || hasPartitionFilter() || partitionSet != null) {
Evaluator evaluator = evaluator();
InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();

// ensure stats columns are present for metrics evaluation
boolean requireStatsProjection = requireStatsProjection(rowFilter, columns);
Collection<String> projectColumns =
requireStatsProjection ? withStatsColumns(columns) : columns;
CloseableIterable<ManifestEntry<F>> entries =
open(projection(fileSchema, fileProjection, projectColumns, caseSensitive));

// The first argument selects the data-file or delete-file "skipped" counter based on the
// manifest content type; presumably the filter counts rejected entries against it (verify
// against CloseableIterable.filter).
return CloseableIterable.filter(
content == FileType.DATA_FILES
? scanMetrics.skippedDataFiles()
: scanMetrics.skippedDeleteFiles(),
open(projection(fileSchema, fileProjection, projectColumns, caseSensitive)),
onlyLive ? filterLiveEntries(entries) : entries,
entry ->
entry != null
&& evaluator.eval(entry.file().partition())
&& metricsEvaluator.eval(entry.file())
&& inPartitionSet(entry.file()));
} else {
// No filter and no partition set: project the requested columns only.
return open(projection(fileSchema, fileProjection, columns, caseSensitive));
CloseableIterable<ManifestEntry<F>> entries =
open(projection(fileSchema, fileProjection, columns, caseSensitive));
return onlyLive ? filterLiveEntries(entries) : entries;
}
}

/**
 * Reports whether a row filter is configured: {@code rowFilter} must be non-null and must not be
 * the {@code Expressions.alwaysTrue()} instance (compared by reference).
 */
private boolean hasRowFilter() {
  if (rowFilter == null) {
    return false;
  }
  return rowFilter != Expressions.alwaysTrue();
}

/**
 * Reports whether a partition filter is configured: {@code partFilter} must be non-null and must
 * not be the {@code Expressions.alwaysTrue()} instance (compared by reference).
 */
private boolean hasPartitionFilter() {
  if (partFilter == null) {
    return false;
  }
  return partFilter != Expressions.alwaysTrue();
}

private boolean inPartitionSet(F fileToCheck) {
return partitionSet == null
|| partitionSet.contains(fileToCheck.specId(), fileToCheck.partition());
Expand Down Expand Up @@ -266,12 +280,16 @@ private CloseableIterable<ManifestEntry<F>> open(Schema projection) {
}

/**
 * Returns the manifest entries restricted to live ones; the final visible statement delegates to
 * {@code entries(true)}.
 *
 * <p>NOTE(review): this listing appears to interleave removed and added diff lines; the
 * {@code CloseableIterable.filter(...)} expression with the skipped-file counter looks like the
 * pre-change body. Confirm against the real file.
 */
CloseableIterable<ManifestEntry<F>> liveEntries() {
return CloseableIterable.filter(
content == FileType.DATA_FILES
? scanMetrics.skippedDataFiles()
: scanMetrics.skippedDeleteFiles(),
entries(),
entry -> entry != null && entry.status() != ManifestEntry.Status.DELETED);
return entries(true /* only live entries */);
}

/**
 * Wraps the given entries with {@code CloseableIterable.filter}, using {@code isLiveEntry} as the
 * predicate. No counter is passed to the filter here.
 *
 * @param entries the entries to wrap
 * @return the filtered view of {@code entries}
 */
private CloseableIterable<ManifestEntry<F>> filterLiveEntries(
    CloseableIterable<ManifestEntry<F>> entries) {
  return CloseableIterable.filter(entries, candidate -> isLiveEntry(candidate));
}

/**
 * Tells whether an entry counts as live: it must be non-null and its status must not be
 * {@code ManifestEntry.Status.DELETED}.
 *
 * @param entry the entry to check; may be null
 * @return {@code false} for a null or DELETED entry, {@code true} otherwise
 */
private boolean isLiveEntry(ManifestEntry<F> entry) {
  if (entry == null) {
    return false;
  }
  return entry.status() != ManifestEntry.Status.DELETED;
}

/** @return an Iterator of DataFile. Makes defensive copies of files before returning */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,19 +215,24 @@ public void scanningWithSkippedDataFiles() throws IOException {
Table table =
TestTables.create(
tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, reporter);
table.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
table.newAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_D).commit();
table.newOverwrite().deleteFile(FILE_A).addFile(FILE_A2).commit();
table.newAppend().appendFile(FILE_C).commit();
TableScan tableScan = table.newScan();

List<FileScanTask> fileTasks = Lists.newArrayList();
try (CloseableIterable<FileScanTask> fileScanTasks =
tableScan.filter(Expressions.equal("data", "1")).planFiles()) {
fileScanTasks.forEach(task -> {});
fileScanTasks.forEach(fileTasks::add);
}
assertThat(fileTasks)
.singleElement()
.satisfies(task -> assertThat(task.file().path()).isEqualTo(FILE_D.path()));

ScanReport scanReport = reporter.lastReport();
assertThat(scanReport).isNotNull();
assertThat(scanReport.tableName()).isEqualTo(tableName);
assertThat(scanReport.snapshotId()).isEqualTo(2L);
assertThat(scanReport.snapshotId()).isEqualTo(3L);
ScanMetricsResult result = scanReport.scanMetrics();
assertThat(result.skippedDataFiles().value()).isEqualTo(1);
assertThat(result.skippedDeleteFiles().value()).isEqualTo(0);
Expand All @@ -236,9 +241,9 @@ public void scanningWithSkippedDataFiles() throws IOException {
assertThat(result.resultDeleteFiles().value()).isEqualTo(0);
assertThat(result.scannedDataManifests().value()).isEqualTo(1);
assertThat(result.scannedDeleteManifests().value()).isEqualTo(0);
assertThat(result.skippedDataManifests().value()).isEqualTo(1);
assertThat(result.skippedDataManifests().value()).isEqualTo(2);
assertThat(result.skippedDeleteManifests().value()).isEqualTo(0);
assertThat(result.totalDataManifests().value()).isEqualTo(2);
assertThat(result.totalDataManifests().value()).isEqualTo(3);
assertThat(result.totalDeleteManifests().value()).isEqualTo(0);
assertThat(result.totalFileSizeInBytes().value()).isEqualTo(10L);
assertThat(result.totalDeleteFileSizeInBytes().value()).isEqualTo(0L);
Expand All @@ -250,20 +255,25 @@ public void scanningWithSkippedDeleteFiles() throws IOException {
Table table =
TestTables.create(
tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, reporter);
table.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_D).commit();
table.newOverwrite().deleteFile(FILE_A).addFile(FILE_A2).commit();
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_D2_DELETES).commit();
table.newRowDelta().addDeletes(FILE_B_DELETES).addDeletes(FILE_C2_DELETES).commit();
TableScan tableScan = table.newScan();

List<FileScanTask> fileTasks = Lists.newArrayList();
try (CloseableIterable<FileScanTask> fileScanTasks =
tableScan.filter(Expressions.equal("data", "1")).planFiles()) {
fileScanTasks.forEach(task -> {});
fileScanTasks.forEach(fileTasks::add);
}
assertThat(fileTasks)
.singleElement()
.satisfies(task -> assertThat(task.file().path()).isEqualTo(FILE_D.path()));

ScanReport scanReport = reporter.lastReport();
assertThat(scanReport).isNotNull();
assertThat(scanReport.tableName()).isEqualTo(tableName);
assertThat(scanReport.snapshotId()).isEqualTo(3L);
assertThat(scanReport.snapshotId()).isEqualTo(4L);
ScanMetricsResult result = scanReport.scanMetrics();
assertThat(result.totalPlanningDuration().totalDuration()).isGreaterThan(Duration.ZERO);
assertThat(result.resultDataFiles().value()).isEqualTo(1);
Expand All @@ -272,9 +282,9 @@ public void scanningWithSkippedDeleteFiles() throws IOException {
assertThat(result.skippedDeleteFiles().value()).isEqualTo(1);
assertThat(result.scannedDataManifests().value()).isEqualTo(1);
assertThat(result.scannedDeleteManifests().value()).isEqualTo(1);
assertThat(result.skippedDataManifests().value()).isEqualTo(0);
assertThat(result.skippedDataManifests().value()).isEqualTo(1);
assertThat(result.skippedDeleteManifests().value()).isEqualTo(1);
assertThat(result.totalDataManifests().value()).isEqualTo(1);
assertThat(result.totalDataManifests().value()).isEqualTo(2);
assertThat(result.totalDeleteManifests().value()).isEqualTo(2);
assertThat(result.totalFileSizeInBytes().value()).isEqualTo(10L);
assertThat(result.totalDeleteFileSizeInBytes().value()).isEqualTo(10L);
Expand Down

0 comments on commit 5996b3d

Please sign in to comment.