Skip to content

Commit

Permalink
Core: Enable column statistics filtering after planning (apache#8803)
Browse files Browse the repository at this point in the history
  • Loading branch information
pvary authored and devangjhabakh committed Apr 22, 2024
1 parent a79909d commit 642d47d
Show file tree
Hide file tree
Showing 25 changed files with 422 additions and 59 deletions.
5 changes: 5 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,11 @@ acceptedBreaks:
old: "method void org.apache.iceberg.encryption.Ciphers::<init>()"
new: "method void org.apache.iceberg.encryption.Ciphers::<init>()"
justification: "Static utility class - should not have public constructor"
"1.4.0":
org.apache.iceberg:iceberg-core:
- code: "java.field.serialVersionUIDChanged"
new: "field org.apache.iceberg.util.SerializableMap<K, V>.serialVersionUID"
justification: "Serialization is not used"
apache-iceberg-0.14.0:
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
Expand Down
5 changes: 5 additions & 0 deletions api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ public BatchScan includeColumnStats() {
return new BatchScanAdapter(scan.includeColumnStats());
}

@Override
public BatchScan includeColumnStats(Collection<String> requestedColumns) {
  // Refine the wrapped scan, then re-wrap the result so callers keep seeing a BatchScan adapter.
  BatchScan refined = scan.includeColumnStats(requestedColumns);
  return new BatchScanAdapter(refined);
}

@Override
public BatchScan select(Collection<String> columns) {
return new BatchScanAdapter(scan.select(columns));
Expand Down
15 changes: 15 additions & 0 deletions api/src/main/java/org/apache/iceberg/ContentFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Superinterface of {@link DataFile} and {@link DeleteFile} that exposes common methods.
Expand Down Expand Up @@ -165,6 +166,20 @@ default Long fileSequenceNumber() {
*/
F copyWithoutStats();

/**
 * Copies this file, keeping column stats only for specific columns. Manifest readers can reuse
 * file instances; use this method to copy data with stats for only the requested columns when
 * collecting files.
 *
 * @param requestedColumnIds column IDs for which to keep stats.
 * @return a copy of the data file whose lower bounds, upper bounds, value counts, null value
 *     counts, and nan value counts are retained only for the requested columns.
 */
default F copyWithStats(Set<Integer> requestedColumnIds) {
  String message = this.getClass().getName() + " doesn't implement copyWithStats";
  throw new UnsupportedOperationException(message);
}

/**
* Copies this file (potentially without file stats). Manifest readers can reuse file instances;
* use this method to copy data when collecting files from tasks.
Expand Down
14 changes: 14 additions & 0 deletions api/src/main/java/org/apache/iceberg/Scan.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,20 @@ public interface Scan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>> {
*/
ThisT includeColumnStats();

/**
 * Create a new scan from this that loads the column stats with each data file, but only for the
 * given columns.
 *
 * <p>Column stats include: value count, null value count, lower bounds, and upper bounds.
 *
 * @param requestedColumns column names for which to keep the stats.
 * @return a new scan based on this that loads column stats for the requested columns.
 */
default ThisT includeColumnStats(Collection<String> requestedColumns) {
  String message = this.getClass().getName() + " doesn't implement includeColumnStats";
  throw new UnsupportedOperationException(message);
}

/**
* Create a new scan from this that will read the given data columns. This produces an expected
* schema that includes all fields that are either selected or used by this scan's filter
Expand Down
6 changes: 6 additions & 0 deletions api/src/test/java/org/apache/iceberg/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.IntStream;
import org.apache.iceberg.expressions.BoundPredicate;
import org.apache.iceberg.expressions.BoundSetPredicate;
Expand Down Expand Up @@ -666,6 +667,11 @@ public DataFile copyWithoutStats() {
return this;
}

// Test double: no per-column stats are stored here, so filtering is a no-op and the same
// instance is returned.
@Override
public DataFile copyWithStats(Set<Integer> requestedColumns) {
return this;
}

@Override
public List<Long> splitOffsets() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.iceberg.metrics.ScanMetricsUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.iceberg.util.ThreadPools;
Expand Down Expand Up @@ -368,14 +369,18 @@ private CloseableIterable<ScanTask> toFileTasks(
ScanMetricsUtil.fileTask(scanMetrics(), dataFile, deleteFiles);

return new BaseFileScanTask(
copyDataFiles ? dataFile.copy(shouldReturnColumnStats()) : dataFile,
copyDataFiles ? copy(dataFile) : dataFile,
deleteFiles,
schemaString,
specString,
residuals);
});
}

// Copy a content file for a new scan task. ContentFileUtil.copy presumably drops all column
// stats when shouldReturnColumnStats() is false and otherwise keeps stats only for the column
// ids in columnsToKeepStats() (null = all) — confirm against ContentFileUtil's contract.
private <F extends ContentFile<F>> F copy(F file) {
return ContentFileUtil.copy(file, shouldReturnColumnStats(), columnsToKeepStats());
}

private ManifestEvaluator newManifestEvaluator(PartitionSpec spec) {
Expression projection = Projections.inclusive(spec, isCaseSensitive()).project(filter());
return ManifestEvaluator.forPartitionFilter(projection, spec, isCaseSensitive());
Expand Down
30 changes: 21 additions & 9 deletions core/src/main/java/org/apache/iceberg/BaseFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificData;
Expand Down Expand Up @@ -170,9 +171,11 @@ public PartitionData copy() {
* Copy constructor.
*
* @param toCopy a generic data file to copy.
* @param fullCopy whether to copy all fields or to drop column-level stats
* @param copyStats whether to copy all fields or to drop column-level stats
* @param requestedColumnIds column ids for which to keep stats. If <code>null</code> then every
* column stat is kept.
*/
BaseFile(BaseFile<F> toCopy, boolean fullCopy) {
BaseFile(BaseFile<F> toCopy, boolean copyStats, Set<Integer> requestedColumnIds) {
this.fileOrdinal = toCopy.fileOrdinal;
this.partitionSpecId = toCopy.partitionSpecId;
this.content = toCopy.content;
Expand All @@ -182,13 +185,13 @@ public PartitionData copy() {
this.partitionType = toCopy.partitionType;
this.recordCount = toCopy.recordCount;
this.fileSizeInBytes = toCopy.fileSizeInBytes;
if (fullCopy) {
this.columnSizes = SerializableMap.copyOf(toCopy.columnSizes);
this.valueCounts = SerializableMap.copyOf(toCopy.valueCounts);
this.nullValueCounts = SerializableMap.copyOf(toCopy.nullValueCounts);
this.nanValueCounts = SerializableMap.copyOf(toCopy.nanValueCounts);
this.lowerBounds = SerializableByteBufferMap.wrap(SerializableMap.copyOf(toCopy.lowerBounds));
this.upperBounds = SerializableByteBufferMap.wrap(SerializableMap.copyOf(toCopy.upperBounds));
if (copyStats) {
this.columnSizes = copyMap(toCopy.columnSizes, requestedColumnIds);
this.valueCounts = copyMap(toCopy.valueCounts, requestedColumnIds);
this.nullValueCounts = copyMap(toCopy.nullValueCounts, requestedColumnIds);
this.nanValueCounts = copyMap(toCopy.nanValueCounts, requestedColumnIds);
this.lowerBounds = copyByteBufferMap(toCopy.lowerBounds, requestedColumnIds);
this.upperBounds = copyByteBufferMap(toCopy.upperBounds, requestedColumnIds);
} else {
this.columnSizes = null;
this.valueCounts = null;
Expand Down Expand Up @@ -493,6 +496,15 @@ public Integer sortOrderId() {
return sortOrderId;
}

/**
 * Copies a stats map, optionally restricting it to a subset of keys.
 *
 * @param map the source map
 * @param keys keys to retain; when {@code null}, every entry is copied
 * @return a serializable copy of {@code map}, filtered to {@code keys} when provided
 */
private static <K, V> Map<K, V> copyMap(Map<K, V> map, Set<K> keys) {
  if (keys == null) {
    return SerializableMap.copyOf(map);
  } else {
    return SerializableMap.filteredCopyOf(map, keys);
  }
}

/**
 * Copies a bounds map (column id to serialized bound), optionally restricted to a subset of
 * column ids, and wraps it for serialization.
 */
private static Map<Integer, ByteBuffer> copyByteBufferMap(
    Map<Integer, ByteBuffer> map, Set<Integer> keys) {
  Map<Integer, ByteBuffer> copied = copyMap(map, keys);
  return SerializableByteBufferMap.wrap(copied);
}

private static <K, V> Map<K, V> toReadableMap(Map<K, V> map) {
if (map == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ private CloseableIterable<FileScanTask> appendFilesFromSnapshots(List<Snapshot>
snapshotIds.contains(manifestEntry.snapshotId())
&& manifestEntry.status() == ManifestEntry.Status.ADDED)
.specsById(table().specs())
.ignoreDeleted();
.ignoreDeleted()
.columnsToKeepStats(columnsToKeepStats());

if (context().ignoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
.select(scanColumns())
.filterData(filter())
.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
.ignoreExisting();
.ignoreExisting()
.columnsToKeepStats(columnsToKeepStats());

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
Expand Down
18 changes: 18 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
Expand Down Expand Up @@ -122,6 +123,10 @@ protected boolean shouldReturnColumnStats() {
return context().returnColumnStats();
}

protected Set<Integer> columnsToKeepStats() {
return context().columnsToKeepStats();
}

protected boolean shouldIgnoreResiduals() {
return context().ignoreResiduals();
}
Expand Down Expand Up @@ -166,6 +171,19 @@ public ThisT includeColumnStats() {
return newRefinedScan(table, schema, context.shouldReturnColumnStats(true));
}

/**
 * Create a new scan from this that loads the column stats for the given columns with each data
 * file.
 *
 * @param requestedColumns column names for which to keep the stats
 * @return a new scan based on this that loads column stats for the requested columns
 * @throws NullPointerException if a requested column does not exist in the scan schema
 */
@Override
public ThisT includeColumnStats(Collection<String> requestedColumns) {
  // Resolve column names to field ids eagerly so an unknown column name fails fast with a
  // descriptive message instead of an anonymous NPE during planning.
  Set<Integer> requestedFieldIds =
      requestedColumns.stream()
          .map(
              column ->
                  Objects.requireNonNull(
                          schema.findField(column), "Cannot find field " + column + " in schema")
                      .fieldId())
          .collect(Collectors.toSet());

  return newRefinedScan(
      table, schema, context.shouldReturnColumnStats(true).columnsToKeepStats(requestedFieldIds));
}

@Override
public ThisT select(Collection<String> columns) {
return newRefinedScan(table, schema, context.selectColumns(columns));
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/DataScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ protected ManifestGroup newManifestGroup(
.filterData(filter())
.specsById(table().specs())
.scanMetrics(scanMetrics())
.ignoreDeleted();
.ignoreDeleted()
.columnsToKeepStats(columnsToKeepStats());

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/DataTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public CloseableIterable<FileScanTask> doPlanFiles() {
.filterData(filter())
.specsById(table().specs())
.scanMetrics(scanMetrics())
.ignoreDeleted();
.ignoreDeleted()
.columnsToKeepStats(columnsToKeepStats());

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
Expand Down
19 changes: 14 additions & 5 deletions core/src/main/java/org/apache/iceberg/GenericDataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -66,23 +67,31 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
* Copy constructor.
*
* @param toCopy a generic data file to copy.
* @param copyStats whether to copy all fields or to drop column-level stats.
* @param requestedColumnIds column ids for which to keep stats. If <code>null</code> then every
*     column stat is kept.
*/
private GenericDataFile(
    GenericDataFile toCopy, boolean copyStats, Set<Integer> requestedColumnIds) {
  super(toCopy, copyStats, requestedColumnIds);
}

/** Constructor for Java serialization. */
GenericDataFile() {}

@Override
public DataFile copyWithoutStats() {
  // Copy the file metadata but drop all column-level stats.
  return new GenericDataFile(this, false /* drop stats */, null);
}

// Full copy, but retain column stats only for the requested column ids (filtering is done by
// the BaseFile copy constructor).
@Override
public DataFile copyWithStats(Set<Integer> requestedColumnIds) {
return new GenericDataFile(this, true, requestedColumnIds);
}

@Override
public DataFile copy() {
  // Full copy; a null column-id filter keeps stats for every column.
  return new GenericDataFile(this, true /* full copy */, null);
}

@Override
Expand Down
19 changes: 14 additions & 5 deletions core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -67,23 +68,31 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
* Copy constructor.
*
* @param toCopy a generic delete file to copy.
* @param copyStats whether to copy all fields or to drop column-level stats.
* @param requestedColumnIds column ids for which to keep stats. If <code>null</code> then every
*     column stat is kept.
*/
private GenericDeleteFile(
    GenericDeleteFile toCopy, boolean copyStats, Set<Integer> requestedColumnIds) {
  super(toCopy, copyStats, requestedColumnIds);
}

/** Constructor for Java serialization. */
GenericDeleteFile() {}

@Override
public DeleteFile copyWithoutStats() {
  // Copy the file metadata but drop all column-level stats.
  return new GenericDeleteFile(this, false /* drop stats */, null);
}

// Full copy, but retain column stats only for the requested column ids (filtering is done by
// the BaseFile copy constructor).
@Override
public DeleteFile copyWithStats(Set<Integer> requestedColumnIds) {
return new GenericDeleteFile(this, true, requestedColumnIds);
}

@Override
public DeleteFile copy() {
  // Full copy; a null column-id filter keeps stats for every column.
  return new GenericDeleteFile(this, true /* full copy */, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public CloseableIterable<FileScanTask> planFiles() {
snapshotIds.contains(manifestEntry.snapshotId())
&& manifestEntry.status() == ManifestEntry.Status.ADDED)
.specsById(table().specs())
.ignoreDeleted();
.ignoreDeleted()
.columnsToKeepStats(columnsToKeepStats());

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
Expand Down
Loading

0 comments on commit 642d47d

Please sign in to comment.