diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ColumnWriter.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ColumnWriter.java index 3fec6b9c2f8b5..1e25b0da2631f 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ColumnWriter.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/ColumnWriter.java @@ -21,7 +21,6 @@ import java.util.Optional; import java.util.OptionalInt; -import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; public interface ColumnWriter @@ -51,7 +50,6 @@ public BufferData(List data, OptionalInt dictionaryPageSize, this.dictionaryPageSize = requireNonNull(dictionaryPageSize, "dictionaryPageSize is null"); this.bloomFilter = requireNonNull(bloomFilter, "bloomFilter is null"); this.metaData = requireNonNull(metaData, "metaData is null"); - checkArgument(dictionaryPageSize.isEmpty() || bloomFilter.isEmpty(), "dictionaryPagesSize and bloomFilter cannot both be set"); } public ColumnMetaData getMetaData() diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/PrimitiveColumnWriter.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/PrimitiveColumnWriter.java index 2a8ed65e9f76c..6715be46ecec5 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/PrimitiveColumnWriter.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/PrimitiveColumnWriter.java @@ -27,6 +27,7 @@ import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.EncodingStats; import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.bloomfilter.BloomFilter; @@ -51,6 +52,7 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.slice.SizeOf.instanceSize; +import static io.trino.parquet.ParquetMetadataConverter.convertEncodingStats; import static io.trino.parquet.ParquetMetadataConverter.getEncoding; import static io.trino.parquet.writer.ParquetCompressor.getCompressor; import static io.trino.parquet.writer.ParquetDataOutput.createDataOutput; @@ -183,7 +185,16 @@ public List getBuffer() { checkState(closed); DataStreams dataStreams = getDataStreams(); - return ImmutableList.of(new BufferData(dataStreams.data(), dataStreams.dictionaryPageSize(), dataStreams.bloomFilter(), getColumnMetaData())); + ColumnMetaData columnMetaData = getColumnMetaData(); + + EncodingStats stats = convertEncodingStats(columnMetaData.getEncoding_stats()); + boolean isOnlyDictionaryEncodingPages = stats.hasDictionaryPages() && !stats.hasNonDictionaryEncodedPages(); + + return ImmutableList.of(new BufferData( + dataStreams.data(), + dataStreams.dictionaryPageSize(), + isOnlyDictionaryEncodingPages ? Optional.empty() : dataStreams.bloomFilter(), + columnMetaData)); } // Returns ColumnMetaData that offset is invalid diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/valuewriter/BloomFilterValuesWriter.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/valuewriter/BloomFilterValuesWriter.java index 0280bd202832a..2f1d44cafc2aa 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/valuewriter/BloomFilterValuesWriter.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/valuewriter/BloomFilterValuesWriter.java @@ -23,8 +23,6 @@ import java.util.Optional; -import static java.lang.Math.toIntExact; - public class BloomFilterValuesWriter extends ValuesWriter { @@ -54,7 +52,7 @@ public ValuesWriter getWriter() @Override public long getBufferedSize() { - return writer.getBufferedSize() + bloomFilter.getBitsetSize(); + return writer.getBufferedSize(); } @Override @@ -96,7 +94,7 @@ public void resetDictionary() @Override public long getAllocatedSize() { - return writer.getAllocatedSize(); + return writer.getAllocatedSize() + bloomFilter.getBitsetSize(); } @Override @@ -122,14 +120,14 @@ public void writeBytes(Binary v) public void writeInteger(int v) { writer.writeInteger(v); - bloomFilter.insertHash(bloomFilter.hash(toIntExact(((Number) v).longValue()))); + bloomFilter.insertHash(bloomFilter.hash(v)); } @Override public void writeLong(long v) { writer.writeLong(v); - bloomFilter.insertHash(bloomFilter.hash(((Number) v).longValue())); + bloomFilter.insertHash(bloomFilter.hash(v)); } @Override diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/valuewriter/TrinoValuesWriterFactory.java b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/valuewriter/TrinoValuesWriterFactory.java index 654c95013b4bb..bbcc966d27631 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/writer/valuewriter/TrinoValuesWriterFactory.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/writer/valuewriter/TrinoValuesWriterFactory.java @@ -66,38 +66,38 @@ private ValuesWriter getFixedLenByteArrayValuesWriter(ColumnDescriptor path, Opt private ValuesWriter getBinaryValuesWriter(ColumnDescriptor path, Optional bloomFilter) { - ValuesWriter fallbackWriter = createBloomFilterValuesWriter(new PlainValuesWriter(INITIAL_SLAB_SIZE, maxPageSize, new HeapByteBufferAllocator()), bloomFilter); - return dictWriterWithFallBack(path, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter); + ValuesWriter fallbackWriter = new PlainValuesWriter(INITIAL_SLAB_SIZE, maxPageSize, new HeapByteBufferAllocator()); + return createBloomFilterValuesWriter(dictWriterWithFallBack(path, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter), bloomFilter); } private ValuesWriter getInt32ValuesWriter(ColumnDescriptor path, Optional bloomFilter) { - ValuesWriter fallbackWriter = createBloomFilterValuesWriter(new PlainValuesWriter(INITIAL_SLAB_SIZE, maxPageSize, new HeapByteBufferAllocator()), bloomFilter); - return dictWriterWithFallBack(path, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter); + ValuesWriter fallbackWriter = new PlainValuesWriter(INITIAL_SLAB_SIZE, maxPageSize, new HeapByteBufferAllocator()); + return createBloomFilterValuesWriter(dictWriterWithFallBack(path, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter), bloomFilter); } private ValuesWriter getInt64ValuesWriter(ColumnDescriptor path, Optional bloomFilter) { - ValuesWriter fallbackWriter = createBloomFilterValuesWriter(new PlainValuesWriter(INITIAL_SLAB_SIZE, maxPageSize, new HeapByteBufferAllocator()), bloomFilter); - return dictWriterWithFallBack(path, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter); + ValuesWriter fallbackWriter = new PlainValuesWriter(INITIAL_SLAB_SIZE, maxPageSize, new HeapByteBufferAllocator()); + return createBloomFilterValuesWriter(dictWriterWithFallBack(path, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter), bloomFilter); } private ValuesWriter getInt96ValuesWriter(ColumnDescriptor path, Optional bloomFilter) { - ValuesWriter fallbackWriter = createBloomFilterValuesWriter(new FixedLenByteArrayPlainValuesWriter(12, INITIAL_SLAB_SIZE, maxPageSize, new HeapByteBufferAllocator()), bloomFilter); - return dictWriterWithFallBack(path, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter); + ValuesWriter fallbackWriter = new FixedLenByteArrayPlainValuesWriter(12, INITIAL_SLAB_SIZE, maxPageSize, new HeapByteBufferAllocator()); + return createBloomFilterValuesWriter(dictWriterWithFallBack(path, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter), bloomFilter); } private ValuesWriter getDoubleValuesWriter(ColumnDescriptor path, Optional bloomFilter) { - ValuesWriter fallbackWriter = createBloomFilterValuesWriter(new PlainValuesWriter(INITIAL_SLAB_SIZE, maxPageSize, new HeapByteBufferAllocator()), bloomFilter); - return dictWriterWithFallBack(path, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter); + ValuesWriter fallbackWriter = new PlainValuesWriter(INITIAL_SLAB_SIZE, maxPageSize, new HeapByteBufferAllocator()); + return createBloomFilterValuesWriter(dictWriterWithFallBack(path, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter), bloomFilter); } private ValuesWriter getFloatValuesWriter(ColumnDescriptor path, Optional bloomFilter) { - ValuesWriter fallbackWriter = createBloomFilterValuesWriter(new PlainValuesWriter(INITIAL_SLAB_SIZE, maxPageSize, new HeapByteBufferAllocator()), bloomFilter); - return dictWriterWithFallBack(path, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter); + ValuesWriter fallbackWriter = new PlainValuesWriter(INITIAL_SLAB_SIZE, maxPageSize, new HeapByteBufferAllocator()); + return createBloomFilterValuesWriter(dictWriterWithFallBack(path, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter), bloomFilter); } @SuppressWarnings("deprecation") diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/writer/AbstractColumnWriterBenchmark.java b/lib/trino-parquet/src/test/java/io/trino/parquet/writer/AbstractColumnWriterBenchmark.java index a41765a7f9538..7d4a225428056 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/writer/AbstractColumnWriterBenchmark.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/writer/AbstractColumnWriterBenchmark.java @@ -57,7 +57,7 @@ public abstract class AbstractColumnWriterBenchmark @Param({ "1", "1048576" // 1MB is default page size }) - public int dictionaryPageSize; + public int maxDictionaryPageSize; public enum BloomFilterType { @@ -94,7 +94,7 @@ Optional getBloomFilter() private PrimitiveValueWriter createValuesWriter() { - TrinoValuesWriterFactory valuesWriterFactory = new TrinoValuesWriterFactory(1024 * 1024, dictionaryPageSize); + TrinoValuesWriterFactory valuesWriterFactory = new TrinoValuesWriterFactory(1024 * 1024, maxDictionaryPageSize); ColumnDescriptor columnDescriptor = new ColumnDescriptor(new String[] {"test"}, getParquetType(), 0, 0); return getValueWriter(valuesWriterFactory.newValuesWriter(columnDescriptor, bloomFilterType.getBloomFilter()), getTrinoType(), columnDescriptor.getPrimitiveType(), Optional.empty()); } diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestColumnWriterBenchmark.java b/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestColumnWriterBenchmark.java new file mode 100644 index 0000000000000..e11bc7a7c118f --- /dev/null +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestColumnWriterBenchmark.java @@ -0,0 +1,61 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.parquet.writer; + +import com.google.common.collect.ImmutableList; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +final class TestColumnWriterBenchmark +{ + @Test + void testLongColumnWriterBenchmark() + throws IOException + { + for (int bitWidth = 1; bitWidth <= 64; bitWidth += 4) { + for (AbstractColumnWriterBenchmark.BloomFilterType bloomFilterType : AbstractColumnWriterBenchmark.BloomFilterType.values()) { + for (int maxDictionaryPageSize : ImmutableList.of(1, 1048576)) { + BenchmarkLongColumnWriter benchmark = new BenchmarkLongColumnWriter(); + benchmark.bitWidth = bitWidth; + benchmark.bloomFilterType = bloomFilterType; + benchmark.maxDictionaryPageSize = maxDictionaryPageSize; + benchmark.setup(); + benchmark.write(); + } + } + } + } + + @Test + void testBinaryColumnWriterBenchmark() + throws IOException + { + for (BenchmarkBinaryColumnWriter.FieldType fieldType : BenchmarkBinaryColumnWriter.FieldType.values()) { + for (BenchmarkBinaryColumnWriter.PositionLength positionLength : BenchmarkBinaryColumnWriter.PositionLength.values()) { + for (AbstractColumnWriterBenchmark.BloomFilterType bloomFilterType : AbstractColumnWriterBenchmark.BloomFilterType.values()) { + for (int maxDictionaryPageSize : ImmutableList.of(1, 1048576)) { + BenchmarkBinaryColumnWriter benchmark = new BenchmarkBinaryColumnWriter(); + benchmark.type = fieldType; + benchmark.positionLength = positionLength; + benchmark.bloomFilterType = bloomFilterType; + benchmark.maxDictionaryPageSize = maxDictionaryPageSize; + benchmark.setup(); + benchmark.write(); + } + } + } + } + } +} diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java b/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java index 846080c3297ad..c67217385b088 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestParquetWriter.java @@ -69,6 +69,7 @@ import static com.google.common.collect.Iterables.getOnlyElement; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static io.trino.operator.scalar.CharacterStringCasts.varcharToVarcharSaturatedFloorCast; +import static io.trino.parquet.BloomFilterStore.hasBloomFilter; import static io.trino.parquet.ParquetCompressionUtils.decompress; import static io.trino.parquet.ParquetTestUtils.createParquetReader; import static io.trino.parquet.ParquetTestUtils.createParquetWriter; @@ -354,6 +355,10 @@ public void testDictionaryPageOffset() assertThat(chunkMetaData.getDictionaryPageOffset()).isGreaterThan(0); int dictionaryPageSize = toIntExact(chunkMetaData.getFirstDataPageOffset() - chunkMetaData.getDictionaryPageOffset()); assertThat(dictionaryPageSize).isGreaterThan(0); + assertThat(chunkMetaData.getEncodingStats().hasDictionaryPages()).isTrue(); + assertThat(chunkMetaData.getEncodingStats().hasDictionaryEncodedPages()).isTrue(); + assertThat(chunkMetaData.getEncodingStats().hasNonDictionaryEncodedPages()).isFalse(); + assertThat(hasBloomFilter(chunkMetaData)).isFalse(); // verify reading dictionary page SliceInput inputStream = dataSource.readFully(chunkMetaData.getStartingPos(), dictionaryPageSize).getInput(); @@ -379,9 +384,8 @@ public void testDictionaryPageOffset() public void testWriteBloomFilters(Type type, List data) throws IOException { - String columnName = "column"; - List columnNames = ImmutableList.of(columnName); - List types = ImmutableList.of(type); + List columnNames = ImmutableList.of("columnA", "columnB"); + List types = ImmutableList.of(type, type); ParquetDataSource dataSource = new TestingParquetDataSource( writeParquetFile( ParquetWriterOptions.builder() @@ -397,9 +401,17 @@ public void testWriteBloomFilters(Type type, List data) // Check that bloom filters are right after each other int bloomFilterSize = Integer.highestOneBit(BlockSplitBloomFilter.optimalNumOfBits(BLOOM_FILTER_EXPECTED_ENTRIES, DEFAULT_BLOOM_FILTER_FPP) / 8) << 1; for (BlockMetadata block : parquetMetadata.getBlocks()) { - for (int i = 1; i < block.columns().size(); i++) { - assertThat(block.columns().get(i - 1).getBloomFilterOffset() + bloomFilterSize + 17) // + 17 bytes for Bloom filter metadata - .isEqualTo(block.columns().get(i).getBloomFilterOffset()); + for (int i = 0; i < block.columns().size(); i++) { + ColumnChunkMetadata chunkMetaData = block.columns().get(i); + assertThat(hasBloomFilter(chunkMetaData)).isTrue(); + assertThat(chunkMetaData.getEncodingStats().hasDictionaryPages()).isFalse(); + assertThat(chunkMetaData.getEncodingStats().hasDictionaryEncodedPages()).isFalse(); + assertThat(chunkMetaData.getEncodingStats().hasNonDictionaryEncodedPages()).isTrue(); + + if (i < block.columns().size() - 1) { + assertThat(chunkMetaData.getBloomFilterOffset() + bloomFilterSize + 17) // + 17 bytes for Bloom filter metadata + .isEqualTo(block.columns().get(i + 1).getBloomFilterOffset()); + } } } int rowGroupCount = parquetMetadata.getBlocks().size(); @@ -407,7 +419,7 @@ public void testWriteBloomFilters(Type type, List data) TupleDomain predicate = TupleDomain.withColumnDomains( ImmutableMap.of( - columnName, Domain.singleValue(type, data.get(data.size() / 2)))); + "columnA", Domain.singleValue(type, data.get(data.size() / 2)))); try (ParquetReader reader = createParquetReader(dataSource, parquetMetadata, new ParquetReaderOptions().withBloomFilter(true), newSimpleAggregatedMemoryContext(), types, columnNames, predicate)) { Page page = reader.nextPage(); int rowsRead = 0; @@ -429,6 +441,36 @@ public void testWriteBloomFilters(Type type, List data) } } + @Test + void testBloomFilterWithDictionaryFallback() + throws IOException + { + List columnNames = ImmutableList.of("column"); + List types = ImmutableList.of(BIGINT); + ParquetDataSource dataSource = new TestingParquetDataSource( + writeParquetFile( + ParquetWriterOptions.builder() + .setMaxPageValueCount(200) + .setBloomFilterColumns(ImmutableSet.copyOf(columnNames)) + .build(), + types, + columnNames, + ImmutableList.builder() + .addAll(generateInputPages(types, 10, 10)) + // Max size of dictionary page is 1024 * 1024 + .addAll(generateInputPages(types, 200, shuffle(new Random(42), (1024 * 1025) / Long.BYTES))) + .build()), + new ParquetReaderOptions()); + + ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty()); + BlockMetadata blockMetaData = getOnlyElement(parquetMetadata.getBlocks()); + ColumnChunkMetadata chunkMetaData = getOnlyElement(blockMetaData.columns()); + assertThat(chunkMetaData.getEncodingStats().hasDictionaryPages()).isTrue(); + assertThat(chunkMetaData.getEncodingStats().hasDictionaryEncodedPages()).isTrue(); + assertThat(chunkMetaData.getEncodingStats().hasNonDictionaryEncodedPages()).isTrue(); + assertThat(hasBloomFilter(chunkMetaData)).isTrue(); + } + public static Stream testWriteBloomFiltersParams() { int size = 2000; diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestTrinoValuesWriterFactory.java b/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestTrinoValuesWriterFactory.java index c36e3cd70c362..a91d6922f6847 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestTrinoValuesWriterFactory.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/writer/TestTrinoValuesWriterFactory.java @@ -176,11 +176,11 @@ private void validateFallbackWriter(ValuesWriter writer, Class initialWriterClass, Class fallbackWriterClass) { - validateWriterType(writer, DictionaryFallbackValuesWriter.class); + validateWriterType(writer, BloomFilterValuesWriter.class); - DictionaryFallbackValuesWriter fallbackValuesWriter = (DictionaryFallbackValuesWriter) writer; + BloomFilterValuesWriter bloomFilterValuesWriter = (BloomFilterValuesWriter) writer; + DictionaryFallbackValuesWriter fallbackValuesWriter = (DictionaryFallbackValuesWriter) bloomFilterValuesWriter.getWriter(); validateWriterType(fallbackValuesWriter.getInitialWriter(), initialWriterClass); - BloomFilterValuesWriter bloomFilterValuesWriter = (BloomFilterValuesWriter) fallbackValuesWriter.getFallBackWriter(); - validateWriterType(bloomFilterValuesWriter.getWriter(), fallbackWriterClass); + validateWriterType(fallbackValuesWriter.getFallBackWriter(), fallbackWriterClass); } } diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseTestParquetWithBloomFilters.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseTestParquetWithBloomFilters.java index 8f612293c644d..e4dbde6f61679 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseTestParquetWithBloomFilters.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseTestParquetWithBloomFilters.java @@ -13,6 +13,7 @@ */ package io.trino.testing; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import io.trino.Session; import io.trino.spi.connector.CatalogSchemaTableName; @@ -51,6 +52,12 @@ public void testBloomFilterRowGroupPruning() testBloomFilterRowGroupPruning(tableName, COLUMN_NAME); } + @Test + void testBloomFilterColumnWithDictionaryPage() + { + createParquetTableWithBloomFilter(COLUMN_NAME, ImmutableList.of(1, 1)); + } + protected void testBloomFilterRowGroupPruning(CatalogSchemaTableName tableName, String columnName) { // assert table is populated with data