diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java index 4ccf9451d108..4ee51aa60c31 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestReader.java +++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java @@ -203,9 +203,11 @@ ManifestReader scanMetrics(ScanMetrics newScanMetrics) { } CloseableIterable> entries() { - if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) - || (partFilter != null && partFilter != Expressions.alwaysTrue()) - || (partitionSet != null)) { + return entries(false /* all entries */); + } + + private CloseableIterable> entries(boolean onlyLive) { + if (hasRowFilter() || hasPartitionFilter() || partitionSet != null) { Evaluator evaluator = evaluator(); InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator(); @@ -213,22 +215,34 @@ CloseableIterable> entries() { boolean requireStatsProjection = requireStatsProjection(rowFilter, columns); Collection projectColumns = requireStatsProjection ? withStatsColumns(columns) : columns; + CloseableIterable> entries = + open(projection(fileSchema, fileProjection, projectColumns, caseSensitive)); return CloseableIterable.filter( content == FileType.DATA_FILES ? scanMetrics.skippedDataFiles() : scanMetrics.skippedDeleteFiles(), - open(projection(fileSchema, fileProjection, projectColumns, caseSensitive)), + onlyLive ? filterLiveEntries(entries) : entries, entry -> entry != null && evaluator.eval(entry.file().partition()) && metricsEvaluator.eval(entry.file()) && inPartitionSet(entry.file())); } else { - return open(projection(fileSchema, fileProjection, columns, caseSensitive)); + CloseableIterable> entries = + open(projection(fileSchema, fileProjection, columns, caseSensitive)); + return onlyLive ? filterLiveEntries(entries) : entries; } } + private boolean hasRowFilter() { + return rowFilter != null && rowFilter != Expressions.alwaysTrue(); + } + + private boolean hasPartitionFilter() { + return partFilter != null && partFilter != Expressions.alwaysTrue(); + } + private boolean inPartitionSet(F fileToCheck) { return partitionSet == null || partitionSet.contains(fileToCheck.specId(), fileToCheck.partition()); @@ -266,12 +280,16 @@ private CloseableIterable> open(Schema projection) { } CloseableIterable> liveEntries() { - return CloseableIterable.filter( - content == FileType.DATA_FILES - ? scanMetrics.skippedDataFiles() - : scanMetrics.skippedDeleteFiles(), - entries(), - entry -> entry != null && entry.status() != ManifestEntry.Status.DELETED); + return entries(true /* only live entries */); + } + + private CloseableIterable> filterLiveEntries( + CloseableIterable> entries) { + return CloseableIterable.filter(entries, this::isLiveEntry); + } + + private boolean isLiveEntry(ManifestEntry entry) { + return entry != null && entry.status() != ManifestEntry.Status.DELETED; } /** @return an Iterator of DataFile. Makes defensive copies of files before returning */ diff --git a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java b/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java index 8dbdd9cf6b7c..106c236f59b1 100644 --- a/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java +++ b/core/src/test/java/org/apache/iceberg/TestScanPlanningAndReporting.java @@ -215,19 +215,24 @@ public void scanningWithSkippedDataFiles() throws IOException { Table table = TestTables.create( tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, reporter); - table.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit(); - table.newAppend().appendFile(FILE_B).appendFile(FILE_C).commit(); + table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_D).commit(); + table.newOverwrite().deleteFile(FILE_A).addFile(FILE_A2).commit(); + table.newAppend().appendFile(FILE_C).commit(); TableScan tableScan = table.newScan(); + List fileTasks = Lists.newArrayList(); try (CloseableIterable fileScanTasks = tableScan.filter(Expressions.equal("data", "1")).planFiles()) { - fileScanTasks.forEach(task -> {}); + fileScanTasks.forEach(fileTasks::add); } + assertThat(fileTasks) + .singleElement() + .satisfies(task -> assertThat(task.file().path()).isEqualTo(FILE_D.path())); ScanReport scanReport = reporter.lastReport(); assertThat(scanReport).isNotNull(); assertThat(scanReport.tableName()).isEqualTo(tableName); - assertThat(scanReport.snapshotId()).isEqualTo(2L); + assertThat(scanReport.snapshotId()).isEqualTo(3L); ScanMetricsResult result = scanReport.scanMetrics(); assertThat(result.skippedDataFiles().value()).isEqualTo(1); assertThat(result.skippedDeleteFiles().value()).isEqualTo(0); @@ -236,9 +241,9 @@ public void scanningWithSkippedDataFiles() throws IOException { assertThat(result.resultDeleteFiles().value()).isEqualTo(0); assertThat(result.scannedDataManifests().value()).isEqualTo(1); assertThat(result.scannedDeleteManifests().value()).isEqualTo(0); - assertThat(result.skippedDataManifests().value()).isEqualTo(1); + assertThat(result.skippedDataManifests().value()).isEqualTo(2); assertThat(result.skippedDeleteManifests().value()).isEqualTo(0); - assertThat(result.totalDataManifests().value()).isEqualTo(2); + assertThat(result.totalDataManifests().value()).isEqualTo(3); assertThat(result.totalDeleteManifests().value()).isEqualTo(0); assertThat(result.totalFileSizeInBytes().value()).isEqualTo(10L); assertThat(result.totalDeleteFileSizeInBytes().value()).isEqualTo(0L); @@ -250,20 +255,25 @@ public void scanningWithSkippedDeleteFiles() throws IOException { Table table = TestTables.create( tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, reporter); - table.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit(); + table.newAppend().appendFile(FILE_A).appendFile(FILE_B).appendFile(FILE_D).commit(); + table.newOverwrite().deleteFile(FILE_A).addFile(FILE_A2).commit(); table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_D2_DELETES).commit(); table.newRowDelta().addDeletes(FILE_B_DELETES).addDeletes(FILE_C2_DELETES).commit(); TableScan tableScan = table.newScan(); + List fileTasks = Lists.newArrayList(); try (CloseableIterable fileScanTasks = tableScan.filter(Expressions.equal("data", "1")).planFiles()) { - fileScanTasks.forEach(task -> {}); + fileScanTasks.forEach(fileTasks::add); } + assertThat(fileTasks) + .singleElement() + .satisfies(task -> assertThat(task.file().path()).isEqualTo(FILE_D.path())); ScanReport scanReport = reporter.lastReport(); assertThat(scanReport).isNotNull(); assertThat(scanReport.tableName()).isEqualTo(tableName); - assertThat(scanReport.snapshotId()).isEqualTo(3L); + assertThat(scanReport.snapshotId()).isEqualTo(4L); ScanMetricsResult result = scanReport.scanMetrics(); assertThat(result.totalPlanningDuration().totalDuration()).isGreaterThan(Duration.ZERO); assertThat(result.resultDataFiles().value()).isEqualTo(1); @@ -272,9 +282,9 @@ public void scanningWithSkippedDeleteFiles() throws IOException { assertThat(result.skippedDeleteFiles().value()).isEqualTo(1); assertThat(result.scannedDataManifests().value()).isEqualTo(1); assertThat(result.scannedDeleteManifests().value()).isEqualTo(1); - assertThat(result.skippedDataManifests().value()).isEqualTo(0); + assertThat(result.skippedDataManifests().value()).isEqualTo(1); assertThat(result.skippedDeleteManifests().value()).isEqualTo(1); - assertThat(result.totalDataManifests().value()).isEqualTo(1); + assertThat(result.totalDataManifests().value()).isEqualTo(2); assertThat(result.totalDeleteManifests().value()).isEqualTo(2); assertThat(result.totalFileSizeInBytes().value()).isEqualTo(10L); assertThat(result.totalDeleteFileSizeInBytes().value()).isEqualTo(10L);