From dc9cb1937427ae4d88764004af9d897ad7294739 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 5 Dec 2024 13:39:04 -0500 Subject: [PATCH] PARQUET-34: implement Size() filter for repeated columns --- .../filter2/predicate/ContainsRewriter.java | 6 + .../parquet/filter2/predicate/FilterApi.java | 5 + .../filter2/predicate/FilterPredicate.java | 5 + .../predicate/LogicalInverseRewriter.java | 6 + .../filter2/predicate/LogicalInverter.java | 14 ++ .../parquet/filter2/predicate/Operators.java | 76 +++++++ .../SchemaCompatibilityValidator.java | 25 ++- .../IncrementallyUpdatedFilterPredicate.java | 69 ++++++ ...allyUpdatedFilterPredicateBuilderBase.java | 18 ++ .../columnindex/ColumnIndexBuilder.java | 6 + .../columnindex/ColumnIndexFilter.java | 6 + .../predicate/TestLogicalInverter.java | 10 + .../TestSchemaCompatibilityValidator.java | 43 ++++ .../columnindex/TestColumnIndexFilter.java | 9 + ...ntallyUpdatedFilterPredicateGenerator.java | 32 +++ .../bloomfilterlevel/BloomFilterImpl.java | 5 + .../dictionarylevel/DictionaryFilter.java | 33 +++ .../statisticslevel/StatisticsFilter.java | 66 ++++++ .../dictionarylevel/DictionaryFilterTest.java | 26 +++ .../recordlevel/TestRecordLevelFilters.java | 41 +++- .../statisticslevel/TestStatisticsFilter.java | 207 +++++++++++++++++- 21 files changed, 696 insertions(+), 12 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ContainsRewriter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ContainsRewriter.java index 5050ee8f01..ba999fb1d0 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ContainsRewriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ContainsRewriter.java @@ -33,6 +33,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotEq; import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; +import org.apache.parquet.filter2.predicate.Operators.Size; import org.apache.parquet.filter2.predicate.Operators.UserDefined; /** @@ -97,6 +98,11 @@ public > FilterPredicate visit(Contains contains) { return contains; } + @Override + public FilterPredicate visit(Size size) { + return size; + } + @Override public FilterPredicate visit(And and) { final FilterPredicate left; diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java index 3c51680667..787f0cf90c 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterApi.java @@ -40,6 +40,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; import org.apache.parquet.filter2.predicate.Operators.SingleColumnFilterPredicate; +import org.apache.parquet.filter2.predicate.Operators.Size; import org.apache.parquet.filter2.predicate.Operators.SupportsEqNotEq; import org.apache.parquet.filter2.predicate.Operators.SupportsLtGt; import org.apache.parquet.filter2.predicate.Operators.UserDefined; @@ -263,6 +264,10 @@ public static , P extends SingleColumnFilterPredicate return Contains.of(pred); } + public static Size size(Column column, Size.Operator operator, int value) { + return new Size(column, operator, value); + } + /** * Keeps records that pass the provided {@link UserDefinedPredicate} *

diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java index a662bb0b17..5e75a3bc6a 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/FilterPredicate.java @@ -31,6 +31,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotEq; import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; +import org.apache.parquet.filter2.predicate.Operators.Size; import org.apache.parquet.filter2.predicate.Operators.UserDefined; /** @@ -89,6 +90,10 @@ default > R visit(Contains contains) { throw new UnsupportedOperationException("visit Contains is not supported."); } + default R visit(Size size) { + throw new UnsupportedOperationException("visit Size is not supported."); + } + R visit(And and); R visit(Or or); diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java index d1d7f07e80..d4817460c6 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverseRewriter.java @@ -36,6 +36,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotEq; import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; +import org.apache.parquet.filter2.predicate.Operators.Size; import org.apache.parquet.filter2.predicate.Operators.UserDefined; /** @@ -104,6 +105,11 @@ public > FilterPredicate visit(Contains contains) { return contains; } + @Override + public FilterPredicate visit(Size size) { + return size; + } + @Override public FilterPredicate visit(And and) { return and(and.getLeft().accept(this), and.getRight().accept(this)); diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java index 506b8f0e56..9ffbae069d 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/LogicalInverter.java @@ -33,6 +33,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotEq; import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; +import org.apache.parquet.filter2.predicate.Operators.Size; import org.apache.parquet.filter2.predicate.Operators.UserDefined; /** @@ -98,6 +99,19 @@ public > FilterPredicate visit(Contains contains) { return contains.not(); } + @Override + public FilterPredicate visit(Size size) { + final long value = size.getValue(); + final Operators.Column column = size.getColumn(); + + return size.filter( + (eq) -> new Or(new Size(column, Size.Operator.LT, value), new Size(column, Size.Operator.GT, value)), + (lt) -> new Size(column, Size.Operator.GTE, value), + (ltEq) -> new Size(column, Size.Operator.GT, value), + (gt) -> new Size(column, Size.Operator.LTE, value), + (gtEq) -> new Size(column, Size.Operator.LT, value)); + } + @Override public FilterPredicate visit(And and) { return new Or(and.getLeft().accept(this), and.getRight().accept(this)); diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java index 60dc80cd7b..16df708c54 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/Operators.java @@ -505,6 +505,82 @@ public R filter( } } + public static final class Size implements FilterPredicate, Serializable { + public enum Operator { + EQ, + LT, + LTE, + GT, + GTE + } + + private final Column column; + private final Operator operator; + private final long value; + + Size(Column column, Operator operator, long value) { + this.column = column; + this.operator = operator; + if (value < 0) { + throw new IllegalArgumentException("Argument to size() operator cannot be negative: " + value); + } + this.value = value; + } + + @Override + public R accept(Visitor visitor) { + return visitor.visit(this); + } + + public long getValue() { + return value; + } + + public Column getColumn() { + return column; + } + + public R filter( + Function onEq, + Function onLt, + Function onLtEq, + Function onGt, + Function onGtEq) { + if (operator == Operator.EQ) { + return onEq.apply(value); + } else if (operator == Operator.LT) { + return onLt.apply(value); + } else if (operator == Operator.LTE) { + return onLtEq.apply(value); + } else if (operator == Operator.GT) { + return onGt.apply(value); + } else if (operator == Operator.GTE) { + return onGtEq.apply(value); + } else { + throw new UnsupportedOperationException("Operator " + operator + " cannot be used with size() filter"); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + return column.equals(((Size) o).column) && operator == ((Size) o).operator && value == ((Size) o).value; + } + + @Override + public int hashCode() { + return Objects.hash(column, operator, value); + } + + @Override + public String toString() { + String name = Size.class.getSimpleName().toLowerCase(Locale.ENGLISH); + return name + "(" + operator.toString().toLowerCase() + " " + value + ")"; + } + } + public static final class NotIn> extends SetColumnFilterPredicate { NotIn(Column column, Set values) { diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java index b5708a4a0c..4b94d20b13 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java @@ -38,9 +38,11 @@ import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; import org.apache.parquet.filter2.predicate.Operators.SetColumnFilterPredicate; +import org.apache.parquet.filter2.predicate.Operators.Size; import org.apache.parquet.filter2.predicate.Operators.UserDefined; import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; /** * Inspects the column types found in the provided {@link FilterPredicate} and compares them @@ -135,6 +137,12 @@ public > Void visit(Contains pred) { return null; } + @Override + public Void visit(Size size) { + validateColumn(size.getColumn(), true, true); + return null; + } + @Override public Void visit(And and) { and.getLeft().accept(this); @@ -175,14 +183,15 @@ private > void validateColumnFilterPredicate(SetColumnFi } private > void validateColumnFilterPredicate(Contains pred) { - validateColumn(pred.getColumn(), true); + validateColumn(pred.getColumn(), true, false); } private > void validateColumn(Column column) { - validateColumn(column, false); + validateColumn(column, false, false); } - private > void validateColumn(Column column, boolean shouldBeRepeated) { + private > void validateColumn( + Column column, boolean isRepeatedColumn, boolean mustBeRequired) { ColumnPath path = column.getColumnPath(); Class alreadySeen = columnTypesEncountered.get(path); @@ -204,15 +213,21 @@ private > void validateColumn(Column column, boolean return; } - if (shouldBeRepeated && descriptor.getMaxRepetitionLevel() == 0) { + if (isRepeatedColumn && descriptor.getMaxRepetitionLevel() == 0) { throw new IllegalArgumentException( "FilterPredicate for column " + path.toDotString() + " requires a repeated " + "schema, but found max repetition level " + descriptor.getMaxRepetitionLevel()); - } else if (!shouldBeRepeated && descriptor.getMaxRepetitionLevel() > 0) { + } else if (!isRepeatedColumn && descriptor.getMaxRepetitionLevel() > 0) { throw new IllegalArgumentException("FilterPredicates do not currently support repeated columns. " + "Column " + path.toDotString() + " is repeated."); } + if (mustBeRequired && descriptor.getPrimitiveType().isRepetition(Type.Repetition.OPTIONAL)) { + throw new IllegalArgumentException("FilterPredicate for column " + path.toDotString() + + " requires schema to have repetition REQUIRED, but found " + + descriptor.getPrimitiveType().getRepetition() + "."); + } + ValidTypeMap.assertTypeValid(column, descriptor.getType()); } diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java index c2aab2b6bf..96550f627f 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.Objects; +import java.util.function.Function; import org.apache.parquet.io.api.Binary; /** @@ -223,6 +224,74 @@ public void reset() { } } + class CountingValueInspector extends ValueInspector { + private long observedValueCount; + private final ValueInspector delegate; + private final Function shouldUpdateDelegate; + + public CountingValueInspector(ValueInspector delegate, Function shouldUpdateDelegate) { + this.observedValueCount = 0; + this.delegate = delegate; + this.shouldUpdateDelegate = shouldUpdateDelegate; + } + + @Override + public void updateNull() { + delegate.update(observedValueCount); + if (!delegate.isKnown()) { + delegate.updateNull(); + } + setResult(delegate.getResult()); + } + + @Override + public void update(int value) { + incrementCount(); + } + + @Override + public void update(long value) { + incrementCount(); + } + + @Override + public void update(double value) { + incrementCount(); + } + + @Override + public void update(float value) { + incrementCount(); + } + + @Override + public void update(boolean value) { + incrementCount(); + } + + @Override + public void update(Binary value) { + incrementCount(); + } + + @Override + public void reset() { + super.reset(); + delegate.reset(); + observedValueCount = 0; + } + + private void incrementCount() { + observedValueCount++; + if (!delegate.isKnown() && shouldUpdateDelegate.apply(observedValueCount)) { + delegate.update(observedValueCount); + if (delegate.isKnown()) { + setResult(delegate.getResult()); + } + } + } + } + // base class for and / or abstract static class BinaryLogical implements IncrementallyUpdatedFilterPredicate { private final IncrementallyUpdatedFilterPredicate left; diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java index 588d06300b..d820945722 100644 --- a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java @@ -25,8 +25,10 @@ import java.util.List; import java.util.Map; import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.filter2.predicate.FilterApi; import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor; +import org.apache.parquet.filter2.predicate.Operators; import org.apache.parquet.filter2.predicate.Operators.And; import org.apache.parquet.filter2.predicate.Operators.Not; import org.apache.parquet.filter2.predicate.Operators.Or; @@ -34,6 +36,8 @@ import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.io.PrimitiveColumnIO; import org.apache.parquet.schema.PrimitiveComparator; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; /** * The implementation of this abstract class is auto-generated by @@ -56,6 +60,8 @@ */ public abstract class IncrementallyUpdatedFilterPredicateBuilderBase implements Visitor { + static final Operators.LongColumn SIZE_PSUEDOCOLUMN = FilterApi.longColumn("SIZE"); + private boolean built = false; private final Map> valueInspectorsByColumn = new HashMap<>(); private final Map> comparatorsByColumn = new HashMap<>(); @@ -70,6 +76,13 @@ public IncrementallyUpdatedFilterPredicateBuilderBase(List le PrimitiveComparator comparator = descriptor.getPrimitiveType().comparator(); comparatorsByColumn.put(path, comparator); } + comparatorsByColumn.put( + SIZE_PSUEDOCOLUMN.getColumnPath(), + new PrimitiveType( + Type.Repetition.REQUIRED, + PrimitiveType.PrimitiveTypeName.INT64, + SIZE_PSUEDOCOLUMN.getColumnPath().toDotString()) + .comparator()); } public final IncrementallyUpdatedFilterPredicate build(FilterPredicate pred) { @@ -80,6 +93,11 @@ public final IncrementallyUpdatedFilterPredicate build(FilterPredicate pred) { } protected final void addValueInspector(ColumnPath columnPath, ValueInspector valueInspector) { + if (columnPath.equals(SIZE_PSUEDOCOLUMN.getColumnPath())) { + // do not add psuedocolumn to list of value inspectors + return; + } + List valueInspectors = valueInspectorsByColumn.get(columnPath); if (valueInspectors == null) { valueInspectors = new ArrayList<>(); diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java index ffbb82197b..7468b51ac0 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java @@ -53,6 +53,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotEq; import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; +import org.apache.parquet.filter2.predicate.Operators.Size; import org.apache.parquet.filter2.predicate.Operators.UserDefined; import org.apache.parquet.filter2.predicate.UserDefinedPredicate; import org.apache.parquet.io.api.Binary; @@ -378,6 +379,11 @@ public > PrimitiveIterator.OfInt visit(Contains conta indices -> IndexIterator.all(getPageCount())); } + @Override + public PrimitiveIterator.OfInt visit(Size size) { + return IndexIterator.all(getPageCount()); + } + @Override public , U extends UserDefinedPredicate> PrimitiveIterator.OfInt visit( UserDefined udp) { diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java index 8b6ee1f95d..ad07f3df89 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java @@ -39,6 +39,7 @@ import org.apache.parquet.filter2.predicate.Operators.Not; import org.apache.parquet.filter2.predicate.Operators.NotEq; import org.apache.parquet.filter2.predicate.Operators.Or; +import org.apache.parquet.filter2.predicate.Operators.Size; import org.apache.parquet.filter2.predicate.Operators.UserDefined; import org.apache.parquet.filter2.predicate.UserDefinedPredicate; import org.apache.parquet.hadoop.metadata.ColumnPath; @@ -161,6 +162,11 @@ public > RowRanges visit(Contains contains) { return contains.filter(this, RowRanges::intersection, RowRanges::union, ranges -> allRows()); } + @Override + public RowRanges visit(Size size) { + return applyPredicate(size.getColumn(), ci -> ci.visit(size), allRows()); + } + @Override public , U extends UserDefinedPredicate> RowRanges visit(UserDefined udp) { return applyPredicate( diff --git a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestLogicalInverter.java b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestLogicalInverter.java index 6a7f81a6c2..dde742bbc8 100644 --- a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestLogicalInverter.java +++ b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestLogicalInverter.java @@ -29,8 +29,10 @@ import static org.apache.parquet.filter2.predicate.FilterApi.not; import static org.apache.parquet.filter2.predicate.FilterApi.notEq; import static org.apache.parquet.filter2.predicate.FilterApi.or; +import static org.apache.parquet.filter2.predicate.FilterApi.size; import static org.apache.parquet.filter2.predicate.FilterApi.userDefined; import static org.apache.parquet.filter2.predicate.LogicalInverter.invert; +import static org.apache.parquet.filter2.predicate.Operators.Size.Operator; import static org.junit.Assert.assertEquals; import org.apache.parquet.filter2.predicate.Operators.DoubleColumn; @@ -87,5 +89,13 @@ public void testBaseCases() { @Test public void testComplex() { assertEquals(complexInverse, invert(complex)); + + assertEquals( + or(size(intColumn, Operator.LT, 5), size(intColumn, Operator.GT, 5)), + invert(size(intColumn, Operator.EQ, 5))); + assertEquals(size(intColumn, Operator.GTE, 5), invert(size(intColumn, Operator.LT, 5))); + assertEquals(size(intColumn, Operator.GT, 5), invert(size(intColumn, Operator.LTE, 5))); + assertEquals(size(intColumn, Operator.LTE, 5), invert(size(intColumn, Operator.GT, 5))); + assertEquals(size(intColumn, Operator.LT, 5), invert(size(intColumn, Operator.GTE, 5))); } } diff --git a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java index 47e9bdd5e6..8a44504adb 100644 --- a/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java +++ b/parquet-column/src/test/java/org/apache/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java @@ -29,6 +29,7 @@ import static org.apache.parquet.filter2.predicate.FilterApi.not; import static org.apache.parquet.filter2.predicate.FilterApi.notEq; import static org.apache.parquet.filter2.predicate.FilterApi.or; +import static org.apache.parquet.filter2.predicate.FilterApi.size; import static org.apache.parquet.filter2.predicate.FilterApi.userDefined; import static org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator.validate; import static org.junit.Assert.assertEquals; @@ -47,6 +48,8 @@ public class TestSchemaCompatibilityValidator { private static final LongColumn longBar = longColumn("x.bar"); private static final IntColumn intBar = intColumn("x.bar"); private static final LongColumn lotsOfLongs = longColumn("lotsOfLongs"); + private static final BinaryColumn threeLevelList = binaryColumn("nestedGroup.threeLevelList.list.element"); + private static final BinaryColumn listOfOptionals = binaryColumn("listOfOptionals.list.element"); private static final String schemaString = "message Document {\n" + " required int32 a;\n" @@ -54,6 +57,18 @@ public class TestSchemaCompatibilityValidator { + " required binary c (UTF8);\n" + " required group x { required int32 bar; }\n" + " repeated int64 lotsOfLongs;\n" + + " optional group nestedGroup {\n" + + " required group threeLevelList (LIST) {\n" + + " repeated group list {\n" + + " required binary element (STRING);\n" + + " }\n" + + " }\n" + + " }\n" + + " required group listOfOptionals (LIST) {\n" + + " repeated group list {\n" + + " optional binary element (STRING);\n" + + " }\n" + + " }\n" + "}\n"; private static final MessageType schema = MessageTypeParser.parseMessageType(schemaString); @@ -160,4 +175,32 @@ public void testNonRepeatedNotSupportedForContainsPredicates() { e.getMessage()); } } + + @Test + public void testSizePredicate() { + // Size predicate should succeed on repeated columns + try { + validate(size(lotsOfLongs, Operators.Size.Operator.LT, 10), schema); + validate(size(threeLevelList, Operators.Size.Operator.LT, 10), schema); + } catch (IllegalArgumentException e) { + fail("Valid repeated column predicates should not throw exceptions, but threw " + e); + } + + // Size predicate should fail on non-repeated columns and on non-optional list element types + try { + validate(size(intBar, Operators.Size.Operator.LT, 10), schema); + } catch (IllegalArgumentException e) { + assertEquals( + "FilterPredicate for column x.bar requires a repeated schema, but found max repetition level 0", + e.getMessage()); + } + + try { + validate(size(listOfOptionals, Operators.Size.Operator.LT, 10), schema); + } catch (IllegalArgumentException e) { + assertEquals( + "FilterPredicate for column listOfOptionals.list.element requires schema to have repetition REQUIRED, but found OPTIONAL.", + e.getMessage()); + } + } } diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java index 1574ce2474..f9c4e24b21 100644 --- a/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java +++ b/parquet-column/src/test/java/org/apache/parquet/internal/filter2/columnindex/TestColumnIndexFilter.java @@ -35,6 +35,7 @@ import static org.apache.parquet.filter2.predicate.FilterApi.notEq; import static org.apache.parquet.filter2.predicate.FilterApi.notIn; import static org.apache.parquet.filter2.predicate.FilterApi.or; +import static org.apache.parquet.filter2.predicate.FilterApi.size; import static org.apache.parquet.filter2.predicate.FilterApi.userDefined; import static org.apache.parquet.filter2.predicate.LogicalInverter.invert; import static org.apache.parquet.internal.column.columnindex.BoundaryOrder.ASCENDING; @@ -62,6 +63,7 @@ import java.util.stream.LongStream; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.predicate.Operators; import org.apache.parquet.filter2.predicate.Statistics; import org.apache.parquet.filter2.predicate.UserDefinedPredicate; import org.apache.parquet.hadoop.metadata.ColumnPath; @@ -970,6 +972,13 @@ public void testFiltering() { 11, 12, 13); + assertRows( + calculateRowRanges( + FilterCompat.get(size(intColumn("column6"), Operators.Size.Operator.GT, 5)), + STORE, + paths, + TOTAL_ROW_COUNT), + LongStream.range(0, 30).toArray()); } @Test diff --git a/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java b/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java index 7f66ce3821..a2eb218fc2 100644 --- a/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java +++ b/parquet-generator/src/main/java/org/apache/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java @@ -71,6 +71,7 @@ public void run() throws IOException { + "import java.util.Set;\n" + "\n" + "import org.apache.parquet.hadoop.metadata.ColumnPath;\n" + + "import org.apache.parquet.filter2.predicate.FilterApi;\n" + "import org.apache.parquet.filter2.predicate.FilterPredicate;\n" + "import org.apache.parquet.filter2.predicate.Operators;\n" + "import org.apache.parquet.filter2.predicate.Operators.Contains;\n" @@ -83,6 +84,7 @@ public void run() throws IOException { + "import org.apache.parquet.filter2.predicate.Operators.LtEq;\n" + "import org.apache.parquet.filter2.predicate.Operators.NotEq;\n" + "import org.apache.parquet.filter2.predicate.Operators.NotIn;\n" + + "import org.apache.parquet.filter2.predicate.Operators.Size;\n" + "import org.apache.parquet.filter2.predicate.Operators.UserDefined;\n" + "import org.apache.parquet.filter2.predicate.UserDefinedPredicate;\n" + "import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;\n" @@ -133,6 +135,8 @@ public void run() throws IOException { addContainsEnd(); addVisitEnd(); + addSizeCase(); + addVisitBegin("Lt"); for (TypeInfo info : TYPES) { addInequalityCase(info, "<", false); @@ -338,6 +342,30 @@ private void addUdpBegin() throws IOException { + "\n"); } + private void addSizeCase() throws IOException { + add(" @Override\n" + " public IncrementallyUpdatedFilterPredicate visit(Size size) {\n" + + " final ValueInspector delegate = (ValueInspector) size.filter(\n" + + " (onEq) -> visit(FilterApi.eq(SIZE_PSUEDOCOLUMN, onEq)),\n" + + " (lt) -> visit(FilterApi.lt(SIZE_PSUEDOCOLUMN, lt)),\n" + + " (lte) -> visit(FilterApi.ltEq(SIZE_PSUEDOCOLUMN, lte)),\n" + + " (gt) -> visit(FilterApi.gt(SIZE_PSUEDOCOLUMN, gt)),\n" + + " (gte) -> visit(FilterApi.gtEq(SIZE_PSUEDOCOLUMN, gte)));\n" + + "\n" + + " final ValueInspector valueInspector = new IncrementallyUpdatedFilterPredicate.CountingValueInspector(\n" + + " delegate,\n" + + " size.filter(\n" + + " (eqValue) -> (count) -> count > eqValue,\n" + + " (ltValue) -> (count) -> count >= ltValue,\n" + + " (lteValue) -> (count) -> count > lteValue,\n" + + " (gtValue) -> (count) -> count > gtValue,\n" + + " (gteValue) -> (count) -> count >= gteValue)\n" + + " );\n" + + "\n" + + " addValueInspector(size.getColumn().getColumnPath(), valueInspector);\n" + + " return valueInspector;\n" + + " }\n"); + } + private void addContainsInspectorVisitor(String op) throws IOException { add(" @Override\n" + " public > ContainsPredicate visit(" + op + " pred) {\n" @@ -499,6 +527,10 @@ private void addContainsBegin() throws IOException { + " @Override\n" + " public > ContainsPredicate visit(Contains contains) {\n" + " return contains.filter(this, ContainsAndPredicate::new, ContainsOrPredicate::new, ContainsPredicate::not);\n" + + " }\n" + "\n" + + " @Override\n" + + " public ContainsPredicate visit(Size size) {\n" + + " throw new UnsupportedOperationException(\"Unsupported predicate \" + size + \" cannot be used with contains()\");\n" + " }\n"); addContainsInspectorVisitor("Eq"); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java index 39babc0ac3..4ceb6c7bb3 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/bloomfilterlevel/BloomFilterImpl.java @@ -124,6 +124,11 @@ public > Boolean visit(Operators.Contains contains) { return contains.filter(this, (l, r) -> l || r, (l, r) -> l && r, v -> BLOCK_MIGHT_MATCH); } + @Override + public Boolean visit(Operators.Size size) { + return BLOCK_MIGHT_MATCH; + } + @Override public > Boolean visit(Operators.In in) { Set values = in.getValues(); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java index be4455eeba..27024f66c8 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java @@ -48,6 +48,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotEq; import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; +import org.apache.parquet.filter2.predicate.Operators.Size; import org.apache.parquet.filter2.predicate.Operators.UserDefined; import org.apache.parquet.filter2.predicate.UserDefinedPredicate; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; @@ -493,6 +494,38 @@ public > Boolean visit(Contains contains) { return contains.filter(this, (l, r) -> l || r, (l, r) -> l && r, v -> BLOCK_MIGHT_MATCH); } + @Override + public Boolean visit(Size size) { + ColumnChunkMetaData meta = getColumnChunk(size.getColumn().getColumnPath()); + + if (meta == null) { + // the column isn't in this file, so fail eq/gt/gte targeting size > 0 + final boolean blockCannotMatch = + size.filter((eq) -> eq > 0, (lt) -> false, (lte) -> false, (gt) -> gt >= 0, (gte) -> gte > 0); + return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH; + } + + try { + // We know the block has at most `dictSize` array element values + final Set dict = expandDictionary(meta); + if (dict == null) { + return BLOCK_MIGHT_MATCH; + } + int dictSize = dict.size(); + final boolean blockCannotMatch = size.filter( + (eq) -> eq > dictSize, + (lt) -> false, + (lte) -> false, + (gt) -> gt >= dictSize, + (gte) -> gte > dictSize); + return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH; + } catch (IOException e) { + LOG.warn("Failed to process dictionary for filter evaluation.", e); + } + + return BLOCK_MIGHT_MATCH; + } + @Override public Boolean visit(And and) { return and.getLeft().accept(this) || and.getRight().accept(this); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java index 4d7918c4f1..793fb0c8be 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java @@ -24,6 +24,7 @@ import java.util.Objects; import java.util.Set; import org.apache.parquet.column.MinMax; +import org.apache.parquet.column.statistics.SizeStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.filter2.predicate.Operators.And; @@ -40,6 +41,7 @@ import org.apache.parquet.filter2.predicate.Operators.NotEq; import org.apache.parquet.filter2.predicate.Operators.NotIn; import org.apache.parquet.filter2.predicate.Operators.Or; +import org.apache.parquet.filter2.predicate.Operators.Size; import org.apache.parquet.filter2.predicate.Operators.UserDefined; import org.apache.parquet.filter2.predicate.UserDefinedPredicate; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; @@ -217,6 +219,70 @@ public > Boolean visit(Contains contains) { return contains.filter(this, (l, r) -> l || r, (l, r) -> l && r, v -> BLOCK_MIGHT_MATCH); } + @Override + public Boolean visit(Size size) { + final ColumnChunkMetaData metadata = getColumnChunk(size.getColumn().getColumnPath()); + if (metadata == null) { + // the column isn't in this file, so fail eq/gt/gte targeting size > 0 + final boolean blockCannotMatch = + size.filter((eq) -> eq > 0, (lt) -> false, (lte) -> false, (gt) -> gt >= 0, (gte) -> gte > 0); + return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH; + } + + final SizeStatistics stats = metadata.getSizeStatistics(); + final List repetitionLevelHistogram = stats.getRepetitionLevelHistogram(); + final List definitionLevelHistogram = stats.getDefinitionLevelHistogram(); + + if (repetitionLevelHistogram.isEmpty() || definitionLevelHistogram.isEmpty()) { + return BLOCK_MIGHT_MATCH; + } + + // If all values have repetition level 0, then no array has more than 1 element + if (repetitionLevelHistogram.size() == 1 + || repetitionLevelHistogram.subList(1, repetitionLevelHistogram.size()).stream() + .allMatch(l -> l == 0)) { + + // Null list fields are treated as having size 0 + if (( // all lists are nulls + definitionLevelHistogram.subList(1, definitionLevelHistogram.size()).stream() + .allMatch(l -> l == 0)) + || // all lists are size 0 + (definitionLevelHistogram.get(0) == 0 + && definitionLevelHistogram.subList(2, definitionLevelHistogram.size()).stream() + .allMatch(l -> l == 0))) { + + final boolean blockCannotMatch = + size.filter((eq) -> eq > 0, (lt) -> false, (lte) -> false, (gt) -> gt >= 0, (gte) -> gte > 0); + return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH; + } + + long maxDefinitionLevel = definitionLevelHistogram.get(definitionLevelHistogram.size() - 1); + + // If all repetition levels are zero and all definitions level are > MAX_DEFINITION_LEVEL - 1, all lists + // are of size 1 + if (definitionLevelHistogram.stream().allMatch(l -> l > maxDefinitionLevel - 1)) { + final boolean blockCannotMatch = size.filter( + (eq) -> eq != 1, (lt) -> lt <= 1, (lte) -> lte < 1, (gt) -> gt >= 1, (gte) -> gte > 1); + + return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH; + } + } + long nonNullElementCount = + repetitionLevelHistogram.stream().mapToLong(l -> l).sum() - definitionLevelHistogram.get(0); + long numNonNullRecords = repetitionLevelHistogram.get(0) - definitionLevelHistogram.get(0); + + // Given the total number of elements and non-null fields, we can compute the max size of any array field + long maxArrayElementCount = 1 + (nonNullElementCount - numNonNullRecords); + final boolean blockCannotMatch = size.filter( + (eq) -> eq > maxArrayElementCount, + (lt) -> false, + (lte) -> false, + (gt) -> gt >= maxArrayElementCount, + (gte) -> gte > maxArrayElementCount); + + return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH; + } + @Override @SuppressWarnings("unchecked") public > Boolean visit(NotEq notEq) { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java index 5b9e638d60..fa5f529612 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java @@ -39,6 +39,7 @@ import static org.apache.parquet.filter2.predicate.FilterApi.notEq; import static org.apache.parquet.filter2.predicate.FilterApi.notIn; import static org.apache.parquet.filter2.predicate.FilterApi.or; +import static org.apache.parquet.filter2.predicate.FilterApi.size; import static org.apache.parquet.filter2.predicate.FilterApi.userDefined; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; @@ -506,6 +507,31 @@ public void testGtEqDouble() throws Exception { "Should not drop: contains matching values", canDrop(gtEq(d, Double.MIN_VALUE), ccmd, dictionaries)); } + @Test + public void testSizeBinary() throws Exception { + BinaryColumn b = binaryColumn("repeated_binary_field"); + + // DictionaryFilter knows that `repeated_binary_field` column has at most 26 element values + assertTrue(canDrop(size(b, Operators.Size.Operator.GT, 26), ccmd, dictionaries)); + assertTrue(canDrop(size(b, Operators.Size.Operator.GTE, 27), ccmd, dictionaries)); + assertTrue(canDrop(size(b, Operators.Size.Operator.EQ, 27), ccmd, dictionaries)); + + assertFalse(canDrop(size(b, Operators.Size.Operator.LT, 27), ccmd, dictionaries)); + assertFalse(canDrop(size(b, Operators.Size.Operator.LTE, 26), ccmd, dictionaries)); + assertFalse(canDrop(size(b, Operators.Size.Operator.EQ, 26), ccmd, dictionaries)); + + // If column doesn't exist in meta, it should be treated as having size 0 + BinaryColumn nonExistentColumn = binaryColumn("nonexistant_col"); + + assertTrue(canDrop(size(nonExistentColumn, Operators.Size.Operator.GT, 0), ccmd, dictionaries)); + assertTrue(canDrop(size(nonExistentColumn, Operators.Size.Operator.GTE, 1), ccmd, dictionaries)); + assertTrue(canDrop(size(nonExistentColumn, Operators.Size.Operator.EQ, 1), ccmd, dictionaries)); + + assertFalse(canDrop(size(nonExistentColumn, Operators.Size.Operator.LT, 1), ccmd, dictionaries)); + assertFalse(canDrop(size(nonExistentColumn, Operators.Size.Operator.LTE, 0), ccmd, dictionaries)); + assertFalse(canDrop(size(nonExistentColumn, Operators.Size.Operator.EQ, 0), ccmd, dictionaries)); + } + @Test public void testInBinary() throws Exception { BinaryColumn b = binaryColumn("binary_field"); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java index 1a1a31e73c..2ee785b383 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/TestRecordLevelFilters.java @@ -33,7 +33,9 @@ import static org.apache.parquet.filter2.predicate.FilterApi.notEq; import static org.apache.parquet.filter2.predicate.FilterApi.notIn; import static org.apache.parquet.filter2.predicate.FilterApi.or; +import static org.apache.parquet.filter2.predicate.FilterApi.size; import static org.apache.parquet.filter2.predicate.FilterApi.userDefined; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import com.google.common.collect.ImmutableMap; @@ -50,6 +52,7 @@ import org.apache.parquet.example.data.Group; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.predicate.Operators; import org.apache.parquet.filter2.predicate.Operators.BinaryColumn; import org.apache.parquet.filter2.predicate.Operators.DoubleColumn; import org.apache.parquet.filter2.predicate.Operators.LongColumn; @@ -159,10 +162,9 @@ private static void assertFilter(List found, UserFilter f) { private static void assertPredicate(FilterPredicate predicate, long... expectedIds) throws IOException { List found = PhoneBookWriter.readFile(phonebookFile, FilterCompat.get(predicate)); - assertEquals(expectedIds.length, found.size()); - for (int i = 0; i < expectedIds.length; i++) { - assertEquals(expectedIds[i], found.get(i).getLong("id", 0)); - } + assertArrayEquals( + Arrays.stream(expectedIds).boxed().toArray(), + found.stream().map(f -> f.getLong("id", 0)).toArray(Long[]::new)); } @Test @@ -410,6 +412,37 @@ public void testArrayContainsMixedColumns() throws Exception { 30L); } + @Test + public void testArraySizeRequiredColumn() throws Exception { + assertPredicate(size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.EQ, 2), 27L); + + assertPredicate(size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.EQ, 4), 28L); + + assertPredicate(size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.GT, 1), 27L, 28L); + + assertPredicate(size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.GTE, 4), 28L); + + assertPredicate(size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.EQ, 0), 17L, 18L, 19L); + + assertPredicate( + size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.LT, 2), + LongStream.concat(LongStream.of(17L, 18L, 19L, 20L, 30L, 31L), LongStream.range(100, 200)) + .toArray()); + + assertPredicate( + size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.LTE, 2), + LongStream.concat(LongStream.of(17L, 18L, 19L, 20L, 27L, 30L, 31L), LongStream.range(100, 200)) + .toArray()); + + assertPredicate( + not(size(longColumn("phoneNumbers.phone.number"), Operators.Size.Operator.EQ, 1)), + 17L, + 18L, + 19L, + 27L, + 28L); + } + @Test public void testNameNotNull() throws Exception { BinaryColumn name = binaryColumn("name"); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java index 15d0a8ab13..b42d05f730 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java @@ -33,21 +33,35 @@ import static org.apache.parquet.filter2.predicate.FilterApi.notEq; import static org.apache.parquet.filter2.predicate.FilterApi.notIn; import static org.apache.parquet.filter2.predicate.FilterApi.or; +import static org.apache.parquet.filter2.predicate.FilterApi.size; import static org.apache.parquet.filter2.predicate.FilterApi.userDefined; import static org.apache.parquet.filter2.statisticslevel.StatisticsFilter.canDrop; +import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; +import static org.apache.parquet.hadoop.ParquetFileReader.readFooter; import static org.apache.parquet.io.api.Binary.fromString; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.Type.Repetition.REQUIRED; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.google.common.collect.ImmutableList; +import java.io.File; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.statistics.DoubleStatistics; import org.apache.parquet.column.statistics.IntStatistics; +import org.apache.parquet.column.statistics.SizeStatistics; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroup; import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.filter2.predicate.LogicalInverseRewriter; import org.apache.parquet.filter2.predicate.Operators; @@ -56,10 +70,18 @@ import org.apache.parquet.filter2.predicate.Operators.IntColumn; import org.apache.parquet.filter2.predicate.Statistics; import org.apache.parquet.filter2.predicate.UserDefinedPredicate; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; import org.apache.parquet.schema.Types; import org.junit.Test; @@ -68,17 +90,27 @@ public class TestStatisticsFilter { private static ColumnChunkMetaData getIntColumnMeta( org.apache.parquet.column.statistics.Statistics stats, long valueCount) { + return getIntColumnMeta(ColumnPath.get("int", "column"), stats, null, valueCount); + } + + private static ColumnChunkMetaData getIntColumnMeta( + ColumnPath columnPath, + org.apache.parquet.column.statistics.Statistics stats, + SizeStatistics sizeStatistics, + long valueCount) { return ColumnChunkMetaData.get( - ColumnPath.get("int", "column"), - PrimitiveTypeName.INT32, + columnPath, + new PrimitiveType(REQUIRED, INT32, columnPath.toDotString()), CompressionCodecName.GZIP, + null, new HashSet(Arrays.asList(Encoding.PLAIN)), stats, 0L, 0L, valueCount, 0L, - 0L); + 0L, + sizeStatistics); } private static ColumnChunkMetaData getDoubleColumnMeta( @@ -100,6 +132,7 @@ private static ColumnChunkMetaData getDoubleColumnMeta( private static final DoubleColumn doubleColumn = doubleColumn("double.column"); private static final BinaryColumn missingColumn = binaryColumn("missing"); private static final IntColumn missingColumn2 = intColumn("missing.int"); + private static final IntColumn nestedListColumn = intColumn("nestedGroup.listField.element"); private static final IntStatistics intStats = new IntStatistics(); private static final IntStatistics nullIntStats = new IntStatistics(); @@ -435,6 +468,174 @@ public void testOr() { assertFalse(canDrop(or(no, no), columnMetas)); } + @Test + public void testSizeFilterRequiredGroupRequiredElements() throws Exception { + final IntStatistics minMaxStats = new IntStatistics(); + + // Case 1: Lists are populated + List columnMeta = Collections.singletonList(getIntColumnMeta( + nestedListColumn.getColumnPath(), + minMaxStats, + createSizeStatisticsForRepeatedField( + true, + ImmutableList.of( + ImmutableList.of(1, 2, 3), + ImmutableList.of(1), + ImmutableList.of(1, 2, 3), + ImmutableList.of())), + 4)); + + // SizeStats tells us that there are 7 total array elements spread across 3 non-empty list_fields, + // so the max size any single list_field could have is 5 + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 6), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 5), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 6), columnMeta)); + + // These predicates should not be able to filter out the page + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 5), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 3), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta)); + + // Case 2: All lists are empty + columnMeta = Collections.singletonList(getIntColumnMeta( + nestedListColumn.getColumnPath(), + minMaxStats, + createSizeStatisticsForRepeatedField( + true, ImmutableList.of(ImmutableList.of(), ImmutableList.of(), ImmutableList.of())), + 3)); + + // These predicates should be able to filter out the page + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 0), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 1), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta)); + + // These predicates should not be able to filter out the page + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 1), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 1), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 0), columnMeta)); + } + + @Test + public void testSizeFilterRequiredGroupOptionalElements() throws Exception { + final IntStatistics minMaxStats = new IntStatistics(); + + // Case 1: List is non-empty + List listWithNulls = new ArrayList<>(); + listWithNulls.add(1); + listWithNulls.add(null); + listWithNulls.add(null); + List columnMeta = Collections.singletonList(getIntColumnMeta( + nestedListColumn.getColumnPath(), + minMaxStats, + createSizeStatisticsForRepeatedField( + true, + ImmutableList.of( + listWithNulls, ImmutableList.of(1), ImmutableList.of(1, 2, 3), ImmutableList.of())), + 4)); + + // These predicates should be able to filter out the page + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 6), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 5), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 6), columnMeta)); + + // These predicates should not be able to filter out the page + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 5), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 3), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta)); + } + + @Test + public void testSizeFilterOptionalGroup() throws Exception { + final IntStatistics minMaxStats = new IntStatistics(); + + // Case 1: List is non-null + List columnMeta = Collections.singletonList(getIntColumnMeta( + nestedListColumn.getColumnPath(), + minMaxStats, + createSizeStatisticsForRepeatedField( + false, + ImmutableList.of(ImmutableList.of(1, 2, 3), ImmutableList.of(1), ImmutableList.of(1, 2, 3))), + 3)); + + // These predicates should be able to filter out the page + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 6), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 5), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 6), columnMeta)); + + // These predicates should not be able to filter out the page + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 5), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 3), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta)); + + // Case 2: List is null + columnMeta = Collections.singletonList(getIntColumnMeta( + nestedListColumn.getColumnPath(), + minMaxStats, + createSizeStatisticsForRepeatedField(true, ImmutableList.of()), + 5)); + + // These predicates should be able to filter out the page + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 0), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 1), columnMeta)); + assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta)); + + // These predicates should not be able to filter out the page + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 1), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 1), columnMeta)); + assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 0), columnMeta)); + } + + private static SizeStatistics createSizeStatisticsForRepeatedField( + boolean arrayGroupRequired, List> arrayValues) throws Exception { + + final MessageType messageSchema = Types.buildMessage() + .addField(Types.requiredGroup() + .addField((arrayGroupRequired ? Types.requiredGroup() : Types.optionalGroup()) + .as(LogicalTypeAnnotation.listType()) + .addField(Types.repeatedGroup() + .addField( + Types.primitive(INT32, REQUIRED).named("element")) + .named("list")) + .named("listField")) + .named("nestedGroup")) + .named("MyRecord"); + + // Write data + final File tmp = File.createTempFile(TestStatisticsFilter.class.getSimpleName(), ".tmp"); + tmp.deleteOnExit(); + tmp.delete(); + final Path file = new Path(tmp.getPath()); + + try (ParquetWriter writer = ExampleParquetWriter.builder(file) + .config(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, messageSchema.toString()) + .build()) { + + final SimpleGroup record = new SimpleGroup(messageSchema); + final Group nestedGroup = record.addGroup("nestedGroup"); + + for (List arrayValue : arrayValues) { + if (arrayValue != null) { + Group listField = nestedGroup.addGroup("listField"); + for (Integer value : arrayValue) { + Group list = listField.addGroup("list"); + if (value != null) { + list.append("element", value); + } + } + } + } + + writer.write(record); + } + + // Read size statistics + final ParquetMetadata footer = readFooter(new Configuration(), file, NO_FILTER); + assert (footer.getBlocks().size() == 1); + final BlockMetaData blockMetaData = footer.getBlocks().get(0); + assert (blockMetaData.getColumns().size() == 1); + return blockMetaData.getColumns().get(0).getSizeStatistics(); + } + public static class SevensAndEightsUdp extends UserDefinedPredicate { @Override